Re: add an auto_increment column

2022-02-06 Thread Siva Samraj
monotonically_increasing_id() gives similar functionality: the generated IDs are unique and monotonically increasing, but not consecutive.
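The plain-Scala sketch below shows why the IDs are not MySQL-style. Spark's documentation describes the current implementation of monotonically_increasing_id() as putting the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The object and function names are hypothetical and the partition sizes are made up. The code only models the numbering and does not call Spark.

```scala
// Plain-Scala model of the ID layout Spark documents for
// monotonically_increasing_id(): partition ID in the upper 31 bits, record number
// within the partition in the lower 33 bits. This does not call Spark.
object MonotonicIdSketch {
  // ID for the recordNumber-th row (0-based) of partition partitionId
  def monotonicId(partitionId: Int, recordNumber: Long): Long =
    (partitionId.toLong << 33) + recordNumber

  // IDs for every row, given the row count of each partition (in partition order)
  def monotonicIds(partitionSizes: Seq[Int]): Seq[Long] =
    partitionSizes.zipWithIndex.flatMap { case (size, partitionId) =>
      (0 until size).map(n => monotonicId(partitionId, n.toLong))
    }

  // MySQL-style numbering for comparison: consecutive, starting at 1
  def consecutiveIds(partitionSizes: Seq[Int]): Seq[Long] =
    (1L to partitionSizes.sum.toLong).toVector
}
```

With two partitions of 2 and 3 rows, monotonicIds(Seq(2, 3)) gives 0, 1, 8589934592, 8589934593, 8589934594. These values are unique and increasing, but they jump at the partition boundary. If you need consecutive numbers starting at 1, one option is row_number() over a Window.orderBy(...). This moves all rows into a single partition. RDD.zipWithIndex is another option.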

On Mon, 7 Feb, 2022, 6:57 am ,  wrote:

> For a DataFrame, how can I add an auto-increment column, like
> MySQL's AUTO_INCREMENT behavior?
> Thank you.

Spark - ElasticSearch Integration

2021-11-22 Thread Siva Samraj
Hi All,

I want to write a Spark Streaming Job from Kafka to Elasticsearch. Here I
want to detect the schema dynamically while reading it from Kafka.

Can you help me do that?

I know this can be done in Spark batch processing with the line below:

val schema ="value").as[String]).schema

But we cannot do the same in a Spark Streaming job, since a query on a
streaming source can only be executed through writeStream.start(), not through another action.

Please let me know.



Re: Spark Streaming ElasticSearch

2020-10-05 Thread Siva Samraj
Hi Jainshasha,

I need to read each row from the DataFrame and make some changes to it before
inserting it into ES.


On Mon, Oct 5, 2020 at 8:06 PM jainshasha  wrote:

> Hi Siva
> To emit data into ES using a Spark Structured Streaming job, you need to use
> an Elasticsearch jar that supports a sink for Spark Structured Streaming
> jobs. For this you can use my branch, where we have integrated ES
> with Spark 3.0 and Scala 2.12.
> You also need to build three jars from it:
> elasticsearch-hadoop-sql
> elasticsearch-hadoop-core
> elasticsearch-hadoop-mr
> These help in writing data into ES through Spark Structured Streaming.
> In your application job you can sink the data this way. Remember
> that ES supports only the append output mode of Structured Streaming.
> val esDf = aggregatedDF
> .writeStream
> .outputMode("append")
> .format("org.elasticsearch.spark.sql")
> .option(CHECKPOINTLOCATION, kafkaCheckpointDirPath + "/es")
> .start("aggregation-job-index-latest-1")
> Let me know if you face any issues; I will be happy to help you :)

Spark Streaming ElasticSearch

2020-10-05 Thread Siva Samraj
Hi Team,

I have a Spark Streaming job that reads from Kafka and writes into
Elasticsearch via HTTP requests.

I want to validate each request from Kafka, change the payload as per
business needs, and write it into Elasticsearch.

I have used ES HTTP requests to push the data into Elasticsearch. Can someone
guide me on how to write the data into ES via a DataFrame?

*Code Snippet: *
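import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.spark.sql.streaming.Trigger

val dfInput = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .option("", sourceTopicGroupId)
  .option("failOnDataLoss", "false")
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  .load()

import spark.implicits._

val resultDf = dfInput
  .withColumn("value", $"value".cast("string"))

resultDf.writeStream.foreach(new ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = true

  // Runs on the executors, once per row. Read "value" by name: in the Kafka
  // source schema it is the second column (after "key"), not the first.
  // Note: a SparkSession such as `spark` is only usable on the driver.
  override def process(value: Row): Unit = {
    processEventsData(value.getAs[String]("value"), deviceIndex,
      msgIndex, retryOnConflict, auth, refreshInterval, deviceUrl, messageUrl, spark)
  }

  override def close(errorOrNull: Throwable): Unit = {}
})
  //.trigger(Trigger.ProcessingTime("1 second"))
  .start()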

Please suggest an approach, if there is one.


Offset Management in Spark

2020-09-30 Thread Siva Samraj
Hi all,

I am using Spark Structured Streaming (version 2.3.2). I need to read from a
Kafka cluster and write into a Kerberized Kafka cluster.
I want to use Kafka for offset checkpointing, committing the offset after each record is
written into the Kerberized Kafka.


1. Can we use Kafka for checkpointing to manage offsets, or do we need to use
HDFS/S3 only?

Please help.


Re: Spark structural streaming sinks output late

2020-03-28 Thread Siva Samraj
Yes, I am also facing the same issue. Did you figure it out?

On Tue, 9 Jul 2019, 7:25 pm Kamalanathan Venkatesan, <> wrote:

> Hello,
> I have the Spark Structured Streaming code below, and I was expecting the
> results to be printed on the console every 10 seconds. But I notice the
> sink to the console happening only every ~2 minutes or more.
> What could be the issue?
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.{SQLContext, SparkSession}
> import org.apache.spark.sql.streaming.OutputMode
> class SparkExecutor {
>   def streaming(): Unit = {
>     System.setProperty("hadoop.home.dir", "/Documents/ ")
>     val conf: SparkConf = new SparkConf().setAppName("Histogram").setMaster("local[8]")
>     conf.set("spark.eventLog.enabled", "false")
>     val sc: SparkContext = new SparkContext(conf)
>     val sqlcontext = new SQLContext(sc)
>     val spark = SparkSession.builder().config(conf).getOrCreate()
>     import sqlcontext.implicits._
>     import org.apache.spark.sql.functions.window
>     val inputDf = spark.readStream.format("kafka")
>       .option("kafka.bootstrap.servers", "localhost:9092")
>       .option("subscribe", "wonderful")
>       .option("startingOffsets", "latest")
>       .load()
>     import scala.concurrent.duration._
>     val personJsonDf = inputDf.selectExpr("CAST(key AS STRING)",
>       "CAST(value AS STRING)", "timestamp")
>       .withWatermark("timestamp", "500 milliseconds")
>       .groupBy(window($"timestamp", "10 seconds")).count()
>     val consoleOutput = personJsonDf.writeStream
>       .outputMode("complete")
>       .format("console")
>       .option("truncate", "false")
>       .outputMode(OutputMode.Update()) // replaces the outputMode("complete") set above
>       .start()
>     consoleOutput.awaitTermination()
>   }
> }
> object SparkExecutor {
>   val spE: SparkExecutor = new SparkExecutor()
>   def main(args: Array[String]): Unit = {
>     println("test")
>     spE.streaming()
>   }
> }
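In the quoted code, "10 seconds" is the window length, not an output interval, and no trigger is set. Without a trigger, Spark starts each micro-batch as soon as the previous one finishes. In either case, console output cannot appear more often than micro-batches complete. The plain-Scala sketch below models batch scheduling under a processing-time trigger. It assumes, based on our reading of Spark's ProcessingTimeExecutor, that the next batch is due at the next multiple of the interval. If the current batch overruns that time, the next one starts immediately. TriggerTimingSketch and its functions are hypothetical names, not Spark API.

```scala
// Plain-Scala model of micro-batch start times under a processing-time trigger.
// Assumption (our reading of Spark's ProcessingTimeExecutor): the next batch is due
// at the next multiple of the interval after the current batch started, and starts
// immediately if the current batch ran past that time. Times are in milliseconds.
object TriggerTimingSketch {
  // Next trigger time after a batch that started at nowMs (intervalMs > 0)
  def nextBatchTime(nowMs: Long, intervalMs: Long): Long =
    nowMs / intervalMs * intervalMs + intervalMs

  // Start time of each batch, given how long each batch takes; first batch starts at 0
  def batchStartTimes(intervalMs: Long, durationsMs: Seq[Long]): Seq[Long] =
    durationsMs
      .scanLeft(0L) { (start, duration) =>
        // wait for the next slot, or start at once if this batch overran it
        math.max(start + duration, nextBatchTime(start, intervalMs))
      }
      .init // the last value is the start of a batch we have no duration for
}
```

Take a 10-second interval and batch durations of 2, 2, 25 and 2 seconds. The batches start at 0, 10, 20 and 45 seconds, so one slow batch pushes every later result back. In a real query, StreamingQuery.lastProgress (its durationMs field) shows how long each micro-batch actually takes. That is a reasonable first thing to check when output lags.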

Spark Streaming Code

2020-03-28 Thread Siva Samraj
Hi Team,

I need help with the windowing and watermark concepts. This code is not working as expected.

package com.jiomoney.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object SlingStreaming {
  def main(args: Array[String]): Unit = {
val spark = SparkSession
  .builder()
  .getOrCreate()

import spark.implicits._

val checkpoint_path = "/opt/checkpoints/"

val ks = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("kafka.replica.fetch.max.bytes", "16777216")
  .load()

val dfDeviceid = ks
  .withColumn("val", ($"value").cast("string"))
  .withColumn("count1", get_json_object(($"val"), "$.a"))
  .withColumn("deviceId", get_json_object(($"val"), "$.b"))
  .withColumn("timestamp", current_timestamp())

val final_ids = dfDeviceid
  .withColumn("processing_time", current_timestamp())
  .withWatermark("processing_time", "1 minutes")
  .groupBy(window($"processing_time", "10 seconds"), $"deviceId")
  .agg(sum($"count1") as "total")

// No outputMode is set, so the default (append) applies: a window is
// written only after the watermark passes the window's end
val t = final_ids
  .select(to_json(struct($"*")) as "value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sub_topic")
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .start()

t.awaitTermination()
  }
}
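The code above uses 10-second windows on processing_time, a 1-minute watermark, and the default append output mode. With these settings, a window is only written to Kafka after the watermark passes the window's end, so each result arrives roughly a minute after its window closes. The plain-Scala sketch below models that rule. It assumes non-negative epoch-millisecond timestamps and tumbling windows with no start offset. It also assumes the watermark is the latest event time seen minus the delay, which is how Spark documents withWatermark. The names are hypothetical. In Spark itself, a new watermark also only takes effect from the next micro-batch.

```scala
// Plain-Scala model of tumbling event-time windows plus a watermark.
// Assumptions: non-negative epoch-millisecond timestamps, windows with no start
// offset, watermark = latest event time seen minus the delay. Not the Spark API.
object WindowWatermarkSketch {
  // Start of the tumbling window containing tsMs
  def windowStart(tsMs: Long, windowMs: Long): Long = tsMs - tsMs % windowMs

  // Watermark after the given event times have been seen (eventTimesMs non-empty)
  def watermark(eventTimesMs: Seq[Long], delayMs: Long): Long =
    eventTimesMs.max - delayMs

  // Append mode: a window is emitted, and its state dropped, once the watermark
  // reaches the window's (exclusive) end
  def isFinal(windowStartMs: Long, windowMs: Long, watermarkMs: Long): Boolean =
    windowStartMs + windowMs <= watermarkMs
}
```

For example, if the latest event is at 75 s and the delay is 60 s, the watermark is 15 s. The window [0 s, 10 s) is then final, while [10 s, 20 s) is still open. Update output mode, or a shorter watermark, would emit counts sooner. The trade-off is that the same window may then be written more than once.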





Re: Spark Streaming

2018-11-26 Thread Siva Samraj
My join_df takes 14 seconds in the first run, and even after I commented out the
withColumn calls it still takes a long time.

On Tue, Nov 27, 2018 at 12:08 PM Jungtaek Lim  wrote:

> You may need to put effort into triaging how much time is spent on each part.
> Without that information, you can only get general tips and tricks.
> Please check the SQL tab and look at the DAG as well as the details (logical plan,
> physical plan) to see whether you're happy with these plans.
> A general tip from a quick look at the query: avoid calling withColumn repeatedly and
> try to put the expressions in one select statement. If I'm not mistaken, it is known
> to be a bit costly, since each call produces a new Dataset. Defining a
> schema and using "from_json" will eliminate all the withColumn calls
> and the extra "get_json_object" calls.
> - Jungtaek Lim (HeartSaVioR)
> On Tue, Nov 27, 2018 at 2:44 PM, Siva Samraj wrote:
>> Hello All,
>> I am using Spark 2.3 and I am trying to write a Spark Streaming
>> join. It is a basic join, but it takes a long time to join the stream
>> data. I am not sure whether there is any configuration we need to set on Spark.
>> Code:
>> import org.apache.spark.sql.SparkSession
>> import org.apache.spark.sql.functions._
>> import org.apache.spark.sql.streaming.Trigger
>> import org.apache.spark.sql.types.TimestampType
>> object OrderSalesJoin {
>>   def main(args: Array[String]): Unit = {
>> setEnvironmentVariables(args(0))
>> val order_topic = args(1)
>> val invoice_topic = args(2)
>> val dest_topic_name = args(3)
>> val spark =
>> SparkSession.builder().appName("SalesStreamingJoin").getOrCreate()
>> val checkpoint_path = HDFS_HOST + "/checkpoints/" + dest_topic_name
>> import spark.implicits._
>> val order_df = spark
>>   .readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", KAFKA_BROKERS)
>>   .option("subscribe", order_topic)
>>   .option("startingOffsets", "latest")
>>   .option("failOnDataLoss", "false")
>>   .option("kafka.replica.fetch.max.bytes", "15728640")
>>   .load()
>> val invoice_df = spark
>>   .readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", KAFKA_BROKERS)
>>   .option("subscribe", invoice_topic)
>>   .option("startingOffsets", "latest")
>>   .option("failOnDataLoss", "false")
>>   .option("kafka.replica.fetch.max.bytes", "15728640")
>>   .load()
>> val order_details = order_df
>>   .withColumn("s_order_id", get_json_object($"value".cast("String"),
>> "$.order_id"))
>>   .withColumn("s_customer_id",
>> get_json_object($"value".cast("String"), "$.customer_id"))
>>   .withColumn("s_promotion_id",
>> get_json_object($"value".cast("String"), "$.promotion_id"))
>>   .withColumn("s_store_id", get_json_object($"value".cast("String"),
>> "$.store_id"))
>>   .withColumn("s_product_id",
>> get_json_object($"value".cast("String"), "$.product_id"))
>>   .withColumn("s_warehouse_id",
>> get_json_object($"value".cast("String"), "$.warehouse_id"))
>>   .withColumn("unit_cost", get_json_object($"value".cast("String"),
>> "$.unit_cost"))
>>   .withColumn("total_cost", get_json_object($"value".cast("String"),
>> "$.total_cost"))
>>   .withColumn("units_sold", get_json_object($"value".cast("String"),
>> "$.units_sold"))
>>   .withColumn("promotion_cost",
>> get_json_object($"value".cast("String"), "$.promotion_cost"))
>>   .withColumn("date_of_order",
>> get_json_object($"value".cast("String"), "$.date_of_order"))
>>   .withColumn("tstamp_trans", current_timestamp())
>>   .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans",
>> "MMddHHmmss").cast(Ti

Spark Streaming

2018-11-26 Thread Siva Samraj
Hello All,

I am using Spark 2.3 and I am trying to write a Spark Streaming join.
It is a basic join, but it takes a long time to join the stream data. I am
not sure whether there is any configuration we need to set on Spark.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.TimestampType

object OrderSalesJoin {
  def main(args: Array[String]): Unit = {

setEnvironmentVariables(args(0))

val order_topic = args(1)
val invoice_topic = args(2)
val dest_topic_name = args(3)

val spark =
  SparkSession.builder().appName("SalesStreamingJoin").getOrCreate()

val checkpoint_path = HDFS_HOST + "/checkpoints/" + dest_topic_name

import spark.implicits._

val order_df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KAFKA_BROKERS)
  .option("subscribe", order_topic)
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("kafka.replica.fetch.max.bytes", "15728640")
  .load()

val invoice_df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KAFKA_BROKERS)
  .option("subscribe", invoice_topic)
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("kafka.replica.fetch.max.bytes", "15728640")
  .load()

val order_details = order_df
  .withColumn("s_order_id", get_json_object($"value".cast("String"), "$.order_id"))
  .withColumn("s_customer_id", get_json_object($"value".cast("String"), "$.customer_id"))
  .withColumn("s_promotion_id", get_json_object($"value".cast("String"), "$.promotion_id"))
  .withColumn("s_store_id", get_json_object($"value".cast("String"), "$.store_id"))
  .withColumn("s_product_id", get_json_object($"value".cast("String"), "$.product_id"))
  .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), "$.warehouse_id"))
  .withColumn("unit_cost", get_json_object($"value".cast("String"), "$.unit_cost"))
  .withColumn("total_cost", get_json_object($"value".cast("String"), "$.total_cost"))
  .withColumn("units_sold", get_json_object($"value".cast("String"), "$.units_sold"))
  .withColumn("promotion_cost", get_json_object($"value".cast("String"), "$.promotion_cost"))
  .withColumn("date_of_order", get_json_object($"value".cast("String"), "$.date_of_order"))
  .withColumn("tstamp_trans", current_timestamp())
  .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "MMddHHmmss").cast(TimestampType))
  .select($"s_customer_id", $"s_order_id", $"s_promotion_id",
$"s_store_id", $"s_product_id",
$"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
$"total_cost".cast("integer") as "total_cost",
$"promotion_cost".cast("integer") as "promotion_cost",
$"date_of_order", $"tstamp_trans", $"TIMESTAMP",
$"units_sold".cast("integer") as "units_sold")

val invoice_details = invoice_df
  .withColumn("order_id", get_json_object($"value".cast("String"), "$.order_id"))
  .withColumn("invoice_status", get_json_object($"value".cast("String"), "$.invoice_status"))
  .where($"invoice_status" === "Success")
  .withColumn("tstamp_trans", current_timestamp())
  .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "MMddHHmmss").cast(TimestampType))

val order_wm = order_details.withWatermark("tstamp_trans", args(4))
val invoice_wm = invoice_details.withWatermark("tstamp_trans", args(5))

val join_df = order_wm
  .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
  .select($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
$"s_warehouse_id", $"unit_cost", $"total_cost", $"promotion_cost",
$"units_sold" as "units_sold", $"order_id")

val final_ids = join_df
  .withColumn("value", to_json(struct($"s_customer_id",
$"s_promotion_id", $"s_store_id", $"s_product_id",
$"s_warehouse_id", $"unit_cost".cast("Int") as "unit_cost",
$"total_cost".cast("Int") as "total_cost",
$"promotion_cost".cast("Int") as "promotion_cost",
$"units_sold".cast("Int") as "units_sold", $"order_id")))

val write_df = final_ids
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KAFKA_BROKERS)
  .option("topic", dest_topic_name)
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()

write_df.awaitTermination()
  }
}

It is taking more than a minute for every run, and the waiting
time keeps increasing as the data grows.

Please let me know if there is anything we need to configure to make it faster. I tried
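One likely contributor to the growing run time is join state. Spark's guide for stream-stream joins says state can only be cleaned up when both inputs have watermarks and the join condition also constrains event time across the two inputs, either as a time range or a window. The join above matches on order ID only, so every past order and invoice has to stay in state. The plain-Scala sketch below models a symmetric join to show the difference. It rests on several simplifying assumptions, all hypothetical:

- both sides share one watermark (the latest event time minus a delay);
- the watermark is updated after every row rather than after every micro-batch;
- rows older than the watermark never arrive;
- event times are in seconds.

```scala
// Plain-Scala model of state retention in a stream-stream inner join.
// Hypothetical simplifications: one watermark shared by both inputs (max event
// time seen minus delaySec), updated after every row, and rows older than the
// watermark are assumed never to arrive. Event times are in seconds.
object JoinStateSketch {
  final case class Event(orderId: String, ts: Long)

  // boundSec = Some(b): also require order.ts <= invoice.ts <= order.ts + b
  // boundSec = None: join on orderId only, like the query in the question
  final class SymmetricJoin(delaySec: Long, boundSec: Option[Long]) {
    private var orders = Vector.empty[Event]
    private var invoices = Vector.empty[Event]
    private var maxTs = Long.MinValue

    private def matches(o: Event, i: Event): Boolean =
      o.orderId == i.orderId &&
        boundSec.forall(b => i.ts >= o.ts && i.ts <= o.ts + b)

    def addOrder(o: Event): Seq[(Event, Event)] = {
      val joined = invoices.filter(i => matches(o, i)).map(i => (o, i))
      orders :+= o
      advance(o.ts)
      joined
    }

    def addInvoice(i: Event): Seq[(Event, Event)] = {
      val joined = orders.filter(o => matches(o, i)).map(o => (o, i))
      invoices :+= i
      advance(i.ts)
      joined
    }

    // Drop rows that no future row (event time >= watermark) can still match.
    // Without a time bound no row is ever provably unmatchable, so nothing is dropped.
    private def advance(ts: Long): Unit = {
      maxTs = math.max(maxTs, ts)
      val wm = maxTs - delaySec
      boundSec.foreach { b =>
        orders = orders.filter(o => o.ts + b >= wm)
        invoices = invoices.filter(i => i.ts >= wm)
      }
    }

    def stateSize: Int = orders.size + invoices.size
  }
}
```

Consider 100 orders, each followed one second later by its invoice, with a 10-second delay and a 5-second bound. Both variants produce 100 matches. However, the time-bounded join keeps 26 rows in state, while the order-ID-only join keeps all 200. In the Spark query, the analogous change would be a condition requiring the invoice's tstamp_trans to lie between the order's tstamp_trans and that time plus some interval. The interval length is a business decision and is not given in this thread.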

Re: Not able to overwrite cassandra table using Spark

2018-06-27 Thread Siva Samraj
You can try this, it will work:

import org.apache.spark.sql.SaveMode

merchantdf.write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Overwrite)
  .option("confirm.truncate", true)
  .options(Map("table" -> "tablename", "keyspace" -> "keyspace"))
  .save()

On Wed 27 Jun, 2018, 11:15 PM Abhijeet Kumar, 

> Hello Team,
> I’m creating a dataframe out of cassandra table using datastax spark
> connector. After making some modification into the dataframe, I’m trying to
> put that dataframe back to the Cassandra table by overwriting the old
> content. For that the piece of code is:
> modifiedList.write.format("org.apache.spark.sql.cassandra")
>   .options(Map( "table" -> "list", "keyspace" -> "journaling",
> "confirm.truncate" -> "true"))
>   .mode(Overwrite).save
> It doesn’t show any error and seems to work fine, but when I
> check the Cassandra table afterwards, there is no content in it.
> Everything is deleted. I’m really worried about this behaviour because this
> may delete some useful content (I’m sure about overwriting the content and
> fully understand the consequences).
> Thanks,
> Abhijeet Kumar