Machine learning with Spark tutorial

In my previous machine learning post, I described preparing data. Of course, I have now realised that I did it wrong. Now I have more data and a better way to prepare it. My Twitter script has been running for almost a year; what data it collects and what it means, you can read in the previous post. So now I have 36 Google spreadsheets of tweets. I looked for good tools for data cleaning and found a nice one: OpenRefine, formerly Google Refine, as you can tell by its UI.

The best thing is that I could load the whole bunch of XLS files easily. Another nice thing is how easily I can remove duplicates and clean the text using transformations with a syntax similar to Java. A good list of recipes can be found on the project wiki.

To give you a hint of what I’m trying to achieve: after the revolution, Ukrainian Twitter developed an interesting concept. Loads of tweets are marked with the hashtags #зрада and #перемога, which mean #betrayal and #win respectively. It’s hard to explain, but they are just like labels: when people don’t like what happens, they say “that’s a betrayal!”, or the opposite, “it’s an epic win!”

So now we have cleaned text with labels, which looks like this in OpenRefine. I pushed the full CSV to GitHub.

Let’s create a Scala sbt project with Spark. I guess you already have experience with Scala, so I’ll just say that there is a nice sbt plugin called sbt-spark-package which lets you manage Spark packages in an easy way. For my project, I’ll need two components, mllib and sql; spark-core is added automatically.

sparkComponents ++= Seq("mllib", "sql")
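
For completeness, this is roughly how the build files can look with that plugin. The resolver, the plugin version and the Scala/Spark versions below are assumptions, so check the sbt-spark-package README for the current ones:

// project/plugins.sbt — resolver URL and plugin version are assumptions
resolvers += "bintray-spark-packages" at "https://dl.bintray.com/spark-packages/maven/"
addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.6")

// build.sbt
name := "zrada-peremoga"
scalaVersion := "2.11.8"                 // assumed Scala version
sparkVersion := "2.1.0"                  // assumed Spark version
sparkComponents ++= Seq("mllib", "sql")  // spark-core is pulled in automatically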

Now we are ready to code. First of all, we need to initialise the Spark context and load the data:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

val logFile = "zp3.csv"
val conf = new SparkConf().setAppName("zrada-peremoga").setMaster("local")
val sc = new SparkContext(conf)
val spark = SparkSession.builder().config(conf).getOrCreate()

val df = spark.read.csv(logFile) // load the file into a DataFrame
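
Before going further, it is worth checking what spark.read.csv actually gave us. Without a header, Spark names the columns _c0, _c1, … and reads every value as a string:

df.printSchema()
// root
//  |-- _c0: string (nullable = true)
//  |-- _c1: string (nullable = true)
df.show(3) // peek at the first rows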

My CSV file has no header, and the model requires the label to be of the Double data type. Let’s fix that.

import org.apache.spark.sql.functions.udf

val toDouble = udf[Double, String](_.toDouble) // convert the string label to a Double
val mlData = df.select("_c0", "_c1").toDF("text", "label")
val data_all = mlData.withColumn("label", toDouble(mlData("label"))).select("text", "label").cache()
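
As a side note, the same conversion can be done without a UDF by casting the column; a small equivalent sketch (the name data_all2 is just for illustration):

import org.apache.spark.sql.types.DoubleType

// equivalent: cast the string label column to Double directly
val data_all2 = df.select(df("_c0").as("text"), df("_c1").cast(DoubleType).as("label")).cache()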

Next, we need to split the sentences into words and remove stop words that carry no meaning, such as “and”, “a”, “the”. I use a Naive Bayes classifier, which takes features represented by numbers; that’s why we need hashing, which represents all our words as hashes together with their frequencies. I’ll do it as a pipeline, so the output of each transformer has to match the input of the next one.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.feature.{HashingTF, StopWordsRemover, Tokenizer}

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val cleaner = new StopWordsRemover().setInputCol(tokenizer.getOutputCol).setOutputCol("cleaned")
val hashTF = new HashingTF().setInputCol(cleaner.getOutputCol).setOutputCol("features").setNumFeatures(11000)
val naiveBayes = new NaiveBayes().setSmoothing(7)
// Connect the transformers into a pipeline
val pipeline = new Pipeline().setStages(Array(tokenizer, cleaner, hashTF, naiveBayes))

The most time-consuming part of machine learning is tuning the classifiers. You can see some numbers in my code; those are the values I found manually while looking for the best accuracy. Luckily, Spark has a solution for this. We can use a param grid, which tries all possible combinations of a given set of params to find the best one. It runs slowly, but the result is dramatically better.

import org.apache.spark.ml.tuning.ParamGridBuilder

val paramGrid = new ParamGridBuilder()
  .addGrid(hashTF.numFeatures, Array(1000, 5000, 10000, 11000, 12000, 13000))
  .addGrid(naiveBayes.smoothing, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0))
  .build()

Let’s put our paramGrid and pipeline together with a CrossValidator.

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(4)

Now we can train our model. As you may know, we shouldn’t use all the data for training, which is why Spark has a handy way of splitting a dataset.

val splits = data_all.randomSplit(Array(0.7, 0.3))
val (data_training, data_testing) = (splits(0), splits(1))
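
If you want the same split on every run, randomSplit also takes a seed; this is just a variant of the call above (the seed value 42 is arbitrary):

// variant of the split above: a fixed seed makes the 70/30 split reproducible between runs
val splits = data_all.randomSplit(Array(0.7, 0.3), seed = 42L)
val (data_training, data_testing) = (splits(0), splits(1))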

Now I want to train my model on the training data and test its accuracy on the testing data. To calculate accuracy, I’ll simply count the percentage of correct predictions. After that, I’ll save the model for future use.

val cvModel = cv.fit(data_training) // run the cross-validated training
val pal = cvModel.transform(data_testing)
  .select("prediction", "label")
  .collect()
val accuracy = 1.0 * pal.count(r => r.getDouble(0) == r.getDouble(1)) / data_testing.count()
println("accuracy: " + accuracy)
cvModel.write.overwrite().save("cvModel")
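
Since the CrossValidator has already tried every combination from the grid, you can also peek at the winning parameters. A minimal sketch, assuming the stages keep the order defined above (tokenizer, cleaner, hashTF, naiveBayes):

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.NaiveBayesModel
import org.apache.spark.ml.feature.HashingTF

val best = cvModel.bestModel.asInstanceOf[PipelineModel]
val bestHashTF = best.stages(2).asInstanceOf[HashingTF]   // third stage: HashingTF
val bestNB = best.stages(3).asInstanceOf[NaiveBayesModel] // fourth stage: fitted NaiveBayes
println(s"numFeatures = ${bestHashTF.getNumFeatures}, smoothing = ${bestNB.getSmoothing}")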

In my case, the best accuracy I got so far is 0.74, which is not that good. I think the reason is that I have 40% more 0-labels than 1-labels. Still, I checked the model on real tweets and the results seem pretty interesting.
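
For that “real tweets” check, the saved model can be loaded back and applied to any DataFrame that has a text column. A minimal sketch; the two example tweets are made up:

import org.apache.spark.ml.tuning.CrossValidatorModel

val loaded = CrossValidatorModel.load("cvModel")
// the pipeline expects a "text" column, the same as during training
val newTweets = spark.createDataFrame(Seq(
  Tuple1("повна зрада, все пропало"),
  Tuple1("епічна перемога сьогодні")
)).toDF("text")
loaded.transform(newTweets).select("text", "prediction").show(false)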

You can find the full source code and a usage example in my GitHub project.