Fri Mar 05 2021
Last time, I posted for 1:N P2P communication using WebRTC. I thought about posting about the SFU method or not, but I thought it would be better to do it. SFU is a type of Media Server, so please click here to check the past posting. Media servers are used during commercialization using Kurento and mediasoup. However, based on his theory, I wanted to organize an SFU server among media servers. The theoretical explanation was covered in the existing post, so please check the link here above. Assuming that you know all the theoretical backgrounds, I will write a post about the implementation.
Note
You must use socket.io version=2.3.0.
let receiverPCs = {};
let senderPCs = {};
let users = {};
let socketToRoom = {};
socket.on("joinRoom", data => {
try {
let allUsers = getOtherUsersInRoom(data.id, data.roomID);
io.to(data.id).emit("allUsers", { users: allUsers });
} catch (error) {
console.log(error);
}
});
Note
The reason why createAnswer keeps both of offerToReceiveAudio and offerToReceiveVideo true is that they must receive both audio and video streams from users.
socket.on("senderOffer", async data => {
try {
socketToRoom[data.senderSocketID] = data.roomID;
let pc = createReceiverPeerConnection(
data.senderSocketID,
socket,
data.roomID
);
await pc.setRemoteDescription(data.sdp);
let sdp = await pc.createAnswer({
offerToReceiveAudio: true,
offerToReceiveVideo: true,
});
await pc.setLocalDescription(sdp);
socket.join(data.roomID);
io.to(data.senderSocketID).emit("getSenderAnswer", { sdp });
} catch (error) {
console.log(error);
}
});
socket.on("senderCandidate", async data => {
try {
let pc = receiverPCs[data.senderSocketID];
await pc.addIceCandidate(new wrtc.RTCIceCandidate(data.candidate));
} catch (error) {
console.log(error);
}
});
Note
The reason why createAnswer has false versions of both offerToReceiveAudio and offerToReceiveVideo is that they do not receive audio and video streams from users.(The RTCPeerConnection you created now is a connection to send a stream of existing users.)
socket.on("receiverOffer", async data => {
try {
let pc = createSenderPeerConnection(
data.receiverSocketID,
data.senderSocketID,
socket,
data.roomID
);
await pc.setRemoteDescription(data.sdp);
let sdp = await pc.createAnswer({
offerToReceiveAudio: false,
offerToReceiveVideo: false,
});
await pc.setLocalDescription(sdp);
io.to(data.receiverSocketID).emit("getReceiverAnswer", {
id: data.senderSocketID,
sdp,
});
} catch (error) {
console.log(error);
}
});
socket.on("receiverCandidate", async data => {
try {
const senderPC = senderPCs[data.senderSocketID].filter(
sPC => sPC.id === data.receiverSocketID
);
await senderPC[0].pc.addIceCandidate(
new wrtc.RTCIceCandidate(data.candidate)
);
} catch (error) {
console.log(error);
}
});
socket.on("disconnect", () => {
try {
let roomID = socketToRoom[socket.id];
deleteUser(socket.id, roomID);
closeRecevierPC(socket.id);
closeSenderPCs(socket.id);
socket.broadcast.to(roomID).emit("userExit", { id: socket.id });
} catch (error) {
console.log(error);
}
});
const isIncluded = (array, id) => {
let len = array.length;
for (let i = 0; i < len; i++) {
if (array[i].id === id) return true;
}
return false;
};
const createReceiverPeerConnection = (socketID, socket, roomID) => {
let pc = new wrtc.RTCPeerConnection(pc_config);
if (receiverPCs[socketID]) receiverPCs[socketID] = pc;
else receiverPCs = { ...receiverPCs, [socketID]: pc };
pc.onicecandidate = e => {
//console.log(`socketID: ${socketID}'s receiverPeerConnection icecandidate`);
socket.to(socketID).emit("getSenderCandidate", {
candidate: e.candidate,
});
};
pc.oniceconnectionstatechange = e => {
//console.log(e);
};
pc.ontrack = e => {
if (users[roomID]) {
if (!isIncluded(users[roomID], socketID)) {
users[roomID].push({
id: socketID,
stream: e.streams[0],
});
} else return;
} else {
users[roomID] = [
{
id: socketID,
stream: e.streams[0],
},
];
}
socket.broadcast.to(roomID).emit("userEnter", { id: socketID });
};
return pc;
};
const createSenderPeerConnection = (
receiverSocketID,
senderSocketID,
socket,
roomID
) => {
let pc = new wrtc.RTCPeerConnection(pc_config);
if (senderPCs[senderSocketID]) {
senderPCs[senderSocketID].filter(user => user.id !== receiverSocketID);
senderPCs[senderSocketID].push({ id: receiverSocketID, pc: pc });
} else
senderPCs = {
...senderPCs,
[senderSocketID]: [{ id: receiverSocketID, pc: pc }],
};
pc.onicecandidate = e => {
//console.log(`socketID: ${receiverSocketID}'s senderPeerConnection icecandidate`);
socket.to(receiverSocketID).emit("getReceiverCandidate", {
id: senderSocketID,
candidate: e.candidate,
});
};
pc.oniceconnectionstatechange = e => {
//console.log(e);
};
const sendUser = users[roomID].filter(user => user.id === senderSocketID);
sendUser[0].stream.getTracks().forEach(track => {
pc.addTrack(track, sendUser[0].stream);
});
return pc;
};
const getOtherUsersInRoom = (socketID, roomID) => {
let allUsers = [];
if (!users[roomID]) return allUsers;
let len = users[roomID].length;
for (let i = 0; i < len; i++) {
if (users[roomID][i].id === socketID) continue;
allUsers.push({ id: users[roomID][i].id });
}
return allUsers;
};
const deleteUser = (socketID, roomID) => {
let roomUsers = users[roomID];
if (!roomUsers) return;
roomUsers = roomUsers.filter(user => user.id !== socketID);
users[roomID] = roomUsers;
if (roomUsers.length === 0) {
delete users[roomID];
}
delete socketToRoom[socketID];
};
const closeRecevierPC = socketID => {
if (!receiverPCs[socketID]) return;
receiverPCs[socketID].close();
delete receiverPCs[socketID];
};
const closeSenderPCs = socketID => {
if (!senderPCs[socketID]) return;
let len = senderPCs[socketID].length;
for (let i = 0; i < len; i++) {
senderPCs[socketID][i].pc.close();
let _senderPCs = senderPCs[senderPCs[socketID][i].id];
let senderPC = _senderPCs.filter(sPC => sPC.id === socketID);
if (senderPC[0]) {
senderPC[0].pc.close();
senderPCs[senderPCs[socketID][i].id] = _senderPCs.filter(
sPC => sPC.id !== socketID
);
}
}
delete senderPCs[socketID];
};
Note
You must use socket.io-client version=2.3.0, @types/socket.io-client version=1.4.34.
const [socket, setSocket] = useState<SocketIOClient.Socket>();
const [users, setUsers] = useState<Array<IWebRTCUser>>([]);
let localVideoRef = useRef<HTMLVideoElement>(null);
let sendPC: RTCPeerConnection;
let receivePCs: any;
const pc_config = {
iceServers: [
// {
// urls: 'stun:[STUN_IP]:[PORT]',
// 'credentials': '[YOR CREDENTIALS]',
// 'username': '[USERNAME]'
// },
{
urls: "stun:stun.l.google.com:19302",
},
],
};
newSocket.on("userEnter", (data: { id: string }) => {
createReceivePC(data.id, newSocket);
});
newSocket.on("allUsers", (data: { users: Array<{ id: string }> }) => {
let len = data.users.length;
for (let i = 0; i < len; i++) {
createReceivePC(data.users[i].id, newSocket);
}
});
newSocket.on("userExit", (data: { id: string }) => {
receivePCs[data.id].close();
delete receivePCs[data.id];
setUsers(users => users.filter(user => user.id !== data.id));
});
newSocket.on(
"getSenderAnswer",
async (data: { sdp: RTCSessionDescription }) => {
try {
await sendPC.setRemoteDescription(
new RTCSessionDescription(data.sdp)
);
} catch (error) {
console.log(error);
}
}
);
newSocket.on(
"getSenderCandidate",
async (data: { candidate: RTCIceCandidateInit }) => {
try {
if (!data.candidate) return;
sendPC.addIceCandidate(new RTCIceCandidate(data.candidate));
} catch (error) {
console.log(error);
}
}
);
newSocket.on(
"getReceiverAnswer",
async (data: { id: string; sdp: RTCSessionDescription }) => {
try {
let pc: RTCPeerConnection = receivePCs[data.id];
await pc.setRemoteDescription(data.sdp);
} catch (error) {
console.log(error);
}
}
);
newSocket.on(
"getReceiverCandidate",
async (data: { id: string; candidate: RTCIceCandidateInit }) => {
try {
let pc: RTCPeerConnection = receivePCs[data.id];
if (!data.candidate) return;
pc.addIceCandidate(new RTCIceCandidate(data.candidate));
} catch (error) {
console.log(error);
}
}
);
navigator.mediaDevices
.getUserMedia({
audio: true,
video: {
width: 240,
height: 240,
},
})
.then(stream => {
if (localVideoRef.current) localVideoRef.current.srcObject = stream;
localStream = stream;
sendPC = createSenderPeerConnection(newSocket, localStream);
createSenderOffer(newSocket);
newSocket.emit("joinRoom", {
id: newSocket.id,
roomID: "1234",
});
})
.catch(error => {
console.log(`getUserMedia error: ${error}`);
});
const createReceivePC = (id: string, newSocket: SocketIOClient.Socket) => {
try {
let pc = createReceiverPeerConnection(id, newSocket);
createReceiverOffer(pc, newSocket, id);
} catch (error) {
console.log(error);
}
};
Note
Since RTCPeerConnection is for sending your MediaStream, leave both offerToReceiveAudio and offerToReceiveVideo as false.
const createSenderOffer = async (newSocket: SocketIOClient.Socket) => {
try {
let sdp = await sendPC.createOffer({
offerToReceiveAudio: false,
offerToReceiveVideo: false,
});
await sendPC.setLocalDescription(new RTCSessionDescription(sdp));
newSocket.emit("senderOffer", {
sdp,
senderSocketID: newSocket.id,
roomID: "1234",
});
} catch (error) {
console.log(error);
}
};
Note
Since RTCPeerConnection is intended to receive MediaStream from other users, both offerToReceiveAudio and offerToReceiveVideo should be true.
const createReceiverOffer = async (
pc: RTCPeerConnection,
newSocket: SocketIOClient.Socket,
senderSocketID: string
) => {
try {
let sdp = await pc.createOffer({
offerToReceiveAudio: true,
offerToReceiveVideo: true,
});
await pc.setLocalDescription(new RTCSessionDescription(sdp));
newSocket.emit("receiverOffer", {
sdp,
receiverSocketID: newSocket.id,
senderSocketID,
roomID: "1234",
});
} catch (error) {
console.log(error);
}
};
const createSenderPeerConnection = (
newSocket: SocketIOClient.Socket,
localStream: MediaStream
): RTCPeerConnection => {
let pc = new RTCPeerConnection(pc_config);
pc.onicecandidate = e => {
if (e.candidate) {
newSocket.emit("senderCandidate", {
candidate: e.candidate,
senderSocketID: newSocket.id,
});
}
};
pc.oniceconnectionstatechange = e => {
console.log(e);
};
if (localStream) {
console.log("localstream add");
localStream.getTracks().forEach(track => {
pc.addTrack(track, localStream);
});
} else {
console.log("no local stream");
}
// return pc
return pc;
};
const createReceiverPeerConnection = (
socketID: string,
newSocket: SocketIOClient.Socket
): RTCPeerConnection => {
let pc = new RTCPeerConnection(pc_config);
// add pc to peerConnections object
receivePCs = { ...receivePCs, [socketID]: pc };
pc.onicecandidate = e => {
if (e.candidate) {
newSocket.emit("receiverCandidate", {
candidate: e.candidate,
receiverSocketID: newSocket.id,
senderSocketID: socketID,
});
}
};
pc.oniceconnectionstatechange = e => {
console.log(e);
};
pc.ontrack = e => {
setUsers(oldUsers => oldUsers.filter(user => user.id !== socketID));
setUsers(oldUsers => [
...oldUsers,
{
id: socketID,
stream: e.streams[0],
},
]);
};
// return pc
return pc;
};
interface IWebRTCUser {
id: string;
email: string;
stream: MediaStream;
}
interface Props {
email: string;
stream: MediaStream;
muted?: boolean;
}
const Video = ({ email, stream, muted }: Props) => {
const ref = useRef<HTMLVideoElement>(null);
const [isMuted, setIsMuted] = useState<boolean>(false);
useEffect(() => {
if (ref.current) ref.current.srcObject = stream;
if (muted) setIsMuted(muted);
});
return (
<Container>
<VideoContainer ref={ref} muted={isMuted} autoPlay></VideoContainer>
<UserLabel>{email}</UserLabel>
</Container>
);
};
return (
<div>
<video
style={{
width: 240,
height: 240,
margin: 5,
backgroundColor: "black",
}}
muted
ref={localVideoRef}
autoPlay
></video>
{users.map((user, index) => {
return (
<Video key={index} email={user.email} stream={user.stream} />
);
})}
</div>
);