Saturday, March 9, 2019

Spark Memory Calculator from available cluster

Suppose I have a Cluster configuration as below:

clusterMember|RAM|noofCores
c01|64|16
c02|64|16
c03|64|16
c04|64|16
c05|64|16
c06|64|16
c07|64|16

The above is a pipe-separated file. If you want to derive Spark memory settings from it, such as the maximum executor memory left after setting aside RAM and cores for the OS, YARN and the application master, the code below suggests those numbers (a worked example for this sample cluster follows the code).

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.HashMap;

public class SparkMemoryCalculator {

 public static final Integer OS_RAM_SIZE = 1;
 public static final Integer OS_CORES = 1;
 public static final Integer OPTIMAL_NO_OF_TASKS = 5;  // 1 task per core; about 5 concurrent tasks per executor gives good HDFS throughput
 public static final Integer EXECUTOR_FOR_APPLICATION_MASTER = 1;   // 1 executor reserved for the YARN application master

 public static void main(String[] args) {

  HashMap<String, HashMap<String, Integer>> clusterMap = setupCluster();
  ArrayList<Integer> availableRamList = new ArrayList<Integer>();
  ArrayList<Integer> availableCoreList = new ArrayList<Integer>();

  System.out.println("Cluster Size : " + clusterMap.size() + " nodes.");

   // Nodes are keyed node1..nodeN because the header row was skipped, so loop from 1 to size (the original 0-based loop missed the last node).
   for (int i = 1; i <= clusterMap.size(); i++) {
    HashMap<String, Integer> nodeConfig = clusterMap.get("node" + i);
    Integer avblRAMperNode = nodeConfig.get("ramSize" + i) - OS_RAM_SIZE;
    Integer numberOfCoresperNode = nodeConfig.get("noOfCores" + i) - OS_CORES;
    availableRamList.add(avblRAMperNode);
    availableCoreList.add(numberOfCoresperNode);
   }

  Integer ramTotal = availableRamList.stream().mapToInt(a -> a).sum();
  Integer totalCores = availableCoreList.stream().mapToInt(a -> a).sum();

  System.out.println("Available Total RAM : "+ramTotal + " gB." +"\nAvailable Total Cores : " + totalCores);
  Integer avbleExecs = availableExecutorsIncluster(totalCores);

  System.out.println("Total Number of available Executors for user usage : " + avbleExecs);
  
  
  BigDecimal v1 = new BigDecimal(avbleExecs);
  BigDecimal v2 = new BigDecimal(clusterMap.size());
  //System.out.println(v1.divide(v2, 2, RoundingMode.HALF_UP).toPlainString());
  
  BigDecimal noOfexecsPerNode = v1.divide(v2, 2, RoundingMode.HALF_UP);
  System.out.println("Total Number of Executors per node are : " + Math.round(noOfexecsPerNode.floatValue()));

  Integer availableMemoryPerExecutor = ramTotal / avbleExecs;
  System.out.println("Availbale Memory per Executor : " + availableMemoryPerExecutor + " gB.");
  BigDecimal memOverHead = memoryOverhead(availableMemoryPerExecutor);
  
  //  Long L = Math.round(memOverHead);
  //  int mo = Integer.valueOf(L.intValue());
  
  System.out.println("Memory OverHead : "+ memOverHead.floatValue());
  System.out.println("round : "+ Math.ceil(memOverHead.floatValue()));
  
  BigDecimal finalMemoryPerExecutor = BigDecimal.valueOf(availableMemoryPerExecutor).subtract(BigDecimal.valueOf(Math.ceil(memOverHead.floatValue())));

  System.out.println("SparkExecutor Memory after memoryOverhead's will between be : \n" + finalMemoryPerExecutor.subtract(BigDecimal.valueOf(1)) +" ~ "+ finalMemoryPerExecutor + " gB.");

 }

 private static BigDecimal memoryOverhead(Integer availableMemoryPerExecutor) {
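  // Reserve max(7% of the executor memory, 384 MB) as memory overhead.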
  Double memOverHead = 0.07 * availableMemoryPerExecutor;
  if (memOverHead > 0.384)
   return BigDecimal.valueOf(memOverHead);
  else
   return BigDecimal.valueOf(0.384);
 }

 private static Integer availableExecutorsIncluster(Integer totalCores) {
  int noOfExecutors = totalCores / OPTIMAL_NO_OF_TASKS;
  return noOfExecutors-EXECUTOR_FOR_APPLICATION_MASTER;
  
 }

 private static HashMap<String, HashMap<String, Integer>> setupCluster() {
  String csvFile = "D:\\Work\\Documents\\Work\\10node_cluster.csv";
  BufferedReader br = null;
  String line = "";
  String cvsSplitBy = "\\|";
  //int counter = 0;
  HashMap<String, HashMap<String, Integer>> nodeMap = new HashMap<String, HashMap<String, Integer>>();
  try {
   br = new BufferedReader(new FileReader(csvFile));
   for(int i=0; (line = br.readLine()) != null; i++) {
    if(i != 0) {  //skipping first row of the input file
    String[] dataArray = line.split(cvsSplitBy);
    String nodeName = "node" + i;
    HashMap<String, Integer> configMap = new HashMap<String, Integer>();
    String ramSize = dataArray[1];
    Double intRam = Double.valueOf(ramSize);
    Long L = Math.round(intRam);
    int mo = Integer.valueOf(L.intValue());
    configMap.put("ramSize" + i, mo);
    String noOfCores = dataArray[2];
    Integer intnoOfCores = Integer.valueOf(noOfCores);
    configMap.put("noOfCores" + i, intnoOfCores);
    nodeMap.put(nodeName, configMap);
    }
   }
  } catch (FileNotFoundException e) {
   e.printStackTrace();
  } catch (IOException e) {
   e.printStackTrace();
  } finally {
   if (br != null) {
    try {
     br.close();
    } catch (IOException e2) {
     e2.printStackTrace();
    }
   }
  }
  return nodeMap;
 }


}
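
To sanity-check this against the seven-node sample above, here is a minimal Scala sketch of the same formula with the cluster values hard-coded (the 5-tasks-per-executor and 1-executor-for-the-application-master constants come straight from the class above; the object name is just for illustration):

object SparkMemorySketch extends App {
  // Sample cluster from above: 7 nodes, each with 64 GB RAM and 16 cores.
  val nodes = 7
  val ramPerNode = 64 - 1                      // 1 GB set aside for the OS
  val coresPerNode = 16 - 1                    // 1 core set aside for the OS
  val totalRam = nodes * ramPerNode            // 441 GB
  val totalCores = nodes * coresPerNode        // 105 cores
  val executors = totalCores / 5 - 1           // 5 tasks per executor, 1 executor for the application master -> 20
  val memPerExecutor = totalRam / executors    // 22 GB
  val overhead = math.max(0.07 * memPerExecutor, 0.384)             // 1.54 GB
  val executorMemory = memPerExecutor - math.ceil(overhead).toInt   // 20 GB
  println(s"$executors executors of about $executorMemory GB each")
}

That lines up with what the Java program prints for this cluster: roughly 20 executors, with executor memory in the 19 ~ 20 GB range.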

Friday, March 8, 2019

Connecting Spark to a remote MySQL source through an SSH tunnel

Add these dependencies to your pom.xml:
  <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
  <dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <version>5.1.47</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/com.jcraft/jsch -->
  <dependency>
   <groupId>com.jcraft</groupId>
   <artifactId>jsch</artifactId>
   <version>0.1.51</version>
  </dependency>


package reading_dbms.mysql

import java.util.Properties
import java.io.FileInputStream
import java.util.HashMap
import org.apache.spark.sql.{ DataFrame, SaveMode, SparkSession }
import org.apache.log4j.{ Level, Logger }
import com.jcraft.jsch.JSch
import com.jcraft.jsch.Session
import java.sql.Connection
import java.sql.DriverManager

object SSHSparkScala {
  def main(args: Array[String]): Unit = {
    val filepath = "/Users/pedam/eclipse-workspace/valid.properties"
    val configs = new Properties()
    configs.load(new FileInputStream(filepath))
    System.setProperty("hadoop.home.dir", configs.getProperty("hadoop.home.dir"))
    System.setProperty("spark.sql.warehouse.dir", configs.getProperty("spark.sql.warehouse.dir"))
    val spark = SparkSession
      .builder()
      .config("spark.some.config.option", "some-value")
      .appName("SSHSparkScala")
      .master("local[*]")
      .getOrCreate()
    val rootLogger = Logger.getRootLogger
    rootLogger.setLevel(Level.ERROR)

    val jdbcConfigs = new Properties()

    var session: Session = null
    val lport = 5656
    val rhost = "10.0.2.15"
    val host = "192.168.56.101"
    val rport = 3306
    val user = "cloudera"
    val password = "cloudera"
    val dbuserName = "root"
    val dbpassword = "cloudera"
    val url = "jdbc:mysql://localhost:" + lport + "/movielens"
    val driverName = "com.mysql.jdbc.Driver"

    jdbcConfigs.put("user", dbuserName)
    jdbcConfigs.put("password", dbpassword)
    
    //jdbcConfigs.put("StrictHostKeyChecking", "no")
    
    configs.put("StrictHostKeyChecking", "no")
    
    val jsch = new JSch()
    session = jsch.getSession(user, host, 22)
    session.setPassword(password)
    session.setConfig(configs)
    session.connect()
    val assignedPort = session.setPortForwardingL(lport, rhost, rport)
    println(session.getHost + "|" + session.getHostKey + "|" + session.getPort + "|" + session.getServerVersion + "| forwarded local port: " + assignedPort)
    val ratingsDF = spark.read.jdbc(url, "movielens.ratings", jdbcConfigs)

    ratingsDF.show(false)

    spark.close()
    session.disconnect()
    System.exit(0)
  }
}
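
The java.sql.Connection and DriverManager imports above are never used; before pointing Spark at the tunnel it can be handy to sanity-check it with plain JDBC first. A minimal sketch, reusing the url, dbuserName and dbpassword values defined in main and querying the same movielens.ratings table (call it right after session.setPortForwardingL(...)):

import java.sql.DriverManager

// Plain-JDBC check that the forwarded local port actually reaches MySQL.
def checkTunnel(url: String, user: String, password: String): Unit = {
  val conn = DriverManager.getConnection(url, user, password)
  try {
    val rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM movielens.ratings")
    while (rs.next()) println("ratings rows visible through the tunnel: " + rs.getLong(1))
  } finally {
    conn.close()
  }
}

// e.g. checkTunnel(url, dbuserName, dbpassword)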

Friday, March 1, 2019

Reading Data from a REST Service and processing it through Spark-Scala


I have added the below dependency to pom.xml (javax.ws.rs-api only provides the API; a JAX-RS client implementation such as Jersey also needs to be on the classpath for ClientBuilder.newClient() to work):
<!-- https://mvnrepository.com/artifact/javax.ws.rs/javax.ws.rs-api -->
<dependency>
   <groupId>javax.ws.rs</groupId>
   <artifactId>javax.ws.rs-api</artifactId>
   <version>2.1</version>
</dependency>
Spark-Scala
------------
package reading_rest.driver
import java.util.Properties
import java.io.FileInputStream
import javax.ws.rs.core.MediaType
import javax.ws.rs.client.ClientBuilder
import org.apache.spark.sql.{ DataFrame, SaveMode, SparkSession }
import org.apache.log4j.{ Level, Logger }
import scala.collection.immutable.Seq
object ReadingFromRest {
  def main(args: Array[String]): Unit = {
    val filepath = "/Users/pedam/eclipse-workspace/valid.properties";
    val configs = new Properties()
    configs.load(new FileInputStream(filepath))
    System.setProperty("hadoop.home.dir", configs.getProperty("hadoop.home.dir"));
    System.setProperty("spark.sql.warehouse.dir", configs.getProperty("spark.sql.warehouse.dir"));
    val spark = SparkSession
      .builder()
      .appName("Rest_Reading_application")
      .master("local")
      .getOrCreate()
    val rootLogger = Logger.getRootLogger
    rootLogger.setLevel(Level.ERROR)
    
    val jsonStr: String = ClientBuilder.newClient().target("http://dummy.restapiexample.com/api/v1/employees").request()
                        .accept(MediaType.APPLICATION_JSON).get(classOf[String])
    println(jsonStr)
    import spark.implicits._     
    val employeesDF = spark.read.json(Seq(jsonStr).toDS)    
    employeesDF.show()    
    spark.close()
    System.exit(0)
  }
} 
Output:
[{"id":"2027","employee_name":"testmadhan1","employee_salary":"345000","employee_age":"35","profile_image":""},....

+------------+-----------------+---------------+----+-------------+
|employee_age|    employee_name|employee_salary|  id|profile_image|
+------------+-----------------+---------------+----+-------------+
|          35|      testmadhan1|         345000|2027|             |
|          23|       test_gslab|            123|2028|             |
|    23111111|test_gslab1111111|      123111111|2030|             |
|          23|       test_gslab|            123|2032|             |
|          23|       test_gslab|            123|2034|             |
|          23|       test_gslab|            123|2035|             |
|          23|       test_gslab|            123|2036|             |
|          23|       test_gslab|            123|2037|             |
|          23|       test_gslab|            123|2038|             |
|          23|       test_gslab|            123|2043|             |
|          23|       test_gslab|            123|2044|             |

Sunday, February 10, 2019

Creating DDL from Parquet file

import java.io.File;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructField;

public class ParquetToDDL {
 public static void main(String[] args) {
  System.setProperty("hadoop.home.dir", "C:\\winutils\\");
  File propertiesFile = new File("C:\\Users\\navee\\eclipse-workspace\\valid.properties");
  PropertiesConfiguration configs = new PropertiesConfiguration();
  try {

   configs.load(propertiesFile);

   SparkSession session = SparkSession.builder().appName("fm_spark_data_extractor").master("local[*]")
    .config("spark.sql.warehouse.dir", configs.getString("spark.sql.warehouse.dir"))
    .config("spark.local.dir", configs.getString("spark.local.dir"))
    .config("spark.driver.memory", configs.getString("spark.driver.memory"))
    .config("spark.executor.memory", configs.getString("spark.executor.memory"))
    .config("spark.executor.cores", configs.getInt("spark.executor.cores"))
    .config("spark.dynamicAllocation.enabled", configs.getBoolean("spark.dynamicAllocation.enabled"))
    .config("spark.serializer", configs.getString("spark.serializer")).getOrCreate();

   Dataset<Row> verifydata = session.read().parquet("D:\\userdata1.parquet");

   StringBuilder ddlBuilder = new StringBuilder();
   ddlBuilder.append("CREATE EXTERNAL TABLE if NOT EXISTS `" + "MBR".toLowerCase() + "` ( ");
   for (StructField field: verifydata.schema().fields()) {
    System.out.println(" `" + field.name() + "` " + field.dataType().simpleString().toUpperCase() + ",");
    ddlBuilder.append(" `" + field.name() + "` " + field.dataType().simpleString().toUpperCase() + ", \n");

   }
   ddlBuilder.replace(ddlBuilder.toString().lastIndexOf(','),
    ",".length() + ddlBuilder.toString().lastIndexOf(','), "");
   ddlBuilder.append(" ) ");
   ddlBuilder.append("STORED AS PARQUET LOCATION '" + "hdfslocationPath+tableName" + "/'");
   System.out.println(ddlBuilder.toString());
  } catch (ConfigurationException e) {
   e.printStackTrace();
  }
 }

}
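
For illustration only: with a hypothetical parquet schema of just id (int) and first_name (string), which are not the actual columns of userdata1.parquet, the printed DDL would look roughly like:

CREATE EXTERNAL TABLE if NOT EXISTS `mbr` (
 `id` INT,
 `first_name` STRING
 ) STORED AS PARQUET LOCATION 'hdfslocationPath+tableName/'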

Building a union query from a Unix (bash) array

# tableNames is a comma-separated string like employees,departments,stuff
IFS=',' read -r -a tableArray <<< "${tableNames}"
echo "${#tableArray[@]}"
count=$(( ${#tableArray[@]} - 1 ))   # index of the last table, which should not get a trailing "union all"
pseudoQuery=""
for indx in "${!tableArray[@]}"
 do
 tableName=$(echo "${tableArray[indx]}" | awk '{$1=$1};1')   # trim surrounding whitespace
 buildAquery=" select distinct INPUT__FILE__NAME FROM ${TARGET_DATABASE}.${tableName} where to_date(ss_date) = '${BUSINESS_DATE}'"
 if [[ ${indx} -lt ${count} ]]
 then
  buildAquery+=" union all "
 fi
 pseudoQuery+="${buildAquery}"
done
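
For tableNames=employees,departments,stuff the resulting pseudoQuery looks roughly like the below, where <db> and <date> stand for whatever ${TARGET_DATABASE} and ${BUSINESS_DATE} expand to (only the last select has no trailing union all):

select distinct INPUT__FILE__NAME FROM <db>.employees where to_date(ss_date) = '<date>' union all
select distinct INPUT__FILE__NAME FROM <db>.departments where to_date(ss_date) = '<date>' union all
select distinct INPUT__FILE__NAME FROM <db>.stuff where to_date(ss_date) = '<date>'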

Wednesday, November 15, 2017

Filtering source error records out of a DataFrame and saving them to a specific location


The code below reads a JSON source that contains malformed records. Spark captures the bad rows in a _corrupt_record column; those rows are written out to a configured error path, and the error-free DataFrame is returned to whatever needs it next.

import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

class ReadWriteData {

  /*  Class created for reading sources and writing files   */
  def readJsonFile(sparkSession: SparkSession, sourcePath: String): DataFrame = try {
    import sparkSession.implicits._
    val sourceDataDF = sparkSession.read.option("columnNameOfCorruptRecord", "_corrupt_record").json(sourcePath)
    //sourceDataDF.show()
    val errorDataDF = sourceDataDF.select("_corrupt_record").filter($"_corrupt_record".isNotNull)
    errorDataDF.write.mode(SaveMode.Overwrite).json(ConfigFactory.load().getString("data.filesystem.errorpath"))

    val errorFreeDF = sourceDataDF.drop(sourceDataDF.col("_corrupt_record"))
    
    errorFreeDF
  } catch {

    case ex: Exception => {sparkSession.read.json(sourcePath)}

  }

}
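
A minimal usage sketch for this class, assuming a Typesafe application.conf with data.filesystem.errorpath on the classpath (as readJsonFile expects) and a placeholder source path:

import org.apache.spark.sql.SparkSession

object ReadWriteDataDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ReadWriteDataDemo").master("local[*]").getOrCreate()

    // Corrupt rows are written to data.filesystem.errorpath inside readJsonFile;
    // what comes back is the DataFrame with the _corrupt_record column dropped.
    val cleanDF = new ReadWriteData().readJsonFile(spark, "/path/to/source.json")
    cleanDF.show(false)

    spark.close()
  }
}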

Wednesday, June 7, 2017

TwitterLive Data Processing with Spark Scala

This is an interesting sample.
I added dependencies like the below to my pom.xml:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter_2.10 -->
  <dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming-twitter_2.10</artifactId>
   <version>1.6.1</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.twitter4j/twitter4j-stream -->
  <dependency>
   <groupId>org.twitter4j</groupId>
   <artifactId>twitter4j-stream</artifactId>
   <version>4.0.2</version>
  </dependency> 
1. In the Run Configuration I passed arguments like the below to the Spark-Scala code.
2. Please make sure you have a Twitter account; on the web you will find plenty of documents on creating a Twitter app and pulling your consumer keys and access tokens.
3. Having said that, replace <consumer key> <consumer secret> <access token> <access token secret> [<filters>] as shown below:
4.tulbXXXXXXXXXXXOOOOOOOOOOOW  nrZXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXbkxVxpS9 8XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXi5o vXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXMO00aN3 trump
Here I am searching on the filter "trump" and printing the live tweets and tweet counts; observe the results in the screenshots.
Code:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.log4j.{ Level, Logger }
object TwitterTagCount {  
  def main(args: Array[String]) {    
    if (args.length < 4) {
      System.err.println("Usage: TwitterTagCount <consumer key> <consumer secret> " +
        "<access token> <access token secret> [<filters>]")
      System.exit(1)
    }
    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.takeRight(args.length - 4)
    // Set the system properties so that Twitter4j library used by twitter stream
    // can use them to generate OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
    val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters)//Dstream
    val rootLogger = Logger.getRootLogger()    
    rootLogger.setLevel(Level.ERROR)    
    ssc.checkpoint("checkpoint")
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
    val statuses = stream.map(x => x.getText)
    statuses.print()
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
                     .map{case (topic, count) => (count, topic)}
                     .transform(_.sortByKey(false))
    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
                     .map{case (topic, count) => (count, topic)}
                     .transform(_.sortByKey(false))
    // Print popular hashtags
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
    })
    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
    })
    ssc.start()
    ssc.awaitTermination()
  }  
}