Files
PTT-Live/server/websocket/AudioLevelsServer.js
benoit b7911badb2 fix: corriger conflit upgrade WebSocket entre proxy LiveKit et audio-levels
AudioLevelsServer s'auto-attachait à l'événement 'upgrade' du serveur HTTP
via la lib ws (server + path), en plus du listener manuel du proxy LiveKit.
Pour toute connexion /livekit, les deux listeners s'exécutaient : le proxy
LiveKit aboutissait bien côté upstream, mais le listener ws (path
/audio-levels ne matchant pas) appelait abortHandshake(socket, 400) sur le
même socket juste après, cassant la connexion côté client en HTTPS prod.

AudioLevelsServer passe maintenant en noServer: true et expose
handleUpgrade(), appelée par un dispatcher 'upgrade' unique dans
server/index.js qui route explicitement par chemin (/livekit vs
/audio-levels).

Ajout de certs/ au .gitignore (clés privées SSL locales mkcert).
2026-06-30 13:29:09 +02:00

391 lines
9.5 KiB
JavaScript

/**
* AudioLevelsServer.js
* WebSocket server pour streaming des niveaux audio temps réel
*
* Permet à l'interface admin de visualiser :
* - Niveaux d'entrée physiques (VU-mètres)
* - Niveaux de groupes LiveKit
* - Niveaux de sortie physiques
* - Détection de clipping
* - État des routes actives
*/
import { WebSocketServer } from 'ws';
import { EventEmitter } from 'events';
/**
* Calcule le niveau RMS d'un buffer audio (dBFS)
*/
function calculateRMS(buffer) {
if (!buffer || buffer.length === 0) return -120; // Silence
let sum = 0;
for (let i = 0; i < buffer.length; i++) {
sum += buffer[i] * buffer[i];
}
const rms = Math.sqrt(sum / buffer.length);
// Conversion en dBFS (0dBFS = niveau max)
if (rms === 0) return -120;
const dbFS = 20 * Math.log10(rms);
return Math.max(-120, Math.min(0, dbFS));
}
/**
* Calcule le peak d'un buffer audio
*/
function calculatePeak(buffer) {
if (!buffer || buffer.length === 0) return 0;
let peak = 0;
for (let i = 0; i < buffer.length; i++) {
peak = Math.max(peak, Math.abs(buffer[i]));
}
return peak;
}
/**
* Serveur WebSocket pour monitoring audio
*/
export class AudioLevelsServer extends EventEmitter {
constructor(options = {}) {
super();
this.options = {
port: options.port || 3001,
server: options.server || null, // Serveur HTTP existant
updateRateMs: options.updateRateMs || 50, // 20 fois/sec
...options
};
this.wss = null;
this.clients = new Set();
this.updateInterval = null;
// Données à broadcaster
this.levels = {
inputs: {}, // { channelId: { rms: -12, peak: 0.5, clipping: false } }
groups: {}, // { groupName: { rms: -8, peak: 0.7, clipping: false } }
outputs: {}, // { channelId: { rms: -10, peak: 0.6, clipping: false } }
routing: {
activeInputs: [],
activeGroups: [],
activeOutputs: []
}
};
this.stats = {
connectedClients: 0,
messagesSent: 0,
errors: 0
};
}
/**
* Démarre le serveur WebSocket
*/
start() {
return new Promise((resolve, reject) => {
try {
// Si un serveur HTTP est fourni, utiliser le même port (upgrade HTTP → WebSocket)
// noServer: true car l'upgrade est dispatché manuellement par server/index.js
// (un seul listener 'upgrade' partagé avec le proxy LiveKit, voir handleUpgrade())
// Sinon, créer un serveur WebSocket standalone sur son propre port
const wsOptions = this.options.server
? { noServer: true }
: { port: this.options.port };
this.wss = new WebSocketServer(wsOptions);
this.wss.on('connection', (ws, req) => {
this._handleNewConnection(ws, req);
});
this.wss.on('error', (error) => {
console.error('Erreur WebSocket server:', error);
this.stats.errors++;
this.emit('error', error);
});
// Démarrage du broadcast périodique
this._startBroadcast();
if (this.options.server) {
console.log(`WebSocket AudioLevels démarré sur path /audio-levels (même port que HTTP)`);
} else {
console.log(`WebSocket AudioLevels démarré sur ws://localhost:${this.options.port}`);
}
this.emit('started');
resolve();
} catch (error) {
reject(error);
}
});
}
/**
* Complète l'upgrade WebSocket pour une requête déjà identifiée comme
* ciblant ce serveur (voir le dispatcher 'upgrade' dans server/index.js)
*/
handleUpgrade(req, socket, head) {
this.wss.handleUpgrade(req, socket, head, (ws) => {
this.wss.emit('connection', ws, req);
});
}
/**
* Gère une nouvelle connexion client
*/
_handleNewConnection(ws, req) {
const clientId = `${req.socket.remoteAddress}:${req.socket.remotePort}`;
console.log(`Nouveau client audio-levels: ${clientId}`);
this.clients.add(ws);
this.stats.connectedClients = this.clients.size;
// Envoi des données actuelles immédiatement
this._sendToClient(ws, {
type: 'initial',
data: this.levels
});
ws.on('message', (message) => {
try {
const data = JSON.parse(message);
this._handleClientMessage(ws, data);
} catch (error) {
console.error('Erreur parsing message client:', error);
}
});
ws.on('close', () => {
console.log(`Client déconnecté: ${clientId}`);
this.clients.delete(ws);
this.stats.connectedClients = this.clients.size;
});
ws.on('error', (error) => {
console.error(`Erreur client ${clientId}:`, error);
this.clients.delete(ws);
this.stats.connectedClients = this.clients.size;
});
this.emit('clientConnected', { clientId, totalClients: this.clients.size });
}
/**
* Gère les messages entrants des clients
*/
_handleClientMessage(ws, message) {
switch (message.type) {
case 'ping':
this._sendToClient(ws, { type: 'pong', timestamp: Date.now() });
break;
case 'setUpdateRate':
// Permet au client de modifier le taux de rafraîchissement
if (message.rateMs >= 20 && message.rateMs <= 1000) {
this._restartBroadcast(message.rateMs);
}
break;
default:
console.warn('Message client inconnu:', message.type);
}
}
/**
* Démarre le broadcast périodique
*/
_startBroadcast() {
if (this.updateInterval) {
clearInterval(this.updateInterval);
}
this.updateInterval = setInterval(() => {
this._broadcastLevels();
}, this.options.updateRateMs);
}
/**
* Redémarre le broadcast avec un nouveau taux
*/
_restartBroadcast(newRateMs) {
this.options.updateRateMs = newRateMs;
this._startBroadcast();
console.log(`Taux de rafraîchissement modifié: ${newRateMs}ms`);
}
/**
* Broadcast les niveaux à tous les clients connectés
*/
_broadcastLevels() {
if (this.clients.size === 0) return;
const message = {
type: 'levels',
timestamp: Date.now(),
data: this.levels
};
this._broadcast(message);
}
/**
* Envoie un message à tous les clients
*/
_broadcast(message) {
const payload = JSON.stringify(message);
this.clients.forEach(ws => {
if (ws.readyState === 1) { // OPEN
try {
ws.send(payload);
this.stats.messagesSent++;
} catch (error) {
console.error('Erreur envoi message:', error);
this.stats.errors++;
}
}
});
}
/**
* Envoie un message à un client spécifique
*/
_sendToClient(ws, message) {
if (ws.readyState === 1) {
try {
ws.send(JSON.stringify(message));
this.stats.messagesSent++;
} catch (error) {
console.error('Erreur envoi message client:', error);
this.stats.errors++;
}
}
}
/**
* Met à jour les niveaux d'entrée
* Appelé par le GroupAudioRouter après processInputsToGroups()
*/
updateInputLevels(inputBuffers) {
inputBuffers.forEach((buffer, channelId) => {
const rms = calculateRMS(buffer);
const peak = calculatePeak(buffer);
const clipping = peak >= 0.99;
this.levels.inputs[channelId] = { rms, peak, clipping };
});
this.levels.routing.activeInputs = Array.from(inputBuffers.keys());
}
/**
* Met à jour les niveaux de groupe
* Appelé par le GroupAudioRouter après processInputsToGroups()
*/
updateGroupLevels(groupBuffers) {
groupBuffers.forEach((buffer, groupName) => {
const rms = calculateRMS(buffer);
const peak = calculatePeak(buffer);
const clipping = peak >= 0.99;
this.levels.groups[groupName] = { rms, peak, clipping };
});
this.levels.routing.activeGroups = Array.from(groupBuffers.keys());
}
/**
* Met à jour les niveaux de sortie
* Appelé par le GroupAudioRouter après processGroupsToOutputs()
*/
updateOutputLevels(outputBuffers) {
outputBuffers.forEach((buffer, channelId) => {
const rms = calculateRMS(buffer);
const peak = calculatePeak(buffer);
const clipping = peak >= 0.99;
this.levels.outputs[channelId] = { rms, peak, clipping };
});
this.levels.routing.activeOutputs = Array.from(outputBuffers.keys());
}
/**
* Réinitialise tous les niveaux (silence)
*/
resetLevels() {
this.levels = {
inputs: {},
groups: {},
outputs: {},
routing: {
activeInputs: [],
activeGroups: [],
activeOutputs: []
}
};
}
/**
* Récupère les statistiques
*/
getStats() {
return {
...this.stats,
updateRateMs: this.options.updateRateMs,
port: this.options.port
};
}
/**
* Arrête le serveur
*/
async stop() {
console.log('Arrêt AudioLevelsServer...');
if (this.updateInterval) {
clearInterval(this.updateInterval);
this.updateInterval = null;
}
if (this.wss) {
// Ferme toutes les connexions clients
this.clients.forEach(ws => {
ws.close(1000, 'Server shutdown');
});
this.clients.clear();
// Ferme le serveur
await new Promise((resolve) => {
this.wss.close(() => {
console.log('WebSocket AudioLevels arrêté');
resolve();
});
});
this.wss = null;
}
this.emit('stopped');
}
/**
* Détruit le serveur
*/
async destroy() {
await this.stop();
this.removeAllListeners();
console.log('AudioLevelsServer détruit');
}
}
export default AudioLevelsServer;