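"""Build a continuous, beat-matched mix from a JSON playlist.

Each track is loaded, optionally trimmed/looped/EQ'd, tempo-matched (or ramped)
with Rubber Band, then crossfaded into the running mix; the result is
loudness-normalized with FFmpeg and tagged with FLAC chapter markers.

Illustrative invocation and playlist (the script filename and all values below
are examples only, not fixed by this code):

    python mix_builder.py --playlist ambient_mix_settings.json --debug

    {
      "settings": {
        "output_filename": "mix_output.flac",
        "default_crossfade_duration": "0:10",
        "final_mix_target_dbfs": -1.0
      },
      "tracks": [
        {"song_path": "songs/intro.flac"},
        {"song_path": "songs/second.flac",
         "crossfade_start": "3:40",
         "crossfade_duration": "0:15"}
      ]
    }

External tools expected on PATH: ffmpeg, rubberband, metaflac.
"""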
import librosa
import numpy as np
import soundfile as sf
from pydub import AudioSegment
import pyrubberband
import os
from tqdm import tqdm
import logging
import json
import argparse
import subprocess
from multiprocessing import Pool, cpu_count, Manager
import sys
import math
import tempfile
import copy

def apply_ffmpeg_effect(segment, effect_filter_str):
    """Applies an FFmpeg audio filter to an AudioSegment using unique temp files."""
    if len(segment) <= 0:
        return AudioSegment.empty()

    # FFmpeg chokes on extremely short inputs; pad anything under 10 ms with silence.
    if len(segment) < 10:
        segment = segment + AudioSegment.silent(duration=10)

    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_in:
        tmp_in_path = tmp_in.name
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_out:
        tmp_out_path = tmp_out.name

    try:
        segment.export(tmp_in_path, format="wav")

        command = [
            "ffmpeg",
            "-i", tmp_in_path,
            "-af", effect_filter_str,
            "-y",
            tmp_out_path
        ]
        subprocess.run(command, check=True, capture_output=True, text=True)
        processed_segment = AudioSegment.from_file(tmp_out_path)

    except subprocess.CalledProcessError as e:
        logging.error(f"FFmpeg effect failed! Filter: {effect_filter_str}")
        logging.error(f"FFmpeg stderr:\n{e.stderr}")
        processed_segment = segment
    except Exception as e:
        logging.error(f"Unexpected error in apply_ffmpeg_effect: {e}")
        processed_segment = segment
    finally:
        for path in (tmp_in_path, tmp_out_path):
            try:
                if os.path.exists(path):
                    os.remove(path)
            except OSError:
                pass

    return processed_segment

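# Illustrative usage (the filter string follows FFmpeg's -af syntax, as used elsewhere in this file):
#   boosted = apply_ffmpeg_effect(some_segment, "bass=g=3:f=250:w=0.707")
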
def load_tempo_cache(cache_path='tempo_cache.json'):
    """Load the song-path -> BPM cache from disk, or return an empty dict."""
    if os.path.exists(cache_path):
        with open(cache_path, 'r') as f:
            return json.load(f)
    return {}


def save_tempo_cache(cache, cache_path='tempo_cache.json'):
    """Write the song-path -> BPM cache back to disk."""
    with open(cache_path, 'w') as f:
        json.dump(cache, f, indent=4)


def setup_logging(level=logging.INFO):
    logging.basicConfig(level=level, format="%(asctime)s - %(levelname)s - %(message)s")


def normalize_audio(audio_segment, target_dbfs=-1.0):
    """Apply a uniform gain so the segment's peak sits at target_dbfs."""
    peak_dbfs = audio_segment.max_dBFS
    change_in_dbfs = target_dbfs - peak_dbfs
    return audio_segment.apply_gain(change_in_dbfs)

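# The tempo cache is a flat JSON object mapping each song path to its detected BPM,
# e.g. (illustrative values): {"songs/intro.flac": 118.04, "songs/second.flac": 122.0}
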
def apply_custom_fade(segment, fade_type, duration, fade_power):
    """Fade a segment in or out with a power-curve gain envelope (power 1.0 = linear)."""
    if duration == 0 or len(segment) == 0 or fade_power == 0:
        return segment

    num_samples = int(segment.frame_count())
    if num_samples == 0: return segment

    t = np.linspace(0.0, 1.0, num_samples)

    if fade_type == 'in':
        gain_curve = t ** fade_power
    elif fade_type == 'out':
        gain_curve = (1.0 - t) ** fade_power
    else:
        return segment

    samples = np.array(segment.get_array_of_samples()).astype(np.float64)
    samples = samples.reshape((-1, segment.channels))
    samples[:] *= gain_curve[:, np.newaxis]
    faded_samples = samples.flatten().astype(segment.array_type)
    return AudioSegment(data=faded_samples.tobytes(), sample_width=segment.sample_width, frame_rate=segment.frame_rate, channels=segment.channels)

def parse_time(tstr):
    """Parse an 'H:MM:SS(.sss)' or 'M:SS' time string into integer milliseconds."""
    parts = str(tstr).split(':')
    parts = [float(p) for p in parts]
    if len(parts) == 3:
        h, m, s = parts
    elif len(parts) == 2:
        h = 0
        m, s = parts
    else:
        raise ValueError("Invalid time format")
    return int((h*3600 + m*60 + s)*1000)

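# Examples: parse_time("2:30") == 150000 and parse_time("1:23:45.5") == 5025500 (milliseconds).
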
def add_flac_chapters(file_path, chapters):
    """Tag the FLAC file with CHAPTERxx/CHAPTERxxNAME markers via metaflac."""
    logger = logging.getLogger(__name__)
    logger.info(f"Adding {len(chapters)} chapter markers to {file_path}")

    for i, (start_time_ms, title) in enumerate(chapters):
        ms = start_time_ms % 1000
        secs = (start_time_ms // 1000) % 60
        mins = (start_time_ms // (1000 * 60)) % 60
        hours = (start_time_ms // (1000 * 3600))
        start_time_str = f"{hours:02}:{mins:02}:{secs:02}.{ms:03}"

        command = [
            "metaflac",
            f"--set-tag=CHAPTER{i+1:02}={start_time_str}",
            f"--set-tag=CHAPTER{i+1:02}NAME={title}",
            file_path
        ]
        try:
            subprocess.run(command, check=True, capture_output=True, text=True)
        except (subprocess.CalledProcessError, FileNotFoundError):
            logger.error(f"Error running metaflac for chapter {i+1}; is metaflac installed?")
            break

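# For a track starting at 3 min 25 s, the second chapter would be tagged roughly as
# (illustrative): CHAPTER02=00:03:25.000 and CHAPTER02NAME=second.flac
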
def get_reliable_tempo(audio, sample_rate, song_path, tempo_cache, default_tempo=120.0):
    """Return the song's BPM from the cache, or detect it with librosa (falling back to default_tempo)."""
    logger = logging.getLogger(__name__)
    if song_path in tempo_cache:
        logger.info(f"Using cached tempo for {song_path}: {tempo_cache[song_path]}")
        return tempo_cache[song_path]

    try:
        tempo = librosa.feature.tempo(y=audio, sr=sample_rate)[0]
        if not np.isfinite(tempo) or tempo <= 0:
            tempo = default_tempo
        tempo_cache[song_path] = float(tempo)
        logger.info(f"Detected tempo for {song_path}: {tempo}")
        return float(tempo)
    except Exception as e:
        logger.warning(f"Tempo detection failed for {song_path}: {e}. Using default tempo {default_tempo}")
        return default_tempo

def get_normalized_ramps(config, start_offset_ms):
    """Extracts ramps from config and adjusts start/end times by subtracting the start_offset."""
    raw_ramps = config.get("tempo_ramps", config.get("temp_ramps", []))
    if not raw_ramps:
        return []

    normalized_ramps = []
    for ramp in raw_ramps:
        new_ramp = copy.deepcopy(ramp)
        s_time = parse_time(ramp["start_time"])
        e_time = parse_time(ramp["end_time"])

        # Shift times to be relative to the trimmed audio start
        adj_s_time = max(0, s_time - start_offset_ms)
        adj_e_time = max(0, e_time - start_offset_ms)

        new_ramp["start_time_ms"] = adj_s_time
        new_ramp["end_time_ms"] = adj_e_time

        if adj_e_time > adj_s_time:
            normalized_ramps.append(new_ramp)

    return sorted(normalized_ramps, key=lambda r: r["start_time_ms"])

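# A ramp entry gives times in the original (untrimmed) file's timeline plus a target BPM,
# e.g. (illustrative): {"start_time": "1:00", "end_time": "1:30", "end_tempo": 95}
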
def apply_tempo_ramps(y, sr, normalized_ramps, original_tempo, ref_tempo):
    """Time-stretch y (sampled at sr) so its tempo follows the configured ramps, via a rubberband time map."""
    logger = logging.getLogger(__name__)

    if not normalized_ramps:
        return y

    # Create temp file paths but don't keep the handles open
    tmp_in = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
    tmp_in_path = tmp_in.name
    tmp_in.close()

    tmp_out = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
    tmp_out_path = tmp_out.name
    tmp_out.close()

    tmp_map = tempfile.NamedTemporaryFile(suffix='.txt', delete=False)
    tmp_map_path = tmp_map.name
    tmp_map.close()

    try:
        # 1. Write the audio to disk as standard 16-bit PCM WAV
        sf.write(tmp_in_path, y, sr, subtype='PCM_16')

        # 2. Generate the time map: pairs of "input_sample output_sample"
        map_points = []
        map_points.append("0 0\n")

        current_sample_in = 0.0
        current_sample_out = 0.0
        current_tempo = ref_tempo if ref_tempo else original_tempo

        CHUNK_SIZE = 1024
        total_samples = len(y)

        ramps = sorted(normalized_ramps, key=lambda r: r["start_time_ms"])

        while current_sample_in < total_samples:
            time_ms = (current_sample_in / sr) * 1000.0

            instant_tempo = current_tempo

            # Find the ramp (if any) that is active at this point in time
            for ramp in ramps:
                if ramp["start_time_ms"] <= time_ms <= ramp["end_time_ms"]:
                    ramp_start_tempo = ref_tempo if ref_tempo else original_tempo
                    for prev_ramp in ramps:
                        if prev_ramp["end_time_ms"] <= ramp["start_time_ms"]:
                            ramp_start_tempo = float(prev_ramp["end_tempo"])

                    dur = ramp["end_time_ms"] - ramp["start_time_ms"]
                    if dur > 0:
                        prog = (time_ms - ramp["start_time_ms"]) / dur
                        instant_tempo = ramp_start_tempo + (float(ramp["end_tempo"]) - ramp_start_tempo) * prog
                    break
                elif time_ms > ramp["end_time_ms"]:
                    instant_tempo = float(ramp["end_tempo"])

            # Calculate stretch ratio
            ratio = instant_tempo / original_tempo

            # Advance
            samples_added = CHUNK_SIZE / ratio
            current_sample_in += CHUNK_SIZE
            current_sample_out += samples_added

            # Only write points that are within the file bounds
            if current_sample_in < total_samples:
                map_points.append(f"{int(current_sample_in)} {int(current_sample_out)}\n")

        # Add the final exact end point to the map to ensure completeness
        map_points.append(f"{total_samples} {int(current_sample_out)}\n")

        # Write map file
        with open(tmp_map_path, 'w') as f:
            f.writelines(map_points)

        # Calculate total expected duration in seconds for the -D flag
        total_duration_sec = current_sample_out / sr

        # 3. Run the Rubber Band CLI.
        # -D pins the output duration, which some builds require alongside --timemap;
        # flags such as --threading and --precise are omitted because not every build supports them.
        command = [
            "rubberband",
            "--quiet",
            "--timemap", tmp_map_path,
            "-D", str(total_duration_sec),
            tmp_in_path,
            tmp_out_path
        ]

        subprocess.run(command, check=True, capture_output=True, text=True)

        # 4. Load result
        y_processed, _ = sf.read(tmp_out_path, dtype='float32')

        # Fix dimensions if a mono/stereo mismatch occurs
        if y.ndim == 2 and y_processed.ndim == 1:
            y_processed = np.column_stack((y_processed, y_processed))
        elif y.ndim == 1 and y_processed.ndim == 2:
            y_processed = librosa.to_mono(y_processed.T)

        return y_processed

    except subprocess.CalledProcessError as e:
        logger.error("Rubberband CLI failed.")
        logger.error(f"Stderr: {e.stderr}")
        return y
    except Exception as e:
        logger.error(f"Map generation failed: {e}")
        return y
    finally:
        for p in [tmp_in_path, tmp_out_path, tmp_map_path]:
            if os.path.exists(p):
                try:
                    os.remove(p)
                except OSError:
                    pass

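# Each time-map line is "<input_sample> <output_sample>". With the ratio defined above,
# playing at half the original tempo makes a 1024-sample input chunk advance the output
# by 2048 samples, so a map might read (illustrative): "0 0", "1024 2048", "2048 4096", ...
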
def compute_mapped_time(original_pos_sec, normalized_ramps, original_tempo, ref_tempo, total_original_duration_sec):
    """Computes time in stretched audio using normalized (0-based) ramps."""
    if not normalized_ramps and ref_tempo is None:
        return original_pos_sec
    if not normalized_ramps and ref_tempo is not None:
        return original_pos_sec * (original_tempo / ref_tempo)

    current_time = 0.0
    mapped_time = 0.0
    current_tempo = ref_tempo if ref_tempo else original_tempo

    for ramp in normalized_ramps:
        start = ramp["start_time_ms"] / 1000.0
        end = ramp["end_time_ms"] / 1000.0
        end_tempo = float(ramp["end_tempo"])

        if start < current_time: continue

        # Constant segment before ramp
        if start > current_time:
            seg_dur = min(start, original_pos_sec) - current_time
            if seg_dur > 0:
                mapped_time += seg_dur * (original_tempo / current_tempo)
                current_time += seg_dur
            if current_time >= original_pos_sec: return mapped_time

        # Ramp segment: tempo(t) = a + b*t, so the mapped time is the integral of original_tempo / tempo(t)
        ramp_dur = end - start
        a = current_tempo
        b = (end_tempo - current_tempo) / ramp_dur
        u = min(original_pos_sec, end) - start

        if u > 0:
            if abs(b) < 1e-9:
                mapped_time += u * (original_tempo / a)
            else:
                mapped_time += original_tempo * (math.log(a + b * u) - math.log(a)) / b
            current_time += u
            if current_time >= original_pos_sec: return mapped_time

        current_tempo = end_tempo
        current_time = end

    # Final constant segment
    seg_dur = max(0, original_pos_sec - current_time)
    if seg_dur > 0:
        mapped_time += seg_dur * (original_tempo / current_tempo)

    return mapped_time

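# Worked example of the ramp integral above (illustrative numbers): with original_tempo = 120 BPM
# and a single ramp from 120 down to 60 BPM over the first 10 s, a position 10 s into the original
# maps to 120 * (ln(60) - ln(120)) / -6 = 20 * ln(2) ~= 13.86 s in the stretched audio.
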
def process_song(config, index, tempo_cache, settings, reference_tempo=None, sample_rate=None):
    log_messages = []
    song_path = config["song_path"]
    start_offset = parse_time(config.get("start_offset", "0:00"))
    no_tempo_adjust = config.get("no_tempo_adjust", False)

    log_messages.append(f"Processing song {index+1}: {song_path}")

    try:
        audio_segment = AudioSegment.from_file(song_path)

        if start_offset > 0:
            audio_segment = audio_segment[start_offset:]

        # 1. Convert audio to arrays for analysis/processing early (needed for loop snapping)
        full_duration = len(audio_segment)
        y_raw = np.array(audio_segment.get_array_of_samples()).astype(np.float32)
        y_raw /= (1 << (8 * audio_segment.sample_width - 1))

        if audio_segment.channels == 2:
            y_stereo = y_raw.reshape((-1, 2))
            y_mono = librosa.to_mono(y_stereo.T)
        else:
            y_mono = y_raw if audio_segment.channels == 1 else librosa.to_mono(y_raw.reshape((-1, audio_segment.channels)).T)
            y_stereo = np.column_stack((y_mono, y_mono))

        # Analysis and stretching below use this file's own sample rate
        current_sample_rate = audio_segment.frame_rate
        if sample_rate is None:
            sample_rate = current_sample_rate

        # 2. Detect tempo early
        tempo = get_reliable_tempo(y_mono, current_sample_rate, song_path, tempo_cache)

        # --- Apply Loops (with BPM grid snapping) ---
        if "loops" in config:
            # Loops change the overall length, so rebuild audio_segment by slicing the
            # original at the calculated times.
            new_audio = AudioSegment.empty()

            # Sort loops by time to handle them sequentially
            loops = sorted(config["loops"], key=lambda x: parse_time(x.get("start_time", "0:00")))

            current_pos = 0

            for loop in loops:
                loop_start_ms = parse_time(loop.get("start_time", "0:00"))
                loop_end_ms = parse_time(loop.get("end_time"))
                count = int(loop.get("count", 1))

                if loop_start_ms >= loop_end_ms: continue

                # Add audio BEFORE the loop
                if loop_start_ms > current_pos:
                    new_audio += audio_segment[current_pos:loop_start_ms]

                # --- GRID SNAPPING LOGIC ---
                # Exact beat duration at this song's tempo
                beat_len_ms = (60.0 / tempo) * 1000.0

                # How long is the user's manual loop?
                user_loop_duration_ms = loop_end_ms - loop_start_ms

                # How many beats is that likely to be? (round to the nearest whole beat)
                num_beats = round(user_loop_duration_ms / beat_len_ms)
                if num_beats == 0: num_beats = 1  # Prevent a zero-length grid for tiny loops

                # The perfect grid length for that many beats
                perfect_duration_ms = num_beats * beat_len_ms

                log_messages.append(f"Loop Correction: {user_loop_duration_ms}ms -> {perfect_duration_ms:.2f}ms ({num_beats} beats @ {tempo} BPM)")

                # Extract the loop audio
                loop_segment = audio_segment[loop_start_ms:loop_end_ms]

                # Time-stretch the loop segment to match perfect_duration_ms exactly,
                # so repeated loops never accumulate drift.

                # Convert loop to numpy for rubberband
                l_raw = np.array(loop_segment.get_array_of_samples()).astype(np.float32)
                l_raw /= (1 << (8 * loop_segment.sample_width - 1))
                if loop_segment.channels == 2:
                    l_stereo = l_raw.reshape((-1, 2))
                else:
                    l_mono = l_raw
                    l_stereo = np.column_stack((l_mono, l_mono))

                # Stretch factor: if the user's cut is 3.930 s and the grid length is 3.931 s,
                # the stretch is only ~0.03% (inaudible).
                # pyrubberband.time_stretch(y, sr, rate): rate=2.0 halves the duration, so to go
                # from the user's duration to the perfect duration use rate = user / perfect.
                rb_rate = user_loop_duration_ms / perfect_duration_ms

                l_stretched_data = pyrubberband.time_stretch(l_stereo, current_sample_rate, rb_rate)

                # Convert back to an AudioSegment (16-bit PCM for pydub compatibility)
                l_stretched_data_int = (l_stretched_data * (2**15 - 1)).astype(np.int16)
                perfect_loop_segment = AudioSegment(
                    l_stretched_data_int.tobytes(),
                    frame_rate=current_sample_rate,
                    sample_width=2,
                    channels=2
                )

                # Append the snapped loop X times
                new_audio += (perfect_loop_segment * count)

                current_pos = loop_end_ms

            # Add remaining audio after the last loop
            if current_pos < len(audio_segment):
                new_audio += audio_segment[current_pos:]

            audio_segment = new_audio

        # --- Apply EQ Filters ---
        if "eq_filters" in config:
            for eq_filter in config["eq_filters"]:
                filter_type = eq_filter.get("type")
                cutoff_hz = eq_filter.get("cutoff_hz")
                start_ms = parse_time(eq_filter.get("start_time", "0:00"))
                end_time_str = eq_filter.get("end_time")
                end_ms = parse_time(end_time_str) if end_time_str else len(audio_segment)

                if not filter_type or not cutoff_hz or start_ms >= end_ms: continue

                pre_segment = audio_segment[:start_ms]
                segment_to_filter = audio_segment[start_ms:end_ms]
                post_segment = audio_segment[end_ms:]

                if filter_type == "low_pass":
                    filtered_slice = segment_to_filter.low_pass_filter(cutoff_hz)
                elif filter_type == "high_pass":
                    filtered_slice = segment_to_filter.high_pass_filter(cutoff_hz)
                elif filter_type == "band_pass":
                    low, high = eq_filter.get("low_cutoff_hz"), eq_filter.get("high_cutoff_hz")
                    filtered_slice = segment_to_filter.high_pass_filter(low).low_pass_filter(high) if low and high else segment_to_filter
                elif filter_type == "band_reject":
                    low, high = eq_filter.get("low_cutoff_hz"), eq_filter.get("high_cutoff_hz")
                    filtered_slice = segment_to_filter.low_pass_filter(low).overlay(segment_to_filter.high_pass_filter(high)) if low and high else segment_to_filter
                else:
                    filtered_slice = segment_to_filter

                audio_segment = pre_segment + filtered_slice + post_segment

        # --- Apply Volume Automation ---
        if "volume_automation" in config:
            for automation in config["volume_automation"]:
                start_ms = parse_time(automation.get("start_time", "0:00"))
                end_time_str = automation.get("end_time")
                end_ms = parse_time(end_time_str) if end_time_str else len(audio_segment)
                gain_db = float(automation.get("gain_db", 0))

                if start_ms >= end_ms: continue

                pre_segment = audio_segment[:start_ms]
                segment_to_automate = audio_segment[start_ms:end_ms]
                post_segment = audio_segment[end_ms:]
                audio_segment = pre_segment + segment_to_automate.apply_gain(gain_db) + post_segment

        # --- Apply Band Gains (with ramping) ---
        if "band_gains" in config:
            for band_gain in config["band_gains"]:
                start_ms = parse_time(band_gain.get("start_time", "0:00"))
                end_time_str = band_gain.get("end_time")
                end_ms = parse_time(end_time_str) if end_time_str else len(audio_segment)
                target_low_gain_db = float(band_gain.get("low_gain_db", 0))
                target_mid_gain_db = float(band_gain.get("mid_gain_db", 0))
                target_high_gain_db = float(band_gain.get("high_gain_db", 0))

                if start_ms >= end_ms: continue
                if target_low_gain_db == 0 and target_mid_gain_db == 0 and target_high_gain_db == 0:
                    continue

                pre_segment = audio_segment[:start_ms]
                segment_to_process = audio_segment[start_ms:end_ms]
                post_segment = audio_segment[end_ms:]

                ramp_duration_ms = len(segment_to_process)
                if ramp_duration_ms <= 0: continue

                chunk_duration_ms = 250
                num_chunks = max(2, int(ramp_duration_ms / chunk_duration_ms))
                chunk_duration_ms = ramp_duration_ms / num_chunks

                processed_chunks = []
                for i in range(num_chunks):
                    chunk_start_ms = int(i * chunk_duration_ms)
                    chunk_end_ms = int((i + 1) * chunk_duration_ms)
                    chunk = segment_to_process[chunk_start_ms:chunk_end_ms]

                    if len(chunk) == 0: continue

                    # Ramp the gains linearly across the chunks
                    t = (i + 0.5) / num_chunks
                    low_gain = t * target_low_gain_db
                    mid_gain = t * target_mid_gain_db
                    high_gain = t * target_high_gain_db

                    filter_str = []
                    if low_gain != 0: filter_str.append(f"bass=g={low_gain}:f=250:w=0.707")
                    if mid_gain != 0: filter_str.append(f"equalizer=f=1000:t=q:w=1:g={mid_gain}")
                    if high_gain != 0: filter_str.append(f"treble=g={high_gain}:f=2500:w=0.707")

                    if filter_str:
                        processed_chunk = apply_ffmpeg_effect(chunk, ",".join(filter_str))
                    else:
                        processed_chunk = chunk
                    processed_chunks.append(processed_chunk)

                processed_segment = AudioSegment.empty()
                for chunk in processed_chunks:
                    processed_segment += chunk

                # Hold the full target gains on everything after the ramp
                filter_str_post = []
                if target_low_gain_db != 0: filter_str_post.append(f"bass=g={target_low_gain_db}:f=250:w=0.707")
                if target_mid_gain_db != 0: filter_str_post.append(f"equalizer=f=1000:t=q:w=1:g={target_mid_gain_db}")
                if target_high_gain_db != 0: filter_str_post.append(f"treble=g={target_high_gain_db}:f=2500:w=0.707")

                if filter_str_post and len(post_segment) > 0:
                    post_segment = apply_ffmpeg_effect(post_segment, ",".join(filter_str_post))

                audio_segment = pre_segment + processed_segment + post_segment

        # --- Apply Effects ---
        if "effects" in config:
            for effect in config["effects"]:
                start_ms = parse_time(effect.get("start_time", "0:00"))
                end_time_str = effect.get("end_time")
                end_ms = parse_time(end_time_str) if end_time_str else len(audio_segment)
                effect_type = effect.get("type")

                if start_ms >= end_ms: continue

                pre_segment = audio_segment[:start_ms]
                segment_to_process = audio_segment[start_ms:end_ms]
                post_segment = audio_segment[end_ms:]

                effect_str = ""
                if effect_type == "reverb":
                    wet = effect.get("wet", 0.4)
                    effect_str = f"afftfilt=real='hypot(re,im)*cos(random(0))*{wet}+re*(1-{wet})':imag='hypot(re,im)*sin(random(0))*{wet}+im*(1-{wet})'"
                elif effect_type == "delay":
                    delay_ms = effect.get("delay_ms", 500)
                    effect_str = f"adelay=delays={delay_ms}:all=1"
                elif effect_type == "raw_ffmpeg":
                    effect_str = effect.get("filter_string", "")

                if effect_str:
                    processed_slice = apply_ffmpeg_effect(segment_to_process, effect_str)
                    audio_segment = pre_segment + processed_slice + post_segment

        # Apply pre-mix normalization
        pre_mix_target_dbfs = config.get("target_dBFS", settings.get("pre_mix_target_dbfs", -12.0))
        if config.get("apply_pre_mix_normalization", True):
            audio_segment = normalize_audio(audio_segment, pre_mix_target_dbfs)

        full_duration = len(audio_segment)

        # Prepare the final output for stretching
        y_raw = np.array(audio_segment.get_array_of_samples()).astype(np.float32)
        y_raw /= (1 << (8 * audio_segment.sample_width - 1))

        if audio_segment.channels == 2:
            y_stereo = y_raw.reshape((-1, 2))
            y_mono = librosa.to_mono(y_stereo.T)
        else:
            y_mono = y_raw if audio_segment.channels == 1 else librosa.to_mono(y_raw.reshape((-1, audio_segment.channels)).T)
            y_stereo = np.column_stack((y_mono, y_mono))

        # Tempo was already detected above; reuse it for the ramp/stretch logic
        normalized_ramps = get_normalized_ramps(config, start_offset)

        effective_ref = None if no_tempo_adjust else reference_tempo
        final_output_tempo = tempo

        stretched_audio = y_stereo

        if normalized_ramps:
            log_messages.append(f"Applying tempo ramps with initial ref {effective_ref if effective_ref else tempo}")
            stretched_audio = apply_tempo_ramps(stretched_audio, current_sample_rate, normalized_ramps, tempo, effective_ref)
            final_output_tempo = float(normalized_ramps[-1]["end_tempo"])
        elif index > 0 and not no_tempo_adjust:
            stretch_rate = reference_tempo / tempo
            stretched_audio = pyrubberband.time_stretch(y_stereo, current_sample_rate, stretch_rate)
            final_output_tempo = reference_tempo
        else:
            final_output_tempo = tempo

        temp_file = f"temp_song{index}.flac"
        sf.write(temp_file, stretched_audio, current_sample_rate)

        return temp_file, sample_rate, tempo, full_duration, len(AudioSegment.from_file(temp_file)), log_messages, final_output_tempo, normalized_ramps

    except Exception as e:
        logging.error(f"Error processing {song_path}: {e}")
        return None, None, None, None, None, [f"ERROR: Failed to process {song_path}"], None, []

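# Per-track keys understood by process_song (all optional except song_path; values illustrative):
#   "start_offset": "0:12", "no_tempo_adjust": true, "target_dBFS": -12.0,
#   "loops": [{"start_time": "1:00", "end_time": "1:08", "count": 4}],
#   "eq_filters": [{"type": "low_pass", "cutoff_hz": 800, "start_time": "0:00", "end_time": "0:30"}],
#   "volume_automation": [{"start_time": "2:00", "end_time": "2:30", "gain_db": -6}],
#   "band_gains": [{"start_time": "3:00", "low_gain_db": -3, "mid_gain_db": 0, "high_gain_db": 2}],
#   "effects": [{"type": "delay", "delay_ms": 250, "start_time": "4:00", "end_time": "4:10"}],
#   "tempo_ramps": [{"start_time": "5:00", "end_time": "5:30", "end_tempo": 100}]
# Crossfade keys (crossfade_start, crossfade_duration, fade_in_duration, fade_*_power,
# fade_out_delay) are read in mix_songs below.
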
def mix_songs(playlist_data):
    settings = playlist_data["settings"]
    song_configs = playlist_data["tracks"]
    output_file = settings.get("output_filename", "mix_output.flac")
    default_crossfade = settings.get("default_crossfade_duration", "0:10")
    final_mix_target_dbfs = settings.get("final_mix_target_dbfs", -1.0)
    num_cores = settings.get("num_cores", 0)

    logger = logging.getLogger(__name__)
    manager = Manager()
    tempo_cache = manager.dict(load_tempo_cache())

    # --- Processing Phase ---
    first_result = process_song(song_configs[0], 0, tempo_cache, settings, None)
    if first_result[0] is None: sys.exit(1)

    current_chain_tempo = first_result[6]
    sample_rate = first_result[1]

    processed_configs = [first_result]

    for i, config in enumerate(song_configs[1:], 1):
        no_adjust = config.get("no_tempo_adjust", False)
        ref_tempo = current_chain_tempo if not no_adjust else None

        result = process_song(config, i, tempo_cache, settings, ref_tempo, sample_rate)
        processed_configs.append(result)

        if result[6] is not None:
            current_chain_tempo = result[6]

    # --- Mixing Phase ---
    temp_files = [res[0] for res in processed_configs]
    if any(t is None for t in temp_files): sys.exit(1)

    current_mix = AudioSegment.from_file(temp_files[0])
    chapters = [(0, os.path.basename(song_configs[0]['song_path']))]
    song_start_times = [0]

    for index, config in enumerate(tqdm(song_configs[1:], desc="Mixing songs", unit="song"), start=1):
        prev_song_config = song_configs[index-1]
        prev_result = processed_configs[index-1]

        prev_song_orig_tempo = prev_result[2]
        prev_song_ramps = prev_result[7]  # Normalized ramps

        if index == 1:
            prev_effective_start_tempo = None
        else:
            p_prev_res = processed_configs[index-2]
            p_prev_no_adj = prev_song_config.get("no_tempo_adjust", False)
            if p_prev_no_adj:
                prev_effective_start_tempo = None
            else:
                prev_effective_start_tempo = p_prev_res[6]

        crossfade_start_str = config.get("crossfade_start")
        crossfade_duration = parse_time(config.get("crossfade_duration", default_crossfade))
        prev_song_start_offset = parse_time(prev_song_config.get("start_offset", "0:00"))

        if crossfade_start_str:
            relative_crossfade_start = parse_time(crossfade_start_str)
            original_pos_in_trimmed = max(0, relative_crossfade_start - prev_song_start_offset) / 1000.0
        else:
            # No crossfade_start given: start the fade so it ends at the end of the previous track
            original_pos_in_trimmed = max(0, prev_result[3] - crossfade_duration) / 1000.0

        adj_pos_sec = compute_mapped_time(
            original_pos_in_trimmed,
            prev_song_ramps,
            prev_song_orig_tempo,
            prev_effective_start_tempo,
            prev_result[3] / 1000.0
        )

        adj_crossfade_in_segment = int(adj_pos_sec * 1000)
        next_song_start_time = song_start_times[-1] + adj_crossfade_in_segment
        next_song_start_time = max(0, next_song_start_time)

        next_song = AudioSegment.from_file(temp_files[index])

        fade_in_duration = parse_time(config.get("fade_in_duration")) if config.get("fade_in_duration") else crossfade_duration
        fade_out_power = float(prev_song_config.get("fade_out_power", prev_song_config.get("fade_power", 1.0)))
        fade_in_power = float(config.get("fade_in_power", config.get("fade_power", 1.0)))
        fade_out_delay = parse_time(prev_song_config.get("fade_out_delay", "0:00"))

        track1_before_fade = current_mix[:next_song_start_time]
        track1_total_overlap = current_mix[next_song_start_time : next_song_start_time + crossfade_duration]
        actual_overlap_duration = len(track1_total_overlap)

        track1_constant_part = track1_total_overlap[:fade_out_delay]
        track1_fade_part = track1_total_overlap[fade_out_delay:]
        faded_out_segment = apply_custom_fade(track1_fade_part, 'out', len(track1_fade_part), fade_out_power)
        faded_out_part = track1_constant_part + faded_out_segment

        track2_overlay_section = next_song[:actual_overlap_duration]
        actual_fade_in_duration = min(fade_in_duration, actual_overlap_duration)
        track2_fade_in_part = track2_overlay_section[:actual_fade_in_duration]
        track2_after_fade_in = track2_overlay_section[actual_fade_in_duration:]
        faded_in_part = apply_custom_fade(track2_fade_in_part, 'in', actual_fade_in_duration, fade_in_power)
        full_track2_overlay = faded_in_part + track2_after_fade_in

        crossfade_result = faded_out_part.overlay(full_track2_overlay)
        if crossfade_result.max_dBFS > -0.1:
            crossfade_result = normalize_audio(crossfade_result, -0.1)

        current_mix = track1_before_fade + crossfade_result + next_song[actual_overlap_duration:]
        song_start_times.append(next_song_start_time)
        chapters.append((next_song_start_time, os.path.basename(config['song_path'])))

    # --- Export ---
    logger.info("Exporting raw mix...")
    temp_pcm = "temp_raw_mix.raw"
    current_mix.export(temp_pcm, format="raw")
    temp_raw_file = "temp_raw_mix.flac"
    bits = current_mix.sample_width * 8
    pcm_format = f"s{bits}le"
    subprocess.run(["ffmpeg", "-f", pcm_format, "-ar", str(current_mix.frame_rate), "-ac", str(current_mix.channels), "-i", temp_pcm, "-c:a", "flac", "-y", temp_raw_file], check=True, capture_output=True)

    logger.info("Normalizing...")
    subprocess.run(["ffmpeg", "-i", temp_raw_file, "-af", f"loudnorm=I=-14:LRA=7:TP={final_mix_target_dbfs}", "-y", output_file], check=True, capture_output=True)

    for f in [temp_pcm, temp_raw_file] + temp_files:
        if os.path.exists(f): os.remove(f)

    add_flac_chapters(output_file, chapters)
    save_tempo_cache(tempo_cache.copy())
    return output_file, len(current_mix), current_chain_tempo

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--playlist", type=str, default="ambient_mix_settings.json")
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--test-last-two", action="store_true")
    parser.add_argument("--tracks", type=str)
    args = parser.parse_args()

    setup_logging(level=logging.DEBUG if args.debug else logging.INFO)
    logger = logging.getLogger(__name__)

    with open(args.playlist, 'r') as f:
        playlist_data = json.load(f)

    if args.tracks:
        try:
            track_numbers = [int(t.strip()) for t in args.tracks.split(',')]
            original_tracks = playlist_data['tracks']
            playlist_data['tracks'] = [original_tracks[i-1] for i in track_numbers if 0 < i <= len(original_tracks)]
        except ValueError:
            logger.error("Invalid format for --tracks.")
            sys.exit(1)
    elif args.test_last_two:
        playlist_data['tracks'] = playlist_data['tracks'][-2:]

    try:
        output_file, final_duration, ref_tempo = mix_songs(playlist_data)
        print(f"\nSuccess! Output: {output_file}")
    except Exception as e:
        logging.error(f"Error: {e}", exc_info=True)