Add vortex / voice client.

This commit is contained in:
Paul
2021-06-23 14:52:33 +01:00
parent babb53c794
commit 11c524d6a9
11 changed files with 998 additions and 43 deletions

188
src/lib/vortex/Signaling.ts Normal file
View File

@@ -0,0 +1,188 @@
import EventEmitter from "eventemitter3";
import {
RtpCapabilities,
RtpParameters
} from "mediasoup-client/lib/RtpParameters";
import { DtlsParameters } from "mediasoup-client/lib/Transport";
import {
AuthenticationResult,
Room,
TransportInitDataTuple,
WSCommandType,
WSErrorCode,
ProduceType,
ConsumerData
} from "./Types";
interface SignalingEvents {
open: (event: Event) => void;
close: (event: CloseEvent) => void;
error: (event: Event) => void;
data: (data: any) => void;
}
export default class Signaling extends EventEmitter<SignalingEvents> {
ws?: WebSocket;
index: number;
pending: Map<number, (data: unknown) => void>;
constructor() {
super();
this.index = 0;
this.pending = new Map();
}
connected(): boolean {
return (
this.ws !== undefined &&
this.ws.readyState !== WebSocket.CLOSING &&
this.ws.readyState !== WebSocket.CLOSED
);
}
connect(address: string): Promise<void> {
this.disconnect();
this.ws = new WebSocket(address);
this.ws.onopen = e => this.emit("open", e);
this.ws.onclose = e => this.emit("close", e);
this.ws.onerror = e => this.emit("error", e);
this.ws.onmessage = e => this.parseData(e);
let finished = false;
return new Promise((resolve, reject) => {
this.once("open", () => {
if (finished) return;
finished = true;
resolve();
});
this.once("error", () => {
if (finished) return;
finished = true;
reject();
});
});
}
disconnect() {
if (
this.ws !== undefined &&
this.ws.readyState !== WebSocket.CLOSED &&
this.ws.readyState !== WebSocket.CLOSING
)
this.ws.close(1000);
}
private parseData(event: MessageEvent) {
if (typeof event.data !== "string") return;
const json = JSON.parse(event.data);
const entry = this.pending.get(json.id);
if (entry === undefined) {
this.emit("data", json);
return;
}
entry(json);
}
sendRequest(type: string, data?: any): Promise<any> {
if (this.ws === undefined || this.ws.readyState !== WebSocket.OPEN)
return Promise.reject({ error: WSErrorCode.NotConnected });
const ws = this.ws;
return new Promise((resolve, reject) => {
if (this.index >= 2 ** 32) this.index = 0;
while (this.pending.has(this.index)) this.index++;
const onClose = (e: CloseEvent) => {
reject({
error: e.code,
message: e.reason
});
};
const finishedFn = (data: any) => {
this.removeListener("close", onClose);
if (data.error)
reject({
error: data.error,
message: data.message,
data: data.data
});
resolve(data.data);
};
this.pending.set(this.index, finishedFn);
this.once("close", onClose);
const json = {
id: this.index,
type: type,
data
};
ws.send(JSON.stringify(json) + "\n");
this.index++;
});
}
authenticate(token: string, roomId: string): Promise<AuthenticationResult> {
return this.sendRequest(WSCommandType.Authenticate, { token, roomId });
}
async roomInfo(): Promise<Room> {
const room = await this.sendRequest(WSCommandType.RoomInfo);
return {
id: room.id,
videoAllowed: room.videoAllowed,
users: new Map(Object.entries(room.users))
};
}
initializeTransports(
rtpCapabilities: RtpCapabilities
): Promise<TransportInitDataTuple> {
return this.sendRequest(WSCommandType.InitializeTransports, {
mode: "SplitWebRTC",
rtpCapabilities
});
}
connectTransport(
id: string,
dtlsParameters: DtlsParameters
): Promise<void> {
return this.sendRequest(WSCommandType.ConnectTransport, {
id,
dtlsParameters
});
}
async startProduce(
type: ProduceType,
rtpParameters: RtpParameters
): Promise<string> {
let result = await this.sendRequest(WSCommandType.StartProduce, {
type,
rtpParameters
});
return result.producerId;
}
stopProduce(type: ProduceType): Promise<void> {
return this.sendRequest(WSCommandType.StopProduce, { type });
}
startConsume(userId: string, type: ProduceType): Promise<ConsumerData> {
return this.sendRequest(WSCommandType.StartConsume, { type, userId });
}
stopConsume(consumerId: string): Promise<void> {
return this.sendRequest(WSCommandType.StopConsume, { id: consumerId });
}
setConsumerPause(consumerId: string, paused: boolean): Promise<void> {
return this.sendRequest(WSCommandType.SetConsumerPause, {
id: consumerId,
paused
});
}
}

111
src/lib/vortex/Types.ts Normal file
View File

@@ -0,0 +1,111 @@
import { Consumer } from "mediasoup-client/lib/Consumer";
import {
MediaKind,
RtpCapabilities,
RtpParameters
} from "mediasoup-client/lib/RtpParameters";
import { SctpParameters } from "mediasoup-client/lib/SctpParameters";
import {
DtlsParameters,
IceCandidate,
IceParameters
} from "mediasoup-client/lib/Transport";
export enum WSEventType {
UserJoined = "UserJoined",
UserLeft = "UserLeft",
UserStartProduce = "UserStartProduce",
UserStopProduce = "UserStopProduce"
}
export enum WSCommandType {
Authenticate = "Authenticate",
RoomInfo = "RoomInfo",
InitializeTransports = "InitializeTransports",
ConnectTransport = "ConnectTransport",
StartProduce = "StartProduce",
StopProduce = "StopProduce",
StartConsume = "StartConsume",
StopConsume = "StopConsume",
SetConsumerPause = "SetConsumerPause"
}
export enum WSErrorCode {
NotConnected = 0,
NotFound = 404,
TransportConnectionFailure = 601,
ProducerFailure = 611,
ProducerNotFound = 614,
ConsumerFailure = 621,
ConsumerNotFound = 624
}
export enum WSCloseCode {
// Sent when the received data is not a string, or is unparseable
InvalidData = 1003,
Unauthorized = 4001,
RoomClosed = 4004,
// Sent when a client tries to send an opcode in the wrong state
InvalidState = 1002,
ServerError = 1011
}
export interface VoiceError {
error: WSErrorCode | WSCloseCode;
message: string;
}
export type ProduceType = "audio"; //| "video" | "saudio" | "svideo";
export interface AuthenticationResult {
userId: string;
roomId: string;
rtpCapabilities: RtpCapabilities;
}
export interface Room {
id: string;
videoAllowed: boolean;
users: Map<string, VoiceUser>;
}
export interface VoiceUser {
audio?: boolean;
//video?: boolean,
//saudio?: boolean,
//svideo?: boolean,
}
export interface ConsumerList {
audio?: Consumer;
//video?: Consumer,
//saudio?: Consumer,
//svideo?: Consumer,
}
export interface TransportInitData {
id: string;
iceParameters: IceParameters;
iceCandidates: IceCandidate[];
dtlsParameters: DtlsParameters;
sctpParameters: SctpParameters | undefined;
}
export interface TransportInitDataTuple {
sendTransport: TransportInitData;
recvTransport: TransportInitData;
}
export interface ConsumerData {
id: string;
producerId: string;
kind: MediaKind;
rtpParameters: RtpParameters;
}

View File

@@ -0,0 +1,331 @@
import EventEmitter from "eventemitter3";
import * as mediasoupClient from "mediasoup-client";
import {
Device,
Producer,
Transport,
UnsupportedError
} from "mediasoup-client/lib/types";
import {
ProduceType,
WSEventType,
VoiceError,
VoiceUser,
ConsumerList,
WSErrorCode
} from "./Types";
import Signaling from "./Signaling";
interface VoiceEvents {
ready: () => void;
error: (error: Error) => void;
close: (error?: VoiceError) => void;
startProduce: (type: ProduceType) => void;
stopProduce: (type: ProduceType) => void;
userJoined: (userId: string) => void;
userLeft: (userId: string) => void;
userStartProduce: (userId: string, type: ProduceType) => void;
userStopProduce: (userId: string, type: ProduceType) => void;
}
export default class VoiceClient extends EventEmitter<VoiceEvents> {
private _supported: boolean;
device?: Device;
signaling: Signaling;
sendTransport?: Transport;
recvTransport?: Transport;
userId?: string;
roomId?: string;
participants: Map<string, VoiceUser>;
consumers: Map<string, ConsumerList>;
audioProducer?: Producer;
constructor() {
super();
this._supported = mediasoupClient.detectDevice() !== undefined;
this.signaling = new Signaling();
this.participants = new Map();
this.consumers = new Map();
this.signaling.on(
"data",
json => {
const data = json.data;
switch (json.type) {
case WSEventType.UserJoined: {
this.participants.set(data.id, {});
this.emit("userJoined", data.id);
break;
}
case WSEventType.UserLeft: {
this.participants.delete(data.id);
this.emit("userLeft", data.id);
if (this.recvTransport) this.stopConsume(data.id);
break;
}
case WSEventType.UserStartProduce: {
const user = this.participants.get(data.id);
if (user === undefined) return;
switch (data.type) {
case "audio":
user.audio = true;
break;
default:
throw new Error(
`Invalid produce type ${data.type}`
);
}
if (this.recvTransport)
this.startConsume(data.id, data.type);
this.emit("userStartProduce", data.id, data.type);
break;
}
case WSEventType.UserStopProduce: {
const user = this.participants.get(data.id);
if (user === undefined) return;
switch (data.type) {
case "audio":
user.audio = false;
break;
default:
throw new Error(
`Invalid produce type ${data.type}`
);
}
if (this.recvTransport)
this.stopConsume(data.id, data.type);
this.emit("userStopProduce", data.id, data.type);
break;
}
}
},
this
);
this.signaling.on(
"error",
error => {
this.emit("error", new Error("Signaling error"));
},
this
);
this.signaling.on(
"close",
error => {
this.disconnect(
{
error: error.code,
message: error.reason
},
true
);
},
this
);
}
supported() {
return this._supported;
}
throwIfUnsupported() {
if (!this._supported) throw new UnsupportedError("RTC not supported");
}
connect(address: string, roomId: string) {
this.throwIfUnsupported();
this.device = new Device();
this.roomId = roomId;
return this.signaling.connect(address);
}
disconnect(error?: VoiceError, ignoreDisconnected?: boolean) {
if (!this.signaling.connected() && !ignoreDisconnected) return;
this.signaling.disconnect();
this.participants = new Map();
this.consumers = new Map();
this.userId = undefined;
this.roomId = undefined;
this.audioProducer = undefined;
if (this.sendTransport) this.sendTransport.close();
if (this.recvTransport) this.recvTransport.close();
this.sendTransport = undefined;
this.recvTransport = undefined;
this.emit("close", error);
}
async authenticate(token: string) {
this.throwIfUnsupported();
if (this.device === undefined || this.roomId === undefined)
throw new ReferenceError("Voice Client is in an invalid state");
const result = await this.signaling.authenticate(token, this.roomId);
let [room] = await Promise.all([
this.signaling.roomInfo(),
this.device.load({ routerRtpCapabilities: result.rtpCapabilities })
]);
this.userId = result.userId;
this.participants = room.users;
}
async initializeTransports() {
this.throwIfUnsupported();
if (this.device === undefined)
throw new ReferenceError("Voice Client is in an invalid state");
const initData = await this.signaling.initializeTransports(
this.device.rtpCapabilities
);
this.sendTransport = this.device.createSendTransport(
initData.sendTransport
);
this.recvTransport = this.device.createRecvTransport(
initData.recvTransport
);
const connectTransport = (transport: Transport) => {
transport.on("connect", ({ dtlsParameters }, callback, errback) => {
this.signaling
.connectTransport(transport.id, dtlsParameters)
.then(callback)
.catch(errback);
});
};
connectTransport(this.sendTransport);
connectTransport(this.recvTransport);
this.sendTransport.on("produce", (parameters, callback, errback) => {
const type = parameters.appData.type;
if (
parameters.kind === "audio" &&
type !== "audio" &&
type !== "saudio"
)
return errback();
if (
parameters.kind === "video" &&
type !== "video" &&
type !== "svideo"
)
return errback();
this.signaling
.startProduce(type, parameters.rtpParameters)
.then(id => callback({ id }))
.catch(errback);
});
this.emit("ready");
for (let user of this.participants) {
if (user[1].audio && user[0] !== this.userId)
this.startConsume(user[0], "audio");
}
}
private async startConsume(userId: string, type: ProduceType) {
if (this.recvTransport === undefined)
throw new Error("Receive transport undefined");
const consumers = this.consumers.get(userId) || {};
const consumerParams = await this.signaling.startConsume(userId, type);
const consumer = await this.recvTransport.consume(consumerParams);
switch (type) {
case "audio":
consumers.audio = consumer;
}
const mediaStream = new MediaStream([consumer.track]);
const audio = new Audio();
audio.srcObject = mediaStream;
await this.signaling.setConsumerPause(consumer.id, false);
audio.play();
this.consumers.set(userId, consumers);
}
private async stopConsume(userId: string, type?: ProduceType) {
const consumers = this.consumers.get(userId);
if (consumers === undefined) return;
if (type === undefined) {
if (consumers.audio !== undefined) consumers.audio.close();
this.consumers.delete(userId);
} else {
switch (type) {
case "audio": {
if (consumers.audio !== undefined) {
consumers.audio.close();
this.signaling.stopConsume(consumers.audio.id);
}
consumers.audio = undefined;
break;
}
}
this.consumers.set(userId, consumers);
}
}
async startProduce(track: MediaStreamTrack, type: ProduceType) {
if (this.sendTransport === undefined)
throw new Error("Send transport undefined");
const producer = await this.sendTransport.produce({
track,
appData: { type }
});
switch (type) {
case "audio":
this.audioProducer = producer;
break;
}
const participant = this.participants.get(this.userId || "");
if (participant !== undefined) {
participant[type] = true;
this.participants.set(this.userId || "", participant);
}
this.emit("startProduce", type);
}
async stopProduce(type: ProduceType) {
let producer;
switch (type) {
case "audio":
producer = this.audioProducer;
this.audioProducer = undefined;
break;
}
if (producer !== undefined) {
producer.close();
this.emit("stopProduce", type);
}
const participant = this.participants.get(this.userId || "");
if (participant !== undefined) {
participant[type] = false;
this.participants.set(this.userId || "", participant);
}
try {
await this.signaling.stopProduce(type);
} catch (error) {
if (error.error === WSErrorCode.ProducerNotFound) return;
else throw error;
}
}
}