Re: add an auto_increment column

2022-02-06 Thread Siva Samraj
monotonically_increasing_id() will give you similar functionality: the
generated IDs are unique and increasing, but they are not consecutive the
way MySQL's auto_increment values are.
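
For reference, a minimal sketch of monotonically_increasing_id() and, as an
alternative when gap-free IDs are required, row_number() over a window (df
stands in for an assumed existing DataFrame):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, monotonically_increasing_id, row_number}

// Unique, increasing IDs; cheap, but the values have gaps.
val withId = df.withColumn("id", monotonically_increasing_id())

// Gap-free, consecutive IDs; the global ordering pulls all rows into a
// single partition, so it does not scale to very large data.
val withSeqId = df.withColumn("id", row_number().over(Window.orderBy(lit(1))))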

On Mon, 7 Feb, 2022, 6:57 am ,  wrote:

> For a dataframe object, how can I add a column that auto-increments like
> MySQL's auto_increment behavior?
>
> Thank you.
>


Spark - ElasticSearch Integration

2021-11-22 Thread Siva Samraj
Hi All,

I want to write a Spark Streaming job that reads from Kafka and writes to
Elasticsearch, and I want to detect the schema dynamically while reading
from Kafka.

Can you help me do that?

I know this can be done in Spark batch processing via the line below.

val schema =
spark.read.json(dfKafkaPayload.select("value").as[String]).schema

But I cannot do the same in a Spark Streaming job, since a streaming query
can have only one action.
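
For reference, a minimal sketch of one common workaround: infer the schema
once with a batch read over the messages already in the topic, then reuse
it in the streaming query with from_json (broker address and topic name are
assumptions):

import org.apache.spark.sql.functions.{col, from_json}
import spark.implicits._

// One-off batch read to sample the existing messages and infer the schema.
val sampleJson = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

val schema = spark.read.json(sampleJson).schema

// Reuse the inferred schema inside the streaming query.
val parsedStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()
  .select(from_json(col("value").cast("string"), schema).as("data"))
  .select("data.*")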

Please let me know.


Thanks

Siva


Re: Spark Streaming ElasticSearch

2020-10-05 Thread Siva Samraj
Hi Jainshasha,

I need to read each row from the DataFrame and make some changes to it
before inserting it into ES.
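
For reference, a minimal sketch of applying the per-row changes as ordinary
DataFrame transformations before the ES sink described in the reply below
(inputDf and payloadSchema stand in for the Kafka source DataFrame and the
message schema; checkpoint path and index name are also assumptions):

import org.apache.spark.sql.functions.{col, current_timestamp, from_json}

// Apply the row-level changes as column expressions.
val transformed = inputDf
  .select(from_json(col("value").cast("string"), payloadSchema).as("payload"))
  .select("payload.*")
  .withColumn("ingested_at", current_timestamp())

// Write the transformed stream to Elasticsearch in append mode.
transformed.writeStream
  .outputMode("append")
  .format("org.elasticsearch.spark.sql")
  .option("checkpointLocation", "/tmp/checkpoints/es")
  .start("my-index-name")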

Thanks
Siva

On Mon, Oct 5, 2020 at 8:06 PM jainshasha  wrote:

> Hi Siva
>
> To emit data into ES from a Spark Structured Streaming job you need to use
> an ElasticSearch jar that provides a sink for Structured Streaming. For
> this you can use my branch, where we have integrated ES with Spark 3.0 and
> Scala 2.12:
> https://github.com/ThalesGroup/spark/tree/guavus/v3.0.0
>
> From that branch you also need to build three jars:
> elasticsearch-hadoop-sql
> elasticsearch-hadoop-core
> elasticsearch-hadoop-mr
> which are used for writing data into ES through Structured Streaming.
>
> In your application job you can sink the data as shown below; remember
> that with ES only the append output mode of Structured Streaming is
> supported.
>
> val esQuery = aggregatedDF
>   .writeStream
>   .outputMode("append")
>   .format("org.elasticsearch.spark.sql")
>   .option(CHECKPOINTLOCATION, kafkaCheckpointDirPath + "/es")
>   .start("aggregation-job-index-latest-1")
>
>
> Let me know if you face any issues; I'll be happy to help you :)
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>


Spark Streaming ElasticSearch

2020-10-05 Thread Siva Samraj
Hi Team,

I have a Spark Streaming job which reads from Kafka and writes into
Elasticsearch via HTTP requests.

I want to validate each request from Kafka, change the payload as per the
business need, and write it into Elasticsearch.

So far I have used ES HTTP requests to push the data into Elasticsearch.
Can someone guide me on how to write the data into ES via a DataFrame?

Code snippet:

import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.spark.sql.streaming.Trigger

// sourceTopicGroupId, maxOffsetsPerTrigger, triggerPeriod, the ES
// configuration values, and the processEventsData helper are defined
// elsewhere in the job.

// Read the raw records from Kafka.
val dfInput = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .option("group.id", sourceTopicGroupId)
  .option("failOnDataLoss", "false")
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  .load()

import spark.implicits._

// Keep only the message payload as a string column.
val resultDf = dfInput
  .withColumn("value", $"value".cast("string"))
  .select("value")

// Push each row to Elasticsearch through the existing HTTP helper.
resultDf.writeStream
  .foreach(new ForeachWriter[Row] {
    override def open(partitionId: Long, version: Long): Boolean = true

    override def process(value: Row): Unit = {
      processEventsData(value.get(0).asInstanceOf[String], deviceIndex, msgIndex,
        retryOnConflict, auth, refreshInterval, deviceUrl, messageUrl, spark)
    }

    override def close(errorOrNull: Throwable): Unit = {}
  })
  .trigger(Trigger.ProcessingTime(triggerPeriod)) // e.g. "1 second"
  .start()
  .awaitTermination()

Please suggest whether there is a better approach.
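
For reference, a minimal sketch of a DataFrame-level alternative using
foreachBatch (available from Spark 2.4; the ES options, checkpoint path,
and index name are assumptions):

import org.apache.spark.sql.DataFrame

// Validate / reshape each micro-batch with ordinary DataFrame operations,
// then write the whole batch to Elasticsearch in one go.
val writeBatch: (DataFrame, Long) => Unit = (batchDf, batchId) => {
  batchDf.write
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes", "localhost")
    .option("es.port", "9200")
    .mode("append")
    .save("my-index")
}

resultDf.writeStream
  .foreachBatch(writeBatch)
  .option("checkpointLocation", "/tmp/checkpoints/es-batch")
  .start()
  .awaitTermination()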

Thanks


Offset Management in Spark

2020-09-30 Thread Siva Samraj
Hi all,

I am using Spark Structured Streaming (version 2.3.2). I need to read from
a Kafka cluster and write into Kerberized Kafka, and I want to use Kafka
for offset checkpointing after each record has been written to the
Kerberized cluster.

Question:

1. Can we use Kafka for checkpointing to manage offsets, or do we have to
use HDFS/S3?
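
For reference, a minimal sketch of the filesystem-based checkpoint
configuration that Structured Streaming uses to track offsets (outputDf
stands in for the prepared output DataFrame; broker, topic, and checkpoint
path are assumptions):

// The checkpoint location must be a fault-tolerant path such as HDFS or
// S3; the offset and commit logs for the query are stored there.
val query = outputDf.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "secure-broker:9093")
  .option("topic", "output-topic")
  .option("checkpointLocation", "hdfs:///checkpoints/output-topic")
  .start()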

Please help.

Thanks


Re: Spark structural streaming sinks output late

2020-03-28 Thread Siva Samraj
Yes, I am also facing the same issue. Did you figure it out?

On Tue, 9 Jul 2019, 7:25 pm Kamalanathan Venkatesan, <
kamalanatha...@in.ey.com> wrote:

> Hello,
>
>
>
> I have the Spark Structured Streaming code below, and I was expecting the
> results to be printed on the console every 10 seconds. However, I notice
> the sink to the console happening roughly every 2 minutes or more.
>
> What could be the issue?
>
>
>
> def streaming(): Unit = {
>   System.setProperty("hadoop.home.dir", "/Documents/ ")
>   val conf: SparkConf = new SparkConf().setAppName("Histogram").setMaster("local[8]")
>   conf.set("spark.eventLog.enabled", "false")
>   val sc: SparkContext = new SparkContext(conf)
>   val sqlcontext = new SQLContext(sc)
>   val spark = SparkSession.builder().config(conf).getOrCreate()
>
>   import sqlcontext.implicits._
>   import org.apache.spark.sql.functions.window
>
>   val inputDf = spark.readStream.format("kafka")
>     .option("kafka.bootstrap.servers", "localhost:9092")
>     .option("subscribe", "wonderful")
>     .option("startingOffsets", "latest")
>     .load()
>
>   import scala.concurrent.duration._
>
>   val personJsonDf = inputDf
>     .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
>     .withWatermark("timestamp", "500 milliseconds")
>     .groupBy(window($"timestamp", "10 seconds")).count()
>
>   val consoleOutput = personJsonDf.writeStream
>     .outputMode("complete")
>     .format("console")
>     .option("truncate", "false")
>     .outputMode(OutputMode.Update())
>     .start()
>
>   consoleOutput.awaitTermination()
> }
>
> object SparkExecutor {
>   val spE: SparkExecutor = new SparkExecutor()
>
>   def main(args: Array[String]): Unit = {
>     println("test")
>     spE.streaming
>   }
> }
>


Spark Streaming Code

2020-03-28 Thread Siva Samraj
Hi Team,

I need help with the windowing & watermark concepts. This code is not
working as expected.

package com.jiomoney.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.ProcessingTime

object SlingStreaming {
  def main(args: Array[String]): Unit = {
val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("Coupons_ViewingNow")
  .getOrCreate()

import spark.implicits._

val checkpoint_path = "/opt/checkpoints/"

val ks = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("kafka.replica.fetch.max.bytes", "16777216")
  .load()

// Extract the fields of interest from the Kafka message payload.
val dfDeviceid = ks
  .withColumn("val", ($"value").cast("string"))
  .withColumn("count1", get_json_object(($"val"), "$.a"))
  .withColumn("deviceId", get_json_object(($"val"), "$.b"))
  .withColumn("timestamp", current_timestamp())

// Aggregate per device over 10-second processing-time windows,
// tolerating data up to 1 minute late.
val final_ids = dfDeviceid
  .withColumn("processing_time", current_timestamp())
  .withWatermark("processing_time", "1 minutes")
  .groupBy(window($"processing_time", "10 seconds"), $"deviceId")
  .agg(sum($"count1") as "total")

// Write the aggregated rows back to Kafka as JSON.
val t = final_ids
  .select(to_json(struct($"*")) as "value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sub_topic")
  .option("checkpointLocation", checkpoint_path)
  .outputMode("append")
  .trigger(ProcessingTime("1 seconds"))
  .start()

t.awaitTermination()

  }

}


Thanks


Re: Spark Streaming

2018-11-26 Thread Siva Samraj
My join DataFrame is taking 14 seconds on the first run, and even after
commenting out the withColumn calls it is still taking more time.
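
For reference, a minimal sketch of the from_json approach suggested in the
reply quoted below (only an assumed subset of the order fields is shown):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Assumed subset of the order payload; extend with the remaining fields.
val orderSchema = StructType(Seq(
  StructField("order_id", StringType),
  StructField("customer_id", StringType),
  StructField("unit_cost", IntegerType),
  StructField("total_cost", IntegerType),
  StructField("units_sold", IntegerType)
))

// One select replaces the chain of withColumn/get_json_object calls.
val order_details = order_df
  .select(from_json(col("value").cast("string"), orderSchema).as("o"))
  .select("o.*")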



On Tue, Nov 27, 2018 at 12:08 PM Jungtaek Lim  wrote:

> You may need to put some effort into triaging how much time is spent on
> each part. Without that information you will only get general tips and
> tricks. Please check the SQL tab and look at the DAG graph as well as the
> details (logical plan, physical plan) to see whether you're happy with
> those plans.
>
> A general tip from a quick look at the query: avoid calling withColumn
> repeatedly and try to put the expressions into one select statement. If
> I'm not mistaken, repeated withColumn calls are known to be a bit costly,
> since each call produces a new Dataset. Defining a schema and using
> "from_json" will eliminate all the withColumn calls and the extra calls
> to "get_json_object".
>
> - Jungtaek Lim (HeartSaVioR)
>
> On Tue, Nov 27, 2018 at 2:44 PM, Siva Samraj wrote:
>
>> Hello All,
>>
>> I am using Spark 2.3 and I am trying to write a Spark Streaming join. It
>> is a basic join, and it is taking a long time to join the stream data. I
>> am not sure what configuration we need to set on Spark.
>>
>> Code:
>> [snipped; the full code appears in the original message below]

Spark Streaming

2018-11-26 Thread Siva Samraj
Hello All,

I am using Spark 2.3 and I am trying to write a Spark Streaming join. It is
a basic join, and it is taking a long time to join the stream data. I am
not sure what configuration we need to set on Spark.

Code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.TimestampType

object OrderSalesJoin {
  def main(args: Array[String]): Unit = {

setEnvironmentVariables(args(0))

val order_topic = args(1)
val invoice_topic = args(2)
val dest_topic_name = args(3)

val spark =
SparkSession.builder().appName("SalesStreamingJoin").getOrCreate()

val checkpoint_path = HDFS_HOST + "/checkpoints/" + dest_topic_name

import spark.implicits._


val order_df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KAFKA_BROKERS)
  .option("subscribe", order_topic)
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("kafka.replica.fetch.max.bytes", "15728640")
  .load()


val invoice_df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KAFKA_BROKERS)
  .option("subscribe", invoice_topic)
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("kafka.replica.fetch.max.bytes", "15728640")
  .load()


val order_details = order_df
  .withColumn("s_order_id", get_json_object($"value".cast("String"), "$.order_id"))
  .withColumn("s_customer_id", get_json_object($"value".cast("String"), "$.customer_id"))
  .withColumn("s_promotion_id", get_json_object($"value".cast("String"), "$.promotion_id"))
  .withColumn("s_store_id", get_json_object($"value".cast("String"), "$.store_id"))
  .withColumn("s_product_id", get_json_object($"value".cast("String"), "$.product_id"))
  .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), "$.warehouse_id"))
  .withColumn("unit_cost", get_json_object($"value".cast("String"), "$.unit_cost"))
  .withColumn("total_cost", get_json_object($"value".cast("String"), "$.total_cost"))
  .withColumn("units_sold", get_json_object($"value".cast("String"), "$.units_sold"))
  .withColumn("promotion_cost", get_json_object($"value".cast("String"), "$.promotion_cost"))
  .withColumn("date_of_order", get_json_object($"value".cast("String"), "$.date_of_order"))
  .withColumn("tstamp_trans", current_timestamp())
  .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "MMddHHmmss").cast(TimestampType))
  .select($"s_customer_id", $"s_order_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
    $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
    $"total_cost".cast("integer") as "total_cost",
    $"promotion_cost".cast("integer") as "promotion_cost",
    $"date_of_order", $"tstamp_trans", $"TIMESTAMP",
    $"units_sold".cast("integer") as "units_sold")


val invoice_details = invoice_df
  .withColumn("order_id", get_json_object($"value".cast("String"), "$.order_id"))
  .withColumn("invoice_status", get_json_object($"value".cast("String"), "$.invoice_status"))
  .where($"invoice_status" === "Success")
  .withColumn("tstamp_trans", current_timestamp())
  .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "MMddHHmmss").cast(TimestampType))


val order_wm = order_details.withWatermark("tstamp_trans", args(4))
val invoice_wm = invoice_details.withWatermark("tstamp_trans", args(5))

val join_df = order_wm
  .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
  .select($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
    $"s_warehouse_id", $"unit_cost", $"total_cost", $"promotion_cost",
    $"date_of_order", $"units_sold" as "units_sold", $"order_id")

val final_ids = join_df
  .withColumn("value", to_json(struct($"s_customer_id", $"s_promotion_id",
    $"s_store_id", $"s_product_id", $"s_warehouse_id",
    $"unit_cost".cast("Int") as "unit_cost",
    $"total_cost".cast("Int") as "total_cost",
    $"promotion_cost".cast("Int") as "promotion_cost",
    $"date_of_order", $"units_sold".cast("Int") as "units_sold", $"order_id")))
  .dropDuplicates("order_id")
  .select("value")


val write_df = final_ids
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KAFKA_BROKERS)
  .option("topic", dest_topic_name)
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()

write_df.awaitTermination()

  }

}


Let me know; it is taking more than a minute for every run, and the waiting
time keeps increasing as the data grows.

Please let me know if there is anything we need to configure to make it
faster. I tried
Re: Not able to overwrite cassandra table using Spark

2018-06-27 Thread Siva Samraj
You can try this, it will work:

import org.apache.spark.sql.SaveMode

// Truncate the existing rows and write the DataFrame back to the table.
merchantdf.write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Overwrite)
  .option("confirm.truncate", true)
  .options(Map("table" -> "tablename", "keyspace" -> "keyspace"))
  .save()


On Wed 27 Jun, 2018, 11:15 PM Abhijeet Kumar, 
wrote:

> Hello Team,
>
> I'm creating a dataframe out of a Cassandra table using the DataStax Spark
> connector. After making some modifications to the dataframe, I'm trying to
> put it back into the Cassandra table by overwriting the old content. The
> piece of code for that is:
>
> modifiedList.write.format("org.apache.spark.sql.cassandra")
>   .options(Map( "table" -> "list", "keyspace" -> "journaling",
> "confirm.truncate" -> "true"))
>   .mode(Overwrite).save
>
> It's not showing any error and seems to be working fine, but when I check
> the Cassandra table afterwards, there is no content inside it. Everything
> is deleted. I'm really worried about this behaviour because it may delete
> some useful content (I'm sure about overwriting the content and fully
> understand the consequences).
>
> Thanks,
> Abhijeet Kumar
>