Re: MirrorMaker 2.0 and Streams interplay (topic naming control)

2019-03-25 Thread Paul Whalen
John and Ryanne,

Thanks for the responses! I think Ryanne's way of describing the question
is actually a much better summary than my long-winded description: "a
Streams app can switch between topics with and without a cluster alias
prefix when you migrate between prod and pre-prod, while preserving state."

To address a few of John's points...

> But, the prod app will still be running, and its changelog will still be
> mirrored into pre-prod when you start the pre-prod app.
>
The idea is actually to turn off the mirroring from prod to pre-prod during
this period, so the environments can operate completely independently and
their state can comfortably diverge during the testing period.  After the
testing period we'd be happy to throw away everything in pre-prod and start
mirroring again from prod with a blank slate.

> Also, the pre-prod app won't be in the same consumer group as the prod app,
> so it won't know from what offset to start processing input.
>
This is where I'm hoping the magic of MM2 will come in - at the time we
shut off mirroring from prod to pre-prod in order to spin off the pre-prod
environment, we will do an "offset translation" with RemoteClusterUtils
like Ryanne mentioned, so new Streams apps in pre-prod will see consumer
offsets that make sense for reading from pre-prod topics.

I like both of your ideas around the "user space" solution: subscribing to
multiple topics, or choosing a topic based on config.  However, in order to
populate their internal state properly, when the pre-prod apps come up they
will need to look for repartition and changelog topics with the right
prefix.  This seems problematic to me since the user doesn't have direct
control over those topic names, though it did just occur to me now that the
user *sort of* does.  Since the naming scheme is currently just
applicationId + "-" + storeName + "-changelog", we could translate the
consumer group offsets to a consumer group with a new name that has the
same prefix as the mirrored topics do.  That seems a bit clumsy/lucky to
me (is the internal topic naming convention really a "public API"?), but I
think it would work.
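
To make that concrete, here is a sketch of the naming arithmetic (the app and
store names are made-up examples, and the "prod." prefix assumes MM2's default
cluster-alias convention):

public class PrefixNamingSketch {
    public static void main(String[] args) {
        String applicationId = "my-etl-app";  // hypothetical application.id
        String storeName = "my-store";        // hypothetical store name

        // Streams derives changelog topics as applicationId + "-" + storeName + "-changelog":
        String prodChangelog = applicationId + "-" + storeName + "-changelog";

        // MM2 mirrors that topic into pre-prod under the cluster alias prefix:
        String mirroredChangelog = "prod." + prodChangelog;

        // A pre-prod app started with application.id = "prod." + applicationId
        // derives a changelog name identical to the mirrored topic:
        String preProdChangelog = "prod." + applicationId + "-" + storeName + "-changelog";
        System.out.println(preProdChangelog.equals(mirroredChangelog)); // true
    }
}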

I'd be curious to hear if folks think that solution would work and be an
acceptable pattern, since my original proposal of more user control of
internal topic naming did seem a bit heavy handed.

Thanks very much for your help!
Paul

On Mon, Mar 25, 2019 at 3:14 PM Ryanne Dolan  wrote:

> Hey Paul, thanks for the kind words re MM2.
>
> I'm not a Streams expert first off, but I think I understand your question:
> whether a Streams app can switch between topics with and without a cluster alias
> prefix when you migrate between prod and pre-prod, while preserving state.
> Streams supports regexes and lists of topics as input, so you can use e.g.
> builder.stream(List.of("topic1", "prod.topic1")), which is a good place to
> start. In this case, the combined subscription is still a single stream,
> conceptually, but comprises partitions from both topics, i.e. partitions
> from topic1 plus partitions from prod.topic1. At a high level, this is no
> different than adding more partitions to a single topic. I think any
> intermediate or downstream topics/tables would remain unchanged, since they
> are still the result of this single stream.
>
> The trick is to correctly translate offsets for the input topics when
> migrating the app between prod and pre-prod, which RemoteClusterUtils can
> help with. You could do this with external tooling, e.g. a script
> leveraging RemoteClusterUtils and kafka-streams-application-reset.sh. I
> haven't tried this with a Streams app myself, but I suspect it would work.
>
> Ryanne
>
>
> On Sun, Mar 24, 2019 at 12:31 PM Paul Whalen  wrote:
>
> > Hi all,
> >
> > With MirrorMaker 2.0 (
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
> > )
> > accepted and coming along very nicely in development, it has got me
> > wondering if a certain use case is supported, and if not, can changes be
> > made to Streams or MM2 to support it.  I'll explain the use case, but the
> > TL;DR here is "do we need more control over topic naming in MM2 or
> > Streams?"
> >
> > My team foresees using MM2 as a way to mirror data from our prod
> > environment to a pre-prod environment.  The data is supplied by external
> > vendors, introduced into our system through a Kafka Streams ETL pipeline,
> > and consumed by our end-applications.  Generally we would only like to run
> > the ETL pipeline in prod since there is an operational cost to running it
> > in both prod and pre-prod (the data sometimes needs manual attention).
> > This seems to fit MM2 well: pre-prod end-applications consume from the
> > pre-prod Kafka cluster, which is entirely "remote" topics being mirrored
> > from the prod cluster.  We only have to keep one instance of the ETL
> > pipeline running, but end-applications can be separate, connecting to their
> > respective prod and pre-prod Kafka clusters.
> >

Re: Please add me to the contributor list

2019-03-25 Thread Matthias J. Sax
Done.

On 3/25/19 3:08 PM, am wrote:
> My JIRA userid is JBFletcher
> 
> Thank you,
> Anna
> 





Please add me to the contributor list

2019-03-25 Thread am
My JIRA userid is JBFletcher

Thank you,
Anna


Re: [DISCUSS] KIP-236 Interruptible Partition Reassignment

2019-03-25 Thread Ismael Juma
Hi George,

The goal is not to prevent people from updating ZK directly. The goal is to
offer a solution where people don't have to. If people then decide to avoid
the recommended path, they can deal with the consequences. However, if we
add another structure in ZK and no RPC mechanism, then there is no
recommended path apart from updating ZK (implicitly making it an API for
users).

Ismael

On Mon, Mar 25, 2019 at 3:57 PM George Li 
wrote:

>  Thanks Ismael.  One question: even if we switch to submitting reassignments
> via RPC instead of ZooKeeper, will the reassignment data still persist in the
> ZooKeeper node /admin/reassign_partitions (e.g. so the Controller can resume
> reassignments after a failover)?  If yes, how can this keep someone from
> modifying ZK (/admin/reassign_partitions) directly?
>
>
> Thanks,
> George
>
> On Saturday, March 23, 2019, 1:07:11 PM PDT, Ismael Juma <
> isma...@gmail.com> wrote:
>
>  Thanks for the KIP, making reassignment more flexible is definitely
> welcome. As others have mentioned, I think we need to do it via the Kafka
> protocol and not via ZK. The latter introduces an implicit API that other
> tools will depend on causing migration challenges. This has already
> happened with the existing ZK based interface and we should avoid
> introducing more tech debt here.
>
> Ismael
>
> On Sat, Mar 23, 2019, 12:09 PM Colin McCabe  wrote:
>
> > On Thu, Mar 21, 2019, at 20:51, George Li wrote:
> > >  Hi Colin,
> > >
> > > I agree with your proposal of having administrative APIs through RPC
> > > instead of ZooKeeper. But seems like it will incur significant changes
> > > to both submitting reassignments and this KIP's cancelling pending
> > > reassignments.
> > >
> > > To make this KIP simple and moving along, I will be happy to do another
> > > follow-up KIP to change all reassignment-related operations via RPC.
> >
> > Thanks, George.  I think doing it as a two-step process is fine, but I
> > suspect it would be much easier and quicker to do the RPC conversion first,
> > and the interruptible part later.  The reason is because a lot of the
> > things that people have brought up as concerns with this KIP are really
> > issues with the API (how will people interact with ZK, how does access
> > control work, what does the format look like in ZK) that will just go away
> > once we have an RPC.
> >
> > > Just curious,  KIP-4 includes Topics/ACL related operations. In
> > > addition to Reassignments,  any other operations should be done via
> > > RPC?
> >
> > I think all of the administrative shell scripts have been converted except
> > kafka-configs.sh.  I believe there is a KIP for that conversion.
> > Reassigning partitions is probably the biggest KIP-4 gap we have right now.
> >
> > best,
> > Colin
> >
> > >
> > > Thanks,
> > > George
> > >
> > >
> > > On Wednesday, March 20, 2019, 5:28:59 PM PDT, Colin McCabe
> > >  wrote:
> > >
> > >  Hi George,
> > >
> > > One big problem here is that administrative APIs should be done through
> > > RPCs, not through ZooKeeper.  KIP-4 (Command line and centralized
> > > administrative operations) describes the rationale for this.  We want
> > > public and stable APIs that don't depend on the internal representation
> > > of stuff in ZK, which will change over time.  Tools shouldn't have to
> > > integrate with ZK or understand the internal data structures of Kafka
> > > to make administrative changes.  ZK doesn't have a good security,
> > > access control, or compatibility story.
> > >
> > > We should create an official reassignment RPC for Kafka.  This will
> > > solve many of the problems discussed in this thread, I think.  For
> > > example, with an RPC, users won't be able to make changes unless they
> > > have ALTER on KafkaCluster.  That avoids the problem of random users
> > > making changes without the administrator knowing.  Also, if multiple
> > > users are making changes, there is no risk that they will overwrite
> > > each other's changes, since they won't be modifying the internal ZK
> > > structures directly.
> > >
> > > I think a good reassignment API would be something like:
> > >
> > > > ReassignPartitionsResults reassignPartitions(
> > > >     Map<TopicPartition, PartitionAssignment> reassignments);
> > > >
> > > > class PartitionAssignment {
> > > >   List<Integer> nodes;
> > > > }
> > > >
> > > > class ReassignPartitionsResults {
> > > >   Map<TopicPartition, KafkaFuture<Void>> pending;
> > > >   Map<TopicPartition, KafkaFuture<Void>> completed;
> > > >   Map<TopicPartition, KafkaFuture<Void>> rejected;
> > > > }
> > > >
> > > > PendingReassignmentResults pendingReassignments();
> > > >
> > > > class PendingReassignmentResults {
> > > >   KafkaFuture<Map<TopicPartition, PartitionAssignment>> pending;
> > > >   KafkaFuture<Map<TopicPartition, PartitionAssignment>> previous;
> > > > }
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Tue, Mar 19, 2019, at 15:04, George Li wrote:
> > > >  Hi Viktor,
> > > >
> > > > Thanks for the review.
> > > >
> > > > If there is reassignment in-progress while the cluster is upgraded with
> > > > this KIP (upgrade the binary and then do a cluster rolling restart of
> > > > the brokers), the reassignment 

Jenkins build is back to normal : kafka-1.0-jdk7 #265

2019-03-25 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-8156) Client id when provided is not suffixed with an index

2019-03-25 Thread Nagaraj Gopal (JIRA)
Nagaraj Gopal created KAFKA-8156:


 Summary: Client id when provided is not suffixed with an index
 Key: KAFKA-8156
 URL: https://issues.apache.org/jira/browse/KAFKA-8156
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Affects Versions: 2.1.1
Reporter: Nagaraj Gopal


We use the Camel Kafka component, and one of its configuration options is
consumersCount, which is the number of concurrent consumers that can read data
from the topic. Usually we don't care about the client id, but when we start
emitting metrics it becomes an important piece of the puzzle. The client id
would help differentiate metrics between different consumers, each with `n`
concurrent consumers, and each deployed in a different JVM.

Currently, when a client id is provided it is not suffixed with an index, while
when it is not provided the library creates its own client id with an index
suffix (format: consumer-0, consumer-1). This is limiting when we have
multiple consumers as described above.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


an article on kafka vs pulsar

2019-03-25 Thread Sree V
Hi Team,

I'd like to share this article on Kafka vs. Pulsar:
https://www.infoworld.com/article/3379120/pubsub-messaging-apache-kafka-vs-apache-pulsar.html
Your comments are valuable.

Thank you.
/Sree



Re: [DISCUSS] KIP-236 Interruptible Partition Reassignment

2019-03-25 Thread George Li
 Thanks Ismael.  One question: even if we switch to submitting reassignments via
RPC instead of ZooKeeper, will the reassignment data still persist in the
ZooKeeper node /admin/reassign_partitions (e.g. so the Controller can resume
reassignments after a failover)?  If yes, how can this keep someone from
modifying ZK (/admin/reassign_partitions) directly?


Thanks,
George

On Saturday, March 23, 2019, 1:07:11 PM PDT, Ismael Juma 
 wrote:  
 
 Thanks for the KIP, making reassignment more flexible is definitely
welcome. As others have mentioned, I think we need to do it via the Kafka
protocol and not via ZK. The latter introduces an implicit API that other
tools will depend on causing migration challenges. This has already
happened with the existing ZK based interface and we should avoid
introducing more tech debt here.

Ismael

On Sat, Mar 23, 2019, 12:09 PM Colin McCabe  wrote:

> On Thu, Mar 21, 2019, at 20:51, George Li wrote:
> >  Hi Colin,
> >
> > I agree with your proposal of having administrative APIs through RPC
> > instead of ZooKeeper. But seems like it will incur significant changes
> > to both submitting reassignments and this KIP's cancelling pending
> > reassignments.
> >
> > To make this KIP simple and moving along, I will be happy to do another
> > follow-up KIP to change all reassignment-related operations via RPC.
>
> Thanks, George.  I think doing it as a two-step process is fine, but I
> suspect it would be much easier and quicker to do the RPC conversion first,
> and the interruptible part later.  The reason is because a lot of the
> things that people have brought up as concerns with this KIP are really
> issues with the API (how will people interact with ZK, how does access
> control work, what does the format look like in ZK) that will just go away
> once we have an RPC.
>
> > Just curious,  KIP-4 includes Topics/ACL related operations. In
> > addition to Reassignments,  any other operations should be done via
> > RPC?
>
> I think all of the administrative shell scripts have been converted except
> kafka-configs.sh.  I believe there is a KIP for that conversion.
> Reassigning partitions is probably the biggest KIP-4 gap we have right now.
>
> best,
> Colin
>
> >
> > Thanks,
> > George
> >
> >
> >    On Wednesday, March 20, 2019, 5:28:59 PM PDT, Colin McCabe
> >  wrote:
> >
> >  Hi George,
> >
> > One big problem here is that administrative APIs should be done through
> > RPCs, not through ZooKeeper.  KIP-4 (Command line and centralized
> > administrative operations) describes the rationale for this.  We want
> > public and stable APIs that don't depend on the internal representation
> > of stuff in ZK, which will change over time.  Tools shouldn't have to
> > integrate with ZK or understand the internal data structures of Kafka
> > to make administrative changes.  ZK doesn't have a good security,
> > access control, or compatibility story.
> >
> > We should create an official reassignment RPC for Kafka.  This will
> > solve many of the problems discussed in this thread, I think.  For
> > example, with an RPC, users won't be able to make changes unless they
> > have ALTER on KafkaCluster.  That avoids the problem of random users
> > making changes without the administrator knowing.  Also, if multiple
> > users are making changes, there is no risk that they will overwrite
> > each other's changes, since they won't be modifying the internal ZK
> > structures directly.
> >
> > I think a good reassignment API would be something like:
> >
> > > ReassignPartitionsResults reassignPartitions(
> > >     Map<TopicPartition, PartitionAssignment> reassignments);
> > >
> > > class PartitionAssignment {
> > >   List<Integer> nodes;
> > > }
> > >
> > > class ReassignPartitionsResults {
> > >   Map<TopicPartition, KafkaFuture<Void>> pending;
> > >   Map<TopicPartition, KafkaFuture<Void>> completed;
> > >   Map<TopicPartition, KafkaFuture<Void>> rejected;
> > > }
> > >
> > > PendingReassignmentResults pendingReassignments();
> > >
> > > class PendingReassignmentResults {
> > >   KafkaFuture<Map<TopicPartition, PartitionAssignment>> pending;
> > >   KafkaFuture<Map<TopicPartition, PartitionAssignment>> previous;
> > > }
> >
> > best,
> > Colin
> >
> >
> > On Tue, Mar 19, 2019, at 15:04, George Li wrote:
> > >  Hi Viktor,
> > >
> > > Thanks for the review.
> > >
> > > If there is reassignment in-progress while the cluster is upgraded with
> > > this KIP (upgrade the binary and then do a cluster rolling restart of
> > > the brokers), the reassignment JSON in Zookeeper
> > > /admin/reassign_partitions will only have  {topic, partition,
> > > replicas(new)} info when the batch of reassignment was kicked off
> > > before the upgrade,  not with the "original_replicas" info per
> > > topic/partition.  So when the user is trying to cancel/rollback the
> > > reassignments, it's going to fail and the cancellation will be skipped
> > > (The code in this KIP will check if the "original_replicas" is in
> > > the /admin/reassign_partitions node).
> > >
> > > The user either has to wait till current reassignments finish or
> > > does quite some manual work to cancel them (delete ZK node, bounce
> > > controller, re-submit reassignments with 

Re: Request to add recent versions to the s3 bucket

2019-03-25 Thread John Roesler
Thanks Gwen,

I will try that! If we can remove a link from the chain, it would be a win.

Thanks,
-John

On Mon, Mar 25, 2019 at 4:12 PM Gwen Shapira  wrote:

> I'm a bit surprised that these are the instructions. The release process
> doesn't even mention S3, IIRC.
> Can you use an official Apache repo for testing? (
> https://www.apache.org/dyn/closer.cgi?path=/kafka/)
>
>
> On Mon, Mar 25, 2019 at 1:59 PM John Roesler  wrote:
>
> > Hi Kafka Devs,
> >
> > While updating the Streams system tests, I noticed that 2.1.1 and 2.2.0 are
> > not uploaded to https://s3-us-west-2.amazonaws.com/kafka-packages yet. The
> > instructions in tests/kafkatest/tests/streams/streams_upgrade_test.py say
> > to ping this list!
> >
> > Can someone with access take care of this, please?
> >
> > Thanks,
> > -John
> >
>
>
> --
> *Gwen Shapira*
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter  | blog
> 
>


Re: [DISCUSS] KIP-360: Improve handling of unknown producer

2019-03-25 Thread Adam Bellemare
Ach - Sorry. I meant Jason. I had just read a John Roesler email.

On Mon, Mar 25, 2019 at 5:21 PM Adam Bellemare 
wrote:

> Hi John
>
> What is the status of this KIP?
>
> My teammates and I are running into the "UNKNOWN_PRODUCER_ID" error on
> 2.1.1 for a multitude of our internal topics, and I suspect that a proper
> fix is needed.
>
> Adam
>
> On Mon, Jan 7, 2019 at 7:42 PM Guozhang Wang  wrote:
>
>> Thanks Jason. The proposed solution sounds good to me.
>>
>>
>> Guozhang
>>
>> On Mon, Jan 7, 2019 at 3:52 PM Jason Gustafson 
>> wrote:
>>
>> > Hey Guozhang,
>> >
>> > Thanks for sharing the article. The INVALID_PRODUCER_ID_MAPPING error
>> > occurs following expiration of the producerId. It's possible that
>> another
>> > producerId has been installed in its place following expiration (if
>> another
>> > producer instance has become active), or the mapping is empty. We can
>> > safely retry the InitProducerId with the logic in this KIP in order to
>> > detect which case it is. So I'd suggest something like this:
>> >
>> > 1. After receiving INVALID_PRODUCER_ID_MAPPING, the producer can send
>> > InitProducerId using the current producerId and epoch.
>> > 2. If no mapping exists, the coordinator can generate a new producerId
>> and
>> > return it. If a transaction is in progress on the client, it will have
>> to
>> > be aborted, but the producer can continue afterwards.
>> > 3. Otherwise if a different producerId has been assigned, then we can
>> > return INVALID_PRODUCER_ID_MAPPING. To simplify error handling, we can
>> > probably raise this as ProducerFencedException since that is effectively
>> > what has happened. Ideally this is the only fatal case that users have
>> to
>> > handle.
>> >
>> > I'll give it a little more thought and update the KIP.
>> >
>> > Thanks,
>> > Jason
>> >
>> > On Thu, Jan 3, 2019 at 1:38 PM Guozhang Wang 
>> wrote:
>> >
>> > > You're right about the dangling txn since it will actually block
>> > > read-committed consumers from proceeding at all. I'd agree that since
>> > this
>> > > is a very rare case, we can consider fixing it not via broker-side
>> logic
>> > > but via tooling in a future work.
>> > >
>> > > I've also discovered some related error handling logic inside producer
>> > that
>> > > may be addressed together with this KIP (since it is mostly for
>> internal
>> > > implementations the wiki itself does not need to be modified):
>> > >
>> > >
>> > >
>> >
>> https://stackoverflow.com/questions/53976117/why-did-the-kafka-stream-fail-to-produce-data-after-a-long-time/54029181#54029181
>> > >
>> > > Guozhang
>> > >
>> > >
>> > >
>> > > On Thu, Nov 29, 2018 at 2:25 PM Jason Gustafson 
>> > > wrote:
>> > >
>> > > > Hey Guozhang,
>> > > >
>> > > > To clarify, the broker does not actually use the ApiVersion API for
>> > > > inter-broker communications. The use of an API and its corresponding
>> > > > version is controlled by `inter.broker.protocol.version`.
>> > > >
>> > > > Nevertheless, it sounds like we're on the same page about removing
>> > > > DescribeTransactionState. The impact of a dangling transaction is a
>> > > little
>> > > > worse than what you describe though. Consumers with the
>> read_committed
>> > > > isolation level will be stuck. Still, I think we agree that this
>> case
>> > > > should be rare and we can reconsider for future work. Rather than
>> > > > preventing dangling transactions, perhaps we should consider options
>> > > which
>> > > > allows us to detect them and recover. Anyway, this needs more
>> thought.
>> > I
>> > > > will update the KIP.
>> > > >
>> > > > Best,
>> > > > Jason
>> > > >
>> > > > On Tue, Nov 27, 2018 at 6:51 PM Guozhang Wang 
>> > > wrote:
>> > > >
>> > > > > 0. My original question is about the implementation details
>> > primarily,
>> > > > > since current the handling logic of the APIVersionResponse is
>> simply
>> > > "use
>> > > > > the highest supported version of the corresponding request", but
>> if
>> > the
>> > > > > returned response from APIVersionRequest says "I don't even know
>> > about
>> > > > the
>> > > > > DescribeTransactionStateRequest at all", then we need additional
>> > logic
>> > > > for
>> > > > > the falling back logic. Currently this logic is embedded in
>> > > NetworkClient
>> > > > > which is shared by all clients, so I'd like to avoid making this
>> > logic
>> > > > more
>> > > > > complicated.
>> > > > >
>> > > > > As for the general issue that a broker does not recognize a
>> producer
>> > > with
>> > > > > sequence number 0, here's my thinking: as you mentioned in the
>> wiki,
>> > > this
>> > > > > is only a concern for transactional producer since for idempotent
>> > > > producer
>> > > > > it can just bump the epoch and go. For transactional producer,
>> even
>> > if
>> > > > the
>> > > > > producer request from a fenced producer gets accepted, its
>> > transaction
>> > > > will
>> > > > > never be committed and hence messages not exposed to
>> read-committed
>> > > > > consumers as 

Re: [DISCUSS] KIP-360: Improve handling of unknown producer

2019-03-25 Thread Adam Bellemare
Hi John

What is the status of this KIP?

My teammates and I are running into the "UNKNOWN_PRODUCER_ID" error on
2.1.1 for a multitude of our internal topics, and I suspect that a proper
fix is needed.

Adam

On Mon, Jan 7, 2019 at 7:42 PM Guozhang Wang  wrote:

> Thanks Jason. The proposed solution sounds good to me.
>
>
> Guozhang
>
> On Mon, Jan 7, 2019 at 3:52 PM Jason Gustafson  wrote:
>
> > Hey Guozhang,
> >
> > Thanks for sharing the article. The INVALID_PRODUCER_ID_MAPPING error
> > occurs following expiration of the producerId. It's possible that another
> > producerId has been installed in its place following expiration (if
> another
> > producer instance has become active), or the mapping is empty. We can
> > safely retry the InitProducerId with the logic in this KIP in order to
> > detect which case it is. So I'd suggest something like this:
> >
> > 1. After receiving INVALID_PRODUCER_ID_MAPPING, the producer can send
> > InitProducerId using the current producerId and epoch.
> > 2. If no mapping exists, the coordinator can generate a new producerId
> and
> > return it. If a transaction is in progress on the client, it will have to
> > be aborted, but the producer can continue afterwards.
> > 3. Otherwise if a different producerId has been assigned, then we can
> > return INVALID_PRODUCER_ID_MAPPING. To simplify error handling, we can
> > probably raise this as ProducerFencedException since that is effectively
> > what has happened. Ideally this is the only fatal case that users have to
> > handle.
> >
> > I'll give it a little more thought and update the KIP.
> >
> > Thanks,
> > Jason
> >
> > On Thu, Jan 3, 2019 at 1:38 PM Guozhang Wang  wrote:
> >
> > > You're right about the dangling txn since it will actually block
> > > read-committed consumers from proceeding at all. I'd agree that since
> > this
> > > is a very rare case, we can consider fixing it not via broker-side
> logic
> > > but via tooling in a future work.
> > >
> > > I've also discovered some related error handling logic inside producer
> > that
> > > may be addressed together with this KIP (since it is mostly for
> internal
> > > implementations the wiki itself does not need to be modified):
> > >
> > >
> > >
> >
> https://stackoverflow.com/questions/53976117/why-did-the-kafka-stream-fail-to-produce-data-after-a-long-time/54029181#54029181
> > >
> > > Guozhang
> > >
> > >
> > >
> > > On Thu, Nov 29, 2018 at 2:25 PM Jason Gustafson 
> > > wrote:
> > >
> > > > Hey Guozhang,
> > > >
> > > > To clarify, the broker does not actually use the ApiVersion API for
> > > > inter-broker communications. The use of an API and its corresponding
> > > > version is controlled by `inter.broker.protocol.version`.
> > > >
> > > > Nevertheless, it sounds like we're on the same page about removing
> > > > DescribeTransactionState. The impact of a dangling transaction is a
> > > little
> > > > worse than what you describe though. Consumers with the
> read_committed
> > > > isolation level will be stuck. Still, I think we agree that this case
> > > > should be rare and we can reconsider for future work. Rather than
> > > > preventing dangling transactions, perhaps we should consider options
> > > which
> > > > allows us to detect them and recover. Anyway, this needs more
> thought.
> > I
> > > > will update the KIP.
> > > >
> > > > Best,
> > > > Jason
> > > >
> > > > On Tue, Nov 27, 2018 at 6:51 PM Guozhang Wang 
> > > wrote:
> > > >
> > > > > 0. My original question is about the implementation details
> > primarily,
> > > > > since current the handling logic of the APIVersionResponse is
> simply
> > > "use
> > > > > the highest supported version of the corresponding request", but if
> > the
> > > > > returned response from APIVersionRequest says "I don't even know
> > about
> > > > the
> > > > > DescribeTransactionStateRequest at all", then we need additional
> > logic
> > > > for
> > > > > the falling back logic. Currently this logic is embedded in
> > > NetworkClient
> > > > > which is shared by all clients, so I'd like to avoid making this
> > logic
> > > > more
> > > > > complicated.
> > > > >
> > > > > As for the general issue that a broker does not recognize a
> producer
> > > with
> > > > > sequence number 0, here's my thinking: as you mentioned in the
> wiki,
> > > this
> > > > > is only a concern for transactional producer since for idempotent
> > > > producer
> > > > > it can just bump the epoch and go. For transactional producer, even
> > if
> > > > the
> > > > > producer request from a fenced producer gets accepted, its
> > transaction
> > > > will
> > > > > never be committed and hence messages not exposed to read-committed
> > > > > consumers as well. The drawback is though, 1) read-uncommitted
> > > consumers
> > > > > will still read those messages, 2) unnecessary storage for those
> > fenced
> > > > > produce messages, but in practice should not accumulate to a large
> > > amount
> > > > > since producer should soon try to commit 

Re: Request to add recent versions to the s3 bucket

2019-03-25 Thread Gwen Shapira
I'm a bit surprised that these are the instructions. The release process
doesn't even mention S3, IIRC.
Can you use an official Apache repo for testing? (
https://www.apache.org/dyn/closer.cgi?path=/kafka/)


On Mon, Mar 25, 2019 at 1:59 PM John Roesler  wrote:

> Hi Kafka Devs,
>
> While updating the Streams system tests, I noticed that 2.1.1 and 2.2.0 are
> not uploaded to https://s3-us-west-2.amazonaws.com/kafka-packages yet. The
> instructions in tests/kafkatest/tests/streams/streams_upgrade_test.py say
> to ping this list!
>
> Can someone with access take care of this, please?
>
> Thanks,
> -John
>


-- 
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter  | blog



Request to add recent versions to the s3 bucket

2019-03-25 Thread John Roesler
Hi Kafka Devs,

While updating the Streams system tests, I noticed that 2.1.1 and 2.2.0 are
not uploaded to https://s3-us-west-2.amazonaws.com/kafka-packages yet. The
instructions in tests/kafkatest/tests/streams/streams_upgrade_test.py say
to ping this list!

Can someone with access take care of this, please?

Thanks,
-John


[jira] [Created] (KAFKA-8155) Update Streams system tests for 2.2.0 and 2.1.1 releases

2019-03-25 Thread John Roesler (JIRA)
John Roesler created KAFKA-8155:
---

 Summary: Update Streams system tests for 2.2.0 and 2.1.1 releases
 Key: KAFKA-8155
 URL: https://issues.apache.org/jira/browse/KAFKA-8155
 Project: Kafka
  Issue Type: Task
  Components: streams, system tests
Reporter: John Roesler
Assignee: John Roesler






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: MirrorMaker 2.0 and Streams interplay (topic naming control)

2019-03-25 Thread Ryanne Dolan
Hey Paul, thanks for the kind words re MM2.

I'm not a Streams expert first off, but I think I understand your question:
whether a Streams app can switch between topics with and without a cluster alias
prefix when you migrate between prod and pre-prod, while preserving state.
Streams supports regexes and lists of topics as input, so you can use e.g.
builder.stream(List.of("topic1", "prod.topic1")), which is a good place to
start. In this case, the combined subscription is still a single stream,
conceptually, but comprises partitions from both topics, i.e. partitions
from topic1 plus partitions from prod.topic1. At a high level, this is no
different than adding more partitions to a single topic. I think any
intermediate or downstream topics/tables would remain unchanged, since they
are still the result of this single stream.
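
For example, a minimal sketch of that combined subscription (topic names are
just examples):

import java.util.Arrays;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
// One logical stream backed by the partitions of both the local topic
// and its mirrored counterpart.
KStream<String, String> events =
    builder.stream(Arrays.asList("topic1", "prod.topic1"));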

The trick is to correctly translate offsets for the input topics when
migrating the app between prod and pre-prod, which RemoteClusterUtils can
help with. You could do this with external tooling, e.g. a script
leveraging RemoteClusterUtils and kafka-streams-application-reset.sh. I
haven't tried this with a Streams app myself, but I suspect it would work.
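
To sketch the translation step (assuming RemoteClusterUtils ends up with
roughly the API proposed in KIP-382; the group name, alias, and bootstrap
address below are made up):

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

Map<String, Object> preProdProps = new HashMap<>();
preProdProps.put("bootstrap.servers", "preprod-kafka:9092");

// Translate the prod group's committed offsets into offsets that make sense
// against the mirrored "prod.*" topics on the pre-prod cluster.
Map<TopicPartition, OffsetAndMetadata> translated =
    RemoteClusterUtils.translateOffsets(
        preProdProps, "prod", "my-streams-app", Duration.ofMinutes(1));
// A reset/seed script (e.g. built around kafka-streams-application-reset.sh)
// could then commit these offsets for the pre-prod app's group before it starts.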

Ryanne


On Sun, Mar 24, 2019 at 12:31 PM Paul Whalen  wrote:

> Hi all,
>
> With MirrorMaker 2.0 (
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
> )
> accepted and coming along very nicely in development, it has got me
> wondering if a certain use case is supported, and if not, can changes be
> made to Streams or MM2 to support it.  I'll explain the use case, but the
> TL;DR here is "do we need more control over topic naming in MM2 or
> Streams?"
>
> My team foresees using MM2 as a way to mirror data from our prod
> environment to a pre-prod environment.  The data is supplied by external
> vendors, introduced into our system through a Kafka Streams ETL pipeline,
> and consumed by our end-applications.  Generally we would only like to run
> the ETL pipeline in prod since there is an operational cost to running it
> in both prod and pre-prod (the data sometimes needs manual attention).
> This seems to fit MM2 well: pre-prod end-applications consume from the
> pre-prod Kafka cluster, which is entirely "remote" topics being mirrored
> from the prod cluster.  We only have to keep one instance of the ETL
> pipeline running, but end-applications can be separate, connecting to their
> respective prod and pre-prod Kafka clusters.
>
> However, when we want to test changes to the ETL pipeline itself, we would
> like to turn off the mirroring from prod to pre-prod, and run the ETL
> pipeline also in pre-prod, picking up the most recent state of the prod
> pipeline from when mirroring was turned off (FWIW, downtime is not an issue
> for our use case).
>
> My question/concern is basically, can Streams apps work when they're
> running against topics prepended with a cluster alias, like
> "pre-prod.App-statestore-changelog" as is the plan with MM2. From what I
> can tell the answer is no, and my proposal would be to give the Streams
> user more specific control over how Streams names its internal topics
> (repartition and changelogs) by defining an "InternalTopicNamingStrategy"
> or similar.  Perhaps there is a solution on the MM2 side as well, but it
> seems much less desirable to budge on that convention.
>
> I phrased the question in terms of my team's problem, but it's worth noting
> that this use case is passably similar to a potential DR use case, where
> there is a DR cluster that is normally just being mirrored to by MM2, but
> in a DR scenario would become the active cluster that Streams applications
> are connected to.
>
> Thanks for considering this issue, and great job to those working on MM2 so
> far!
>
> Paul
>


Re: MirrorMaker 2.0 and Streams interplay (topic naming control)

2019-03-25 Thread John Roesler
Hi Paul,

Thanks for the email. This does seem like a good setup to support.

This might seem a little low-fi, but do you think it would work to handle this
use case entirely in "user space"? I may be missing something because
this is off the cuff... In the code for your Streams app, I'm wondering
if you can prepend your input/output topics with a config-driven string
like:

builder.stream(config.getEnvPrefix() + "my-input-topic")
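
A slightly fuller sketch of that idea (the config helper and topic names are
hypothetical):

String prefix = config.getEnvPrefix();  // "" in prod, "prod." in pre-prod
KStream<String, String> input = builder.stream(prefix + "my-input-topic");
// ... transformations ...
input.to(prefix + "my-output-topic");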

Regarding internal topics, I think the issue might be more complicated than
just naming. I'm assuming you wish to load the changelog into the pre-prod
app so that it can just "restore" the prod app's state and continue
processing from there. But, the prod app will still be running, and its
changelog will still be mirrored into pre-prod when you start the pre-prod
app. Then, you'd basically have both prod and pre-prod writing into the
pre-prod changelog at the same time. This seems likely to produce undesirable
behavior. Also, the pre-prod app won't be in the same consumer group as the
prod app, so it won't know from what offset to start processing input. It
will load newer changelog state from prod and then start processing older
events, probably producing different results from production anyway.

If you can constrain the testing effort to be limited to only the mirrored
"external" topics, I think you'll get more predictable results. But as I noted,
this is off the cuff. Please let me know if I've overlooked something.

Thanks,
-John


On Sun, Mar 24, 2019 at 12:31 PM Paul Whalen  wrote:

> Hi all,
>
> With MirrorMaker 2.0 (
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
> )
> accepted and coming along very nicely in development, it has got me
> wondering if a certain use case is supported, and if not, can changes be
> made to Streams or MM2 to support it.  I'll explain the use case, but the
> TL;DR here is "do we need more control over topic naming in MM2 or
> Streams?"
>
> My team foresees using MM2 as a way to mirror data from our prod
> environment to a pre-prod environment.  The data is supplied by external
> vendors, introduced into our system through a Kafka Streams ETL pipeline,
> and consumed by our end-applications.  Generally we would only like to run
> the ETL pipeline in prod since there is an operational cost to running it
> in both prod and pre-prod (the data sometimes needs manual attention).
> This seems to fit MM2 well: pre-prod end-applications consume from the
> pre-prod Kafka cluster, which is entirely "remote" topics being mirrored
> from the prod cluster.  We only have to keep one instance of the ETL
> pipeline running, but end-applications can be separate, connecting to their
> respective prod and pre-prod Kafka clusters.
>
> However, when we want to test changes to the ETL pipeline itself, we would
> like to turn off the mirroring from prod to pre-prod, and run the ETL
> pipeline also in pre-prod, picking up the most recent state of the prod
> pipeline from when mirroring was turned off (FWIW, downtime is not an issue
> for our use case).
>
> My question/concern is basically, can Streams apps work when they're
> running against topics prepended with a cluster alias, like
> "pre-prod.App-statestore-changelog" as is the plan with MM2. From what I
> can tell the answer is no, and my proposal would be to give the Streams
> user more specific control over how Streams names its internal topics
> (repartition and changelogs) by defining an "InternalTopicNamingStrategy"
> or similar.  Perhaps there is a solution on the MM2 side as well, but it
> seems much less desirable to budge on that convention.
>
> I phrased the question in terms of my team's problem, but it's worth noting
> that this use case is passably similar to a potential DR use case, where
> there is a DR cluster that is normally just being mirrored to by MM2, but
> in a DR scenario would become the active cluster that Streams applications
> are connected to.
>
> Thanks for considering this issue, and great job to those working on MM2 so
> far!
>
> Paul
>


Jenkins build is back to normal : kafka-1.1-jdk7 #254

2019-03-25 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients

2019-03-25 Thread Rajesh Nataraja (JIRA)
Rajesh Nataraja created KAFKA-8154:
--

 Summary: Buffer Overflow exceptions between brokers and with 
clients
 Key: KAFKA-8154
 URL: https://issues.apache.org/jira/browse/KAFKA-8154
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.1.0
Reporter: Rajesh Nataraja
 Attachments: server.properties.txt

https://github.com/apache/kafka/pull/6495

https://github.com/apache/kafka/pull/5785



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-392: Allow consumers to fetch from the closest replica

2019-03-25 Thread David Arthur
+1

Thanks, Jason!

On Mon, Mar 25, 2019 at 1:23 PM Eno Thereska  wrote:

> +1 (non-binding)
> Thanks for updating the KIP and addressing my previous comments.
>
> Eno
>
> On Mon, Mar 25, 2019 at 4:35 PM Ryanne Dolan 
> wrote:
>
> > +1 (non-binding)
> >
> > Great stuff, thanks.
> >
> > Ryanne
> >
> > On Mon, Mar 25, 2019, 11:08 AM Jason Gustafson 
> wrote:
> >
> > > Hi All, discussion on the KIP seems to have died down, so I'd like to go
> > > ahead and start a vote. Here is a link to the KIP:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
> > > .
> > >
> > > +1 from me (duh)
> > >
> > > -Jason
> > >
> >
>


-- 
David Arthur


Re: [VOTE] KIP-392: Allow consumers to fetch from the closest replica

2019-03-25 Thread Eno Thereska
+1 (non-binding)
Thanks for updating the KIP and addressing my previous comments.

Eno

On Mon, Mar 25, 2019 at 4:35 PM Ryanne Dolan  wrote:

> +1 (non-binding)
>
> Great stuff, thanks.
>
> Ryanne
>
> On Mon, Mar 25, 2019, 11:08 AM Jason Gustafson  wrote:
>
> > Hi All, discussion on the KIP seems to have died down, so I'd like to go
> > ahead and start a vote. Here is a link to the KIP:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
> > .
> >
> > +1 from me (duh)
> >
> > -Jason
> >
>


Re: [VOTE] KIP-392: Allow consumers to fetch from the closest replica

2019-03-25 Thread Guozhang Wang
+1 (binding). A very well written proposal and a pleasant read. Thanks
Jason!


Guozhang

On Mon, Mar 25, 2019 at 9:07 AM Jason Gustafson  wrote:

> Hi All, discussion on the KIP seems to have died down, so I'd like to go
> ahead and start a vote. Here is a link to the KIP:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
> .
>
> +1 from me (duh)
>
> -Jason
>


-- 
-- Guozhang


[jira] [Resolved] (KAFKA-8150) Fix bugs in handling null arrays in generated RPC code

2019-03-25 Thread Colin P. McCabe (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe resolved KAFKA-8150.

   Resolution: Fixed
Fix Version/s: 2.2.1

The code path that this fixes isn't used in 2.2, I think.  But I backported the 
patch to that branch just for the purpose of future-proofing.

> Fix bugs in handling null arrays in generated RPC code
> --
>
> Key: KAFKA-8150
> URL: https://issues.apache.org/jira/browse/KAFKA-8150
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Major
> Fix For: 2.2.1
>
>
> Fix bugs in handling null arrays in generated RPC code.
> toString should not get a NullPointException.
> Also, read() must properly translate a negative array length to a null field.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-392: Allow consumers to fetch from the closest replica

2019-03-25 Thread Ryanne Dolan
+1 (non-binding)

Great stuff, thanks.

Ryanne

On Mon, Mar 25, 2019, 11:08 AM Jason Gustafson  wrote:

> Hi All, discussion on the KIP seems to have died down, so I'd like to go
> ahead and start a vote. Here is a link to the KIP:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
> .
>
> +1 from me (duh)
>
> -Jason
>


[VOTE] KIP-392: Allow consumers to fetch from the closest replica

2019-03-25 Thread Jason Gustafson
Hi All, discussion on the KIP seems to have died down, so I'd like to go
ahead and start a vote. Here is a link to the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
.

+1 from me (duh)

-Jason


Create SSL Kafka AdminClient object using keystore.jks from database

2019-03-25 Thread 1228neha



I need to create a Kafka AdminClient SSL object using a java.security.KeyStore
object, i.e. I have to read the keystore and truststore files from the database
as a clob, load them into java.security.KeyStore objects, and use those to
create the AdminClient object.

I am able to create AdminClient object using properties object :

// Note: the file below is a truststore, so it belongs under the truststore configs.
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePwd);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
"/ngs/app/bolt_components/kafka_ssl/RN_BC_YELLOWBIRD/client.truststore.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePwd);
kafkaAdminClient = AdminClient.create(props);

But as per the requirement I should not specify a keystore file location;
instead I must supply the keystore object itself.

KeyStoreDto kDto=KeystoreManager.getKafkaKeyStoreDto(kafkaDto.getKEYSTORE_ID());
java.security.KeyStore keyStore = kDto.getKeyStore();
java.security.KeyStore trustStore = kDto.getTrustStore();
String keyStorePwd=kDto.getKeyStorePassword();
String trustStorePwd=kDto.getTrustStorePassword();

From this I have to use keyStore and trustStore.

Can anyone help me with this?

Is there any other way in which a keystore.jks file can be used to create an
AdminClient object in Kafka through Java code if the keystore.jks is stored in
a DB? Any workaround?
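
One possible workaround (a sketch, assuming writing temporary files is
acceptable, since the SSL configs in these Kafka versions only accept file
paths; the class and method names here are made up for illustration):

import java.io.FileOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStore;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.SslConfigs;

public class KeyStoreToTempFile {
    // Persist an in-memory KeyStore so the file-based SSL configs can use it.
    static Path writeToTempFile(KeyStore ks, String password) throws Exception {
        Path path = Files.createTempFile("kafka-ssl-", ".jks");
        try (FileOutputStream out = new FileOutputStream(path.toFile())) {
            ks.store(out, password.toCharArray());
        }
        path.toFile().deleteOnExit();
        return path;
    }

    static AdminClient create(KeyStore keyStore, String keyStorePwd,
                              KeyStore trustStore, String trustStorePwd) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9093"); // assumption
        props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
                  writeToTempFile(keyStore, keyStorePwd).toString());
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePwd);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
                  writeToTempFile(trustStore, trustStorePwd).toString());
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePwd);
        return AdminClient.create(props);
    }
}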



[jira] [Resolved] (KAFKA-8014) Extend Connect integration tests to add and remove workers dynamically

2019-03-25 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8014.
--
Resolution: Fixed

> Extend Connect integration tests to add and remove workers dynamically
> --
>
> Key: KAFKA-8014
> URL: https://issues.apache.org/jira/browse/KAFKA-8014
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.3.0
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Major
> Fix For: 2.3.0, 2.1.2, 2.2.1
>
>
>  To allow for even more integration tests that can focus on testing Connect 
> framework itself, it seems necessary to add the ability to add and remove 
> workers from within a test case. 
> The suggestion is to extend Connect's integration test harness 
> {{EmbeddedConnectCluster}} to include methods to add and remove workers as 
> well as return the workers that are online at any given point.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-25 Thread Ivan Ponomarev

Paul,

I see your point when you are talking about 
stream..branch..branch...default..


Still, I believe that this cannot be implemented the easy way. Maybe
we all should think further.


Let me comment on two of your ideas.


user could specify a terminal method that assumes nothing will reach the 
default branch,

throwing an exception if such a case occurs.

1) OK, apparently this should not be the only option besides `default`, 
because there are scenarios when we want to just silently drop the 
messages that didn't match any predicate. 2) Throwing an exception in 
the middle of data flow processing looks like a bad idea. In stream 
processing paradigm, I would prefer to emit a special message to a 
dedicated stream. This is exactly where `default` can be used.



it would be fairly easily for the InternalTopologyBuilder to track dangling

branches that haven't been terminated and raise a clear error before it
becomes an issue.

You mean a runtime exception, when the program is compiled and run? 
Well,  I'd prefer an API that simply won't compile if used incorrectly. 
Can we build such an API as a method chain starting from KStream object? 
There is a huge cost difference between runtime and compile-time errors. 
Even if a failure uncovers instantly on unit tests, it costs more for 
the project than a compilation failure.
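
For illustration, a sketch of the chained shape under discussion (not a
concrete proposal; note that Java still cannot force the terminal call at
compile time, which is exactly the weakness discussed above):

import java.util.function.Consumer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;

interface KBranchedStream<K, V> {
    // Each branch gets a predicate plus a handler for the matching sub-stream.
    KBranchedStream<K, V> addBranch(Predicate<? super K, ? super V> predicate,
                                    Consumer<KStream<K, V>> handler);
    // Terminal operations: route unmatched records, or declare none expected.
    void defaultBranch(Consumer<KStream<K, V>> handler);
    void noDefaultBranch();
}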


Regards,

Ivan


25.03.2019 0:38, Paul Whalen пишет:

Ivan,

Good point about the terminal operation being required.  But is that really
such a bad thing?  If the user doesn't want a defaultBranch they can call
some other terminal method (noDefaultBranch()?) just as easily.  In fact I
think it creates an opportunity for a nicer API - a user could specify a
terminal method that assumes nothing will reach the default branch,
throwing an exception if such a case occurs.  That seems like an
improvement over the current branch() API, which allows for the more subtle
behavior of records unexpectedly getting dropped.

The need for a terminal operation certainly has to be well documented, but
it would be fairly easily for the InternalTopologyBuilder to track dangling
branches that haven't been terminated and raise a clear error before it
becomes an issue.  Especially now that there is a "build step" where the
topology is actually wired up, when StreamsBuilder.build() is called.

Regarding onTopOf() returning its argument, I agree that it's critical to
allow users to do other operations on the input stream.  With the fluent
solution, it ought to work the same way all other operations do - if you
want to process off the original KStream multiple times, you just need the
stream as a variable so you can call as many operations on it as you desire.

Thoughts?

Best,
Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev  wrote:


Hello Paul,

I afraid this won't work because we do not always need the
defaultBranch. And without a terminal operation we don't know when to
finalize and build the 'branch switch'.

In my proposal, onTopOf returns its argument, so we can do something
more with the original branch after branching.

I understand your point that the need of special object construction
contrasts the fluency of most KStream methods. But here we have a
special case: we build the switch to split the flow, so I think this is
still idiomatic.

Regards,

Ivan



24.03.2019 4:02, Paul Whalen пишет:

Ivan,

I think it's a great idea to improve this API, but I find the onTopOf()
mechanism a little confusing since it contrasts the fluency of other
KStream method calls.  Ideally I'd like to just call a method on the

stream

so it still reads top to bottom if the branch cases are defined fluently.
I think the addBranch(predicate, handleCase) is very nice and the right

way

to do things, but what if we flipped around how we specify the source
stream.

Like:

stream.branch()
  .addBranch(predicate1, this::handle1)
  .addBranch(predicate2, this::handle2)
  .defaultBranch(this::handleDefault);

Where branch() returns a KBranchedStreams or KStreamBrancher or

something,

which is added to by addBranch() and terminated by defaultBranch() (which
returns void).  This is obviously incompatible with the current API, so

the

new stream.branch() would have to have a different name, but that seems
like a fairly small problem - we could call it something like branched()

or

branchedStreams() and deprecate the old API.

Does this satisfy the motivations of your KIP?  It seems like it does to
me, allowing for clear in-line branching while also allowing you to
dynamically build of branches off of KBranchedStreams if desired.

Thanks,
Paul



On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev



wrote:


Hi Bill,

Thank you for your reply!

This is how I usually do it:

void handleFirstCase(KStream<K, V> ks) {
  ks.filter().mapValues(...)
}

void handleSecondCase(KStream<K, V> ks) {
  ks.selectKey(...).groupByKey()...
}

..
new KafkaStreamsBrancher<K, V>()
 .addBranch(predicate1, 

Re: [DISCUSS] KIP-411: Add option to make Kafka Connect task client ID values unique

2019-03-25 Thread Randall Hauch
Paul,

Thanks for updating the KIP with the proposal. I do think the KIP should at
least mention that the prior behavior is to allow the worker to override
the `producer.client.id` or `consumer.client.id`, which is entirely
possible (though unlikely since there would be an MBean conflict, as
pointed out in the discussion). It might be sufficient to just add a
sentence to the "Compatibility, Deprecation, and Migration Plan" section,
like "Any client IDs specified in the worker configuration via `
producer.client.id` or `consumer.client.id` properties will be unchanged,
as those will take precedence." Thoughts?

Ryanne,

IIUC your last message, I think the latest KIP proposal will align pretty
closely with your suggestion. Can you review and confirm?

Best regards,

Randall

On Fri, Mar 1, 2019 at 3:04 PM Ryanne Dolan  wrote:

> Paul, Randall, I don't think most people will care to exercise so much
> control over the client IDs, so long as they are filled in automatically in
> a way that eliminates duplicate metrics and remains somewhat legible. If we
> let the user specify a pattern or something, we're really just making the
> user worry about these requirements.
>
> For example, if they specify "foo" as the client.id, they'll get a bunch
> of
> exceptions about that MBean already existing. So they'll try
> "${connectorName}-foo", which won't work because connectors that get
> restarted will re-use the same client ID and the same MBean again. And so
> on, until they end up solving the same problem we are trying to solve here.
>
> I think you at least need something like "connect-<taskId>-producer-dlq" to
> avoid MBeans being re-registered within the same JVM. I believe the task ID
> is based on the connector name, so you'd get e.g.
> "connect-myconnector-1-producer".
>
> Ryanne
>
>
> On Fri, Mar 1, 2019 at 12:44 PM Paul Davidson
>  wrote:
>
> > Thanks Randall.  I like your suggestion: as you say, this would make it
> > possible to usefully override the default client id properties.
> >
> > I'm not sure how we would handle the dead-letter queue case though -
> maybe
> > we could automatically add a "dlq-" prefix to the producer client id?
> >
> > If there is agreement on this change I will update the KIP and the PR
> (when
> > I find some time).
> >
> >
> > On Thu, Feb 21, 2019 at 8:12 AM Randall Hauch  wrote:
> >
> > > Hi, Paul. Thanks for the update to KIP-411 to reflect adding defaults,
> > and
> > > creating/updating https://github.com/apache/kafka/pull/6097 to reflect
> > > this
> > > approach.
> > >
> > > Now that we've avoided adding a new config and have changed the default
> > > `client.id` to include some context, the connector name, and task number, I
> > > think it makes overriding the client ID via worker config `
> > > producer.client.id` or `consumer.client.id` properties less valuable
> > > because those overridden client IDs will be exactly the same for all
> > > connectors and tasks.
> > >
> > > On one hand, we can leave this as-is, and any users that include `
> > > producer.client.id` and `consumer.client.id` in their worker configs
> > keep
> > > the same (sort of useless) behavior. In fact, most users would probably
> > be
> > > better off by removing these worker config properties and instead
> relying
> > > upon the defaults.
> > >
> > > On the other, similar to what Ewen suggested earlier (in a different
> > > context), we could add support for users to optionally use
> > > "${connectorName}" and ${task}" in their overridden client ID property
> > and
> > > have Connect replace these (if found) with the connector name and task
> > > number. Any existing properties that don't use these variables would
> > behave
> > > as-is, but this way the users could define their own client IDs yet
> still
> > > get the benefit of uniquely identifying each of the clients. For
> example,
> > > if my worker config contained the following:
> > >
> > > producer.client.id=connect-cluster-A-${connectorName}-${task}-producer
> > > consumer.client.id=connect-cluster-A-${connectorName}-${task}-consumer
> > >
> > > Thoughts?
> > >
> > > Randall
> > >
> > > On Wed, Feb 20, 2019 at 3:18 PM Ryanne Dolan 
> > > wrote:
> > >
> > > > Thanks Paul, this is great. This will make monitoring Connect a ton
> > > easier.
> > > >
> > > > Ryanne
> > > >
> > > > On Wed, Feb 20, 2019 at 1:24 PM Paul Davidson
> > > >  wrote:
> > > >
> > > > > I have updated KIP-411 to propose changing the default client id -
> > see:
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Make+default+Kafka+Connect+worker+task+client+IDs+distinct
> > > > >
> > > > >
> > > > > There is also an PR ready to go here:
> > > > > https://github.com/apache/kafka/pull/6097
> > > > >
> > > > > On Fri, Jan 11, 2019 at 3:39 PM Paul Davidson <
> > > pdavid...@salesforce.com>
> > > > > wrote:
> > > > >
> > > > > > Hi everyone.  We seem to have agreement that the ideal approach
> is
> > to
> > 

Re: [DISCUSS] KIP-411: Add option to make Kafka Connect task client ID values unique

2019-03-25 Thread Ryanne Dolan
Randall, Paul, the proposal looks great, thanks.

Ryanne

On Mon, Mar 25, 2019, 9:03 AM Randall Hauch  wrote:

> Paul,
>
> Thanks for updating the KIP with the proposal. I do think the KIP should at
> least mention that the prior behavior is to allow the worker to override
> the `producer.client.id` or `consumer.client.id`, which is entirely
> possible (though unlikely since there would be an MBean conflict, as
> pointed out in the discussion). It might be sufficient to just add a
> sentence to the "Compatibility, Deprecation, and Migration Plan" section,
> like "Any client IDs specified in the worker configuration via `
> producer.client.id` or `consumer.client.id` properties will be unchanged,
> as those will take precedence." Thoughts?
>
> Ryanne,
>
> IIUC your last message, I think the latest KIP proposal will align pretty
> closely with your suggestion. Can you review and confirm?
>
> Best regards,
>
> Randall
>
> On Fri, Mar 1, 2019 at 3:04 PM Ryanne Dolan  wrote:
>
> > Paul, Randall, I don't think most people will care to exercise so much
> > control over the client IDs, so long as they are filled in automatically
> in
> > a way that eliminates duplicate metrics and remains somewhat legible. If
> we
> > let the user specify a pattern or something, we're really just making the
> > user worry about these requirements.
> >
> > For example, if they specify "foo" as the client.id, they'll get a bunch
> > of
> > exceptions about that MBean already existing. So they'll try
> > "${connectorName}-foo", which won't work because connectors that get
> > restarted will re-use the same client ID and the same MBean again. And so
> > on, until they end up solving the same problem we are trying to solve
> here.
> >
> > I think you at least need something like "connect-<taskId>-producer-dlq"
> > to avoid MBeans being re-registered within the same JVM. I believe the
> > task ID is based on the connector name, so you'd get e.g.
> > "connect-myconnector-1-producer".
> >
> > Ryanne
> >
> >
> > On Fri, Mar 1, 2019 at 12:44 PM Paul Davidson
> >  wrote:
> >
> > > Thanks Randall.  I like your suggestion: as you say, this would make it
> > > possible to usefully override the default client id properties.
> > >
> > > I'm not sure how we would handle the dead-letter queue case though -
> > maybe
> > > we could automatically add a "dlq-" prefix to the producer client id?
> > >
> > > If there is agreement on this change I will update the KIP and the PR
> > (when
> > > I find some time).
> > >
> > >
> > > On Thu, Feb 21, 2019 at 8:12 AM Randall Hauch 
> wrote:
> > >
> > > > Hi, Paul. Thanks for the update to KIP-411 to reflect adding
> defaults,
> > > and
> > > > creating/updating https://github.com/apache/kafka/pull/6097 to
> reflect
> > > > this
> > > > approach.
> > > >
> > > > Now that we've avoided adding a new config and have changed the
> > default `
> > > > client.id` to include some context, the connector name, and task
> > > number, I
> > > > think it makes overriding the client ID via worker config `
> > > > producer.client.id` or `consumer.client.id` properties less valuable
> > > > because those overridden client IDs will be exactly the same for all
> > > > connectors and tasks.
> > > >
> > > > On one hand, we can leave this as-is, and any users that include `
> > > > producer.client.id` and `consumer.client.id` in their worker configs
> > > keep
> > > > the same (sort of useless) behavior. In fact, most users would
> probably
> > > be
> > > > better off by removing these worker config properties and instead
> > relying
> > > > upon the defaults.
> > > >
> > > > On the other, similar to what Ewen suggested earlier (in a different
> > > > context), we could add support for users to optionally use
> > > > "${connectorName}" and ${task}" in their overridden client ID
> property
> > > and
> > > > have Connect replace these (if found) with the connector name and
> task
> > > > number. Any existing properties that don't use these variables would
> > > behave
> > > > as-is, but this way the users could define their own client IDs yet
> > still
> > > > get the benefit of uniquely identifying each of the clients. For
> > example,
> > > > if my worker config contained the following:
> > > >
> > > > producer.client.id=connect-cluster-A-${connectorName}-${task}-producer
> > > > consumer.client.id=connect-cluster-A-${connectorName}-${task}-consumer
> > > >
> > > > Thoughts?
> > > >
> > > > Randall
> > > >
> > > > On Wed, Feb 20, 2019 at 3:18 PM Ryanne Dolan 
> > > > wrote:
> > > >
> > > > > Thanks Paul, this is great. This will make monitoring Connect a ton
> > > > easier.
> > > > >
> > > > > Ryanne
> > > > >
> > > > > On Wed, Feb 20, 2019 at 1:24 PM Paul Davidson
> > > > >  wrote:
> > > > >
> > > > > > I have updated KIP-411 to propose changing the default client id
> -
> > > see:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 

Re: [DISCUSS] KIP-409: Allow creating under-replicated topics and partitions

2019-03-25 Thread Mickael Maison
Thanks Colin for the feedback.

The idea was to allow both users and administrators to decide whether they
want to opt in and, if so, under what conditions.

Maybe we could do something simpler and just allow the creation if at
least min.insync.replicas replicas can be placed? That should not require
changes to the protocol, and while this might not cover all possible
use cases, it would still cover the use cases we've listed in the
KIP. That would also tie in with existing semantics/guarantees
(min.insync.replicas).
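
For illustration, the custom-policy route Colin mentions below could look
roughly like the following. The CreateTopicPolicy interface exists today
(brokers enable it via create.topic.policy.class.name), but the rule and the
policy.min.available.replicas key are made up for this sketch. Note that a
policy only sees the creation request, not the set of live brokers, which is
one limitation of that approach:

import java.util.Map;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.CreateTopicPolicy;

public class MinAvailableReplicasPolicy implements CreateTopicPolicy {
    private int minAvailable;

    @Override
    public void configure(Map<String, ?> configs) {
        // Hypothetical config key, for illustration only.
        Object v = configs.get("policy.min.available.replicas");
        minAvailable = (v == null) ? 1 : Integer.parseInt(v.toString());
    }

    @Override
    public void validate(RequestMetadata metadata) throws PolicyViolationException {
        // Reject requests whose replication factor is below the configured minimum.
        Short replicationFactor = metadata.replicationFactor();
        if (replicationFactor != null && replicationFactor < minAvailable) {
            throw new PolicyViolationException("Replication factor " + replicationFactor
                + " is below the configured minimum of " + minAvailable);
        }
    }

    @Override
    public void close() {
    }
}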

Thanks

On Tue, Feb 26, 2019 at 5:40 PM Colin McCabe  wrote:
>
> Hi Mickael,
>
> I don't think adding CREATED_UNDER_REPLICATED as an error code makes sense.  
> It is not an error condition, as described here.
>
> > Updates to the Decommissioning brokers section in the documentation
> > will mention that if a broker id is never to be reused then its 
> > corresponding node in zookeeper
> > /brokers/observed_ids will need to be removed manually
>
> I don't think it's acceptable to ask admins to manually modify ZooKeeper 
> here.  In general the ZK changes seem kind of like a hack -- perhaps we 
> should drop it from the proposal for now.
>
> Perhaps we could even somehow do all of this in a custom CreateTopicPolicy?  
> That would avoid the need for RPC changes, new configuration knobs, etc.
>
> best,
> Colin
>
>
> On Tue, Dec 18, 2018, at 08:43, Mickael Maison wrote:
> > Hi,
> >
> > We have submitted a KIP to handle topics and partitions creation when
> > a cluster is not fully available:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-409%3A+Allow+creating+under-replicated+topics+and+partitions
> >
> > As always, we welcome feedback and suggestions.
> >
> > Thanks
> > Mickael and Edoardo
> >


Re: [VOTE] KIP-396: Add Commit/List Offsets Operations to AdminClient

2019-03-25 Thread Mickael Maison
Bumping this thread once again

Ismael, have I answered your questions?
While this has received a few non-binding +1s, no committers have
voted yet. If you have concerns or questions, please let me know.
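
For anyone catching up on the thread, here is a rough sketch of how the
proposed operations could be used from application code. The method names
follow the KIP's proposal (alterConsumerGroupOffsets and listOffsets) and
may still change, so treat this purely as an illustration:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetOpsSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);

            // List the latest offset for the partition.
            long end = admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.latest()))
                .partitionResult(tp).get().offset();
            System.out.println("end offset: " + end);

            // Reset the group's committed offset for that partition to 42,
            // without having to construct a KafkaConsumer.
            Map<TopicPartition, OffsetAndMetadata> offsets =
                Collections.singletonMap(tp, new OffsetAndMetadata(42L));
            admin.alterConsumerGroupOffsets("my-group", offsets).all().get();
        }
    }
}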

Thanks

On Mon, Feb 11, 2019 at 11:51 AM Mickael Maison wrote:
>
> Bumping this thread as it's been a couple of weeks.
>
> On Tue, Jan 22, 2019 at 2:26 PM Mickael Maison wrote:
> >
> > Thanks Ismael for the feedback. I think your point has 2 parts:
> > - Having the reset functionality in the AdminClient:
> > The fact that we have a command line tool illustrates that this operation
> > is relatively common. It seems valuable to be able to perform this
> > operation directly via a proper API in addition to the CLI tool.
> >
> > - Sending an OffsetCommit directly instead of relying on KafkaConsumer:
> > The KafkaConsumer requires a lot of setup to commit offsets. Its group
> > cannot change, so you need to start a new Consumer every time, which
> > creates new connections and overall sends more requests. Also, there are
> > already a bunch of AdminClient APIs that have logic very close to
> > what needs to be done to send a commit request, keeping the code small
> > and consistent.
> >
> > I've updated the KIP with these details and moved the 2nd part to
> > "Proposed changes" as it's more an implementation detail.
> >
> > I hope this answers your question
> >
> > On Mon, Jan 21, 2019 at 7:41 PM Ismael Juma  wrote:
> > >
> > > The KIP doesn't discuss the option of using KafkaConsumer directly as far
> > > as I can tell. We have tried to avoid having the same functionality in
> > > multiple clients so it would be good to explain why this is necessary here
> > > (not saying it isn't).
> > >
> > > Ismael
> > >
> > > On Mon, Jan 21, 2019, 10:29 AM Mickael Maison wrote:
> > >
> > > > Thanks Ryanne for the feedback, all suggestions sounded good, I've
> > > > updated the KIP accordingly.
> > > >
> > > > On Mon, Jan 21, 2019 at 3:43 PM Ryanne Dolan 
> > > > wrote:
> > > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > But I suggest:
> > > > >
> > > > > - drop "get" from getOffset, getTimestamp.
> > > > >
> > > > > - add to the motivation section why this is better than constructing a
> > > > > KafkaConsumer and using seek(), commit() etc.
> > > > >
> > > > > - add some rejected alternatives.
> > > > >
> > > > > Ryanne
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Jan 21, 2019, 7:57 AM Dongjin Lee wrote:
> > > > > > We have +4 non-binding for this vote. Is there any committer who is
> > > > > > interested in this issue?
> > > > > >
> > > > > > Thanks,
> > > > > > Dongjin
> > > > > >
> > > > > > On Mon, Jan 21, 2019 at 10:33 PM Andrew Schofield <
> > > > > > andrew_schofi...@live.com>
> > > > > > wrote:
> > > > > >
> > > > > > > +1 (non-binding). Thanks for the KIP.
> > > > > > >
> > > > > > > On 21/01/2019, 12:45, "Eno Thereska" wrote:
> > > > > > >
> > > > > > > +1 (non binding). Thanks.
> > > > > > >
> > > > > > > On Mon, Jan 21, 2019 at 12:30 PM Mickael Maison <
> > > > > > > mickael.mai...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Bumping this thread. Considering this KIP is relatively
> > > > > > > > straightforward, can we get some votes or feedback if you
> > > > > > > > think it's not?
> > > > > > > > Thanks
> > > > > > > >
> > > > > > > > On Tue, Jan 8, 2019 at 5:40 PM Edoardo Comar <
> > > > edoco...@gmail.com>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > +1 (non-binding)
> > > > > > > > > Thanks Mickael!
> > > > > > > > >
> > > > > > > > > On Tue, 8 Jan 2019 at 17:39, Patrik Kleindl <
> > > > pklei...@gmail.com>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > +1 (non-binding)
> > > > > > > > > > Thanks, sounds very helpful
> > > > > > > > > > Best regards
> > > > > > > > > > Patrik
> > > > > > > > > >
> > > > > > > > > > > On 08.01.2019 at 18:10, Mickael Maison <mickael.mai...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > Hi all,
> > > > > > > > > > >
> > > > > > > > > > > I'd like to start the vote on KIP-396:
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484
> > > > > > > > > > >
> > > > > > > > > > > Thanks
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > "When the people fear their government, there is tyranny;
> > > > when
> > > > > > the
> > > > > > > > > government fears the people, there is liberty." [Thomas
> > > > > > > > > Jefferson]

Re: [VOTE] KIP-349 Priorities for Source Topics

2019-03-25 Thread Sönke Liebau
Hi Colin,

that is definitely a good option and will cover 90% of all use cases
(probably more).

However strictly speaking it only addresses one half of the issue unless I
am mistaken. The internal behavior of the KafkaConsumer (which partition
the fetcher gets data from next and which buffered data is returned on the
next poll) is not affected by this. So records will only "jump the queue"
once they leave the KafkaConsumer; until then, they will need to queue fairly
just like the rest of the messages.
Again, this will be sufficient in most cases, but if you want high priority
messages to actually jump to the front of the queue you would probably want
to combine both approaches and have a consumer for high prio topics and one
for the rest, both feeding into the same prioritized queue.

Best regards,
Sönke

On Mon, Mar 25, 2019 at 5:43 AM Colin McCabe  wrote:

> On Sat, Mar 23, 2019, at 18:41, nathank...@gmail.com wrote:
> >
> >
> > On 2019/01/28 02:26:31, n...@afshartous.com wrote:
> > > Hi Sönke,
> > >
> > > Thanks for taking the time to review.  I’ve put KIP-349 into
> hibernation.
> > >
> > > Thanks also to everyone who participated in the discussion.
> > >
> > > Best regards,
> > > --
> > >   Nick
> > >
> > > > On Jan 25, 2019, at 5:51 AM, Sönke Liebau <
> soenke.lie...@opencore.com.INVALID> wrote:
> > > >
> > > > a bit late to the party, sorry. I recently spent some time looking
> > > > into this / a similar issue [1].
> > > > After some investigation and playing around with settings I think
> that
> > > > the benefit that could be gained from this is somewhat limited and
> > > > probably outweighed by the implementation effort.
> > > >
> > > > The consumer internals are already geared towards treating partitions
> > > > fairly so that no partition has to wait an undue amount of time and
> > > > this can be further tuned for latency over throughput. Additionally,
> > > > if this is a large issue for someone, there is always the option of
> > > > having a dedicated consumer reading only from the control topic,
> which
> > > > would mean that messages from that topic are received "immediately".
> > > > For a Kafka Streams job it would probably make sense to create two
> > > > input streams and then merge those as a first step.
> > > >
> > > > I think with these knobs a fairly large amount of flexibility can be
> > > > achieved so that there is no urgent need to implement priorities.
> > > >
> > > > So my personal preference would be to set this KIP to dormant for
> now.
> > >
> > >
> > >
> > >
> > >
> > >
> > Hello Nick,
> >
> > I'm extremely new to Kafka, but I was attempting to set up a per-topic
> > priority application, and ended up finding this thread. I'm having
> > difficulty seeing how one can implement it with pause/resume. Would you
> > elaborate?
> >
> > Since those operations are per-partition, and when you stop a
> > partition, it attempts to re-balance, I would need to stop all
> > partitions. Even then, it would try to finish the current transactions
> > instead of immediately putting it on hold and processing other topics.
>
> Hi nathankski,
>
> Calling pause() on a partition doesn't trigger a re-balance or try to
> finish the current transactions.  It just means that you won't get more
> records for that partition until you call resume() on it.
>
> >
> > It also looks like in order to determine if I had received messages
> > from the pri-1 topic, I would need to loop through all records, and
> > ignore those that weren't pri-1 until a poll failed to retrieve any,
> > which seems like it would screw up the other topics.
>
> One way to do this would be to have two threads.  The first thread calls
> poll() on the Kafka consumer.  It puts the records it retrieves into a
> PriorityBlockingQueue.  Records from pri-1 have the highest priority within
> the queue.
>
> The second thread retrieves records from the queue.  pri-1 records will
> always be pulled out of the PriorityBlockingQueue ahead of any other
> records, so they will be processed first.
>
> If the priority queue gets too big, you pause partitions until thread 2
> can clear the backlog.  The low-priority partition is paused first.
>
> best,
> Colin
>
> >
> > Thank you,
> >
> > Nathan
> >
>
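
To make the two-thread design above concrete, here is a minimal sketch. The
topic name "pri-1", the backlog bound, and the process() call are
placeholders for whatever the application actually uses:

import java.time.Duration;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PriorityPoller {
    // Records from "pri-1" sort first: Boolean FALSE orders before TRUE.
    private final PriorityBlockingQueue<ConsumerRecord<String, String>> queue =
        new PriorityBlockingQueue<>(1024,
            Comparator.comparing((ConsumerRecord<String, String> r) ->
                !r.topic().equals("pri-1")));

    // Thread 1: poll the consumer (subscribed to both topics) and enqueue.
    void pollLoop(KafkaConsumer<String, String> consumer) {
        while (true) {
            for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(100))) {
                queue.put(rec);
            }
            // Backpressure: pause low-priority partitions while the backlog is large.
            if (queue.size() > 10_000) {
                consumer.pause(lowPriorityPartitions(consumer));
            } else {
                consumer.resume(lowPriorityPartitions(consumer));
            }
        }
    }

    // Thread 2: take records in priority order; pri-1 always comes out first.
    void workLoop() throws InterruptedException {
        while (true) {
            process(queue.take());
        }
    }

    private Set<TopicPartition> lowPriorityPartitions(KafkaConsumer<String, String> consumer) {
        Set<TopicPartition> result = new HashSet<>();
        for (TopicPartition tp : consumer.assignment()) {
            if (!tp.topic().equals("pri-1")) {
                result.add(tp);
            }
        }
        return result;
    }

    private void process(ConsumerRecord<String, String> rec) {
        // Application-specific handling goes here.
    }
}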


-- 
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany


Source connectors stop working after the broker crashes.

2019-03-25 Thread Федор Чернилин
Configuration:

3 brokers

1 ZooKeeper node

1 Connect worker (distributed mode)

8 connectors (sink and source)


Description of connect topics:
[image: Screenshot 2019-03-22 at 17.00.56.png]


Environment:

K8s, gcloud, confluent images


After the first broker crashes, there are several messages saying the broker
is not available. During its restart, the brokers rebalance, which changes
the leaders and replicas of the topic partitions, and after that all is fine.
The following messages appear when the broker fails:

INFO [Worker clientId=connect-1, groupId=connect-cluster] Group coordinator
kafka-broker-2-int:29092 (id: 2147483645 rack: null) is unavailable or
invalid, will attempt rediscovery
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

INFO [Worker clientId=connect-1, groupId=connect-cluster] Group coordinator
kafka-broker-2-int:29092 (id: 2147483645 rack: null) is unavailable or
invalid, will attempt rediscovery
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

INFO [Worker clientId=connect-1, groupId=connect-cluster] Attempt to
heartbeat failed since coordinator kafka-broker-2-int:29092 (id: 2147483645
rack: null) is either not started or not valid.
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)


The connectors then use only the two other brokers and there are no warning
messages. But when another broker crashes, the warning messages do not stop,
and the Connect offset consumer fails with a timeout error.

INFO [Consumer clientId=consumer-2, groupId=connect-cluster] Error sending
fetch request (sessionId=1683702723, epoch=INITIAL) to node 1:
org.apache.kafka.common.errors.TimeoutException: Failed to send request
after 3 ms.. (org.apache.kafka.clients.FetchSessionHandler)

and one:

ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread -
connect-offset-storage-topic,5,main]
(org.apache.kafka.connect.util.KafkaBasedLog).

So even if the broker is restarted, the consumer has already failed, and the
connectors are not able to get offsets. It seems that there is no relation to
a specific broker instance, since this error might occur with different brokers.


What could be causing this problem?

Thanks.


[jira] [Created] (KAFKA-8153) Streaming application with state stores takes up to 1 hour to restart

2019-03-25 Thread Michael Melsen (JIRA)
Michael Melsen created KAFKA-8153:
-

 Summary: Streaming application with state stores takes up to 1 
hour to restart
 Key: KAFKA-8153
 URL: https://issues.apache.org/jira/browse/KAFKA-8153
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.1.1
Reporter: Michael Melsen


We are using Spring Cloud Stream with Kafka Streams 2.0.1 and utilizing the
InteractiveQueryService to fetch data from the stores. There are 4 stores that
persist data on disk after aggregating data. The code for the topology looks
like this:
{code:java}
@Slf4j
@EnableBinding(SensorMeasurementBinding.class)
public class Consumer {

  public static final String RETENTION_MS = "retention.ms";
  public static final String CLEANUP_POLICY = "cleanup.policy";

  @Value("${windowstore.retention.ms}")
  private String retention;

  /**
   * Process the data flowing in from a Kafka topic. Aggregate the data to:
   * - 2 minutes
   * - 15 minutes
   * - one hour
   * - 12 hours
   *
   * @param stream
   */
  @StreamListener(SensorMeasurementBinding.ERROR_SCORE_IN)
  public void process(KStream<String, SensorMeasurement> stream) {

    Map<String, String> topicConfig = new HashMap<>();
    topicConfig.put(RETENTION_MS, retention);
    topicConfig.put(CLEANUP_POLICY, "delete");

    log.info("Changelog and local window store retention.ms: {} and cleanup.policy: {}",
        topicConfig.get(RETENTION_MS),
        topicConfig.get(CLEANUP_POLICY));

    createWindowStore(LocalStore.TWO_MINUTES_STORE, topicConfig, stream);
    createWindowStore(LocalStore.FIFTEEN_MINUTES_STORE, topicConfig, stream);
    createWindowStore(LocalStore.ONE_HOUR_STORE, topicConfig, stream);
    createWindowStore(LocalStore.TWELVE_HOURS_STORE, topicConfig, stream);
  }

  private void createWindowStore(
      LocalStore localStore,
      Map<String, String> topicConfig,
      KStream<String, SensorMeasurement> stream) {

    // Configure how the state store should be materialized using the provided storeName
    Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> materialized = Materialized
        .as(localStore.getStoreName());

    // Set retention of the changelog topic
    materialized.withLoggingEnabled(topicConfig);

    // Configure what the windows look like and how long data will be retained in local stores
    TimeWindows configuredTimeWindows = getConfiguredTimeWindows(
        localStore.getTimeUnit(),
        Long.parseLong(topicConfig.get(RETENTION_MS)));

    // Processing description:
    // The input data are 'samples' with key <installation>::<assetId>::<modelInstance>::<algorithm>
    // 1. With the map we add the tag to the key and extract the error score from the data
    // 2. With the groupByKey we group the data on the new key
    // 3. With windowedBy we split up the data in time intervals depending on the provided LocalStore enum
    // 4. With reduce we determine the maximum value in the time window
    // 5. Materialized will make it stored in a table
    stream
        .map(getInstallationAssetModelAlgorithmTagKeyMapper())
        .groupByKey()
        .windowedBy(configuredTimeWindows)
        .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue), materialized);
  }

  private TimeWindows getConfiguredTimeWindows(long windowSizeMs, long retentionMs) {
    TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
    timeWindows.until(retentionMs);
    return timeWindows;
  }

  /**
   * Determine the max error score to keep by looking at the aggregated error signal and
   * the freshly consumed error signal
   *
   * @param aggValue
   * @param newValue
   * @return
   */
  private ErrorScore getMaxErrorScore(ErrorScore aggValue, ErrorScore newValue) {
    if (aggValue.getErrorSignal() > newValue.getErrorSignal()) {
      return aggValue;
    }
    return newValue;
  }

  private KeyValueMapper<String, SensorMeasurement, KeyValue<String, ErrorScore>>
      getInstallationAssetModelAlgorithmTagKeyMapper() {
    return (s, sensorMeasurement) -> new KeyValue<>(s + "::" + sensorMeasurement.getT(),
        new ErrorScore(sensorMeasurement.getTs(), sensorMeasurement.getE(),
            sensorMeasurement.getO()));
  }
}
{code}
So we are materializing aggregated data to four different stores after
determining the max value within a specific window for a specific key. Please
note the retention, which is set to two months of data, and the cleanup
policy, which is delete. We don't compact data.
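
As an aside, if I understand the newer API correctly, from Kafka Streams 2.1
onwards the window store retention can also be set directly on the Materialized
instance instead of the deprecated TimeWindows#until. A rough, untested sketch
against our store setup:

{code:java}
// Sketch only: assumes the Duration-based Materialized#withRetention from the
// 2.1+ API; localStore and topicConfig are the same objects as in the code above.
Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> materialized =
    Materialized.<String, ErrorScore, WindowStore<Bytes, byte[]>>as(localStore.getStoreName())
        .withLoggingEnabled(topicConfig)
        .withRetention(Duration.ofMillis(Long.parseLong(topicConfig.get(RETENTION_MS))));
{code}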

The size of the individual state stores on disk is between 14 to 20 gb of data.

We are making use of Interactive Queries: 
[https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#interactive-queries]

In our setup we have 4 instances of our streaming app used as one consumer
group, so every instance will store a specific part of all the data in its
store.

This all seems to work nicely, until we restart one or more instances and wait
for them to become available again. (The restart itself only takes about 3
minutes max.) I would expect that the restart of the app would not take that
long, but unfortunately it