My Spark Structured Streaming job works fine when I set "startingOffsets"
to "latest". When I simply change it to "earliest" and specify a new
checkpoint directory, the job doesn't work. The states don't get timed out
after 10 minutes.
While debugging I noticed that my 'state' logic is indeed
No one responded. By the way, what’s
> your actual name “Something something” if you don’t mind me asking?
>
> On Tue, Jun 9, 2020 at 12:27 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
What is scary is this interface is marked as "experimental"
@Experimental
@InterfaceStability.Evolving
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
}
On Mon, Jun 8, 2020 at 11:54 AM Something
> .groupByKey(v => ...)
> .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>
> val query = wordCounts.writeStream
> .outputMode(OutputMode.Update)
>
>
> On Mon, Jun 8, 2020 at 11:14 AM Something Somet
{
> myAccInTask.add(1);
> <...>
> state.update(newState);
> return new LeadingCharCount(key, newState);
> }
> }
> ```
>
> --
> Cheers,
> -z
>
> On Tue, 2 Jun 2020 10:28:36 +0800
> ZHANG Wei
pdateTask. Like
> when state times out or when the data is pushed to next Kafka topic etc.
>
> On Fri, May 29, 2020 at 11:55 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Thanks! I will take a look at the link. Just one question, you seem to be
e state) {
> }
> }
>
> On Thu, May 28, 2020 at 10:59 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> I am assuming StateUpdateTask is your application specific class. Does it
>> have 'updateState' method or something? I googled but couldn't fin
> > > --
> > > Cheers,
> > > -z
> > >
> > > On Thu, 28 May 2020 17:12:48 +0530
> > > Srinivas V wrote:
> > >
> > > > yes, I am using stateful structured streaming. Yes similar to what
> you
> > > do.
Encoders.bean(ModelUpdate.class),
> GroupStateTimeout.ProcessingTimeTimeout());
>
> StateUpdateTask contains the update method.
>
> On Thu, May 28, 2020 at 4:41 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Yes, that's exactly h
mulatorV2 accumulator = sparkContext.longAccumulator(name);
>
>
> On Tue, May 26, 2020 at 8:42 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Hmm... how would they go to Grafana if they are not getting computed in
>> your code? I am talking about the Application
k data source
> itself. (Kafka connector in this case)
>
>
> https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html#kafka-specific-configurations
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
>
> On Tue, May 26, 2020 at 6:42
> Hello,
> Even for me it comes as 0 when I print in onQueryProgress. I use
> LongAccumulator as well. Yes, it prints on my local machine but not on the
> cluster. But one consolation is that when I send metrics to Grafana, the
> values do come through there.
>
> On Tue, May 26, 2020
I keep getting this error message:
*The message is 1169350 bytes when serialized which is larger than the
maximum request size you have configured with the max.request.size
configuration.*
As indicated in other posts, I am trying to set the “max.request.size”
configuration in the Producer as
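For the Structured Streaming Kafka sink, producer properties are passed through DataStreamWriter options with a `kafka.` prefix. A minimal sketch (broker address, topic, paths, and the size value are placeholders):

```scala
df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "output-topic")
  // producer-side limit; raised here to ~5 MB
  .option("kafka.max.request.size", "5242880")
  .option("checkpointLocation", "/tmp/checkpoints")
  .start()
```

Note that raising max.request.size on the producer alone is not enough if the broker's message.max.bytes or the topic's max.message.bytes is lower.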
> [1]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> [2]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> [3]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>
will keep looking for alternate approaches but any help would be
greatly appreciated. Thanks.
On Thu, May 14, 2020 at 2:36 PM Something Something <
mailinglist...@gmail.com> wrote:
> In my structured streaming job I am updating Spark Accumulators in the
> updateAcrossEvents method but th
In my structured streaming job I am updating Spark Accumulators in the
updateAcrossEvents method but they are always 0 when I try to print them in
my StreamingListener. Here's the code:
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
The
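One thing worth checking (a sketch, not a diagnosis — `recordsSeen` is a hypothetical name): the accumulator must be created and registered on the driver before the query starts; updates made inside the state function on executors are then merged back into the driver's copy. An accumulator created lazily inside executor-side code ends up as an unregistered per-executor copy, and the driver reads 0.

```scala
// driver side, before starting the query
val recordsSeen = spark.sparkContext.longAccumulator("recordsSeen")

// inside the state-update function (runs on the executors)
recordsSeen.add(1)

// driver side again, e.g. in a StreamingQueryListener callback
println(s"records seen so far: ${recordsSeen.value}")
```

Naming the accumulator also makes it visible per stage in the Spark UI, which helps confirm whether the executor-side updates are happening at all.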
's being
> provided via StreamingQueryListener.
>
>
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries
>
> Hope this helps.
>
> On Fri, May 8, 2020 at 3:26 AM Something Something <
> mailinglist...@gmail.com> w
Is there a way to dynamically modify value of 'maxOffsetsPerTrigger' while
a Stateful Structured Streaming job is running?
We are thinking of auto-scaling our Spark cluster but if we don't modify
the value of 'maxOffsetsPerTrigger' dynamically would adding more VMs to
the cluster help? I don't
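As far as I know, `maxOffsetsPerTrigger` is a source option read when the query starts, so the usual workaround is to stop the query and restart it from the same checkpoint with a new value. A sketch (broker, topic, and the number are placeholders):

```scala
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .option("maxOffsetsPerTrigger", "100000") // cap per micro-batch
  .load()

// to change the cap: query.stop(), then build and start() the query again
// with a new option value; restarting from the same checkpoint resumes
// from the committed offsets
```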
Is there a way to get the total no. of active states in memory at any given
point in a Stateful Spark Structured Streaming job? We are thinking of
using this metric for 'Auto Scaling' our Spark cluster.
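If it helps, the per-batch progress exposed via `StreamingQuery.lastProgress` (or `StreamingQueryListener.onQueryProgress`) carries state-store metrics; `numRowsTotal` should approximate the number of active keys in state. A sketch:

```scala
val p = query.lastProgress
if (p != null) {
  p.stateOperators.foreach { op =>
    // total rows held in the state store vs. rows touched this batch
    println(s"state rows total=${op.numRowsTotal}, updated=${op.numRowsUpdated}")
  }
}
```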
I am trying to write an integration test using Embedded Kafka but I keep
getting NullPointerException. My test case is very simple. It has the
following steps:
1. Read a JSON file and write messages to an inputTopic.
2. Perform a 'readStream' operation.
3. Do a 'select' on the Stream. This
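For reference, a rough shape of such a test with the embedded-kafka library (assuming io.github.embeddedkafka is on the classpath; names and ports are placeholders, and the NPE may well come from something else entirely):

```scala
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}

implicit val kafkaConfig = EmbeddedKafkaConfig(kafkaPort = 6001, zooKeeperPort = 6000)

EmbeddedKafka.withRunningKafka {
  EmbeddedKafka.createCustomTopic("inputTopic")
  EmbeddedKafka.publishStringMessageToKafka("inputTopic", """{"id": 1}""")

  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:6001")
    .option("subscribe", "inputTopic")
    .option("startingOffsets", "earliest")
    .load()

  // start the query, call query.processAllAvailable(), then assert on the sink
}
```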
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.GroupState
>
> On Wed, Mar 4, 2020 at 2:51 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> I've set the timeout duration to "2 minutes" as follows:
>>
>
I've set the timeout duration to "2 minutes" as follows:
def updateAcrossEvents(tuple3: Tuple3[String, String, String],
                       inputs: Iterator[R00tJsonObject],
                       oldState: GroupState[MyState]): OutputRow = {
  println(" Inside updateAcrossEvents with : " + tuple3._1 + ",
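One detail that often bites with ProcessingTimeTimeout, sketched below with the key simplified to a String and `MyState`/`OutputRow` standing in for the application's own types (the `merge` helper is hypothetical): the timeout must be re-armed with `setTimeoutDuration` on every invocation for the key, and a timed-out key arrives with an empty `inputs` iterator and `state.hasTimedOut == true`.

```scala
def updateAcrossEvents(key: String,
                       inputs: Iterator[R00tJsonObject],
                       state: GroupState[MyState]): OutputRow = {
  if (state.hasTimedOut) {
    val finalState = state.get
    state.remove()                        // drop the key from the state store
    OutputRow(key, finalState)            // emit the closing row for this key
  } else {
    val updated = inputs.foldLeft(state.getOption.getOrElse(MyState())) {
      (s, event) => s.merge(event)        // hypothetical fold of an event into state
    }
    state.update(updated)
    state.setTimeoutDuration("2 minutes") // must be set again on every call
    OutputRow(key, updated)
  }
}
```

If the timeout is set only on the first call for a key, later batches silently clear it, which looks exactly like "states never time out".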
By simply adding 'toJSON' before 'writeStream' the problem was fixed. Maybe
it will help someone.
On Tue, Mar 3, 2020 at 6:02 PM Something Something
wrote:
> In a Stateful Spark Streaming application I am writing the 'OutputRow' in
> the 'updateAcrossEvents' but I keep getting this
In a Stateful Spark Streaming application I am writing the 'OutputRow' in
the 'updateAcrossEvents' but I keep getting this error (*Required attribute
'value' not found*) while it's trying to write to Kafka. I know from the
documentation that 'value' attribute needs to be set but how do I do that
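The fix mentioned in this thread works because the Kafka sink requires a string or binary column literally named `value` (optionally also `key` and `topic`). Two sketched ways to produce it (column names other than `value`/`key` are assumptions):

```scala
// Option 1: serialize whole rows; toJSON yields a Dataset[String] whose
// single column is named "value", which is what the Kafka sink expects
ds.toJSON
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/ckpt")
  .start()

// Option 2: build the key/value columns explicitly
df.selectExpr("CAST(eventId AS STRING) AS key", "to_json(struct(*)) AS value")
```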
There are lots of examples on 'Stateful Structured Streaming' in 'The
Definitive Guide' book BUT all of them read JSON from a 'path'. That's
working for me.
Now I need to read from Kafka.
I Googled but I couldn't find any example. I am struggling to Map the
'Value' of the Kafka message to my
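A sketch of the Kafka-value-to-case-class mapping (the `Event` fields and topic name are made up):

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.{col, from_json}

case class Event(eventId: String, ts: Long)
val schema = Encoders.product[Event].schema

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING) AS json") // Kafka value arrives as binary
  .select(from_json(col("json"), schema).as("data"))
  .select("data.*")
  .as[Event]                                   // needs import spark.implicits._
```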
of `updateAcrossEvents`.
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
On Mon, Mar 2, 2020 at 5:12 PM lec ssmi wrote:
> maybe you can combine the fields you want to use into one field
>
> Something Something wrote on Tue, Mar 3, 2020 at 6:37 AM:
>
>> I am writing a
I am writing a Stateful Streaming application in which I am using
mapGroupsWithState to create aggregates for Groups but I need to create *Groups
based on more than one column in the Input Row*. All the examples in the
'Spark: The Definitive Guide' use only one column such as 'User' or
'Device'. I
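The key returned by `groupByKey` does not have to be a single column: a tuple (or a small case class) works, as long as an encoder is available, which it is for tuples of primitives via `spark.implicits._`. A sketch with hypothetical field names:

```scala
events
  .groupByKey(e => (e.userId, e.deviceId))   // composite key as a tuple
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
```

The first parameter of the state-update function then has the tuple type, e.g. `(String, String)`, instead of a single column value.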
Here's my use case: Messages are coming into a Kafka Topic for different
'Events'. Each event has a unique Event Id. I need to aggregate counts for
each Event AFTER the event is completed. For now, we are thinking we can
assume an event is completed if there are no more messages coming in for a
We've a Spark Streaming job that calculates some values in each batch. What
we need to do now is aggregate values across ALL batches. What is the best
strategy to do this in Spark Streaming. Should we use 'Spark Accumulators'
for this?
Is it safe to assume that Spark will always create a single instance of
Custom Receiver? Or would it create multiple instances on each node in a
cluster? Wondering if I need to worry about receiving the same message on
different nodes etc.
Please help. Thanks.
In my custom receiver for Spark Streaming I've code such as this:
messages.toArray().foreach(msg => {
  val m = msg.asInstanceOf[Message]
  store(m.getBody)
})
Instead of 'body' which is of type 'String', I would rather pass the
I am embarrassed to admit but I can't get a basic 'word count' to work
under Kafka/Spark streaming. My code looks like this. I don't see any
word counts in console output. Also, I don't see any output in the UI.
Needless to say, I am a newbie in both 'Spark' and 'Kafka'.
Please help. Thanks.
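For what it's worth, a minimal word count against the receiver-based Kafka API of that era (spark-streaming-kafka for Kafka 0.8; the ZooKeeper address, group id, and topic are placeholders) looked roughly like:

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sparkConf, Seconds(2))
val lines = KafkaUtils
  .createStream(ssc, "zkhost:2181", "wordcount-group", Map("mytopic" -> 1))
  .map(_._2)                    // createStream yields (key, message) pairs
lines.flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .print()
ssc.start()
ssc.awaitTermination()
```

A common cause of "no output" with this API: running with a single local thread. The receiver occupies one thread, so local runs need at least `setMaster("local[2]")`.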
, Something Something
mailinglist...@gmail.com wrote:
I am embarrassed to admit but I can't get a basic 'word count' to work
under
Kafka/Spark streaming. My code looks like this. I don't see any word
counts in console output. Also, don't see any output in UI. Needless to
say, I am newbie
statuses.print();
Or you could use foreachRDD instead of map() if your intention is just
printing.
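The foreachRDD variant suggested above, sketched (assuming `statuses` is a DStream):

```scala
statuses.foreachRDD { rdd =>
  // pull a few records back to the driver and print them
  rdd.take(10).foreach(println)
}
```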
Thanks
Best Regards
On Wed, Nov 5, 2014 at 12:35 PM, Something Something
mailinglist...@gmail.com wrote:
It's not local. My spark url is something like
I've following code in my program. I don't get any error, but it's not
consuming the messages either. Shouldn't the following code print the line
in the 'call' method? What am I missing?
Please help. Thanks.
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new
when running the app, this will help
to define the problem.
Thanks
Jerry
*From:* Something Something [mailto:mailinglist...@gmail.com]
*Sent:* Wednesday, November 05, 2014 1:57 PM
*To:* user@spark.apache.org
*Subject:* Kafka Consumer in Spark Streaming
I've following code in my
If you're running in standalone mode, the log is under the
SPARK_HOME/work/ directory. I'm not sure about YARN or Mesos; you can check
the Spark documentation for the details.
Thanks
Jerry
*From:* Something Something [mailto:mailinglist...@gmail.com]
*Sent:* Wednesday, November 05, 2014 2:28
] or
more number of threads.
It may be due to topic name mismatch also.
sparkConf.setMaster("local[1]");
Regards,
Rahul
From: Something Something mailinglist...@gmail.com
Date: Wednesday, November 5, 2014 at 12:23 PM
To: Shao, Saisai saisai.s...@intel.com
Cc: user@spark.apache.org user