Hello everyone,
after upgrading to kafka streams 2.8 we have one streams app that's
stuck trying to restore a store changelog topic, this is the debug log
of the app:
https://gist.github.com/alex88/f31593aaabbd282b21f89a0252a28745
I would like to avoid having to delete and recreate the
there any reason why?
On 7/20/21 2:09 PM, Alessandro Tagliapietra wrote:
Hello everyone,
after upgrading to kafka streams 2.8 we have one streams app that's
stuck trying to restore a store changelog topic, this is the debug
log of the app:
https://gist.github.com/alex88/f31593aaabbd282b21f89a0252a28745
I've tried to restart the streams application using at_least_once
processing guarantee and it worked, restarted again in exactly_once_beta
and it worked too.
Is there any reason why?
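For reference, the guarantee switch described above is a single Streams config setting; a sketch of the relevant values as of 2.8 (exactly_once_beta is the newer KIP-447 mode, later renamed exactly_once_v2 in 3.0):

```properties
# Streams processing.guarantee values available in 2.8
processing.guarantee=exactly_once_beta
# alternatives:
# processing.guarantee=at_least_once
# processing.guarantee=exactly_once
```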
The rate is < 1MB/s so it's probably going very slowly
compared to what it should.
Thank you in advance
--
Alessandro Tagliapietra
are the machine IDs) ends up in the same streams instance.
Which is instead guaranteed with the intermediate topic?
Thanks!
--
Alessandro Tagliapietra
On Tue, May 12, 2020 at 7:16 AM Bill Bejeck wrote:
> Hi Alessandro,
>
> For merging the three streams, have you considered the `KStream.merge`
t
seen" of a machine is older than some time
To "merge" the 3 streams I was thinking of just mapping them into a single
intermediate topic and having the ValueTransformer read from that.
Is there a better way? Maybe without using an intermediate topic?
Thank you in advance
--
Alessandro Tagliapietra
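The `KStream.merge` approach mentioned above avoids the intermediate topic; a minimal sketch, with topic names, value types, and the transformer/store names made up for illustration:

```java
// Hypothetical topics and types: three per-machine metric streams merged
// without an intermediate topic.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Metric> s1 = builder.stream("metrics-a");
KStream<String, Metric> s2 = builder.stream("metrics-b");
KStream<String, Metric> s3 = builder.stream("metrics-c");

// merge() interleaves records, keeping keys and partitions as-is: records
// with the same key land on the same task only if all three input topics
// use the same key and have the same partition count (co-partitioning).
KStream<String, Metric> all = s1.merge(s2).merge(s3);

// (assumes "last-seen-store" was registered via builder.addStateStore)
all.transformValues(LastSeenTransformer::new, "last-seen-store");
```

Note the co-partitioning caveat in the comment: an intermediate topic guarantees a single keyed layout by construction, while merge() relies on the inputs already agreeing on it.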
exception? Or is the configuration
wrong?
--
Alessandro Tagliapietra
On Mon, Jan 6, 2020 at 10:31 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
> Hello everyone,
>
> I'm trying to run kafka connect to dump avro messages to S3, I'm having
> issues with the valu
trategy but I
get: "java.lang.RuntimeException:
com.myapp..serializers.subject.EnvironmentTopicNameStrategy is not an
instance of
io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy" even
if the class extends TopicNameStrategy
- just adding value.converter (and auth/schema registry url) configs,
results in the same error as above just with the default class:
"java.lang.RuntimeException:
io.confluent.kafka.serializers.subject.TopicNameStrategy is not an instance
of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy"
Any idea? It seems that adding value.converter to the connector breaks even
the default serialization class.
Thanks in advance
--
Alessandro Tagliapietra
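For context on the error above: "X is not an instance of ...SubjectNameStrategy", even for the default class, usually means the SubjectNameStrategy interface is loaded twice by Connect's plugin classloader isolation, so a custom strategy jar generally needs to sit in the same plugin directory as the Avro converter and be compiled against the same schema-registry version. A typical shape for the converter settings (connector name, topic, and URL are placeholders):

```json
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "my-topic",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy"
  }
}
```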
problem in that
case was probably what you mentioned)
Regards
--
Alessandro Tagliapietra
On Fri, Dec 13, 2019 at 2:17 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
> Hi Sophie,
>
> thanks for explaining that.
> So yeah it seems that since I'm using the de
way? Just to
keep track of it?
Regards
--
Alessandro Tagliapietra
On Thu, Dec 12, 2019 at 3:14 PM Sophie Blee-Goldman
wrote:
> Thanks for collecting all these metrics. It might be that as the length of
> the lists
> increases over time, the cache is able to hold fewer un
ose changelog topics increase
(full album https://imgur.com/a/tXlJJEO)
Any other metric that might be important?
It seems that the issue is between the aggregate and KTable.toStream().
After a restart, as expected, usage goes back to normal values
--
Alessandro Tagliapietra
On Mon, Dec 9, 20
the sent bytes from 30-40MB/minute to 400KB/minute?
I'll look into the custom state store tho.
Thanks
--
Alessandro Tagliapietra
On Mon, Dec 9, 2019 at 7:02 PM Sophie Blee-Goldman
wrote:
> Alright, well I see why you have so much data being sent to the changelog
> if each
> update
generated by simulators with very predictable
output rate.
In the meantime I've enabled reporting of debug metrics (so including cache
hit ratio) to hopefully get better insights the next time it happens.
Thank you in advance
--
Alessandro Tagliapietra
On Mon, Dec 9, 2019 at 3:57 PM Sophie Blee-Gold
and starts working again? We're using EOS so I believe the commit interval
is every 100ms.
Regards
--
Alessandro Tagliapietra
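On the 100 ms figure above: under EOS the Streams commit interval does default to 100 ms (versus 30 s otherwise), and since the record cache is flushed on every commit, a short interval also limits how much deduplication the cache can do before writing to the changelog. The relevant settings, with defaults per the Streams configuration docs:

```properties
# Default 30000; drops to 100 when processing.guarantee is exactly_once(_beta)
commit.interval.ms=100
# Record cache shared across all threads of an instance; 10 MB by default
cache.max.bytes.buffering=10485760
```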
On Mon, Dec 9, 2019 at 12:15 PM Sophie Blee-Goldman
wrote:
> It might be that the cache appears to "stop working" because it gets full,
> and each
>
:)
--
Alessandro Tagliapietra
On Sun, Dec 8, 2019 at 12:54 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
> It seems that even with caching enabled, after a while the sent bytes still
> go up
>
> [image: Screen Shot 2019-12-08 at 12.52.31 PM.png]
>
> you can
It seems that even with caching enabled, after a while the sent bytes still
go up
[image: Screen Shot 2019-12-08 at 12.52.31 PM.png]
you can see the deploy where I enabled caching, but it looks like it's
still a temporary solution.
--
Alessandro Tagliapietra
On Sat, Dec 7, 2019 at 10:08 AM
Thanks
--
Alessandro Tagliapietra
On Sat, Dec 7, 2019 at 10:02 AM John Roesler wrote:
> Hmm, that’s a good question. Now that we’re talking about caching, I
> wonder if the cache was just too small. It’s not very big by default.
>
> On Sat, Dec 7, 2019, at 11:16, Alessandro Taglia
available? Any other
possible reason?
Thank you so much for your help
--
Alessandro Tagliapietra
On Sat, Dec 7, 2019 at 9:14 AM John Roesler wrote:
> Ah, yes. Glad you figured it out!
>
> Caching does not reduce EOS guarantees at all. I highly recommend using
> it. You might even want to
Never mind I've found out I can use `.withCachingEnabled` on the store
builder to achieve the same thing as the windowing example as
`Materialized.as` turns that on by default.
Does caching in any way reduce the EOS guarantees?
--
Alessandro Tagliapietra
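A sketch of the `.withCachingEnabled` store builder mentioned above (types and serde are made up; the store name echoes the "LastValueStore" that appears later in the thread). As discussed, caching only batches updates and forwards the latest value per key on commit or eviction; it does not weaken EOS:

```java
// Hypothetical types: a persistent key-value store with caching and
// changelogging enabled, equivalent to what Materialized.as turns on by default.
StoreBuilder<KeyValueStore<String, Metric>> lastValueStore =
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("LastValueStore"),
            Serdes.String(),
            metricSerde)
        .withCachingEnabled()
        .withLoggingEnabled(Collections.emptyMap());
builder.addStateStore(lastValueStore);
```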
On Sat, Dec 7, 2019 at 1:12 AM
a
record to the changelog?
- is the windowing doing some internal caching like not sending every
aggregation record until the window time is passed? (if so, where can I
find that code since I would like to use that also for our new
implementation)
Thank you in advance
--
Alessandro Tagliapietra
mx
monitoring so we can keep better track of what's going on :)
--
Alessandro Tagliapietra
On Tue, Dec 3, 2019 at 9:12 PM John Roesler wrote:
> Oh, yeah, I remember that conversation!
>
> Yes, then, I agree, if you're only storing state of the most recent window
> for each key,
ue
per key (if the cleanup just happened) right?
--
Alessandro Tagliapietra
On Tue, Dec 3, 2019 at 8:51 PM John Roesler wrote:
> Hey Alessandro,
>
> That sounds also like it would work. I'm wondering if it would actually
> change what you observe w.r.t. recovery behavior, though. Streams alread
only changelog topic should keep the data to rebuild to a minimum, as it
would have only the last value for each key.
Hope that makes sense
Thanks again
--
Alessandro Tagliapietra
On Tue, Dec 3, 2019 at 3:04 PM John Roesler wrote:
> Hi Alessandro,
>
> To take a stab at your question,
the checkpoint and then it does find it?
It seems it loaded about 2.7M records (sum of offset differences in the
"restoring partition" messages), right?
Maybe should I try to reduce the checkpoint interval?
Regards
--
Alessandro Tagliapietra
On Mon, Dec 2, 2019 at 9:18 AM John Roes
ressor basically) is there a way to only
keep the last window so that the store rebuilding goes faster and without
rebuilding old windows too? Or should I create a custom window using the
original key as key so that the compaction keeps only the last window data?
Thank you
--
Alessandro Tagliapietra
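The "original key as key" idea above could be sketched like this (topic, serde, and aggregate names are hypothetical): strip the window from the key so that log compaction keeps only the newest window's value per machine:

```java
// Hypothetical names: re-key the windowed aggregate by its original key so
// a compacted topic retains only the latest window's value for each machine.
windowedAggregates                        // KTable<Windowed<String>, Agg>
    .toStream()
    .map((windowedKey, agg) -> KeyValue.pair(windowedKey.key(), agg))
    .to("latest-window-by-machine",       // topic created with cleanup.policy=compact
        Produced.with(Serdes.String(), aggSerde));
```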
You can achieve exactly once on a consumer by enabling read committed and
manually committing the offset as soon as you receive a message. That way
you know that at the next poll you won't get old messages again.
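That pattern, sketched against the plain consumer API (topic, group id, and broker address are placeholders):

```java
// Sketch of the read_committed + commit-per-record pattern described above.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");             // hypothetical
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
  consumer.subscribe(List.of("my-topic"));                         // hypothetical
  while (true) {
    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
      process(record);  // if this throws, the offset below is never committed
      // Commit immediately after processing; offset + 1 is the *next*
      // record to read, so a restart resumes right after this one.
      consumer.commitSync(Map.of(
          new TopicPartition(record.topic(), record.partition()),
          new OffsetAndMetadata(record.offset() + 1)));
    }
  }
}
```

Committing per record is slow (one synchronous round trip each), which is part of why the thread asks whether there is a better way.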
On Fri, Sep 27, 2019, 6:24 AM christopher palm wrote:
> I had a similar question, and
vious why it was skipping messages. Now
that I know that I have full control over the flow I can add the exception
handling logic, idempotency or whatever I need, because I know for sure
that I won't lose messages.
Thanks
--
Alessandro Tagliapietra
On Thu, Sep 26, 2019 at 1:54 AM M. Manna wrote:
> con
ffset.
In fact, you can see that value 4 is reprocessed multiple times without a
restart, and value 20, which throws an exception, is reprocessed after a
restart because nothing has been committed.
Now, kafka gurus, is this the best way to achieve this? Wouldn't be better
to have a config like "en
The only way this works is if I don't catch the exception, let the consumer
crash and fully restart it.
Maybe the consumer has an internal state that always gets updated when it
receives a message during poll?
--
Alessandro Tagliapietra
On Wed, Sep 25, 2019 at 7:37 PM Alessandro Tagliapietra
"Committed
offset " part
--
Alessandro Tagliapietra
On Wed, Sep 25, 2019 at 7:09 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
> It's still in the topic because it's weeks away from the deletion threshold
> (today's message with a 4-week retention).
>
it seems that the offset just automatically increases; with another client
I also see all the values in the topic, so they're not deleted.
Shouldn't it have retried the message with value 3?
--
Alessandro Tagliapietra
On Wed, Sep 25, 2019 at 6:19 PM Steve Howard
wrote:
> "I'm jus
thinking if there's something wrong with my code.
--
Alessandro Tagliapietra
On Wed, Sep 25, 2019 at 4:29 PM M. Manna wrote:
> How long is your message retention set for ? Perhaps you want to increase
> that to a large enough value.
>
> I have almost identical use case, but I wo
compact as policy not delete)
- the stream correctly processed data from that key over and over so I
don't think the producer ID has expired multiple times in minutes
Any help is very appreciated
--
Alessandro Tagliapietra
lose messages if I don't manually commit them which
doesn't seem to be the case
Any help is really appreciated
--
Alessandro Tagliapietra
On Wed, Sep 25, 2019 at 2:20 PM M. Manna wrote:
> Hi,
>
> How are you managing your offset commits ?
>
> Also, if it’s a duplicate record issu
it seems that after some retries the message is skipped and it goes
on with the next one.
What could be the reason? If in the next loop iteration it gets a message
from another partition, would it also commit the offset of the other, failed
partition?
Thank you
--
Alessandro Tagliapietra
Thanks a lot Bruno I'll check that!
--
Alessandro Tagliapietra
On Wed, Sep 18, 2019 at 4:20 PM Bruno Cadonna wrote:
> Hi Alessandro,
>
> If you want to get each update to an aggregate, you need to disable
> the cache. Otherwise, an update will only be emitted when the
> aggre
gger.
Since between these lines there's just the groupByKey and windowedBy, is
there any logic in these two that could stop the flow of data? Since I
don't have any window closing mechanism or suppression, shouldn't it just go
through?
Thank you in advance
--
Alessandro Tagliapietra
imestamp is per topic and not per key”. Can you
> please elaborate?
>
>
>
>
> > On Sep 11, 2019, at 10:13 PM, Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
> >
> > Did you ever push any data with a greater timestamp than the curren
Did you ever push any data with a greater timestamp than the current one
you're producing?
One thing that took me a while to find out is that the suppress timestamp is
per topic and not per key
--
Alessandro Tagliapietra
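To illustrate the per-topic (stream-time) behavior described above: suppress() holds a window until stream time, i.e. the highest timestamp observed on that task across all keys, passes window end plus grace, so one key's window is only released when some record (for any key) advances stream time. A minimal sketch using the 2.x DSL:

```java
// Stream time advances per task/partition, not per key: the window for
// key A closes only once any record on the partition carries a timestamp
// past window-end + grace.
stream.groupByKey()
      .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(10)))
      .count()
      .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
      .toStream();
```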
On Wed, Sep 11, 2019 at 8:06 PM Thameem Ansari wrote:
> Yes I am able to
sure I emit a window only when I receive a
new one per each key.
--
Alessandro Tagliapietra
On Wed, Jul 17, 2019 at 9:25 AM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
> Seems that completely removing the grace period fixes the problem,
>
> is it expecte
Seems that completely removing the grace period fixes the problem,
is it expected? Is the grace period per key or global?
--
Alessandro Tagliapietra
On Wed, Jul 17, 2019 at 12:07 AM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
> I've added a reproduction
Tagliapietra
On Tue, Jul 16, 2019 at 10:36 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
> Actually suppress doesn't matter, it happens later in the code, I've also
> tried to remove that and add a grace period to the window function but the
> issue persists.
>
Actually suppress doesn't matter, it happens later in the code, I've also
tried to remove that and add a grace period to the window function but the
issue persists.
--
Alessandro Tagliapietra
On Tue, Jul 16, 2019 at 10:17 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com>
extractor
reads the timestamp from the message.
Anyone has any idea on what could be the problem?
--
Alessandro Tagliapietra
I think everything should be working fine now.
Thank you again for your help!
--
Alessandro Tagliapietra
On Wed, Jul 10, 2019 at 5:37 AM Patrik Kleindl wrote:
> Hi
> Regarding the I/O, RocksDB has something called write amplification which
> writes the data to multiple levels i
storage usage.
--
Alessandro Tagliapietra
On Tue, Jul 9, 2019 at 6:06 AM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
> Hi Bruno,
>
> Oh I see, I'll try to add a persistent disk where the local stores are.
> I've other questions then:
> - why is it a
Hi John,
thanks a lot for the great explanation and the links.
After I sent the question I researched a bit more about EOS and I'm
currently testing that out.
I'll read those links and see what I come up with!
Thanks and have a great day!
--
Alessandro Tagliapietra
On Tue, Jul 9, 2019
angelog as I do with the "LastValueStore" store which has
> > "withLoggingEnabled()", but even that store has:
> >
> > Resetting offset for partition myapp-id-LastValueStore-changelog-0 to
> > offset 403910
> > Restoring task 0_0's state store LastValueSt
hLoggingEnabled()", but even that store has:
Resetting offset for partition myapp-id-LastValueStore-changelog-0 to
offset 403910
Restoring task 0_0's state store LastValueStore from beginning of the
changelog myapp-id-LastValueStore-changelog-0
Thank you everyone in advance
--
Alessandro Tagliapietra
pology starts from a stream uses multiple stores before
windowing, if there's an error in the windowing step, what happens to the
stores changes? When the message is reprocessed, will the store be in the
state it was after it processed the message on the first try?
Thank you in advance
--
Ales
understand what's going on.
Have a great day
--
Alessandro Tagliapietra
On Thu, Jun 6, 2019 at 11:27 AM Guozhang Wang wrote:
> Honestly I cannot think of an issue that fixed in 2.2.1 but not in 2.2.0
> which could be correlated to your observations:
>
>
> https://issues.apache.
Yes that's right,
could that be the problem? Anyway, so far after upgrading to 2.2.1 from
2.2.0 we didn't experience that problem anymore.
Regards
--
Alessandro Tagliapietra
On Thu, Jun 6, 2019 at 10:50 AM Guozhang Wang wrote:
> That's right, but local state is used as a "materiali
trap from
> scratch. E.g. if you are using K8s for cluster management, you'd better use
> stateful sets to make sure local states are preserves across re-deployment.
>
>
> Guozhang
>
> On Wed, Jun 5, 2019 at 4:52 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com>
it
already processed.
In the meantime I'm trying to upgrade to kafka 2.2.1 to see if I get any
improvement.
--
Alessandro Tagliapietra
On Wed, Jun 5, 2019 at 4:45 PM Guozhang Wang wrote:
> Hello Alessandro,
>
> What did you do for `restarting the app online`? I'm not sure I follow the
cloud kafka broker.
The problem is that locally everything works properly: if I restart
the streams app it just continues where it left off, but if I restart the app
online it reprocesses the whole topic.
That shouldn't happen right?
Thanks in advance
--
Alessandro Tagliapietra
/43b72e23bda9e15657b008855e1904db is
the one with most information and the logs on what I was seeing.
Thank you for your help
--
Alessandro Tagliapietra
On Wed, May 8, 2019 at 2:18 AM Bruno Cadonna wrote:
> Hi Alessandro,
>
> Apologies for the late reply.
>
> I tried the code from your r
Hi John,
on Slack Matthias suggested to have my own transform to window the data
myself, I'll have a look into it and the Windows implementation as you
suggested and see what I can do!
Thanks for the advice!
--
Alessandro Tagliapietra
On Tue, May 7, 2019 at 8:45 AM John Roesler wrote:
>
ond from
the message timestamp but it's not an option I'd like.
Thank you
--
Alessandro Tagliapietra
d metric {"timestamp": 6, "production": 4}
S1 with computed metric {"timestamp": 6, "production": 5}
S1 with computed metric {"timestamp": 6, "production": 6}
S1 with computed metric {"timestamp": 7, "production
. Are there any snapshot builds
available?
In the meantime I'm trying to create a custom docker image from kafka
source.
Thanks
--
Alessandro Tagliapietra
On Tue, Apr 23, 2019 at 8:52 AM Bruno Cadonna wrote:
> Hi Alessandro,
>
> It seems that the behaviour you described regarding the window ag
Thanks Matthias, one less thing to worry about in the future :)
--
Alessandro Tagliapietra
On Sat, Apr 20, 2019 at 11:23 AM Matthias J. Sax
wrote:
> Just a side note. There is currently work in progress on
> https://issues.apache.org/jira/browse/KAFKA-3729 that should fix the
> conf
icAvroSerde.configure(serdeConfig, false);
and then in aggregate()
Materialized.with(Serdes.String(), valueSpecificAvroSerde)
fixed the issue.
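Spelled out a bit more, the fix above might look like the following, assuming Confluent's SpecificAvroSerde and a hypothetical MyAggregate Avro class (the registry URL is a placeholder):

```java
// false = this serde is used for values, not keys
Map<String, String> serdeConfig = Collections.singletonMap(
    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
    "http://schema-registry:8081");
SpecificAvroSerde<MyAggregate> valueSpecificAvroSerde = new SpecificAvroSerde<>();
valueSpecificAvroSerde.configure(serdeConfig, false);

// Pass the serde explicitly in the aggregation so the changelog doesn't
// fall back to the default serde:
grouped.aggregate(
    MyAggregate::new,
    (key, value, agg) -> agg.update(value),
    Materialized.with(Serdes.String(), valueSpecificAvroSerde));
```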
Thanks in advance for the windowing help, very appreciated.
In the meantime I'll try to make some progress on the rest.
Have a great weekend
--
Al
h filtered metric{"timestamp": 161000, "production": 1}
S1 with computed metric {"timestamp": 16, "production": 10}
S1 with filtered metric{"timestamp": 162000, "production": 1}
as you can see, window for timestamp 16 is duplicated
Hi Bruno,
I'm using the confluent docker images 5.2.1, so kafka 2.2.
Anyway I'll try to make a small reproduction repo with all the different
cases soon.
Thank you
--
Alessandro Tagliapietra
On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna wrote:
> Hi Alessandro,
>
> What version of
, probably starting from scratch with a simpler example
Thank you for your help!
--
Alessandro Tagliapietra
On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna wrote:
> Hi Alessandro,
>
> Have a look at this Kafka Usage Pattern for computing averages without
> using an ArrayList.
a field of
type array so I can use the regular avro serializer to implement this.
Should I create my own serdes instead or is this the right way?
Thank you in advance
--
Alessandro Tagliapietra
On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com>
issues with message types since I'm changing
the data type when aggregating the window but I think it's an easy problem.
Thank you again
Best
--
Alessandro Tagliapietra
On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna wrote:
> Hi Alessandro,
>
> the `TransformSupplier` is internall
<https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>
> .
is it? Doesn't `transform` need a TransformSupplier while `addProcessor`
uses a ProcessorSupplier?
Thank you again for your help
--
Alessandro Tagliapietra
On Fri, Apr 12, 2019 at 5:04 PM Bruno
but `.process(...)` returns void, so I
cannot have a KStream from a processor?
Thank you all in advance
--
Alessandro Tagliapietra