My Spark Structured Streaming job works fine when I set "startingOffsets"
to "latest". When I simply change it to "earliest" & specify a new "check
point directory", the job doesn't work. The states don't get timed out
after 10 minutes.
While debugging I noticed that my 'state' logic is indeed get
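For reference, a minimal sketch of this kind of setup; the broker, topic, checkpoint path, and the Event/MyState classes are placeholders, not the actual job:

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(id: String, payload: String)   // placeholder input row
case class MyState(count: Long)                 // placeholder state

val spark = SparkSession.builder.appName("StateTimeoutSketch").getOrCreate()
import spark.implicits._

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
  .option("subscribe", "inputTopic")                  // placeholder
  .option("startingOffsets", "earliest")              // the setting being changed
  .load()
  .selectExpr("CAST(key AS STRING) AS id", "CAST(value AS STRING) AS payload")
  .as[Event]

def updateState(id: String, rows: Iterator[Event], state: GroupState[MyState]): MyState = {
  if (state.hasTimedOut) {                            // reached only if a timeout was armed earlier
    val last = state.get
    state.remove()
    last
  } else {
    val updated = MyState(state.getOption.map(_.count).getOrElse(0L) + rows.size)
    state.update(updated)
    state.setTimeoutDuration("10 minutes")            // must be re-armed on every call
    updated
  }
}

val counts = events
  .groupByKey(_.id)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateState)

counts.writeStream
  .outputMode(OutputMode.Update)
  .format("console")
  .option("checkpointLocation", "/tmp/new-checkpoint-dir")   // placeholder path
  .start()
```

As far as I understand, with ProcessingTimeTimeout the duration has to be re-set on every invocation of the function, and in micro-batch mode the timeout check itself only runs when a trigger fires, which may be relevant to states not timing out.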
d asked this question before. No one responded. By the way, what’s
> your actual name “Something something” if you don’t mind me asking?
>
> On Tue, Jun 9, 2020 at 12:27 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> What is scary is
What is scary is this interface is marked as "experimental"
@Experimental
@InterfaceStability.Evolving
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
}
On Mon, Jun 8, 2020 at 11:54 AM Something
rds
> .groupByKey(v => ...)
> .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>
> val query = wordCounts.writeStream
> .outputMode(OutputMode.Update)
>
>
> On Mon, Jun 8, 2020 at 11:14 AM Someth
Exception {
> myAccInTask.add(1);
> <...>
> state.update(newState);
> return new LeadingCharCount(key, newState);
> }
> }
> ```
>
> --
> Cheers,
> -z
>
> On Tue, 2 Jun 2020 10:28:36 +0800
> Z
d in the call method of StateUpdateTask, like
> when state times out or when the data is pushed to the next Kafka topic, etc.
>
> On Fri, May 29, 2020 at 11:55 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Thanks! I will take a look at the link. Just one q
ator, GroupState state) {
> }
> }
>
> On Thu, May 28, 2020 at 10:59 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> I am assuming StateUpdateTask is your application-specific class. Does it
>> have an 'updateState' method or something? I
iver stdout.
> > >
> > > --
> > > Cheers,
> > > -z
> > >
> > > On Thu, 28 May 2020 17:12:48 +0530
> > > Srinivas V wrote:
> > >
> > > > yes, I am using stateful structured streaming. Yes similar to what
> you
> >
; Encoders.bean(ModelUpdate.class),
> GroupStateTimeout.ProcessingTimeTimeout());
>
> StateUpdateTask contains the update method.
>
> On Thu, May 28, 2020 at 4:41 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Yes, t
> Create accumulators like this:
> AccumulatorV2<Long, Long> accumulator = sparkContext.longAccumulator(name);
>
>
> On Tue, May 26, 2020 at 8:42 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Hmm... how would they go to Grafana if they are not getting computed in
>> your cod
k data source
> itself. (Kafka connector in this case)
>
>
> https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html#kafka-specific-configurations
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
>
> On Tue, May 26, 2020 at 6:42
s V wrote:
> Hello,
> Even for me it comes as 0 when I print it in onQueryProgress. I use
> LongAccumulator as well. Yes, it prints on my local but not on the cluster.
> But one consolation is that when I send metrics to Grafana, the values
> do show up there.
>
> On Tue, Ma
I keep getting this error message:
*The message is 1169350 bytes when serialized which is larger than the
maximum request size you have configured with the max.request.size
configuration.*
As indicated in other posts, I am trying to set the “max.request.size”
configuration in the Producer as f
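If the write goes through the Structured Streaming Kafka sink, producer properties can be passed by prefixing them with "kafka." in the writer options (per the Kafka integration guide linked elsewhere in this thread). A sketch, with placeholder broker, topic, paths and size, and assuming `outputDf` already has a 'value' column:

```
// Producer configs are passed through with the "kafka." prefix.
val query = outputDf
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")              // placeholder
  .option("topic", "outputTopic")                                // placeholder
  .option("kafka.max.request.size", (5 * 1024 * 1024).toString)  // 5 MB, above the 1169350-byte record
  .option("checkpointLocation", "/tmp/ckpt-large-records")       // placeholder
  .start()
```

Note that the broker and topic also cap record size (message.max.bytes / max.message.bytes), so those may need raising as well.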
> [1]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> [2]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> [3]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>
ctured Streaming?
I will keep looking for alternate approaches but any help would be
greatly appreciated. Thanks.
On Thu, May 14, 2020 at 2:36 PM Something Something <
mailinglist...@gmail.com> wrote:
> In my structured streaming job I am updating Spark Accumulators in the
> updateAcross
In my structured streaming job I am updating Spark Accumulators in the
updateAcrossEvents method but they are always 0 when I try to print them in
my StreamingListener. Here's the code:
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
updateAcrossEvents
)
The accumula
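For what it's worth, a sketch of the accumulator pattern being discussed, assuming a single named LongAccumulator created on the driver and captured by the state-update closure (names are placeholders):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.util.LongAccumulator

val spark = SparkSession.builder.appName("AccumulatorSketch").getOrCreate()

// Register a *named* accumulator once, on the driver.
val eventCounter: LongAccumulator = spark.sparkContext.longAccumulator("eventCounter")

// Inside updateAcrossEvents (running on the executors) the same instance is captured:
//   eventCounter.add(1)

// Reading it back on the driver, e.g. from a streaming query listener:
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"eventCounter so far: ${eventCounter.value}")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})
```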
b, it's being
> provided via StreamingQueryListener.
>
>
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries
>
> Hope this helps.
>
> On Fri, May 8, 2020 at 3:26 AM Something Something <
> mailinglist...@gmail
Is there a way to dynamically modify value of 'maxOffsetsPerTrigger' while
a Stateful Structured Streaming job is running?
We are thinking of auto-scaling our Spark cluster but if we don't modify
the value of 'maxOffsetsPerTrigger' dynamically would adding more VMs to
the cluster help? I don't thi
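As far as I know, 'maxOffsetsPerTrigger' is a source option that is fixed when the query starts, so the usual way to change it is to stop the query and start it again with a new value; the checkpoint lets it resume where it left off. A sketch, assuming a SparkSession `spark` and placeholder broker, topic and paths:

```
def startQuery(maxOffsets: Long) = {
  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")    // placeholder
    .option("subscribe", "inputTopic")                   // placeholder
    .option("maxOffsetsPerTrigger", maxOffsets.toString) // picked up at query start
    .load()

  df.writeStream
    .format("console")
    .option("checkpointLocation", "/tmp/ckpt-autoscale") // placeholder
    .start()
}

// e.g. after adding VMs to the cluster:
// runningQuery.stop()
// val rescaled = startQuery(maxOffsets = 200000L)
```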
Is there a way to get the total no. of active states in memory at any given
point in a Stateful Spark Structured Streaming job? We are thinking of
using this metric for 'Auto Scaling' our Spark cluster.
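One place this number is exposed is the per-batch StreamingQueryProgress: its stateOperators section reports state-store row counts, which for a stateful query is essentially the number of active states. A sketch, assuming `query` is the StreamingQuery handle returned by writeStream.start():

```
// Poll the latest progress on the driver and sum state rows across operators.
val progress = query.lastProgress                      // may be null before the first batch
if (progress != null) {
  val activeStates = progress.stateOperators.map(_.numRowsTotal).sum
  println(s"Active states in memory: $activeStates")   // feed this into the auto-scaling logic
}
```

The same progress object is delivered to StreamingQueryListener.onQueryProgress, which may be a better fit for continuous scraping.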
I am trying to write an integration test using Embedded Kafka but I keep
getting a NullPointerException. My test case is very simple. It has the following
steps:
1. Read a JSON file & write messages to an inputTopic.
2. Perform a 'readStream' operation.
3. Do a 'select' on the Stream. This thro
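For context, a sketch of what steps 2 and 3 look like in isolation; the embedded broker address and topic are placeholders, and the test's SparkSession is assumed to be built with an explicit local master:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local[*]")                                   // tests need an explicit local master
  .appName("embedded-kafka-read-test")
  .getOrCreate()

// 2. readStream against the (embedded) broker
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:6001")  // placeholder for the embedded broker
  .option("subscribe", "inputTopic")                    // placeholder
  .option("startingOffsets", "earliest")
  .load()

// 3. select on the stream: Kafka exposes binary key/value columns
val parsed = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```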
r details here -
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.GroupState
>
> On Wed, Mar 4, 2020 at 2:51 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> I've set the timeout duration to "2 minutes" as fo
I've set the timeout duration to "2 minutes" as follows:
def updateAcrossEvents(tuple3: Tuple3[String, String, String],
                       inputs: Iterator[R00tJsonObject],
                       oldState: GroupState[MyState]): OutputRow = {
  println(" Inside updateAcrossEvents with : " + tuple3._1 + ",
Simply adding 'toJSON' before 'writeStream' fixed the problem. Maybe
this will help someone.
On Tue, Mar 3, 2020 at 6:02 PM Something Something
wrote:
> In a Stateful Spark Streaming application I am writing the 'OutputRow' in
> the 'updateAc
In a Stateful Spark Streaming application I am writing the 'OutputRow' in
the 'updateAcrossEvents' but I keep getting this error (*Required attribute
'value' not found*) while it's trying to write to Kafka. I know from the
documentation that the 'value' attribute needs to be set, but how do I do that
in
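As the follow-up above notes, Dataset.toJSON is one way to satisfy this: it produces a Dataset[String] whose single column is literally named 'value', which is what the Kafka sink requires ('key' and 'topic' columns are optional). A sketch with placeholder broker and topic, where `outputRows` stands for the Dataset coming out of mapGroupsWithState:

```
// Serialize each OutputRow to JSON; the resulting column is named "value".
val forKafka = outputRows.toJSON

forKafka.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")     // placeholder
  .option("topic", "outputTopic")                       // placeholder
  .option("checkpointLocation", "/tmp/ckpt-kafka-sink") // placeholder
  .start()
```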
There are lots of examples on 'Stateful Structured Streaming' in 'The
Definitive Guide' book BUT all of them read JSON from a 'path'. That's
working for me.
Now I need to read from Kafka.
I Googled but couldn't find any example. I am struggling to map the
'Value' of the Kafka message to my JSON
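A common way to do this mapping is to cast the Kafka 'value' to a string and run it through from_json with a schema derived from the target case class. A sketch; the case class fields, broker and topic are made up, and `spark` is an existing SparkSession:

```
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.from_json
import spark.implicits._

// Placeholder shape of the JSON payload; the real schema comes from the application.
case class R00tJsonObject(eventId: String, eventType: String, ts: Long)

val schema = Encoders.product[R00tJsonObject].schema

val parsed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
  .option("subscribe", "inputTopic")                  // placeholder
  .load()
  .selectExpr("CAST(value AS STRING) AS json")        // Kafka 'value' is binary
  .select(from_json($"json", schema).as("data"))
  .select("data.*")
  .as[R00tJsonObject]
```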
`updateAcrossEvents`.
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
On Mon, Mar 2, 2020 at 5:12 PM lec ssmi wrote:
> maybe you can combine the fields you want to use into one field
>
> Something Something wrote on Tue, Mar 3, 2020 at 6:37 AM:
>
>> I am writing a
I am writing a Stateful Streaming application in which I am using
mapGroupsWithState to create aggregates for Groups but I need to create *Groups
based on more than one column in the Input Row*. All the examples in the
'Spark: The Definitive Guide' use only one column such as 'User' or
'Device'. I
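Besides combining the fields into one, as suggested above, another option is to key on a tuple in groupByKey. A sketch; the Event/UsageState/UsageRow classes and field names are made up, `events` stands for the input Dataset[Event], and spark.implicits._ is assumed to be in scope:

```
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Event(user: String, device: String, bytes: Long)         // placeholder input
case class UsageState(totalBytes: Long)                             // placeholder state
case class UsageRow(user: String, device: String, totalBytes: Long) // placeholder output

def updateUsage(key: (String, String), rows: Iterator[Event],
                state: GroupState[UsageState]): UsageRow = {
  val previous = state.getOption.map(_.totalBytes).getOrElse(0L)
  val updated  = UsageState(previous + rows.map(_.bytes).sum)
  state.update(updated)
  UsageRow(key._1, key._2, updated.totalBytes)
}

val usage = events
  .groupByKey(e => (e.user, e.device))               // composite key: two columns via a tuple
  .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateUsage)
```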
Here's my use case: Messages are coming into a Kafka Topic for different
'Events'. Each event has a unique Event Id. I need to aggregate counts for
each Event AFTER the event is completed. For now, we are thinking we can
assume an event is completed if there are no more messages coming in for a
per
We have a Spark Streaming job that calculates some values in each batch. What
we need to do now is aggregate values across ALL batches. What is the best
strategy to do this in Spark Streaming? Should we use 'Spark Accumulators'
for this?
In my custom receiver for Spark Streaming I have code such as this:
messages.toArray().foreach(msg => {
  val m = msg.asInstanceOf[Message]
  store(m.getBody)
})
Instead of 'body' which is of type 'String', I would rather pass the en
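A sketch of what that could look like: make the Receiver generic on the message type itself, so store() accepts the whole object rather than just the body. The Message class and fetch logic are placeholders:

```
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

case class Message(id: String, body: String)             // placeholder message type

class MessageReceiver extends Receiver[Message](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    new Thread("message-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          fetchBatch().foreach(m => store(m))           // store the entire Message, not m.getBody
        }
      }
    }.start()
  }

  override def onStop(): Unit = ()

  private def fetchBatch(): Seq[Message] = Seq.empty     // placeholder for the real source
}
```

ssc.receiverStream(new MessageReceiver) then yields a DStream[Message] instead of a DStream[String].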
Is it safe to assume that Spark will always create a single instance of
Custom Receiver? Or would it create multiple instances on each node in a
cluster? Wondering if I need to worry about receiving the same message on
different nodes etc.
Please help. Thanks.
I am not running locally. The Spark master is:
"spark://:7077"
On Mon, Nov 10, 2014 at 3:47 PM, Tathagata Das
wrote:
> What is the Spark master that you are using. Use local[4], not local
> if you are running locally.
>
> On Mon, Nov 10, 2014 at 3:01 PM, Something Som
I am embarrassed to admit it, but I can't get a basic 'word count' to work
under Kafka/Spark Streaming. My code looks like this. I don't see any
word counts in the console output, and I don't see any output in the UI either.
Needless to say, I am a newbie to both 'Spark' and 'Kafka'.
Please help. Thanks.
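For comparison, a rough sketch of a receiver-based word count with that era's API (spark-streaming-kafka, KafkaUtils.createStream); the ZooKeeper address, group id and topic are placeholders, and local[4] follows the advice given earlier in the thread:

```
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("KafkaWordCount")
  .setMaster("local[4]")                       // more threads than receivers, per the advice above

val ssc = new StreamingContext(conf, Seconds(5))

val lines = KafkaUtils
  .createStream(ssc, "zkhost:2181", "wordcount-group", Map("inputTopic" -> 1))
  .map(_._2)                                   // (key, message) pairs -> message text

val wordCounts = lines
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

wordCounts.print()                             // counts should appear in the console each batch

ssc.start()
ssc.awaitTermination()
```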
System.out.println(status);
> return status;
> }
> }
> );
>
>
> statuses.print();
>
> Or you could use foreachRDD instead of map() if your intention is just
>
> more threads.
> It may also be due to a topic name mismatch.
>
> sparkConf.setMaster("local[1]");
>
> Regards,
> Rahul
>
> From: Something Something
> Date: Wednesday, November 5, 2014 at 12:23 PM
> To: "Shao, Saisai"
> Cc:
ng on a standalone mode, the log is under
> the /work/ directory. I'm not sure about YARN or Mesos; you can check
> the Spark documentation to see the details.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Something Something [mailto:mailinglist...@gmail.com]
> *Sent:* Wednesd
3. Did you see any exceptions when running the app? This will help
> to pin down the problem.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Something Something [mailto:mailinglist...@gmail.com]
> *Sent:* Wednesday, November 05, 2014 1:57 PM
> *To:* user@spark.a
I have the following code in my program. I don't get any error, but it's not
consuming the messages either. Shouldn't the following code print the line
in the 'call' method? What am I missing?
Please help. Thanks.
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new
Duration