# automate-mixing/auto_dj_mix.py
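"""Build a continuous, tempo-matched DJ mix from a JSON playlist.

Each track is trimmed, optionally beat-grid-looped, EQ'd and automated, then
time-stretched (rubberband) toward the running reference tempo and crossfaded
into the previous track. The finished mix is loudness-normalized with FFmpeg
and tagged with FLAC chapter markers.

Requires ffmpeg, rubberband and metaflac on PATH.

Minimal playlist sketch (keys are the ones this script reads; the values are
illustrative only):

    {
      "settings": {
        "output_filename": "mix_output.flac",
        "default_crossfade_duration": "0:10",
        "final_mix_target_dbfs": -1.0
      },
      "tracks": [
        {"song_path": "song1.flac", "start_offset": "0:30"},
        {"song_path": "song2.flac", "crossfade_start": "3:45",
         "crossfade_duration": "0:15", "fade_in_power": 2.0}
      ]
    }
"""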
import librosa
import numpy as np
import soundfile as sf
from pydub import AudioSegment
import pyrubberband
import os
from tqdm import tqdm
import logging
import json
import argparse
import subprocess
from multiprocessing import Pool, cpu_count, Manager
import sys
import math
import tempfile
import copy
def apply_ffmpeg_effect(segment, effect_filter_str):
"""Applies an FFmpeg audio filter to an AudioSegment using unique temp files."""
if len(segment) <= 0:
return AudioSegment.empty()
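    # Pad extremely short segments to at least 10 ms so FFmpeg has enough audio to filter.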
if len(segment) < 10:
segment = segment + AudioSegment.silent(duration=10)
with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_in:
tmp_in_path = tmp_in.name
with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_out:
tmp_out_path = tmp_out.name
try:
segment.export(tmp_in_path, format="wav")
command = [
"ffmpeg",
"-i", tmp_in_path,
"-af", effect_filter_str,
"-y",
tmp_out_path
]
subprocess.run(command, check=True, capture_output=True, text=True)
processed_segment = AudioSegment.from_file(tmp_out_path)
except subprocess.CalledProcessError as e:
logging.error(f"FFmpeg effect failed! Filter: {effect_filter_str}")
logging.error(f"FFmpeg stderr:\n{e.stderr}")
processed_segment = segment
except Exception as e:
logging.error(f"Unexpected error in apply_ffmpeg_effect: {e}")
processed_segment = segment
finally:
for path in (tmp_in_path, tmp_out_path):
try:
if os.path.exists(path):
os.remove(path)
            except OSError:
                pass
return processed_segment
def load_tempo_cache(cache_path='tempo_cache.json'):
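    """Load cached tempo detections (song path -> BPM) from a JSON file, if it exists."""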
if os.path.exists(cache_path):
with open(cache_path, 'r') as f:
return json.load(f)
return {}
def save_tempo_cache(cache, cache_path='tempo_cache.json'):
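    """Persist the tempo cache to JSON so later runs can skip tempo detection."""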
with open(cache_path, 'w') as f:
json.dump(cache, f, indent=4)
def setup_logging(level=logging.INFO):
logging.basicConfig(level=level, format="%(asctime)s - %(levelname)s - %(message)s")
def normalize_audio(audio_segment, target_dbfs=-1.0):
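    """Apply gain so the segment's peak sits at target_dbfs (peak normalization)."""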
peak_dbfs = audio_segment.max_dBFS
change_in_dbfs = target_dbfs - peak_dbfs
return audio_segment.apply_gain(change_in_dbfs)
def apply_custom_fade(segment, fade_type, duration, fade_power):
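    """Fade the segment 'in' or 'out' using a power-law gain curve (fade_power=1.0 is linear)."""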
if duration == 0 or len(segment) == 0 or fade_power == 0:
return segment
num_samples = int(segment.frame_count())
if num_samples == 0: return segment
t = np.linspace(0.0, 1.0, num_samples)
if fade_type == 'in':
gain_curve = t ** fade_power
elif fade_type == 'out':
gain_curve = (1.0 - t) ** fade_power
else:
return segment
samples = np.array(segment.get_array_of_samples()).astype(np.float64)
samples = samples.reshape((-1, segment.channels))
samples[:] *= gain_curve[:, np.newaxis]
faded_samples = samples.flatten().astype(segment.array_type)
return AudioSegment(data=faded_samples.tobytes(), sample_width=segment.sample_width, frame_rate=segment.frame_rate, channels=segment.channels)
def parse_time(tstr):
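    """Parse an "H:MM:SS" or "M:SS" time string (fractional seconds allowed) into integer milliseconds."""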
parts = str(tstr).split(':')
parts = [float(p) for p in parts]
if len(parts) == 3:
h, m, s = parts
elif len(parts) == 2:
h = 0
m, s = parts
    else:
        raise ValueError(f"Invalid time format: {tstr!r}")
return int((h*3600 + m*60 + s)*1000)
def add_flac_chapters(file_path, chapters):
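    """Tag the FLAC file with CHAPTERxx / CHAPTERxxNAME comments via metaflac."""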
logger = logging.getLogger(__name__)
logger.info(f"Adding {len(chapters)} chapter markers to {file_path}")
for i, (start_time_ms, title) in enumerate(chapters):
ms = start_time_ms % 1000
secs = (start_time_ms // 1000) % 60
mins = (start_time_ms // (1000 * 60)) % 60
hours = (start_time_ms // (1000 * 3600))
start_time_str = f"{hours:02}:{mins:02}:{secs:02}.{ms:03}"
command = [
"metaflac",
f"--set-tag=CHAPTER{i+1:02}={start_time_str}",
f"--set-tag=CHAPTER{i+1:02}NAME={title}",
file_path
]
try:
subprocess.run(command, check=True, capture_output=True, text=True)
except (subprocess.CalledProcessError, FileNotFoundError):
logger.error(f"Error running metaflac for chapter {i+1}.")
break
def get_reliable_tempo(audio, sample_rate, song_path, tempo_cache, default_tempo=120.0):
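    """Return the track tempo in BPM: use the cache if available, otherwise detect with librosa and cache the result (default_tempo on failure)."""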
logger = logging.getLogger(__name__)
if song_path in tempo_cache:
logger.info(f"Using cached tempo for {song_path}: {tempo_cache[song_path]}")
return tempo_cache[song_path]
try:
tempo = librosa.feature.tempo(y=audio, sr=sample_rate)[0]
if not np.isfinite(tempo) or tempo <= 0:
tempo = default_tempo
tempo_cache[song_path] = float(tempo)
logger.info(f"Detected tempo for {song_path}: {tempo}")
return float(tempo)
except Exception as e:
logger.warning(f"Tempo detection failed for {song_path}: {e}. Using default tempo {default_tempo}")
return default_tempo
def get_normalized_ramps(config, start_offset_ms):
"""Extracts ramps from config and adjusts start/end times by subtracting the start_offset."""
raw_ramps = config.get("tempo_ramps", config.get("temp_ramps", []))
if not raw_ramps:
return []
normalized_ramps = []
for ramp in raw_ramps:
new_ramp = copy.deepcopy(ramp)
s_time = parse_time(ramp["start_time"])
e_time = parse_time(ramp["end_time"])
# Shift times to be relative to the trimmed audio start
adj_s_time = max(0, s_time - start_offset_ms)
adj_e_time = max(0, e_time - start_offset_ms)
new_ramp["start_time_ms"] = adj_s_time
new_ramp["end_time_ms"] = adj_e_time
if adj_e_time > adj_s_time:
normalized_ramps.append(new_ramp)
return sorted(normalized_ramps, key=lambda r: r["start_time_ms"])
def apply_tempo_ramps(y, sr, normalized_ramps, original_tempo, ref_tempo):
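    """Variable-speed stretch: build a rubberband time map from the tempo ramps and run the rubberband CLI on the audio."""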
logger = logging.getLogger(__name__)
if not normalized_ramps:
return y
# Create paths but don't keep them open
tmp_in = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
tmp_in_path = tmp_in.name
tmp_in.close()
tmp_out = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
tmp_out_path = tmp_out.name
tmp_out.close()
tmp_map = tempfile.NamedTemporaryFile(suffix='.txt', delete=False)
tmp_map_path = tmp_map.name
tmp_map.close()
try:
# 1. Write the audio to disk as standard 16-bit PCM WAV
sf.write(tmp_in_path, y, sr, subtype='PCM_16')
# 2. Generate the Time Map
map_points = []
        map_points.append("0 0\n")
current_sample_in = 0.0
current_sample_out = 0.0
current_tempo = ref_tempo if ref_tempo else original_tempo
CHUNK_SIZE = 1024
total_samples = len(y)
ramps = sorted(normalized_ramps, key=lambda r: r["start_time_ms"])
while current_sample_in < total_samples:
time_ms = (current_sample_in / sr) * 1000.0
instant_tempo = current_tempo
# Find active ramp
for ramp in ramps:
if ramp["start_time_ms"] <= time_ms <= ramp["end_time_ms"]:
ramp_start_tempo = ref_tempo if ref_tempo else original_tempo
for prev_ramp in ramps:
if prev_ramp["end_time_ms"] <= ramp["start_time_ms"]:
ramp_start_tempo = float(prev_ramp["end_tempo"])
dur = ramp["end_time_ms"] - ramp["start_time_ms"]
if dur > 0:
prog = (time_ms - ramp["start_time_ms"]) / dur
instant_tempo = ramp_start_tempo + (float(ramp["end_tempo"]) - ramp_start_tempo) * prog
break
elif time_ms > ramp["end_time_ms"]:
instant_tempo = float(ramp["end_tempo"])
# Calculate stretch ratio
ratio = instant_tempo / original_tempo
# Advance
samples_added = CHUNK_SIZE / ratio
current_sample_in += CHUNK_SIZE
current_sample_out += samples_added
# Only write points that are within the file bounds
if current_sample_in < total_samples:
map_points.append(f"{int(current_sample_in)} {int(current_sample_out)}\n")
# Add the final exact end point to the map to ensure completeness
map_points.append(f"{total_samples} {int(current_sample_out)}\n")
# Write map file
with open(tmp_map_path, 'w') as f:
f.writelines(map_points)
# Calculate total expected duration in seconds for the -D flag
total_duration_sec = current_sample_out / sr
        # 3. Run the rubberband CLI.
        # -D (the expected output duration) is required alongside --timemap by the
        # rubberband build in use; --threading and --precise are omitted because not
        # all builds support them, and --quiet suppresses progress output.
command = [
"rubberband",
"--quiet",
"--timemap", tmp_map_path,
"-D", str(total_duration_sec),
tmp_in_path,
tmp_out_path
]
subprocess.run(command, check=True, capture_output=True, text=True)
# 4. Load result
y_processed, _ = sf.read(tmp_out_path, dtype='float32')
# Fix dimensions if mono/stereo mismatch occurs
if y.ndim == 2 and y_processed.ndim == 1:
y_processed = np.column_stack((y_processed, y_processed))
elif y.ndim == 1 and y_processed.ndim == 2:
y_processed = librosa.to_mono(y_processed.T)
return y_processed
except subprocess.CalledProcessError as e:
logger.error("Rubberband CLI failed.")
logger.error(f"Stderr: {e.stderr}")
return y
except Exception as e:
logger.error(f"Map generation failed: {e}")
return y
finally:
for p in [tmp_in_path, tmp_out_path, tmp_map_path]:
if os.path.exists(p):
try:
os.remove(p)
                except OSError:
                    pass
def compute_mapped_time(original_pos_sec, normalized_ramps, original_tempo, ref_tempo, total_original_duration_sec):
"""Computes time in stretched audio using normalized (0-based) ramps."""
if not normalized_ramps and ref_tempo is None:
return original_pos_sec
if not normalized_ramps and ref_tempo is not None:
return original_pos_sec * (original_tempo / ref_tempo)
current_time = 0.0
mapped_time = 0.0
current_tempo = ref_tempo if ref_tempo else original_tempo
for ramp in normalized_ramps:
start = ramp["start_time_ms"] / 1000.0
end = ramp["end_time_ms"] / 1000.0
end_tempo = float(ramp["end_tempo"])
if start < current_time: continue
# Constant segment before ramp
if start > current_time:
seg_dur = min(start, original_pos_sec) - current_time
if seg_dur > 0:
mapped_time += seg_dur * (original_tempo / current_tempo)
current_time += seg_dur
if current_time >= original_pos_sec: return mapped_time
# Ramp segment
ramp_dur = end - start
a = current_tempo
b = (end_tempo - current_tempo) / ramp_dur
u = min(original_pos_sec, end) - start
if u > 0:
if abs(b) < 1e-9:
mapped_time += u * (original_tempo / a)
else:
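                # Integral of original_tempo / (a + b*t) dt from t=0 to u: original_tempo * (ln(a + b*u) - ln(a)) / b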
mapped_time += original_tempo * (math.log(a + b * u) - math.log(a)) / b
current_time += u
if current_time >= original_pos_sec: return mapped_time
current_tempo = end_tempo
current_time = end
# Final constant segment
seg_dur = max(0, original_pos_sec - current_time)
if seg_dur > 0:
mapped_time += seg_dur * (original_tempo / current_tempo)
return mapped_time
def process_song(config, index, tempo_cache, settings, reference_tempo=None, sample_rate=None):
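    """Render one track to a temporary FLAC: trim, snap loops to the beat grid, apply EQ,
    volume/band automation and effects, normalize, then tempo-match or ramp the result.

    Returns (temp_file, sample_rate, original_tempo, pre_stretch_duration_ms,
    stretched_duration_ms, log_messages, final_output_tempo, normalized_ramps).
    """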
log_messages = []
song_path = config["song_path"]
start_offset = parse_time(config.get("start_offset", "0:00"))
no_tempo_adjust = config.get("no_tempo_adjust", False)
log_messages.append(f"Processing song {index+1}: {song_path}")
try:
        audio_segment = AudioSegment.from_file(song_path)
        if start_offset > 0:
            audio_segment = audio_segment[start_offset:]
        # Keep every track at the reference sample rate; resample if this file differs,
        # so tempo detection, stretching and the final write all use consistent rates.
        if sample_rate is not None and audio_segment.frame_rate != sample_rate:
            audio_segment = audio_segment.set_frame_rate(sample_rate)
# 1. Convert Audio to Arrays for Analysis/Processing EARLY (Needed for Loop Snapping)
full_duration = len(audio_segment)
y_raw = np.array(audio_segment.get_array_of_samples()).astype(np.float32)
y_raw /= (1 << (8 * audio_segment.sample_width - 1))
if audio_segment.channels == 2:
y_stereo = y_raw.reshape((-1, 2))
y_mono = librosa.to_mono(y_stereo.T)
else:
y_mono = y_raw if audio_segment.channels == 1 else librosa.to_mono(y_raw.reshape((-1, audio_segment.channels)).T)
y_stereo = np.column_stack((y_mono, y_mono))
current_sample_rate = audio_segment.frame_rate
if sample_rate is None:
sample_rate = current_sample_rate
# 2. Detect Tempo EARLY
tempo = get_reliable_tempo(y_mono, sample_rate, song_path, tempo_cache)
# --- Apply Loops (With BPM Grid Snapping) ---
if "loops" in config:
# We need to reconstruct audio_segment because loops change the length
# We'll do this by slicing the original audio_segment based on calculated times
new_audio = AudioSegment.empty()
# Sort loops by time to handle them sequentially
loops = sorted(config["loops"], key=lambda x: parse_time(x.get("start_time", "0:00")))
current_pos = 0
for loop in loops:
loop_start_ms = parse_time(loop.get("start_time", "0:00"))
loop_end_ms = parse_time(loop.get("end_time"))
count = int(loop.get("count", 1))
if loop_start_ms >= loop_end_ms: continue
# Add audio BEFORE the loop
if loop_start_ms > current_pos:
new_audio += audio_segment[current_pos:loop_start_ms]
# --- GRID SNAPPING LOGIC ---
# Calculate exact beat duration at this song's tempo
beat_len_ms = (60.0 / tempo) * 1000.0
# How long is the user's manual loop?
user_loop_duration_ms = loop_end_ms - loop_start_ms
# How many beats is that likely to be? (Round to nearest whole beat)
num_beats = round(user_loop_duration_ms / beat_len_ms)
if num_beats == 0: num_beats = 1 # Prevent division by zero for tiny loops
# What is the PERFECT grid length for that many beats?
perfect_duration_ms = num_beats * beat_len_ms
log_messages.append(f"Loop Correction: {user_loop_duration_ms}ms -> {perfect_duration_ms:.2f}ms ({num_beats} beats @ {tempo} BPM)")
# Extract the loop audio
loop_segment = audio_segment[loop_start_ms:loop_end_ms]
                # Time-stretch the loop segment to match perfect_duration_ms exactly,
                # so repeated loops don't accumulate drift over many repetitions.
# Convert loop to numpy for rubberband
l_raw = np.array(loop_segment.get_array_of_samples()).astype(np.float32)
l_raw /= (1 << (8 * loop_segment.sample_width - 1))
if loop_segment.channels == 2:
l_stereo = l_raw.reshape((-1, 2))
else:
l_mono = l_raw
l_stereo = np.column_stack((l_mono, l_mono))
                # Stretch factor: nudge the user's cut onto the beat grid (e.g. 3.930 s -> 3.931 s),
                # far too small a change to be audible.
                # pyrubberband.time_stretch(y, sr, rate) shortens audio for rate > 1.0, so to go
                # from the user's duration to the perfect grid duration use rate = user / perfect.
                rb_rate = user_loop_duration_ms / perfect_duration_ms
l_stretched_data = pyrubberband.time_stretch(l_stereo, sample_rate, rb_rate)
# Convert back to AudioSegment
# Ensure 16-bit PCM for Pydub compatibility
l_stretched_data_int = (l_stretched_data * (2**15 - 1)).astype(np.int16)
perfect_loop_segment = AudioSegment(
l_stretched_data_int.tobytes(),
frame_rate=sample_rate,
sample_width=2,
channels=2
)
# Append the snapped loop X times
new_audio += (perfect_loop_segment * count)
current_pos = loop_end_ms
# Add remaining audio after the last loop
if current_pos < len(audio_segment):
new_audio += audio_segment[current_pos:]
audio_segment = new_audio
# --- Apply EQ Filters ---
if "eq_filters" in config:
for eq_filter in config["eq_filters"]:
filter_type = eq_filter.get("type")
cutoff_hz = eq_filter.get("cutoff_hz")
start_ms = parse_time(eq_filter.get("start_time", "0:00"))
end_time_str = eq_filter.get("end_time")
end_ms = parse_time(end_time_str) if end_time_str else len(audio_segment)
if not filter_type or not cutoff_hz or start_ms >= end_ms: continue
pre_segment = audio_segment[:start_ms]
segment_to_filter = audio_segment[start_ms:end_ms]
post_segment = audio_segment[end_ms:]
if filter_type == "low_pass":
filtered_slice = segment_to_filter.low_pass_filter(cutoff_hz)
elif filter_type == "high_pass":
filtered_slice = segment_to_filter.high_pass_filter(cutoff_hz)
elif filter_type == "band_pass":
low, high = eq_filter.get("low_cutoff_hz"), eq_filter.get("high_cutoff_hz")
filtered_slice = segment_to_filter.high_pass_filter(low).low_pass_filter(high) if low and high else segment_to_filter
elif filter_type == "band_reject":
low, high = eq_filter.get("low_cutoff_hz"), eq_filter.get("high_cutoff_hz")
filtered_slice = segment_to_filter.low_pass_filter(low).overlay(segment_to_filter.high_pass_filter(high)) if low and high else segment_to_filter
else:
filtered_slice = segment_to_filter
audio_segment = pre_segment + filtered_slice + post_segment
# --- Apply Volume Automation ---
if "volume_automation" in config:
for automation in config["volume_automation"]:
start_ms = parse_time(automation.get("start_time", "0:00"))
end_time_str = automation.get("end_time")
end_ms = parse_time(end_time_str) if end_time_str else len(audio_segment)
gain_db = float(automation.get("gain_db", 0))
if start_ms >= end_ms: continue
pre_segment = audio_segment[:start_ms]
segment_to_automate = audio_segment[start_ms:end_ms]
post_segment = audio_segment[end_ms:]
audio_segment = pre_segment + segment_to_automate.apply_gain(gain_db) + post_segment
# --- Apply Band Gains (With Ramping) ---
if "band_gains" in config:
for band_gain in config["band_gains"]:
start_ms = parse_time(band_gain.get("start_time", "0:00"))
end_time_str = band_gain.get("end_time")
end_ms = parse_time(end_time_str) if end_time_str else len(audio_segment)
target_low_gain_db = float(band_gain.get("low_gain_db", 0))
target_mid_gain_db = float(band_gain.get("mid_gain_db", 0))
target_high_gain_db = float(band_gain.get("high_gain_db", 0))
if start_ms >= end_ms: continue
if target_low_gain_db == 0 and target_mid_gain_db == 0 and target_high_gain_db == 0:
continue
pre_segment = audio_segment[:start_ms]
segment_to_process = audio_segment[start_ms:end_ms]
post_segment = audio_segment[end_ms:]
ramp_duration_ms = len(segment_to_process)
if ramp_duration_ms <= 0: continue
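                # Ramp the band gains linearly across the window by processing it in ~250 ms
                # chunks, each run through FFmpeg with the gain scaled to the chunk midpoint.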
chunk_duration_ms = 250
num_chunks = max(2, int(ramp_duration_ms / chunk_duration_ms))
chunk_duration_ms = ramp_duration_ms / num_chunks
processed_chunks = []
for i in range(num_chunks):
chunk_start_ms = int(i * chunk_duration_ms)
chunk_end_ms = int((i + 1) * chunk_duration_ms)
chunk = segment_to_process[chunk_start_ms:chunk_end_ms]
if len(chunk) == 0: continue
t = (i + 0.5) / num_chunks
low_gain = t * target_low_gain_db
mid_gain = t * target_mid_gain_db
high_gain = t * target_high_gain_db
filter_str = []
if low_gain != 0: filter_str.append(f"bass=g={low_gain}:f=250:w=0.707")
if mid_gain != 0: filter_str.append(f"equalizer=f=1000:t=q:w=1:g={mid_gain}")
if high_gain != 0: filter_str.append(f"treble=g={high_gain}:f=2500:w=0.707")
if filter_str:
processed_chunk = apply_ffmpeg_effect(chunk, ",".join(filter_str))
else:
processed_chunk = chunk
processed_chunks.append(processed_chunk)
processed_segment = AudioSegment.empty()
for chunk in processed_chunks:
processed_segment += chunk
filter_str_post = []
if target_low_gain_db != 0: filter_str_post.append(f"bass=g={target_low_gain_db}:f=250:w=0.707")
if target_mid_gain_db != 0: filter_str_post.append(f"equalizer=f=1000:t=q:w=1:g={target_mid_gain_db}")
if target_high_gain_db != 0: filter_str_post.append(f"treble=g={target_high_gain_db}:f=2500:w=0.707")
if filter_str_post and len(post_segment) > 0:
post_segment = apply_ffmpeg_effect(post_segment, ",".join(filter_str_post))
audio_segment = pre_segment + processed_segment + post_segment
# --- Apply Effects ---
if "effects" in config:
for effect in config["effects"]:
start_ms = parse_time(effect.get("start_time", "0:00"))
end_time_str = effect.get("end_time")
end_ms = parse_time(end_time_str) if end_time_str else len(audio_segment)
effect_type = effect.get("type")
if start_ms >= end_ms: continue
pre_segment = audio_segment[:start_ms]
segment_to_process = audio_segment[start_ms:end_ms]
post_segment = audio_segment[end_ms:]
effect_str = ""
if effect_type == "reverb":
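                    # Pseudo-reverb: afftfilt randomizes the spectral phase and mixes it
                    # with the dry signal according to "wet".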
wet = effect.get("wet", 0.4)
effect_str = f"afftfilt=real='hypot(re,im)*cos(random(0))*{wet}+re*(1-{wet})':imag='hypot(re,im)*sin(random(0))*{wet}+im*(1-{wet})'"
elif effect_type == "delay":
delay_ms = effect.get("delay_ms", 500)
effect_str = f"adelay=delays={delay_ms}:all=1"
elif effect_type == "raw_ffmpeg":
effect_str = effect.get("filter_string", "")
if effect_str:
processed_slice = apply_ffmpeg_effect(segment_to_process, effect_str)
audio_segment = pre_segment + processed_slice + post_segment
# Apply pre-mix normalization
pre_mix_target_dbfs = config.get("target_dBFS", settings.get("pre_mix_target_dbfs", -12.0))
if config.get("apply_pre_mix_normalization", True):
audio_segment = normalize_audio(audio_segment, pre_mix_target_dbfs)
full_duration = len(audio_segment)
# Prepare Final Output for Stretch
y_raw = np.array(audio_segment.get_array_of_samples()).astype(np.float32)
y_raw /= (1 << (8 * audio_segment.sample_width - 1))
if audio_segment.channels == 2:
y_stereo = y_raw.reshape((-1, 2))
y_mono = librosa.to_mono(y_stereo.T)
else:
y_mono = y_raw if audio_segment.channels == 1 else librosa.to_mono(y_raw.reshape((-1, audio_segment.channels)).T)
y_stereo = np.column_stack((y_mono, y_mono))
# Tempo Detection (Already done earlier, reusing value)
# Logic for Ramps and Adjustments
normalized_ramps = get_normalized_ramps(config, start_offset)
effective_ref = None if no_tempo_adjust else reference_tempo
final_output_tempo = tempo
stretched_audio = y_stereo
if normalized_ramps:
log_messages.append(f"Applying tempo ramps with initial ref {effective_ref if effective_ref else tempo}")
stretched_audio = apply_tempo_ramps(stretched_audio, sample_rate, normalized_ramps, tempo, effective_ref)
final_output_tempo = float(normalized_ramps[-1]["end_tempo"])
elif index > 0 and not no_tempo_adjust:
stretch_rate = reference_tempo / tempo
stretched_audio = pyrubberband.time_stretch(y_stereo, sample_rate, stretch_rate)
final_output_tempo = reference_tempo
else:
final_output_tempo = tempo
temp_file = f"temp_song{index}.flac"
sf.write(temp_file, stretched_audio, sample_rate)
return temp_file, sample_rate, tempo, full_duration, len(AudioSegment.from_file(temp_file)), log_messages, final_output_tempo, normalized_ramps
except Exception as e:
logging.error(f"Error processing {song_path}: {e}")
return None, None, None, None, None, [f"ERROR: Failed to process {song_path}"], None, []
def mix_songs(playlist_data):
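    """Process every track, chain them with tempo-aware crossfades, loudness-normalize the
    result with FFmpeg, and tag the output FLAC with chapter markers."""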
settings = playlist_data["settings"]
song_configs = playlist_data["tracks"]
output_file = settings.get("output_filename", "mix_output.flac")
default_crossfade = settings.get("default_crossfade_duration", "0:10")
final_mix_target_dbfs = settings.get("final_mix_target_dbfs", -1.0)
num_cores = settings.get("num_cores", 0)
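    # NOTE: tracks are currently processed sequentially; num_cores is read but not used yet.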
logger = logging.getLogger(__name__)
manager = Manager()
tempo_cache = manager.dict(load_tempo_cache())
# --- Processing Phase ---
    first_result = process_song(song_configs[0], 0, tempo_cache, settings, None)
    for msg in first_result[5]:
        logger.info(msg)
    if first_result[0] is None: sys.exit(1)
current_chain_tempo = first_result[6]
sample_rate = first_result[1]
processed_configs = [first_result]
for i, config in enumerate(song_configs[1:], 1):
no_adjust = config.get("no_tempo_adjust", False)
ref_tempo = current_chain_tempo if not no_adjust else None
        result = process_song(config, i, tempo_cache, settings, ref_tempo, sample_rate)
        for msg in result[5]:
            logger.info(msg)
        processed_configs.append(result)
if result[6] is not None:
current_chain_tempo = result[6]
# --- Mixing Phase ---
temp_files = [res[0] for res in processed_configs]
if any(t is None for t in temp_files): sys.exit(1)
current_mix = AudioSegment.from_file(temp_files[0])
chapters = [(0, os.path.basename(song_configs[0]['song_path']))]
song_start_times = [0]
for index, config in enumerate(tqdm(song_configs[1:], desc="Mixing songs", unit="song"), start=1):
prev_song_config = song_configs[index-1]
prev_result = processed_configs[index-1]
prev_song_orig_tempo = prev_result[2]
prev_song_ramps = prev_result[7] # Normalized ramps
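        # Work out which reference tempo the previous track was stretched to, so that
        # crossfade_start (given on the original timeline) can be mapped into its
        # stretched timeline below.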
if index == 1:
prev_effective_start_tempo = None
else:
p_prev_res = processed_configs[index-2]
p_prev_no_adj = prev_song_config.get("no_tempo_adjust", False)
if p_prev_no_adj:
prev_effective_start_tempo = None
else:
prev_effective_start_tempo = p_prev_res[6]
        crossfade_start_str = config.get("crossfade_start")
        crossfade_duration = parse_time(config.get("crossfade_duration", default_crossfade))
        prev_song_start_offset = parse_time(prev_song_config.get("start_offset", "0:00"))
        if crossfade_start_str is not None:
            relative_crossfade_start = parse_time(crossfade_start_str)
        else:
            # No crossfade_start given: default to one crossfade length before the end
            # of the previous (trimmed, pre-stretch) track.
            relative_crossfade_start = prev_song_start_offset + prev_result[3] - crossfade_duration
        original_pos_in_trimmed = max(0, relative_crossfade_start - prev_song_start_offset) / 1000.0
adj_pos_sec = compute_mapped_time(
original_pos_in_trimmed,
prev_song_ramps,
prev_song_orig_tempo,
prev_effective_start_tempo,
prev_result[3] / 1000.0
)
adj_crossfade_in_segment = int(adj_pos_sec * 1000)
next_song_start_time = song_start_times[-1] + adj_crossfade_in_segment
next_song_start_time = max(0, next_song_start_time)
next_song = AudioSegment.from_file(temp_files[index])
fade_in_duration = parse_time(config.get("fade_in_duration")) if config.get("fade_in_duration") else crossfade_duration
fade_out_power = float(prev_song_config.get("fade_out_power", prev_song_config.get("fade_power", 1.0)))
fade_in_power = float(config.get("fade_in_power", config.get("fade_power", 1.0)))
fade_out_delay = parse_time(prev_song_config.get("fade_out_delay", "0:00"))
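        # Crossfade assembly: keep the mix untouched up to the overlap, hold the first
        # fade_out_delay ms of the outgoing track at full level, fade the rest out with
        # the chosen power curve, and overlay the incoming track fading in on top.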
track1_before_fade = current_mix[:next_song_start_time]
track1_total_overlap = current_mix[next_song_start_time : next_song_start_time + crossfade_duration]
actual_overlap_duration = len(track1_total_overlap)
track1_constant_part = track1_total_overlap[:fade_out_delay]
track1_fade_part = track1_total_overlap[fade_out_delay:]
faded_out_segment = apply_custom_fade(track1_fade_part, 'out', len(track1_fade_part), fade_out_power)
faded_out_part = track1_constant_part + faded_out_segment
track2_overlay_section = next_song[:actual_overlap_duration]
actual_fade_in_duration = min(fade_in_duration, actual_overlap_duration)
track2_fade_in_part = track2_overlay_section[:actual_fade_in_duration]
track2_after_fade_in = track2_overlay_section[actual_fade_in_duration:]
faded_in_part = apply_custom_fade(track2_fade_in_part, 'in', actual_fade_in_duration, fade_in_power)
full_track2_overlay = faded_in_part + track2_after_fade_in
crossfade_result = faded_out_part.overlay(full_track2_overlay)
if crossfade_result.max_dBFS > -0.1:
crossfade_result = normalize_audio(crossfade_result, -0.1)
current_mix = track1_before_fade + crossfade_result + next_song[actual_overlap_duration:]
song_start_times.append(next_song_start_time)
chapters.append((next_song_start_time, os.path.basename(config['song_path'])))
# --- Export ---
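    # Export path: dump raw PCM, wrap it into FLAC via FFmpeg, then loudness-normalize
    # (EBU R128 loudnorm, I=-14 LUFS) with the configured true-peak ceiling.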
logger.info("Exporting raw mix...")
temp_pcm = "temp_raw_mix.raw"
current_mix.export(temp_pcm, format="raw")
temp_raw_file = "temp_raw_mix.flac"
bits = current_mix.sample_width * 8
pcm_format = f"s{bits}le"
subprocess.run(["ffmpeg", "-f", pcm_format, "-ar", str(current_mix.frame_rate), "-ac", str(current_mix.channels), "-i", temp_pcm, "-c:a", "flac", "-y", temp_raw_file], check=True, capture_output=True)
logger.info("Normalizing...")
subprocess.run(["ffmpeg", "-i", temp_raw_file, "-af", f"loudnorm=I=-14:LRA=7:TP={final_mix_target_dbfs}", "-y", output_file], check=True, capture_output=True)
for f in [temp_pcm, temp_raw_file] + temp_files:
if os.path.exists(f): os.remove(f)
add_flac_chapters(output_file, chapters)
save_tempo_cache(tempo_cache.copy())
return output_file, len(current_mix), current_chain_tempo
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--playlist", type=str, default="ambient_mix_settings.json")
parser.add_argument("--debug", action="store_true")
parser.add_argument("--test-last-two", action="store_true")
parser.add_argument("--tracks", type=str)
args = parser.parse_args()
setup_logging(level=logging.DEBUG if args.debug else logging.INFO)
logger = logging.getLogger(__name__)
with open(args.playlist, 'r') as f:
playlist_data = json.load(f)
if args.tracks:
try:
track_numbers = [int(t.strip()) for t in args.tracks.split(',')]
original_tracks = playlist_data['tracks']
playlist_data['tracks'] = [original_tracks[i-1] for i in track_numbers if 0 < i <= len(original_tracks)]
except ValueError:
logger.error("Invalid format for --tracks.")
sys.exit(1)
elif args.test_last_two:
playlist_data['tracks'] = playlist_data['tracks'][-2:]
try:
output_file, final_duration, ref_tempo = mix_songs(playlist_data)
print(f"\nSuccess! Output: {output_file}")
except Exception as e:
logging.error(f"Error: {e}", exc_info=True)