"""Audio transcoding utilities using FFmpeg.""" import os import subprocess from pathlib import Path from typing import Optional from ..utils.logging import get_logger logger = get_logger(__name__) class AudioTranscoder: """Audio transcoder for creating streaming-optimized files.""" def __init__(self, output_dir: Optional[str] = None): """Initialize transcoder. Args: output_dir: Directory to store transcoded files. If None, uses 'transcoded' subdir next to original. """ self.output_dir = output_dir def transcode_to_mp3( self, input_path: str, output_path: Optional[str] = None, bitrate: str = "128k", overwrite: bool = False, ) -> Optional[str]: """Transcode audio file to MP3. Args: input_path: Path to input audio file output_path: Path to output MP3 file. If None, auto-generated. bitrate: MP3 bitrate (default: 128k for streaming) overwrite: Whether to overwrite existing file Returns: Path to transcoded MP3 file, or None if failed """ try: input_file = Path(input_path) if not input_file.exists(): logger.error(f"Input file not found: {input_path}") return None # Generate output path if not provided if output_path is None: if self.output_dir: output_dir = Path(self.output_dir) else: # Create 'transcoded' directory next to original output_dir = input_file.parent / "transcoded" output_dir.mkdir(parents=True, exist_ok=True) output_path = str(output_dir / f"{input_file.stem}.mp3") output_file = Path(output_path) # Skip if already exists and not overwriting if output_file.exists() and not overwrite: logger.info(f"Transcoded file already exists: {output_path}") return str(output_file) logger.info(f"Transcoding {input_file.name} to MP3 {bitrate}...") # FFmpeg command for high-quality MP3 encoding cmd = [ "ffmpeg", "-i", str(input_file), "-vn", # No video "-acodec", "libmp3lame", # MP3 codec "-b:a", bitrate, # Bitrate "-q:a", "2", # High quality VBR (if CBR fails) "-ar", "44100", # Sample rate "-ac", "2", # Stereo "-y" if overwrite else "-n", # Overwrite or not str(output_file), ] # Run FFmpeg result = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False, ) if result.returncode != 0: logger.error(f"FFmpeg failed: {result.stderr}") return None if not output_file.exists(): logger.error(f"Transcoding failed: output file not created") return None output_size = output_file.stat().st_size input_size = input_file.stat().st_size compression_ratio = (1 - output_size / input_size) * 100 logger.info( f"✓ Transcoded: {input_file.name} → {output_file.name} " f"({output_size / 1024 / 1024:.2f} MB, {compression_ratio:.1f}% reduction)" ) return str(output_file) except Exception as e: logger.error(f"Failed to transcode {input_path}: {e}") return None def check_ffmpeg_available(self) -> bool: """Check if FFmpeg is available. Returns: True if FFmpeg is available, False otherwise """ try: result = subprocess.run( ["ffmpeg", "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, ) return result.returncode == 0 except FileNotFoundError: logger.error("FFmpeg not found. Please install FFmpeg.") return False