Video encoding using FFmpeg & node.js with an active queue

By.

min read

Introduction

All of the videos we watch on our computers and mobiles have gone through an encoding process that converts the original video so that it is viewable on various output formats like mp4,MOV,webm, etc… Today we focus on these encoding techniques through FFmpeg.Here am using nodeJs.

Iam working on a project named “slaxn” is a video sharing website going to be live in couple of days. Coming to the topic video encoding, I used FFmpeg -a complete, cross-platform solution to record, convert and stream audio and video.And a queing system for encoding jobs it is done with kue, Kue is a priority job queue backed by redis, built for node.js.

video conversion using node js

Prerequisites

  • Node js
  • FFmpeg
  • Redis
  • Socket.io
  • kue-npm

Setting up the environment

We need an app to configure all in one, I am using express and am not explaining here how to set up app on nodejs thats you already know.As i mentioned in prerequisites we need FFmpeg. For this do the following.

Installing FFmpeg

You guys can download it from their official site www.ffmpeg.org. I am using ffmpeg-4.2.2 version,the latest. It is available for all type platforms.Today iam showing it on linux.You can retrieve the source code through Git by using this command:

git clone https://git.ffmpeg.org/ffmpeg.git ffmpeg

or else this is the link to download it https://ffmpeg.org/releases/ffmpeg-4.2.2.tar.bz2. Its a tar file you can extract with any kind.

Setting up node.js

I am using node v10.15.2. Any one can download it from there site and is absolutly free https://nodejs.org/en/.For linux platform we have

curl -sL https://deb.nodesource.com/setup_10.x | sudo -E bash - 
sudo apt-get install -y nodejs

Open up your terminal and paste this code and hit it.It takes little time to install wait for it.After this you can check the installed version using this command.

node -v

output will be like this v10.16.0.Apart from node npm also gets installed npm-Node package-manager.

Queueing the video

For this we need kue-npm, it gives a fine ui on the job states. The kue consider the video as jobs .We have 4 job states inactive,active,completed and failed,it will be dicussed later.

job states in kue
fig 1.1 job states

we need to install the kue, so using node package manager,

npm i kue
npm i kue-ui

ok let’s code this,

export const encodingQueue = async (media: any, jwtToken: string) => {
    const job = queue.create("encoding", {
        media,
        title: media.slug,
        jwtToken,
    }).save((err: any) => {
        if (!err) { console.log("Job Id: ", job.id); }
    });
    job.on("complete", () => {
        console.log("Job completed");
    }).on("error", (err) => {
        console.log("Oops... ", err);
        job.remove(function (err) {
            console.log('removed job #%d', job.id);
        });
        job.failed().error(err);
    }).on("failed", (errorMessage) => {
        console.log("Job failed", errorMessage);
    }).on("progress", (progress, state) => {
        console.log("\r  job #" + job.id + " " + progress + "% complete with data ", state);
        global["io"].emit(`${media.username}-${media.channelName}-${media.slug}`, { progress, state });
    });
};

This function contains queue creation,handlers for job fails,completes,and consoling and emiting progress to view.

export const encodingQueue = async (media: any, jwtToken: string) 

Here the media is the video file and jwt is my security.

const job = queue.create("encoding", {
media,
title: media.slug,
jwtToken,
}).save

here job is created with name “encoding” and have title as video name. Before this we need a storage for these jobs and here is the use of Redis. Redis is an open source (BSD licensed), in-memory data structure store, used as a database.So let install redis

npm i redis

This is only a package installation,you need to install it in some other means. Here is the link to install redis on ubuntu. https://www.digitalocean.com/community/tutorials/how-to-install-and-secure-redis-on-ubuntu-18-04.

If you want a gui for redis use this Redis Desktop Manager, “https://redisdesktop.com/“, you can see the 16 database and their states.

Ok then we need to set up a connection to redis

var kue = require('kue');            //requiring kue package
var queue = kue.createQueue({        //creating the queue
  prefix: 'queue',
  redis: {
    port: 6379,
    host: '127.0.0.1',
    options: {}

6379 is the port and localhost is the host by default, we can authenticate with a password to enter redis and also we can choose databases among 16.

Processing the batch ‘X’ at a time

We need to pass the job to a encoding function.For this

queue.process("encoding", vars.QUEUE_COUNT, async (job: any, done: any) => {
    try {
        await encodingAndUploading(job, done);
    } catch (error) {
        job.failed().error(error);
    }

queue.process() currently returns a singleton Queue instance.The processing is done here.we pass the callback done to encoding and also the job that is created earlier.And we call the await function encogingAndUploading() for conversion. When there is error occurs we invokes done(err) otherwise done().The done(err) and done() are called from the function encogingAndUploading(),you can see in code.In this you see a QUEUE_COUNT ,it shows the maximum active jobs in the queue.others will go to inactive state,when the active queue frees it get automatically put to active queue.

Encoding…

export const encodingAndUploading = async (job: any, done: CallableFunction) => {
    try {
        const { media } = job.data;
        const { file, slug, channelName, username } = media;
        const fileMD5 = file.md5;
        let duration: any = 0;
        const dir = `streams/${fileMD5}`; //dir to save

        //  Encode to various qualities
        console.log(`Start ${fileMD5} Encoding...`);
        let playlistContent: string = `#EXTM3U\n#EXT-X-VERSION:3\n`;
        const multipleResolution = [{
            output: "240p.m3u8",
            videoBitrate: "145",
            audioBitrate: "64",
            size: "426x240",
        },
        {
            output: "720p.m3u8",
            videoBitrate: "2400",
            audioBitrate: "128",
            size: "1280x720",
        },
        {
            output: "1080p.m3u8",
            videoBitrate: "5000",
            audioBitrate: "192",
            size: "1920x1080",
        }];
const addOption = [
 "-profile:v baseline",   // baseline profile (level 
                             3.0) for H264 video codec
 "-level 3.0",
 "-start_number 0",       // start the first .ts segment 
                             at index 0
 "-hls_time 4",           // 4 second segment duration
 "-hls_key_info_file enc.keyinfo",
 "-hls_playlist_type vod",
 "-hls_flags periodic_rekey",
 "-hls_list_size 0",       // Maximum number of playlist 
                            entries (0 means all 
                             entries/infinite)
 "-f hls",                 // HLS format
 "-threads 0",
        ];  

        if (!fs.existsSync(dir)) { fs.mkdirSync(dir); }

        const command = FFmpeg({ source: file.tempFilePath })
            .withVideoCodec("libx264")
            .withAudioCodec("aac");
multipleResolution.forEach((data: { output: string, videoBitrate: string, audioBitrate: string, size: string }) => {
        command
         .output(`${dir}/${data.output}`, { end:false })
                .addOption(addOption)
                .withVideoBitrate(data.videoBitrate)
                .withAudioBitrate(data.audioBitrate)
                .withAudioChannels(2)
                .withSize(data.size);
            playlistContent = playlistContent + `#EXT-X-STREAM-INF:BANDWIDTH=${parseInt(data.videoBitrate, 10) * 1024},RESOLUTION=${data.size},CODECS="avc1.42c01e,mp4a.40.2"\n${data.output}\n`;
        });
        command
            .on("codecData", (data: any) => {
                job.progress(0, 100, "ENCODING");
                console.log("Input is " + data.audio + " audio with " + data.video + " video");
            })
            .on("progress", (progress: any) => {
                job.progress(progress.percent, 100, "ENCODING");
                console.log("Encoding Processing: " + progress.percent + "% done");
            })
            .on("error", (err, stdout, stderr) => {
                console.log("Cannot process video: " + err.message, stdout, stderr);
                job.failed().error(err);
            })
            .on("end", async () => {
                console.log("Encoding Process finished successfully");
fs.writeFileSync(`${dir}/playlist.m3u8`,playlistContent, "utf8");

     // Upload the files
await fileUpload(media, job.data.jwtToken, duration, job); // file upload into the DigitalOcean Spaces

// At this point we no longer need the source file, let's delete it.
// Hoping the upload to Wasabi would be done by now. If not we will need to handle this
                fs.unlinkSync(file.tempFilePath);
                done();
            }).run();
    } catch (error) {
        console.log("Error: ", error);
        done(error);
    }
};

In the fuction encodingAndUploading() we passed the job that contains the file to encode and done the callback-function that i mentioned earlier.

const fileMD5 = file.md5;

you know MD5 is message digest algorithm and meant for security, here i am using this as identification.MD5 files function similarly to human fingerprints.It can be used to verify identity.We have to encode 100s of file so each one should provide an unique identity, that’s it.

const multipleResolution = [{ output: "240p.m3u8", videoBitrate: "145", audioBitrate: "64", size: "426x240", }, { output: "360p.m3u8", videoBitrate: "365", audioBitrate: "96", size: "640x360", },.......]

its an array of formats that i wanted,ie 240p,360p,1080p and we encode each item using ffmpeg.Then you can see a array addoptions=[…] ,these are the extra options for encoding purpose to specify your needs.you will get exact info in ffmpeg doc.I am not explaining it now.

if (!fs.existsSync(dir)) { fs.mkdirSync(dir); }

Here is a directory creation for saving the encoded files, later I delete it that I will show you.

const command = FFmpeg({ source: file.tempFilePath })
.withVideoCodec("libx264")
.withAudioCodec("aac");

Here we start, the source is the file the input file.tempFilePath gives the location of file.Then specify the videoCodec for your file. I am using lib*264 because it have the features like including 8×8 and 4×4 adaptive spatial transform, adaptive B-frame placement.And my audio codec is aac, you can check the properties of video after encoding,the audio format will be aac.

multipleResolution.forEach((data: { output: string, videoBitrate: string, audioBitrate: string, size: string }) => {
command
.output(${dir}/${data.output}, { end: false })
.addOption(addOption)
.withVideoBitrate(data.videoBitrate)
.withAudioBitrate(data.audioBitrate)
.withAudioChannels(2)
.withSize(data.size);
playlistContent = playlistContent + #EXT-X-STREAM-INF:BANDWIDTH=${parseInt(data.videoBitrate, 10) * 1024},RESOLUTION=${data.size},CODECS="avc1.42c01e,mp4a.40.2"\n${data.output}\n;
});

Then there is a loop of multipleResolution,the various formats and say how to encode it.The output specifies the folder for saving file, .addoption(addoption) for adding option that i mentioned earlier, with some videobitrate and audiobitrates.and with audiochannel(2),and size.All of the data are saved to a object array that i mentioned earlier, multipleResolution.forEach((data: { output: string, videoBitrate: string, audioBitrate: string, size: string }).

playlistContent = playlistContent + `#EXT-X-STREAM-INF:BANDWIDTH=${parseInt(data.videoBitrate, 10) * 1024},RESOLUTION=${data.size},CODECS=”avc1.42c01e,mp4a.40.2″\n${data.output}\n`;

This is master playlist creation,you can refer here for further info.

#EXTM3U
#EXT-X-VERSION:3
#EXT-X-STREAM-INF:BANDWIDTH=2339363,RESOLUTION=640x360,CODECS="avc1.64001e,mp4a.40.2"
v0/prog_index.m3u8

#EXT-X-STREAM-INF:BANDWIDTH=540863,RESOLUTION=960x540,CODECS="avc1.64001f,mp4a.40.2"
v1/prog_index.m3u8

And this, the original code for creating playlist.

.on("progress", (progress: any) => {
job.progress(progress.percent, 100, "ENCODING");
console.log("Encoding Processing: " + progress.percent + "% done");
})

Here we can account the rate of conversion,you can emit the percentage to your app as you wish.

.on("end", async () => {
                console.log("Encoding Process finished successfully");

fs.writeFileSync(`${dir}/playlist.m3u8`,playlistContent, 
                                    "utf8");
 // Upload the files
await fileUpload(media,job.data.jwtToken, duration,job); // file upload into the DigitalOcean Spaces

   
// At this point we no longer need the source file, let's delete it.
// Hoping the upload to Wasabi would be done by now. If not we will need to handle this
   fs.unlinkSync(file.tempFilePath);
   done();
}).run();

Then we have write the playlist content to directory

fs.writeFileSync(${dir}/playlist.m3u8,playlistContent,
"utf8");

utf8 is a 8-bit Unicode Transformation Format : a variable width character encoding. After encoding we have to upload to any storage here i use digital ocean space and there is function to do it fileupload().After this we have delete the source file because it is no longer needed. “fs.unlinkSync(file.tempFilePath);

after encoding the output

After running the code you will get output like this.Thats it.

Emitting the progress via web sockets

Here we can emit the progress of encoding to the view ,In the kue creation part it is defined,

.on("progress", (progress, state) => {
console.log("\r job #" + job.id + " " + progress + "% complete with data ", state); global["io"].emit(${media.username}-${media.channelName}-${media.slug}, { progress, state });
});

In your app.ts we have some changes

const http = require("http").Server(app);
const io = require("socket.io")(http);
global["io"] = io;

we are using socket.io for emitting corresponding changes to view,you guys can take a look at socket:docs.Then in kue.js we use emit() function here we emit the progress ie,percentage and also the states,I have the states like queing,encoding,completed.you change the states according to it.

Error handling

The handling is already done, yeah the done() do you remember , in kue,say queue processing area i have mentioned about this.If we get an error the done(error)

catch (error) {
console.log("Error: ", error);
done(error);
}

is called because it is declared as a callback function, else it pass simply done() shows no error.

}).on("error", (err) => {
console.log("Oops… ", err);
job.remove(function (err) {
console.log('removed job #%d', job.id);
});
job.failed().error(err);

here the done(error) comes then we can remove the job or we can put it into failed state.

Ok friends its going to an end, For any help or suggestion comment below, Thank you.

3 responses to “Video encoding using FFmpeg & node.js with an active queue”

  1. Nahaz Avatar

    Hi, I just read texts on your website and I became interested in the topic. I like your article and I am thinking whether I could use your words in my work? Would it be probable? If yes, please contact with me. Thank you.

  2. Alvin Entrada Avatar
    Alvin Entrada

    Someone I work with visits your blog regularly and recommended it to me to see too. The way of writing is great and also the content is top-notch. Thanks for that insight you supply the readers!

  3. Dayron Avatar

    Forgive my ignorance, I see that you are cloning the ffmpeg project from git, so how yo use ffmpeg actually? direct from the source code. will this work on windows too?

Leave a Reply

Your email address will not be published. Required fields are marked *