Spark Structured Streaming: “earliest” as “startingOffsets” is not working

2020-06-26 Thread Something Something
My Spark Structured Streaming job works fine when I set "startingOffsets" to "latest". When I simply change it to "earliest" & specify a new "checkpoint directory", the job doesn't work. The states don't get timed out after 10 minutes. While debugging I noticed that my 'state' logic is indeed
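A minimal sketch of the source option in question, assuming a Kafka source (servers and topic below are placeholders); note that "startingOffsets" is only honored on the first run against a fresh checkpoint directory, and once a checkpoint exists the offsets are taken from it, so changing the option alone has no effect:

    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")  // placeholder
      .option("subscribe", "events")                    // placeholder topic
      .option("startingOffsets", "earliest")            // default for streaming queries is "latest"
      .load()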

Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Something Something
No one responded. By the way, what’s > your actual name “Something something” if you don’t mind me asking? > > On Tue, Jun 9, 2020 at 12:27 AM Something Something < > mailinglist...@gmail.com> wrote: > >> What is scary is this interface is marked as "experimental"

Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Something Something
What is scary is this interface is marked as "experimental" @Experimental @InterfaceStability.Evolving public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable { R call(K key, Iterator<V> values, GroupState<S> state) throws Exception; } On Mon, Jun 8, 2020 at 11:54 AM Something

Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Something Something
.groupByKey(v => ...) > .mapGroupsWithState(timeoutConf = > GroupStateTimeout.ProcessingTimeTimeout)(func > = mappingFunc) > > val query = wordCounts.writeStream > .outputMode(OutputMode.Update) > > > On Mon, Jun 8, 2020 at 11:14 AM Something Somet

Re: Using Spark Accumulators with Structured Streaming

2020-06-07 Thread Something Something
{ > myAccInTask.add(1); > <...> > state.update(newState); > return new LeadingCharCount(key, newState); > } > } > ``` > > -- > Cheers, > -z > > On Tue, 2 Jun 2020 10:28:36 +0800 > ZHANG Wei

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Something Something
StateUpdateTask. Like > when state times out or when the data is pushed to next Kafka topic etc. > > On Fri, May 29, 2020 at 11:55 PM Something Something < > mailinglist...@gmail.com> wrote: > >> Thanks! I will take a look at the link. Just one question, you seem to be

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Something Something
e state) { > } > } > > On Thu, May 28, 2020 at 10:59 PM Something Something < > mailinglist...@gmail.com> wrote: > >> I am assuming StateUpdateTask is your application specific class. Does it >> have 'updateState' method or something? I googled but couldn't fin

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Something Something
> > > -- > > > Cheers, > > > -z > > > > > > On Thu, 28 May 2020 17:12:48 +0530 > > > Srinivas V wrote: > > > > > > > yes, I am using stateful structured streaming. Yes similar to what > you > > > do.

Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread Something Something
Encoders.bean(ModelUpdate.class), > GroupStateTimeout.ProcessingTimeTimeout()); > > StateUpdateTask contains the update method. > > On Thu, May 28, 2020 at 4:41 AM Something Something < > mailinglist...@gmail.com> wrote: > >> Yes, that's exactly h

Re: Using Spark Accumulators with Structured Streaming

2020-05-27 Thread Something Something
AccumulatorV2 accumulator = sparkContext.longAccumulator(name); > > > On Tue, May 26, 2020 at 8:42 PM Something Something < > mailinglist...@gmail.com> wrote: > >> Hmm... how would they go to Grafana if they are not getting computed in >> your code? I am talking about the Application

Re: RecordTooLargeException in Spark *Structured* Streaming

2020-05-26 Thread Something Something
Kafka data source > itself. (Kafka connector in this case) > > https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html#kafka-specific-configurations > > Hope this helps. > > Thanks, > Jungtaek Lim (HeartSaVioR) > > > On Tue, May 26, 2020 at 6:42

Re: Using Spark Accumulators with Structured Streaming

2020-05-26 Thread Something Something
: > Hello, > Even for me it comes as 0 when I print in OnQueryProgress. I use > LongAccumulator as well. Yes, it prints on my local but not on cluster. > But one consolation is that when I send metrics to Grafana, the values > are coming there. > > On Tue, May 26, 2020

RecordTooLargeException in Spark *Structured* Streaming

2020-05-25 Thread Something Something
I keep getting this error message: *The message is 1169350 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.* As indicated in other posts, I am trying to set the “max.request.size” configuration in the Producer as
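Per the Kafka integration guide linked in the reply above, producer properties for the Kafka sink are passed through DataStreamWriter options with a "kafka." prefix rather than set on a Producer directly. A minimal sketch under that assumption (servers, topic and checkpoint path are placeholders):

    df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")  // placeholder
      .option("topic", "output")                        // placeholder
      .option("kafka.max.request.size", "5242880")      // Kafka producer property, note the "kafka." prefix
      .option("checkpointLocation", "/tmp/ckpt")        // placeholder
      .start()

Raising the producer-side limit usually also needs a matching broker/topic-side message size limit (message.max.bytes / max.message.bytes), otherwise the broker rejects the record instead.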

Re: Using Spark Accumulators with Structured Streaming

2020-05-25 Thread Something Something
[1] > http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2 > [2] > http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator > [3] > http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

Re: Using Spark Accumulators with Structured Streaming

2020-05-15 Thread Something Something
will keep looking for alternate approaches but any help would be greatly appreciated. Thanks. On Thu, May 14, 2020 at 2:36 PM Something Something < mailinglist...@gmail.com> wrote: > In my structured streaming job I am updating Spark Accumulators in the > updateAcrossEvents method but th

Using Spark Accumulators with Structured Streaming

2020-05-14 Thread Something Something
In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method but they are always 0 when I try to print them in my StreamingListener. Here's the code: .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())( updateAcrossEvents ) The
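A minimal sketch of the pattern being described, with placeholder names and types: the accumulator is registered once on the driver and updated inside the state function on the executors. Whether a listener on the driver then sees non-zero values is exactly what this thread is about, so treat the listener side with caution:

    import org.apache.spark.sql.streaming.GroupState

    // Registered once on the driver; the name makes it visible in the Spark UI.
    val eventCount = spark.sparkContext.longAccumulator("eventCount")   // hypothetical name

    def updateAcrossEvents(key: String,
                           events: Iterator[String],
                           state: GroupState[Long]): (String, Long) = {
      val seen = events.size
      eventCount.add(seen)                              // executor-side update
      val total = state.getOption.getOrElse(0L) + seen
      state.update(total)
      (key, total)
    }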

Re: No. of active states?

2020-05-07 Thread Something Something
's being > provided via StreamingQueryListener. > > > http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries > > Hope this helps. > > On Fri, May 8, 2020 at 3:26 AM Something Something < > mailinglist...@gmail.com> w

Dynamically changing maxOffsetsPerTrigger

2020-05-07 Thread Something Something
Is there a way to dynamically modify value of 'maxOffsetsPerTrigger' while a Stateful Structured Streaming job is running? We are thinking of auto-scaling our Spark cluster but if we don't modify the value of 'maxOffsetsPerTrigger' dynamically would adding more VMs to the cluster help? I don't
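For reference, the option is set on the Kafka source when the query is defined (placeholders below); as far as the built-in source goes it stays fixed for the life of the query, so changing it generally means stopping the query and restarting it with a new value rather than adjusting it on the fly:

    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")  // placeholder
      .option("subscribe", "events")                    // placeholder
      .option("maxOffsetsPerTrigger", "100000")         // upper bound on records pulled per micro-batch
      .load()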

No. of active states?

2020-05-07 Thread Something Something
Is there a way to get the total no. of active states in memory at any given point in a Stateful Spark Structured Streaming job? We are thinking of using this metric for 'Auto Scaling' our Spark cluster.
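The reply above points at the monitoring APIs; a minimal sketch of pulling the state-store row count out of query progress with a StreamingQueryListener (numRowsTotal is reported per stateful operator):

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = {}
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        // One entry per stateful operator; numRowsTotal is the number of rows currently held in the state store.
        event.progress.stateOperators.foreach(op => println(s"state rows: ${op.numRowsTotal}"))
      }
    })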

Spark not able to read from an Embedded Kafka Topic

2020-03-06 Thread Something Something
I am trying to write an integration test using Embedded Kafka but I keep getting NullPointerException. My test case is very simple. It has the following steps: 1. Read a JSON file & write messages to an inputTopic. 2. Perform a 'readStream' operation. 3. Do a 'select' on the Stream. This

Re: Stateful Structured Spark Streaming: Timeout is not getting triggered

2020-03-05 Thread Something Something
- > https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.GroupState > > On Wed, Mar 4, 2020 at 2:51 PM Something Something < > mailinglist...@gmail.com> wrote: > >> I've set the timeout duration to "2 minutes" as follows: >> >

Stateful Structured Spark Streaming: Timeout is not getting triggered

2020-03-04 Thread Something Something
I've set the timeout duration to "2 minutes" as follows: def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[R00tJsonObject], oldState: GroupState[MyState]): OutputRow = { println(" Inside updateAcrossEvents with : " + tuple3._1 + ",
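A simplified sketch of the usual shape of that function, with placeholder types standing in for R00tJsonObject/MyState/OutputRow: the duration has to be re-armed with setTimeoutDuration on every invocation, the function has to handle the hasTimedOut branch, and (per the GroupState docs) a processing-time timeout only fires when a micro-batch actually runs, so a completely idle source will not produce timeout calls:

    import org.apache.spark.sql.streaming.GroupState

    case class MyState(count: Long = 0L)               // placeholder for the real state class
    case class OutputRow(key: String, count: Long)     // placeholder output

    def updateAcrossEvents(key: String,
                           inputs: Iterator[String],
                           oldState: GroupState[MyState]): OutputRow = {
      if (oldState.hasTimedOut) {                      // reached only after the configured duration elapses
        val out = OutputRow(key, oldState.get.count)
        oldState.remove()
        out
      } else {
        val s = MyState(oldState.getOption.getOrElse(MyState()).count + inputs.size)
        oldState.update(s)
        oldState.setTimeoutDuration("2 minutes")       // must be re-armed on every invocation
        OutputRow(key, s.count)
      }
    }

Wired up as in the snippets above: .groupByKey(...).mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents).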

Re: Stateful Spark Streaming: Required attribute 'value' not found

2020-03-04 Thread Something Something
By simply adding 'toJSON' before 'writeStream' the problem was fixed. Maybe it will help someone. On Tue, Mar 3, 2020 at 6:02 PM Something Something wrote: > In a Stateful Spark Streaming application I am writing the 'OutputRow' in > the 'updateAcrossEvents' but I keep getting this
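A minimal sketch of that fix: Dataset.toJSON yields a Dataset[String] whose single column is named "value", which is what the Kafka sink looks for (an explicit to_json(struct(...)) projection achieves the same thing); sink options are placeholders:

    outputDs.toJSON                                     // Dataset[String] with one column named "value"
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")  // placeholder
      .option("topic", "output")                        // placeholder
      .option("checkpointLocation", "/tmp/ckpt")        // placeholder
      .start()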

Stateful Spark Streaming: Required attribute 'value' not found

2020-03-03 Thread Something Something
In a Stateful Spark Streaming application I am writing the 'OutputRow' in the 'updateAcrossEvents' but I keep getting this error (*Required attribute 'value' not found*) while it's trying to write to Kafka. I know from the documentation that 'value' attribute needs to be set but how do I do that

Example of Stateful Spark Structured Streaming with Kafka

2020-03-03 Thread Something Something
There are lots of examples on 'Stateful Structured Streaming' in 'The Definitive Guide' book BUT all of them read JSON from a 'path'. That's working for me. Now I need to read from Kafka. I Googled but I couldn't find any example. I am struggling to Map the 'Value' of the Kafka message to my
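One common way to do this, sketched with a placeholder payload class and schema: the Kafka "value" arrives as binary, so it is cast to string, parsed with from_json, and then converted into a typed Dataset that groupByKey/mapGroupsWithState can work with:

    import org.apache.spark.sql.Encoders
    import org.apache.spark.sql.functions.{col, from_json}
    import spark.implicits._

    case class Event(eventId: String, payload: String)   // placeholder for the real message class
    val schema = Encoders.product[Event].schema

    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")   // placeholder
      .option("subscribe", "events")                      // placeholder
      .load()
      .select(from_json(col("value").cast("string"), schema).as("e"))
      .select("e.*")
      .as[Event]                                          // typed Dataset, ready for stateful operators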

Re: Spark Streaming with mapGroupsWithState

2020-03-02 Thread Something Something
of `updateAcrossEvents`. .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents) On Mon, Mar 2, 2020 at 5:12 PM lec ssmi wrote: > maybe you can combine the fields you want to use into one field > > On Tue, Mar 3, 2020 at 6:37 AM, Something Something wrote: > >> I am writing a

Spark Streaming with mapGroupsWithState

2020-03-02 Thread Something Something
I am writing a Stateful Streaming application in which I am using mapGroupsWithState to create aggregates for Groups but I need to create *Groups based on more than one column in the Input Row*. All the examples in the 'Spark: The Definitive Guide' use only one column such as 'User' or 'Device'. I
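One way to do this, as the reply above also hints, is to give groupByKey a composite key such as a tuple of the relevant columns. A sketch with placeholder field names, assuming events is a typed Dataset[Event] and spark.implicits._ is in scope:

    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

    case class Event(userId: String, deviceId: String, payload: String)   // placeholder fields

    // Key on more than one column by returning a tuple from groupByKey.
    val counts = events
      .groupByKey(e => (e.userId, e.deviceId))
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout()) {
        (key: (String, String), rows: Iterator[Event], state: GroupState[Long]) =>
          val total = state.getOption.getOrElse(0L) + rows.size
          state.update(total)
          (key._1, key._2, total)
      }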

Aggregating values by a key field in Spark Streaming

2020-02-28 Thread Something Something
Here's my use case: Messages are coming into a Kafka Topic for different 'Events'. Each event has a unique Event Id. I need to aggregate counts for each Event AFTER the event is completed. For now, we are thinking we can assume an event is completed if there are no more messages coming in for a

Spark Streaming: Aggregating values across batches

2020-02-27 Thread Something Something
We've a Spark Streaming job that calculates some values in each batch. What we need to do now is aggregate values across ALL batches. What is the best strategy to do this in Spark Streaming? Should we use 'Spark Accumulators' for this?
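Accumulators do work for a simple driver-side running total; a sketch assuming the DStream API from the subject line, where ssc and stream are the streaming context and input stream. For keyed aggregates that must survive across batches, updateStateByKey/mapWithState, or the stateful operators in Structured Streaming, are the more usual tools:

    // Running total maintained across batches via a driver-registered accumulator.
    val totalEvents = ssc.sparkContext.longAccumulator("totalEvents")   // hypothetical name

    stream.foreachRDD { rdd =>
      totalEvents.add(rdd.count())            // count() runs on the executors, add() here runs on the driver
      println(s"events so far: ${totalEvents.value}")
    }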

Creating Custom Receiver for Spark Streaming

2015-10-12 Thread Something Something
Is it safe to assume that Spark will always create a single instance of Custom Receiver? Or would it create multiple instances on each node in a cluster? Wondering if I need to worry about receiving the same message on different nodes etc. Please help. Thanks.

Storing object in spark streaming

2015-10-12 Thread Something Something
In my custom receiver for Spark Streaming I've code such as this: messages.toArray().foreach(msg => { val m = msg.asInstanceOf[Message] * store(m.getBody)* }) Instead of 'body' which is of type 'String', I would rather pass the
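Receiver is parameterized on the element type, so a receiver declared as Receiver[Message] can pass the whole object to store() instead of just the body (the stored type must be serializable). A rough sketch with Message as a stand-in for the real class:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    case class Message(getBody: String)        // stand-in for the real message type; must be serializable

    class MessageReceiver extends Receiver[Message](StorageLevel.MEMORY_AND_DISK_2) {
      override def onStart(): Unit = {
        new Thread("message-receiver") {
          override def run(): Unit = {
            while (!isStopped()) {
              val m = Message("payload")       // stand-in for the real fetch
              store(m)                         // store the full object, not m.getBody
              Thread.sleep(1000)
            }
          }
        }.start()
      }
      override def onStop(): Unit = {}
    }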

JavaKafkaWordCount not working under Spark Streaming

2014-11-10 Thread Something Something
I am embarrassed to admit but I can't get a basic 'word count' to work under Kafka/Spark streaming. My code looks like this. I don't see any word counts in console output. Also, don't see any output in UI. Needless to say, I am newbie in both 'Spark' as well as 'Kafka'. Please help. Thanks.

Re: JavaKafkaWordCount not working under Spark Streaming

2014-11-10 Thread Something Something
, Something Something mailinglist...@gmail.com wrote: I am embarrassed to admit but I can't get a basic 'word count' to work under Kafka/Spark streaming. My code looks like this. I don't see any word counts in console output. Also, don't see any output in UI. Needless to say, I am newbie

Re: Kafka Consumer in Spark Streaming

2014-11-05 Thread Something Something
; } } ); statuses.print(); Or you could use foreachRDD instead of map() if your intention is just printing. Thanks Best Regards On Wed, Nov 5, 2014 at 12:35 PM, Something Something mailinglist...@gmail.com wrote: It's not local. My spark url is something like

Kafka Consumer in Spark Streaming

2014-11-04 Thread Something Something
I've following code in my program. I don't get any error, but it's not consuming the messages either. Shouldn't the following code print the line in the 'call' method? What am I missing? Please help. Thanks. JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new

Re: Kafka Consumer in Spark Streaming

2014-11-04 Thread Something Something
when running the app, this will help to define the problem. Thanks Jerry *From:* Something Something [mailto:mailinglist...@gmail.com] *Sent:* Wednesday, November 05, 2014 1:57 PM *To:* user@spark.apache.org *Subject:* Kafka Consumer in Spark Streaming I've following code in my

Re: Kafka Consumer in Spark Streaming

2014-11-04 Thread Something Something
: If you’re running on a standalone mode, the log is under SPARK_HOME/work/ directory. I’m not sure for yarn or mesos, you can check the document of Spark to see the details. Thanks Jerry *From:* Something Something [mailto:mailinglist...@gmail.com] *Sent:* Wednesday, November 05, 2014 2:28

Re: Kafka Consumer in Spark Streaming

2014-11-04 Thread Something Something
] or more number of threads. It may be due to topic name mismatch also. sparkConf.setMaster("local[1]"); Regards, Rahul From: Something Something mailinglist...@gmail.com Date: Wednesday, November 5, 2014 at 12:23 PM To: Shao, Saisai saisai.s...@intel.com Cc: user@spark.apache.org user