Load environment variables
This quickstart is for meetings, but can easily be repurposed for webinars. Just replace the meeting scopes and events with their webinar equivalents. For example, replace
meeting.rtms_started with webinar.rtms_started.
In this guide, you'll build a complete Realtime Media Streams (RTMS) application that programmatically controls RTMS sessions through REST API calls and receives live transcripts via WebSocket connections. Using REST API calls is useful when auto start for RTMS has not been enabled for an app by the user or account.
Skip to the bottom if you want the full code. You can also find it on GitHub, Node.js and Python.
The server will:
- Listen for incoming meeting.started webhook events
- Generate an access token
- Manually start RTMS using an API call
- Listen for incoming meeting.rtms_started webhook events
- Generate a signature for handshake requests
- Connect to the WebSocket endpoint for the meeting
- Receive meeting transcripts in real time
- Manually stop RTMS using an API call
Prerequisites
- Create an app in the Zoom Marketplace
- Add Realtime Media Streams features to the app.
Get started
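First, create a new Node.js project and install express, dotenv, ws, and axios as dependencies. The code below uses ES module import syntax, so also set "type": "module" in package.json (for example with npm pkg set type=module).
npm init -y
npm install express dotenv ws axios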
Next, we'll create a basic server on localhost:3000. Create a new file named index.js and add the following code to it.
import express from "express";
import WebSocket from "ws";
import crypto from "crypto";
import dotenv from "dotenv";
import axios from "axios";
// Load environment variables from .env before anything reads process.env
dotenv.config();

const app = express();

// Enable JSON body parsing so webhook payloads are available on req.body
app.use(express.json());

// Basic root route for testing that the server is reachable
app.get("/", (req, res) => {
  res.send("Zoom RTMS Server is up and running.");
});

// Honor PORT from .env (the sample sets PORT=3000); fall back to 3000
const PORT = Number(process.env.PORT) || 3000;
app.listen(PORT, () => {
  console.log(`Server is listening on http://localhost:${PORT}`);
});
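First, create a new Python project and install Flask, python-dotenv, websockets, websocket-client, and requests as dependencies. (requests is used for the REST API calls; websocket-client provides the websocket module imported in the step-by-step snippets, while the full code at the end uses websockets.)
Create a requirements.txt file and add the following.
Flask
python-dotenv
websockets
websocket-client
requests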
Install the dependencies
pip install -r requirements.txt
Next, we'll create a basic server on localhost:3000. Create a new file named main.py and add the following code to it.
from flask import Flask, request, jsonify
import websocket
import json
import hashlib
import hmac
import os
import requests
import threading
import time
from dotenv import load_dotenv
# Load environment variables from .env
load_dotenv()

app = Flask(__name__)


@app.route('/')
def home():
    """Basic root route for checking that the server is up."""
    return 'Zoom RTMS Server is up and running.'


if __name__ == '__main__':
    # Honor PORT from .env (the sample sets PORT=3000); fall back to 3000.
    app.run(host='localhost', port=int(os.getenv('PORT', '3000')), debug=True)
Setup environment variables
Create a .env file in your project root and add the following.
.env sample
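ZOOM_CLIENT_ID=your_client_id
ZOOM_CLIENT_SECRET=your_client_secret
access_token=your_oauth_access_token
PORT=3000
Get your ZOOM_CLIENT_ID and ZOOM_CLIENT_SECRET from a Zoom app with Realtime Media Streams features (webhook subscriptions and scopes are required). The access_token entry is the OAuth access token that the helper in "Handle access token" reads to call the REST API.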
Build the webhook receiver
When a meeting starts, if your app has subscribed to the meeting.started webhook event, your app will receive an event when a meeting is started.
meeting.started sample
{
"event": "meeting.started",
"event_ts": 1732313171881,
"payload": {
"object": {
"id": 123456789,
"uuid": "xxxxxxxxxx",
"host_id": "xxxxxxxxxx",
"topic": "Meeting Topic"
}
}
}
When we receive the meeting.started event, we extract the meeting details from the webhook event to start RTMS.
To handle these webhook events, we'll build a webhook receiver in index.js.
// Webhook receiver: Zoom POSTs event notifications to this route
app.post("/webhook", async (req, res) => {
  // Default to empty objects so a malformed body cannot throw before we respond
  const { event, payload = {} } = req.body ?? {};
  console.log("Webhook received:", event);
  console.log("Payload:", JSON.stringify(payload, null, 2));

  // Handle meeting started event
  if (event === "meeting.started") {
    console.log("Meeting started, initiating RTMS...");
    try {
      // Read the meeting ID inside the try so a missing payload.object is
      // logged below instead of rejecting the handler without a response
      const meetingId = payload.object.id;

      // Get access token
      const accessToken = generateAccessToken();

      // Make API call to start RTMS
      await startRTMS(meetingId, accessToken);
      console.log(`RTMS started for meeting ${meetingId}`);

      // Schedule automatic RTMS stop after 10 seconds
      scheduleRTMSStop(meetingId, accessToken);
    } catch (error) {
      console.error("Error starting RTMS:", error);
    }
  }

  // Always acknowledge the webhook, even when starting RTMS failed
  res.sendStatus(200);
});
To handle these webhook events, we'll build a webhook receiver in main.py.
@app.route('/webhook', methods=['POST'])
def webhook():
    """Receive Zoom webhook events and start RTMS when a meeting starts.

    Always answers 200 so Zoom sees the event as delivered; failures while
    starting RTMS are logged rather than propagated.
    """
    # Fall back to empty dicts so a missing body/payload cannot raise
    # AttributeError on the .get() calls below.
    data = request.get_json() or {}
    event = data.get('event')
    payload = data.get('payload') or {}
    print(f'Webhook received: {event}')
    print(f'Payload: {json.dumps(payload, indent=2)}')

    # Handle meeting started event
    if event == 'meeting.started':
        print('Meeting started, initiating RTMS...')
        meeting_object = payload.get('object') or {}
        meeting_id = meeting_object.get('id')
        try:
            # Get access token
            access_token = generate_access_token()
            # Make API call to start RTMS
            start_rtms(meeting_id, access_token)
            print(f'RTMS started for meeting {meeting_id}')
            # Schedule automatic RTMS stop after 10 seconds
            schedule_rtms_stop(meeting_id, access_token)
        except Exception as error:
            print(f'Error starting RTMS: {error}')

    return '', 200
Handle access token
In order to call the API to start RTMS for a meeting, we need to first generate an access token. Make sure that you have selected the following granular scopes meeting:update:participant_rtms_app_status and meeting:update:participant_rtms_app_status:admin to call the API.
Let's create a helper function to retrieve the access token from environment variables.
Add the following code to index.js.
// Return the OAuth access token stored in the access_token environment
// variable; throws when it is missing or empty.
function generateAccessToken() {
  const { access_token: token } = process.env;
  if (token) {
    console.log("Using access token from environment variables");
    return token;
  }
  throw new Error("access_token not found in environment variables");
}
Add the following code to main.py.
def generate_access_token():
    """Return the OAuth access token from the ``access_token`` env variable.

    Raises:
        Exception: if the variable is unset or empty.
    """
    access_token = os.getenv('access_token')
    if not access_token:
        raise Exception('access_token not found in environment variables')
    print('Using access token from environment variables')
    return access_token
For production applications, you should implement proper OAuth flow with refresh tokens. For more information, see OAuth 2.0.
Manually start RTMS
Next, the app needs to manually start RTMS to receive data from the meeting.
Add the following code to index.js.
// Ask Zoom to start RTMS for a live meeting (PATCH .../rtms_app/status).
// Resolves with the response body; logs and rethrows on failure.
async function startRTMS(meetingId, accessToken) {
  const endpoint = `https://api.zoom.us/v2/live_meetings/${meetingId}/rtms_app/status`;
  const body = {
    action: "start",
    settings: { client_id: process.env.ZOOM_CLIENT_ID },
  };
  const requestConfig = {
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${accessToken}`,
    },
  };

  try {
    const { data } = await axios.patch(endpoint, body, requestConfig);
    console.log("RTMS start response:", data);
    return data;
  } catch (error) {
    // Prefer the API's error body when the server answered at all
    const detail = error.response?.data || error.message;
    console.error("Error starting RTMS via API:", detail);
    throw error;
  }
}
Add the following code to main.py.
def start_rtms(meeting_id, access_token):
    """Ask Zoom to start RTMS for a live meeting.

    Sends PATCH /live_meetings/{meeting_id}/rtms_app/status with
    ``action: start``.

    Returns:
        The parsed JSON response body, or ``{}`` when the body is empty.

    Raises:
        Exception: re-raised after logging, for transport errors, non-2xx
        responses, or an unparsable body.
    """
    url = f"https://api.zoom.us/v2/live_meetings/{meeting_id}/rtms_app/status"
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {access_token}'
    }
    data = {
        'action': 'start',
        'settings': {
            'client_id': os.getenv('ZOOM_CLIENT_ID')
        }
    }
    try:
        # A timeout keeps the webhook handler from hanging on a stalled call.
        response = requests.patch(url, headers=headers, json=data, timeout=10)
        # Treat 4xx/5xx as failures instead of reporting them as a start.
        response.raise_for_status()
        # Parse once; a success response may carry no body at all.
        result = response.json() if response.content else {}
    except Exception as error:
        print(f'Error starting RTMS via API: {error}')
        raise
    print(f'RTMS start response: {result}')
    return result
Receive RTMS event
When RTMS starts successfully, Zoom will send a meeting.rtms_started event.
meeting.rtms_started sample
{
"event": "meeting.rtms_started",
"event_ts": 1626230691572,
"payload": {
"meeting_uuid": "xxxxxxxxxx",
"meeting_id": "xxxxxxxxxx",
"account_id": "xxxxxxxxxx", // user account ID
"operator_id": "xxxxxxxxxx",
"is_original_host": true,
"rtms_stream_id": "xxxxxxxxxx",
"server_urls": "wss://127.0.0.1:443"
}
}
Your app will need to handle this event.
Add the following code to index.js.
// Handle RTMS start event (goes inside the /webhook handler)
if (event === "meeting.rtms_started") {
  const meetingUuid = payload.meeting_uuid;
  const streamId = payload.rtms_stream_id;
  const signalingUrl = payload.server_urls;
  console.log(`Starting RTMS for meeting ${meetingUuid}`);

  // Open the signaling WebSocket, the first step of the RTMS connection
  connectToSignalingWebSocket(meetingUuid, streamId, signalingUrl);
}
Add the following code to main.py.
# Handle RTMS start event (place inside webhook(), indented to match)
if event == 'meeting.rtms_started':
    meeting_uuid = payload.get('meeting_uuid')
    rtms_stream_id = payload.get('rtms_stream_id')
    server_urls = payload.get('server_urls')
    print(f'Starting RTMS for meeting {meeting_uuid}')
    # Connect to signaling WebSocket to establish RTMS connection
    connect_to_signaling_websocket(meeting_uuid, rtms_stream_id, server_urls)
Create the signature generator
Next, we'll create a function to generate the signature for the signaling WebSocket connection using HMAC SHA256. This will be used to authenticate the handshake request to the signaling server.
Add the following code to index.js.
// Uses the `crypto` module already imported at the top of index.js;
// importing it a second time would redeclare the binding.

// Build the handshake signature: HMAC-SHA256, keyed with the client secret,
// over "client_id,meeting_uuid,rtms_stream_id", hex-encoded.
function generateSignature(meetingUuid, rtmsStreamId) {
  const message = `${process.env.ZOOM_CLIENT_ID},${meetingUuid},${rtmsStreamId}`;
  // The signature authenticates the handshake, so it is deliberately not logged
  return crypto
    .createHmac("sha256", process.env.ZOOM_CLIENT_SECRET)
    .update(message)
    .digest("hex");
}
Add the following code to main.py.
def generate_signature(meeting_uuid, rtms_stream_id):
    """Return the hex HMAC-SHA256 handshake signature.

    The signed message is ``"<client_id>,<meeting_uuid>,<rtms_stream_id>"``,
    keyed with ZOOM_CLIENT_SECRET.

    Raises:
        Exception: if ZOOM_CLIENT_ID or ZOOM_CLIENT_SECRET is not set.
    """
    client_id = os.getenv('ZOOM_CLIENT_ID')
    client_secret = os.getenv('ZOOM_CLIENT_SECRET')
    # Fail with a clear message instead of AttributeError on None.encode().
    if not client_id or not client_secret:
        raise Exception('ZOOM_CLIENT_ID and ZOOM_CLIENT_SECRET must be set in environment variables')
    message = f"{client_id},{meeting_uuid},{rtms_stream_id}"
    # The signature authenticates the handshake, so it is not logged.
    return hmac.new(
        client_secret.encode(),
        message.encode(),
        hashlib.sha256
    ).hexdigest()
This helper returns the computed signature string.
Connect to the signaling server with WebSockets
Next, we'll use the signature inside a connectToSignalingWebSocket() function to establish the signaling connection. The signaling handshake request passes in the meeting_uuid and rtms_stream_id from the meeting.rtms_started event and includes fields like msg_type, protocol_version and sequence as required.
Add the following code to index.js.
// Open the signaling WebSocket and send the signed handshake once connected.
function connectToSignalingWebSocket(meetingUuid, rtmsStreamId, serverUrls) {
  const signalingWs = new WebSocket(serverUrls);

  // Sent as soon as the socket opens
  const sendHandshake = () => {
    console.log(`Signaling WebSocket opened for meeting ${meetingUuid}`);
    const handshakeMsg = {
      msg_type: 1, // SIGNALING_HAND_SHAKE_REQ
      meeting_uuid: meetingUuid,
      rtms_stream_id: rtmsStreamId,
      signature: generateSignature(meetingUuid, rtmsStreamId),
    };
    console.log("Sending handshake message:", handshakeMsg);
    signalingWs.send(JSON.stringify(handshakeMsg));
  };

  signalingWs.on("open", sendHandshake);
  signalingWs.on("error", (error) =>
    console.error("Signaling WebSocket error:", error),
  );
  signalingWs.on("close", (code, reason) =>
    console.log("Signaling WebSocket closed:", code, reason),
  );
}
Add the following code to main.py.
def connect_to_signaling_websocket(meeting_uuid, rtms_stream_id, server_urls):
    """Open the signaling WebSocket and send the signed handshake.

    Uses websocket-client's callback-based WebSocketApp, run on a background
    thread so the caller (the webhook handler) is not blocked.
    """
    def on_open(ws):
        print(f'Signaling WebSocket opened for meeting {meeting_uuid}')
        signature = generate_signature(meeting_uuid, rtms_stream_id)
        handshake_msg = {
            'msg_type': 1,  # SIGNALING_HAND_SHAKE_REQ
            'meeting_uuid': meeting_uuid,
            'rtms_stream_id': rtms_stream_id,
            'signature': signature
        }
        print(f'Sending handshake message: {handshake_msg}')
        ws.send(json.dumps(handshake_msg))

    # Mirror the Node.js version, which also logs errors and closure.
    def on_error(ws, error):
        print(f'Signaling WebSocket error: {error}')

    def on_close(ws, close_status_code, close_msg):
        print(f'Signaling WebSocket closed: {close_status_code} {close_msg}')

    signaling_ws = websocket.WebSocketApp(
        server_urls,
        on_open=on_open,
        on_error=on_error,
        on_close=on_close,
    )
    # Daemon thread: an open socket must not keep the process alive on exit.
    threading.Thread(target=signaling_ws.run_forever, daemon=True).start()
This function sends the signature and required handshake fields to the signaling server to authorize the connection.
Handling keep-alive requests
When the signaling WebSocket connection is active, the RTMS server periodically sends keep-alive messages to check if the client is still connected. The client needs to respond promptly with a keep-alive response message, including the timestamp received in the request, to maintain the WebSocket connection.
Add the following code to index.js.
// KEEP_ALIVE_REQ (12): answer with msg_type 13, echoing the server's timestamp
if (msg.msg_type === 12) {
  console.log("Received KEEP_ALIVE_REQ, responding with KEEP_ALIVE_RESP");
  const keepAliveResponse = {
    msg_type: 13, // KEEP_ALIVE_RESP
    timestamp: msg.timestamp,
  };
  signalingWs.send(JSON.stringify(keepAliveResponse));
}
Add the following code to main.py.
if msg.get('msg_type') == 12: # KEEP_ALIVE_REQ
print('Received KEEP_ALIVE_REQ, responding with KEEP_ALIVE_ACK')
await signaling_ws.send(json.dumps({
'msg_type': 13, # KEEP_ALIVE_ACK
'timestamp': msg.get('timestamp')
}))
Connect to the media server with a WebSocket
When the signaling handshake is successful, the RTMS signaling server sends a handshake response with media server URLs in media_server.server_urls:
Signaling handshake response sample
{
"msg_type": 2,
"protocol_version": 1,
"sequence": 0,
"status_code": 0,
"reason": "",
"media_server": {
"server_urls": {
"audio": "wss://...",
"video": "wss://...",
"transcript": "wss://...",
"all": "wss://..."
}
}
}
Next, our app will need to open one of the media URLs to open a WebSocket connection to receive media data. In this example, we will request the transcript stream, which uses media_type: 8.
When the Media WebSocket connection opens, we build and send a handshake request to the media server with our meeting details and signature.
Add the following code to index.js.
// Open the media WebSocket and request the transcript stream.
// signalingSocket is accepted here for the CLIENT_READY_ACK step added later.
function connectToMediaWebSocket(
  mediaUrl,
  meetingUuid,
  rtmsStreamId,
  signalingSocket,
) {
  // The URL comes from the signaling handshake response
  const mediaWs = new WebSocket(mediaUrl);

  // Send the transcript-only media handshake once connected
  const sendMediaHandshake = () => {
    const handshakeMsg = {
      msg_type: 3, // DATA_HAND_SHAKE_REQ
      protocol_version: 1,
      sequence: 0,
      meeting_uuid: meetingUuid,
      rtms_stream_id: rtmsStreamId,
      signature: generateSignature(meetingUuid, rtmsStreamId),
      media_type: 8, // Request only transcripts (TRANSCRIPT enum)
    };
    console.log("Sending transcript handshake:", handshakeMsg);
    mediaWs.send(JSON.stringify(handshakeMsg));
  };

  mediaWs.on("open", sendMediaHandshake);

  // Log every incoming transcript data packet
  mediaWs.on("message", (data) =>
    console.log("Received transcript data:", data),
  );
}
Add the following code to main.py.
async def connect_to_media_websocket(media_url, meeting_uuid, stream_id, signaling_socket):
    """Open the media WebSocket and request the transcript stream.

    ``signaling_socket`` is accepted here for the CLIENT_READY_ACK step that
    is added to this function later in the guide.
    """
    async with websockets.connect(media_url) as media_ws:
        # Build the media handshake for transcripts only
        handshake_msg = {
            'msg_type': 3,  # DATA_HAND_SHAKE_REQ
            'protocol_version': 1,
            'sequence': 0,
            'meeting_uuid': meeting_uuid,
            'rtms_stream_id': stream_id,
            'signature': generate_signature(meeting_uuid, stream_id),
            'media_type': 8  # Request only transcripts (TRANSCRIPT enum)
        }
        print('Sending transcript handshake:', handshake_msg)
        await media_ws.send(json.dumps(handshake_msg))

        # Listen for incoming transcript data packets
        async for message in media_ws:
            print('Received transcript data:', message)
After a successful handshake to the media server, the RTMS server responds with a media handshake response:
Media handshake response sample
{
"msg_type": 4,
"protocol_version": 1,
"status_code": 0,
"reason": "",
"sequence": 0,
"payload_encrypted": true,
"media_params": {
"transcript": {
"content_type": 5
}
}
}
Send the client ready acknowledgement (ACK)
To verify our app is ready to receive media, we send a client ready ACK message back to the signaling WebSocket. This tells the RTMS server our client is ready to receive a stream on the media server:
Add the following code to index.js.
// Media handshake response (msg_type 4) with status 0 means success:
// tell the signaling server we are ready by sending CLIENT_READY_ACK
if (msg.msg_type === 4 && msg.status_code === 0) {
  console.log(
    "Media handshake successful, sending CLIENT_READY_ACK via signaling socket",
  );
  const readyAck = {
    msg_type: 7, // CLIENT_READY_ACK
    rtms_stream_id: rtmsStreamId,
  };
  signalingSocket.send(JSON.stringify(readyAck));
}
Add the following code to main.py.
# If handshake response is OK, send CLIENT_READY_ACK on signaling socket
if msg.get('msg_type') == 4 and msg.get('status_code') == 0:
print('Media handshake successful, sending CLIENT_READY_ACK via signaling socket')
await signaling_socket.send(json.dumps({
'msg_type': 7, # CLIENT_READY_ACK
'rtms_stream_id': rtms_stream_id
}))
Receive media data
Once the CLIENT_READY_ACK is sent, the RTMS server will begin streaming the actual media data, in our case transcripts, through the media WebSocket.
Incoming media packets have different msg_type values depending on the type of media you requested in your DATA_HAND_SHAKE_REQ.
For transcripts, each chunk arrives as a message with msg_type 17.
When the media WebSocket is active and the stream has started, you need to handle incoming packets.
Add the following code to index.js.
// msg_type 17 is MEDIA_DATA_TRANSCRIPT: log the transcript content
if (msg.msg_type === 17) {
  const { content } = msg;
  console.log("Received transcript:", content);
}
Add the following code to main.py.
# When receiving a MEDIA_DATA_TRANSCRIPT message (msg_type 17)
if msg.get('msg_type') == 17:
    print('Received transcript:', msg.get('content'))
Send a keep-alive message to the media WebSocket
Similar to the signaling connection, we also need to keep the media connection alive. We will use the same logic.
Add the following code to index.js.
// KEEP_ALIVE_REQ (12) on the media socket: reply with msg_type 13 and the
// same timestamp, as on the signaling socket
if (msg.msg_type === 12) {
  console.log("Received KEEP_ALIVE_REQ, responding with KEEP_ALIVE_ACK");
  const keepAliveReply = {
    msg_type: 13, // KEEP_ALIVE_ACK
    timestamp: msg.timestamp,
  };
  mediaWs.send(JSON.stringify(keepAliveReply));
}
Add the following code to main.py.
if msg.get('msg_type') == 12: # KEEP_ALIVE_REQ
print('Received KEEP_ALIVE_REQ, responding with KEEP_ALIVE_ACK')
await media_ws.send(json.dumps({
'msg_type': 13, # KEEP_ALIVE_ACK
'timestamp': msg.get('timestamp')
}))
Stop RTMS
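RTMS will automatically stop if the meeting ends, or the host disables RTMS. To stop it manually before then, call the same rtms_app/status endpoint used to start RTMS, with the action set to stop.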
Add the following code to index.js.
// Ask Zoom to stop RTMS for a live meeting (PATCH .../rtms_app/status).
// Resolves with the response body; logs and rethrows on failure.
async function stopRTMS(meetingId, accessToken) {
  const endpoint = `https://api.zoom.us/v2/live_meetings/${meetingId}/rtms_app/status`;
  const body = {
    action: "stop",
    settings: { client_id: process.env.ZOOM_CLIENT_ID },
  };
  const requestConfig = {
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${accessToken}`,
    },
  };

  try {
    const { data } = await axios.patch(endpoint, body, requestConfig);
    console.log("RTMS stop response:", data);
    return data;
  } catch (error) {
    // Prefer the API's error body when the server answered at all
    const detail = error.response?.data || error.message;
    console.error("Error stopping RTMS via API:", detail);
    throw error;
  }
}
Add the following code to main.py.
# Stop RTMS using Zoom API
def stop_rtms(meeting_id, access_token):
    """Ask Zoom to stop RTMS for a live meeting.

    Sends PATCH /live_meetings/{meeting_id}/rtms_app/status with
    ``action: stop``.

    Returns:
        The parsed JSON response body, or ``{}`` when the body is empty.

    Raises:
        requests.exceptions.RequestException: re-raised after logging, for
        transport errors and non-2xx responses.
    """
    try:
        response = requests.patch(
            f"https://api.zoom.us/v2/live_meetings/{meeting_id}/rtms_app/status",
            json={
                'action': 'stop',
                'settings': {
                    'client_id': os.getenv('ZOOM_CLIENT_ID')
                }
            },
            headers={
                'Content-Type': 'application/json',
                'Authorization': f'Bearer {access_token}'
            },
            # Do not let a stalled call block the stop thread forever.
            timeout=10,
        )
        # Treat 4xx/5xx as failures instead of reporting them as a stop.
        response.raise_for_status()
        # Parse once; a success response may carry no body at all.
        result = response.json() if response.content else {}
    except requests.exceptions.RequestException as error:
        print('Error stopping RTMS via API:', error)
        raise
    print('RTMS stop response:', result)
    return result
Stop RTMS automatically
We can also build a function to stop RTMS after a certain amount of time.
Add the following code to index.js.
// Schedule an RTMS stop after a delay (10 seconds unless delayMs is given).
// Failures while stopping are logged; nothing is thrown to the caller.
function scheduleRTMSStop(meetingId, accessToken, delayMs = 10000) {
  console.log(
    `Scheduling RTMS stop for meeting ${meetingId} in ${delayMs / 1000} seconds...`,
  );
  setTimeout(async () => {
    try {
      console.log(`Stopping RTMS for meeting ${meetingId}...`);
      await stopRTMS(meetingId, accessToken);
      console.log(`RTMS stopped successfully for meeting ${meetingId}`);
    } catch (error) {
      console.error(
        `Failed to stop RTMS for meeting ${meetingId}:`,
        error,
      );
    }
  }, delayMs);
}
Add the following code to main.py.
# Schedule RTMS stop after a delay (10 seconds by default)
def schedule_rtms_stop(meeting_id, access_token, delay_seconds=10):
    """Stop RTMS for ``meeting_id`` after ``delay_seconds`` on a background timer.

    Failures while stopping are logged; nothing is raised to the caller.
    """
    print(f'Scheduling RTMS stop for meeting {meeting_id} in {delay_seconds} seconds...')

    def stop_now():
        try:
            print(f'Stopping RTMS for meeting {meeting_id}...')
            stop_rtms(meeting_id, access_token)
            print(f'RTMS stopped successfully for meeting {meeting_id}')
        except Exception as error:
            print(f'Failed to stop RTMS for meeting {meeting_id}: {error}')

    # threading.Timer waits delay_seconds on its own thread, then runs stop_now.
    timer = threading.Timer(delay_seconds, stop_now)
    timer.daemon = True  # a pending stop must not keep the process alive
    timer.start()
Handle RTMS stopped events
When RTMS stops, Zoom will send you the meeting.rtms_stopped payload.
meeting.rtms_stopped sample
{
"event": "meeting.rtms_stopped",
"event_ts": 1732313171881,
"payload": {
"meeting_uuid": "xxxxxxxxxx",
"rtms_stream_id": "xxxxxxxxxxc",
"stop_reason": 6
}
}
Your app needs to handle these events.
Add the following code to index.js.
// meeting.rtms_stopped arrives after RTMS has already stopped, so log it as
// a completed stop (same wording as the Python and full-code versions)
if (event === "meeting.rtms_stopped") {
  const { meeting_uuid } = payload;
  console.log(`Meeting ${meeting_uuid} stopped`);
  // Your logic after RTMS stops goes here
}
Add the following code to main.py.
# Handle RTMS stopped event (place inside webhook(), indented to match)
if event == 'meeting.rtms_stopped':
    meeting_uuid = payload.get('meeting_uuid')
    print(f'Meeting {meeting_uuid} stopped')
    # Your logic after RTMS stops goes here
Full code
Here's the final code for our server.
import express from "express";
import WebSocket from "ws";
import crypto from "crypto";
import dotenv from "dotenv";
import axios from "axios";
// Load environment variables from .env
dotenv.config();
const app = express();
app.use(express.json()); // Parse incoming JSON payloads
// Step 1: Webhook Receiver - Listen for meeting events
app.post("/webhook", async (req, res) => {
  // Default to empty objects so a malformed body cannot throw before we respond
  const { event, payload = {} } = req.body ?? {};
  console.log("Webhook received:", event);
  console.log("Payload:", JSON.stringify(payload, null, 2));

  // Step 2a: Listen to meeting started event
  if (event === "meeting.started") {
    console.log("Meeting started, initiating RTMS...");
    try {
      // Read the meeting ID inside the try so a missing payload.object is
      // logged below instead of rejecting the handler without a response
      const meetingId = payload.object.id;

      // Step 2b: Get access token
      const accessToken = generateAccessToken();

      // Step 2c: Make API call to start RTMS
      await startRTMS(meetingId, accessToken);
      console.log(`RTMS started for meeting ${meetingId}`);

      // Schedule automatic RTMS stop after 10 seconds
      scheduleRTMSStop(meetingId, accessToken);
    } catch (error) {
      console.error("Error starting RTMS:", error);
    }
  }

  // Step 3: RTMS started event
  if (event === "meeting.rtms_started") {
    console.log("Starting RTMS connection...");
    const { meeting_uuid, rtms_stream_id, server_urls } = payload;
    connectToSignalingWebSocket(meeting_uuid, rtms_stream_id, server_urls);
  }

  // When meeting RTMS stops, log the stop event
  if (event === "meeting.rtms_stopped") {
    const { meeting_uuid } = payload;
    console.log(`Meeting ${meeting_uuid} stopped`);
    // Open WebSocket connections will close naturally
  }

  res.sendStatus(200); // Acknowledge webhook receipt
});
// Step 4: Generate Signature for authentication handshake
// HMAC-SHA256 over "client_id,meeting_uuid,stream_id", keyed with the client
// secret and returned as a hex string.
function generateSignature(meetingUuid, streamId) {
  const { ZOOM_CLIENT_ID, ZOOM_CLIENT_SECRET } = process.env;
  const signedFields = `${ZOOM_CLIENT_ID},${meetingUuid},${streamId}`;
  const hmac = crypto.createHmac("sha256", ZOOM_CLIENT_SECRET);
  hmac.update(signedFields);
  return hmac.digest("hex");
}
// Helper function: read the access token from the access_token env variable.
// Replace this with your own token logic; see
// https://developers.zoom.us/docs/integrations/oauth/ to know more.
function generateAccessToken() {
  const { access_token: token } = process.env;
  if (token) {
    console.log("Using access token from environment variables");
    return token;
  }
  throw new Error("access_token not found in environment variables");
}
// Helper function: Manually start RTMS using Zoom API
// Resolves with the response body; logs and rethrows on failure.
async function startRTMS(meetingId, accessToken) {
  const endpoint = `https://api.zoom.us/v2/live_meetings/${meetingId}/rtms_app/status`;
  const body = {
    action: "start",
    settings: { client_id: process.env.ZOOM_CLIENT_ID },
  };
  const requestConfig = {
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${accessToken}`,
    },
  };

  try {
    const { data } = await axios.patch(endpoint, body, requestConfig);
    console.log("RTMS start response:", data);
    return data;
  } catch (error) {
    // Prefer the API's error body when the server answered at all
    const detail = error.response?.data || error.message;
    console.error("Error starting RTMS via API:", detail);
    throw error;
  }
}
// Step 5: WebSocket 1 - Connect to Signaling WebSocket
// Sends the signed handshake, opens the media connection once the handshake
// succeeds, and answers keep-alive requests.
function connectToSignalingWebSocket(meetingUuid, streamId, serverUrls) {
  console.log(`Connecting to signaling WebSocket: ${serverUrls}`);
  const signalingWs = new WebSocket(serverUrls);

  // Once signaling WebSocket is open, send handshake
  signalingWs.on("open", () => {
    console.log("Signaling WebSocket opened");
    const handshake = {
      msg_type: 1, // HANDSHAKE_REQUEST
      meeting_uuid: meetingUuid,
      rtms_stream_id: streamId,
      signature: generateSignature(meetingUuid, streamId),
    };
    signalingWs.send(JSON.stringify(handshake));
  });

  // Handle incoming signaling messages
  signalingWs.on("message", (data) => {
    const msg = JSON.parse(data);
    console.log("Signaling message:", msg);

    switch (msg.msg_type) {
      case 2:
        // Handshake response: on success, proceed to the media connection
        if (msg.status_code === 0) {
          connectToMediaWebSocket(
            msg.media_server.server_urls.transcript,
            meetingUuid,
            streamId,
            signalingWs,
          );
        }
        break;
      case 12:
        // Keep-alive request: echo the timestamp back
        signalingWs.send(
          JSON.stringify({ msg_type: 13, timestamp: msg.timestamp }),
        );
        break;
      default:
        break;
    }
  });

  // Log signaling errors
  signalingWs.on("error", (error) =>
    console.error("Signaling WebSocket error:", error),
  );

  // Log signaling WebSocket closure
  signalingWs.on("close", () => console.log("Signaling WebSocket closed"));
}
// Step 6: WebSocket 2 - Connect to Media WebSocket
// Requests the transcript stream, answers keep-alives, sends CLIENT_READY_ACK
// on the signaling socket after a successful handshake, and logs transcripts.
function connectToMediaWebSocket(
  mediaUrl,
  meetingUuid,
  streamId,
  signalingSocket,
) {
  console.log(`Connecting to media WebSocket: ${mediaUrl}`);
  const mediaWs = new WebSocket(mediaUrl);

  // Once media WebSocket is open, send media handshake
  mediaWs.on("open", () => {
    console.log("Media WebSocket opened");
    const handshake = {
      msg_type: 3, // MEDIA_HANDSHAKE_REQUEST
      protocol_version: 1,
      sequence: 0,
      meeting_uuid: meetingUuid,
      rtms_stream_id: streamId,
      signature: generateSignature(meetingUuid, streamId),
      media_type: 8, // Request transcript stream
    };
    mediaWs.send(JSON.stringify(handshake));
  });

  // Handle incoming media messages
  mediaWs.on("message", (data) => {
    const msg = JSON.parse(data);
    console.log("Media message received:", msg);

    switch (msg.msg_type) {
      case 12:
        // Keep-alive request from media server: echo the timestamp back
        console.log("Responding to media keep-alive");
        mediaWs.send(
          JSON.stringify({ msg_type: 13, timestamp: msg.timestamp }),
        );
        break;
      case 4:
        // Media handshake response: on success, tell the signaling server
        // that the client is ready
        if (msg.status_code === 0) {
          console.log("Media handshake successful, sending CLIENT_READY_ACK");
          signalingSocket.send(
            JSON.stringify({ msg_type: 7, rtms_stream_id: streamId }),
          );
        }
        break;
      case 17:
        // Incoming transcript data
        console.log("Transcript:", msg);
        break;
      default:
        break;
    }
  });

  // Log media errors
  mediaWs.on("error", (error) =>
    console.error("Media WebSocket error:", error),
  );

  // Log media WebSocket closure
  mediaWs.on("close", () => console.log("Media WebSocket closed"));
}
// Step 7: Stop RTMS using Zoom API
// Resolves with the response body; logs and rethrows on failure.
async function stopRTMS(meetingId, accessToken) {
  const endpoint = `https://api.zoom.us/v2/live_meetings/${meetingId}/rtms_app/status`;
  const body = {
    action: "stop",
    settings: { client_id: process.env.ZOOM_CLIENT_ID },
  };
  const requestConfig = {
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${accessToken}`,
    },
  };

  try {
    const { data } = await axios.patch(endpoint, body, requestConfig);
    console.log("RTMS stop response:", data);
    return data;
  } catch (error) {
    // Prefer the API's error body when the server answered at all
    const detail = error.response?.data || error.message;
    console.error("Error stopping RTMS via API:", detail);
    throw error;
  }
}
// Helper function: Schedule RTMS stop after 10 seconds
// Replace this with your own logic for deciding when to stop RTMS.
function scheduleRTMSStop(meetingId, accessToken) {
  console.log(
    `Scheduling RTMS stop for meeting ${meetingId} in 10 seconds...`,
  );

  // Failures are logged here; nothing is thrown to the caller
  const stopAndReport = async () => {
    try {
      console.log(`Stopping RTMS for meeting ${meetingId}...`);
      await stopRTMS(meetingId, accessToken);
      console.log(`RTMS stopped successfully for meeting ${meetingId}`);
    } catch (error) {
      console.error(`Failed to stop RTMS for meeting ${meetingId}:`, error);
    }
  };

  setTimeout(stopAndReport, 10000); // 10 seconds
}
// Step 8: Start Express server (PORT from .env, defaulting to 3000)
const PORT = Number(process.env.PORT) || 3000;
app.listen(PORT, () => console.log(`Server running on port ${PORT}`));
import os
import json
import asyncio
import websockets
import requests
import hmac
import hashlib
from flask import Flask, request, jsonify
from dotenv import load_dotenv
# Load environment variables from .env
load_dotenv()
app = Flask(__name__)
# Step 1: Webhook Receiver - Listen for meeting events
@app.route("/webhook", methods=['POST'])
def webhook():
    """Receive Zoom webhook events and drive the RTMS lifecycle.

    - meeting.started: start RTMS via the REST API and schedule a stop.
    - meeting.rtms_started: open the signaling WebSocket in the background.
    - meeting.rtms_stopped: log the stop.

    Always returns 200 so Zoom sees the event as delivered.
    """
    # Fall back to empty dicts so a missing body/payload cannot raise
    # AttributeError on the .get() calls below.
    data = request.get_json() or {}
    event = data.get('event')
    payload = data.get('payload') or {}
    print(f'Webhook received: {event}')
    print(f'Payload: {json.dumps(payload, indent=2)}')

    # Step 2a: Listen to meeting started event
    if event == 'meeting.started':
        print('Meeting started, initiating RTMS...')
        meeting_object = payload.get('object') or {}
        meeting_id = meeting_object.get('id')
        try:
            # Step 2b: Get access token
            access_token = generate_access_token()
            # Step 2c: Make API call to start RTMS
            start_rtms(meeting_id, access_token)
            print(f'RTMS started for meeting {meeting_id}')
            # Schedule automatic RTMS stop after 10 seconds
            schedule_rtms_stop(meeting_id, access_token)
        except Exception as error:
            print(f'Error starting RTMS: {error}')

    # Step 3: RTMS started event
    if event == 'meeting.rtms_started':
        print('Starting RTMS connection...')
        meeting_uuid = payload.get('meeting_uuid')
        rtms_stream_id = payload.get('rtms_stream_id')
        server_urls = payload.get('server_urls')
        # asyncio.run() only returns when the WebSocket session ends, so
        # calling it here would hold the webhook response for the whole
        # stream. Run the session on a background thread instead.
        import threading
        threading.Thread(
            target=asyncio.run,
            args=(connect_to_signaling_websocket(meeting_uuid, rtms_stream_id, server_urls),),
            daemon=True,
        ).start()

    # When meeting RTMS stops, log the stop event
    if event == 'meeting.rtms_stopped':
        meeting_uuid = payload.get('meeting_uuid')
        print(f'Meeting {meeting_uuid} stopped')
        # Open WebSocket connections will close naturally

    return jsonify({'status': 'success'}), 200
# Step 4: Generate Signature for authentication handshake
def generate_signature(meeting_uuid, stream_id):
    """Return the hex HMAC-SHA256 handshake signature.

    The signed message is ``"<client_id>,<meeting_uuid>,<stream_id>"``,
    keyed with ZOOM_CLIENT_SECRET.

    Raises:
        Exception: if ZOOM_CLIENT_ID or ZOOM_CLIENT_SECRET is not set.
    """
    client_id = os.getenv('ZOOM_CLIENT_ID')
    client_secret = os.getenv('ZOOM_CLIENT_SECRET')
    if not client_id or not client_secret:
        raise Exception('ZOOM_CLIENT_ID and ZOOM_CLIENT_SECRET must be set in environment variables')
    message = f"{client_id},{meeting_uuid},{stream_id}"
    signature = hmac.new(
        client_secret.encode('utf-8'),
        message.encode('utf-8'),
        hashlib.sha256
    ).hexdigest()
    return signature
# Helper function: Generate access token from environment variable
def generate_access_token():
    """Return the OAuth access token from the ``access_token`` env variable.

    Raises:
        Exception: if the variable is unset or empty.
    """
    access_token = os.getenv('access_token')
    if not access_token:
        raise Exception('access_token not found in environment variables')
    print('Using access token from environment variables')
    return access_token
# Helper function: Manually start RTMS using Zoom API
def start_rtms(meeting_id, access_token):
    """Ask Zoom to start RTMS for a live meeting.

    Sends PATCH /live_meetings/{meeting_id}/rtms_app/status with
    ``action: start``.

    Returns:
        The parsed JSON response body, or ``{}`` when the body is empty.

    Raises:
        Exception: on a non-2xx status or an unparsable body.
        requests.exceptions.RequestException: on transport errors
            (re-raised after logging).
    """
    try:
        url = f"https://api.zoom.us/v2/live_meetings/{meeting_id}/rtms_app/status"
        headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {access_token}'
        }
        data = {
            'action': 'start',
            'settings': {
                'client_id': os.getenv('ZOOM_CLIENT_ID')
            }
        }
        # A timeout keeps the webhook handler from hanging on a stalled call.
        response = requests.patch(url, json=data, headers=headers, timeout=10)

        # Accept any 2xx: requiring exactly 200 would report other success
        # codes (e.g. 204 No Content) as failures.
        if not response.ok:
            print(f'API Error: Status {response.status_code}')
            print(f'Response: {response.text}')
            raise Exception(f'API call failed with status {response.status_code}')

        # A success response may carry no body; there is nothing to parse then.
        if not response.content:
            print('RTMS start response: {}')
            return {}

        try:
            result = response.json()
        except json.JSONDecodeError:
            print(f'Invalid JSON response: {response.text}')
            raise Exception('Invalid JSON response from API')
        print(f'RTMS start response: {result}')
        return result
    except requests.exceptions.RequestException as error:
        print(f'Error starting RTMS via API: {error}')
        raise
# Step 5: WebSocket 1 - Connect to Signaling WebSocket
async def connect_to_signaling_websocket(meeting_uuid, stream_id, server_urls):
    """Run the signaling session: handshake, media hand-off, keep-alives.

    After a successful handshake the media connection is started as a
    separate task so this loop keeps answering signaling keep-alives.
    Errors are logged, not raised.
    """
    print(f'Connecting to signaling WebSocket: {server_urls}')
    media_task = None  # keep a reference so the task is not garbage-collected
    try:
        # WARNING: certificate verification is disabled for development only.
        # NOTE(review): remove this context (use the default one) in
        # production; without verification the server is not authenticated.
        import ssl
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

        async with websockets.connect(server_urls, ssl=ssl_context) as signaling_ws:
            print('Signaling WebSocket opened')

            # Send handshake
            handshake_message = {
                'msg_type': 1,  # HANDSHAKE_REQUEST
                'meeting_uuid': meeting_uuid,
                'rtms_stream_id': stream_id,
                'signature': generate_signature(meeting_uuid, stream_id)
            }
            await signaling_ws.send(json.dumps(handshake_message))

            # Handle incoming signaling messages
            async for message in signaling_ws:
                msg = json.loads(message)
                print(f'Signaling message: {msg}')

                # If handshake is successful, proceed to media connection
                if msg.get('msg_type') == 2 and msg.get('status_code') == 0:
                    media_url = msg['media_server']['server_urls']['transcript']
                    # Run the media session concurrently: awaiting it here
                    # would stall this loop until the media socket closes.
                    media_task = asyncio.create_task(
                        connect_to_media_websocket(media_url, meeting_uuid, stream_id, signaling_ws)
                    )

                # If keep-alive request is received, respond with ACK
                if msg.get('msg_type') == 12:
                    ack_message = {
                        'msg_type': 13,
                        'timestamp': msg.get('timestamp')
                    }
                    await signaling_ws.send(json.dumps(ack_message))
    except Exception as error:
        print(f'Signaling WebSocket error: {error}')
    finally:
        # The media session depends on the signaling socket; end it as well.
        if media_task is not None and not media_task.done():
            media_task.cancel()
        print('Signaling WebSocket closed')
# Step 6: WebSocket 2 - Connect to Media WebSocket
async def connect_to_media_websocket(media_url, meeting_uuid, stream_id, signaling_socket):
    """Open the RTMS media WebSocket and print transcript messages.

    After connecting, a media handshake (msg_type 3) requesting the
    transcript stream (media_type 8) is sent. Incoming messages are then
    handled:

    * msg_type 12 (keep-alive request): answered with msg_type 13 echoing
      the request's timestamp.
    * msg_type 4 with status_code 0 (media handshake accepted): a
      msg_type 7 "client ready" message is sent on ``signaling_socket``.
    * transcript data: printed.

    Args:
        media_url: URL of the media WebSocket server.
        meeting_uuid: Meeting UUID sent in the media handshake.
        stream_id: RTMS stream id sent in the handshake and ready message.
        signaling_socket: Open signaling WebSocket used for the ready message.

    Errors are printed rather than raised; the function returns ``None``.
    """
    import ssl

    # Zoom's RTMS message-type table lists transcript payloads as
    # msg_type 17 (MEDIA_DATA_TRANSCRIPT). 5 is what the original check
    # matched and is kept so that behaviour is not lost.
    transcript_msg_types = (17, 5)

    print(f'Connecting to media WebSocket: {media_url}')
    try:
        # WARNING: certificate verification is disabled for development
        # only. Remove the two overrides below before any real deployment.
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

        async with websockets.connect(media_url, ssl=ssl_context) as media_ws:
            print('Media WebSocket opened')

            # Send media handshake
            media_handshake = {
                'msg_type': 3,  # MEDIA_HANDSHAKE_REQUEST
                'protocol_version': 1,
                'sequence': 0,
                'meeting_uuid': meeting_uuid,
                'rtms_stream_id': stream_id,
                'signature': generate_signature(meeting_uuid, stream_id),
                'media_type': 8,  # Request transcript stream
            }
            await media_ws.send(json.dumps(media_handshake))

            # Handle incoming media messages
            async for message in media_ws:
                msg = json.loads(message)
                print(f'Media message received: {msg}')
                msg_type = msg.get('msg_type')

                if msg_type == 12:
                    # Respond to keep-alive request from media server
                    print('Responding to media keep-alive')
                    ack_message = {
                        'msg_type': 13,
                        'timestamp': msg.get('timestamp'),
                    }
                    await media_ws.send(json.dumps(ack_message))
                elif msg_type == 4 and msg.get('status_code') == 0:
                    # Media handshake succeeded: tell the signaling server
                    # that this client is ready to receive data.
                    print('Media handshake successful, sending CLIENT_READY_ACK')
                    ready_message = {
                        'msg_type': 7,
                        'rtms_stream_id': stream_id,
                    }
                    await signaling_socket.send(json.dumps(ready_message))
                elif msg_type in transcript_msg_types:
                    # Log incoming transcript data
                    print(f'Transcript: {msg}')
    except Exception as error:
        print(f'Media WebSocket error: {error}')
    finally:
        print('Media WebSocket closed')
# Step 7: Stop RTMS using Zoom API
def stop_rtms(meeting_id, access_token):
    """Ask the Zoom API to stop RTMS for a live meeting.

    Sends ``PATCH /v2/live_meetings/{meeting_id}/rtms_app/status`` with
    ``action: stop`` for the app identified by ``ZOOM_CLIENT_ID``.

    Args:
        meeting_id: Meeting identifier interpolated into the request URL.
        access_token: Bearer token sent in the ``Authorization`` header.

    Returns:
        The decoded JSON body of the response, or an empty dict when a
        successful response carries no body.

    Raises:
        requests.exceptions.RequestException: If the request fails, times
            out, or the API answers with an HTTP error status.
    """
    url = f"https://api.zoom.us/v2/live_meetings/{meeting_id}/rtms_app/status"
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {access_token}',
    }
    data = {
        'action': 'stop',
        'settings': {
            'client_id': os.getenv('ZOOM_CLIENT_ID'),
        },
    }

    try:
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.patch(url, json=data, headers=headers, timeout=10)
        response.raise_for_status()
        # Decode the body once; a successful reply may have no body at all.
        result = response.json() if response.content else {}
        print(f'RTMS stop response: {result}')
        return result
    except requests.exceptions.RequestException as error:
        print(f'Error stopping RTMS via API: {error}')
        raise
# Helper function: Schedule RTMS stop after 10 seconds
def schedule_rtms_stop(meeting_id, access_token, delay_seconds=10):
    """Schedule a background ``stop_rtms`` call after a delay.

    The call returns immediately. The stop request runs on a daemon timer
    thread, so a pending stop does not keep the process alive at shutdown.
    Failures of the stop request are printed, never raised.

    Args:
        meeting_id: Meeting identifier passed through to ``stop_rtms``.
        access_token: Bearer token passed through to ``stop_rtms``.
        delay_seconds: Seconds to wait before stopping RTMS (default 10).

    Returns:
        None.
    """
    import threading

    print(f'Scheduling RTMS stop for meeting {meeting_id} in {delay_seconds} seconds...')

    def delayed_stop():
        try:
            print(f'Stopping RTMS for meeting {meeting_id}...')
            stop_rtms(meeting_id, access_token)
            print(f'RTMS stopped successfully for meeting {meeting_id}')
        except Exception as error:
            # Runs on a background thread: report the failure instead of
            # letting the exception die silently with the thread.
            print(f'Failed to stop RTMS for meeting {meeting_id}: {error}')

    timer = threading.Timer(delay_seconds, delayed_stop)
    timer.daemon = True
    timer.start()
# Start Flask server on port 3000
if __name__ == '__main__':
    # The server listens on all interfaces, so Flask's debug mode (which
    # enables the interactive debugger and the reloader) is opt-in: set
    # FLASK_DEBUG=1 in the environment to enable it for local development.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    print('Server running on port 3000')
    app.run(host='0.0.0.0', port=3000, debug=debug_enabled)