Re: add an auto_increment column
monotonically_increasing_id() gives you similar functionality, with one caveat: the generated IDs are guaranteed to be unique and monotonically increasing, but they are not consecutive the way MySQL's auto_increment values are.

On Mon, 7 Feb, 2022, 6:57 am, wrote:
> For a DataFrame object, how do I add a column that auto-increments, like
> MySQL's auto_increment behavior?
>
> Thank you.
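A minimal sketch of both options, assuming a local SparkSession; row_number over an un-partitioned window gives truly consecutive IDs, at the cost of shuffling everything into a single partition:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, monotonically_increasing_id, row_number}

val spark = SparkSession.builder().appName("AutoIncrement").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq("a", "b", "c").toDF("letter")

// Unique and increasing, but with gaps: the partition ID is encoded in the
// upper bits, so IDs jump between partitions.
val withIds = df.withColumn("id", monotonically_increasing_id())

// Truly consecutive 1, 2, 3, ... like auto_increment. The un-partitioned
// window pulls all rows into one partition, so only use this on data small
// enough for a single task.
val consecutive = df.withColumn("id", row_number().over(Window.orderBy(lit(1))))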
Spark - ElasticSearch Integration
Hi All,

I want to write a Spark Streaming job from Kafka to Elasticsearch, and I want to detect the schema dynamically while reading from Kafka. Can you help me do that?

I know this can be done in Spark batch processing via the line below:

val schema = spark.read.json(dfKafkaPayload.select("value").as[String]).schema

But I cannot do the same in a Spark Streaming job, since you cannot run an eager action like this on a streaming DataFrame.

Please let me know.

Thanks
Siva
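One common workaround (a sketch, not something the Kafka source provides out of the box): infer the schema once from a bounded batch read of the same topic, then apply it to the stream with from_json. The topic name and bootstrap servers below are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json

val spark = SparkSession.builder().appName("SchemaInference").getOrCreate()
import spark.implicits._

// Batch-read a sample of the topic so the JSON schema can be inferred eagerly.
val sample = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

val schema = spark.read.json(sample.select($"value".cast("string")).as[String]).schema

// Reuse the inferred schema in the streaming read.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()
  .select(from_json($"value".cast("string"), schema).as("payload"))
  .select("payload.*")

The inferred schema is then fixed for the lifetime of the query; if the payload shape changes you would need to restart the job (or re-infer inside foreachBatch).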
Re: Spark Streaming ElasticSearch
Hi Jainshasha,

I need to read each row from the DataFrame and make some changes to it before inserting it into ES.

Thanks
Siva

On Mon, Oct 5, 2020 at 8:06 PM jainshasha wrote:
> Hi Siva
>
> To emit data into ES from a Spark Structured Streaming job you need to use
> an ElasticSearch jar that has sink support for Structured Streaming. You
> can use my branch, where we have integrated ES with Spark 3.0 and Scala
> 2.12: https://github.com/ThalesGroup/spark/tree/guavus/v3.0.0
>
> From it you need to build three jars:
> elasticsearch-hadoop-sql
> elasticsearch-hadoop-core
> elasticsearch-hadoop-mr
> which together support writing data into ES from Spark Structured Streaming.
>
> In your application you can sink the data this way; remember that with ES
> only the append output mode of Structured Streaming is supported:
>
> val esDf = aggregatedDF
>   .writeStream
>   .outputMode("append")
>   .format("org.elasticsearch.spark.sql")
>   .option(CHECKPOINTLOCATION, kafkaCheckpointDirPath + "/es")
>   .start("aggregation-job-index-latest-1")
>
> Let me know if you face any issues, will be happy to help you :)
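For the per-row changes, a minimal sketch of one approach: express the transformation on the streaming DataFrame itself (cleanPayload below is a hypothetical stand-in for the business logic, and aggregatedDF for the Kafka stream) and let the ES sink handle the writes, rather than mutating rows one at a time:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder().appName("EsRowTransform").getOrCreate()

// Hypothetical per-row transformation; replace with real business logic.
val cleanPayload = udf { raw: String => Option(raw).map(_.trim.toLowerCase).orNull }

val aggregatedDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()
  .select(col("value").cast("string").as("payload"))

val transformed = aggregatedDF.withColumn("payload", cleanPayload(col("payload")))

// Assumes the elasticsearch-hadoop sink described above is on the classpath.
val query = transformed.writeStream
  .outputMode("append")
  .format("org.elasticsearch.spark.sql")
  .option("checkpointLocation", "/tmp/checkpoints/es")
  .start("my-index")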
Spark Streaming ElasticSearch
Hi Team,

I have a Spark Streaming job which reads from Kafka and writes into Elasticsearch via HTTP requests. I want to validate each record from Kafka, change the payload as per business needs, and write it into Elasticsearch. I have used ES HTTP requests to push the data so far. Can someone guide me on how to write the data into ES via a DataFrame instead?

Code snippet:

val dfInput = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .option("group.id", sourceTopicGroupId)
  .option("failOnDataLoss", "false")
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  .load()

import spark.implicits._

val resultDf = dfInput
  .withColumn("value", $"value".cast("string"))
  .select("value")

resultDf.writeStream.foreach(new ForeachWriter[Row] {
    override def open(partitionId: Long, version: Long): Boolean = true
    override def process(value: Row): Unit = {
      processEventsData(value.get(0).asInstanceOf[String], deviceIndex, msgIndex,
        retryOnConflict, auth, refreshInterval, deviceUrl, messageUrl, spark)
    }
    override def close(errorOrNull: Throwable): Unit = {}
  })
  .trigger(Trigger.ProcessingTime(triggerPeriod)) // "1 second"
  .start()
  .awaitTermination()
}

Please suggest an approach.

Thanks
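One DataFrame-level alternative, sketched under the assumption that the elasticsearch-hadoop connector is on the classpath and Spark is 2.4+ (for foreachBatch): validate and reshape each micro-batch as a whole DataFrame and write it with the ES data source, instead of issuing per-row HTTP calls. validatePayload and the index/host names below are placeholders:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("KafkaToEs").getOrCreate()

// Hypothetical validation/transformation applied to a whole micro-batch.
def validatePayload(batch: DataFrame): DataFrame =
  batch.filter(col("value").isNotNull)

val dfInput = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()
  .select(col("value").cast("string"))

val query = dfInput.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // Inside foreachBatch the micro-batch is an ordinary DataFrame, so the
    // batch ES writer can be used directly.
    validatePayload(batch)
      .write
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", "localhost:9200")
      .save("device-index")
  }
  .option("checkpointLocation", "/tmp/checkpoints/es-batch")
  .start()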
Offset Management in Spark
Hi all,

I am using Spark Structured Streaming (version 2.3.2). I need to read from one Kafka cluster and write into a Kerberized Kafka cluster, and I want to use Kafka itself for offset checkpointing, committing the offset only after the record has been written to the Kerberized cluster.

Question:
1. Can we use Kafka for checkpointing to manage offsets, or do we need to use HDFS/S3 only?

Please help.

Thanks
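As background on how the built-in Kafka source handles this: Structured Streaming records offsets in its checkpoint directory (any HDFS-compatible filesystem) as part of each micro-batch commit, rather than committing them back to Kafka consumer groups. A minimal sketch with placeholder brokers, topics, and paths:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("OffsetCheckpointing").getOrCreate()

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "source-broker:9092")
  .option("subscribe", "source-topic")
  .load()

// Offsets are written to the checkpoint location as part of each micro-batch
// commit, so on restart the query resumes exactly where the sink last
// finished; Kafka's own consumer-group offsets are not used for recovery.
val query = stream.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "secure-broker:9092")
  .option("topic", "dest-topic")
  .option("checkpointLocation", "hdfs:///checkpoints/kafka-to-kafka")
  .start()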
Re: Spark structured streaming sinks output late
Yes, I am also facing the same issue. Did you figure it out?

On Tue, 9 Jul 2019, 7:25 pm Kamalanathan Venkatesan, <kamalanatha...@in.ey.com> wrote:
> Hello,
>
> I have the Spark Structured Streaming code below, and I was expecting the
> results to be printed on the console every 10 seconds. But I notice the
> sink to console happening only every ~2 minutes and above.
>
> What could be the issue?
>
> def streaming(): Unit = {
>   System.setProperty("hadoop.home.dir", "/Documents/ ")
>   val conf: SparkConf = new SparkConf().setAppName("Histogram").setMaster("local[8]")
>   conf.set("spark.eventLog.enabled", "false");
>   val sc: SparkContext = new SparkContext(conf)
>   val sqlcontext = new SQLContext(sc)
>   val spark = SparkSession.builder().config(conf).getOrCreate()
>
>   import sqlcontext.implicits._
>   import org.apache.spark.sql.functions.window
>
>   val inputDf = spark.readStream.format("kafka")
>     .option("kafka.bootstrap.servers", "localhost:9092")
>     .option("subscribe", "wonderful")
>     .option("startingOffsets", "latest")
>     .load()
>   import scala.concurrent.duration._
>
>   val personJsonDf = inputDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
>     .withWatermark("timestamp", "500 milliseconds")
>     .groupBy(window($"timestamp", "10 seconds")).count()
>
>   val consoleOutput = personJsonDf.writeStream
>     .outputMode("complete")
>     .format("console")
>     .option("truncate", "false")
>     .outputMode(OutputMode.Update())
>     .start()
>   consoleOutput.awaitTermination()
> }
>
> object SparkExecutor {
>   val spE: SparkExecutor = new SparkExecutor();
>   def main(args: Array[String]): Unit = {
>     println("test")
>     spE.streaming
>   }
> }
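One thing worth checking (a suggestion, not a confirmed diagnosis of the ~2-minute gap): the query above never sets a trigger, so each micro-batch starts as soon as the previous one finishes, and the console cadence is whatever the Kafka poll plus the aggregation happen to take. Pinning the cadence explicitly would look like this, reusing personJsonDf from the quoted code:

import org.apache.spark.sql.streaming.Trigger

// Ask for a new micro-batch every 10 seconds; if a batch takes longer,
// the next one starts immediately after it completes.
val consoleOutput = personJsonDf.writeStream
  .outputMode("update")
  .format("console")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

As a side note, the quoted query calls outputMode twice ("complete", then OutputMode.Update()); if I read the builder semantics right, the last call wins, so it actually runs in update mode.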
Spark Streaming Code
Hi Team,

Need help on the windowing & watermark concepts. This code is not working as expected.

package com.jiomoney.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.ProcessingTime

object SlingStreaming {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("Coupons_ViewingNow")
      .getOrCreate()

    import spark.implicits._

    val checkpoint_path = "/opt/checkpoints/"

    val ks = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafka.replica.fetch.max.bytes", "16777216")
      .load()

    val dfDeviceid = ks
      .withColumn("val", ($"value").cast("string"))
      .withColumn("count1", get_json_object(($"val"), "$.a"))
      .withColumn("deviceId", get_json_object(($"val"), "$.b"))
      .withColumn("timestamp", current_timestamp())

    val final_ids = dfDeviceid
      .withColumn("processing_time", current_timestamp())
      .withWatermark("processing_time", "1 minutes")
      .groupBy(window($"processing_time", "10 seconds"), $"deviceId")
      .agg(sum($"count1") as "total")

    val t = final_ids
      .select(to_json(struct($"*")) as "value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "sub_topic")
      .option("checkpointLocation", checkpoint_path)
      .outputMode("append")
      .trigger(ProcessingTime("1 seconds"))
      .start()

    t.awaitTermination()
  }
}

Thanks
Re: Spark Streaming
My joindf is taking 14 sec in the first run, and I have commented out the withColumn calls but it is still taking more time.

On Tue, Nov 27, 2018 at 12:08 PM Jungtaek Lim wrote:
> You may need to put effort into triaging how much time is spent on each
> part. Without such information you can only get general tips and tricks.
> Please check the SQL tab and see the DAG graph as well as the details
> (logical plan, physical plan) to see whether you're happy with these plans.
>
> General tip from a quick look at the query: avoid using withColumn
> repeatedly and try to put them in one select statement. If I'm not
> mistaken, it is known to be a bit costly, since each call produces a new
> Dataset. Defining a schema and using "from_json" will eliminate all the
> withColumn calls and the extra calls to "get_json_object".
>
> - Jungtaek Lim (HeartSaVioR)
>
> On Tue, Nov 27, 2018 at 2:44 PM, Siva Samraj wrote:
>> Hello All,
>>
>> I am using Spark 2.3 and I am trying to write a Spark Streaming join. It
>> is a basic join, but it is taking a long time to join the stream data. I
>> am not sure what configuration we need to set on Spark.
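A sketch of the from_json approach Jungtaek describes, applied to the order topic from the original message below. The field names mirror the get_json_object paths; reading everything as strings and casting afterwards is an assumption about the payload:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

// One schema definition replaces eleven get_json_object calls.
val orderSchema = new StructType()
  .add("order_id", StringType)
  .add("customer_id", StringType)
  .add("promotion_id", StringType)
  .add("store_id", StringType)
  .add("product_id", StringType)
  .add("warehouse_id", StringType)
  .add("unit_cost", StringType)
  .add("total_cost", StringType)
  .add("units_sold", StringType)
  .add("promotion_cost", StringType)
  .add("date_of_order", StringType)

// A single select instead of a chain of withColumn calls.
val order_details = order_df
  .select(from_json(col("value").cast("string"), orderSchema).as("o"))
  .select(
    col("o.order_id").as("s_order_id"),
    col("o.customer_id").as("s_customer_id"),
    col("o.unit_cost").cast("integer").as("unit_cost"),
    col("o.units_sold").cast("integer").as("units_sold")
    // ...the remaining fields follow the same pattern
  )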
Spark Streaming
Hello All,

I am using Spark 2.3 and I am trying to write a Spark Streaming join. It is a basic join, but it is taking a long time to join the stream data, and I am not sure what configuration we need to set on Spark.

Code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.TimestampType

object OrderSalesJoin {
  def main(args: Array[String]): Unit = {

    setEnvironmentVariables(args(0))

    val order_topic = args(1)
    val invoice_topic = args(2)
    val dest_topic_name = args(3)

    val spark = SparkSession.builder().appName("SalesStreamingJoin").getOrCreate()

    val checkpoint_path = HDFS_HOST + "/checkpoints/" + dest_topic_name

    import spark.implicits._

    val order_df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKERS)
      .option("subscribe", order_topic)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafka.replica.fetch.max.bytes", "15728640")
      .load()

    val invoice_df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKERS)
      .option("subscribe", invoice_topic)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafka.replica.fetch.max.bytes", "15728640")
      .load()

    val order_details = order_df
      .withColumn("s_order_id", get_json_object($"value".cast("String"), "$.order_id"))
      .withColumn("s_customer_id", get_json_object($"value".cast("String"), "$.customer_id"))
      .withColumn("s_promotion_id", get_json_object($"value".cast("String"), "$.promotion_id"))
      .withColumn("s_store_id", get_json_object($"value".cast("String"), "$.store_id"))
      .withColumn("s_product_id", get_json_object($"value".cast("String"), "$.product_id"))
      .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), "$.warehouse_id"))
      .withColumn("unit_cost", get_json_object($"value".cast("String"), "$.unit_cost"))
      .withColumn("total_cost", get_json_object($"value".cast("String"), "$.total_cost"))
      .withColumn("units_sold", get_json_object($"value".cast("String"), "$.units_sold"))
      .withColumn("promotion_cost", get_json_object($"value".cast("String"), "$.promotion_cost"))
      .withColumn("date_of_order", get_json_object($"value".cast("String"), "$.date_of_order"))
      .withColumn("tstamp_trans", current_timestamp())
      .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "MMddHHmmss").cast(TimestampType))
      .select($"s_customer_id", $"s_order_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
        $"total_cost".cast("integer") as "total_cost",
        $"promotion_cost".cast("integer") as "promotion_cost",
        $"date_of_order", $"tstamp_trans", $"TIMESTAMP",
        $"units_sold".cast("integer") as "units_sold")

    val invoice_details = invoice_df
      .withColumn("order_id", get_json_object($"value".cast("String"), "$.order_id"))
      .withColumn("invoice_status", get_json_object($"value".cast("String"), "$.invoice_status"))
      .where($"invoice_status" === "Success")
      .withColumn("tstamp_trans", current_timestamp())
      .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "MMddHHmmss").cast(TimestampType))

    val order_wm = order_details.withWatermark("tstamp_trans", args(4))
    val invoice_wm = invoice_details.withWatermark("tstamp_trans", args(5))

    val join_df = order_wm
      .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
      .select($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost", $"total_cost", $"promotion_cost",
        $"date_of_order", $"units_sold" as "units_sold", $"order_id")

    val final_ids = join_df
      .withColumn("value", to_json(struct($"s_customer_id", $"s_promotion_id", $"s_store_id",
        $"s_product_id", $"s_warehouse_id",
        $"unit_cost".cast("Int") as "unit_cost",
        $"total_cost".cast("Int") as "total_cost",
        $"promotion_cost".cast("Int") as "promotion_cost",
        $"date_of_order",
        $"units_sold".cast("Int") as "units_sold",
        $"order_id")))
      .dropDuplicates("order_id")
      .select("value")

    val write_df = final_ids
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKERS)
      .option("topic", dest_topic_name)
      .option("checkpointLocation", checkpoint_path)
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()

    write_df.awaitTermination()
  }
}

Let me know: it is taking more than a minute for every run, and the waiting time keeps increasing as the data grows. Please let me know if there is anything we need to configure to make it fast. I tried in
Re: Not able to overwrite Cassandra table using Spark
You can try this; it should work:

merchantdf.write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Overwrite)
  .option("confirm.truncate", true)
  .options(Map("table" -> "tablename", "keyspace" -> "keyspace"))
  .save()

On Wed 27 Jun, 2018, 11:15 PM Abhijeet Kumar, wrote:
> Hello Team,
>
> I'm creating a DataFrame out of a Cassandra table using the DataStax Spark
> connector. After making some modifications to the DataFrame, I'm trying to
> put it back into the Cassandra table, overwriting the old content. The
> piece of code for that is:
>
> modifiedList.write.format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> "list", "keyspace" -> "journaling",
>     "confirm.truncate" -> "true"))
>   .mode(Overwrite).save
>
> It's not showing any error and seems to work fine, but when I check the
> Cassandra table afterwards, there is no content inside it. Everything is
> deleted. I'm really worried about this behaviour, because it may delete
> some useful content (I'm sure about overwriting the content and fully
> understand the consequences).
>
> Thanks,
> Abhijeet Kumar
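A likely explanation worth checking (an assumption based on how the connector's truncate-then-write interacts with Spark's lazy evaluation, not something confirmed in this thread): if modifiedList is still backed by a scan of the same table, the Overwrite truncate can empty the table before the read actually executes, leaving nothing to write. Materializing the DataFrame first avoids that; a sketch with placeholder keyspace/table names and a hypothetical modification:

import org.apache.spark.sql.SaveMode

// Read, transform, then force materialization before overwriting the source table.
val modified = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "list", "keyspace" -> "journaling"))
  .load()
  .filter("some_column IS NOT NULL") // hypothetical modification
  .cache()

modified.count() // forces the read to complete while the table still has data

modified.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "list", "keyspace" -> "journaling",
    "confirm.truncate" -> "true"))
  .mode(SaveMode.Overwrite)
  .save()

Note that cache() is best-effort: for data larger than memory, evicted partitions would be recomputed from the (now truncated) table, so writing to an intermediate table or checkpointing first is the safer route.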