Thursday, February 27, 2014

Scalatra with actors commanding Spark: at last, I figured it out

In a previous post I echoed my frustration about being stuck figuring out how to share a Spark context among Akka actors in a Scalatra environment. Well, I was wrong to say it is difficult or impossible; it turned out to be pretty easy. Sigh. At least I figured it out myself ;)

In any case, I started with a basic giter8 project, as per the Scalatra website.

g8 scalatra/scalatra-sbt 

Answer all the questions and move into the directory that was created. Mine is called "pipelineserver" and my package is com.github.ognenpv.pipeline.

giter8 would have created a few important files:

project/build.scala
src/main/scala/com/github/ognenpv/pipeline/PipelineServlet.scala
src/main/scala/ScalatraBootstrap.scala

Let's go through them one by one:

project/build.scala

Add or change the following in its contents, in the appropriate places, and leave the rest as is:

  val ScalaVersion = "2.10.3"
  val ScalatraVersion = "2.2.2"

        "org.apache.spark" % "spark-core_2.10" % "0.9.0-incubating",
        "org.apache.hadoop" % "hadoop-client" % "2.2.0"

I use hadoop 2.2.0 since my Spark standalone cluster was compiled against that version of Hadoop libraries and I run an HDFS filesystem based on that, on the same cluster.
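
For reference, here is roughly where those lines end up in project/build.scala; the surrounding Seq and the Scalatra entries shown come from the giter8 template, so your generated file may differ slightly:

  val ScalaVersion = "2.10.3"
  val ScalatraVersion = "2.2.2"

  // ... inside the project's settings, in the template-generated dependency list:
      libraryDependencies ++= Seq(
        "org.scalatra" %% "scalatra" % ScalatraVersion,
        "org.scalatra" %% "scalatra-scalate" % ScalatraVersion,
        // the two additions:
        "org.apache.spark" % "spark-core_2.10" % "0.9.0-incubating",
        "org.apache.hadoop" % "hadoop-client" % "2.2.0"
      )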

src/main/scala/com/github/ognenpv/pipeline/PipelineServlet.scala

package com.github.ognenpv.pipeline

import akka.actor.{ActorRef, Actor, ActorSystem}
import akka.util.Timeout
import java.util.concurrent.TimeUnit
import org.scalatra._
import scalate.ScalateSupport
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.slf4j.{Logger, LoggerFactory}
import scala.concurrent.ExecutionContext

class PipelineServlet(system:ActorSystem, myActor:ActorRef) extends PipelineserverStack with FutureSupport {

  import _root_.akka.pattern.ask
  // ask timeout: Timeout(Long) is interpreted as milliseconds, i.e. 36 seconds here
  implicit val timeout = Timeout(36000)
  protected implicit def executor: ExecutionContext = system.dispatcher

  get("/") {
  myActor ? "first20"
  }

  get("/count") {
        myActor ? "count"
  }

  get ("/test") {
    myActor ? "test"
  }
}

class MyActor(sc:SparkContext) extends Actor {
  // change the following two lines to do whatever you want to do
  // with whatever filesystem setup and format you have
  val f = sc.textFile("hdfs://10.10.0.198:54310/test/2013-12-01.json")
  // keep only the "Sign Up" events, map each line to an (id, tag) pair and cache the RDD
  val events = f.filter(_.split(",")(0).split(":")(1).replace("\"","") == "Sign Up")
               .map(line => (line.split(",")(2).split(":")(1).replace("\"",""), 0))
               .cache

  def receive = {
    case "count" => sender ! events.count.toString
    case "first20" => sender ! events.take(20).toList.toString
    case "test" => sender ! "test back!"
    case _ => sender ! "no habla!"  // catch-all; "_" in quotes would only match the literal string "_"
  }
}
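
The stringly-typed parsing above is a bit cryptic, so here is a hypothetical line of the shape it assumes (illustration only, not a line from my actual file), just to show which pieces the filter and the map pull out:

// Hypothetical input line (illustration only, not real data):
//   {"event":"Sign Up","ts":"2013-12-01T10:22:01","user_id":"12345","country":"US"}
// split(",")(0) gives {"event":"Sign Up"  -> split(":")(1), quotes stripped -> Sign Up
// split(",")(2) gives "user_id":"12345"   -> split(":")(1), quotes stripped -> 12345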

The system is very simple: we create an actor that receives the SparkContext, reads in a basic file from HDFS, does some basic parsing via Spark transformations and caches the result. The result is an RDD of tuples, each looking like (id, tag), where tag is a 0 or a 1.

This actor is responsible for doing things to the cached result when asked by the Scalatra servlet as part of serving a route: it executes the Spark action and returns the result to the servlet. We are not doing any error checking, for simplicity, and we are also not trying to be smart about timeouts and synchronization (herein somewhere lie Futures? ;)
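
For what it's worth, a slightly more explicit way of handling the Future (just a sketch of an alternative, not what the code above does) is to wrap the ask in Scalatra's AsyncResult inside the servlet:

  // Sketch only: an alternative route, added inside PipelineServlet, that spells out the
  // Future handling; the routes above simply return the Future and let FutureSupport render it.
  get("/count-async") {
    new AsyncResult {
      val is = (myActor ? "count").mapTo[String]
    }
  }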

Finally, here are the contents of the Scalatra bootstrap class:

src/main/scala/ScalatraBootstrap.scala

import akka.actor.{ActorSystem, Props}
import com.github.ognenpv.pipeline._
import org.scalatra._
import javax.servlet.ServletContext
import org.apache.spark._
import org.apache.spark.SparkContext._

class ScalatraBootstrap extends LifeCycle {

  // master URL, application name, Spark home on the cluster nodes
  val sc = new SparkContext("spark://10.10.0.200:7077","PipelineServer","/home/sparkuser/spark")
  // adjust this to your own .jar name and path
  sc.addJar("target/scala-2.10/pipelineserver_2.10-0.1.0-SNAPSHOT.jar")
  // reuse Spark's own ActorSystem (from the thread-local SparkEnv) to host our actor
  val env = SparkEnv.get
  val system = env.actorSystem
  val myActor = system.actorOf(Props(new MyActor(sc)))

  override def init(context: ServletContext) {
    context.mount(new PipelineServlet(system, myActor), "/actors/*")
  }

  override def destroy(context:ServletContext) {
    sc.stop
    system.shutdown()
  }
}

In this class we create the actual SparkContext. Beware that when you run sbt you should execute the packageBin task first, to create the jar that you feed to the SparkContext via addJar. If you do not do this, Spark actions like count (which are "jobs" executed by passing closures around Spark's actor system, it would seem; this is my uneducated guess) will fail, because Spark will not be able to find the classes it needs to run the closure on the workers.

Notice also that we are using Spark's own ActorSystem, obtained via the SparkEnv that is available to every thread (SparkEnv.get). We then use this ActorSystem to create our actor responsible for executing SparkContext actions. Also notice that in destroy() we stop the current SparkContext so that the Spark portion of the servlet exits cleanly.

Change the necessary variables when creating the SparkContext and you should be good to run the code on your Spark cluster (I am running a standalone cluster in a VPC on Amazon).

$ sbt
> packageBin
> container:start
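
Once the container is up (on port 8080, assuming the default hasn't been changed), you can poke the mounted routes with curl:

curl http://localhost:8080/actors/test
curl http://localhost:8080/actors/count
curl http://localhost:8080/actors/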

Enjoy :)
Ognen
