Re: Redis as state store

2021-03-15 Thread Guozhang Wang
Thanks!

On Mon, Mar 15, 2021 at 8:38 PM Sophie Blee-Goldman
 wrote:

> Yep, that fell off my radar.  Here we go:
> https://issues.apache.org/jira/browse/KAFKA-12475
>
> On Mon, Mar 15, 2021 at 8:09 PM Guozhang Wang  wrote:
>
> > Hey Sophie,
> >
> > Maybe we can first create a JIRA ticket for this?
> >
> > On Mon, Mar 15, 2021 at 3:09 PM Sophie Blee-Goldman
> >  wrote:
> >
> > > Sounds good! I meant anyone who is interested :)
> > >
> > > Let me know if you have any questions after digging in to this
> > >
> > > On Mon, Mar 15, 2021 at 2:39 PM Alex Craig 
> > wrote:
> > >
> > > >  Hey Sophie, not sure if you meant me or not but I'd be happy to
> take a
> > > > stab at creating a KIP for this.  I want to spend some time digging
> > into
> > > > more of how this works first, but will then try to gather my thoughts
> > and
> > > > get something created.
> > > >
> > > > Alex
> > > >
> > > > On Mon, Mar 15, 2021 at 1:48 PM Sophie Blee-Goldman
> > > >  wrote:
> > > >
> > > > > This certainly does seem like a flaw in the Streams API, although
> of
> > > > course
> > > > > Streams is just
> > > > > in general not designed for use with remote anything (API calls,
> > > stores,
> > > > > etc)
> > > > >
> > > > > That said, I don't see any reason why we *couldn't* have better
> > support
> > > > for
> > > > > remote state stores.
> > > > > Note that there's currently no deleteAll method on the store
> > interface,
> > > > and
> > > > > not all databases
> > > > > necessarily support that. But we could add a default implementation
> > > which
> > > > > just calls delete(key)
> > > > > on all keys in the state store, and for the RocksDB-based state
> > stores
> > > we
> > > > > still wipe out the state
> > > > > as usual (and recommend the same for any custom StateStore which is
> > > local
> > > > > rather than remote).
> > > > > Obviously falling back on individual delete(key) operations for all
> > the
> > > > > data in the entire store will
> > > > > have worse performance, but that's better than silently breaking
> EOS
> > > when
> > > > > deleteAll is not available
> > > > > on a remote store.
> > > > >
> > > > > Would you be interested in doing a KIP for this? We should also
> file
> > a
> > > > JIRA
> > > > > with the above explanation,
> > > > > so that other users of remote storage are aware of this limitation.
> > And
> > > > > definitely document this somewhere
> > > > >
> > > > >
> > > > > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna
> > >  > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Alex,
> > > > > >
> > > > > > I guess wiping out the state directory is easier code-wise,
> faster,
> > > > > > and/or at the time of development the developers did not design
> for
> > > > > > remote state stores. But I do actually not know the exact reason.
> > > > > >
> > > > > > Off the top of my head, I do not know how to solve this for
> remote
> > > > state
> > > > > > stores. Using the uncaught exception handler is not good,
> because a
> > > > > > computing node could fail without giving the JVM the opportunity
> to
> > > > > > throw an exception.
> > > > > >
> > > > > > In your tests, try to increase the commit interval to a high
> value
> > > and
> > > > > > see if you get inconsistencies. You should get an inconsistency
> if
> > > the
> > > > > > state store maintains counts for keys and after the last commit
> > > before
> > > > > > the failure, the Streams app puts an event with a new key K with
> > > value
> > > > 1
> > > > > > into the state store. After failover, Streams would put the same
> > > event
> > > > > > with key K again into the state store. If the state store deleted
> > all
> > > > of
> > > > > > its data, Streams would put again value 1, but if the state store
> > did
> > > > > > not delete all data, Streams would put value 2 which is wrong
> > because
> > > > it
> > > > > > would count the same event twice.
> > > > > >
> > > > > > Best,
> > > > > > Bruno
> > > > > >
> > > > > >
> > > > > > On 15.03.21 15:20, Alex Craig wrote:
> > > > > > > Bruno,
> > > > > > > Thanks for the info!  that makes sense.  Of course now I have
> > more
> > > > > > > questions.  :)  Do you know why this is being done outside of
> the
> > > > state
> > > > > > > store API?  I assume there are reasons why a "deleteAll()" type
> > of
> > > > > > function
> > > > > > > wouldn't work, thereby allowing a state store to purge itself?
> > And
> > > > > maybe
> > > > > > > more importantly, is there a way to achieve a similar behavior
> > > with a
> > > > > 3rd
> > > > > > > party store?  I'm not sure if hooking into the uncaught
> exception
> > > > > handler
> > > > > > > might be a good way to purge/drop a state store in the event
> of a
> > > > fatal
> > > > > > > error?  I did setup a MongoDB state store recently as part of a
> > POC
> > > > and
> > > > > > was
> > > > > > > testing it with EOS enabled.  (forcing crashes to occur and
> > > checking
> > > > > that
> > > > > > > the result of my aggregation was still ac

Re: Kafka custom partition - consumers assignor with custom per partition user/custom data

2021-03-15 Thread Sophie Blee-Goldman
I believe I already answered your question in another channel, but just to
follow up in this thread in case anyone else is interested in the answer:

You can override the *ConsumerPartitionAssignor.onAssignment(Assignment,
ConsumerGroupMetadata)*  method to get an update on the currently
assigned partitions after each rebalance. The *Assignment* parameter
contains two fields: the assigned partitions, and the serialized assignment
userdata, if any.
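
For example, here is a rough, untested sketch of what that override might
look like -- the class name, the cached field, and the stubbed-out
assign()/name() methods are purely illustrative:

import java.nio.ByteBuffer;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;

public class MyAssignor implements ConsumerPartitionAssignor {

    // remembered after every rebalance so later subscriptionUserData() calls can use it
    private volatile List<TopicPartition> currentAssignment;

    @Override
    public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
        currentAssignment = assignment.partitions();  // partitions owned after this rebalance
        ByteBuffer userData = assignment.userData();  // serialized assignment userdata, if any
        // deserialize userData here if the leader attached per-partition info (e.g. a float per partition)
    }

    @Override
    public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
        // the real assignment logic lives here; omitted in this sketch
        throw new UnsupportedOperationException("not part of this example");
    }

    @Override
    public String name() {
        return "my-assignor";
    }
}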

On Sat, Mar 6, 2021 at 6:34 AM Mazen Ezzeddine <
mazen.ezzedd...@etu.univ-cotedazur.fr> wrote:

> Hi all,
>
> I am implementing a custom consumer-to-topic/partition assignor in
> Kafka. To this end, I am extending the AbstractPartitionAssignor class, which in
> turn implements the ConsumerPartitionAssignor interface.
>
> As part of the custom assignor, I want to send a single piece of (float)
> information about each partition of each topic that the consumer subscribes
> to.
>
> I am aware that I can send custom data to the assignor by overriding the
> default method
> ByteBuffer subscriptionUserData(Set<String> topics).
>
> However, the issue is that from the method signature above I cannot get
> the list of partitions assigned to the underlying consumer for each of the
> topics that the consumer registers for.
>
> On the other hand, I can see that the Subscription class sent by each of
> the consumers to the group coordinator has a list of the owned partitions per
> consumer:
> public static final class Subscription {
>     private final List<String> topics;
>     private final ByteBuffer userData;
>     private final List<TopicPartition> ownedPartitions;
>     private Optional<String> groupInstanceId;
>
>
> Any hint on how I can send custom per-partition data to the group
> coordinator through the method ByteBuffer subscriptionUserData(Set<String>
> topics), or using any other way that relies only on the Kafka public APIs?
>
> Thank you.
>
>


Re: Slightly Modified Sticky Assignor.

2021-03-15 Thread Sophie Blee-Goldman
Hey Mazen,

The easiest way to approach this is probably to pass in a reference to the
associated Consumer and
then just call one of the *Consumer#committed* methods, which return the
OffsetAndMetadata.

But I'm guessing your underlying question may be about how to get that
reference to the Consumer in
the first place. There really isn't very good support for this in the
ConsumerPartitionAssignor interface
since it relies on reflection to instantiate the assignor with the default
constructor. I would recommend
making your custom ConsumerPartitionAssignor implement the Configurable
interface, and then use
the configs you pass in to the Consumer to ultimately get a handle on it.
Since you have to provide the
configs to construct the Consumer in the first place, you might need a
layer of indirection: for example

class ConsumerProvider {
    // simple holder that lets the assignor reach the Consumer via the config map
    private Consumer consumer;

    public void setConsumer(Consumer consumer) { this.consumer = consumer; }

    public Consumer getConsumer() { return consumer; }
}

// main code
ConsumerProvider provider = new ConsumerProvider();
consumerConfig.put("MY_CONSUMER_PROVIDER", provider);
Consumer consumer = new KafkaConsumer(consumerConfig, ...);
provider.setConsumer(consumer);

class MyAssignor implements ConsumerPartitionAssignor, Configurable {

    private ConsumerProvider consumerProvider;

    @Override
    public void configure(Map<String, ?> configs) {
        this.consumerProvider = (ConsumerProvider) configs.get("MY_CONSUMER_PROVIDER");
    }

    @Override
    public ByteBuffer subscriptionUserData(Set<String> topics) {
        // now the assignor can ask the Consumer for its committed offsets
        Map<TopicPartition, OffsetAndMetadata> offsets =
            consumerProvider.getConsumer().committed(...);
    }
}

Just a warning, I haven't actually tested this out, but the general idea of
using configs should work.

I know you said "seamless" and this is anything but :/ Maybe I'm tired and
missing something obvious, but
clearly there's room for improvement here. You can file a JIRA ticket to
improve the partition assignor
experience and make it easier to set up (and even work on this yourself if
you're interested)

Unfortunately you generally want to keep the subscriptionUserData() method
pretty light, and this
method will make a remote call to the server. To be honest, I'm not totally
sure why that is the case, since
the Consumer should know what offsets it's committed for which
partitions...maybe someone else can
jump in on the choice behind that. This has come up before, so it's worth
investigating whether we can
just return the cached offsets if they're available and only make a remote
call for *Consumer#committed* if
absolutely necessary. I'll try to follow up on that

On Tue, Mar 9, 2021 at 9:26 AM Mazen Ezzeddine <
mazen.ezzedd...@etu.univ-cotedazur.fr> wrote:

> Hi all,
>
> I am implementing a custom partition assignor slightly different from the
> sticky assignor. As is known, in the sticky assignor each consumer
> sends the set of owned partitions as part of its subscription message. This
> happens in the subscriptionUserData by calling the
> serializeTopicPartitionAssignment method etc…
>
> Kindly, my question is: what is the most seamless way to get offset
> information (e.g., last committed offset) for each owned partition from
> within the subscriptionUserData method or generally from within the
> stickyAssignor class, preferably using public APIs.
>
> Thank you.
>


Emit events that are NOT joined

2021-03-15 Thread Ross Black
Hi,

I am trying to find the best pattern to solve a specific problem using
Kafka streaming.  All of our current processing uses the Kafka streaming
API (using multiple joins, windows, repartitions etc) so I already think I
have a decent grasp of the fundamentals.

We have 2 streams of events:
- primary events (P), which indicate some key event in the system and carry
a large amount of data
- secondary events (S), which should *always* occur as a follow-on to the
primary event and only contain a reference to the single associated primary
event.

I want to join secondary events to primary events (the simple part) BUT I
also want to find out when secondary events have been *unable* to be joined.
A secondary is unable to be joined:
- when primary event delivery has been delayed (so that secondary events
are received before the associated primary event)
- when primary events go missing (the event collection system is noisy, so
we do lose a small but significant number of primary events)
- due to coding errors in the collectors, where an incorrect reference has
been inserted into the secondary event

Currently this functionality is implemented using a database:
- primary events are inserted into the database and then secondary events
lookup the primary by-reference.  If the primary is found the secondary is
sent to a "JOINED" topic.
- if the primary is not found, the secondary event is buffered in the
database until the primary is received and then joined+emitted (and the
secondary event is removed from the DB)
- after some arbitrary time period, the database is queried for outstanding
not-joined secondary events and they are emitted to an "UNJOINED" topic.
This allows alerting on unmatched secondary events to drive quality
measures, and allows offline analysis (to understand why)

Some options:
1. Implement the same strategy as existing except using Kafka state stores
instead of the DB.  With this approach I am concerned about atomic
correctness - i.e. that state in the Kafka store can be managed so that the
event is never sent to both JOINED and UNJOINED.


2. Continually emit key-values to a "PENDING" topic for the secondary join.
An example sequence could be something like ...(where primary events = P,
secondary events = S) :
a) receive S with no matching P => emit {S, false}
b) receive matching P for S => emit {S, null} (to effectively delete it
from the topic)
c) receive S with matching P => do not emit anything
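
To illustrate, a very rough (untested) topology snippet for that emission
scheme -- assuming both topics are keyed by the primary-event id, default
serdes are configured, and Primary/Secondary are placeholder value types:

StreamsBuilder builder = new StreamsBuilder();
KTable<String, Primary> primaries = builder.table("primary-events");
KStream<String, Secondary> secondaries = builder.stream("secondary-events");

// (a)/(c): emit the secondary to PENDING only when no matching primary is present yet
secondaries
    .leftJoin(primaries, (secondary, primary) -> primary == null ? secondary : null)
    .filter((key, pending) -> pending != null)
    .to("PENDING");

// (b): when a primary arrives, tombstone any pending secondary for that key
// (simplified: this tombstones on every primary, not only when a pending secondary exists)
primaries.toStream()
    .mapValues(primary -> (Secondary) null)
    .to("PENDING");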

Now the problem becomes more like building a time-window of events from
PENDING, to eventually emit the events to UNJOINED.  I am also uncertain as
how to ensure events can never end up in both JOINED and UNJOINED.

My apologies for the wall of text .. I find it a difficult problem to
explain. 😏


Is there some pattern I am missing that will help solve this problem?
Any help / other suggestions would be appreciated.

Thanks,
Ross


Re: [VOTE] 2.6.2 RC0

2021-03-15 Thread Sophie Blee-Goldman
Thanks Luke. I'll make sure to bump this in the kafka-site repo once the
release is finalized.

On Mon, Mar 15, 2021 at 8:58 PM Luke Chen  wrote:

> Hi Sophie,
> A small doc update for 2.6.2. I think we missed it.
> https://github.com/apache/kafka/pull/10328
>
> Thanks.
> Luke
>
> On Tue, Mar 16, 2021 at 11:12 AM Guozhang Wang  wrote:
>
> > Hi Sophie,
> >
> > I've reviewed the javadocs / release notes / documentations, and they
> LGTM.
> >
> > +1.
> >
> >
> > Guozhang
> >
> > On Fri, Mar 12, 2021 at 10:48 AM Sophie Blee-Goldman
> >  wrote:
> >
> > > Hello Kafka users, developers and client-developers,
> > >
> > > This is the first candidate for release of Apache Kafka 2.6.2.
> > >
> > > Apache Kafka 2.6.2 is a bugfix release and fixes 30 issues since the
> > 2.6.1
> > > release. Please see the release notes for more information.
> > >
> > > Release notes for the 2.6.2 release:
> > >
> https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/RELEASE_NOTES.html
> > >
> > > *** Please download, test and vote by Friday, March 19th, 9am PST
> > >
> > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > https://kafka.apache.org/KEYS
> > >
> > > * Release artifacts to be voted upon (source and binary):
> > > https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/
> > >
> > > * Maven artifacts to be voted upon:
> > > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > >
> > > * Javadoc:
> > > https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/javadoc/
> > >
> > > * Tag to be voted upon (off 2.6 branch) is the 2.6.2 tag:
> > > https://github.com/apache/kafka/releases/tag/2.6.2-rc0
> > >
> > > * Documentation:
> > > https://kafka.apache.org/26/documentation.html
> > >
> > > * Protocol:
> > > https://kafka.apache.org/26/protocol.html
> > >
> > > * Successful Jenkins builds for the 2.6 branch:
> > > Unit/integration tests:
> > > https://ci-builds.apache.org/job/Kafka/job/kafka-2.6-jdk8/105/
> > > System tests:
> > >
> https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4435/
> > >
> > > /**
> > >
> > > Thanks,
> > > Sophie
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


Re: [VOTE] 2.6.2 RC0

2021-03-15 Thread Luke Chen
Hi Sophie,
A small doc update for 2.6.2. I think we missed it.
https://github.com/apache/kafka/pull/10328

Thanks.
Luke

On Tue, Mar 16, 2021 at 11:12 AM Guozhang Wang  wrote:

> Hi Sophie,
>
> I've reviewed the javadocs / release notes / documentations, and they LGTM.
>
> +1.
>
>
> Guozhang
>
> On Fri, Mar 12, 2021 at 10:48 AM Sophie Blee-Goldman
>  wrote:
>
> > Hello Kafka users, developers and client-developers,
> >
> > This is the first candidate for release of Apache Kafka 2.6.2.
> >
> > Apache Kafka 2.6.2 is a bugfix release and fixes 30 issues since the
> 2.6.1
> > release. Please see the release notes for more information.
> >
> > Release notes for the 2.6.2 release:
> > https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/RELEASE_NOTES.html
> >
> > *** Please download, test and vote by Friday, March 19th, 9am PST
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > https://kafka.apache.org/KEYS
> >
> > * Release artifacts to be voted upon (source and binary):
> > https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/
> >
> > * Maven artifacts to be voted upon:
> > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> >
> > * Javadoc:
> > https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/javadoc/
> >
> > * Tag to be voted upon (off 2.6 branch) is the 2.6.2 tag:
> > https://github.com/apache/kafka/releases/tag/2.6.2-rc0
> >
> > * Documentation:
> > https://kafka.apache.org/26/documentation.html
> >
> > * Protocol:
> > https://kafka.apache.org/26/protocol.html
> >
> > * Successful Jenkins builds for the 2.6 branch:
> > Unit/integration tests:
> > https://ci-builds.apache.org/job/Kafka/job/kafka-2.6-jdk8/105/
> > System tests:
> > https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4435/
> >
> > /**
> >
> > Thanks,
> > Sophie
> >
>
>
> --
> -- Guozhang
>


Re: Redis as state store

2021-03-15 Thread Sophie Blee-Goldman
Yep, that fell off my radar.  Here we go:
https://issues.apache.org/jira/browse/KAFKA-12475

On Mon, Mar 15, 2021 at 8:09 PM Guozhang Wang  wrote:

> Hey Sophie,
>
> Maybe we can first create a JIRA ticket for this?
>
> On Mon, Mar 15, 2021 at 3:09 PM Sophie Blee-Goldman
>  wrote:
>
> > Sounds good! I meant anyone who is interested :)
> >
> > Let me know if you have any questions after digging in to this
> >
> > On Mon, Mar 15, 2021 at 2:39 PM Alex Craig 
> wrote:
> >
> > >  Hey Sophie, not sure if you meant me or not but I'd be happy to take a
> > > stab at creating a KIP for this.  I want to spend some time digging
> into
> > > more of how this works first, but will then try to gather my thoughts
> and
> > > get something created.
> > >
> > > Alex
> > >
> > > On Mon, Mar 15, 2021 at 1:48 PM Sophie Blee-Goldman
> > >  wrote:
> > >
> > > > This certainly does seem like a flaw in the Streams API, although of
> > > course
> > > > Streams is just
> > > > in general not designed for use with remote anything (API calls,
> > stores,
> > > > etc)
> > > >
> > > > That said, I don't see any reason why we *couldn't* have better
> support
> > > for
> > > > remote state stores.
> > > > Note that there's currently no deleteAll method on the store
> interface,
> > > and
> > > > not all databases
> > > > necessarily support that. But we could add a default implementation
> > which
> > > > just calls delete(key)
> > > > on all keys in the state store, and for the RocksDB-based state
> stores
> > we
> > > > still wipe out the state
> > > > as usual (and recommend the same for any custom StateStore which is
> > local
> > > > rather than remote).
> > > > Obviously falling back on individual delete(key) operations for all
> the
> > > > data in the entire store will
> > > > have worse performance, but that's better than silently breaking EOS
> > when
> > > > deleteAll is not available
> > > > on a remote store.
> > > >
> > > > Would you be interested in doing a KIP for this? We should also file
> a
> > > JIRA
> > > > with the above explanation,
> > > > so that other users of remote storage are aware of this limitation.
> And
> > > > definitely document this somewhere
> > > >
> > > >
> > > > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna
> >  > > >
> > > > wrote:
> > > >
> > > > > Hi Alex,
> > > > >
> > > > > I guess wiping out the state directory is easier code-wise, faster,
> > > > > and/or at the time of development the developers did not design for
> > > > > remote state stores. But I do actually not know the exact reason.
> > > > >
> > > > > Off the top of my head, I do not know how to solve this for remote
> > > state
> > > > > stores. Using the uncaught exception handler is not good, because a
> > > > > computing node could fail without giving the JVM the opportunity to
> > > > > throw an exception.
> > > > >
> > > > > In your tests, try to increase the commit interval to a high value
> > and
> > > > > see if you get inconsistencies. You should get an inconsistency if
> > the
> > > > > state store maintains counts for keys and after the last commit
> > before
> > > > > the failure, the Streams app puts an event with a new key K with
> > value
> > > 1
> > > > > into the state store. After failover, Streams would put the same
> > event
> > > > > with key K again into the state store. If the state store deleted
> all
> > > of
> > > > > its data, Streams would put again value 1, but if the state store
> did
> > > > > not delete all data, Streams would put value 2 which is wrong
> because
> > > it
> > > > > would count the same event twice.
> > > > >
> > > > > Best,
> > > > > Bruno
> > > > >
> > > > >
> > > > > On 15.03.21 15:20, Alex Craig wrote:
> > > > > > Bruno,
> > > > > > Thanks for the info!  that makes sense.  Of course now I have
> more
> > > > > > questions.  :)  Do you know why this is being done outside of the
> > > state
> > > > > > store API?  I assume there are reasons why a "deleteAll()" type
> of
> > > > > function
> > > > > > wouldn't work, thereby allowing a state store to purge itself?
> And
> > > > maybe
> > > > > > more importantly, is there a way to achieve a similar behavior
> > with a
> > > > 3rd
> > > > > > party store?  I'm not sure if hooking into the uncaught exception
> > > > handler
> > > > > > might be a good way to purge/drop a state store in the event of a
> > > fatal
> > > > > > error?  I did setup a MongoDB state store recently as part of a
> POC
> > > and
> > > > > was
> > > > > > testing it with EOS enabled.  (forcing crashes to occur and
> > checking
> > > > that
> > > > > > the result of my aggregation was still accurate)  I was unable to
> > > cause
> > > > > > inconsistent data in the mongo store (which is good!), though of
> > > > course I
> > > > > > may just have been getting lucky.  Thanks again for your help,
> > > > > >
> > > > > > Alex
> > > > > >
> > > > > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole <
> > pdeole2...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > >> Bruno

Re: [VOTE] 2.6.2 RC0

2021-03-15 Thread Guozhang Wang
Hi Sophie,

I've reviewed the javadocs / release notes / documentations, and they LGTM.

+1.


Guozhang

On Fri, Mar 12, 2021 at 10:48 AM Sophie Blee-Goldman
 wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the first candidate for release of Apache Kafka 2.6.2.
>
> Apache Kafka 2.6.2 is a bugfix release and fixes 30 issues since the 2.6.1
> release. Please see the release notes for more information.
>
> Release notes for the 2.6.2 release:
> https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/RELEASE_NOTES.html
>
> *** Please download, test and vote by Friday, March 19th, 9am PST
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> https://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/javadoc/
>
> * Tag to be voted upon (off 2.6 branch) is the 2.6.2 tag:
> https://github.com/apache/kafka/releases/tag/2.6.2-rc0
>
> * Documentation:
> https://kafka.apache.org/26/documentation.html
>
> * Protocol:
> https://kafka.apache.org/26/protocol.html
>
> * Successful Jenkins builds for the 2.6 branch:
> Unit/integration tests:
> https://ci-builds.apache.org/job/Kafka/job/kafka-2.6-jdk8/105/
> System tests:
> https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4435/
>
> /**
>
> Thanks,
> Sophie
>


-- 
-- Guozhang


Re: Redis as state store

2021-03-15 Thread Guozhang Wang
Hey Sophie,

Maybe we can first create a JIRA ticket for this?

On Mon, Mar 15, 2021 at 3:09 PM Sophie Blee-Goldman
 wrote:

> Sounds good! I meant anyone who is interested :)
>
> Let me know if you have any questions after digging in to this
>
> On Mon, Mar 15, 2021 at 2:39 PM Alex Craig  wrote:
>
> >  Hey Sophie, not sure if you meant me or not but I'd be happy to take a
> > stab at creating a KIP for this.  I want to spend some time digging into
> > more of how this works first, but will then try to gather my thoughts and
> > get something created.
> >
> > Alex
> >
> > On Mon, Mar 15, 2021 at 1:48 PM Sophie Blee-Goldman
> >  wrote:
> >
> > > This certainly does seem like a flaw in the Streams API, although of
> > course
> > > Streams is just
> > > in general not designed for use with remote anything (API calls,
> stores,
> > > etc)
> > >
> > > That said, I don't see any reason why we *couldn't* have better support
> > for
> > > remote state stores.
> > > Note that there's currently no deleteAll method on the store interface,
> > and
> > > not all databases
> > > necessarily support that. But we could add a default implementation
> which
> > > just calls delete(key)
> > > on all keys in the state store, and for the RocksDB-based state stores
> we
> > > still wipe out the state
> > > as usual (and recommend the same for any custom StateStore which is
> local
> > > rather than remote).
> > > Obviously falling back on individual delete(key) operations for all the
> > > data in the entire store will
> > > have worse performance, but that's better than silently breaking EOS
> when
> > > deleteAll is not available
> > > on a remote store.
> > >
> > > Would you be interested in doing a KIP for this? We should also file a
> > JIRA
> > > with the above explanation,
> > > so that other users of remote storage are aware of this limitation. And
> > > definitely document this somewhere
> > >
> > >
> > > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna
>  > >
> > > wrote:
> > >
> > > > Hi Alex,
> > > >
> > > > I guess wiping out the state directory is easier code-wise, faster,
> > > > and/or at the time of development the developers did not design for
> > > > remote state stores. But I do actually not know the exact reason.
> > > >
> > > > Off the top of my head, I do not know how to solve this for remote
> > state
> > > > stores. Using the uncaught exception handler is not good, because a
> > > > computing node could fail without giving the JVM the opportunity to
> > > > throw an exception.
> > > >
> > > > In your tests, try to increase the commit interval to a high value
> and
> > > > see if you get inconsistencies. You should get an inconsistency if
> the
> > > > state store maintains counts for keys and after the last commit
> before
> > > > the failure, the Streams app puts an event with a new key K with
> value
> > 1
> > > > into the state store. After failover, Streams would put the same
> event
> > > > with key K again into the state store. If the state store deleted all
> > of
> > > > its data, Streams would put again value 1, but if the state store did
> > > > not delete all data, Streams would put value 2 which is wrong because
> > it
> > > > would count the same event twice.
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > >
> > > > On 15.03.21 15:20, Alex Craig wrote:
> > > > > Bruno,
> > > > > Thanks for the info!  that makes sense.  Of course now I have more
> > > > > questions.  :)  Do you know why this is being done outside of the
> > state
> > > > > store API?  I assume there are reasons why a "deleteAll()" type of
> > > > function
> > > > > wouldn't work, thereby allowing a state store to purge itself?  And
> > > maybe
> > > > > more importantly, is there a way to achieve a similar behavior
> with a
> > > 3rd
> > > > > party store?  I'm not sure if hooking into the uncaught exception
> > > handler
> > > > > might be a good way to purge/drop a state store in the event of a
> > fatal
> > > > > error?  I did setup a MongoDB state store recently as part of a POC
> > and
> > > > was
> > > > > testing it with EOS enabled.  (forcing crashes to occur and
> checking
> > > that
> > > > > the result of my aggregation was still accurate)  I was unable to
> > cause
> > > > > inconsistent data in the mongo store (which is good!), though of
> > > course I
> > > > > may just have been getting lucky.  Thanks again for your help,
> > > > >
> > > > > Alex
> > > > >
> > > > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole <
> pdeole2...@gmail.com>
> > > > wrote:
> > > > >
> > > > >> Bruno,
> > > > >>
> > > > >> i tried to explain this in 'kafka user's language through above
> > > > mentioned
> > > > >> scenario, hope i put it properly -:) and correct me if i am wrong
> > > > >>
> > > > >> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole <
> pdeole2...@gmail.com
> > >
> > > > >> wrote:
> > > > >>
> > > > >>> This is what I understand could be the issue with external state
> > > store:
> > > > >>>
> > > > >>> kafka stream appl

Right Use Case For Kafka Streams?

2021-03-15 Thread Gareth Collins
Hi,

We have a requirement to calculate metrics on a huge number of keys (could
be hundreds of millions, perhaps billions of keys - attempting caching on
individual keys in many cases will have almost a 0% cache hit rate). Is
Kafka Streams with RocksDB and compacting topics the right tool for a task
like that?

As well, just from playing with Kafka Streams for a week it feels like it
wants to create a lot of separate stores by default (if I want to calculate
aggregates on five, ten and 30 days I will get three separate stores by
default for this state data). Coming from a different distributed storage
solution, I feel like I want to put them together in one store as I/O has
always been my bottleneck (1 big read and 1 big write is better than three
small separate reads and three small separate writes).

But am I perhaps missing something here? I don't want to avoid the DSL that
Kafka Streams provides if I don't have to. Will the Kafka Streams RocksDB
solution be so much faster than a distributed read that it won't be the
bottleneck even with huge amounts of data?

Any info/opinions would be greatly appreciated.

thanks in advance,
Gareth Collins


Re: Redis as state store

2021-03-15 Thread Sophie Blee-Goldman
Sounds good! I meant anyone who is interested :)

Let me know if you have any questions after digging in to this

On Mon, Mar 15, 2021 at 2:39 PM Alex Craig  wrote:

>  Hey Sophie, not sure if you meant me or not but I'd be happy to take a
> stab at creating a KIP for this.  I want to spend some time digging into
> more of how this works first, but will then try to gather my thoughts and
> get something created.
>
> Alex
>
> On Mon, Mar 15, 2021 at 1:48 PM Sophie Blee-Goldman
>  wrote:
>
> > This certainly does seem like a flaw in the Streams API, although of
> course
> > Streams is just
> > in general not designed for use with remote anything (API calls, stores,
> > etc)
> >
> > That said, I don't see any reason why we *couldn't* have better support
> for
> > remote state stores.
> > Note that there's currently no deleteAll method on the store interface,
> and
> > not all databases
> > necessarily support that. But we could add a default implementation which
> > just calls delete(key)
> > on all keys in the state store, and for the RocksDB-based state stores we
> > still wipe out the state
> > as usual (and recommend the same for any custom StateStore which is local
> > rather than remote).
> > Obviously falling back on individual delete(key) operations for all the
> > data in the entire store will
> > have worse performance, but that's better than silently breaking EOS when
> > deleteAll is not available
> > on a remote store.
> >
> > Would you be interested in doing a KIP for this? We should also file a
> JIRA
> > with the above explanation,
> > so that other users of remote storage are aware of this limitation. And
> > definitely document this somewhere
> >
> >
> > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna  >
> > wrote:
> >
> > > Hi Alex,
> > >
> > > I guess wiping out the state directory is easier code-wise, faster,
> > > and/or at the time of development the developers did not design for
> > > remote state stores. But I do actually not know the exact reason.
> > >
> > > Off the top of my head, I do not know how to solve this for remote
> state
> > > stores. Using the uncaught exception handler is not good, because a
> > > computing node could fail without giving the JVM the opportunity to
> > > throw an exception.
> > >
> > > In your tests, try to increase the commit interval to a high value and
> > > see if you get inconsistencies. You should get an inconsistency if the
> > > state store maintains counts for keys and after the last commit before
> > > the failure, the Streams app puts an event with a new key K with value
> 1
> > > into the state store. After failover, Streams would put the same event
> > > with key K again into the state store. If the state store deleted all
> of
> > > its data, Streams would put again value 1, but if the state store did
> > > not delete all data, Streams would put value 2 which is wrong because
> it
> > > would count the same event twice.
> > >
> > > Best,
> > > Bruno
> > >
> > >
> > > On 15.03.21 15:20, Alex Craig wrote:
> > > > Bruno,
> > > > Thanks for the info!  that makes sense.  Of course now I have more
> > > > questions.  :)  Do you know why this is being done outside of the
> state
> > > > store API?  I assume there are reasons why a "deleteAll()" type of
> > > function
> > > > wouldn't work, thereby allowing a state store to purge itself?  And
> > maybe
> > > > more importantly, is there a way to achieve a similar behavior with a
> > 3rd
> > > > party store?  I'm not sure if hooking into the uncaught exception
> > handler
> > > > might be a good way to purge/drop a state store in the event of a
> fatal
> > > > error?  I did setup a MongoDB state store recently as part of a POC
> and
> > > was
> > > > testing it with EOS enabled.  (forcing crashes to occur and checking
> > that
> > > > the result of my aggregation was still accurate)  I was unable to
> cause
> > > > inconsistent data in the mongo store (which is good!), though of
> > course I
> > > > may just have been getting lucky.  Thanks again for your help,
> > > >
> > > > Alex
> > > >
> > > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole 
> > > wrote:
> > > >
> > > >> Bruno,
> > > >>
> > > >> i tried to explain this in 'kafka user's language through above
> > > mentioned
> > > >> scenario, hope i put it properly -:) and correct me if i am wrong
> > > >>
> > > >> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole  >
> > > >> wrote:
> > > >>
> > > >>> This is what I understand could be the issue with external state
> > store:
> > > >>>
> > > >>> kafka stream application consumes source topic, does processing,
> > stores
> > > >>> state to kafka state store (this is backed by topic) and before
> > > producing
> > > >>> event on destination topic, the application fails with some issue.
> In
> > > >>> this case, the transaction has failed, so kafka guarantees either
> all
> > > or
> > > >>> none, means offset written to source topic, state written to state
> > > store
> > > >>> topic, output produced on de

MirrorMaker 2 and Negative Lag

2021-03-15 Thread Alan Ning
I am running MirrorMaker 2 (Kafka 2.7), trying to migrate all topics from
one cluster to another while preserving consumer group offsets through
`sync.group.offsets.enabled=true`. My source cluster is running Kafka 0.10,
while the target cluster is running 2.6.1.

While I can see data being replicated, the data on the replicated Consumer
Group in the target cluster looks wrong. The lag values of the replicated
Consumer Group are large negative values, and the LOG-END-OFFSET are mostly
0. I determined this information from kafka-consumer-groups.sh.

I checked the kafka_consumer_consumer_fetch_manager_metrics_records_lag JMX
metrics in MM2 and the reported lag is zero for all partitions.

By using `sync.group.offsets.enabled=true`, I envisioned that MM2 will
automatically replicate and sync all Consumer Groups with a meaningful
offset in the target cluster. Am I misunderstanding how MM2 is supposed to
work?

Here is my mm2.properties and the CG details.

# mm2.properties
```
clusters = src, dst
src.bootstrap.servers = 10.0.0.1:9092
dst.bootstrap.servers = 10.0.0.2:9092
src->dst.enabled = true
src->dst.topics = compute.*
src->dst.offset.flush.timeout.ms=6
src->dst.buffer.memory=1
dst->src.enabled = true
dst->src.topics = .*
replication.factor=3
src->dst.sync.group.offsets.enabled = true
src->dst.emit.checkpoints.enabled = true
src->dst.consumer.auto.offset.reset=latest
consumer.auto.offset.reset = latest
auto.offset.reset = latest
replication.policy.class =
com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3
sync.topic.acls.enabled = false
sync.group.offsets.enabled = true
emit.checkpoints.enabled = true
tasks.max = 8
dst.producer.offset.flush.timeout.ms = 6
dst.offset.flush.timeout.ms = 6
```

Consumer Group details
```
GROUP                          TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
kafka-group-Compute-Requests   Compute-Requests   57         5305947         0               -5305947  -            -     -
kafka-group-Compute-Requests   Compute-Requests   20         5164205         0               -5164205  -            -     -
kafka-group-Compute-Requests   Compute-Requests   53         4208527         0               -4208527  -            -     -
kafka-group-Compute-Requests   Compute-Requests   82         5247928         0               -5247928  -            -     -
kafka-group-Compute-Requests   Compute-Requests   65         5574520         0               -5574520  -            -     -
kafka-group-Compute-Requests   Compute-Requests   11         5190708         209             -5190499  -            -     -
```

Thanks

... Alan


Re: Redis as state store

2021-03-15 Thread Alex Craig
 Hey Sophie, not sure if you meant me or not but I'd be happy to take a
stab at creating a KIP for this.  I want to spend some time digging into
more of how this works first, but will then try to gather my thoughts and
get something created.

Alex

On Mon, Mar 15, 2021 at 1:48 PM Sophie Blee-Goldman
 wrote:

> This certainly does seem like a flaw in the Streams API, although of course
> Streams is just
> in general not designed for use with remote anything (API calls, stores,
> etc)
>
> That said, I don't see any reason why we *couldn't* have better support for
> remote state stores.
> Note that there's currently no deleteAll method on the store interface, and
> not all databases
> necessarily support that. But we could add a default implementation which
> just calls delete(key)
> on all keys in the state store, and for the RocksDB-based state stores we
> still wipe out the state
> as usual (and recommend the same for any custom StateStore which is local
> rather than remote).
> Obviously falling back on individual delete(key) operations for all the
> data in the entire store will
> have worse performance, but that's better than silently breaking EOS when
> deleteAll is not available
> on a remote store.
>
> Would you be interested in doing a KIP for this? We should also file a JIRA
> with the above explanation,
> so that other users of remote storage are aware of this limitation. And
> definitely document this somewhere
>
>
> On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna 
> wrote:
>
> > Hi Alex,
> >
> > I guess wiping out the state directory is easier code-wise, faster,
> > and/or at the time of development the developers did not design for
> > remote state stores. But I do actually not know the exact reason.
> >
> > Off the top of my head, I do not know how to solve this for remote state
> > stores. Using the uncaught exception handler is not good, because a
> > computing node could fail without giving the JVM the opportunity to
> > throw an exception.
> >
> > In your tests, try to increase the commit interval to a high value and
> > see if you get inconsistencies. You should get an inconsistency if the
> > state store maintains counts for keys and after the last commit before
> > the failure, the Streams app puts an event with a new key K with value 1
> > into the state store. After failover, Streams would put the same event
> > with key K again into the state store. If the state store deleted all of
> > its data, Streams would put again value 1, but if the state store did
> > not delete all data, Streams would put value 2 which is wrong because it
> > would count the same event twice.
> >
> > Best,
> > Bruno
> >
> >
> > On 15.03.21 15:20, Alex Craig wrote:
> > > Bruno,
> > > Thanks for the info!  that makes sense.  Of course now I have more
> > > questions.  :)  Do you know why this is being done outside of the state
> > > store API?  I assume there are reasons why a "deleteAll()" type of
> > function
> > > wouldn't work, thereby allowing a state store to purge itself?  And
> maybe
> > > more importantly, is there a way to achieve a similar behavior with a
> 3rd
> > > party store?  I'm not sure if hooking into the uncaught exception
> handler
> > > might be a good way to purge/drop a state store in the event of a fatal
> > > error?  I did setup a MongoDB state store recently as part of a POC and
> > was
> > > testing it with EOS enabled.  (forcing crashes to occur and checking
> that
> > > the result of my aggregation was still accurate)  I was unable to cause
> > > inconsistent data in the mongo store (which is good!), though of
> course I
> > > may just have been getting lucky.  Thanks again for your help,
> > >
> > > Alex
> > >
> > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole 
> > wrote:
> > >
> > >> Bruno,
> > >>
> > >> i tried to explain this in 'kafka user's language through above
> > mentioned
> > >> scenario, hope i put it properly -:) and correct me if i am wrong
> > >>
> > >> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole 
> > >> wrote:
> > >>
> > >>> This is what I understand could be the issue with external state
> store:
> > >>>
> > >>> kafka stream application consumes source topic, does processing,
> stores
> > >>> state to kafka state store (this is backed by topic) and before
> > producing
> > >>> event on destination topic, the application fails with some issue. In
> > >>> this case, the transaction has failed, so kafka guarantees either all
> > or
> > >>> none, means offset written to source topic, state written to state
> > store
> > >>> topic, output produced on destination topic... all of these happen or
> > >> none
> > >>> of these and in this failure scenario it is none of these.
> > >>>
> > >>> Assume you have redis state store, and you updated the state into
> redis
> > >>> and stream application failed. Now, you have source topic and
> > destination
> > >>> topic consistent i.e. offset is not committed to source topic and
> > output
> > >>> not produced on destination topic

Re: Redis as state store

2021-03-15 Thread Sophie Blee-Goldman
This certainly does seem like a flaw in the Streams API, although of course
Streams is just
in general not designed for use with remote anything (API calls, stores,
etc)

That said, I don't see any reason why we *couldn't* have better support for
remote state stores.
Note that there's currently no deleteAll method on the store interface, and
not all databases
necessarily support that. But we could add a default implementation which
just calls delete(key)
on all keys in the state store, and for the RocksDB-based state stores we
still wipe out the state
as usual (and recommend the same for any custom StateStore which is local
rather than remote).
Obviously falling back on individual delete(key) operations for all the
data in the entire store will
have worse performance, but that's better than silently breaking EOS when
deleteAll is not available
on a remote store.

Would you be interested in doing a KIP for this? We should also file a JIRA
with the above explanation,
so that other users of remote storage are aware of this limitation. And
definitely document this somewhere
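
For example, a rough sketch of what such a default might look like on the
KeyValueStore interface (totally untested, and the deleteAll method itself is
hypothetical -- it doesn't exist on the interface today):

// hypothetical default method on KeyValueStore<K, V>; not part of the current API
default void deleteAll() {
    // fall back to deleting every key individually, closing the iterator when done
    try (final KeyValueIterator<K, V> iter = all()) {
        while (iter.hasNext()) {
            delete(iter.next().key);
        }
    }
}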


On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna 
wrote:

> Hi Alex,
>
> I guess wiping out the state directory is easier code-wise, faster,
> and/or at the time of development the developers did not design for
> remote state stores. But I do actually not know the exact reason.
>
> Off the top of my head, I do not know how to solve this for remote state
> stores. Using the uncaught exception handler is not good, because a
> computing node could fail without giving the JVM the opportunity to
> throw an exception.
>
> In your tests, try to increase the commit interval to a high value and
> see if you get inconsistencies. You should get an inconsistency if the
> state store maintains counts for keys and after the last commit before
> the failure, the Streams app puts an event with a new key K with value 1
> into the state store. After failover, Streams would put the same event
> with key K again into the state store. If the state store deleted all of
> its data, Streams would put again value 1, but if the state store did
> not delete all data, Streams would put value 2 which is wrong because it
> would count the same event twice.
>
> Best,
> Bruno
>
>
> On 15.03.21 15:20, Alex Craig wrote:
> > Bruno,
> > Thanks for the info!  that makes sense.  Of course now I have more
> > questions.  :)  Do you know why this is being done outside of the state
> > store API?  I assume there are reasons why a "deleteAll()" type of
> function
> > wouldn't work, thereby allowing a state store to purge itself?  And maybe
> > more importantly, is there a way to achieve a similar behavior with a 3rd
> > party store?  I'm not sure if hooking into the uncaught exception handler
> > might be a good way to purge/drop a state store in the event of a fatal
> > error?  I did setup a MongoDB state store recently as part of a POC and
> was
> > testing it with EOS enabled.  (forcing crashes to occur and checking that
> > the result of my aggregation was still accurate)  I was unable to cause
> > inconsistent data in the mongo store (which is good!), though of course I
> > may just have been getting lucky.  Thanks again for your help,
> >
> > Alex
> >
> > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole 
> wrote:
> >
> >> Bruno,
> >>
> >> i tried to explain this in 'kafka user's language through above
> mentioned
> >> scenario, hope i put it properly -:) and correct me if i am wrong
> >>
> >> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole 
> >> wrote:
> >>
> >>> This is what I understand could be the issue with external state store:
> >>>
> >>> kafka stream application consumes source topic, does processing, stores
> >>> state to kafka state store (this is backed by topic) and before
> producing
> >>> event on destination topic, the application fails with some issue. In
> >>> this case, the transaction has failed, so kafka guarantees either all
> or
> >>> none, means offset written to source topic, state written to state
> store
> >>> topic, output produced on destination topic... all of these happen or
> >> none
> >>> of these and in this failure scenario it is none of these.
> >>>
> >>> Assume you have redis state store, and you updated the state into redis
> >>> and stream application failed. Now, you have source topic and
> destination
> >>> topic consistent i.e. offset is not committed to source topic and
> output
> >>> not produced on destination topic, but you redis state store is
> >>> inconsistent with that since it is external state store and kafka can't
> >>> guarantee rollback ot state written there
> >>>
> >>> On Mon, Mar 15, 2021 at 6:30 PM Alex Craig 
> >> wrote:
> >>>
>  " Another issue with 3rd party state stores could be violation of
>  exactly-once guarantee provided by kafka streams in the event of a
> >> failure
>  of streams application instance"
> 
>  I've heard this before but would love to know more about how a custom
>  state
> >>>

Kafka Streams And Partitioning

2021-03-15 Thread Gareth Collins
Hi,

This may be a newbie question but is it possible to control the
partitioning of a RocksDB KeyValueStore in Kafka Streams?

For example, I perhaps only want to partition based on a prefix of a key
rather than the full key. I assume something similar must be done for the
WindowStore to partition without the window start time and sequence number
(otherwise window entries could be spread across partitions)?

Sort of like the window store, I am wanting to be able to retrieve all
values with a certain key prefix from the KeyValueStore with one read
operation.

Is this possible?

thanks in advance,
Gareth Collins


Re: Rebalancing and scaling of consumers on kubernetes, instanteous scale to x consumer replicas ==> x rebalancing?

2021-03-15 Thread Sophie Blee-Goldman
Hey Mazen,

There's not necessarily one rebalance per new consumer; in theory, if all
100 consumers are started up at the same time then there
may be just a single rebalance. It really depends on the timing -- for
example in the log snippet you provided, you can see that the
first member joined at 13:57:34 and the rebalance completed ~3 seconds
later at 13:57:37. Then the second member is seen joining
at 14:07:43, which is just over 10 minutes later. I think you need to
investigate why there's such a long delay between the first and
second consumers joining the group.


On Mon, Mar 15, 2021 at 7:58 AM Mazen Ezzeddine <
mazen.ezzedd...@etu.univ-cotedazur.fr> wrote:

> Hi all,
>
> I have a Kafka consumer pod running on Kubernetes. I executed the command
> kubectl scale consumerName --replicas=2, and as shown in the logs below,
> two separate rebalancing processes were triggered. So if the number of
> consumer replicas scaled were 100, would one hundred separate rebalances be
> triggered? Is that accurate? Am I missing something? Is there any workaround
> to trigger a single rebalance regardless of the number of replicas in the
> scale command?
>
>
> group coordinator logs
> =
>
>
> 2021-03-15 13:57:34,230 INFO [GroupCoordinator 1]: Preparing to rebalance
> group debugconsumerlag in state PreparingRebalance with old generation 0
> (__consumer_offsets-31) (reason: Adding new member
> consumer-debugconsumerlag-1-1a577d6c-7389-4217-883f-89535032ae02 with group
> instance id None) (kafka.coordinator.group.GroupCoordinator)
> [data-plane-kafka-request-handler-5]
> 2021-03-15 13:57:37,266 INFO [GroupCoordinator 1]: Stabilized group
> debugconsumerlag generation 1 (__consumer_offsets-31)
> (kafka.coordinator.group.GroupCoordinator) [executor-Rebalance]
> 2021-03-15 13:57:37,784 INFO [GroupCoordinator 1]: Assignment received
> from leader for group debugconsumerlag for generation 1
> (kafka.coordinator.group.GroupCoordinator)
> [data-plane-kafka-request-handler-3]
> 2021-03-15 14:07:43,822 INFO [GroupCoordinator 1]: Preparing to rebalance
> group debugconsumerlag in state PreparingRebalance with old generation 1
> (__consumer_offsets-31) (reason: Adding new member
> consumer-debugconsumerlag-1-e2e57bf6-6cbc-4dba-81d4-d7e58219c23f with group
> instance id None) (kafka.coordinator.group.GroupCoordinator)
> [data-plane-kafka-request-handler-1]
> 2021-03-15 14:07:46,530 INFO [GroupCoordinator 1]: Stabilized group
> debugconsumerlag generation 2 (__consumer_offsets-31)
> (kafka.coordinator.group.GroupCoordinator)
> [data-plane-kafka-request-handler-1]
> 2021-03-15 14:07:46,675 INFO [GroupCoordinator 1]: Assignment received
> from leader for group debugconsumerlag for generation 2
> (kafka.coordinator.group.GroupCoordinator)
> [data-plane-kafka-request-handler-3]
>
> Kind regards,
>


Re: [ANNOUNCE] New committer: Tom Bentley

2021-03-15 Thread Bill Bejeck
Congratulations, Tom!

-Bill

On Mon, Mar 15, 2021 at 2:08 PM Bruno Cadonna 
wrote:

> Congrats, Tom!
>
> Best,
> Bruno
>
> On 15.03.21 18:59, Mickael Maison wrote:
> > Hi all,
> >
> > The PMC for Apache Kafka has invited Tom Bentley as a committer, and
> > we are excited to announce that he accepted!
> >
> > Tom first contributed to Apache Kafka in June 2017 and has been
> > actively contributing since February 2020.
> > He has accumulated 52 commits and worked on a number of KIPs. Here are
> > some of the most significant ones:
> > KIP-183: Change PreferredReplicaLeaderElectionCommand to use
> AdminClient
> > KIP-195: AdminClient.createPartitions
> > KIP-585: Filter and Conditional SMTs
> > KIP-621: Deprecate and replace DescribeLogDirsResult.all() and
> .values()
> > KIP-707: The future of KafkaFuture (still in discussion)
> >
> > In addition, he is very active on the mailing list and has helped
> > review many KIPs.
> >
> > Congratulations Tom and thanks for all the contributions!
> >
>


Re: [ANNOUNCE] New committer: Tom Bentley

2021-03-15 Thread Bruno Cadonna

Congrats, Tom!

Best,
Bruno

On 15.03.21 18:59, Mickael Maison wrote:

Hi all,

The PMC for Apache Kafka has invited Tom Bentley as a committer, and
we are excited to announce that he accepted!

Tom first contributed to Apache Kafka in June 2017 and has been
actively contributing since February 2020.
He has accumulated 52 commits and worked on a number of KIPs. Here are
some of the most significant ones:
KIP-183: Change PreferredReplicaLeaderElectionCommand to use AdminClient
KIP-195: AdminClient.createPartitions
KIP-585: Filter and Conditional SMTs
KIP-621: Deprecate and replace DescribeLogDirsResult.all() and .values()
KIP-707: The future of KafkaFuture (still in discussion)

In addition, he is very active on the mailing list and has helped
review many KIPs.

Congratulations Tom and thanks for all the contributions!



Re: [ANNOUNCE] New committer: Tom Bentley

2021-03-15 Thread Robin Moffatt
Congratulations Tom!


-- 

Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff


On Mon, 15 Mar 2021 at 18:00, Mickael Maison  wrote:

> Hi all,
>
> The PMC for Apache Kafka has invited Tom Bentley as a committer, and
> we are excited to announce that he accepted!
>
> Tom first contributed to Apache Kafka in June 2017 and has been
> actively contributing since February 2020.
> He has accumulated 52 commits and worked on a number of KIPs. Here are
> some of the most significant ones:
>KIP-183: Change PreferredReplicaLeaderElectionCommand to use AdminClient
>KIP-195: AdminClient.createPartitions
>KIP-585: Filter and Conditional SMTs
>KIP-621: Deprecate and replace DescribeLogDirsResult.all() and .values()
>KIP-707: The future of KafkaFuture (still in discussion)
>
> In addition, he is very active on the mailing list and has helped
> review many KIPs.
>
> Congratulations Tom and thanks for all the contributions!
>


[ANNOUNCE] New committer: Tom Bentley

2021-03-15 Thread Mickael Maison
Hi all,

The PMC for Apache Kafka has invited Tom Bentley as a committer, and
we are excited to announce that he accepted!

Tom first contributed to Apache Kafka in June 2017 and has been
actively contributing since February 2020.
He has accumulated 52 commits and worked on a number of KIPs. Here are
some of the most significant ones:
   KIP-183: Change PreferredReplicaLeaderElectionCommand to use AdminClient
   KIP-195: AdminClient.createPartitions
   KIP-585: Filter and Conditional SMTs
   KIP-621: Deprecate and replace DescribeLogDirsResult.all() and .values()
   KIP-707: The future of KafkaFuture (still in discussion)

In addition, he is very active on the mailing list and has helped
review many KIPs.

Congratulations Tom and thanks for all the contributions!


Re: Error upgrading KafkaStreams

2021-03-15 Thread Murilo Tavares
Hi Bruno
Yes, cleaning up before upgrading is probably what I'm gonna do.
I was just trying to understand what's going on, as this shouldn't be
required.
Thanks for your help
Murilo


On Mon, 15 Mar 2021 at 11:16, Bruno Cadonna 
wrote:

> Hi Murilo,
>
> OK, now I see why you do not get an error in the second case in your
> small environment where you cleaned up before upgrading. You would
> restore from the earliest offset anyway and that is defined by the
> earliest offset at the broker and that always exists. Hence, no out of
> range exception is thrown.
>
> I am wondering why you get a out of range exception after upgrading
> without clean up, though.
>
> A solution would be to clean up before upgrading in your large
> environment. I do not know if this is a viable solution for you.
>
> Best,
> Bruno
>
> On 15.03.21 16:01, Murilo Tavares wrote:
> > Hi Bruno
> > We have an environment variable that, when set, will call
> > KafkaStreams.cleanup() and sleep.
> > The changelog topic is an internal KafkaStreams topic, for which I'm not
> > changing any policies.
> > It should be some default policy for a KTable in my understanding.
> > Thanks
> > Murilo
> >
> >
> >
> > On Mon, 15 Mar 2021 at 10:20, Bruno Cadonna 
> > wrote:
> >
> >> Hi Murilo,
> >>
> >> A couple of questions:
> >>
> >> 1. What do you mean exactly by clean up?
> >> 2. Do you have a cleanup policy specified on the changelog topics?
> >>
> >> Best,
> >> Bruno
> >>
> >> On 15.03.21 15:03, Murilo Tavares wrote:
> >>> Hi Bruno
> >>> No, I haven't tested resetting the application before upgrading on my
> >> large
> >>> environment. But I was able to reproduce it in my dev environment,
> which
> >> is
> >>> way smaller.
> >>> This is what I did:
> >>> - Clean up and downgrade to 2.4.
> >>> - Let it catch up;
> >>> - upgrade to 2.7; Same errors, but it caught up after a while;
> >>>
> >>> Then I tried these steps:
> >>> - Clean up and downgrade to 2.4.
> >>> - Let it catch up;
> >>> - Clean up and upgrade to 2.7. No error this time.
> >>>
> >>> Thanks
> >>> Murilo
> >>>
> >>> On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna  >
> >>> wrote:
> >>>
>  Hi Murilo,
> 
>  Did you retry to upgrade again after you reset the application? Did it
>  work?
> 
>  Best,
>  Bruno
> 
>  On 15.03.21 14:26, Murilo Tavares wrote:
> > Hi Bruno
> > Thanks for your response.
> > No, I did not reset the application prior to upgrading. That was
> simply
> > upgrading KafkaStreams from 2.4 to 2.7.
> >
> > I was able to reproduce it on a smaller environment, and it does
> indeed
> > recover.
> > In a large environment, though, it keeps like that for hours. In this
>  same
> > large environment, I had to downgrade the application, and when doing
>  that
> > I did reset the application, which just took a few minutes.
> >
> > Thanks
> > Murilo
> >
> > On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna
>  >>>
> > wrote:
> >
> >> Hi Murilo,
> >>
> >> No, you do not need any special procedure to upgrade from 2.4 to
> 2.7.
> >>
> >> What you see in the logs is not an error but a warning. It should
> not
> >> block you on startup forever. The warning says that the local states
> >> of
> >> task 7_17 are corrupted because the offset you want to fetch of the
> >> state changelog topic partition
> >> my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is
> >> larger
> >> or smaller than the offsets that exist on the brokers for that
> >> partition. If Streams runs into such an exception it will recreate
> the
> >> state from scratch which might take a while depending on the size of
> >> the
> >> state.
> >>
> >> The root cause of this warning is not clear from the information you
> >> gave. Did you maybe reset the application but not wipe out the local
> >> state stores?
> >>
> >> Best,
> >> Bruno
> >>
> >> On 12.03.21 19:11, Murilo Tavares wrote:
> >>> Hi
> >>> I have Kafka brokers running on version 2.4.1, with a KafkaStreams
> >> app
>  on
> >>> 2.4.0.
> >>> I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my
> >> instances
> >>> stuck on startup.
> >>> In my understanding, I don't need any special procedure to upgrade
>  from
> >>> KStreams 2.4.0 to 2.7.0, right?
> >>>
> >>> The following error stands out for me:
> >>>
> >>> 2021-03-12 16:23:52.005 [...] WARN
> >>>  org.apache.kafka.streams.processor.internals.StreamThread -
> >> stream-thread
> >>> [...] Detected the states of tasks
> >>>
> >> {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]}
> >> are
> >>> corrupted. Will close the task as dirty and re-create and bootstrap
>  from
> >>> scratch.
> >>> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with
> >>> changelogs
> >>>
> >> {7_17=[my-as

Re: Error upgrading KafkaStreams

2021-03-15 Thread Bruno Cadonna

Hi Murilo,

OK, now I see why you do not get an error in the second case in your 
small environment where you cleaned up before upgrading. You would 
restore from the earliest offset anyway and that is defined by the 
earliest offset at the broker and that always exists. Hence, no out of 
range exception is thrown.


I am wondering why you get an out of range exception after upgrading 
without clean up, though.


A solution would be to clean up before upgrading in your large 
environment. I do not know if this is a viable solution for you.


Best,
Bruno

On 15.03.21 16:01, Murilo Tavares wrote:

Hi Bruno
We have an environment variable that, when set, will call
KafkaStreams.cleanup() and sleep.
The changelog topic is an internal KafkaStreams topic, for which I'm not
changing any policies.
It should be some default policy for a KTable in my understanding.
Thanks
Murilo



On Mon, 15 Mar 2021 at 10:20, Bruno Cadonna 
wrote:


Hi Murilo,

A couple of questions:

1. What do you mean exactly by clean up?
2. Do you have a cleanup policy specified on the changelog topics?

Best,
Bruno

On 15.03.21 15:03, Murilo Tavares wrote:

Hi Bruno
No, I haven't tested resetting the application before upgrading on my

large

environment. But I was able to reproduce it in my dev environment, which

is

way smaller.
This is what I did:
- Clean up and downgrade to 2.4.
- Let it catch up;
- upgrade to 2.7; Same errors, but it caught up after a while;

Then I tried these steps:
- Clean up and downgrade to 2.4.
- Let it catch up;
- Clean up and upgrade to 2.7. No error this time.

Thanks
Murilo

On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna 
wrote:


Hi Murilo,

Did you retry to upgrade again after you reset the application? Did it
work?

Best,
Bruno

On 15.03.21 14:26, Murilo Tavares wrote:

Hi Bruno
Thanks for your response.
No, I did not reset the application prior to upgrading. That was simply
upgrading KafkaStreams from 2.4 to 2.7.

I was able to reproduce it on a smaller environment, and it does indeed
recover.
In a large environment, though, it keeps like that for hours. In this

same

large environment, I had to downgrade the application, and when doing

that

I did reset the application, which just took a few minutes.

Thanks
Murilo

On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna 


wrote:


Hi Murilo,

No, you do not need any special procedure to upgrade from 2.4 to 2.7.

What you see in the logs is not an error but a warning. It should not
block you on startup forever. The warning says that the local states

of

task 7_17 are corrupted because the offset you want to fetch of the
state changelog topic partition
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is

larger

or smaller than the offsets that exist on the brokers for that
partition. If Streams runs into such an exception it will recreate the
state from scratch which might take a while depending on the size of

the

state.

The root cause of this warning is not clear from the information you
gave. Did you maybe reset the application but not wipe out the local
state stores?

Best,
Bruno

On 12.03.21 19:11, Murilo Tavares wrote:

Hi
I have Kafka brokers running on version 2.4.1, with a KafkaStreams

app

on

2.4.0.
I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my

instances

stuck on startup.
In my understanding, I don't need any special procedure to upgrade

from

KStreams 2.4.0 to 2.7.0, right?

The following error stands out for me:

2021-03-12 16:23:52.005 [...] WARN
 org.apache.kafka.streams.processor.internals.StreamThread -

stream-thread

[...] Detected the states of tasks


{7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]}

are

corrupted. Will close the task as dirty and re-create and bootstrap

from

scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with
changelogs


{7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]}

are

corrupted and hence needs to be re-initialized
at






org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)

~[app.jar:?]
at






org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744)

~[app.jar:?]
at






org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625)

~[app.jar:?]
at






org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)

[app.jar:?]
at






org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)

[app.jar:?]
Caused by:

org.apache.kafka.clients.consumer.OffsetOutOfRangeException:

Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack:
euw1-az1)], epoch=27}} is out of range for partition
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17
at






org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366)

recover messages marked for deletion

2021-03-15 Thread Jokin Cuadrado
Hi, this is a newbie error, but I can't find how to fix it.

I updated the retention.ms topic config with a value shorter than what I
wanted, and more messages than I intended were marked for deletion.

The files are still in the topic directory, but I can't find how I can tell
Kafka to move the "first valid offset" backwards and recover those
messages.

Any suggestions?

Thanks in advance, Regards
Jokin
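
For reference, a per-topic retention.ms override like the one described above is typically applied with the Java Admin client (or the kafka-configs tool). A minimal sketch only; the broker address, topic name and retention value are placeholders:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetTopicRetention {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");   // placeholder topic
            AlterConfigOp setRetention = new AlterConfigOp(
                new ConfigEntry("retention.ms", "604800000"),                // e.g. 7 days
                AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(
                Map.of(topic, Collections.singletonList(setRetention))).all().get();
        }
    }
}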


Re: Redis as state store

2021-03-15 Thread Bruno Cadonna

Hi Alex,

I guess wiping out the state directory is easier code-wise, faster, 
and/or at the time of development the developers did not design for 
remote state stores. But I do actually not know the exact reason.


Off the top of my head, I do not know how to solve this for remote state 
stores. Using the uncaught exception handler is not good, because a 
computing node could fail without giving the JVM the opportunity to 
throw an exception.


In your tests, try to increase the commit interval to a high value and 
see if you get inconsistencies. You should get an inconsistency if the 
state store maintains counts for keys and after the last commit before 
the failure, the Streams app puts an event with a new key K with value 1 
into the state store. After failover, Streams would put the same event 
with key K again into the state store. If the state store deleted all of 
its data, Streams would put again value 1, but if the state store did 
not delete all data, Streams would put value 2 which is wrong because it 
would count the same event twice.


Best,
Bruno
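
A minimal sketch of the configuration for the test suggested above: exactly-once processing plus a deliberately large commit interval, so a forced crash is likely to land between commits and leave the last writes to a remote store without a matching committed offset. The application id, bootstrap servers and interval value are placeholders:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class EosConsistencyTestConfig {

    static Properties props() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-consistency-test");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        // Large commit interval so a forced crash is unlikely to coincide with a commit.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300_000L);
        return props;
    }
}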


On 15.03.21 15:20, Alex Craig wrote:

Bruno,
Thanks for the info!  That makes sense.  Of course now I have more
questions.  :)  Do you know why this is being done outside of the state
store API?  I assume there are reasons why a "deleteAll()" type of function
wouldn't work, thereby allowing a state store to purge itself?  And maybe
more importantly, is there a way to achieve a similar behavior with a 3rd
party store?  I'm not sure if hooking into the uncaught exception handler
might be a good way to purge/drop a state store in the event of a fatal
error?  I did set up a MongoDB state store recently as part of a POC and was
testing it with EOS enabled.  (forcing crashes to occur and checking that
the result of my aggregation was still accurate)  I was unable to cause
inconsistent data in the mongo store (which is good!), though of course I
may just have been getting lucky.  Thanks again for your help,

Alex

On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole  wrote:


Bruno,

I tried to explain this in 'kafka user's language' through the above mentioned
scenario, hope I put it properly :-) and correct me if I am wrong

On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole 
wrote:


This is what I understand could be the issue with external state store:

kafka stream application consumes source topic, does processing, stores
state to kafka state store (this is backed by topic) and before producing
event on destination topic, the application fails with some issue. In
this case, the transaction has failed, so kafka guarantees either all or
none, means offset written to source topic, state written to state store
topic, output produced on destination topic... all of these happen or

none

of these and in this failure scenario it is none of these.

Assume you have a redis state store, and you updated the state into redis
and the stream application failed. Now, you have the source topic and destination
topic consistent, i.e. the offset is not committed to the source topic and output
not produced on the destination topic, but your redis state store is
inconsistent with that since it is an external state store and kafka can't
guarantee rollback of the state written there

On Mon, Mar 15, 2021 at 6:30 PM Alex Craig 

wrote:



" Another issue with 3rd party state stores could be violation of
exactly-once guarantee provided by kafka streams in the event of a

failure

of streams application instance"

I've heard this before but would love to know more about how a custom
state
store would be at any greater risk than RocksDB as far as exactly-once
guarantees are concerned.  They all implement the same interface, so as
long as you're correctly implementing get(), put(), delete(), flush(),
etc,
you should be fine right?  In other words, I don't think there is any
special "exactly once magic" that is baked into the RocksDB store

code.  I

could be wrong though so I'd love to hear people's thoughts, thanks,

Alex C

On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan 
wrote:


Thanks for the responses. In the worst case, I might have to keep both
rocksdb for local store and keep an external store like Redis.

-mohan


On 3/13/21, 8:53 PM, "Pushkar Deole"  wrote:

 Another issue with 3rd party state stores could be violation of
 exactly-once guarantee provided by kafka streams in the event of a
failure
 of streams application instance.
 Kafka provides exactly once guarantee for consumer -> process ->
produce
 through transactions and with the use of state store like redis,

this

 guarantee is weaker

 On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang 


wrote:

 > Hello Mohan,
 >
 > I think what you had in mind works with Redis, since it is a

remote

state
 > store engine, it does not have the co-partitioning requirements

as

local
 > state stores.
 >
 > One thing you'd need to tune KS though is that with remote

stores,

the
 > processing latency may 

Re: Error upgrading KafkaStreams

2021-03-15 Thread Murilo Tavares
Hi Bruno
We have an environment variable that, when set, will call
KafkaStreams.cleanup() and sleep.
The changelog topic is an internal KafkaStreams topic, for which I'm not
changing any policies.
It should be some default policy for a KTable in my understanding.
Thanks
Murilo
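
A minimal sketch of the clean-up hook described above, assuming a hypothetical CLEANUP_ON_START environment variable and placeholder configs. The Streams method involved is KafkaStreams#cleanUp(), which may only be called while the client is not running (before start() or after close()):

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class CleanupOnStart {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");              // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder

        KafkaStreams streams = new KafkaStreams(buildTopology(), props);

        if (System.getenv("CLEANUP_ON_START") != null) {
            // Wipes this instance's local state directory, so state is later
            // restored from the changelog topics; valid only while not running.
            streams.cleanUp();
            Thread.sleep(Long.MAX_VALUE);   // "clean up and sleep"
        } else {
            streams.start();
        }
    }

    static Topology buildTopology() {
        return new Topology();   // real topology omitted; not relevant to the hook
    }
}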



On Mon, 15 Mar 2021 at 10:20, Bruno Cadonna 
wrote:

> Hi Murilo,
>
> A couple of questions:
>
> 1. What do you mean exactly by clean up?
> 2. Do you have a cleanup policy specified on the changelog topics?
>
> Best,
> Bruno
>
> On 15.03.21 15:03, Murilo Tavares wrote:
> > Hi Bruno
> > No, I haven't tested resetting the application before upgrading on my
> large
> > environment. But I was able to reproduce it in my dev environment, which
> is
> > way smaller.
> > This is what I did:
> > - Clean up and downgrade to 2.4.
> > - Let it catch up;
> > - upgrade to 2.7; Same errors, but it caught up after a while;
> >
> > Then I tried these steps:
> > - Clean up and downgrade to 2.4.
> > - Let it catch up;
> > - Clean up and upgrade to 2.7. No error this time.
> >
> > Thanks
> > Murilo
> >
> > On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna 
> > wrote:
> >
> >> Hi Murilo,
> >>
> >> Did you retry to upgrade again after you reset the application? Did it
> >> work?
> >>
> >> Best,
> >> Bruno
> >>
> >> On 15.03.21 14:26, Murilo Tavares wrote:
> >>> Hi Bruno
> >>> Thanks for your response.
> >>> No, I did not reset the application prior to upgrading. That was simply
> >>> upgrading KafkaStreams from 2.4 to 2.7.
> >>>
> >>> I was able to reproduce it on a smaller environment, and it does indeed
> >>> recover.
> >>> In a large environment, though, it keeps like that for hours. In this
> >> same
> >>> large environment, I had to downgrade the application, and when doing
> >> that
> >>> I did reset the application, which just took a few minutes.
> >>>
> >>> Thanks
> >>> Murilo
> >>>
> >>> On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna  >
> >>> wrote:
> >>>
>  Hi Murilo,
> 
>  No, you do not need any special procedure to upgrade from 2.4 to 2.7.
> 
>  What you see in the logs is not an error but a warning. It should not
>  block you on startup forever. The warning says that the local states
> of
>  task 7_17 are corrupted because the offset you want to fetch of the
>  state changelog topic partition
>  my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is
> larger
>  or smaller than the offsets that exist on the brokers for that
>  partition. If Streams runs into such an exception it will recreate the
>  state from scratch which might take a while depending on the size of
> the
>  state.
> 
>  The root cause of this warning is not clear from the information you
>  gave. Did you maybe reset the application but not wipe out the local
>  state stores?
> 
>  Best,
>  Bruno
> 
>  On 12.03.21 19:11, Murilo Tavares wrote:
> > Hi
> > I have Kafka brokers running on version 2.4.1, with a KafkaStreams
> app
> >> on
> > 2.4.0.
> > I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my
> instances
> > stuck on startup.
> > In my understanding, I don't need any special procedure to upgrade
> >> from
> > KStreams 2.4.0 to 2.7.0, right?
> >
> > The following error stands out for me:
> >
> > 2021-03-12 16:23:52.005 [...] WARN
> > org.apache.kafka.streams.processor.internals.StreamThread -
>  stream-thread
> > [...] Detected the states of tasks
> >
> {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]}
>  are
> > corrupted. Will close the task as dirty and re-create and bootstrap
> >> from
> > scratch.
> > org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with
> > changelogs
> >
> {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]}
>  are
> > corrupted and hence needs to be re-initialized
> > at
> >
> 
> >>
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
> > ~[app.jar:?]
> > at
> >
> 
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744)
> > ~[app.jar:?]
> > at
> >
> 
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625)
> > ~[app.jar:?]
> > at
> >
> 
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
> > [app.jar:?]
> > at
> >
> 
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
> > [app.jar:?]
> > Caused by:
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> > Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty,
> > currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack:
> > euw1-az1)], epoch=27}} is out of range for partition
> > my-assem

Rebalancing and scaling of consumers on Kubernetes, instantaneous scale to x consumer replicas ==> x rebalancing?

2021-03-15 Thread Mazen Ezzeddine
Hi all,

I have a Kafka consumer pod running on Kubernetes. I executed the command 
kubectl scale consumerName --replicas=2, and as shown in the logs below two 
separate rebalancing processes were triggered, so if the number of consumer 
replicas is scaled to 100, one hundred separate rebalances are going to be 
triggered. Is that accurate? Am I missing something? Is there any workaround to trigger 
a single rebalance regardless of the number of replicas in the scale command?


group coordinator logs
=


2021-03-15 13:57:34,230 INFO [GroupCoordinator 1]: Preparing to rebalance group 
debugconsumerlag in state PreparingRebalance with old generation 0 
(__consumer_offsets-31) (reason: Adding new member 
consumer-debugconsumerlag-1-1a577d6c-7389-4217-883f-89535032ae02 with group 
instance id None) (kafka.coordinator.group.GroupCoordinator) 
[data-plane-kafka-request-handler-5]
2021-03-15 13:57:37,266 INFO [GroupCoordinator 1]: Stabilized group 
debugconsumerlag generation 1 (__consumer_offsets-31) 
(kafka.coordinator.group.GroupCoordinator) [executor-Rebalance]
2021-03-15 13:57:37,784 INFO [GroupCoordinator 1]: Assignment received from 
leader for group debugconsumerlag for generation 1 
(kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-3]
2021-03-15 14:07:43,822 INFO [GroupCoordinator 1]: Preparing to rebalance group 
debugconsumerlag in state PreparingRebalance with old generation 1 
(__consumer_offsets-31) (reason: Adding new member 
consumer-debugconsumerlag-1-e2e57bf6-6cbc-4dba-81d4-d7e58219c23f with group 
instance id None) (kafka.coordinator.group.GroupCoordinator) 
[data-plane-kafka-request-handler-1]
2021-03-15 14:07:46,530 INFO [GroupCoordinator 1]: Stabilized group 
debugconsumerlag generation 2 (__consumer_offsets-31) 
(kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-1]
2021-03-15 14:07:46,675 INFO [GroupCoordinator 1]: Assignment received from 
leader for group debugconsumerlag for generation 2 
(kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-3]

Kind regards,


Re: Error upgrading KafkaStreams

2021-03-15 Thread Bruno Cadonna

Hi Murilo,

A couple of questions:

1. What do you mean exactly by clean up?
2. Do you have a cleanup policy specified on the changelog topics?

Best,
Bruno

On 15.03.21 15:03, Murilo Tavares wrote:

Hi Bruno
No, I haven't tested resetting the application before upgrading on my large
environment. But I was able to reproduce it in my dev environment, which is
way smaller.
This is what I did:
- Clean up and downgrade to 2.4.
- Let it catch up;
- upgrade to 2.7; Same errors, but it caught up after a while;

Then I tried these steps:
- Clean up and downgrade to 2.4.
- Let it catch up;
- Clean up and upgrade to 2.7. No error this time.

Thanks
Murilo

On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna 
wrote:


Hi Murilo,

Did you retry to upgrade again after you reset the application? Did it
work?

Best,
Bruno

On 15.03.21 14:26, Murilo Tavares wrote:

Hi Bruno
Thanks for your response.
No, I did not reset the application prior to upgrading. That was simply
upgrading KafkaStreams from 2.4 to 2.7.

I was able to reproduce it on a smaller environment, and it does indeed
recover.
In a large environment, though, it keeps like that for hours. In this

same

large environment, I had to downgrade the application, and when doing

that

I did reset the application, which just took a few minutes.

Thanks
Murilo

On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna 
wrote:


Hi Murilo,

No, you do not need any special procedure to upgrade from 2.4 to 2.7.

What you see in the logs is not an error but a warning. It should not
block you on startup forever. The warning says that the local states of
task 7_17 are corrupted because the offset you want to fetch of the
state changelog topic partition
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is larger
or smaller than the offsets that exist on the brokers for that
partition. If Streams runs into such an exception it will recreate the
state from scratch which might take a while depending on the size of the
state.

The root cause of this warning is not clear from the information you
gave. Did you maybe reset the application but not wipe out the local
state stores?

Best,
Bruno

On 12.03.21 19:11, Murilo Tavares wrote:

Hi
I have Kafka brokers running on version 2.4.1, with a KafkaStreams app

on

2.4.0.
I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my instances
stuck on startup.
In my understanding, I don't need any special procedure to upgrade

from

KStreams 2.4.0 to 2.7.0, right?

The following error stands out for me:

2021-03-12 16:23:52.005 [...] WARN
org.apache.kafka.streams.processor.internals.StreamThread -

stream-thread

[...] Detected the states of tasks
{7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]}

are

corrupted. Will close the task as dirty and re-create and bootstrap

from

scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with
changelogs
{7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]}

are

corrupted and hence needs to be re-initialized
at




org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)

~[app.jar:?]
at




org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744)

~[app.jar:?]
at




org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625)

~[app.jar:?]
at




org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)

[app.jar:?]
at




org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)

[app.jar:?]
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack:
euw1-az1)], epoch=27}} is out of range for partition
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17
at




org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366)

~[app.jar:?]
at




org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318)

~[app.jar:?]
at




org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614)

~[app.jar:?]
at




org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272)

~[app.jar:?]
at




org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233)

~[app.jar:?]
at




org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)

~[app.jar:?]
at




org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)

~[app.jar:?]
... 4 more

Any suggestions on how to upgrade?
Thanks
Murilo











Re: Redis as state store

2021-03-15 Thread Alex Craig
Bruno,
Thanks for the info!  That makes sense.  Of course now I have more
questions.  :)  Do you know why this is being done outside of the state
store API?  I assume there are reasons why a "deleteAll()" type of function
wouldn't work, thereby allowing a state store to purge itself?  And maybe
more importantly, is there a way to achieve a similar behavior with a 3rd
party store?  I'm not sure if hooking into the uncaught exception handler
might be a good way to purge/drop a state store in the event of a fatal
error?  I did set up a MongoDB state store recently as part of a POC and was
testing it with EOS enabled.  (forcing crashes to occur and checking that
the result of my aggregation was still accurate)  I was unable to cause
inconsistent data in the mongo store (which is good!), though of course I
may just have been getting lucky.  Thanks again for your help,

Alex

On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole  wrote:

> Bruno,
>
> I tried to explain this in 'kafka user's language' through the above mentioned
> scenario, hope I put it properly :-) and correct me if I am wrong
>
> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole 
> wrote:
>
> > This is what I understand could be the issue with external state store:
> >
> > kafka stream application consumes source topic, does processing, stores
> > state to kafka state store (this is backed by topic) and before producing
> > event on destination topic, the application fails with some issue. In
> > this case, the transaction has failed, so kafka guarantees either all or
> > none, means offset written to source topic, state written to state store
> > topic, output produced on destination topic... all of these happen or
> none
> > of these and in this failure scenario it is none of these.
> >
> > Assume you have a redis state store, and you updated the state into redis
> > and the stream application failed. Now, you have the source topic and destination
> > topic consistent, i.e. the offset is not committed to the source topic and output
> > not produced on the destination topic, but your redis state store is
> > inconsistent with that since it is an external state store and kafka can't
> > guarantee rollback of the state written there
> >
> > On Mon, Mar 15, 2021 at 6:30 PM Alex Craig 
> wrote:
> >
> >> " Another issue with 3rd party state stores could be violation of
> >> exactly-once guarantee provided by kafka streams in the event of a
> failure
> >> of streams application instance"
> >>
> >> I've heard this before but would love to know more about how a custom
> >> state
> >> store would be at any greater risk than RocksDB as far as exactly-once
> >> guarantees are concerned.  They all implement the same interface, so as
> >> long as you're correctly implementing get(), put(), delete(), flush(),
> >> etc,
> >> you should be fine right?  In other words, I don't think there is any
> >> special "exactly once magic" that is baked into the RocksDB store
> code.  I
> >> could be wrong though so I'd love to hear people's thoughts, thanks,
> >>
> >> Alex C
> >>
> >> On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan 
> >> wrote:
> >>
> >> > Thanks for the responses. In the worst case, I might have to keep both
> >> > rocksdb for local store and keep an external store like Redis.
> >> >
> >> > -mohan
> >> >
> >> >
> >> > On 3/13/21, 8:53 PM, "Pushkar Deole"  wrote:
> >> >
> >> > Another issue with 3rd party state stores could be violation of
> >> > exactly-once guarantee provided by kafka streams in the event of a
> >> > failure
> >> > of streams application instance.
> >> > Kafka provides exactly once guarantee for consumer -> process ->
> >> > produce
> >> > through transactions and with the use of state store like redis,
> >> this
> >> > guarantee is weaker
> >> >
> >> > On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang  >
> >> > wrote:
> >> >
> >> > > Hello Mohan,
> >> > >
> >> > > I think what you had in mind works with Redis, since it is a
> >> remote
> >> > state
> >> > > store engine, it does not have the co-partitioning requirements
> as
> >> > local
> >> > > state stores.
> >> > >
> >> > > One thing you'd need to tune KS though is that with remote
> stores,
> >> > the
> >> > > processing latency may be larger, and since Kafka Streams
> process
> >> all
> >> > > records of a single partition in order, synchronously, you may
> >> need
> >> > to tune
> >> > > the poll interval configs etc to make sure KS would stay in the
> >> > consumer
> >> > > group and not trigger unnecessary rebalances.
> >> > >
> >> > > Guozhang
> >> > >
> >> > > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan <
> >> > mpart...@hpe.com>
> >> > > wrote:
> >> > >
> >> > > > Hi,
> >> > > >
> >> > > > I have a use case where messages come in with some key gets
> >> > assigned some
> >> > > > partition and the state gets created. Later, key changes (but
> >> still
> >> > > > contains the old key in the mess

Re: Error upgrading KafkaStreams

2021-03-15 Thread Murilo Tavares
Hi Bruno
No, I haven't tested resetting the application before upgrading on my large
environment. But I was able to reproduce it in my dev environment, which is
way smaller.
This is what I did:
- Clean up and downgrade to 2.4.
- Let it catch up;
- upgrade to 2.7; Same errors, but it caught up after a while;

Then I tried these steps:
- Clean up and downgrade to 2.4.
- Let it catch up;
- Clean up and upgrade to 2.7. No error this time.

Thanks
Murilo

On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna 
wrote:

> Hi Murilo,
>
> Did you retry to upgrade again after you reset the application? Did it
> work?
>
> Best,
> Bruno
>
> On 15.03.21 14:26, Murilo Tavares wrote:
> > Hi Bruno
> > Thanks for your response.
> > No, I did not reset the application prior to upgrading. That was simply
> > upgrading KafkaStreams from 2.4 to 2.7.
> >
> > I was able to reproduce it on a smaller environment, and it does indeed
> > recover.
> > In a large environment, though, it keeps like that for hours. In this
> same
> > large environment, I had to downgrade the application, and when doing
> that
> > I did reset the application, which just took a few minutes.
> >
> > Thanks
> > Murilo
> >
> > On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna 
> > wrote:
> >
> >> Hi Murilo,
> >>
> >> No, you do not need any special procedure to upgrade from 2.4 to 2.7.
> >>
> >> What you see in the logs is not an error but a warning. It should not
> >> block you on startup forever. The warning says that the local states of
> >> task 7_17 are corrupted because the offset you want to fetch of the
> >> state changelog topic partition
> >> my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is larger
> >> or smaller than the offsets that exist on the brokers for that
> >> partition. If Streams runs into such an exception it will recreate the
> >> state from scratch which might take a while depending on the size of the
> >> state.
> >>
> >> The root cause of this warning is not clear from the information you
> >> gave. Did you maybe reset the application but not wipe out the local
> >> state stores?
> >>
> >> Best,
> >> Bruno
> >>
> >> On 12.03.21 19:11, Murilo Tavares wrote:
> >>> Hi
> >>> I have Kafka brokers running on version 2.4.1, with a KafkaStreams app
> on
> >>> 2.4.0.
> >>> I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my instances
> >>> stuck on startup.
> >>> In my understanding, I don't need any special procedure to upgrade
> from
> >>> KStreams 2.4.0 to 2.7.0, right?
> >>>
> >>> The following error stands out for me:
> >>>
> >>> 2021-03-12 16:23:52.005 [...] WARN
> >>>org.apache.kafka.streams.processor.internals.StreamThread -
> >> stream-thread
> >>> [...] Detected the states of tasks
> >>> {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]}
> >> are
> >>> corrupted. Will close the task as dirty and re-create and bootstrap
> from
> >>> scratch.
> >>> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with
> >>> changelogs
> >>> {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]}
> >> are
> >>> corrupted and hence needs to be re-initialized
> >>> at
> >>>
> >>
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
> >>> ~[app.jar:?]
> >>> at
> >>>
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744)
> >>> ~[app.jar:?]
> >>> at
> >>>
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625)
> >>> ~[app.jar:?]
> >>> at
> >>>
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
> >>> [app.jar:?]
> >>> at
> >>>
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
> >>> [app.jar:?]
> >>> Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> >>> Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty,
> >>> currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack:
> >>> euw1-az1)], epoch=27}} is out of range for partition
> >>> my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17
> >>> at
> >>>
> >>
> org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366)
> >>> ~[app.jar:?]
> >>> at
> >>>
> >>
> org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318)
> >>> ~[app.jar:?]
> >>> at
> >>>
> >>
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614)
> >>> ~[app.jar:?]
> >>> at
> >>>
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272)
> >>> ~[app.jar:?]
> >>> at
> >>>
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233)
> >>> ~[app.jar:?]
> >>> at
> >>>
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
> >>> ~[app.jar:?]
> >>> at
> >>>
> >>
> org.apache.kafka.streams.processor.internals.StoreC

Re: Redis as state store

2021-03-15 Thread Pushkar Deole
Bruno,

I tried to explain this in 'kafka user's language' through the above mentioned
scenario, hope I put it properly :-) and correct me if I am wrong

On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole  wrote:

> This is what I understand could be the issue with external state store:
>
> kafka stream application consumes source topic, does processing, stores
> state to kafka state store (this is backed by topic) and before producing
> event on destination topic, the application fails with some issue. In
> this case, the transaction has failed, so kafka guarantees either all or
> none, means offset written to source topic, state written to state store
> topic, output produced on destination topic... all of these happen or none
> of these and in this failure scenario it is none of these.
>
> Assume you have a redis state store, and you updated the state into redis
> and the stream application failed. Now, you have the source topic and destination
> topic consistent, i.e. the offset is not committed to the source topic and output
> not produced on the destination topic, but your redis state store is
> inconsistent with that since it is an external state store and kafka can't
> guarantee rollback of the state written there
>
> On Mon, Mar 15, 2021 at 6:30 PM Alex Craig  wrote:
>
>> " Another issue with 3rd party state stores could be violation of
>> exactly-once guarantee provided by kafka streams in the event of a failure
>> of streams application instance"
>>
>> I've heard this before but would love to know more about how a custom
>> state
>> store would be at any greater risk than RocksDB as far as exactly-once
>> guarantees are concerned.  They all implement the same interface, so as
>> long as you're correctly implementing get(), put(), delete(), flush(),
>> etc,
>> you should be fine right?  In other words, I don't think there is any
>> special "exactly once magic" that is baked into the RocksDB store code.  I
>> could be wrong though so I'd love to hear people's thoughts, thanks,
>>
>> Alex C
>>
>> On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan 
>> wrote:
>>
>> > Thanks for the responses. In the worst case, I might have to keep both
>> > rocksdb for local store and keep an external store like Redis.
>> >
>> > -mohan
>> >
>> >
>> > On 3/13/21, 8:53 PM, "Pushkar Deole"  wrote:
>> >
>> > Another issue with 3rd party state stores could be violation of
>> > exactly-once guarantee provided by kafka streams in the event of a
>> > failure
>> > of streams application instance.
>> > Kafka provides exactly once guarantee for consumer -> process ->
>> > produce
>> > through transactions and with the use of state store like redis,
>> this
>> > guarantee is weaker
>> >
>> > On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang 
>> > wrote:
>> >
>> > > Hello Mohan,
>> > >
>> > > I think what you had in mind works with Redis, since it is a
>> remote
>> > state
>> > > store engine, it does not have the co-partitioning requirements as
>> > local
>> > > state stores.
>> > >
>> > > One thing you'd need to tune KS though is that with remote stores,
>> > the
>> > > processing latency may be larger, and since Kafka Streams process
>> all
>> > > records of a single partition in order, synchronously, you may
>> need
>> > to tune
>> > > the poll interval configs etc to make sure KS would stay in the
>> > consumer
>> > > group and not trigger unnecessary rebalances.
>> > >
>> > > Guozhang
>> > >
>> > > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan <
>> > mpart...@hpe.com>
>> > > wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > I have a use case where messages come in with some key gets
>> > assigned some
>> > > > partition and the state gets created. Later, key changes (but
>> still
>> > > > contains the old key in the message) and gets sent to a
>> different
>> > > > partition. I want to be able to grab the old state using the old
>> > key
>> > > before
>> > > > creating the new state on this instance. Redis as a  state store
>> > makes it
>> > > > easy to implement this where I can simply do a lookup before
>> > creating the
>> > > > state. I see an implementation here :
>> > > >
>> > >
>> >
>> https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
>> > > >
>> > > > Has anyone tried this ? Any caveats.
>> > > >
>> > > > Thanks
>> > > > Mohan
>> > > >
>> > > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>> >
>> >
>>
>


Re: Redis as state store

2021-03-15 Thread Pushkar Deole
This is what I understand could be the issue with external state store:

kafka stream application consumes source topic, does processing, stores
state to kafka state store (this is backed by topic) and before producing
event on destination topic, the application fails with some issue. In
this case, the transaction has failed, so kafka guarantees either all or
none, means offset written to source topic, state written to state store
topic, output produced on destination topic... all of these happen or none
of these and in this failure scenario it is none of these.

Assume you have a redis state store, and you updated the state into redis and
the stream application failed. Now, you have the source topic and destination topic
consistent, i.e. the offset is not committed to the source topic and output not
produced on the destination topic, but your redis state store is inconsistent
with that since it is an external state store and kafka can't guarantee
rollback of the state written there

On Mon, Mar 15, 2021 at 6:30 PM Alex Craig  wrote:

> " Another issue with 3rd party state stores could be violation of
> exactly-once guarantee provided by kafka streams in the event of a failure
> of streams application instance"
>
> I've heard this before but would love to know more about how a custom state
> store would be at any greater risk than RocksDB as far as exactly-once
> guarantees are concerned.  They all implement the same interface, so as
> long as you're correctly implementing get(), put(), delete(), flush(), etc,
> you should be fine right?  In other words, I don't think there is any
> special "exactly once magic" that is baked into the RocksDB store code.  I
> could be wrong though so I'd love to hear people's thoughts, thanks,
>
> Alex C
>
> On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan 
> wrote:
>
> > Thanks for the responses. In the worst case, I might have to keep both
> > rocksdb for local store and keep an external store like Redis.
> >
> > -mohan
> >
> >
> > On 3/13/21, 8:53 PM, "Pushkar Deole"  wrote:
> >
> > Another issue with 3rd party state stores could be violation of
> > exactly-once guarantee provided by kafka streams in the event of a
> > failure
> > of streams application instance.
> > Kafka provides exactly once guarantee for consumer -> process ->
> > produce
> > through transactions and with the use of state store like redis, this
> > guarantee is weaker
> >
> > On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang 
> > wrote:
> >
> > > Hello Mohan,
> > >
> > > I think what you had in mind works with Redis, since it is a remote
> > state
> > > store engine, it does not have the co-partitioning requirements as
> > local
> > > state stores.
> > >
> > > One thing you'd need to tune KS though is that with remote stores,
> > the
> > > processing latency may be larger, and since Kafka Streams process
> all
> > > records of a single partition in order, synchronously, you may need
> > to tune
> > > the poll interval configs etc to make sure KS would stay in the
> > consumer
> > > group and not trigger unnecessary rebalances.
> > >
> > > Guozhang
> > >
> > > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan <
> > mpart...@hpe.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I have a use case where messages come in with some key gets
> > assigned some
> > > > partition and the state gets created. Later, key changes (but
> still
> > > > contains the old key in the message) and gets sent to a different
> > > > partition. I want to be able to grab the old state using the old
> > key
> > > before
> > > > creating the new state on this instance. Redis as a  state store
> > makes it
> > > > easy to implement this where I can simply do a lookup before
> > creating the
> > > > state. I see an implementation here :
> > > >
> > >
> >
> https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
> > > >
> > > > Has anyone tried this ? Any caveats.
> > > >
> > > > Thanks
> > > > Mohan
> > > >
> > > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> >
>


Re: Error upgrading KafkaStreams

2021-03-15 Thread Bruno Cadonna

Hi Murilo,

Did you retry to upgrade again after you reset the application? Did it work?

Best,
Bruno

On 15.03.21 14:26, Murilo Tavares wrote:

Hi Bruno
Thanks for your response.
No, I did not reset the application prior to upgrading. That was simply
upgrading KafkaStreams from 2.4 to 2.7.

I was able to reproduce it on a smaller environment, and it does indeed
recover.
In a large environment, though, it keeps like that for hours. In this same
large environment, I had to downgrade the application, and when doing that
I did reset the application, which just took a few minutes.

Thanks
Murilo

On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna 
wrote:


Hi Murilo,

No, you do not need any special procedure to upgrade from 2.4 to 2.7.

What you see in the logs is not an error but a warning. It should not
block you on startup forever. The warning says that the local states of
task 7_17 are corrupted because the offset you want to fetch of the
state changelog topic partition
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is larger
or smaller than the offsets that exist on the brokers for that
partition. If Streams runs into such an exception it will recreate the
state from scratch which might take a while depending on the size of the
state.

The root cause of this warning is not clear from the information you
gave. Did you maybe reset the application but not wipe out the local
state stores?

Best,
Bruno

On 12.03.21 19:11, Murilo Tavares wrote:

Hi
I have Kafka brokers running on version 2.4.1, with a KafkaStreams app on
2.4.0.
I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my instances
stuck on startup.
In my understanding, I don't need any special procedure to upgrade from
KStreams 2.4.0 to 2.7.0, right?

The following error stands out for me:

2021-03-12 16:23:52.005 [...] WARN
   org.apache.kafka.streams.processor.internals.StreamThread -

stream-thread

[...] Detected the states of tasks
{7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]}

are

corrupted. Will close the task as dirty and re-create and bootstrap from
scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with
changelogs
{7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]}

are

corrupted and hence needs to be re-initialized
at


org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)

~[app.jar:?]
at


org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744)

~[app.jar:?]
at


org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625)

~[app.jar:?]
at


org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)

[app.jar:?]
at


org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)

[app.jar:?]
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack:
euw1-az1)], epoch=27}} is out of range for partition
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17
at


org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366)

~[app.jar:?]
at


org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318)

~[app.jar:?]
at


org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614)

~[app.jar:?]
at


org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272)

~[app.jar:?]
at


org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233)

~[app.jar:?]
at


org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)

~[app.jar:?]
at


org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)

~[app.jar:?]
... 4 more

Any suggestions on how to upgrade?
Thanks
Murilo







Re: Redis as state store

2021-03-15 Thread Bruno Cadonna

Hi Alex,

You are right! There is no "exactly once magic" baked into the RocksDB 
store code. The point is local vs remote. When a Kafka Streams client 
closes dirty under EOS, the state (i.e., the content of the state store) 
needs to be wiped out and to be re-created from scratch from the 
changelog topic on the brokers. To wipe out the state the state 
directory is deleted.


For a remote state store, the wiping out of the state directory would 
not delete the contents of the remote state store before Kafka Streams 
re-creates the content from scratch.


Wiping out the state directory, happens outside of the API implemented 
by a state store.


Best,
Bruno

On 15.03.21 13:59, Alex Craig wrote:

" Another issue with 3rd party state stores could be violation of
exactly-once guarantee provided by kafka streams in the event of a failure
of streams application instance"

I've heard this before but would love to know more about how a custom state
store would be at any greater risk than RocksDB as far as exactly-once
guarantees are concerned.  They all implement the same interface, so as
long as you're correctly implementing get(), put(), delete(), flush(), etc,
you should be fine right?  In other words, I don't think there is any
special "exactly once magic" that is baked into the RocksDB store code.  I
could be wrong though so I'd love to hear people's thoughts, thanks,

Alex C

On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan 
wrote:


Thanks for the responses. In the worst case, I might have to keep both
rocksdb for local store and keep an external store like Redis.

-mohan


On 3/13/21, 8:53 PM, "Pushkar Deole"  wrote:

 Another issue with 3rd party state stores could be violation of
 exactly-once guarantee provided by kafka streams in the event of a
failure
 of streams application instance.
 Kafka provides exactly once guarantee for consumer -> process ->
produce
 through transactions and with the use of state store like redis, this
 guarantee is weaker

 On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang 
wrote:

 > Hello Mohan,
 >
 > I think what you had in mind works with Redis, since it is a remote
state
 > store engine, it does not have the co-partitioning requirements as
local
 > state stores.
 >
 > One thing you'd need to tune KS though is that with remote stores,
the
 > processing latency may be larger, and since Kafka Streams process all
 > records of a single partition in order, synchronously, you may need
to tune
 > the poll interval configs etc to make sure KS would stay in the
consumer
 > group and not trigger unnecessary rebalances.
 >
 > Guozhang
 >
 > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan <
mpart...@hpe.com>
 > wrote:
 >
 > > Hi,
 > >
 > > I have a use case where messages come in with some key gets
assigned some
 > > partition and the state gets created. Later, key changes (but still
 > > contains the old key in the message) and gets sent to a different
 > > partition. I want to be able to grab the old state using the old
key
 > before
 > > creating the new state on this instance. Redis as a  state store
makes it
 > > easy to implement this where I can simply do a lookup before
creating the
 > > state. I see an implementation here :
 > >
 >
https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
 > >
 > > Has anyone tried this ? Any caveats.
 > >
 > > Thanks
 > > Mohan
 > >
 > >
 >
 > --
 > -- Guozhang
 >







Re: Error upgrading KafkaStreams

2021-03-15 Thread Murilo Tavares
Hi Bruno
Thanks for your response.
No, I did not reset the application prior to upgrading. That was simply
upgrading KafkaStreams from 2.4 to 2.7.

I was able to reproduce it on a smaller environment, and it does indeed
recover.
In a large environment, though, it keeps like that for hours. In this same
large environment, I had to downgrade the application, and when doing that
I did reset the application, which just took a few minutes.

Thanks
Murilo

On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna 
wrote:

> Hi Murilo,
>
> No, you do not need any special procedure to upgrade from 2.4 to 2.7.
>
> What you see in the logs is not an error but a warning. It should not
> block you on startup forever. The warning says that the local states of
> task 7_17 are corrupted because the offset you want to fetch of the
> state changelog topic partition
> my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is larger
> or smaller than the offsets that exist on the brokers for that
> partition. If Streams runs into such an exception it will recreate the
> state from scratch which might take a while depending on the size of the
> state.
>
> The root cause of this warning is not clear from the information you
> gave. Did you maybe reset the application but not wipe out the local
> state stores?
>
> Best,
> Bruno
>
> On 12.03.21 19:11, Murilo Tavares wrote:
> > Hi
> > I have Kafka brokers running on version 2.4.1, with a KafkaStreams app on
> > 2.4.0.
> > I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my instances
> > stuck on startup.
> > In my understanding, I don't need any special procedure to upgrade from
> > KStreams 2.4.0 to 2.7.0, right?
> >
> > The following error stands out for me:
> >
> > 2021-03-12 16:23:52.005 [...] WARN
> >   org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [...] Detected the states of tasks
> > {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]}
> are
> > corrupted. Will close the task as dirty and re-create and bootstrap from
> > scratch.
> > org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with
> > changelogs
> > {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]}
> are
> > corrupted and hence needs to be re-initialized
> > at
> >
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
> > ~[app.jar:?]
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744)
> > ~[app.jar:?]
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625)
> > ~[app.jar:?]
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
> > [app.jar:?]
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
> > [app.jar:?]
> > Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> > Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty,
> > currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack:
> > euw1-az1)], epoch=27}} is out of range for partition
> > my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17
> > at
> >
> org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366)
> > ~[app.jar:?]
> > at
> >
> org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318)
> > ~[app.jar:?]
> > at
> >
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614)
> > ~[app.jar:?]
> > at
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272)
> > ~[app.jar:?]
> > at
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233)
> > ~[app.jar:?]
> > at
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
> > ~[app.jar:?]
> > at
> >
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
> > ~[app.jar:?]
> > ... 4 more
> >
> > Any suggestions on how to upgrade?
> > Thanks
> > Murilo
> >
>


Re: Redis as state store

2021-03-15 Thread Alex Craig
" Another issue with 3rd party state stores could be violation of
exactly-once guarantee provided by kafka streams in the event of a failure
of streams application instance"

I've heard this before but would love to know more about how a custom state
store would be at any greater risk than RocksDB as far as exactly-once
guarantees are concerned.  They all implement the same interface, so as
long as you're correctly implementing get(), put(), delete(), flush(), etc.,
you should be fine, right?  In other words, I don't think there is any
special "exactly once magic" baked into the RocksDB store code.  I could
be wrong, though, so I'd love to hear people's thoughts. Thanks,

Alex C
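
For illustration, here is a minimal sketch of how the store implementation is
chosen: the DSL only ever sees a supplier, so a Redis-backed
KeyValueBytesStoreSupplier (hypothetical here) would simply be passed where the
RocksDB one goes today. Topic name and serdes are placeholders.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.Stores;

    public class StoreWiringSketch {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            // Stores.persistentKeyValueStore(...) returns the RocksDB-backed supplier;
            // a custom, e.g. Redis-backed, KeyValueBytesStoreSupplier could be passed
            // here instead, and the rest of the topology stays exactly the same.
            builder.table("input-topic",
                Materialized.<String, Long>as(Stores.persistentKeyValueStore("counts"))
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long()));
        }
    }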

On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan 
wrote:

> Thanks for the responses. In the worst case, I might have to keep both
> RocksDB as the local store and an external store like Redis.
>
> -mohan
>
>
> On 3/13/21, 8:53 PM, "Pushkar Deole"  wrote:
>
> Another issue with 3rd party state stores could be violation of
> exactly-once guarantee provided by kafka streams in the event of a
> failure
> of streams application instance.
> Kafka provides an exactly-once guarantee for consume -> process -> produce
> through transactions; with the use of a state store like Redis, this
> guarantee is weaker.
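
For reference, the transactional consume -> process -> produce path being
referred to is switched on with a single Streams config; a minimal sketch
(application id and bootstrap servers are placeholders, Streams 2.x assumed):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class EosConfigSketch {
        public static Properties props() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");          // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
            // Wraps consume -> process -> produce (and changelog writes) in a transaction;
            // a remote store updated outside this transaction is not covered by it.
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
            return props;
        }
    }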
>
> On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang 
> wrote:
>
> > Hello Mohan,
> >
> > I think what you had in mind works with Redis: since it is a remote state
> > store engine, it does not have the co-partitioning requirements of local
> > state stores.
> >
> > One thing you'd need to tune in KS, though, is that with remote stores the
> > processing latency may be larger, and since Kafka Streams processes all
> > records of a single partition in order, synchronously, you may need to tune
> > the poll interval configs etc. to make sure KS stays in the consumer group
> > and does not trigger unnecessary rebalances.
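
A rough sketch of the kind of tuning meant here; the values are purely
illustrative, not recommendations:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class RemoteStoreTuningSketch {
        public static void tune(Properties props) {
            // Allow more time between poll() calls when each record may incur a
            // remote round trip...
            props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600_000);
            // ...or hand fewer records to processing per poll so the interval is
            // easier to respect.
            props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
        }
    }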
> >
> > Guozhang
> >
> > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan <
> mpart...@hpe.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I have a use case where a message comes in with some key, gets assigned a
> > > partition, and state gets created. Later, the key changes (but the message
> > > still contains the old key) and the message gets sent to a different
> > > partition. I want to be able to grab the old state using the old key before
> > > creating the new state on this instance. Redis as a state store makes it
> > > easy to implement this, since I can simply do a lookup before creating the
> > > state. I see an implementation here:
> > >
> >
> https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
> > >
> > > Has anyone tried this? Any caveats?
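
Whatever ends up backing the store, the lookup-before-create pattern itself is
plain Processor API. A rough sketch, where the store name and the assumption
that the message value carries the old key as a prefix are made up for
illustration (and the cross-partition lookup only works if the store is
genuinely shared/remote):

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class RekeyAwareProcessor implements Processor<String, String> {
        private KeyValueStore<String, Long> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, Long>) context.getStateStore("counts-store");
        }

        @Override
        public void process(String newKey, String value) {
            String oldKey = value.split(":", 2)[0];   // hypothetical "oldKey:payload" format
            Long previous = store.get(oldKey);        // grab state written under the old key
            long next = (previous == null ? 0L : previous) + 1L;
            store.put(newKey, next);                  // create/update state under the new key
        }

        @Override
        public void close() {}
    }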
> > >
> > > Thanks
> > > Mohan
> > >
> > >
> >
> > --
> > -- Guozhang
> >
>
>
>


Schema registry for Kafka topic

2021-03-15 Thread Mich Talebzadeh
Hi,


We have an in-house cluster of Kafka brokers and ZooKeepers.


Kafka version kafka_2.12-2.7.0

ZooKeeper version apache-zookeeper-3.6.2-bin


The topic is published to Google BigQuery.


We would like to use a schema registry, as opposed to sending the schema with
the payload of each message, which adds to the volume of traffic.


As I understand from the available literature, this schema registry acts as
a kind of API to decouple consumers from producers and provides a common
interface for topic metadata.
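
If we end up on the Confluent community Schema Registry (an assumption on my
part, since the source here is JSON and the registry is typically used with the
Avro, Protobuf, or JSON Schema serializers), the producer side just points its
serializer at the registry; a minimal sketch with Avro, broker and registry
URLs being placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class RegistryProducerConfigSketch {
        public static Properties props() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer");
            // Registers/fetches the Avro schema by id instead of shipping it in
            // every message.
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "io.confluent.kafka.serializers.KafkaAvroSerializer");
            props.put("schema.registry.url", "http://schema-registry:8081");     // placeholder
            return props;
        }
    }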


Currently I am looking at the open-source community version for creating
and maintaining this schema registry. We tried the wepay provider from
Confluent (which I assumed would seamlessly allow one to publish a Kafka topic
(source JSON) to Google BigQuery), but we are still getting the error:


org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask
due to unrecoverable exception.

at
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)

at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)

at
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)

at
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)

at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)

at
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)

at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by:
com.wepay.kafka.connect.bigquery.exception.ConversionConnectException:
Top-level Kafka Connect schema must be of type 'struct'
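
For what it's worth, that exception typically means the record value reached
the sink without a struct-typed Connect schema (for example schemaless JSON,
which the JsonConverter hands over as a plain Map rather than a Struct). Below
is a sketch of sink properties that do supply one, assuming JSON messages that
carry an embedded schema envelope; the topic name is a placeholder, and the
registry route would use the AvroConverter instead:

    import java.util.Map;

    public class BigQuerySinkConfigSketch {
        public static Map<String, String> props() {
            return Map.of(
                "connector.class", "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
                "topics", "my-topic",
                "value.converter", "org.apache.kafka.connect.json.JsonConverter",
                // Requires each JSON message to carry a {"schema": ..., "payload": ...}
                // envelope, which is what gives Connect the struct schema the sink expects.
                "value.converter.schemas.enable", "true");
        }
    }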


Appreciate any advice.


Thanks





Re: Error upgrading KafkaStreams

2021-03-15 Thread Bruno Cadonna

Hi Murilo,

No, you do not need any special procedure to upgrade from 2.4 to 2.7.

What you see in the logs is not an error but a warning. It should not
block you on startup forever. The warning says that the local state of
task 7_17 is corrupted because the offset Streams wants to fetch from
the state changelog topic partition
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is outside
the range of offsets that exist on the brokers for that partition. If
Streams runs into such an exception, it will recreate the state from
scratch, which might take a while depending on the size of the state.


The root cause of this warning is not clear from the information you 
gave. Did you maybe reset the application but not wipe out the local 
state stores?
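
If that is what happened, the local state has to be wiped as well; a minimal
sketch of doing that from code (cleanUp() must be called before start(), and it
complements the kafka-streams-application-reset tool rather than replacing it):

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.Topology;

    public class ResetCleanupSketch {
        public static void run(Topology topology, Properties props) {
            KafkaStreams streams = new KafkaStreams(topology, props);
            // Deletes this instance's local state directory (RocksDB stores and
            // checkpoints) so the state is rebuilt from the changelog topics.
            streams.cleanUp();
            streams.start();
        }
    }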


Best,
Bruno

On 12.03.21 19:11, Murilo Tavares wrote:

Hi
I have Kafka brokers running on version 2.4.1, with a KafkaStreams app on
2.4.0.
I'm trying to upgrade my KafkaStreams to v2.7.0, but my instances got
stuck on startup.
In my understanding, I don't need any special procedure to upgrade from
KStreams 2.4.0 to 2.7.0, right?

The following error stands out for me:

2021-03-12 16:23:52.005 [...] WARN
  org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[...] Detected the states of tasks
{7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are
corrupted. Will close the task as dirty and re-create and bootstrap from
scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with
changelogs
{7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are
corrupted and hence needs to be re-initialized
at
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
~[app.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744)
~[app.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625)
~[app.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
[app.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
[app.jar:?]
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack:
euw1-az1)], epoch=27}} is out of range for partition
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17
at
org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366)
~[app.jar:?]
at
org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318)
~[app.jar:?]
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614)
~[app.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272)
~[app.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233)
~[app.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
~[app.jar:?]
at
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
~[app.jar:?]
... 4 more

Any suggestions on how to upgrade?
Thanks
Murilo



Re: topics Replicas not equals Isr in zookeeper

2021-03-15 Thread wenbing shen
We have now tracked down the cause of this issue: when increasing the
partition count, a colleague's code used the 0.10 AdminUtils to recompute
the replica assignment of the existing partitions and wrote it directly
into the znode, and the controller then failed to handle that change. I am
running Kafka 2.0.0; does anyone know whether the latest Kafka versions
handle this case, or is it something that could still be improved?
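
For what it's worth, on newer brokers and clients (2.4+) a reassignment like
this can go through the Admin API, which routes it through the controller
instead of writing the znode by hand (on 2.0.0 the kafka-reassign-partitions
tool does the equivalent). A rough sketch; topic, partition, and broker ids are
placeholders:

    import java.util.List;
    import java.util.Map;
    import java.util.Optional;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewPartitionReassignment;
    import org.apache.kafka.common.TopicPartition;

    public class ReassignSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
            try (Admin admin = Admin.create(props)) {
                // Ask the controller to move td_tip partition 0 onto brokers 1001 and 1002.
                admin.alterPartitionReassignments(Map.of(
                    new TopicPartition("td_tip", 0),
                    Optional.of(new NewPartitionReassignment(List.of(1001, 1002)))
                )).all().get();
            }
        }
    }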

wenbing shen  wrote on Thu, Mar 11, 2021 at 11:07 PM:

> [image: image.png]
> This is a screenshot of a problem that occurred in our product at a customer
> site. Does anyone know the reason?
>
> wenbing shen  wrote on Thu, Mar 11, 2021 at 7:17 PM:
>
>> A large number of the topic's replicas are inconsistent with the ISR; an
>> example is as follows:
>> Topic: td_tip  Partition: 0  Leader: 1001  Replicas: 1003,1005  Isr: 1001,1002
>> Topic: td_tip  Partition: 1  Leader: 1006  Replicas: 1004,1007  Isr: 1006,1009
>> Topic: td_tip  Partition: 2  Leader: 1011  Replicas: 1020,1022  Isr: 1012,1011
>> Topic: td_tip  Partition: 3  Leader: 1009  Replicas: 1015,1016  Isr: 1009,1008
>>
>