Wednesday, November 15, 2017

Filtering Out Source Error Records from a DataFrame and Saving Them to a Specific Location


The code below reads a JSON source that contains malformed/error records. Spark captures the bad rows in a _corrupt_record column of the DataFrame; those error rows are written to a configurable error path, and finally the error-free DataFrame (errorFreeDF) is returned to the caller.

import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

class ReadWriteData {

  /*  Class created for reading sources and writing files   */
  def readJsonFile(sparkSession: SparkSession, sourcePath:String ): DataFrame = try {
    import sparkSession.implicits._
    val sourceDataDF = sparkSession.read.option("columnNameOfCorruptRecord", "_corrupt_record").json(sourcePath)
    // sourceDataDF.show()

    // Corrupt rows carry their raw text in the _corrupt_record column; write them to the configured error path
    val errorDataDF = sourceDataDF.select("_corrupt_record").filter($"_corrupt_record".isNotNull)
    errorDataDF.write.mode(SaveMode.Overwrite).json(ConfigFactory.load().getString("data.filesystem.errorpath"))

    // Drop the corrupt-record column before handing the DataFrame back
    val errorFreeDF = sourceDataDF.drop(sourceDataDF.col("_corrupt_record"))
    
    errorFreeDF
  } catch {

    case ex: Exception => sparkSession.read.json(sourcePath) // e.g. no _corrupt_record column was produced; fall back to a plain read

  }

}
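A minimal way to call this from a driver program is sketched below; the object name, master setting, and source path are placeholders of mine, not from the original post, and data.filesystem.errorpath is expected in application.conf as above.

import org.apache.spark.sql.SparkSession

object ReadWriteDataApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ReadWriteDataApp")
      .master("local[2]")
      .getOrCreate()

    // readJsonFile writes any corrupt rows to the configured error path
    // and returns the DataFrame without the _corrupt_record column
    val errorFreeDF = new ReadWriteData().readJsonFile(spark, "/path/to/source.json")
    errorFreeDF.show(5)

    spark.stop()
  }
}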

Wednesday, June 7, 2017

TwitterLive Data Processing with Spark Scala

This is an Interesting sample:
I added dependencies like the ones below to my pom.xml:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter_2.10 -->
  <dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming-twitter_2.10</artifactId>
   <version>1.6.1</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.twitter4j/twitter4j-stream -->
  <dependency>
   <groupId>org.twitter4j</groupId>
   <artifactId>twitter4j-stream</artifactId>
   <version>4.0.2</version>
  </dependency> 
1. In Run Configurations, I passed the program arguments below to the Spark-Scala code.
2. Make sure you have a Twitter account; on the web you will find plenty of documents on creating a Twitter app and pulling your consumer keys and access tokens.
3. Having done that, replace <consumer key> <consumer secret> <access token> <access token secret> [<filters>] as shown below:
4.tulbXXXXXXXXXXXOOOOOOOOOOOW  nrZXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXbkxVxpS9 8XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXi5o vXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXMO00aN3 trump
Here I am filtering on the term "trump" and printing the live tweets along with tweet counts; observe the results in the screenshots.
Code:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.log4j.{ Level, Logger }
object TwitterTagCount {  
  def main(args: Array[String]) {    
    if (args.length < 4) {
      System.err.println("Usage: TwitterTagCount <consumer key> <consumer secret> " +
        "<access token> <access token secret> [<filters>]")
      System.exit(1)
    }
    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.takeRight(args.length - 4)
    // Set the system properties so that the Twitter4j library used by the Twitter stream
    // can use them to generate OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
    val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters)//Dstream
    val rootLogger = Logger.getRootLogger()    
    rootLogger.setLevel(Level.ERROR)    
    ssc.checkpoint("checkpoint")
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
    val statuses  =  stream.map ( x => x.getText )
     statuses.print()
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
                     .map{case (topic, count) => (count, topic)}
                     .transform(_.sortByKey(false))
    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
                     .map{case (topic, count) => (count, topic)}
                     .transform(_.sortByKey(false))
    // Print popular hashtags
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
    })
    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
    })
    ssc.start()
    ssc.awaitTermination()
  }  
}
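The job above reports popular hashtags; if you also want a rolling count of how many tweets came through, DStream's countByWindow can be dropped into the same main method before ssc.start(). A small sketch, reusing the stream and checkpoint setup already in place:

    // Total tweets seen in the last 60 seconds, refreshed every 10 seconds
    val tweetCounts = stream.countByWindow(Seconds(60), Seconds(10))
    tweetCounts.foreachRDD(rdd => println("\nTweets in the last 60 seconds: " + rdd.collect().mkString))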




Tuesday, May 30, 2017

KafkaWordCount example Spark-Scala


Install Kafka on your box, as described on the kafka.apache.org website.

I used the Ubuntu installation process.

Now execute the below commands one by one.
i)/home/naveen/Documents/work/infa/kafka_2.11/bin/zookeeper-server-start.sh /home/naveen/Documents/work/infa/kafka_2.11/config/zookeeper.properties &
ii)cd /home/naveen/Documents/work/infa/kafka_2.11/
iii)bin/kafka-server-start.sh config/server.properties &

iv)/home/naveen/Documents/work/infa/kafka_2.11/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testone &
v)/home/naveen/Documents/work/infa/kafka_2.11/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testone
vi)/home/naveen/Documents/work/infa/kafka_2.11/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testone --from-beginning

From Eclipse, I executed the code below:
package bigdata.sparkapplications
import java.util.HashMap
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.log4j.{ Level, Logger }

object KafkaWordCount {
  def main(args: Array[String]) {
    val zkQuorum = "localhost:2181"
    val group = "zookeeper"
    val topics = "testone"
    val numThreads = "1"
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val rootLogger = Logger.getRootLogger()    
    rootLogger.setLevel(Level.ERROR)    
    ssc.checkpoint("checkpoint")
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 1)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
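Side note before the results: spark-streaming-kafka also offers a receiver-less "direct" stream that reads straight from the brokers instead of going through ZooKeeper. A minimal sketch, reusing the same ssc and topic (I did not run this variant for the post, so treat it as an assumption-laden example):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Inside main(), after creating ssc:
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val directLines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("testone")).map(_._2)
directLines.flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _).print()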
 
Find the result of KafkaWordCount in the screenshots, as the counts are processed.

Monday, March 20, 2017

Spark Scala CSV Parsing, Filtering data with String input.

Download the source data from

https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data

and put it in your desired location; I kept it in my Hadoop directory.

package bigdata.spark.sparkproject

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.{ Level, Logger }

import com.spark.util.Utills
import java.util._

object CSVFileReading {  
  
  def main(args: Array[String]) {
    
    val conf = new SparkConf().setAppName("SparkCSVFileProcessing").setMaster("local[2]")    
    val sc = new SparkContext(conf)    
    val rootLogger = Logger.getRootLogger()    
    rootLogger.setLevel(Level.ERROR)      
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    
    val headers = "age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-county,income".split(",").toList
    val inputDataFileDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").option("inferSchema", "true") 
      .load(Utills.DATA_PATH +"adult.data")
      
    val dFRenamed = inputDataFileDF.toDF(headers: _*) // apply the column names
    // inputDataFileDF.show(5)
    dFRenamed.printSchema() // print the schema
    dFRenamed.show(5)       // show the top 5 rows
    // dFRenamed.filter($"workclass" === "Private").show
    // dFRenamed.select("education-num", "workclass").distinct.show
    val prvtData = dFRenamed.select("*").where(dFRenamed("workclass").contains("Private"))
    prvtData.show(5)
     
  }
  
}
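The code above imports a small helper, com.spark.util.Utills, that is not shown in the post. A minimal sketch of what it might look like; the path is an assumption, so point DATA_PATH at wherever you keep adult.data:

package com.spark.util

object Utills {
  // Hypothetical data directory; adjust to your environment
  val DATA_PATH = "/home/naveen/Documents/work/learning/exercises/data/"
}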

Friday, March 17, 2017

spark k means clustering example

I used the Spark Scala 2.11 MLlib library, and for plotting I used JMathPlot.

Add the below to your pom.xml:
<!-- https://mvnrepository.com/artifact/com.github.yannrichet/JMathPlot -->
<dependency>
<groupId>com.github.yannrichet</groupId>
<artifactId>JMathPlot</artifactId>
<version>1.0.1</version>
</dependency>
 
Using the Spark Scala KMeans algorithm and plotting:
Once the dependency is in place, use the code below to plot the data points and centroids; replace the input file path with your own.
Note: the commented-out parts of the code show how to adapt it to files having more than two columns.

package bigdata.spark.sparkproject

import org.apache.spark.{ SparkConf, SparkContext }

import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.sql.DataFrame
import org.apache.spark.rdd.RDD

import org.apache.spark.mllib.clustering.{ KMeans, KMeansModel }
import org.apache.spark.mllib.linalg.Vectors

import org.apache.spark.mllib.regression.LabeledPoint

import javax.swing.JFrame
import org.math.plot.Plot2DPanel
import org.apache.log4j.{ Level, Logger }
import java.awt.Color

object KMeanGraphModel {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("KMeansExample").setAppName("Spark-DataFrame").setMaster("local")
    val sc = new SparkContext(conf)

    val rootLogger = Logger.getRootLogger()
    rootLogger.setLevel(Level.ERROR)

    val data = sc.textFile("/home/naveen/Documents/work/learning/exercises/mldata/in/kmeans_online.txt")
    val parsedDenseVector = data.map(s => Vectors.dense(s.split(',').map(_.toDouble))).cache()

    val indexedDenseVector = parsedDenseVector.zipWithIndex()
    val indexedData = indexedDenseVector.map { case (value, index) => (index, value) }

    //val rdd1 = indexedData.map { case (i, DenseVector(arr)) => (i, Vectors.dense(arr.take(2))) }
    //val rdd2 = indexedData.map { case (i, DenseVector(arr)) => (i, Vectors.dense(arr.drop(2))) }

    // parsedDenseVector.foreach(println)
    // indexedData.foreach(println)  

    val rddXextract = indexedData.map { case (x, y) => ((y.toArray(0))) }
    val rddYextract = indexedData.map { case (x, y) => ((y.toArray(1))) }

    //  rdd1.foreach(println)    
    //  rdd2.foreach(println)
    //rddXextract.foreach { println }
    //rddYextract.foreach { println }

    // Cluster the data into two classes using KMeans
    val numClusters = 3
    val numIterations = 10
    val kmmodel = KMeans.train(parsedDenseVector, numClusters, numIterations)

    val centroids = kmmodel.clusterCenters

    val centroidPoints = centroids.map(_.toArray)

    //val rddXextract = indexedData.map{case (x,y) => (( y.toArray(0)))}
    //val rddYextract = indexedData.map{case (x,y) => (( y.toArray(1)))}
    // kmmodel.predict(parsedDenseVector).foreach(println)

    var xplot = rddXextract.collect()
    var yplot = rddYextract.collect()

    println("[" + xplot + ",")
    centroids.foreach { println }

    //var cent = centroids.collect()

    val plt = new Plot2DPanel()

    plt.addLegend("SOUTH")

    plt.addScatterPlot("Data Points", Color.RED, xplot, yplot)
    plt.addScatterPlot("Centroids", Color.BLUE, centroidPoints)

    val frame = new JFrame("KMean plot panel")
    frame.setSize(600, 600)
    frame.setContentPane(plt)
    frame.setVisible(true)

  }

}
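To sanity-check whether three clusters is a reasonable choice, KMeansModel's computeCost (the within-set sum of squared errors, WSSSE) can be compared for a few values of k. A small sketch meant to run against the same parsedDenseVector and numIterations as above:

    // Lower WSSSE generally means tighter clusters; compare a few k values
    for (k <- 2 to 6) {
      val model = KMeans.train(parsedDenseVector, k, numIterations)
      println(s"k = $k, WSSSE = ${model.computeCost(parsedDenseVector)}")
    }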
 
 

Thursday, February 2, 2017

Spark Scala, MySQL JDBC-Connect Select program


The program below shows how a Spark-Scala object connects to a MySQL database and saves the contents of a table to a local directory.

Prerequisite: place the MySQL JDBC connector jar on the build path.


 package bigdata.sparkapplications  
 import org.apache.spark.SparkConf  
 import org.apache.spark.SparkContext  
 import org.apache.spark.SparkContext._  
 import org.apache.spark.rdd.JdbcRDD  
 import java.sql.{DriverManager,Connection,ResultSet}  
   
 object ScalaJdbcConnectSelect {  
  def main(args: Array[String]) {  
   val conf = new SparkConf().setAppName("JDBC RDD").setMaster("local[2]").set("spark.executor.memory", "1g");  
   val sc = new SparkContext(conf);    
   val driver = "com.mysql.jdbc.Driver"  
   val url = "jdbc:mysql://localhost:3306/sakila"  
   val username = "root"  
   val password = "root"    
   Class.forName(driver).newInstance;  
   val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url, username, password),  
     "select first_name, last_name, email from sakila.customer limit ?,?", 1, 50, 1, r =>   
      r.getString("first_name") + ", " + r.getString("last_name") + ", "+r.getString("email"));  
   myRDD.foreach(println);  
   myRDD.saveAsTextFile("/home/naveen/Documents/work/data.txt")    
  }  
 }
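As a side note, on Spark 1.4+ the same table can also be read through the DataFrame JDBC source instead of JdbcRDD. A minimal sketch, reusing the same sc, url, driver, username and password as above:

   val sqlContext = new org.apache.spark.sql.SQLContext(sc)
   val customersDF = sqlContext.read.format("jdbc")
     .option("url", url)
     .option("driver", driver)
     .option("dbtable", "sakila.customer")
     .option("user", username)
     .option("password", password)
     .load()
   customersDF.select("first_name", "last_name", "email").show(10)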

WordCount with Spark, Cloudera Hadoop

This is the WordCount example, executed with Spark on Cloudera!


 package bigdata.sparkapplications
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
 import org.apache.spark.SparkConf
   
 object hadooptest {      
   def main(args: Array[String]): Unit = {      
   val conf = new SparkConf().setMaster("local").setAppName("WordCount")  
   val sc = new SparkContext(conf)  
   val data = sc.textFile("hdfs://10.0.0.209:8020/user/cloudera/hduser/in/documentation.txt")  
   val result = data.flatMap(_.split(" ")).map(words => (words, 1)).reduceByKey(_ + _)    
   result.collect.foreach(println)  
       
  }  
 }
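If you would rather persist the counts to HDFS than only print them, one extra line inside main() does it; the output path below is an assumption modeled on the input path above:

   // Write the (word, count) pairs back to HDFS
   result.saveAsTextFile("hdfs://10.0.0.209:8020/user/cloudera/hduser/out/wordcount_result")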