Spark Structured Streaming: “earliest” as “startingOffsets” is not working

2020-06-26 Thread Something Something
My Spark Structured Streaming job works fine when I set "startingOffsets" to "latest". When I simply change it to "earliest" & specify a new "checkpoint directory", the job doesn't work. The states don't get timed out after 10 minutes. While debugging I noticed that my 'state' logic is indeed get
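
A minimal sketch of the setup being described, with placeholder broker, topic, and paths (none of these names are from the thread). Note that the Kafka source only consults "startingOffsets" when the query starts without an existing checkpoint, which is why a fresh checkpoint directory is needed:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("earliest-offsets-sketch").getOrCreate()

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  // Only honored when the query starts with no prior checkpoint;
  // afterwards, offsets are taken from the checkpoint.
  .option("startingOffsets", "earliest")
  .load()

val query = events.writeStream
  .format("console")
  // A brand-new directory, so the "earliest" setting actually takes effect.
  .option("checkpointLocation", "/tmp/checkpoints/earliest-run")
  .start()
```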

Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Something Something
d asked this question before. No one responded. By the way, what’s > your actual name “Something something” if you don’t mind me asking? > > On Tue, Jun 9, 2020 at 12:27 AM Something Something < > mailinglist...@gmail.com> wrote: > >> What is scary is

Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Something Something
What is scary is this interface is marked as "experimental" @Experimental @InterfaceStability.Evolving public interface MapGroupsWithStateFunction extends Serializable { R call(K key, Iterator values, GroupState state) throws Exception; } On Mon, Jun 8, 2020 at 11:54 AM Something

Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Something Something
rds > .groupByKey(v => ...) > .mapGroupsWithState(timeoutConf = > GroupStateTimeout.ProcessingTimeTimeout)(func > = mappingFunc) > > val query = wordCounts.writeStream > .outputMode(OutputMode.Update) > > > On Mon, Jun 8, 2020 at 11:14 AM Someth
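
A minimal sketch of what a `mappingFunc` compatible with the quoted wiring could look like; the state type, output type, and the 10-minute timeout are assumptions, not taken from the thread:

```scala
import org.apache.spark.sql.streaming.GroupState

case class WordCount(word: String, count: Long)

// Called once per key per trigger; also called with an empty iterator when the key times out.
def mappingFunc(key: String, values: Iterator[String], state: GroupState[Long]): WordCount = {
  if (state.hasTimedOut) {
    val finalCount = state.getOption.getOrElse(0L)
    state.remove()                              // drop the state once it has expired
    WordCount(key, finalCount)
  } else {
    val newCount = state.getOption.getOrElse(0L) + values.size
    state.update(newCount)
    state.setTimeoutDuration("10 minutes")      // must be re-armed on every invocation
    WordCount(key, newCount)
  }
}
```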

Re: Using Spark Accumulators with Structured Streaming

2020-06-07 Thread Something Something
Exception { > myAccInTask.add(1); > <...> > state.update(newState); > return new LeadingCharCount(key, newState); > } > } > ``` > > -- > Cheers, > -z > > On Tue, 2 Jun 2020 10:28:36 +0800 > Z

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Something Something
d in the call method of StateUpdateTask. Like > when state times out or when the data is pushed to next Kafka topic etc. > > On Fri, May 29, 2020 at 11:55 PM Something Something < > mailinglist...@gmail.com> wrote: > >> Thanks! I will take a look at the link. Just one q

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Something Something
ator, GroupState state) { > } > } > > On Thu, May 28, 2020 at 10:59 PM Something Something < > mailinglist...@gmail.com> wrote: > >> I am assuming StateUpdateTask is your application specific class. Does it >> have 'updateState' method or something? I

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Something Something
iver stdout. > -- > Cheers, > -z > On Thu, 28 May 2020 17:12:48 +0530, Srinivas V wrote: > yes, I am using stateful structured streaming. Yes similar to what you

Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread Something Something
; Encoders.bean(ModelUpdate.class), > GroupStateTimeout.ProcessingTimeTimeout()); > > StateUpdateTask contains the update method. > > On Thu, May 28, 2020 at 4:41 AM Something Something < > mailinglist...@gmail.com> wrote: > >> Yes, t

Re: Using Spark Accumulators with Structured Streaming

2020-05-27 Thread Something Something
> Create accumulators like this: > AccumulatorV2 accumulator = sparkContext.longAccumulator(name); > > > On Tue, May 26, 2020 at 8:42 PM Something Something < > mailinglist...@gmail.com> wrote: > >> Hmm... how would they go to Graphana if they are not getting computed in >> your cod

Re: RecordTooLargeException in Spark *Structured* Streaming

2020-05-26 Thread Something Something
k data source > itself. (Kafka connector in this case) > > > https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html#kafka-specific-configurations > > Hope this helps. > > Thanks, > Jungtaek Lim (HeartSaVioR) > > > On Tue, May 26, 2020 at 6:42
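
As a sketch of what the linked page describes: options prefixed with `kafka.` are passed straight through to the underlying Kafka client, so the producer's `max.request.size` can be raised on the sink itself. Broker, topic, paths, and the size limit below are placeholders:

```scala
resultDf.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "output-topic")
  // Forwarded to the Kafka producer as max.request.size.
  .option("kafka.max.request.size", (5 * 1024 * 1024).toString)
  .option("checkpointLocation", "/tmp/checkpoints/kafka-sink")
  .start()
```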

Re: Using Spark Accumulators with Structured Streaming

2020-05-26 Thread Something Something
s V wrote: > Hello, > Even for me it comes as 0 when I print in OnQueryProgress. I use > LongAccumulator as well. Yes, it prints on my local but not on cluster. > But one consolation is that when I send metrics to Graphana, the values > are coming there. > > On Tue, Ma

RecordTooLargeException in Spark *Structured* Streaming

2020-05-25 Thread Something Something
I keep getting this error message: *The message is 1169350 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.* As indicated in other posts, I am trying to set the “max.request.size” configuration in the Producer as f

Re: Using Spark Accumulators with Structured Streaming

2020-05-25 Thread Something Something
ml#org.apache.spark.util.AccumulatorV2 > [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator > [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

Re: Using Spark Accumulators with Structured Streaming

2020-05-15 Thread Something Something
ctured Streaming? I will keep looking for alternate approaches but any help would be greatly appreciated. Thanks. On Thu, May 14, 2020 at 2:36 PM Something Something < mailinglist...@gmail.com> wrote: > In my structured streaming job I am updating Spark Accumulators in the > updateAcross

Using Spark Accumulators with Structured Streaming

2020-05-14 Thread Something Something
In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method but they are always 0 when I try to print them in my StreamingListener. Here's the code: .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())( updateAcrossEvents ) The accumula
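
A hedged sketch of the pattern under discussion, with illustrative names (how the job is actually wired is an assumption): a named LongAccumulator is registered on the driver, incremented inside the state-update function on the executors, and its merged value read back on the driver after each micro-batch via a StreamingQueryListener:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val spark = SparkSession.builder.getOrCreate()

// Named accumulators also show up in the Spark UI.
val recordsSeen = spark.sparkContext.longAccumulator("recordsSeen")

// Inside the mapGroupsWithState function (running on executors):
//   recordsSeen.add(1)
// Note: accumulator values are only merged on the driver; reading .value on an executor
// will not reflect the global total.

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"recordsSeen so far: ${recordsSeen.value}")   // read on the driver
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
})
```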

Re: No. of active states?

2020-05-07 Thread Something Something
b, it's being > provided via StreamingQueryListener. > > > http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries > > Hope this helps. > > On Fri, May 8, 2020 at 3:26 AM Something Something < > mailinglist...@gmail
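
A short sketch of the metric the link points to: each trigger's progress exposes state-store numbers, including the total count of state rows. `query` stands for a running StreamingQuery:

```scala
// Read state-store metrics from the most recent progress of a running query.
val progress = query.lastProgress            // org.apache.spark.sql.streaming.StreamingQueryProgress
progress.stateOperators.foreach { op =>
  println(s"state rows total: ${op.numRowsTotal}, updated this trigger: ${op.numRowsUpdated}")
}
```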

Dynamically changing maxOffsetsPerTrigger

2020-05-07 Thread Something Something
Is there a way to dynamically modify value of 'maxOffsetsPerTrigger' while a Stateful Structured Streaming job is running? We are thinking of auto-scaling our Spark cluster but if we don't modify the value of 'maxOffsetsPerTrigger' dynamically would adding more VMs to the cluster help? I don't thi

No. of active states?

2020-05-07 Thread Something Something
Is there a way to get the total no. of active states in memory at any given point in a Stateful Spark Structured Streaming job? We are thinking of using this metric for 'Auto Scaling' our Spark cluster.

Spark not able to read from an Embedded Kafka Topic

2020-03-06 Thread Something Something
I am trying to write an integration test using Embedded Kafka but I keep getting NullPointerException. My test case is very simple. It has following steps: 1. Read a JSON file & write messages to an inputTopic. 2. Perform a 'readStream' operation. 3. Do a 'select' on the Stream. This thro

Re: Stateful Structured Spark Streaming: Timeout is not getting triggered

2020-03-05 Thread Something Something
r details here - > https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.GroupState > > On Wed, Mar 4, 2020 at 2:51 PM Something Something < > mailinglist...@gmail.com> wrote: > >> I've set the timeout duration to "2 minutes" as fo

Stateful Structured Spark Streaming: Timeout is not getting triggered

2020-03-04 Thread Something Something
I've set the timeout duration to "2 minutes" as follows: def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[R00tJsonObject], oldState: GroupState[MyState]): OutputRow = { println(" Inside updateAcrossEvents with : " + tuple3._1 + ",
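
A wiring sketch around the signature above (the key fields and dataset name are hypothetical). For the timeout to fire at all, ProcessingTimeTimeout has to be passed to mapGroupsWithState, and `state.setTimeoutDuration("2 minutes")` has to be re-armed on every call for the key:

```scala
import org.apache.spark.sql.streaming.GroupStateTimeout

// events: Dataset[R00tJsonObject]; the three key fields are placeholders.
val output = events
  .groupByKey(e => (e.fieldA, e.fieldB, e.fieldC))
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
```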

Re: Stateful Spark Streaming: Required attribute 'value' not found

2020-03-04 Thread Something Something
By simply adding 'toJSON' before 'writeStream' the problem was fixed. Maybe it will help someone. On Tue, Mar 3, 2020 at 6:02 PM Something Something wrote: > In a Stateful Spark Streaming application I am writing the 'OutputRow' in > the 'updateAc
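
A sketch of the fix described above (broker, topic, and paths are placeholders): `Dataset.toJSON` produces a Dataset[String] whose single column is named "value", which is exactly what the Kafka sink requires:

```scala
resultDs
  .toJSON                                  // one string column named "value"
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints/out")
  .start()
```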

Stateful Spark Streaming: Required attribute 'value' not found

2020-03-03 Thread Something Something
In a Stateful Spark Streaming application I am writing the 'OutputRow' in the 'updateAcrossEvents' but I keep getting this error (*Required attribute 'value' not found*) while it's trying to write to Kafka. I know from the documentation that 'value' attribute needs to be set but how do I do that in

Example of Stateful Spark Structured Streaming with Kafka

2020-03-03 Thread Something Something
There are lots of examples on 'Stateful Structured Streaming' in 'The Definitive Guide' book BUT all of them read JSON from a 'path'. That's working for me. Now I need to read from Kafka. I Googled but I couldn't find any example. I am struggling to Map the 'Value' of the Kafka message to my JSON
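
One common way to do the mapping being asked about, sketched with a hypothetical case class `MyEvent` that mirrors the JSON payload (broker and topic are placeholders):

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.from_json

case class MyEvent(eventId: String, eventType: String, payload: String)  // hypothetical shape

val schema = Encoders.product[MyEvent].schema

import spark.implicits._

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(value AS STRING) AS json")   // the Kafka value arrives as binary
  .select(from_json($"json", schema).as("event"))
  .select("event.*")
  .as[MyEvent]                                    // ready for groupByKey / mapGroupsWithState
```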

Re: Spark Streaming with mapGroupsWithState

2020-03-02 Thread Something Something
`updateAcrossEvents`. .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)* On Mon, Mar 2, 2020 at 5:12 PM lec ssmi wrote: > maybe you can combine the fields you want to use into one field > > Something Something wrote on Tue, Mar 3, 2020 at 6:37 AM: > >> I am writing a

Spark Streaming with mapGroupsWithState

2020-03-02 Thread Something Something
I am writing a Stateful Streaming application in which I am using mapGroupsWithState to create aggregates for Groups but I need to create *Groups based on more than one column in the Input Row*. All the examples in the 'Spark: The Definitive Guide' use only one column such as 'User' or 'Device'. I
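
One way to key groups on more than one column, sketched with hypothetical field and type names: `groupByKey` can key on a tuple, so no change to the input schema is needed. This assumes `import spark.implicits._` is in scope for the encoders:

```scala
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Event(user: String, device: String, value: Long)   // hypothetical input shape

val perUserDevice = events                                     // events: Dataset[Event]
  .groupByKey(e => (e.user, e.device))                         // composite key instead of one column
  .mapGroupsWithState(GroupStateTimeout.NoTimeout()) {
    (key: (String, String), rows: Iterator[Event], state: GroupState[Long]) =>
      val total = state.getOption.getOrElse(0L) + rows.map(_.value).sum
      state.update(total)
      (key._1, key._2, total)
  }
```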

Aggregating values by a key field in Spark Streaming

2020-02-28 Thread Something Something
Here's my use case: Messages are coming into a Kafka Topic for different 'Events'. Each event has a unique Event Id. I need to aggregate counts for each Event AFTER the event is completed. For now, we are thinking we can assume an event is completed if there are no more messages coming in for a per
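
One way to express "emit the count only once the event is finished", under the stated assumption that silence for a period means completion. The names and the 10-minute window are illustrative, and this uses flatMapGroupsWithState so that nothing is emitted until the key times out:

```scala
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class EventMsg(eventId: String)                       // hypothetical input shape
case class EventCount(eventId: String, count: Long)

// Emit nothing while messages keep arriving; emit the final count when the key times out.
def onEvent(eventId: String, msgs: Iterator[EventMsg],
            state: GroupState[Long]): Iterator[EventCount] = {
  if (state.hasTimedOut) {
    val finalCount = state.getOption.getOrElse(0L)
    state.remove()
    Iterator(EventCount(eventId, finalCount))
  } else {
    state.update(state.getOption.getOrElse(0L) + msgs.size)
    state.setTimeoutDuration("10 minutes")                  // the "no more messages" window
    Iterator.empty
  }
}

val counts = messages                                       // messages: Dataset[EventMsg]
  .groupByKey(_.eventId)
  .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.ProcessingTimeTimeout())(onEvent)
```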

Spark Streaming: Aggregating values across batches

2020-02-27 Thread Something Something
We have a Spark Streaming job that calculates some values in each batch. What we need to do now is aggregate values across ALL batches. What is the best strategy to do this in Spark Streaming? Should we use 'Spark Accumulators' for this?
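
Accumulators are one option; for per-key aggregation across batches the DStream API also has stateful operators. A minimal sketch with `updateStateByKey` (names are illustrative, and this is not necessarily what the thread settled on):

```scala
// Requires ssc.checkpoint("...") to be set for stateful DStream operations.
// keyedValues: DStream[(String, Long)]
val runningTotals = keyedValues.updateStateByKey[Long] {
  (newValues: Seq[Long], current: Option[Long]) =>
    Some(current.getOrElse(0L) + newValues.sum)   // carry the running total across batches
}
```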

Storing object in spark streaming

2015-10-12 Thread Something Something
In my custom receiver for Spark Streaming I've code such as this: messages.toArray().foreach(msg => { val m = msg.asInstanceOf[Message] * store(m.getBody)* }) Instead of 'body' which is of type 'String', I would rather pass the en
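
A sketch of storing the whole object rather than just the body: the Receiver is typed by what it stores, so a `Receiver[Message]` can call `store(m)` directly, assuming `Message` (the thread's own class) is serializable:

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Sketch only: a receiver typed on the full Message object instead of String.
class MessageReceiver extends Receiver[Message](StorageLevel.MEMORY_AND_DISK_2) {
  def onStart(): Unit = {
    // start a thread that fetches messages and calls store(m) for each Message m
  }
  def onStop(): Unit = {}
}
```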

Creating Custom Receiver for Spark Streaming

2015-10-12 Thread Something Something
Is it safe to assume that Spark will always create a single instance of Custom Receiver? Or would it create multiple instances on each node in a cluster? Wondering if I need to worry about receiving the same message on different nodes etc. Please help. Thanks.

Re: JavaKafkaWordCount not working under Spark Streaming

2014-11-10 Thread Something Something
I am not running locally. The Spark master is: "spark://:7077" On Mon, Nov 10, 2014 at 3:47 PM, Tathagata Das wrote: > What is the Spark master that you are using. Use local[4], not local > if you are running locally. > > On Mon, Nov 10, 2014 at 3:01 PM, Something Som
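
For reference, the advice quoted above, as a sketch (app name and batch interval are placeholders): a receiver-based stream occupies one core, so a local master needs more than one slot or no batches ever get processed:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// "local" gives a single slot, which the Kafka receiver consumes entirely;
// "local[4]" (or any value > 1) leaves cores free for batch processing.
val conf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[4]")
val ssc = new StreamingContext(conf, Seconds(2))
```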

JavaKafkaWordCount not working under Spark Streaming

2014-11-10 Thread Something Something
I am embarrassed to admit but I can't get a basic 'word count' to work under Kafka/Spark streaming. My code looks like this. I don't see any word counts in console output. Also, don't see any output in UI. Needless to say, I am newbie in both 'Spark' as well as 'Kafka'. Please help. Thanks.

Re: Kafka Consumer in Spark Streaming

2014-11-05 Thread Something Something
System.out.println(status); > return status; > } > } > ); > > statuses.print(); > > Or you could use foreachRDD instead of map() if your intention is just >

Re: Kafka Consumer in Spark Streaming

2014-11-04 Thread Something Something
> more number of threads. > It may be due to topic name mismatch also. > > sparkConf.setMaster("local[1]"); > > Regards, > Rahul > > From: Something Something > Date: Wednesday, November 5, 2014 at 12:23 PM > To: "Shao, Saisai" > Cc:

Re: Kafka Consumer in Spark Streaming

2014-11-04 Thread Something Something
ng on a standalone mode, the log is under > /work/ directory. I’m not sure for yarn or mesos, you can check > the document of Spark to see the details. > > > > Thanks > > Jerry > > > > *From:* Something Something [mailto:mailinglist...@gmail.com] > *Sent:* Wednesd

Re: Kafka Consumer in Spark Streaming

2014-11-04 Thread Something Something
3. Did you see any exceptions when running the app, this will help > to define the problem. > > > > Thanks > > Jerry > > > > *From:* Something Something [mailto:mailinglist...@gmail.com] > *Sent:* Wednesday, November 05, 2014 1:57 PM > *To:* user@spark.a

Kafka Consumer in Spark Streaming

2014-11-04 Thread Something Something
I've following code in my program. I don't get any error, but it's not consuming the messages either. Shouldn't the following code print the line in the 'call' method? What am I missing? Please help. Thanks. JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration
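
The thread's code is Java, but for a thread of this vintage (Spark 1.x) the receiver-based consumer from spark-streaming-kafka was the usual wiring; a Scala sketch of the equivalent, with placeholder ZooKeeper address, group id, and topic name:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[4]")
val ssc = new StreamingContext(conf, Seconds(2))

// Receiver-based consumer (Spark 1.x era API); returns (key, message) pairs.
val lines = KafkaUtils.createStream(ssc, "zkhost:2181", "my-consumer-group", Map("my-topic" -> 1))
  .map(_._2)

lines.print()          // without an output operation, nothing is executed
ssc.start()
ssc.awaitTermination()
```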