diff --git a/server/bridge/AudioBridge.js b/server/bridge/AudioBridge.js index 472d78b..b2c1856 100644 --- a/server/bridge/AudioBridge.js +++ b/server/bridge/AudioBridge.js @@ -19,6 +19,7 @@ import OpusCodec, { OpusPresets } from './OpusCodec.js'; import JitterBuffer, { JitterBufferPresets } from './JitterBuffer.js'; import LiveKitClient from './LiveKitClient.js'; import GroupAudioRouter from './GroupAudioRouter.js'; +import ServerAudioUser from './ServerAudioUser.js'; export class AudioBridge extends EventEmitter { constructor(options = {}) { @@ -68,6 +69,9 @@ export class AudioBridge extends EventEmitter { // Frame accumulators pour LiveKit (240 samples → 960 samples) this.liveKitFrameAccumulators = new Map(); // Map + // Utilisateurs audio gérés côté serveur (participants LiveKit avec I/O physique dédiés) + this.serverAudioUsers = new Map(); // Map + // Pool de buffers pré-alloués pour éviter allocations répétées this.bufferPool = { float32: [], // Pool de Float32Array réutilisables @@ -120,7 +124,10 @@ export class AudioBridge extends EventEmitter { // 5. Connexion à LiveKit await this._initLiveKit(); - // 6. Démarrage du routing audio + // 6. Initialisation des server audio users + await this._initServerAudioUsers(); + + // 7. Démarrage du routing audio await this._startAudioRouting(); this.isRunning = true; @@ -386,6 +393,57 @@ export class AudioBridge extends EventEmitter { console.log(`✓ ${this.liveKitClients.size} connexions LiveKit établies`); } + /** + * Initialise les utilisateurs audio serveur (participants LiveKit avec I/O physique) + * @private + */ + async _initServerAudioUsers() { + const users = this.options.serverAudioUsers; + if (!users || users.length === 0) return; + + console.log(`🎤 Initialisation ${users.length} server audio user(s)...`); + + for (const userConfig of users) { + const user = new ServerAudioUser({ + name: userConfig.name, + groupId: userConfig.groupId, + inputChannel: userConfig.inputChannel, + outputChannel: userConfig.outputChannel, + liveKitUrl: this.options.liveKitUrl, + token: userConfig.token, + sampleRate: this.options.sampleRate, + frameSize: this.options.frameSize + }); + + // Quand une frame de mix est prête, l'envoyer vers le canal physique de sortie + const outputCh = userConfig.outputChannel; + user.on('outputReady', (mixBuffer) => { + if (!this.audioBackend) return; + const numChannels = this.options.channels || 1; + const frameSize = this.options.frameSize; + + if (numChannels <= 1) { + const pcmBuffer = this._float32ToBuffer(mixBuffer); + this.audioBackend.queueAudio(pcmBuffer); + } else { + // Construire un buffer multi-canaux avec l'audio du user sur son canal de sortie + const interleaved = new Float32Array(frameSize * numChannels); + for (let i = 0; i < frameSize; i++) { + interleaved[i * numChannels + outputCh] = mixBuffer[i]; + } + const pcmBuffer = this._float32ToBuffer(interleaved); + this.audioBackend.queueAudio(pcmBuffer); + } + }); + + await user.start(); + this.serverAudioUsers.set(userConfig.name, user); + console.log(`✓ Server audio user "${userConfig.name}" démarré (entrée canal ${userConfig.inputChannel} → sortie canal ${userConfig.outputChannel}, room: ${userConfig.groupId})`); + } + + console.log(`✓ ${this.serverAudioUsers.size} server audio user(s) initialisés`); + } + /** * Démarre le routing audio bidirectionnel complet * @private @@ -424,6 +482,14 @@ export class AudioBridge extends EventEmitter { } } + // ÉTAPE 0 : Envoyer les données de chaque canal vers les server audio users + for (const [, user] of this.serverAudioUsers) { + const channelData = this.inputChannelBuffers.get(user.inputChannel); + if (channelData) { + user.sendAudio(channelData); + } + } + // ÉTAPE 1 : Inputs physiques → Groupes (via GroupAudioRouter) const groupBuffers = this.groupAudioRouter.processInputsToGroups( this.inputChannelBuffers @@ -760,6 +826,13 @@ export class AudioBridge extends EventEmitter { } this.liveKitClients.clear(); + // Arrêter les server audio users + for (const [name, user] of this.serverAudioUsers.entries()) { + console.log(`🔌 Arrêt server audio user "${name}"...`); + await user.stop(); + } + this.serverAudioUsers.clear(); + if (this.groupAudioRouter) { this.groupAudioRouter.destroy(); this.groupAudioRouter = null; diff --git a/server/bridge/AudioBridgeManager.js b/server/bridge/AudioBridgeManager.js index 5b3caec..112f406 100644 --- a/server/bridge/AudioBridgeManager.js +++ b/server/bridge/AudioBridgeManager.js @@ -89,6 +89,46 @@ class AudioBridgeManager extends EventEmitter { return; } + // Générer un token JWT par server audio user + const serverAudioUsers = []; + + for (const user of config.server_audio_users || []) { + const groupId = slugify(user.group); + + const token = new AccessToken( + config.server?.livekit?.apiKey || 'devkey', + config.server?.livekit?.apiSecret || 'secret', + { + identity: `server-${user.name}`, + name: `Server Audio - ${user.name}`, + metadata: JSON.stringify({ + role: 'server-audio-user', + group: groupId + }) + } + ); + + token.addGrant({ + room: groupId, + roomJoin: true, + canPublish: true, + canSubscribe: true, + canPublishData: true + }); + + const jwt = await token.toJwt(); + + serverAudioUsers.push({ + name: user.name, + groupId, + inputChannel: user.input_channel ?? user.inputChannel ?? 0, + outputChannel: user.output_channel ?? user.outputChannel ?? 0, + token: jwt + }); + + console.log(`✓ Token JWT généré pour server audio user "${user.name}" (room: ${groupId})`); + } + // Import dynamique du AudioBridge const { AudioBridge } = await import('./AudioBridge.js'); @@ -123,6 +163,8 @@ class AudioBridgeManager extends EventEmitter { // Options LiveKit (multi-rooms) liveKitUrl, liveKitTokens, // Tableau de { groupName, groupId, token } + // Server audio users + serverAudioUsers, // Options de routing routing: config.audio?.routing || {}, groups: config.groups || [], diff --git a/server/bridge/ServerAudioUser.js b/server/bridge/ServerAudioUser.js new file mode 100644 index 0000000..7083612 --- /dev/null +++ b/server/bridge/ServerAudioUser.js @@ -0,0 +1,188 @@ +/** + * ServerAudioUser.js + * Utilisateur audio géré côté serveur : participant LiveKit indépendant + * avec un canal physique d'entrée dédié et un canal physique de sortie dédié. + * + * Chaque instance : + * - Publie son canal physique d'entrée comme track LiveKit + * - Reçoit l'audio de tous les autres participants (mix-minus naturel) + * - Émet 'outputReady' avec le mix Float32 quand une frame complète est prête + */ + +import { EventEmitter } from 'events'; +import LiveKitClient from './LiveKitClient.js'; + +class ServerAudioUser extends EventEmitter { + constructor(options) { + super(); + + this.name = options.name; + this.inputChannel = parseInt(options.inputChannel, 10); + this.outputChannel = parseInt(options.outputChannel, 10); + this.groupId = options.groupId; + this.frameSize = options.frameSize || 960; + this.sampleRate = options.sampleRate || 48000; + + this.client = new LiveKitClient({ + url: options.liveKitUrl, + token: options.token, + roomName: options.groupId, + participantName: `server-${options.name}`, + sampleRate: this.sampleRate, + channels: 1, + }); + + // Accumulateurs PCM par participant distant (pour pouvoir mixer leurs frames) + this.participantAccumulators = new Map(); // Map + + // Dernier mix calculé (prêt à être envoyé vers le canal physique de sortie) + this.mixedOutput = null; // Float32Array de frameSize samples + + this._setupClientEvents(); + } + + _setupClientEvents() { + this.client.on('connected', () => { + console.log(`[ServerAudioUser:${this.name}] Connecté à room "${this.groupId}" (in:${this.inputChannel} → out:${this.outputChannel})`); + this.emit('connected'); + }); + + this.client.on('disconnected', (data) => { + console.warn(`[ServerAudioUser:${this.name}] Déconnecté:`, data?.reason || 'unknown'); + this.emit('disconnected', data); + }); + + // Réception audio depuis les autres participants → accumulation et mix + this.client.on('audioData', ({ participantSid, pcmData }) => { + this._accumulate(participantSid, pcmData); + }); + + // Nettoyage des buffers quand un participant quitte + this.client.on('participantDisconnected', (participant) => { + this.participantAccumulators.delete(participant.sid); + }); + } + + /** + * Démarre la connexion LiveKit + */ + async start() { + await this.client.connect(); + } + + /** + * Envoie les données audio du canal d'entrée physique vers LiveKit. + * Appelé par AudioBridge à chaque frame de capture. + * @param {Float32Array} float32Data - Données PCM normalisées [-1.0, 1.0] + */ + sendAudio(float32Data) { + if (!this.client.isConnected) return; + + const pcmBuffer = this._float32ToBuffer(float32Data); + this.client.sendAudioData(pcmBuffer); + } + + /** + * Retourne le dernier mix calculé, ou null si aucune frame reçue. + * @returns {Float32Array|null} + */ + getMixedOutput() { + return this.mixedOutput; + } + + /** + * Accumule les frames PCM reçues d'un participant. + * Quand une frame complète est disponible, calcule le mix. + * @private + */ + _accumulate(participantSid, pcmData) { + const float32 = this._bufferToFloat32(pcmData); + + if (!this.participantAccumulators.has(participantSid)) { + this.participantAccumulators.set(participantSid, { + buffer: new Float32Array(this.frameSize), + offset: 0 + }); + } + + const acc = this.participantAccumulators.get(participantSid); + const toCopy = Math.min(float32.length, this.frameSize - acc.offset); + + if (toCopy > 0) { + acc.buffer.set(float32.subarray(0, toCopy), acc.offset); + acc.offset += toCopy; + } + + if (acc.offset >= this.frameSize) { + this._computeMix(); + acc.offset = 0; + acc.buffer.fill(0); + } + } + + /** + * Calcule le mix additif de tous les participants et émet 'outputReady'. + * @private + */ + _computeMix() { + const mix = new Float32Array(this.frameSize); + + for (const { buffer } of this.participantAccumulators.values()) { + for (let i = 0; i < this.frameSize; i++) { + mix[i] += buffer[i]; + } + } + + // Clamp + for (let i = 0; i < mix.length; i++) { + mix[i] = Math.max(-1.0, Math.min(1.0, mix[i])); + } + + this.mixedOutput = mix; + this.emit('outputReady', mix); + } + + /** + * Convertit Buffer/Int16Array PCM 16-bit → Float32Array [-1.0, 1.0] + * @private + */ + _bufferToFloat32(buffer) { + if (buffer instanceof Int16Array) { + const f = new Float32Array(buffer.length); + for (let i = 0; i < buffer.length; i++) f[i] = buffer[i] / 32768.0; + return f; + } + if (!(buffer instanceof Buffer)) buffer = Buffer.from(buffer); + const samples = buffer.length / 2; + const f = new Float32Array(samples); + for (let i = 0; i < samples; i++) { + f[i] = buffer.readInt16LE(i * 2) / 32768.0; + } + return f; + } + + /** + * Convertit Float32Array [-1.0, 1.0] → Buffer PCM 16-bit + * @private + */ + _float32ToBuffer(float32) { + const buf = Buffer.alloc(float32.length * 2); + for (let i = 0; i < float32.length; i++) { + const clamped = Math.max(-1.0, Math.min(1.0, float32[i])); + buf.writeInt16LE(Math.round(clamped * 32767), i * 2); + } + return buf; + } + + /** + * Arrête l'utilisateur et libère les ressources. + */ + async stop() { + await this.client.destroy(); + this.participantAccumulators.clear(); + this.mixedOutput = null; + this.removeAllListeners(); + } +} + +export default ServerAudioUser; diff --git a/server/config/config.yaml b/server/config/config.yaml index 221548f..e9e8ac5 100644 --- a/server/config/config.yaml +++ b/server/config/config.yaml @@ -39,6 +39,22 @@ audio: "0": L "1": R "2": Talkback Console +# Utilisateurs audio gérés côté serveur. +# Chaque entrée crée un participant LiveKit indépendant avec un canal physique +# d'entrée (microphone/ligne) et un canal physique de sortie dédié (mix-minus naturel). +# +# Exemple (décommenter et adapter) : +# server_audio_users: +# - name: foh +# group: default # ID du groupe LiveKit (room) à rejoindre +# input_channel: 1 # Index canal physique d'entrée (depuis inputDeviceId) +# output_channel: 2 # Index canal physique de sortie (vers outputDeviceId) +# - name: returns +# group: default +# input_channel: 2 +# output_channel: 3 +server_audio_users: [] + groups: - name: Default audioBitrate: 96