import librosa
import numpy as np
import soundfile as sf
from pydub import AudioSegment
import pyrubberband
import os
from tqdm import tqdm
import logging
import json
import argparse
import subprocess
from multiprocessing import Pool, cpu_count, Manager
import sys
import math
import tempfile
import copy


def apply_ffmpeg_effect(segment, effect_filter_str):
    """Applies an FFmpeg audio filter to an AudioSegment using unique temp files."""
    if len(segment) <= 0:
        return AudioSegment.empty()
    if len(segment) < 10:
        segment = segment + AudioSegment.silent(duration=10)
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_in:
        tmp_in_path = tmp_in.name
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_out:
        tmp_out_path = tmp_out.name
    try:
        segment.export(tmp_in_path, format="wav")
        command = [
            "ffmpeg", "-i", tmp_in_path,
            "-af", effect_filter_str,
            "-y", tmp_out_path
        ]
        subprocess.run(command, check=True, capture_output=True, text=True)
        processed_segment = AudioSegment.from_file(tmp_out_path)
    except subprocess.CalledProcessError as e:
        logging.error(f"FFmpeg effect failed! Filter: {effect_filter_str}")
        logging.error(f"FFmpeg stderr:\n{e.stderr}")
        processed_segment = segment
    except Exception as e:
        logging.error(f"Unexpected error in apply_ffmpeg_effect: {e}")
        processed_segment = segment
    finally:
        for path in (tmp_in_path, tmp_out_path):
            try:
                if os.path.exists(path):
                    os.remove(path)
            except:
                pass
    return processed_segment


def load_tempo_cache(cache_path='tempo_cache.json'):
    if os.path.exists(cache_path):
        with open(cache_path, 'r') as f:
            return json.load(f)
    return {}


def save_tempo_cache(cache, cache_path='tempo_cache.json'):
    with open(cache_path, 'w') as f:
        json.dump(cache, f, indent=4)


def setup_logging(level=logging.INFO):
    logging.basicConfig(level=level, format="%(asctime)s - %(levelname)s - %(message)s")


def normalize_audio(audio_segment, target_dbfs=-1.0):
    peak_dbfs = audio_segment.max_dBFS
    change_in_dbfs = target_dbfs - peak_dbfs
    return audio_segment.apply_gain(change_in_dbfs)


def apply_custom_fade(segment, fade_type, duration, fade_power):
    if duration == 0 or len(segment) == 0 or fade_power == 0:
        return segment
    num_samples = int(segment.frame_count())
    if num_samples == 0:
        return segment
    t = np.linspace(0.0, 1.0, num_samples)
    if fade_type == 'in':
        gain_curve = t ** fade_power
    elif fade_type == 'out':
        gain_curve = (1.0 - t) ** fade_power
    else:
        return segment
    samples = np.array(segment.get_array_of_samples()).astype(np.float64)
    samples = samples.reshape((-1, segment.channels))
    samples[:] *= gain_curve[:, np.newaxis]
    faded_samples = samples.flatten().astype(segment.array_type)
    return AudioSegment(data=faded_samples.tobytes(),
                        sample_width=segment.sample_width,
                        frame_rate=segment.frame_rate,
                        channels=segment.channels)


def parse_time(tstr):
    parts = str(tstr).split(':')
    parts = [float(p) for p in parts]
    if len(parts) == 3:
        h, m, s = parts
    elif len(parts) == 2:
        h = 0
        m, s = parts
    else:
        raise ValueError("Invalid time format")
    return int((h * 3600 + m * 60 + s) * 1000)


def add_flac_chapters(file_path, chapters):
    logger = logging.getLogger(__name__)
    logger.info(f"Adding {len(chapters)} chapter markers to {file_path}")
    for i, (start_time_ms, title) in enumerate(chapters):
        ms = start_time_ms % 1000
        secs = (start_time_ms // 1000) % 60
        mins = (start_time_ms // (1000 * 60)) % 60
        hours = (start_time_ms // (1000 * 3600))
        start_time_str = f"{hours:02}:{mins:02}:{secs:02}.{ms:03}"
        command = [
            "metaflac",
            f"--set-tag=CHAPTER{i+1:02}={start_time_str}",
            f"--set-tag=CHAPTER{i+1:02}NAME={title}",
            file_path
        ]
        try:
            subprocess.run(command, check=True, capture_output=True, text=True)
        except (subprocess.CalledProcessError, FileNotFoundError):
            logger.error(f"Error running metaflac for chapter {i+1}.")
            break
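
# Illustrative example (hypothetical file names, not from the original source):
# for chapters starting at 0 ms and 215,000 ms, the metaflac calls above add tags
# roughly like
#   CHAPTER01=00:00:00.000
#   CHAPTER01NAME=first_track.flac
#   CHAPTER02=00:03:35.000
#   CHAPTER02NAME=second_track.flac
# Whether a given player treats these Vorbis comments as chapter markers depends
# on the player.
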
def get_reliable_tempo(audio, sample_rate, song_path, tempo_cache, default_tempo=120.0):
    logger = logging.getLogger(__name__)
    if song_path in tempo_cache:
        logger.info(f"Using cached tempo for {song_path}: {tempo_cache[song_path]}")
        return tempo_cache[song_path]
    try:
        tempo = librosa.feature.tempo(y=audio, sr=sample_rate)[0]
        if not np.isfinite(tempo) or tempo <= 0:
            tempo = default_tempo
        tempo_cache[song_path] = float(tempo)
        logger.info(f"Detected tempo for {song_path}: {tempo}")
        return float(tempo)
    except Exception as e:
        logger.warning(f"Tempo detection failed for {song_path}: {e}. Using default tempo {default_tempo}")
        return default_tempo


def get_normalized_ramps(config, start_offset_ms):
    """Extracts ramps from config and adjusts start/end times by subtracting the start_offset."""
    raw_ramps = config.get("tempo_ramps", config.get("temp_ramps", []))
    if not raw_ramps:
        return []
    normalized_ramps = []
    for ramp in raw_ramps:
        new_ramp = copy.deepcopy(ramp)
        s_time = parse_time(ramp["start_time"])
        e_time = parse_time(ramp["end_time"])
        # Shift times to be relative to the trimmed audio start
        adj_s_time = max(0, s_time - start_offset_ms)
        adj_e_time = max(0, e_time - start_offset_ms)
        new_ramp["start_time_ms"] = adj_s_time
        new_ramp["end_time_ms"] = adj_e_time
        if adj_e_time > adj_s_time:
            normalized_ramps.append(new_ramp)
    return sorted(normalized_ramps, key=lambda r: r["start_time_ms"])
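
# Worked example (hypothetical values): with start_offset "0:30" (30,000 ms) and a
# config ramp {"start_time": "1:00", "end_time": "2:00", "end_tempo": 128}, the
# normalized ramp carries start_time_ms=30000 and end_time_ms=90000, i.e. it is
# re-expressed relative to the trimmed audio that process_song actually works on.
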
def apply_tempo_ramps(y, sr, normalized_ramps, original_tempo, ref_tempo):
    logger = logging.getLogger(__name__)
    if not normalized_ramps:
        return y
    # Create temp file paths but don't keep the handles open
    tmp_in = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
    tmp_in_path = tmp_in.name
    tmp_in.close()
    tmp_out = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
    tmp_out_path = tmp_out.name
    tmp_out.close()
    tmp_map = tempfile.NamedTemporaryFile(suffix='.txt', delete=False)
    tmp_map_path = tmp_map.name
    tmp_map.close()
    try:
        # 1. Write the audio to disk as standard 16-bit PCM WAV
        sf.write(tmp_in_path, y, sr, subtype='PCM_16')

        # 2. Generate the time map
        map_points = []
        map_points.append("0 0\n")
        current_sample_in = 0.0
        current_sample_out = 0.0
        current_tempo = ref_tempo if ref_tempo else original_tempo
        CHUNK_SIZE = 1024
        total_samples = len(y)
        ramps = sorted(normalized_ramps, key=lambda r: r["start_time_ms"])
        while current_sample_in < total_samples:
            time_ms = (current_sample_in / sr) * 1000.0
            instant_tempo = current_tempo
            # Find the active ramp for this position
            for ramp in ramps:
                if ramp["start_time_ms"] <= time_ms <= ramp["end_time_ms"]:
                    ramp_start_tempo = ref_tempo if ref_tempo else original_tempo
                    for prev_ramp in ramps:
                        if prev_ramp["end_time_ms"] <= ramp["start_time_ms"]:
                            ramp_start_tempo = float(prev_ramp["end_tempo"])
                    dur = ramp["end_time_ms"] - ramp["start_time_ms"]
                    if dur > 0:
                        prog = (time_ms - ramp["start_time_ms"]) / dur
                        instant_tempo = ramp_start_tempo + (float(ramp["end_tempo"]) - ramp_start_tempo) * prog
                    break
                elif time_ms > ramp["end_time_ms"]:
                    instant_tempo = float(ramp["end_tempo"])
            # Calculate stretch ratio
            ratio = instant_tempo / original_tempo
            # Advance
            samples_added = CHUNK_SIZE / ratio
            current_sample_in += CHUNK_SIZE
            current_sample_out += samples_added
            # Only write points that are within the file bounds
            if current_sample_in < total_samples:
                map_points.append(f"{int(current_sample_in)} {int(current_sample_out)}\n")
        # Add the final exact end point to the map to ensure completeness
        map_points.append(f"{total_samples} {int(current_sample_out)}\n")

        # Write the map file
        with open(tmp_map_path, 'w') as f:
            f.writelines(map_points)

        # Total expected output duration in seconds, for the -D flag
        total_duration_sec = current_sample_out / sr

        # 3. Run the Rubber Band CLI.
        #    -D gives the expected output duration, which some rubberband builds
        #    require alongside --timemap; --quiet suppresses progress output.
        #    (--threading and --precise are not used because not all builds accept them.)
        command = [
            "rubberband", "--quiet",
            "--timemap", tmp_map_path,
            "-D", str(total_duration_sec),
            tmp_in_path, tmp_out_path
        ]
        subprocess.run(command, check=True, capture_output=True, text=True)

        # 4. Load the result
        y_processed, _ = sf.read(tmp_out_path, dtype='float32')
        # Fix dimensions if a mono/stereo mismatch occurs
        if y.ndim == 2 and y_processed.ndim == 1:
            y_processed = np.column_stack((y_processed, y_processed))
        elif y.ndim == 1 and y_processed.ndim == 2:
            y_processed = librosa.to_mono(y_processed.T)
        return y_processed
    except subprocess.CalledProcessError as e:
        logger.error("Rubberband CLI failed.")
        logger.error(f"Stderr: {e.stderr}")
        return y
    except Exception as e:
        logger.error(f"Map generation failed: {e}")
        return y
    finally:
        for p in [tmp_in_path, tmp_out_path, tmp_map_path]:
            if os.path.exists(p):
                try:
                    os.remove(p)
                except:
                    pass
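
# Note on the time map (a sketch of the format this code emits, not an excerpt from
# the rubberband documentation): each line is "<input_sample> <output_sample>".
# Because samples_added = CHUNK_SIZE / ratio, a tempo above original_tempo
# (ratio > 1) advances the output more slowly than the input, compressing the audio
# in time, while a tempo below original_tempo stretches it. For instance, a constant
# ratio of 1.25 maps input sample 44100 to roughly output sample 35280.
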
def compute_mapped_time(original_pos_sec, normalized_ramps, original_tempo, ref_tempo, total_original_duration_sec):
    """Computes time in stretched audio using normalized (0-based) ramps."""
    if not normalized_ramps and ref_tempo is None:
        return original_pos_sec
    if not normalized_ramps and ref_tempo is not None:
        return original_pos_sec * (original_tempo / ref_tempo)
    current_time = 0.0
    mapped_time = 0.0
    current_tempo = ref_tempo if ref_tempo else original_tempo
    for ramp in normalized_ramps:
        start = ramp["start_time_ms"] / 1000.0
        end = ramp["end_time_ms"] / 1000.0
        end_tempo = float(ramp["end_tempo"])
        if start < current_time:
            continue
        # Constant segment before the ramp
        if start > current_time:
            seg_dur = min(start, original_pos_sec) - current_time
            if seg_dur > 0:
                mapped_time += seg_dur * (original_tempo / current_tempo)
                current_time += seg_dur
            if current_time >= original_pos_sec:
                return mapped_time
        # Ramp segment
        ramp_dur = end - start
        a = current_tempo
        b = (end_tempo - current_tempo) / ramp_dur
        u = min(original_pos_sec, end) - start
        if u > 0:
            if abs(b) < 1e-9:
                mapped_time += u * (original_tempo / a)
            else:
                mapped_time += original_tempo * (math.log(a + b * u) - math.log(a)) / b
            current_time += u
            if current_time >= original_pos_sec:
                return mapped_time
        current_tempo = end_tempo
        current_time = end
    # Final constant segment
    seg_dur = max(0, original_pos_sec - current_time)
    if seg_dur > 0:
        mapped_time += seg_dur * (original_tempo / current_tempo)
    return mapped_time
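
# Derivation sketch for the ramp term above (annotation, not original text): during
# a ramp the playback tempo is tempo(t) = a + b*t, and the stretch applied by
# apply_tempo_ramps makes an input slice dt occupy original_tempo / tempo(t) * dt of
# output time. Integrating from 0 to u gives
#     mapped_time += original_tempo * (ln(a + b*u) - ln(a)) / b,
# which degenerates to the simple u * (original_tempo / a) case when b is ~0.
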
def process_song(config, index, tempo_cache, settings, reference_tempo=None, sample_rate=None):
    log_messages = []
    song_path = config["song_path"]
    start_offset = parse_time(config.get("start_offset", "0:00"))
    no_tempo_adjust = config.get("no_tempo_adjust", False)
    log_messages.append(f"Processing song {index+1}: {song_path}")
    try:
        audio_segment = AudioSegment.from_file(song_path)
        if start_offset > 0:
            audio_segment = audio_segment[start_offset:]

        # 1. Convert audio to arrays for analysis/processing early (needed for loop snapping)
        full_duration = len(audio_segment)
        y_raw = np.array(audio_segment.get_array_of_samples()).astype(np.float32)
        y_raw /= (1 << (8 * audio_segment.sample_width - 1))
        if audio_segment.channels == 2:
            y_stereo = y_raw.reshape((-1, 2))
            y_mono = librosa.to_mono(y_stereo.T)
        else:
            y_mono = y_raw if audio_segment.channels == 1 else librosa.to_mono(
                y_raw.reshape((-1, audio_segment.channels)).T)
            y_stereo = np.column_stack((y_mono, y_mono))
        current_sample_rate = audio_segment.frame_rate
        # Note: tracks are assumed to share the first track's sample rate; analysis
        # and output below use the chain-wide sample_rate once it has been set.
        if sample_rate is None:
            sample_rate = current_sample_rate

        # 2. Detect tempo early
        tempo = get_reliable_tempo(y_mono, sample_rate, song_path, tempo_cache)

        # --- Apply Loops (With BPM Grid Snapping) ---
        if "loops" in config:
            # Loops change the length, so rebuild audio_segment by slicing the
            # original at the calculated times.
            new_audio = AudioSegment.empty()
            # Sort loops by time to handle them sequentially
            loops = sorted(config["loops"], key=lambda x: parse_time(x.get("start_time", "0:00")))
            current_pos = 0
            for loop in loops:
                loop_start_ms = parse_time(loop.get("start_time", "0:00"))
                loop_end_ms = parse_time(loop.get("end_time"))
                count = int(loop.get("count", 1))
                if loop_start_ms >= loop_end_ms:
                    continue
                # Add audio BEFORE the loop
                if loop_start_ms > current_pos:
                    new_audio += audio_segment[current_pos:loop_start_ms]

                # --- GRID SNAPPING LOGIC ---
                # Exact beat duration at this song's tempo
                beat_len_ms = (60.0 / tempo) * 1000.0
                # How long is the user's manual loop?
                user_loop_duration_ms = loop_end_ms - loop_start_ms
                # How many beats is that likely to be? (Round to the nearest whole beat)
                num_beats = round(user_loop_duration_ms / beat_len_ms)
                if num_beats == 0:
                    num_beats = 1  # Prevent division by zero for tiny loops
                # The perfect grid length for that many beats
                perfect_duration_ms = num_beats * beat_len_ms
                log_messages.append(
                    f"Loop Correction: {user_loop_duration_ms}ms -> {perfect_duration_ms:.2f}ms "
                    f"({num_beats} beats @ {tempo} BPM)")

                # Extract the loop audio
                loop_segment = audio_segment[loop_start_ms:loop_end_ms]

                # Time-stretch the loop segment to match perfect_duration_ms exactly,
                # so repeated loops don't accumulate drift.
                # Convert the loop to numpy for rubberband
                l_raw = np.array(loop_segment.get_array_of_samples()).astype(np.float32)
                l_raw /= (1 << (8 * loop_segment.sample_width - 1))
                if loop_segment.channels == 2:
                    l_stereo = l_raw.reshape((-1, 2))
                else:
                    l_mono = l_raw
                    l_stereo = np.column_stack((l_mono, l_mono))

                # Stretch factor: if the user's cut is 3.93 s and the grid length is
                # 3.931 s, the stretch is ~1.0002 (an imperceptible pitch change).
                stretch_ratio = perfect_duration_ms / user_loop_duration_ms
                # pyrubberband.time_stretch(y, sr, rate): rate > 1.0 shortens the audio
                # (rate=2.0 halves the duration). To go from the user's duration to the
                # perfect duration, rate = user / perfect.
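                # Worked example (hypothetical numbers, not from any real config):
                # at 120 BPM one beat is 500 ms, so a 3930 ms user cut rounds to
                # 8 beats, perfect_duration_ms = 4000 ms, and
                # rb_rate = 3930 / 4000 = 0.9825 -- a slight pitch-preserving
                # slow-down applied by pyrubberband below.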
                rb_rate = user_loop_duration_ms / perfect_duration_ms
                l_stretched_data = pyrubberband.time_stretch(l_stereo, sample_rate, rb_rate)

                # Convert back to an AudioSegment (16-bit PCM for pydub compatibility)
                l_stretched_data_int = (l_stretched_data * (2**15 - 1)).astype(np.int16)
                perfect_loop_segment = AudioSegment(
                    l_stretched_data_int.tobytes(),
                    frame_rate=sample_rate,
                    sample_width=2,
                    channels=2
                )

                # Append the snapped loop X times
                new_audio += (perfect_loop_segment * count)
                current_pos = loop_end_ms

            # Add remaining audio after the last loop
            if current_pos < len(audio_segment):
                new_audio += audio_segment[current_pos:]
            audio_segment = new_audio

        # --- Apply EQ Filters ---
        if "eq_filters" in config:
            for eq_filter in config["eq_filters"]:
                filter_type = eq_filter.get("type")
                cutoff_hz = eq_filter.get("cutoff_hz")
                start_ms = parse_time(eq_filter.get("start_time", "0:00"))
                end_time_str = eq_filter.get("end_time")
                end_ms = parse_time(end_time_str) if end_time_str else len(audio_segment)
                if not filter_type or not cutoff_hz or start_ms >= end_ms:
                    continue
                pre_segment = audio_segment[:start_ms]
                segment_to_filter = audio_segment[start_ms:end_ms]
                post_segment = audio_segment[end_ms:]
                if filter_type == "low_pass":
                    filtered_slice = segment_to_filter.low_pass_filter(cutoff_hz)
                elif filter_type == "high_pass":
                    filtered_slice = segment_to_filter.high_pass_filter(cutoff_hz)
                elif filter_type == "band_pass":
                    low, high = eq_filter.get("low_cutoff_hz"), eq_filter.get("high_cutoff_hz")
                    filtered_slice = segment_to_filter.high_pass_filter(low).low_pass_filter(high) if low and high else segment_to_filter
                elif filter_type == "band_reject":
                    low, high = eq_filter.get("low_cutoff_hz"), eq_filter.get("high_cutoff_hz")
                    filtered_slice = segment_to_filter.low_pass_filter(low).overlay(segment_to_filter.high_pass_filter(high)) if low and high else segment_to_filter
                else:
                    filtered_slice = segment_to_filter
                audio_segment = pre_segment + filtered_slice + post_segment

        # --- Apply Volume Automation ---
        if "volume_automation" in config:
            for automation in config["volume_automation"]:
                start_ms = parse_time(automation.get("start_time", "0:00"))
                end_time_str = automation.get("end_time")
                end_ms = parse_time(end_time_str) if end_time_str else len(audio_segment)
                gain_db = float(automation.get("gain_db", 0))
                if start_ms >= end_ms:
                    continue
                pre_segment = audio_segment[:start_ms]
                segment_to_automate = audio_segment[start_ms:end_ms]
                post_segment = audio_segment[end_ms:]
                audio_segment = pre_segment + segment_to_automate.apply_gain(gain_db) + post_segment
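
        # The band-gain pass below ramps the low/mid/high gains linearly across the
        # selected region: the region is cut into ~250 ms chunks and chunk i is
        # processed at a fraction (i + 0.5) / num_chunks of each target gain, while
        # everything after end_time receives the full target so the ramp lands where
        # it stays. (Hypothetical example: a 2 s region yields 8 chunks, and the 4th
        # chunk is processed at 43.75% of each configured gain.)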
        # --- Apply Band Gains (With Ramping) ---
        if "band_gains" in config:
            for band_gain in config["band_gains"]:
                start_ms = parse_time(band_gain.get("start_time", "0:00"))
                end_time_str = band_gain.get("end_time")
                end_ms = parse_time(end_time_str) if end_time_str else len(audio_segment)
                target_low_gain_db = float(band_gain.get("low_gain_db", 0))
                target_mid_gain_db = float(band_gain.get("mid_gain_db", 0))
                target_high_gain_db = float(band_gain.get("high_gain_db", 0))
                if start_ms >= end_ms:
                    continue
                if target_low_gain_db == 0 and target_mid_gain_db == 0 and target_high_gain_db == 0:
                    continue
                pre_segment = audio_segment[:start_ms]
                segment_to_process = audio_segment[start_ms:end_ms]
                post_segment = audio_segment[end_ms:]
                ramp_duration_ms = len(segment_to_process)
                if ramp_duration_ms <= 0:
                    continue
                chunk_duration_ms = 250
                num_chunks = max(2, int(ramp_duration_ms / chunk_duration_ms))
                chunk_duration_ms = ramp_duration_ms / num_chunks
                processed_chunks = []
                for i in range(num_chunks):
                    chunk_start_ms = int(i * chunk_duration_ms)
                    chunk_end_ms = int((i + 1) * chunk_duration_ms)
                    chunk = segment_to_process[chunk_start_ms:chunk_end_ms]
                    if len(chunk) == 0:
                        continue
                    t = (i + 0.5) / num_chunks
                    low_gain = t * target_low_gain_db
                    mid_gain = t * target_mid_gain_db
                    high_gain = t * target_high_gain_db
                    filter_str = []
                    if low_gain != 0:
                        filter_str.append(f"bass=g={low_gain}:f=250:w=0.707")
                    if mid_gain != 0:
                        filter_str.append(f"equalizer=f=1000:t=q:w=1:g={mid_gain}")
                    if high_gain != 0:
                        filter_str.append(f"treble=g={high_gain}:f=2500:w=0.707")
                    if filter_str:
                        processed_chunk = apply_ffmpeg_effect(chunk, ",".join(filter_str))
                    else:
                        processed_chunk = chunk
                    processed_chunks.append(processed_chunk)
                processed_segment = AudioSegment.empty()
                for chunk in processed_chunks:
                    processed_segment += chunk
                filter_str_post = []
                if target_low_gain_db != 0:
                    filter_str_post.append(f"bass=g={target_low_gain_db}:f=250:w=0.707")
                if target_mid_gain_db != 0:
                    filter_str_post.append(f"equalizer=f=1000:t=q:w=1:g={target_mid_gain_db}")
                if target_high_gain_db != 0:
                    filter_str_post.append(f"treble=g={target_high_gain_db}:f=2500:w=0.707")
                if filter_str_post and len(post_segment) > 0:
                    post_segment = apply_ffmpeg_effect(post_segment, ",".join(filter_str_post))
                audio_segment = pre_segment + processed_segment + post_segment

        # --- Apply Effects ---
        if "effects" in config:
            for effect in config["effects"]:
                start_ms = parse_time(effect.get("start_time", "0:00"))
                end_time_str = effect.get("end_time")
                end_ms = parse_time(end_time_str) if end_time_str else len(audio_segment)
                effect_type = effect.get("type")
                if start_ms >= end_ms:
                    continue
                pre_segment = audio_segment[:start_ms]
                segment_to_process = audio_segment[start_ms:end_ms]
                post_segment = audio_segment[end_ms:]
                effect_str = ""
                if effect_type == "reverb":
                    wet = effect.get("wet", 0.4)
                    effect_str = f"afftfilt=real='hypot(re,im)*cos(random(0))*{wet}+re*(1-{wet})':imag='hypot(re,im)*sin(random(0))*{wet}+im*(1-{wet})'"
                elif effect_type == "delay":
                    delay_ms = effect.get("delay_ms", 500)
                    effect_str = f"adelay=delays={delay_ms}:all=1"
                elif effect_type == "raw_ffmpeg":
                    effect_str = effect.get("filter_string", "")
                if effect_str:
                    processed_slice = apply_ffmpeg_effect(segment_to_process, effect_str)
                    audio_segment = pre_segment + processed_slice + post_segment

        # Apply pre-mix normalization
        pre_mix_target_dbfs = config.get("target_dBFS", settings.get("pre_mix_target_dbfs", -12.0))
        if config.get("apply_pre_mix_normalization", True):
            audio_segment = normalize_audio(audio_segment, pre_mix_target_dbfs)

        full_duration = len(audio_segment)

        # Prepare the final output for stretching
        y_raw = np.array(audio_segment.get_array_of_samples()).astype(np.float32)
        y_raw /= (1 << (8 * audio_segment.sample_width - 1))
        if audio_segment.channels == 2:
            y_stereo = y_raw.reshape((-1, 2))
            y_mono = librosa.to_mono(y_stereo.T)
        else:
            y_mono = y_raw if audio_segment.channels == 1 else librosa.to_mono(
                y_raw.reshape((-1, audio_segment.channels)).T)
            y_stereo = np.column_stack((y_mono, y_mono))

        # Tempo detection was already done earlier; reuse the value.
        # Logic for ramps and adjustments
        normalized_ramps = get_normalized_ramps(config, start_offset)
        effective_ref = None if no_tempo_adjust else reference_tempo
        final_output_tempo = tempo
        stretched_audio = y_stereo
        if normalized_ramps:
            log_messages.append(f"Applying tempo ramps with initial ref {effective_ref if effective_ref else tempo}")
            stretched_audio = apply_tempo_ramps(stretched_audio, sample_rate, normalized_ramps, tempo, effective_ref)
            final_output_tempo = float(normalized_ramps[-1]["end_tempo"])
        elif index > 0 and not no_tempo_adjust:
            stretch_rate = reference_tempo / tempo
            stretched_audio = pyrubberband.time_stretch(y_stereo, sample_rate, stretch_rate)
            final_output_tempo = reference_tempo
        else:
            final_output_tempo = tempo

        temp_file = f"temp_song{index}.flac"
        sf.write(temp_file, stretched_audio, sample_rate)
        return (temp_file, sample_rate, tempo, full_duration,
                len(AudioSegment.from_file(temp_file)), log_messages,
                final_output_tempo, normalized_ramps)
    except Exception as e:
        logging.error(f"Error processing {song_path}: {e}")
        return None, None, None, None, None, [f"ERROR: Failed to process {song_path}"], None, []


def mix_songs(playlist_data):
    settings = playlist_data["settings"]
    song_configs = playlist_data["tracks"]
    output_file = settings.get("output_filename", "mix_output.flac")
    default_crossfade = settings.get("default_crossfade_duration", "0:10")
    final_mix_target_dbfs = settings.get("final_mix_target_dbfs", -1.0)
    num_cores = settings.get("num_cores", 0)
    logger = logging.getLogger(__name__)
    manager = Manager()
    tempo_cache = manager.dict(load_tempo_cache())

    # --- Processing Phase ---
    first_result = process_song(song_configs[0], 0, tempo_cache, settings, None)
    if first_result[0] is None:
        sys.exit(1)
    current_chain_tempo = first_result[6]
    sample_rate = first_result[1]
    processed_configs = [first_result]
    for i, config in enumerate(song_configs[1:], 1):
        no_adjust = config.get("no_tempo_adjust", False)
        ref_tempo = current_chain_tempo if not no_adjust else None
        result = process_song(config, i, tempo_cache, settings, ref_tempo, sample_rate)
        processed_configs.append(result)
        if result[6] is not None:
            current_chain_tempo = result[6]

    # --- Mixing Phase ---
    temp_files = [res[0] for res in processed_configs]
    if any(t is None for t in temp_files):
        sys.exit(1)
    current_mix = AudioSegment.from_file(temp_files[0])
    chapters = [(0, os.path.basename(song_configs[0]['song_path']))]
    song_start_times = [0]
    for index, config in enumerate(tqdm(song_configs[1:], desc="Mixing songs", unit="song"), start=1):
        prev_song_config = song_configs[index - 1]
        prev_result = processed_configs[index - 1]
        prev_song_orig_tempo = prev_result[2]
        prev_song_ramps = prev_result[7]  # Normalized ramps
        if index == 1:
            prev_effective_start_tempo = None
        else:
            p_prev_res = processed_configs[index - 2]
            p_prev_no_adj = prev_song_config.get("no_tempo_adjust", False)
            if p_prev_no_adj:
                prev_effective_start_tempo = None
            else:
                prev_effective_start_tempo = p_prev_res[6]
        crossfade_start_str = config.get("crossfade_start")
        crossfade_duration = parse_time(config.get("crossfade_duration", default_crossfade))
        prev_song_start_offset = parse_time(prev_song_config.get("start_offset", "0:00"))
        relative_crossfade_start = parse_time(crossfade_start_str)
        original_pos_in_trimmed = max(0, relative_crossfade_start - prev_song_start_offset) / 1000.0
        adj_pos_sec = compute_mapped_time(
            original_pos_in_trimmed,
            prev_song_ramps,
            prev_song_orig_tempo,
            prev_effective_start_tempo,
            prev_result[3] / 1000.0
        )
        adj_crossfade_in_segment = int(adj_pos_sec * 1000)
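        # Worked example (hypothetical tempos): with no ramps, compute_mapped_time
        # reduces to original_pos * (original_tempo / ref_tempo), so if the previous
        # track was stretched from 126 BPM to a 120 BPM reference, a crossfade_start
        # 60 s into its trimmed original lands at 60 * 126 / 120 = 63 s of the
        # stretched render, which is the offset used below.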
        next_song_start_time = song_start_times[-1] + adj_crossfade_in_segment
        next_song_start_time = max(0, next_song_start_time)
        next_song = AudioSegment.from_file(temp_files[index])
        fade_in_duration = parse_time(config.get("fade_in_duration")) if config.get("fade_in_duration") else crossfade_duration
        fade_out_power = float(prev_song_config.get("fade_out_power", prev_song_config.get("fade_power", 1.0)))
        fade_in_power = float(config.get("fade_in_power", config.get("fade_power", 1.0)))
        fade_out_delay = parse_time(prev_song_config.get("fade_out_delay", "0:00"))

        track1_before_fade = current_mix[:next_song_start_time]
        track1_total_overlap = current_mix[next_song_start_time:next_song_start_time + crossfade_duration]
        actual_overlap_duration = len(track1_total_overlap)
        track1_constant_part = track1_total_overlap[:fade_out_delay]
        track1_fade_part = track1_total_overlap[fade_out_delay:]
        faded_out_segment = apply_custom_fade(track1_fade_part, 'out', len(track1_fade_part), fade_out_power)
        faded_out_part = track1_constant_part + faded_out_segment

        track2_overlay_section = next_song[:actual_overlap_duration]
        actual_fade_in_duration = min(fade_in_duration, actual_overlap_duration)
        track2_fade_in_part = track2_overlay_section[:actual_fade_in_duration]
        track2_after_fade_in = track2_overlay_section[actual_fade_in_duration:]
        faded_in_part = apply_custom_fade(track2_fade_in_part, 'in', actual_fade_in_duration, fade_in_power)
        full_track2_overlay = faded_in_part + track2_after_fade_in

        crossfade_result = faded_out_part.overlay(full_track2_overlay)
        if crossfade_result.max_dBFS > -0.1:
            crossfade_result = normalize_audio(crossfade_result, -0.1)
        current_mix = track1_before_fade + crossfade_result + next_song[actual_overlap_duration:]
        song_start_times.append(next_song_start_time)
        chapters.append((next_song_start_time, os.path.basename(config['song_path'])))

    # --- Export ---
    logger.info("Exporting raw mix...")
    temp_pcm = "temp_raw_mix.raw"
    current_mix.export(temp_pcm, format="raw")
    temp_raw_file = "temp_raw_mix.flac"
    bits = current_mix.sample_width * 8
    pcm_format = f"s{bits}le"
    subprocess.run(["ffmpeg", "-f", pcm_format, "-ar", str(current_mix.frame_rate),
                    "-ac", str(current_mix.channels), "-i", temp_pcm,
                    "-c:a", "flac", "-y", temp_raw_file],
                   check=True, capture_output=True)
    logger.info("Normalizing...")
    subprocess.run(["ffmpeg", "-i", temp_raw_file,
                    "-af", f"loudnorm=I=-14:LRA=7:TP={final_mix_target_dbfs}",
                    "-y", output_file],
                   check=True, capture_output=True)
    for f in [temp_pcm, temp_raw_file] + temp_files:
        if os.path.exists(f):
            os.remove(f)
    add_flac_chapters(output_file, chapters)
    save_tempo_cache(tempo_cache.copy())
    return output_file, len(current_mix), current_chain_tempo


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--playlist", type=str, default="ambient_mix_settings.json")
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--test-last-two", action="store_true")
    parser.add_argument("--tracks", type=str)
    args = parser.parse_args()
    setup_logging(level=logging.DEBUG if args.debug else logging.INFO)
    logger = logging.getLogger(__name__)
    with open(args.playlist, 'r') as f:
        playlist_data = json.load(f)
    if args.tracks:
        try:
            track_numbers = [int(t.strip()) for t in args.tracks.split(',')]
            original_tracks = playlist_data['tracks']
            playlist_data['tracks'] = [original_tracks[i - 1] for i in track_numbers if 0 < i <= len(original_tracks)]
        except ValueError:
            logger.error("Invalid format for --tracks.")
            sys.exit(1)
    elif args.test_last_two:
        playlist_data['tracks'] = playlist_data['tracks'][-2:]
    try:
        output_file, final_duration, ref_tempo = mix_songs(playlist_data)
        print(f"\nSuccess! Output: {output_file}")
    except Exception as e:
        logging.error(f"Error: {e}", exc_info=True)
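
# ---------------------------------------------------------------------------
# Example playlist file (illustrative only; paths, times and values are made up,
# but the keys match what the code above reads via .get()):
#
# {
#   "settings": {
#     "output_filename": "mix_output.flac",
#     "default_crossfade_duration": "0:10",
#     "pre_mix_target_dbfs": -12.0,
#     "final_mix_target_dbfs": -1.0
#   },
#   "tracks": [
#     {
#       "song_path": "songs/first.flac",
#       "start_offset": "0:05",
#       "tempo_ramps": [
#         {"start_time": "1:00", "end_time": "2:00", "end_tempo": 118}
#       ]
#     },
#     {
#       "song_path": "songs/second.flac",
#       "crossfade_start": "3:40",
#       "crossfade_duration": "0:12",
#       "fade_in_power": 1.5,
#       "loops": [
#         {"start_time": "0:30", "end_time": "0:34", "count": 4}
#       ],
#       "band_gains": [
#         {"start_time": "2:00", "end_time": "2:30", "low_gain_db": -3}
#       ]
#     }
#   ]
# }
# ---------------------------------------------------------------------------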