Re: Single executor processing all tasks in spark structured streaming kafka

2021-03-11 Thread Sachit Murarka
Hi Kapil, Thanks for the suggestion. Yes, it worked. Regards Sachit On Tue, 9 Mar 2021, 00:19 Kapil Garg, wrote: > Hi Sachit, > What do you mean by "spark is running only 1 executor with 1 task" ? > Did you submit the spark application with multiple executors but only 1 is > being used and rest

Re: Single executor processing all tasks in spark structured streaming kafka

2021-03-08 Thread Kapil Garg
Hi Sachit, What do you mean by "spark is running only 1 executor with 1 task"? Did you submit the spark application with multiple executors, but only 1 is being used and the rest are idle? If that's the case, it might be due to the spark.locality.wait setting, which defaults to 3s. This
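A minimal sketch of lowering that setting at session startup (the app name is a placeholder; "0s" disables the locality wait entirely so tasks spread across executors immediately):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("locality-wait-example")      // placeholder name
      .config("spark.locality.wait", "0s")   // default is 3s
      .getOrCreate()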

Single executor processing all tasks in spark structured streaming kafka

2021-03-08 Thread Sachit Murarka
Hi All, I am using Spark 3.0.1 Structured Streaming with PySpark. The problem is that Spark is running only 1 executor with 1 task. Following is a summary of what I am doing. Can anyone help me understand why only 1 executor is used? def process_events(event): fetch_actual_data() #many more steps def

Re: Spark structured streaming -Kafka - deployment / monitor and restart

2020-07-06 Thread Jungtaek Lim
In SS, checkpointing is now a part of running a micro-batch and it's supported natively. (To be clear, my library doesn't deal with the native behavior of checkpointing.) In other words, it can't be customized like you have been doing with your database. You probably don't need to do it with SS,
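For reference, a minimal sketch of native checkpointing in SS: offsets and commits are tracked under the checkpointLocation, and a restart with the same path resumes where the query left off (broker, topic, and path below are placeholders; df is assumed to be a streaming DataFrame with a value column):

    import org.apache.spark.sql.streaming.Trigger

    val query = df.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")      // placeholder
      .option("topic", "output-topic")                       // placeholder
      .option("checkpointLocation", "/checkpoints/my-query") // offsets tracked here
      .trigger(Trigger.ProcessingTime("1 minute"))
      .start()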

Re: Spark structured streaming -Kafka - deployment / monitor and restart

2020-07-06 Thread KhajaAsmath Mohammed
Thanks Lim, this is really helpful. I have a few questions. Our earlier approach used a low-level consumer to read offsets from a database and use that information to read using spark streaming in DStreams, then save the offsets back once the process is finished. This way we never lost data. with your

Re: Spark structured streaming -Kafka - deployment / monitor and restart

2020-07-05 Thread Jungtaek Lim
There're sections in SS programming guide which exactly answer these questions: http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries

Re: Spark structured streaming -Kafka - deployment / monitor and restart

2020-07-05 Thread Gabor Somogyi
In 3.0 the community just added it. On Sun, 5 Jul 2020, 14:28 KhajaAsmath Mohammed, wrote: > Hi, > > We are trying to move our existing code from spark dstreams to structured > streaming for one of the old applications which we built a few years ago. > > The structured streaming job doesn’t have

Spark structured streaming -Kafka - deployment / monitor and restart

2020-07-05 Thread KhajaAsmath Mohammed
Hi, We are trying to move our existing code from Spark DStreams to Structured Streaming for one of the old applications which we built a few years ago. The Structured Streaming job doesn’t have a Streaming tab in the Spark UI. Is there a way to monitor the job we submit in Structured Streaming? Since

Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread Srinivas V
ok, thanks for confirming, I will do it this way. Regards Srini On Tue, Jun 9, 2020 at 11:31 PM Gerard Maas wrote: > Hi Srinivas, > > Reading from different brokers is possible but you need to connect to each > Kafka cluster separately. > Trying to mix connections to two different Kafka

Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread Gerard Maas
Hi Srinivas, Reading from different brokers is possible but you need to connect to each Kafka cluster separately. Trying to mix connections to two different Kafka clusters in one subscriber is not supported. (I'm sure it would give all kinds of weird errors.) The "kafka.bootstrap.servers"
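A sketch of the approach Gerard describes, with one connection per cluster and a union of the two streams (hosts and topics are placeholders):

    val cluster1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "cluster1-host:9092")
      .option("subscribe", "topicA")
      .load()

    val cluster2 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "cluster2-host:9092")
      .option("subscribe", "topicB")
      .load()

    val combined = cluster1.union(cluster2)   // both sides share the Kafka source schema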

Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread Srinivas V
Thanks for the quick reply. This may work, but I have 5 topics to listen to right now. I am trying to keep all topics in an array in a properties file and read them all at once. This way it is dynamic: you have one code block like below, and you can add or delete topics from the config
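A sketch of keeping the topic list in a properties file (the file path and property key are assumptions); note this only works for topics on the same cluster, per the reply above, since "subscribe" accepts a comma-separated list for a single set of brokers:

    import java.io.FileInputStream
    import java.util.Properties

    val props = new Properties()
    props.load(new FileInputStream("app.properties"))     // placeholder path
    val topics = props.getProperty("kafka.topics")        // e.g. "t1,t2,t3,t4,t5"

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
      .option("subscribe", topics)
      .load()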

Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread German SM
Hello, I've never tried that. Doesn't this work? val df_cluster1 = spark .read .format("kafka") .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port") .option("subscribe", "topic1") val df_cluster2 = spark .read .format("kafka") .option("kafka.bootstrap.servers",

[spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread Srinivas V
Hello, In Structured Streaming, is it possible to have one Spark application with one query consume topics from multiple Kafka clusters? I am trying to consume two topics, each from a different Kafka cluster, but it reports one of the topics as an unknown topic and the job keeps running without

Re: Using startingOffsets latest - no data from structured streaming kafka query

2020-04-22 Thread Ruijing Li
For some reason, after restarting the app and trying again, latest now works as expected. Not sure why it didn’t work before. On Tue, Apr 21, 2020 at 1:46 PM Ruijing Li wrote: > Yes, we did. But for some reason latest does not show them. The count is > always 0. > > On Sun, Apr 19, 2020 at 3:42

Re: Using startingOffsets latest - no data from structured streaming kafka query

2020-04-21 Thread Ruijing Li
Yes, we did. But for some reason latest does not show them. The count is always 0. On Sun, Apr 19, 2020 at 3:42 PM Jungtaek Lim wrote: > Did you provide more records to topic "after" you started the query? > That's the only one I can imagine based on such information. > > On Fri, Apr 17, 2020

Re: Using startingOffsets latest - no data from structured streaming kafka query

2020-04-19 Thread Jungtaek Lim
Did you provide more records to topic "after" you started the query? That's the only one I can imagine based on such information. On Fri, Apr 17, 2020 at 9:13 AM Ruijing Li wrote: > Hi all, > > Apologies if this has been asked before, but I could not find the answer > to this question. We have

Using startingOffsets latest - no data from structured streaming kafka query

2020-04-16 Thread Ruijing Li
Hi all, Apologies if this has been asked before, but I could not find the answer to this question. We have a structured streaming job, but for some reason, if we use startingOffsets = latest with foreachBatch mode, it doesn’t produce any data. Rather, in the logs I see it repeats the message “
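For reference, a sketch of how startingOffsets is typically set (broker and topic are placeholders). The option only applies when a query starts with no checkpoint; on restart, offsets come from the checkpoint instead, which may explain behavior changing across restarts:

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
      .option("subscribe", "events")                      // placeholder
      .option("startingOffsets", "latest")                // or "earliest"
      .load()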

Re: structured streaming Kafka consumer group.id override

2020-03-19 Thread lec ssmi
The last offset is stored in the file system you specified; how does it expire? I don't understand. I haven't encountered that condition. On Thu, Mar 19, 2020 at 10:18 PM Srinivas V wrote: > 1. How would a prod admin user/other engineers understand which process > is this random groupid which is consuming a specific

Re: structured streaming Kafka consumer group.id override

2020-03-19 Thread Srinivas V
1. How would a prod admin or other engineers understand which process this random group id consuming a specific topic belongs to? Why is it designed this way? 2. I don't see the group id changing all the time. It repeats on restarts. Not able to understand when and how it changes. I know

Re: structured streaming Kafka consumer group.id override

2020-03-18 Thread lec ssmi
1. Maybe we can't use a customized group id in structured streaming. 2. When restarting after a failure or a kill, the group id changes, but the starting offset will be the last one you consumed last time. On Thu, Mar 19, 2020 at 12:36 PM Srinivas V wrote: > Hello, > 1. My Kafka consumer name is randomly being
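For reference, later releases (Spark 3.0+) added a kafka.group.id source option for exactly this; a sketch with placeholder names (note the checkpoint, not Kafka's committed offsets, still drives the query):

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
      .option("subscribe", "events")                      // placeholder
      .option("kafka.group.id", "my-app-group")           // Spark 3.0+ only
      .load()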

structured streaming Kafka consumer group.id override

2020-03-18 Thread Srinivas V
Hello, 1. My Kafka consumer name is randomly generated by Spark structured streaming. Can I override this? 2. When testing in development, when I stop my Kafka consumer streaming job for a couple of days and try to start it back again, the job keeps failing for missing offsets as the

Re: Structured Streaming Kafka change maxOffsetsPerTrigger won't apply

2019-11-20 Thread Gabor Somogyi
Hi Roland, Not much is shared apart from the fact that it's not working. The latest partition offset is used when the size of a TopicPartition is negative. This can be found out by checking the following log entry in the logs: logDebug(s"rateLimit $tp size is $size") If you've double checked and still think it's an

Structured Streaming Kafka change maxOffsetsPerTrigger won't apply

2019-11-20 Thread Roland Johann
Hi All, changing maxOffsetsPerTrigger and restarting the job doesn’t change the batch size. This is bad, as we currently use a trigger duration of 5 minutes which consumes only 100k messages against an offset lag in the billions. Decreasing the trigger duration also affects micro-batch size -
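For reference, a sketch of the usual way to combine a short trigger with a per-batch cap (all values are placeholders); whether a changed maxOffsetsPerTrigger is honored after restart was the open question in this thread:

    import org.apache.spark.sql.streaming.Trigger

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
      .option("subscribe", "events")                      // placeholder
      .option("maxOffsetsPerTrigger", "1000000")          // cap offsets per micro-batch
      .load()

    df.writeStream
      .format("console")
      .option("checkpointLocation", "/checkpoints/rate-limited")  // placeholder
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .start()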

how to specify which partition each record send on spark structured streaming kafka sink?

2019-08-13 Thread zenglong chen
The key option does not work!

How to add spark structured streaming kafka source receiver

2019-08-09 Thread zenglong chen
I have set the " maxOffsetsPerTrigger",but it still receive one partition per trigger on micro-batch mode.So where to set receiving on 10 partitions parallel like what is Spark Streaming doing?

Re: Structured Streaming Kafka - Weird behavior with performance and logs

2019-05-14 Thread Suket Arora
// Continuous trigger with one-second checkpointing interval df.writeStream .format("console") .trigger(Trigger.Continuous("1 second")) .start() On Tue, 14 May 2019 at 22:10, suket arora wrote: > Hey Austin, > > If you truly want to process as a stream, use continuous streaming in >

Re: Structured Streaming Kafka - Weird behavior with performance and logs

2019-05-13 Thread Gabor Somogyi
> Where exactly would I see the start/end offset values per batch, is that in the spark logs? Yes, it's in the Spark logs. Please see this: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reading-metrics-interactively On Mon, May 13, 2019 at 10:53 AM Austin

Re: Structured Streaming Kafka - Weird behavior with performance and logs

2019-05-13 Thread Austin Weaver
Hi Akshay, Thanks very much for the reply! 1) The topics have 12 partitions (both input and output) 2-3) I read that "trigger" is used for micro-batching, but if you would like the stream to truly process as a "stream" as quickly as possible, should this be left unset? In any case, I am using

Re: Structured Streaming Kafka - Weird behavior with performance and logs

2019-05-08 Thread Akshay Bhardwaj
Hi Austin, A few questions: 1. What is the partition count of the Kafka topics used for input and output data? 2. In the write stream, I recommend using "trigger" with a defined interval if you prefer a micro-batching strategy, 3. along with defining "maxOffsetsPerTrigger" in

Structured Streaming Kafka - Weird behavior with performance and logs

2019-05-07 Thread Austin Weaver
Hey Spark Experts, After listening to some of you, and the presentations at Spark Summit in SF, I am transitioning from DStreams to structured streaming; however, I am seeing some weird results. My use case is as follows: I am reading in a stream from a kafka topic, transforming a message, and

Re: Handle Null Columns in Spark Structured Streaming Kafka

2019-04-30 Thread SNEHASISH DUTTA
Hi, an NA function will replace null with some default value, and not all my columns are of type string, so for other data types (long/int etc.) I would have to provide some default value. But ideally those values should remain null. Actually this null-column drop is happening in this step: df.selectExpr(

Re: Handle Null Columns in Spark Structured Streaming Kafka

2019-04-29 Thread Jason Nerothin
See also here: https://stackoverflow.com/questions/44671597/how-to-replace-null-values-with-a-specific-value-in-dataframe-using-spark-in-jav On Mon, Apr 29, 2019 at 5:27 PM Jason Nerothin wrote: > Spark SQL has had an na.fill function on it since at least 2.1. Would that > work for you?

Re: Handle Null Columns in Spark Structured Streaming Kafka

2019-04-29 Thread Jason Nerothin
Spark SQL has had an na.fill function on it since at least 2.1. Would that work for you? https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameNaFunctions.html On Mon, Apr 29, 2019 at 4:57 PM Shixiong(Ryan) Zhu wrote: > Hey Snehasish, > > Do you have a reproducer for this
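A sketch of na.fill with per-type defaults (column names and defaults are made up); as noted later in the thread, this trades real nulls for sentinel values:

    val filled = df
      .na.fill("")                                     // default for all string columns
      .na.fill(Map("count" -> 0L, "price" -> 0.0))     // typed defaults per column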

Handle Null Columns in Spark Structured Streaming Kafka

2019-04-24 Thread SNEHASISH DUTTA
Hi, While writing to Kafka using Spark structured streaming, if all the values in a certain column are null, the column gets dropped. Is there any way to override this, other than using the na.fill functions? Regards, Snehasish

Re: Spark 2.4 Structured Streaming Kafka assign API polling same offsets

2019-03-01 Thread Kristopher Kane
I figured out why. We are not persisting the data at the end of .load(). Thus, every operation like count() is going back to Kafka for the data again. On Fri, Mar 1, 2019 at 10:10 AM Kristopher Kane wrote: > > We are using the assign API to do batch work with Spark and Kafka. > What I'm seeing
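A sketch of the fix described above, caching the batch read so later actions don't re-poll Kafka (topic and partitions are placeholders):

    val df = spark.read                                  // batch read, not readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")  // placeholder
      .option("assign", """{"events":[0,1,2]}""")        // assign API, placeholder topic/partitions
      .load()
      .persist()

    val n = df.count()   // first action materializes and caches the data
    // ...subsequent operations reuse the cache instead of polling Kafka...
    df.unpersist()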

Spark 2.4 Structured Streaming Kafka assign API polling same offsets

2019-03-01 Thread Kristopher Kane
We are using the assign API to do batch work with Spark and Kafka. What I'm seeing is the Spark executor work happening in the background and constantly polling the same data over and over until the main thread commits the offsets. Is the below a blocking operation? Dataset df =

Re: Apache Spark Structured Streaming - Kafka Streaming - Option to ignore checkpoint

2018-06-06 Thread amihay gonen
If you are using the Kafka direct connect API, it might be committing offsets back to Kafka itself. On Thu, 7 Jun 2018, 4:10, licl wrote: > I met the same issue and I have tried to delete the checkpoint dir before the > job, > > But spark seems to read the correct offset even though after the

Re: Apache Spark Structured Streaming - Kafka Streaming - Option to ignore checkpoint

2018-06-06 Thread licl
I met the same issue and I tried deleting the checkpoint dir before the job, but Spark still seems to read the correct offset even after the checkpoint dir is deleted. I don't know how Spark does this without the checkpoint's metadata.

Re: [structured-streaming][kafka] Will the Kafka readstream timeout after connections.max.idle.ms 540000 ms ?

2018-05-16 Thread Shixiong(Ryan) Zhu
The streaming query should keep polling data from Kafka. When the query was stopped, did you see any exception? Best Regards, Shixiong Zhu, Databricks Inc.

[structured-streaming][kafka] Will the Kafka readstream timeout after connections.max.idle.ms 540000 ms ?

2018-05-15 Thread karthikjay
Hi all, We are running into a scenario where the structured streaming job is exiting after a while, specifically when the Kafka topic is not getting any data. From the job logs, I see this: connections.max.idle.ms = 540000. Does that mean the spark readstream will close when it does not get data

Re: Spark 2.3.0 Structured Streaming Kafka Timestamp

2018-05-11 Thread Michael Armbrust
> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html > The documentation says the format of the "timestamp" column is Long type, > but the actual format is timestamp. > The following is my code and result to check the schema.

Re: Spark 2.3.0 Structured Streaming Kafka Timestamp

2018-05-10 Thread Yuta Morisawa
The problem is solved. The actual schema of the Kafka message is different from the documentation. https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html The documentation says the format of the "timestamp" column is Long type, but the actual format is
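A quick way to confirm the actual source schema, rather than trusting the docs (broker and topic are placeholders):

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
      .option("subscribe", "events")                      // placeholder
      .load()

    df.printSchema()
    // In practice the "timestamp" column prints as TimestampType, not Long.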

Spark 2.3.0 Structured Streaming Kafka Timestamp

2018-05-09 Thread Yuta Morisawa
These are the websites I refer to. https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/streams/processor/TimestampExtractor.html And this is my code. val df = spark .readStream .format("kafka")

Re: [Structured Streaming][Kafka] For a Kafka topic with 3 partitions, how does the parallelism work ?

2018-04-21 Thread Raghavendra Pandey
Yes as long as there are 3 cores available on your local machine. On Fri, Apr 20, 2018 at 10:56 AM karthikjay wrote: > I have the following code to read data from Kafka topic using the > structured > streaming. The topic has 3 partitions: > > val spark = SparkSession >

Re: [Structured Streaming] [Kafka] How to repartition the data and distribute the processing among worker nodes

2018-04-20 Thread Bowden, Chris
The primary role of a sink is storing output tuples. Consider groupByKey and map/flatMapGroupsWithState instead. -Chris
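A sketch of the suggested pattern (the event and output types are made up, and parsing the Kafka value column into Event is omitted): groupByKey redistributes rows by key, and flatMapGroupsWithState carries per-key state across micro-batches:

    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

    case class Event(key: String, value: Long)
    case class KeyCount(key: String, count: Long)

    // events: Dataset[Event], already parsed from the Kafka source
    val counts = events
      .groupByKey(_.key)
      .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.NoTimeout()) {
        (key: String, rows: Iterator[Event], state: GroupState[Long]) =>
          val total = state.getOption.getOrElse(0L) + rows.size
          state.update(total)             // state survives to the next micro-batch
          Iterator(KeyCount(key, total))
      }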

[Structured Streaming] [Kafka] How to repartition the data and distribute the processing among worker nodes

2018-04-20 Thread karthikjay
Any help appreciated. Please find the question in the link: https://stackoverflow.com/questions/49951022/spark-structured-streaming-with-kafka-how-to-repartition-the-data-and-distribu

[Structured Streaming][Kafka] For a Kafka topic with 3 partitions, how does the parallelism work ?

2018-04-20 Thread karthikjay
I have the following code to read data from Kafka topic using the structured streaming. The topic has 3 partitions: val spark = SparkSession .builder .appName("TestPartition") .master("local[*]") .getOrCreate() import spark.implicits._ val dataFrame = spark

[Structured Streaming] Kafka Sink in Spark 2.3

2018-03-29 Thread Lalwani, Jayesh
Hi, I have a custom streaming sink that internally uses org.apache.spark.sql.kafka010.KafkaSink. This was working in 2.2. When I upgraded to 2.3, I get this exception. Does spark-sql-kafka-0-10 work on Spark 2.3? 84705281f4b]] DEBUG com.capitalone.sdp.spark.source.SdpSink - Writing batch to

Apache Spark Structured Streaming - Kafka Streaming - Option to ignore checkpoint

2018-03-22 Thread M Singh
Hi: I am working on a realtime application using spark structured streaming (v 2.2.1). The application reads data from kafka and, if there is a failure, I would like to ignore the checkpoint. Is there any configuration to just read from the last kafka offset after a failure and ignore any offset

Re: Apache Spark Structured Streaming - Kafka Consumer cannot fetch records for offset exception

2018-03-22 Thread Tathagata Das
Structured Streaming AUTOMATICALLY saves the offsets in a checkpoint directory that you provide. And when you start the query again with the same directory it will just pick up where it left off.

Apache Spark Structured Streaming - Kafka Consumer cannot fetch records for offset exception

2018-03-22 Thread M Singh
Hi: I am working with Spark (2.2.1) and Kafka (0.10) on AWS EMR, and for the last few days, after running the application for 30-60 minutes, I get the exception from the Kafka Consumer included below. The structured streaming application is processing 1 minute worth of data from the kafka topic. So I've tried

Re: Structured Streaming + Kafka - Corrupted Checkpoint Offsets / Commits

2018-01-04 Thread Shixiong(Ryan) Zhu
The root cause is probably that HDFSMetadataLog ignores exceptions thrown by "output.close". I think this should be fixed by this line in Spark 2.2.1 and 3.0.0: https://github.com/apache/spark/commit/6edfff055caea81dc3a98a6b4081313a0c0b0729#diff-aaeb546880508bb771df502318c40a99L126 Could you try

Structured Streaming + Kafka - Corrupted Checkpoint Offsets / Commits

2018-01-04 Thread William Briggs
I am running a Structured Streaming job (Spark 2.2.0) using EMR 5.9. The job sources data from a Kafka topic, performs a variety of filters and transformations, and sinks data back into a different Kafka topic. Once per day, we stop the query in order to merge the namenode edit logs with the

Re: Infer JSON schema in structured streaming Kafka.

2017-12-11 Thread Burak Yavuz
In Spark 2.2, you can read from Kafka in batch mode, and then use the json reader to infer the schema: val df = spark.read.format("kafka")... .select($"value".cast("string")) val json = spark.read.json(df.as[String]) val schema = json.schema While the above should be slow (since you're reading almost all data

Re: Infer JSON schema in structured streaming Kafka.

2017-12-11 Thread satyajit vegesna
Hi Burak, Thank you for the inputs; I would definitely try the options. The reason we don't have a unified schema is that we are trying to consume data from different topics that contain data from different tables in a DB, so each table has different columns. Regards, Satyajit. On

Re: Infer JSON schema in structured streaming Kafka.

2017-12-11 Thread Jacek Laskowski
Hi, What about a custom streaming Sink that would stop the query after addBatch has been called? Regards, Jacek Laskowski https://about.me/JacekLaskowski Spark Structured Streaming https://bit.ly/spark-structured-streaming Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark

Re: Infer JSON schema in structured streaming Kafka.

2017-12-11 Thread satyajit vegesna
Hi Jacek, For now, I am using Thread.sleep() on the driver to make sure my streaming query receives some data, and then stop it before control reaches the code that queries the memory table. Let me know if there is a better way of handling it. Regards, Satyajit. On Sun, Dec 10, 2017 at 10:43 PM, satyajit
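A possibly cleaner alternative to Thread.sleep for this testing pattern is StreamingQuery.processAllAvailable(), which blocks until everything currently available has been processed; a sketch (the query/table name is a placeholder, and fetchValue is the streaming DataFrame from the thread above):

    val query = fetchValue.writeStream
      .format("memory")
      .queryName("snapshot")        // placeholder in-memory table name
      .outputMode("append")
      .start()

    query.processAllAvailable()     // blocks until available input is processed
    spark.sql("SELECT * FROM snapshot").show()
    query.stop()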

Re: Infer JSON schema in structured streaming Kafka.

2017-12-10 Thread satyajit vegesna
Hi Jacek, Thank you for responding. I have tried the memory sink, and below is what I did: val fetchValue = debeziumRecords.selectExpr("value").withColumn("tableName", functions.get_json_object($"value".cast(StringType), "$.schema.name")) .withColumn("operation",

Re: Infer JSON schema in structured streaming Kafka.

2017-12-10 Thread Jacek Laskowski
Hi, What about the memory sink? That could work. Regards, Jacek Laskowski https://about.me/JacekLaskowski Spark Structured Streaming https://bit.ly/spark-structured-streaming Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark Follow me at https://twitter.com/jaceklaskowski On

Infer JSON schema in structured streaming Kafka.

2017-12-10 Thread satyajit vegesna
Hi All, I would like to infer the JSON schema from a sample of data that I receive from Kafka streams (a specific topic), and I have to infer the schema as I am going to receive random JSON strings with a different schema for each topic, so I chose to go ahead with the below steps: a. readStream from

Structured Streaming + Kafka 0.10. connectors + valueDecoder and messageHandler with python

2017-12-09 Thread salemi
Hi All, we are currently using direct streams to get the data from a kafka topic as follows: KafkaUtils.createDirectStream(ssc=self.streaming_context, topics=topics, kafkaParams=kafka_params,

Spark Structured Streaming + Kafka

2017-11-14 Thread Agostino Calamita
Hi, I have a problem with Structured Streaming and Kafka. I have 2 brokers and a topic with 8 partitions and replication factor 2. This is my driver program: public static void main(String[] args) { SparkSession spark = SparkSession .builder()

Re: Structured Streaming + Kafka Integration unable to read new messages after sometimes

2017-08-11 Thread Jacek Laskowski
…the kafka topics. > Can anybody help me in resolving the issue? > -- > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Structured-Streaming-Kafka-Integration-unable-to-read-new-messages-after-sometimes-tp29055.html

Structured Streaming + Kafka Integration unable to read new messages after sometimes

2017-08-11 Thread NikhilP
…the issue? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Structured-Streaming-Kafka-Integration-unable-to-read-new-messages-after-sometimes-tp29055.html

Re: Structured Streaming - Kafka

2017-03-07 Thread Bowden, Chris
https://issues.apache.org/jira/browse/SPARK-19853, PR by EOW. On Tuesday, March 7, 2017 2:04:45 PM, Shixiong(Ryan) Zhu wrote: > Good catch. Cou

Re: Structured Streaming - Kafka

2017-03-07 Thread Shixiong(Ryan) Zhu
Good catch. Could you create a ticket? You can also submit a PR to fix it if you have time :) On Tue, Mar 7, 2017 at 1:52 PM, Bowden, Chris wrote: > Potential bug when using startingOffsets = SpecificOffsets with Kafka > topics containing uppercase characters? > >

Structured Streaming - Kafka

2017-03-07 Thread Bowden, Chris
Potential bug when using startingOffsets = SpecificOffsets with Kafka topics containing uppercase characters? KafkaSourceProvider#L80/86: val startingOffsets = caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match { case Some("latest") => LatestOffsets