Thursday, February 27, 2014

Easy Spark development on Amazon

I have a VPC set up on Amazon for our data pipeline - from the get-go, it was a priority of mine to make everything as secure as possible. As part of this setup, I have created an OpenVPN gateway to access the "inside" of the pipeline. This whole setup takes some time and is a bit intricate - I will document it in a different post.

The goal of this post is to share my "workflow" for developing and testing Spark apps. Inside the VPC is a set of 16 nodes which are the standalone Spark cluster. The same 16 nodes are also a part of a Hadoop HDFS cluster where each node's ephemeral disk space (1.6TB on each machine) has been set up as a RAID0 array (Amazon gives you 4x400GB partitions) and these RAID0 arrays are a part of the HDFS pool. The difference in speed is very noticeable compared to EBS attached volumes, but I digress - as I said, this will be a topic for a different article.

I work at home on my Macbook Air but my cluster is on Amazon. Since I use OpenVPN, I purchased a copy of Viscosity to be able to connect to the VPC. There is also Tunnelblick which is free but I have found it to be "flaky" and a bit unstable (personal opinion/experience - YMMV) compared to Viscosity which has been solid and at $9/year the price cannot be beat.

So, workflow:

1/ Fire up Viscosity, establish VPN connection to VPC

2/ Mount a directory on the Spark cluster where I will be running my application code - I use sshfs:
sshfs -o sshfs_sync -o sync_readdir sparkuser@spark-master:/home/sparkuser/spark_code spark-master/

To do this you will need to set up your ssh via keys.

3/ Now that my remote folder is mounted locally in spark-master/, I use SublimeText to edit my files. Each save is immediately propagated to the remote machine. This may or may not be slow, depending on your setup/connectivity.

4/ ssh sparkuser@spark-master

5/ cd spark_code/whatever_directory_my_current_project_is_in

6/ Run sbt

7/ You can run sbt so that it watches the files in the project - it will trigger an automatic recompile each time a file changes. The simplest way is to type the following at the sbt prompt:
> ~ compile

In any case, hope this helps :)

Scalatra with actors command Spark, at last I figured it out

In this post I echoed my frustration about being stuck in figuring out how to share a Spark context among akka actors in a Scalatra environment. Well, I was wrong to say it is difficult or impossible, it turned out to be pretty easy. Sigh. At least I figured it out myself ;)

In any case, I started with a basic giter8 project as per Scalatra website.

g8 scalatra/scalatra-sbt 

Answer all the questions and move on to the directory that was created. Mine is called "pipelineserver" and my package will be com.github.ognenpv.pipeline

giter8 would have created a few important files:


Let's go through them one by one:


Add or change the following to the contents of it, in appropriate places, leave the rest be:

  val ScalaVersion = "2.10.3"
  val ScalatraVersion = "2.2.2"

        "org.apache.spark" % "spark-core_2.10" % "0.9.0-incubating",
        "org.apache.hadoop" % "hadoop-client" % "2.2.0"

I use hadoop 2.2.0 since my Spark standalone cluster was compiled against that version of Hadoop libraries and I run an HDFS filesystem based on that, on the same cluster.
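
If you prefer a plain build.sbt over the generated build definition, the same two dependencies could look roughly like the fragment below. This is only a sketch and not my actual build file; it assumes an sbt 0.13-style build.sbt, and the versions are the ones listed above.

```scala
// build.sbt (sketch) - versions taken from the post
scalaVersion := "2.10.3"

libraryDependencies ++= Seq(
  // %% appends the Scala binary version, so this resolves to spark-core_2.10
  "org.apache.spark" %% "spark-core" % "0.9.0-incubating",
  // plain % because hadoop-client is a Java artifact with no Scala suffix
  "org.apache.hadoop" % "hadoop-client" % "2.2.0"
)
```

Whichever form you use, the hadoop-client version should match the Hadoop version your Spark build and HDFS cluster are running.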


package com.github.ognenpv.pipeline

import akka.actor.{ActorRef, Actor, ActorSystem}
import akka.util.Timeout
import java.util.concurrent.TimeUnit
import org.scalatra._
import scalate.ScalateSupport
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.slf4j.{Logger, LoggerFactory}
import scala.concurrent.ExecutionContext
import org.scalatra.{Accepted, AsyncResult, FutureSupport, ScalatraServlet}

class PipelineServlet(system:ActorSystem, myActor:ActorRef) extends PipelineserverStack with FutureSupport {

  import _root_.akka.pattern.ask
  implicit val timeout = Timeout(36000)
  protected implicit def executor: ExecutionContext = system.dispatcher

  get("/") {
    myActor ? "first20"
  }

  get("/count") {
    myActor ? "count"
  }

  get("/test") {
    myActor ? "test"
  }
}

class MyActor(sc:SparkContext) extends Actor {
  // change the following two lines to do whatever you want to do
  // with whatever filesystem setup and format you have
  val f = sc.textFile("hdfs://")
  val events = f.filter(_.split(",")(0).split(":")(1).replace("\"","") == "Sign Up").map(line => (line.split(",")(2).split(":")(1).replace("\"",""),0)).cache

  def receive = {
    case "count" => sender ! events.count.toString
    case "first20" => sender ! events.take(20).toList.toString
    case "test" => sender ! "test back!"
    // a bare underscore is the catch-all; the string "_" would only match that literal
    case _ => sender ! "no habla!"
  }
}

The system is very simple - we create an actor, one that will receive the SparkContext and read in a basic file from HDFS, do some basic parsing via some Spark transformations (filter and map) and cache the result of the parsing. The result is an RDD of tuples, each looking like this (id, tag), where tag is a 0 or a 1 (the code above keeps the id as a String and always sets the tag to 0).
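
To make the parsing a bit more concrete, here is a small sketch that applies the same split/replace logic to plain Scala collections instead of an RDD, so it runs without a cluster. The sample lines and the field names in them are made up for illustration; your real file format will differ.

```scala
object ParseSketch {
  // First comma-separated field, value after the colon, quotes stripped
  def eventName(line: String): String =
    line.split(",")(0).split(":")(1).replace("\"", "")

  // Third comma-separated field, value after the colon, quotes stripped
  def userId(line: String): String =
    line.split(",")(2).split(":")(1).replace("\"", "")

  // Same shape as the filter + map in MyActor, on a local Seq
  def signUps(lines: Seq[String]): Seq[(String, Int)] =
    lines.filter(eventName(_) == "Sign Up").map(line => (userId(line), 0))

  def main(args: Array[String]): Unit = {
    // Hypothetical input lines, not taken from the real data set
    val sample = Seq(
      "\"event\":\"Sign Up\",\"time\":\"1393459200\",\"user\":\"42\"",
      "\"event\":\"Login\",\"time\":\"1393459260\",\"user\":\"43\""
    )
    println(signUps(sample)) // List((42,0))
  }
}
```

Note that this kind of positional splitting breaks as soon as a value contains a comma or a colon, so treat it as a quick hack rather than a parser.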

This actor will be responsible for doing things to the cached result, when asked by the Scalatra servlet, as a result of the route being served. It will execute the spark action and return the result to the Scalatra servlet. We are not doing any error checking, for simplicity. We are also not trying to be smart about timeouts and synchronization, for simplicity (herein somewhere lie Futures? ;)
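
As for being smarter about timeouts, one possible direction is to bound how long the caller waits for an answer. The sketch below uses only scala.concurrent and is not part of the servlet above; slowCount is a made-up stand-in for a slow Spark action, and the names and durations are assumptions chosen for illustration.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object TimeoutSketch {
  // Hypothetical stand-in for a slow Spark action such as events.count
  def slowCount(delayMillis: Long): Long = {
    Thread.sleep(delayMillis)
    42L
  }

  // Run the work on the global thread pool and wait at most limitMillis for it
  def askWithTimeout(delayMillis: Long, limitMillis: Long): String =
    try {
      Await.result(Future(slowCount(delayMillis)), limitMillis.millis).toString
    } catch {
      case _: TimeoutException => "timed out"
    }

  def main(args: Array[String]): Unit = {
    println(askWithTimeout(10, 2000))  // fast enough: prints the count
    println(askWithTimeout(2000, 100)) // too slow: prints "timed out"
  }
}
```

Keep in mind that giving up on the wait does not cancel the work itself; the Future keeps running in the background until it finishes.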

Finally, this is the contents of the Scalatra bootstrap class:


import akka.actor.{ActorSystem, Props}
import com.github.ognenpv.pipeline._
import org.scalatra._
import javax.servlet.ServletContext
import org.apache.spark._
import org.apache.spark.SparkContext._

class ScalatraBootstrap extends LifeCycle {

  val sc = new SparkContext("spark://","PipelineServer","/home/sparkuser/spark")
  // adjust this to your own .jar name and path
  val env = SparkEnv.get
  val system = env.actorSystem
  val myActor = system.actorOf(Props(new MyActor(sc)))

  override def init(context: ServletContext) {
    context.mount(new PipelineServlet(system, myActor), "/actors/*")
  }

  override def destroy(context: ServletContext) {
    // stop the SparkContext so the Spark side of the servlet shuts down cleanly
    sc.stop()
  }
}

In this class we create the actual SparkContext. Beware that when you run sbt, you should execute the packageBin task first to create the jar that you will feed to your SparkContext. If you do not do this, Spark actions like count (which are "jobs" executed via passing closures around Spark's actor system, it would seem, this is my uneducated guess) will fail because Spark will not be able to find the class necessary to pass around the closure being executed.

Notice also that we are using Spark's ActorSystem obtained via the SparkEnv which is given to every thread (SparkEnv.get). We then use this ActorSystem to create our actor responsible for executing SparkContext actions. Also notice that on destroy() we are asking Scalatra to "stop" the current SparkContext so that we can exit the Spark portion of the servlet cleanly.

Change the necessary variables when creating the SparkContext and you should be good to run the code on your Spark cluster (I am running a standalone cluster in a VPC on Amazon).

> sbt
> packageBin
> container:start

Enjoy :)

Wednesday, February 26, 2014

Spark's "weird" context sharing rules - how I killed a week trying!

"I spent about a week on this."

That's my opening statement intended to fully depict my level of frustration. When you are facing deadlines (as many of us do in the real world), it is not easy to spend a week or two, just like that, with not much in terms of results. But, I digress, politely.

So, what's the problem? Well, imagine you spent some time and wrote a bunch of "queries" to crunch your "big data" in Spark. It holds a great promise - it is a much faster reimplementation of Hadoop, in memory. It can read, no, devour files in HDFS in parallel. It can do streaming, almost-in-real-time queries. It can do 0MQ, it can do parallel access to files on Amazon's S3, it can talk Cassandra, Mongo. It is written in Scala, it can do Akka actors talking to each other, it features Resilient Distributed Datasets that are reproducible (hence the resilient part).

Only problem is, the thing is undocumented. Well, almost documented. Well, that depends on your point of view and how much time you have to hack on it. The documentation is not bad, it is just lacking in the very fine points, the ones you inevitably hit when you start "abusing" the platform ;)

OK, back to my "situation". Let's say you spent some time, you learned Scala, you learned something about what/how Map/Reduce is, now you wrote a bunch of Scala programs to execute Spark queries. Each query is a jar file, that's the nature of the beast, you run these in batches, via a cron job or by hand when needed. Nothing wrong with that. Except....

Except, one day, your head data analyst comes to you and says that it would be nice to have all this stuff exposed via the web in a slick, "exploratory" kind of way. You say, sure, great! I get to learn Scalatra, Bootstrap and more!

You are all gung-ho, walking on clouds. Except....

Except that it turns out you cannot allocate multiple SparkContexts (the main way to drive Spark calculations and actions) from the same app. That's a bit of a problem (and that's a bit of an understatement ;) since a web server by default serves requests in parallel from multiple threads. You cannot pool SparkContext objects, cannot optimize, heck, all your dreams have crashed and burned and you risk looking like an idiot in front of your whole team!

(Digression: Scalatra is absolutely awesome, by the way! It is a very clean, nice way to write RESTful APIs, you can serve HTML, Javascript, deluxe. But, it turns out there is a kink here too - I ran into major problems making a Scalatra servlet work with Spark - for some reason allocating a SparkContext from a Scalatra servlet was a no-go).

This drove me to Unfiltered (after I spent 3-4 days and $40 on a Scalatra book and fell in love with the framework, damn it!). Unfiltered is simple, functional, beautiful in a Haskellish kind of way, does not depend on any version of Akka so you can use whichever one matches the version Spark comes with.

OK. Back to sharing SparkContext instances. It turns out the friendly experts at Ooyala thought about this problem a few months ago and wrote a patch to the latest version of Spark (0.9). This jobserver github branch is meant to expose a RESTful API to submit jars to your Spark cluster. Sounds great (!) and it is, it actually works. Except....

Except that it comes with Hadoop 1.0.4 by default. My luck has it that my Hadoop HDFS cluster runs 2.2.0. You can compile Ooyala's jobserver branch of Spark-0.9-incubating with support for Hadoop 2.2.0 the standard way. It assembles. Except that it does not work. You cannot start the standalone cluster, it complains with the following exceptions:

14/02/25 20:15:24 ERROR ActorSystemImpl: RemoteClientError@akka://sparkMaster@ Error[java.lang.UnsupportedOperationException:This is supposed to be overridden by subclasses.
at akka.remote.RemoteProtocol$AddressProtocol.getSerializedSize(
[snipped for brevity]

So, what to do? A week or more later into this nightmare, I ran into a few online discussions claiming that since 0.8.1 (Dec 2013) Spark supports SparkContexts that are "thread safe". To be more precise, this is exactly the promise:

"Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users)."

In theory, it should work. Stay tuned for the conclusions from the practical experience.
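
To picture what that promise means for a web server, here is a toy model in plain Scala: one shared object, with each "request" submitting its own job as a Future on a thread pool. FakeContext is a made-up stand-in for the single SparkContext and has nothing to do with Spark's real API; the sketch only shows the shape of the pattern and assumes nothing about how Spark schedules the jobs.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object SharedContextSketch {
  // Hypothetical stand-in for the one SparkContext shared by the whole app
  final class FakeContext(data: Vector[Int]) {
    private val submitted = new AtomicInteger(0)
    def count(): Long = { submitted.incrementAndGet(); data.size.toLong }
    def sum(): Long = { submitted.incrementAndGet(); data.map(_.toLong).sum }
    def jobsSubmitted: Int = submitted.get
  }

  // Each Future plays the role of one web request submitting its own job
  def runBoth(ctx: FakeContext): (Long, Long) = {
    val countJob = Future(ctx.count())
    val sumJob = Future(ctx.sum())
    Await.result(countJob.zip(sumJob), 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    val ctx = new FakeContext((1 to 100).toVector)
    println(runBoth(ctx))      // (100,5050)
    println(ctx.jobsSubmitted) // 2
  }
}
```

The point is that nobody allocates a second context: every request reuses the same instance and only the jobs are concurrent.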

Scala actors and Spark Contexts

One nice thing about Scala is the akka Actor system - idea borrowed from Erlang where it has found great success with its "Let it crash" philosophy.

One of the goals of my work at QuizUp is to create a flexible data analytics pipeline. A nice add-on is exposing this pipeline through a slick web front-end. For this I have chosen Unfiltered, in combination with actors and futures to "command" the Spark pipeline backend.

While I will not go into details of the whole deal in this post, I will post a basic Actor example to run a simple Spark action on a Hadoop based file. It's funny how one simple sentence can translate into heaps of technology and mounds of work ;)

Anyways, below it is.

import akka.actor.{Actor, Props, ActorSystem, ActorRef}
import akka.event.slf4j.Slf4jLogger
//import scala.concurrent.Future
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkEnv 

// this will be our message
case class HdfsFile(filename:String,ctx:SparkContext)

//case class setenviron(se:SparkEnv)

class HelloActor extends Actor {
    def receive = {
        //case setenviron(se) => SparkEnv.set(se)
        case HdfsFile(fn,ctx) => {
            val f = ctx.textFile(fn)
            // send the number of lines in the file back
            sender ! f.count
        }
        case "buenos dias" => println("HelloActor: Buenas Noches!")
        // print the fallback message; a bare string here would be silently discarded
        case _ => println("HelloActor: I don't know what you want!")
    }
}

class SenderActor(to:ActorRef) extends Actor {
    def receive = {
        case i:Long => {
            println(s"Sender: Number of lines: ${i}")
        }
        // bounce the message
        case hf:HdfsFile => { to ! hf }
        case _ => println("Sender: Nevermind!")
    }
}

object Main {
    def main(args: Array[String]) {
        // use the fair scheduler just for fun - no purpose in this example      
        System.setProperty("spark.scheduler.mode", "FAIR")
        // change the settings in the following line to match your configuration
        // mine is a standalone cluster within a VPC on Amazon
        // you can also substitute the spark:// URL with just local[n] where n>1 to run locally
        val conf = new SparkConf().setMaster("spark://ip_address:7077").setAppName("Hello").setSparkHome("/home/sparkuser/spark")
        //val conf = new SparkConf().setMaster("local[2]").setAppName("Hello").setSparkHome("/Users/maketo/plainvanilla/spark-0.9")

        val sc = new SparkContext(conf)
        // create and start the actor
        val env = SparkEnv.get
        // Use Spark's own actor system
        val system = env.actorSystem
        // create the actor that will execute the Spark context action
        val helloActor1 = system.actorOf( Props[ HelloActor], name = "helloactor1")
        // pass it to the second actor that just acts as a bouncer/receiver
        val senderActor1 = system.actorOf( Props(new SenderActor(helloActor1)), name = "senderactor1")

        senderActor1 ! new HdfsFile("hdfs://path:port/2013-12-01.json",sc)
        // must match the pattern in HelloActor exactly (no accent), or it falls through to the catch-all
        helloActor1 ! "buenos dias"

        // for operations that take time, we have to wait
        // hence, we will wait forever with the next statement
        // there are better ways to deal with concurrency and timeouts
        // Futures are one of those ways
        system.awaitTermination()
    }
}

Wednesday, February 19, 2014

A (n ever growing) list of things that annoy me about the Mac

As a perk (or necessity) of a new job, I had my employer buy me a new Macbook Air and an iPhone 5S. Here is a list of annoying things about the Apple "approach" so far.

1/ Waking up from sleep is a b*tch! If your Macbook (running Mavericks) falls asleep or you put it to sleep, waking it up on your wireless network may take a while. Sure, it shows it is connected but you can't get anywhere.

2/ Related to 1/ above - you can take a wireless hot-spot down but it may or may not still show up in the list of available wireless hotspots for a while, even though your Mac claims it has refreshed the list.

3/ Macbook Air claims it has AirDrop, iPhone 5S claims it has AirDrop but they cannot communicate with each other. Only iPhone to iPhone or Mac to Mac is allowed, even though they both run the same app. WTF?

4/ Macbook supports Bluetooth and iPhone supports bluetooth but you cannot pair the laptop to the phone. Microsoft much, Apple?

5/ The above two points mean that in order for you to move a few photos from your phone to your laptop (which may be sitting right next to your phone) you have the following choices: a) involve a cord (yuck! so 20th century), b) use iTunes over WiFi (why would I want to involve a massive app to move two photos) or c) use the Cloud (a round trip to the Internet and back to move a photo between a phone and a laptop that are literally two inches away?). I thought Apple was all about simplicity! Even on my old Android phone and Linux laptop I could do a Bluetooth transfer and solve the problem easily.

6/ If you have a Macbook attached to an Apple display via Thunderbolt, there is no predicting what will happen on hook-up and un-hooking of the two devices. One time my laptop just would not wake up anymore.

7/ In a situation where your Mac was attached to an Apple display via Thunderbolt and your laptop's lid is down (hence you are watching everything on the big display), you can plug in your headset into the laptop (since there are no jacks on the monitor) and it will happily ignore the headset. Either provide me with a jack on the monitor or do not ignore the jack on the headset.

I am sure there are more but I have only been playing with this for a few weeks now. Don't get me wrong, I like what Apple brings to the table, for the most part. What I do not like is the superiority approach that many Apple fanboys bring to the table, while happily forking thousands of dollars for an expensive piece of machinery that has its silly problems. In fact, one of my Apple fanboy friends told me that I am "too old school" to be using a Mac. Oh well.

Monday, February 10, 2014

The law of large numbers (but not what you think)

There is an interesting thing happening these days. I suspect it has been happening for a while now, actually.

With the consolidation of the markets within only a few players, quality of services has slowly eroded.

It used to be that as a customer I was king and that customer service meant something. The company had to provide a solidly engineered product and satisfy certain QA standards. Not anymore!

The products are increasingly complex and layered, with each layer come certain benefits for many but problems for some. When these "some" have a problem, well, they are not so important to the company because they do not represent the majority of the customers. In addition, due to so many layers, it is becoming impossible for a single group of company employees to be in the position to help the clients (so they don't).

This has been evident in many fields. Consider telecommunications. It used to be that we all had land lines. They were fixed (yes, you could have a wireless handset, I know) but they were reliant on REAL wires and unless the wires were physically down, you had reliable service. Losing or "dropping" a call was unheard of.

Enter cell phones. They are convenient, they were advertised as better and more versatile. And they are, for many. The telecoms realized that cell phones are a much bigger market to be mined and decided to kill off the land line businesses themselves. Why not? You can only have one or two landlines per household but people are known to have multiple cell phones AND these cell phones get upgraded all the time. You can get locked into contracts, charged overage fees etc. This field has unlimited earning potential!

However, consider how many calls "suck" in quality, how many times they get dropped and consider that there are quite a few people who still have poor reception in their own homes. For these people a good, old fashioned landline would have been the solution (and no, not all of us get high speed internet where we live so no, we can't talk over the internet). Cell phone networks are vulnerable, less stable and exist in "thin air", especially compared to their landline cousins. But, nobody cares, right?

Let's consider the cloud, the email and other "personality" driven services where we have a few monopolies like Google or Facebook or Apple. Google's services experience problems non-stop. Sometimes your connection to their mail front-end experiences delays. Sometimes you get an email telling you that they discovered that some of your emails got incorrectly classified as SPAM, other times you get logged out of your web browser window because you have another Google account open in the same browser (with no rhyme or reason why this is happening). So on and so on.

Or take Facebook - sometimes you click on a photo and it just takes forever opening and it eventually does not open. It takes five or six clicks on the photo to maybe get it to display.

How about Amazon? You can request to watch a movie online and sometimes it will just spin and spin and spin and after an excruciating delay it will tell you that it cannot load the video (clearly a problem on their side). Sometimes you cannot get the video to load for a while and then all of a sudden you can.

Most of these services are reliable most of the time. However, none of them are reliable ALL the time. They "work", kind of, for, well, most people. But when they don't work, it is a) difficult to figure out why and b) you are just a drop in the ocean and Google honestly doesn't care. Even if you pay them $25/month per user on a Business Google App account.

Take Apple. You can spend $2000 on a Macbook and extra $700 on an iPhone 5S and still have no simple way to move a photo you took on the phone to the new Mac laptop without involving a cord or the Internet or a large app like iTunes. As someone told me recently, I may just be too "old school" and I may not understand what Apple is trying to do but: a) both their devices offer Bluetooth (but cannot be paired for some reason), b) both offer Airdrop but it only works iPhone to iPhone or laptop to laptop (retarded, right?). However, most Apple people just use a) a cord, b) iTunes over wifi or c) the cloud to upload the photo and then an additional round trip to download it to the Mac. And I am dumb? Anyways, the people like me, who think that in the 21st century you should just be able to share stuff between two devices without cords or extra roundtrips or specialized app intermediaries are the minority.

What has happened is that all of these companies have decided to charge you money for a BEST EFFORT approach to providing a service. You pay a subscription or you pay a monthly fee but so long as things work for the majority of the people the majority of the time, things are good. The rest is "washed out", the engineers shrug their shoulders at these things as untraceable anomalies, the sales people get paid anyways and the high level sharks still get their bonuses.

So, to recap: extra complexity, too many layers, nobody cares - the small fish are a wash, they are still making their money (even more) and YOU are the donkey.

The way of doing things (or should I say the Apple way of doing things)?

Recently I procured an Apple Macbook Air through work, in addition to a nice Apple Display and an iPhone 5S. Nice and EXPENSIVE pieces of equipment!

In any case, both the Mac and the iPhone advertise Bluetooth. However, you cannot pair the two devices. Both advertise Airdrop but it turns out that you cannot use Airdrop to "drop" files between an iOS and a Mavericks device, it only works iOS to iOS or Mavericks to Mavericks.

I have an Apple fanboi colleague at work who has drunk the Apple Kool-Aid, he has worked for Apple or supported Apple devices for the last 10 years and has a lot of shares of Apple stock. His response was that for me Apple sucks because I am used to doing things the "old school way". Apparently I am too old and retarded to use the new stuff.

His final suggestion, however, was iTunes which will sync via wi-fi. I told him that I do not want to use iTunes, I don't like the app. Then he suggested the Dropbox route. I said, "look, I just want to take ONE PHOTO and move it from the iPhone to the Mac and I don't want to involve the Cloud". I am still waiting for a reply...

Addendum: the reply arrived. I have been told to return the phone since for me (I am so special, right?) Apple tech will not work (like it does for the 100s of millions of people). So, we are back to blaming the user, no?