Niko's Project Corner

Scalable analytics with Docker, Spark and Python

Description Architecture with horizontal scaling for offline and online
Languages Bash
Tags Architecture
Duration Fall 2015
Modified 23rd December 2015
GitHub nikonyrh/docker-scripts

Traditionally data scientists installed software packages directly on their machines, wrote code, trained models, saved results to local files and applied the models to new data in batch-processing style. New data-driven products require rapid development of new models, scalable training and easy integration with other aspects of the business. Here I am proposing one (perhaps already well-known) cloud-ready architecture to meet these requirements.

In this example the analytical models are written in Python, but this pattern could be adapted to other scenarios as well. It is rooted in the idea of having a base Docker image with the needed Python libraries installed and then "branching" two special-purpose containers from it, as seen in Figure 1. The first one is used for offline model training and the other for online model application. The main benefit is easy and robust deployment of the platform to new computer instances, in the cloud or on premises. Additionally it only requires Docker support from the host OS; for example, neither Java nor Python needs to be installed there at all.

Figure 1: Three Docker images' relations and their installed contents.
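The base image of Figure 1 can be sketched as a minimal Dockerfile; the image name and library list below are illustrative, not the exact contents of the repository:

```dockerfile
# analytics-base: Python plus the numeric/ML libraries shared by both branches.
FROM ubuntu:14.04
RUN apt-get update && \
    apt-get install -y python python-pip && \
    pip install numpy scipy pandas scikit-learn

# The two special-purpose images would then each start with
#   FROM analytics-base
# and add either Java + Spark binaries (offline training) or
# Gunicorn + the HTTP service code (online serving) on top.
```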

Every-day development could be done within a container as well, but for fast prototyping and IDE integration (such as Spyder) it might be simpler to install Python directly on the development machine. Virtualenv can be used to get a decent level of isolation between projects.
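For instance, a per-project environment might be set up as below, here using Python 3's built-in venv module (the classic `virtualenv` tool works the same way; the `--without-pip` flag just keeps this sketch offline-friendly, normally pip would be included and used to install the project's libraries):

```shell
# Create an isolated environment; anything installed while it is active goes
# into analytics-env/ instead of the system-wide site-packages.
python3 -m venv --without-pip analytics-env   # or: virtualenv analytics-env
. analytics-env/bin/activate
command -v python    # now resolves inside analytics-env/bin
deactivate           # back to the system Python
rm -rf analytics-env # clean up this sketch
```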

Model training might be implemented as a high-level function which accepts parameters that restrict the scope to a specific timespan, products, stores, users or other entities. Typically different "scopes" can be trivially trained in parallel, but if it takes tens of minutes to train each chunk then it might be worthwhile to train the scopes in sequence and utilize parallelism within each scope. Within a single machine libraries like multiprocessing could be used, but to scale horizontally Apache Spark is a perfect fit. A great benefit is that it supports the Scala, Java, Python and R programming languages. It is based on a master-slave model, in which a job definition (parameters + source code) is submitted to the master, which coordinates the execution of sub-jobs on the slaves. Its architecture is illustrated in Figure 2. It comes with the machine learning library MLlib, or you can write your own.
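As a toy illustration of the "scope per job" idea on a single machine, the hypothetical per-scope training command below is replaced by a plain echo; in reality each line might invoke a Python script or a spark-submit job:

```shell
# Train three example scopes with at most two jobs running concurrently;
# xargs -P acts as a poor man's job queue.
printf '%s\n' store_001 store_002 store_003 |
  xargs -P 2 -I{} sh -c 'echo "training scope {}"'
```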

Figure 2: Three main components of Spark: the driver (the client), the cluster master and the slave nodes.

Naturally Spark could be installed directly on the operating system, but if Python is used then the needed libraries such as NumPy need to be installed as well. Thus creating new slaves and adding them to the cluster is greatly simplified by creating a Docker image. It is derived from a base image which has the relevant Python libraries installed, and adds the Java + Spark binaries and start-up scripts.
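A sketch of such a derived image; the base image name, script name, Spark version and download URL are illustrative:

```dockerfile
# Spark image derived from the shared Python base image.
FROM analytics-base
RUN apt-get install -y openjdk-7-jre-headless curl && \
    curl -s http://archive.apache.org/dist/spark/spark-1.5.2/spark-1.5.2-bin-hadoop2.6.tgz \
      | tar -xz -C /opt
COPY start-spark.sh /opt/
CMD ["/opt/start-spark.sh"]
```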

The resulting image is about 1 GB in size but compresses down to 500 MB for transportation. To add a new machine to the cluster the image is simply loaded onto the machine and the start-up script is called with the relevant parameters, such as the master node's IP and the number of workers / CPU cores. Attention also needs to be paid to the SPARK_PUBLIC_DNS setting, as a different public IP needs to be advertised to the master than the container's internal IP. Details were discussed at Stack Overflow.
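A hypothetical worker start-up command (the image name, ports and addresses are placeholders); it is echoed here as a dry run rather than executed:

```shell
# The container must advertise the host's public IP (SPARK_PUBLIC_DNS) to the
# master instead of its Docker-internal address.
MASTER_IP=10.0.0.5     # placeholder addresses
PUBLIC_IP=203.0.113.7
CMD="docker run -d -e SPARK_PUBLIC_DNS=$PUBLIC_IP -p 8081:8081 my/spark-worker spark://$MASTER_IP:7077"
echo "$CMD"            # dry run; drop the echo and eval the line to actually start the worker
```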

Once models have been trained they are ready to be made available for others to benefit from, be it other teams within the organization or the general public. In the age of service-oriented architecture and microservices an HTTP-based interface is a natural choice. Naturally scalability and fault tolerance are important aspects as well. There are many ways to achieve this, but it is easy to get started with an Nginx front-end and Gunicorn back-ends, as seen in Figure 3. This also enables zero-downtime software upgrades by starting a new back-end, confirming it works, adding it to the Nginx config and removing + shutting down the old ones. At a larger scale all this should be automatically managed, but I don't have personal experience of that yet. In this architecture the project's source code is "baked into" the container and the models are loaded from, for example, an external database. Alternatively they could be serialized and compressed into files within the container as well.
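A minimal sketch of the front-end configuration, assuming two Gunicorn processes listening on local ports 8000 and 8001 (all names and ports here are hypothetical):

```nginx
# Nginx load-balances requests over the local Gunicorn back-ends.
upstream model_api {
    server 127.0.0.1:8000;
    server 127.0.0.1:8001;
}
server {
    listen 80;
    location / {
        proxy_pass http://model_api;
    }
}
```

A zero-downtime upgrade then boils down to starting the new back-end on a free port, smoke-testing it directly, adding its `server` line to the `upstream` block, running `nginx -s reload`, and finally removing the old entries the same way.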

It is possible to have clients randomly connect to either one of the instances, or to have a "primary" Nginx which clients use by default. If the latter option is chosen then the other instances' back-ends need to be added to the primary Nginx's config as well, as indicated by the grayish arrows in the figure below.

Figure 3: Two example instances with Nginx running on them and two Gunicorn back-ends on each.

This article didn't cover scalable databases such as Elasticsearch, Cassandra and Amazon Redshift. They are an interesting topic on their own, and a perfect fit for data processing with Spark. Spark also supports file-based data sources such as HDFS and Amazon S3.

Another topic is automatic cluster management systems such as Docker Swarm, Apache Mesos and Amazon EC2 Container Service. The highest cost savings in the cloud come from adaptively utilizing resources based on the expected load. When new containers are started and others are shut down, service discovery becomes a crucial "glue" to stitch everything together in a fault-tolerant manner. This can be solved via traditional DNS, or by utilizing newer solutions like Registrator and the distributed Consul.

Related blog posts: