Wednesday, February 26, 2014

Scala actors and Spark Contexts

One nice thing about Scala is the Akka actor system, an idea borrowed from Erlang, where it has found great success with its "let it crash" philosophy.

One of the goals of my work at QuizUp is to create a flexible data analytics pipeline. A nice add-on is exposing this pipeline through a slick web front-end. For this I have chosen Unfiltered, in combination with actors and futures, to "command" the Spark pipeline backend.
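As a taste of what that front-end layer could look like, here is a minimal sketch of an Unfiltered plan served through Jetty. This assumes the unfiltered-filter and unfiltered-jetty modules of that era; the route name, port, and response text are placeholders of mine, and a real front-end would forward the request to the Spark-driving actors shown further below.

import unfiltered.request._
import unfiltered.response._

// a Plan is just a partial function from requests to responses
object PipelinePlan extends unfiltered.filter.Plan {
    def intent = {
        // placeholder endpoint; a real one would ask an actor for the answer
        case GET(Path("/status")) => ResponseString("pipeline is up")
    }
}

object WebServer {
    def main(args: Array[String]) {
        // bind Jetty on port 8080 and serve the plan
        unfiltered.jetty.Http(8080).filter(PipelinePlan).run()
    }
}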

While I will not go into the details of the whole setup in this post, I will share a basic actor example that runs a simple Spark action on a Hadoop-based file. It's funny how one simple sentence can translate into heaps of technology and mounds of work ;)

Anyway, here it is.
Ognen

import akka.actor.{Actor, Props, ActorSystem, ActorRef}
import akka.event.slf4j.Slf4jLogger
//import scala.concurrent.Future
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkEnv 

// this will be our message
case class HdfsFile(filename: String, ctx: SparkContext)

//case class setenviron(se:SparkEnv)

class HelloActor extends Actor {
    def receive = {
        //case setenviron(se) => SparkEnv.set(se)
        case HdfsFile(fn, ctx) => {
            val f = ctx.textFile(fn)
            // send the number of lines in the file back
            sender ! f.count
        }
        case "buenos dias" => println("HelloActor: Buenas Noches!")
        case _ => "I don't know what you want!"
    }
}

class SenderActor(to:ActorRef) extends Actor {
    def receive = {
        case i:Long => {
            println(s"Sender: Number of lines: ${i}")
        }
        // bounce the message
        case hf: HdfsFile => { to ! hf }
        case _ => println("Sender: Nevermind!")
    }
}

object Main {
    def main(args: Array[String]) {
        // use the fair scheduler just for fun - no purpose in this example      
        System.setProperty("spark.scheduler.mode", "FAIR")
        // change the settings in the following line to match your configuration
        // mine is a standalone cluster within a VPC on Amazon
        // you can also substitute the spark:// URL with just local[n], where n > 1, to run locally
        val conf = new SparkConf().setMaster("spark://ip_address:7077").setAppName("Hello").setSparkHome("/home/sparkuser/spark")
        //val conf = new SparkConf().setMaster("local[2]").setAppName("Hello").setSparkHome("/Users/maketo/plainvanilla/spark-0.9")

        val sc = new SparkContext(conf)
        sc.addJar("target/scala-2.10/helloactor_2.10-1.0.jar")
        // grab the Spark environment so we can reuse its actor system
        val env = SparkEnv.get
        // use Spark's own actor system instead of creating a new one
        val system = env.actorSystem
        
        // create the actor that will execute the Spark context action
        val helloActor1 = system.actorOf(Props[HelloActor], name = "helloactor1")
        // pass it to the second actor that just acts as a bouncer/receiver
        val senderActor1 = system.actorOf(Props(new SenderActor(helloActor1)), name = "senderactor1")

        senderActor1 ! HdfsFile("hdfs://path:port/2013-12-01.json", sc)
        // note: the string must match the case clause in HelloActor exactly
        helloActor1 ! "buenos dias"

        // for operations that take time, we have to wait
        // hence, we will wait forever with the next statement
        system.awaitTermination
        // there are better ways to deal with concurrency and timeouts
        // Futures are one of those ways (see the sketch after this listing)
    }
}
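For completeness, here is a rough sketch of the Futures route hinted at in the closing comments. It uses Akka's ask pattern (?) so that the line count comes back as a Future that can be awaited with a timeout, instead of blocking on system.awaitTermination forever. The ten-minute timeout and the sc.stop() cleanup are my own choices, not part of the original code.

import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// ... inside main(), after helloActor1 has been created:

// how long we are willing to wait for the count; the value is arbitrary
implicit val timeout = Timeout(10.minutes)
// 'ask' (?) returns a Future of the reply instead of routing it through SenderActor
val countFuture = helloActor1 ? HdfsFile("hdfs://path:port/2013-12-01.json", sc)
// block for at most the timeout, then clean up instead of waiting forever
val count = Await.result(countFuture, timeout.duration)
println(s"Number of lines: $count")
sc.stop()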
