Kafka consumer issue

2019-01-07 Thread Vikas Talegaonkar
@Team,
 I am trying to connect to and consume from a remote Kafka cluster. I am able to get 
the number of partitions, but when I try to consume I get no data, whether I start 
consuming from the beginning or from the latest offset. We have multiple topics in 
this Kafka cluster, and there are consumers running for other topics; this is the 
first time we are trying to consume from this particular topic.

I do see following in logs

INFO  2019-01-08 04:51:20,266 [Thread-30] [AbstractCoordinator.java:657] : 
[Consumer clientId=consumer-30, groupId=dev22-icc-repo-aggregator] Group 
coordinator dev22-kafka2:9092 (id: 2147483645 rack: null) is unavailable or 
invalid, will attempt rediscovery

I did some searching and one suggested way to resolve this is to clean up the topic, 
but I am not sure whether that is the right (or only) approach. Could you please 
provide some detail on this issue and how to resolve it?
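
For reference, here is a minimal standalone consumer along the lines of what I am 
running (the broker address and group id are taken from the log line above; the 
topic name is a placeholder), in case it helps pinpoint whether the problem is in 
my configuration:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SmokeTestConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "dev22-kafka2:9092");
        props.put("group.id", "dev22-icc-repo-aggregator");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // "your-topic" is a placeholder for the topic we are trying to consume.
            consumer.subscribe(Collections.singletonList("your-topic"));
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                System.out.printf("poll %d returned %d records%n", i, records.count());
            }
        }
    }
}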


Regards
Vikas

Build failed in Jenkins: kafka-trunk-jdk11 #188

2019-01-07 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-7768: Use absolute paths for javadoc (#6100)

--
[...truncated 2.25 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
STARTED


Build failed in Jenkins: kafka-trunk-jdk8 #3291

2019-01-07 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-7051: Improve the efficiency of ReplicaManager (#5206)

[cmccabe] KAFKA-7051: Improve the efficiency of ReplicaManager (fixup)

[wangguoz] KAFKA-7768: Use absolute paths for javadoc (#6100)

--
[...truncated 2.25 MB...]

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldNotDeleteCheckpointFileAfterLoaded STARTED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldNotDeleteCheckpointFileAfterLoaded PASSED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldInitializeStateStores STARTED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldInitializeStateStores PASSED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldSkipNullKeysWhenRestoring STARTED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldSkipNullKeysWhenRestoring PASSED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldReturnInitializedStoreNames STARTED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldReturnInitializedStoreNames PASSED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldCheckpointRestoredOffsetsToFile STARTED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldCheckpointRestoredOffsetsToFile PASSED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldUnlockGlobalStateDirectoryOnClose STARTED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldUnlockGlobalStateDirectoryOnClose PASSED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldCheckpointOffsets STARTED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldCheckpointOffsets PASSED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldAttemptToCloseAllStoresEvenWhenSomeException STARTED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldAttemptToCloseAllStoresEvenWhenSomeException PASSED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldThrowIllegalArgumentExceptionIfCallbackIsNull STARTED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldThrowIllegalArgumentExceptionIfCallbackIsNull PASSED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldSkipGlobalInMemoryStoreOffsetsToFile STARTED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldSkipGlobalInMemoryStoreOffsetsToFile PASSED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldReadCheckpointOffsets STARTED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldReadCheckpointOffsets PASSED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldCloseStateStores STARTED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldCloseStateStores PASSED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal STARTED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal PASSED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldFlushStateStores STARTED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldFlushStateStores PASSED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldLockGlobalStateDirectory STARTED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldLockGlobalStateDirectory PASSED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldRestoreRecordsFromCheckpointToHighwatermark STARTED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldRestoreRecordsFromCheckpointToHighwatermark PASSED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir STARTED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir PASSED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice STARTED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice PASSED

org.apache.kafka.streams.processor.internals.GlobalStateManagerImplTest > 
shouldReleaseLockIfExceptionWhenLoadingCheckpoints STARTED


Re: [DISCUSS] KIP-360: Improve handling of unknown producer

2019-01-07 Thread Guozhang Wang
Thanks Jason. The proposed solution sounds good to me.


Guozhang

On Mon, Jan 7, 2019 at 3:52 PM Jason Gustafson  wrote:

> Hey Guozhang,
>
> Thanks for sharing the article. The INVALID_PRODUCER_ID_MAPPING error
> occurs following expiration of the producerId. It's possible that another
> producerId has been installed in its place following expiration (if another
> producer instance has become active), or the mapping is empty. We can
> safely retry the InitProducerId with the logic in this KIP in order to
> detect which case it is. So I'd suggest something like this:
>
> 1. After receiving INVALID_PRODUCER_ID_MAPPING, the producer can send
> InitProducerId using the current producerId and epoch.
> 2. If no mapping exists, the coordinator can generate a new producerId and
> return it. If a transaction is in progress on the client, it will have to
> be aborted, but the producer can continue afterwards.
> 3. Otherwise if a different producerId has been assigned, then we can
> return INVALID_PRODUCER_ID_MAPPING. To simplify error handling, we can
> probably raise this as ProducerFencedException since that is effectively
> what has happened. Ideally this is the only fatal case that users have to
> handle.
>
> I'll give it a little more thought and update the KIP.
>
> Thanks,
> Jason
>
> On Thu, Jan 3, 2019 at 1:38 PM Guozhang Wang  wrote:
>
> > You're right about the dangling txn since it will actually block
> > read-committed consumers from proceeding at all. I'd agree that since
> this
> > is a very rare case, we can consider fixing it not via broker-side logic
> > but via tooling in a future work.
> >
> > I've also discovered some related error handling logic inside producer
> that
> > may be addressed together with this KIP (since it is mostly for internal
> > implementations the wiki itself does not need to be modified):
> >
> >
> >
> https://stackoverflow.com/questions/53976117/why-did-the-kafka-stream-fail-to-produce-data-after-a-long-time/54029181#54029181
> >
> > Guozhang
> >
> >
> >
> > On Thu, Nov 29, 2018 at 2:25 PM Jason Gustafson 
> > wrote:
> >
> > > Hey Guozhang,
> > >
> > > To clarify, the broker does not actually use the ApiVersion API for
> > > inter-broker communications. The use of an API and its corresponding
> > > version is controlled by `inter.broker.protocol.version`.
> > >
> > > Nevertheless, it sounds like we're on the same page about removing
> > > DescribeTransactionState. The impact of a dangling transaction is a
> > little
> > > worse than what you describe though. Consumers with the read_committed
> > > isolation level will be stuck. Still, I think we agree that this case
> > > should be rare and we can reconsider for future work. Rather than
> > > preventing dangling transactions, perhaps we should consider options
> > which
> > > allows us to detect them and recover. Anyway, this needs more thought.
> I
> > > will update the KIP.
> > >
> > > Best,
> > > Jason
> > >
> > > On Tue, Nov 27, 2018 at 6:51 PM Guozhang Wang 
> > wrote:
> > >
> > > > 0. My original question is about the implementation details
> primarily,
> > > > since current the handling logic of the APIVersionResponse is simply
> > "use
> > > > the highest supported version of the corresponding request", but if
> the
> > > > returned response from APIVersionRequest says "I don't even know
> about
> > > the
> > > > DescribeTransactionStateRequest at all", then we need additional
> logic
> > > for
> > > > the falling back logic. Currently this logic is embedded in
> > NetworkClient
> > > > which is shared by all clients, so I'd like to avoid making this
> logic
> > > more
> > > > complicated.
> > > >
> > > > As for the general issue that a broker does not recognize a producer
> > with
> > > > sequence number 0, here's my thinking: as you mentioned in the wiki,
> > this
> > > > is only a concern for transactional producer since for idempotent
> > > producer
> > > > it can just bump the epoch and go. For transactional producer, even
> if
> > > the
> > > > producer request from a fenced producer gets accepted, its
> transaction
> > > will
> > > > never be committed and hence messages not exposed to read-committed
> > > > consumers as well. The drawback is though, 1) read-uncommitted
> > consumers
> > > > will still read those messages, 2) unnecessary storage for those
> fenced
> > > > produce messages, but in practice should not accumulate to a large
> > amount
> > > > since producer should soon try to commit and be told it is fenced and
> > > then
> > > > stop, 3) there will be no markers for those transactional messages
> > ever.
> > > > Looking at the list and thinking about the likelihood it may happen
> > > > assuming we retain the producer up to transactional.id.timeout
> (default
> > > is
> > > > 7 days), I feel comfortable leaving it as is.
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Nov 26, 2018 at 6:09 PM Jason Gustafson 
> > > > wrote:
> > > >
> > > > > Hey Guozhang,
> > > > >
> > > > > Thanks for the 

Re: [DISCUSS] KIP-360: Improve handling of unknown producer

2019-01-07 Thread Jason Gustafson
Hey Guozhang,

Thanks for sharing the article. The INVALID_PRODUCER_ID_MAPPING error
occurs following expiration of the producerId. It's possible that another
producerId has been installed in its place following expiration (if another
producer instance has become active), or the mapping is empty. We can
safely retry the InitProducerId with the logic in this KIP in order to
detect which case it is. So I'd suggest something like this:

1. After receiving INVALID_PRODUCER_ID_MAPPING, the producer can send
InitProducerId using the current producerId and epoch.
2. If no mapping exists, the coordinator can generate a new producerId and
return it. If a transaction is in progress on the client, it will have to
be aborted, but the producer can continue afterwards.
3. Otherwise if a different producerId has been assigned, then we can
return INVALID_PRODUCER_ID_MAPPING. To simplify error handling, we can
probably raise this as ProducerFencedException since that is effectively
what has happened. Ideally this is the only fatal case that users have to
handle.
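
To illustrate point 3 from the application's point of view, the intent is that 
client code would only need to treat fencing as fatal, roughly along the lines of 
the sketch below (a hedged sketch, not part of the KIP itself; the topic name and 
configs are placeholders):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

import java.util.Properties;

public class TransactionalSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-transactional-id");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            producer.commitTransaction();
        } catch (ProducerFencedException fatal) {
            // Another producer with the same transactional.id has taken over: give up.
            producer.close();
        } catch (KafkaException recoverable) {
            // Other transactional errors: abort the in-flight transaction and retry.
            producer.abortTransaction();
        }
    }
}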

I'll give it a little more thought and update the KIP.

Thanks,
Jason

On Thu, Jan 3, 2019 at 1:38 PM Guozhang Wang  wrote:

> You're right about the dangling txn since it will actually block
> read-committed consumers from proceeding at all. I'd agree that since this
> is a very rare case, we can consider fixing it not via broker-side logic
> but via tooling in a future work.
>
> I've also discovered some related error handling logic inside producer that
> may be addressed together with this KIP (since it is mostly for internal
> implementations the wiki itself does not need to be modified):
>
>
> https://stackoverflow.com/questions/53976117/why-did-the-kafka-stream-fail-to-produce-data-after-a-long-time/54029181#54029181
>
> Guozhang
>
>
>
> On Thu, Nov 29, 2018 at 2:25 PM Jason Gustafson 
> wrote:
>
> > Hey Guozhang,
> >
> > To clarify, the broker does not actually use the ApiVersion API for
> > inter-broker communications. The use of an API and its corresponding
> > version is controlled by `inter.broker.protocol.version`.
> >
> > Nevertheless, it sounds like we're on the same page about removing
> > DescribeTransactionState. The impact of a dangling transaction is a
> little
> > worse than what you describe though. Consumers with the read_committed
> > isolation level will be stuck. Still, I think we agree that this case
> > should be rare and we can reconsider for future work. Rather than
> > preventing dangling transactions, perhaps we should consider options
> which
> > allows us to detect them and recover. Anyway, this needs more thought. I
> > will update the KIP.
> >
> > Best,
> > Jason
> >
> > On Tue, Nov 27, 2018 at 6:51 PM Guozhang Wang 
> wrote:
> >
> > > 0. My original question is about the implementation details primarily,
> > > since current the handling logic of the APIVersionResponse is simply
> "use
> > > the highest supported version of the corresponding request", but if the
> > > returned response from APIVersionRequest says "I don't even know about
> > the
> > > DescribeTransactionStateRequest at all", then we need additional logic
> > for
> > > the falling back logic. Currently this logic is embedded in
> NetworkClient
> > > which is shared by all clients, so I'd like to avoid making this logic
> > more
> > > complicated.
> > >
> > > As for the general issue that a broker does not recognize a producer
> with
> > > sequence number 0, here's my thinking: as you mentioned in the wiki,
> this
> > > is only a concern for transactional producer since for idempotent
> > producer
> > > it can just bump the epoch and go. For transactional producer, even if
> > the
> > > producer request from a fenced producer gets accepted, its transaction
> > will
> > > never be committed and hence messages not exposed to read-committed
> > > consumers as well. The drawback is though, 1) read-uncommitted
> consumers
> > > will still read those messages, 2) unnecessary storage for those fenced
> > > produce messages, but in practice should not accumulate to a large
> amount
> > > since producer should soon try to commit and be told it is fenced and
> > then
> > > stop, 3) there will be no markers for those transactional messages
> ever.
> > > Looking at the list and thinking about the likelihood it may happen
> > > assuming we retain the producer up to transactional.id.timeout (default
> > is
> > > 7 days), I feel comfortable leaving it as is.
> > >
> > > Guozhang
> > >
> > > On Mon, Nov 26, 2018 at 6:09 PM Jason Gustafson 
> > > wrote:
> > >
> > > > Hey Guozhang,
> > > >
> > > > Thanks for the comments. Responses below:
> > > >
> > > > 0. The new API is used between brokers, so we govern its usage using
> > > > `inter.broker.protocol.version`. If the other broker hasn't upgraded,
> > we
> > > > will just fallback to the old logic, which is to accept the write.
> This
> > > is
> > > > similar to how we introduced the OffsetsForLeaderEpoch API. Does that
> > > seem

Re: [VOTE] KIP-382 MirrorMaker 2.0

2019-01-07 Thread Ryanne Dolan
Thanks Jun, I've updated the KIP as requested. Brief notes below:

100. added "...out-of-the-box (without custom handlers)..."

101. done. Good idea to include a MessageFormatter.

102. done.

> 103. [...] why is Heartbeat a separate connector?

Heartbeats themselves are replicated via MirrorSource/SinkConnector, so if
replication stops, you'll stop seeing heartbeats in downstream clusters.
I've updated the KIP to make this clearer and have added a bullet to
Rejected Alternatives.

104. added "heartbeat.retention.ms", "checkpoint.retention.ms", thanks. The
heartbeat topic doesn't need to be compacted.

> 105. [...] I am not sure why targetClusterAlias is useful

In order to map A's B.topic1 to B's topic1, we need to know B.

> 106. [...] should the following properties be prefixed with "consumer."

No, they are part of Connect's worker config.

> 107. So, essentially it's running multiple logical connect clusters on
> the same shared worker nodes?

Correct. Rather than configure each Connector and Worker and Herder
individually, a single top-level configuration file is used. And instead of
running a bunch of separate worker processes on each node, a single process
runs multiple workers. This is implemented using a separate driver based on
ConnectDistributed, but which runs multiple DistributedHerders. Each
DistributedHerder uses a different Kafka cluster for coordination -- they
are completely separate apart from running in the same process.
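
For illustration, a single top-level configuration might look like the sketch 
below (cluster aliases, bootstrap servers, and the exact property names are 
placeholders and may differ in the final implementation):

# hypothetical mm2.properties sketch
clusters = A, B
A.bootstrap.servers = a-kafka1:9092,a-kafka2:9092
B.bootstrap.servers = b-kafka1:9092,b-kafka2:9092

# enable one replication flow and choose which topics are mirrored
A->B.enabled = true
A->B.topics = .*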

Thanks for helping improve the doc!
Ryanne

On Fri, Jan 4, 2019 at 10:33 AM Jun Rao  wrote:

> Hi, Ryanne,
>
> Thanks for KIP.  Still have a few more comments below.
>
> 100. "This is not possible with MirrorMaker today -- records would be
> replicated back and forth indefinitely, and the topics in either cluster
> would be merged inconsistently between clusters. " This is not 100% true
> since MM can do the topic renaming through MirrorMakerMessageHandler.
>
> 101. For both Heartbeat and checkpoint, could you define the full schema,
> including the field type? Also how are they serialized into the Kafka
> topic? Is it JSON or sth else? For convenience, it would be useful to
> provide a built-in MessageFormatter so that one can read each topic's data
> using tools like ConsoleConsumer.
>
> 102. For the public Heartbeat and Checkpoint class, could you list the
> public methods in each class?
>
> 103. I am wondering why is Heartbeat a separate connector? A MirrorMaker
> connector can die independent of the Heartbeat connector, which seems to
> defeat the purpose of heartbeat.
>
> 104. Is the Heartbeat topic also a compacted topic? If not, how long is it
> retained for?
>
> 105. For the following, I am not sure why targetClusterAlias is useful? The
> checkpoint file seems to only include sourceClusterAlias.
>
> Map translateOffsets(Map targetConsumerConfig,
> String sourceClusterAlias, String targetClusterAlias, String remoteGroupId)
>
> 106. In the configuration example, should the following properties be
> prefixed with "consumer."?
> key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
> value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
>
> 107. Could you add a bit more description on how connect-mirror-maker.sh is
> implemented? My understanding is that it will start as many separate
> DistributedHerders as there are Kafka clusters? So, essentially it's
> running multiple logical connect clusters on the same shared worker nodes?
>
> Thanks,
>
> Jun
>
>
> On Thu, Dec 20, 2018 at 5:23 PM Srinivas Reddy  >
> wrote:
>
> > +1 (non binding)
> >
> > Thank you Ryan for the KIP, let me know if you need support in
> implementing
> > it.
> >
> > -
> > Srinivas
> >
> > - Typed on tiny keys. pls ignore typos.{mobile app}
> >
> >
> > On Fri, 21 Dec, 2018, 08:26 Ryanne Dolan  >
> > > Thanks for the votes so far!
> > >
> > > Due to recent discussions, I've removed the high-level REST API from
> the
> > > KIP.
> > >
> > > On Thu, Dec 20, 2018 at 12:42 PM Paul Davidson <
> pdavid...@salesforce.com
> > >
> > > wrote:
> > >
> > > > +1
> > > >
> > > > Would be great to see the community build on the basic approach we
> took
> > > > with Mirus. Thanks Ryanne.
> > > >
> > > > On Thu, Dec 20, 2018 at 9:01 AM Andrew Psaltis <
> > psaltis.and...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > +1
> > > > >
> > > > > Really looking forward to this and to helping in any way I can.
> > Thanks
> > > > for
> > > > > kicking this off Ryanne.
> > > > >
> > > > > On Thu, Dec 20, 2018 at 10:18 PM Andrew Otto 
> > > wrote:
> > > > >
> > > > > > +1
> > > > > >
> > > > > > This looks like a huge 

Build failed in Jenkins: kafka-trunk-jdk11 #187

2019-01-07 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-7051: Improve the efficiency of ReplicaManager (#5206)

[cmccabe] KAFKA-7051: Improve the efficiency of ReplicaManager (fixup)

--
[...truncated 2.25 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 

Build failed in Jenkins: kafka-trunk-jdk8 #3290

2019-01-07 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-7768: Add version to java html urls (#6094)

--
[...truncated 4.50 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 

Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.

2019-01-07 Thread Jan Filipiak


On 02.01.2019 23:44, John Roesler wrote:
> However, you seem to have a strong intuition that the scatter/gather
> approach is better.
> Is this informed by your actual applications at work? Perhaps you can
> provide an example
> data set and sequence of operations so we can all do the math and agree
> with you.
> It seems like we should have a convincing efficiency argument before
> choosing a more
> complicated API over a simpler one.

The way I see this is simple. If we only provide the basic 
implementation of the 1:n join (repartition by FK, range scan on foreign 
table update), then it is such a fundamental building block.

I do A join B.

a.map(retain(key and FK)).tomanyJoin(B).groupBy(a.key()).join(A). This 
pretty much performs all your "wire saving optimisations". To be honest, 
I don't know if someone ever added the ContextAwareMapper() that was 
discussed at some point. With it I could actually do the high watermark 
thing: a.contextMap(retain(key, fk and offset)).
tomanyJoin(B).aggregate(a.key(), oldest offset wins).join(A).
I can't find that KIP though. I guess it didn't make it.

After the repartition and the range read, the abstraction just becomes too 
weak. I just showed that your implementation is my implementation with 
stuff around it.

I don't know if your scatter/gather approach is in code somewhere. If the 
join will only be applied after the gather phase, I really wonder where 
we get the other record from. Do you also persist the foreign table on 
the original side? Is that in code somewhere already?

This would essentially bring B to each of A's tasks. The factors for 
this in my case are rather easy and dramatic. Nevertheless, it is an approach I 
would appreciate. In Hive this would be closely related to 
the concept of a MapJoin, something I wish we had in Streams. I have often 
stated that at some point we need an unbounded amount of offsets per 
topic-partition and group :D So good.

Long story short: I hope you can follow my line of thought, and I hope you 
can clarify my misunderstanding of how the join is performed on the A side 
without materializing B there.

I would love for Streams to get this right. The basic rule I always give 
is: do what Hive does. Done.


>
> Last thought:
>> Regarding what will be observed. I consider it a plus that all events
>> that are in the inputs have an respective output. Whereas your solution
>> might "swallow" events.
>
> I didn't follow this. Following Adam's example, we have two join results: a
> "dead" one and
> a "live" one. If we get the dead one first, both solutions emit it,
> followed by the live result.

There might be multiple dead ones in flight, right? But it doesn't really 
matter; I never did anything with the extra benefit I mentioned.


Re: Contributor Add Request

2019-01-07 Thread Jun Rao
Hi, Radwa.

Thanks for your interest. Added you to the contributors list.

Jun

On Mon, Jan 7, 2019 at 1:49 PM Radwa Osama  wrote:

> Hello All,
>
> Would you please add me to contributors list?
>
> My jira username is: RO86
>
> Thanks
> Radwa
>


[jira] [Created] (KAFKA-7793) Improve the Trogdor command-line

2019-01-07 Thread Colin P. McCabe (JIRA)
Colin P. McCabe created KAFKA-7793:
--

 Summary: Improve the Trogdor command-line
 Key: KAFKA-7793
 URL: https://issues.apache.org/jira/browse/KAFKA-7793
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin P. McCabe


Improve the Trogdor command-line.  It should be easier to launch tasks from a 
task spec in a file.  It should be easier to list the currently-running tasks 
in a readable way.  We should be able to filter the currently-running tasks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Contributor Add Request

2019-01-07 Thread Radwa Osama
Hello All,

Would you please add me to contributors list?

My jira username is: RO86

Thanks
Radwa


Re: [DISCUSS] KIP-391: Allow Producing with Offsets for Cluster Replication

2019-01-07 Thread Edoardo Comar
Hi,
I delayed starting the voting thread due to the festive period. I would 
like to start it this week.
Does anyone have any more feedback?

--

Edoardo Comar

IBM Event Streams


Edoardo Comar  wrote on 13/12/2018 17:50:30:

> From: Edoardo Comar 
> To: dev@kafka.apache.org
> Date: 13/12/2018 17:50
> Subject: Re: [DISCUSS] KIP-391: Allow Producing with Offsets for 
> Cluster Replication
> 
> Hi,
> as we haven't got any more feedback, we'd like to start a vote on 
KIP-391 
> on Monday
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-391%3A+Allow+Producing+with+Offsets+for+Cluster+Replication
> 
> --
> 
> Edoardo Comar
> 
> IBM Event Streams
> IBM UK Ltd, Hursley Park, SO21 2JN
> 
> 
> Edoardo Comar/UK/IBM wrote on 10/12/2018 10:20:06:
> 
> > From: Edoardo Comar/UK/IBM
> > To: dev@kafka.apache.org
> > Date: 10/12/2018 10:20
> > Subject: Re: [DISCUSS] KIP-391: Allow Producing with Offsets for 
> > Cluster Replication
> > 
> > (shameless bump) any additional feedback is welcome ... thanks!
> > 
> > Edoardo Comar  wrote on 27/11/2018 15:35:09:
> > 
> > > From: Edoardo Comar 
> > > To: dev@kafka.apache.org
> > > Date: 27/11/2018 15:35
> > > Subject: Re: [DISCUSS] KIP-391: Allow Producing with Offsets for 
> > > Cluster Replication
> > > 
> > > Hi Jason
> > > 
> > > we envisioned the replicator to replicate the __consumer_offsets 
topic 
> too 
> > > (although without producing-with-offsets to it!).
> > > 
> > > As there is no client-side implementation yet using the leader 
epoch, 
> > > we could not yet see the impact of writing to the destination 
cluster 
> > > __consumer_offsets records with an invalid leader epoch.
> > > 
> > > Also, applications might still use external storage mechanism for 
> consumer 
> > > offsets where the leader_epoch is missing.
> > > 
> > > Perhaps the replicator could - for the __consumer_offsets topic - 
just 
> 
> > > omit the leader_epoch field in the data sent to destination.
> > > 
> > > What do you think ?
> > > 
> > > 
> > > Jason Gustafson  wrote on 27/11/2018 00:09:56:
> > > 
> > > > Another wrinkle to consider is KIP-320. If you are planning to 
> replicate
> > > > __consumer_offsets directly, then you will have to account for 
> leader 
> > > epoch
> > > > information which is stored with the committed offsets. But I 
cannot 
> 
> > > think
> > > > how it would be possible to replicate the leader epoch information 

> in
> > > > messages even if you can preserve offsets.
> > > > 
> > > > -Jason
> > > > 
> > > > On Mon, Nov 26, 2018 at 1:16 PM Mayuresh Gharat 
> > > 
> > > > wrote:
> > > > 
> > > > > Hi Edoardo,
> > > > >
> > > > > Thanks a lot for the KIP.
> > > > >  I have a few questions/suggestions in addition to what Radai 
has 
> > > mentioned
> > > > > above :
> > > > >
> > > > >1. Is this meant only for 1:1 replication, for example one 
> Kafka 
> > > cluster
> > > > >replicating to other, instead of having multiple Kafka 
clusters
> > > > > mirroring
> > > > >into one Kafka cluster?
> > > > >2. Are we relying on exactly once produce in the replicator? 
If 
> 
> > > not, how
> > > > >are retries handled in the replicator ?
> > > > >3. What is the recommended value for inflight requests, here. 

> Is it
> > > > >suppose to be strictly 1, if yes, it would be great to 
mention 
> that 
> > > in
> > > > > the
> > > > >KIP.
> > > > >4. How is unclean Leader election between source cluster and 
> > > destination
> > > > >cluster handled?
> > > > >5. How are offsets resets in case of the replicator's 
consumer 
> > > handled?
> > > > >6. It would be good to explain the workflow in the KIP, with 
an
> > > > >example,  regarding how this KIP will change the replication 
> > > scenario
> > > > > and
> > > > >how it will benefit the consumer apps.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Mayuresh
> > > > >
> > > > > On Mon, Nov 26, 2018 at 8:08 AM radai 
 
> 
> > > wrote:
> > > > >
> > > > > > a few questions:
> > > > > >
> > > > > > 1. how do you handle possible duplications caused by the 
> "special"
> > > > > > producer timing-out/retrying? are you explicitely relying on 
the
> > > > > > "exactly once" sequencing?
> > > > > > 2. what about the combination of log compacted topics + 
> replicator
> > > > > > downtime? by the time the replicator comes back up there might 

> be
> > > > > > "holes" in the source offsets (some msgs might have been 
> compacted
> > > > > > out)? how is that recoverable?
> > > > > > 3. similarly, what if you try and fire up replication on a 
> non-empty
> > > > > > source topic? does the kip allow for offsets starting at some
> > > > > > arbitrary X > 0 ? or would this 

[jira] [Resolved] (KAFKA-4218) Enable access to key in ValueTransformer

2019-01-07 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4218.
--
   Resolution: Fixed
Fix Version/s: (was: 1.1.0)
   2.1.0

This has been resolved by 
https://github.com/apache/kafka/commit/bcc712b45820da74b44209857ebbf7b9d59e0ed7 
from [~jeyhunkarimov]


> Enable access to key in ValueTransformer
> 
>
> Key: KAFKA-4218
> URL: https://issues.apache.org/jira/browse/KAFKA-4218
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>Priority: Major
>  Labels: api, kip
> Fix For: 2.1.0
>
>
> While transforming values via {{KStream.transformValues}} and 
> {{ValueTransformer}}, the key associated with the value may be needed, even 
> if it is not changed.  For instance, it may be used to access stores.  
> As of now, the key is not available within these methods and interfaces, 
> leading to the use of {{KStream.transform}} and {{Transformer}}, and the 
> unnecessary creation of new {{KeyValue}} objects.
> KIP-149: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
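
For reference, key access during value transformation is available via the 
ValueTransformerWithKey interface added by KIP-149; below is a minimal sketch 
with illustrative topic names:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

public class KeyAwareTransformSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");

        ValueTransformerWithKeySupplier<String, String, String> supplier =
                () -> new ValueTransformerWithKey<String, String, String>() {
                    @Override
                    public void init(ProcessorContext context) { }

                    @Override
                    public String transform(String readOnlyKey, String value) {
                        // The key is available read-only; no new KeyValue objects needed.
                        return readOnlyKey + ":" + value;
                    }

                    @Override
                    public void close() { }
                };

        input.transformValues(supplier).to("output-topic");
    }
}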



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4468) Correctly calculate the window end timestamp after read from state stores

2019-01-07 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4468.
--
Resolution: Fixed
  Assignee: Richard Yu

> Correctly calculate the window end timestamp after read from state stores
> -
>
> Key: KAFKA-4468
> URL: https://issues.apache.org/jira/browse/KAFKA-4468
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Richard Yu
>Priority: Major
>  Labels: architecture
>
> When storing the WindowedStore on the persistent KV store, we only use the 
> start timestamp of the window as part of the combo-key as (start-timestamp, 
> key). The reason that we do not add the end-timestamp as well is that we can 
> always calculate it from the start timestamp + window_length, and hence we 
> can save 8 bytes per key on the persistent KV store.
> However, after reading it back (via {{WindowedDeserializer}}) we do not set its end 
> timestamp correctly, but just read it as an {{UnlimitedWindow}}. We should fix 
> this by calculating its end timestamp as mentioned above.
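
A minimal sketch of the calculation described above, assuming the window size is 
made available to the reader (this is not the actual WindowedDeserializer code):

import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;

public class WindowEndRecoverySketch {
    // Rebuild the full window from the stored start timestamp plus the window length.
    public static <K> Windowed<K> rebuild(K key, long startMs, long windowSizeMs) {
        long endMs = startMs + windowSizeMs;   // end = start + window length
        return new Windowed<>(key, new TimeWindow(startMs, endMs));
    }
}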



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [Discuss] KIP-389: Enforce group.max.size to cap member metadata growth

2019-01-07 Thread Boyang Chen
Hey Stanislav,

I think the time taken to rebalance is not linearly correlated with the number of 
consumers in our application. As for our current and future use cases,
the main concern for Pinterest is still broker memory, not CPU, because 
one application crashing a server could have a cascading effect on all jobs.
Do you want to drive a more detailed formula on how to compute the memory 
consumption against the number of consumers within the group?

In the meantime, I pretty much buy into the motivation of this KIP, so I think 
the follow-up work is just refinement to make the new config easy to use. We 
should be good to vote, IMO.
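
As a rough illustration of the sizing math I have in mind (the formula is the one 
I sketched earlier in this thread, quoted below; every number here is an 
assumption, not a measurement):

public class GroupSizeEstimateSketch {
    public static void main(String[] args) {
        long brokers = 10;
        long metadataCacheBytesPerBroker = 1L << 30; // assume ~1 GiB usable per broker
        double headroom = 0.8;                       // keep 20% of the cache free
        long consumerGroups = 500;                   // assumed number of groups
        long bytesPerMember = 10_000;                // assume ~10 KB of metadata per member

        long suggestedGroupMaxSize = (long) (2 * brokers * metadataCacheBytesPerBroker * headroom
                / (consumerGroups * (double) bytesPerMember));
        System.out.println("suggested group.max.size ~= " + suggestedGroupMaxSize);
    }
}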

Best,
Boyang

From: Stanislav Kozlovski 
Sent: Monday, January 7, 2019 4:21 PM
To: dev@kafka.apache.org
Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member metadata 
growth

Hey there,

Per Gwen's comments, I slightly reworked the motivation section. Let me
know if it's any better now

I completely agree that it would be best if we added a recommended
number for a typical consumer group size. The problem is that measuring the
CPU usage and rebalance times of consumer groups is tricky. We can update
the KIP with memory guidelines (e.g. 1 consumer in a group uses X memory,
therefore 100 use Y).
I fear that the most useful recommendations, though, would be the CPU
impact of large consumer groups and the rebalance times. Those are,
unfortunately, tricky to test and measure.

@Boyang, you had mentioned some numbers used in Pinterest. If available to
you, would you be comfortable sharing the number of consumers you are using
in a group and maybe the potential time it takes to rebalance it?

I'd appreciate any anecdotes regarding consumer group sizes from the
community

Best,
Stanislav

On Thu, Jan 3, 2019 at 1:59 AM Boyang Chen  wrote:

> Thanks Gwen for the suggestion! +1 on the guidance of defining
> group.max.size. I guess a sample formula would be:
> 2 * (# of brokers * average metadata cache size * 80%) / (# of consumer
> groups * size of a single member metadata)
>
> if we assumed non-skewed partition assignment and pretty fair consumer
> group consumption. The "2" is the 95 percentile of normal distribution and
> 80% is just to buffer some memory capacity which are both open to
> discussion. This config should be useful for Kafka platform team to make
> sure one extreme large consumer group won't bring down the whole cluster.
>
> What do you think?
>
> Best,
> Boyang
>
> 
> From: Gwen Shapira 
> Sent: Thursday, January 3, 2019 2:59 AM
> To: dev
> Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member
> metadata growth
>
> Sorry for joining the fun late, but I think the problem we are solving
> evolved a bit in the thread, and I'd like to have better understanding
> of the problem before voting :)
>
> Both KIP and discussion assert that large groups are a problem, but
> they are kinda inconsistent regarding why they are a problem and whose
> problem they are...
> 1. The KIP itself states that the main issue with large groups are
> long rebalance times. Per my understanding, this is mostly a problem
> for the application that consumes data, but not really a problem for
> the brokers themselves, so broker admins probably don't and shouldn't
> care about it. Also, my understanding is that this is a problem for
> consumer groups, but not necessarily a problem for other group types.
> 2. The discussion highlights the issue of "run away" groups that
> essentially create tons of members needlessly and use up lots of
> broker memory. This is something the broker admins will care about a
> lot. And is also a problem for every group that uses coordinators and
> not just consumers. And since the memory in question is the metadata
> cache, it probably has the largest impact on Kafka Streams
> applications, since they have lots of metadata.
>
> The solution proposed makes the most sense in the context of #2, so
> perhaps we should update the motivation section of the KIP to reflect
> that.
>
> The reason I'm probing here is that in my opinion we have to give our
> users some guidelines on what a reasonable limit is (otherwise, how
> will they know?). Calculating the impact of group-size on rebalance
> time in order to make good recommendations will take a significant
> effort. On the other hand, informing users regarding the memory
> footprint of a consumer in a group and using that to make a reasonable
> suggestion isn't hard.
>
> Gwen
>
>
> On Sun, Dec 30, 2018 at 12:51 PM Stanislav Kozlovski
>  wrote:
> >
> > Thanks Boyang,
> >
> > If there aren't any more thoughts on the KIP I'll start a vote thread in
> > the new year
> >
> > On Sat, Dec 29, 2018 at 12:58 AM Boyang Chen 
> wrote:
> >
> > > Yep Stanislav, that's what I'm proposing, and your explanation makes
> sense.
> > >
> > > Boyang
> > >
> > > 
> > > From: Stanislav Kozlovski 
> > > Sent: Friday, 

Build failed in Jenkins: kafka-2.1-jdk8 #97

2019-01-07 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-7768: Add version to java html urls (#6094)

--
[...truncated 462.06 KB...]

kafka.security.auth.SimpleAclAuthorizerTest > testAddAclsOnWildcardResource 
STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAddAclsOnWildcardResource 
PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testWritesExtendedAclChangeEventWhenInterBrokerProtocolAtLeastKafkaV2 STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testWritesExtendedAclChangeEventWhenInterBrokerProtocolAtLeastKafkaV2 PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAclManagementAPIs STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAclManagementAPIs PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testWildCardAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testWildCardAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testWritesLiteralAclChangeEventWhenInterBrokerProtocolIsKafkaV2 STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testWritesLiteralAclChangeEventWhenInterBrokerProtocolIsKafkaV2 PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testTopicAcl STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testTopicAcl PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testSuperUserHasAccess STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testSuperUserHasAccess PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testDeleteAclOnPrefixedResource 
STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testDeleteAclOnPrefixedResource 
PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testDenyTakesPrecedence STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testDenyTakesPrecedence PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testSingleCharacterResourceAcls 
STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testSingleCharacterResourceAcls 
PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFoundOverride STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFoundOverride PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testEmptyAclThrowsException 
STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testEmptyAclThrowsException PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testSuperUserWithCustomPrincipalHasAccess STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testSuperUserWithCustomPrincipalHasAccess PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testAllowAccessWithCustomPrincipal STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testAllowAccessWithCustomPrincipal PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testDeleteAclOnWildcardResource 
STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testDeleteAclOnWildcardResource 
PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testChangeListenerTiming STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testChangeListenerTiming PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testWritesLiteralWritesLiteralAclChangeEventWhenInterBrokerProtocolLessThanKafkaV2eralAclChangesForOlderProtocolVersions
 STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testWritesLiteralWritesLiteralAclChangeEventWhenInterBrokerProtocolLessThanKafkaV2eralAclChangesForOlderProtocolVersions
 PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testThrowsOnAddPrefixedAclIfInterBrokerProtocolVersionTooLow STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testThrowsOnAddPrefixedAclIfInterBrokerProtocolVersionTooLow PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testAccessAllowedIfAllowAclExistsOnPrefixedResource STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testAccessAllowedIfAllowAclExistsOnPrefixedResource PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyModificationOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyModificationOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testAuthorizeWithEmptyResourceName STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testAuthorizeWithEmptyResourceName PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testDeleteAllAclOnPrefixedResource STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testDeleteAllAclOnPrefixedResource PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAddAclsOnLiteralResource 
STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAddAclsOnLiteralResource 
PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testAuthorizeThrowsOnNoneLiteralResource STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testAuthorizeThrowsOnNoneLiteralResource PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testGetAclsPrincipal STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testGetAclsPrincipal PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAddAclsOnPrefiexedResource 
STARTED


[jira] [Created] (KAFKA-7792) Trogdor should have an uptime function

2019-01-07 Thread Colin P. McCabe (JIRA)
Colin P. McCabe created KAFKA-7792:
--

 Summary: Trogdor should have an uptime function
 Key: KAFKA-7792
 URL: https://issues.apache.org/jira/browse/KAFKA-7792
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin P. McCabe
Assignee: Colin P. McCabe


Trogdor should have an uptime function which returns how long the coordinator 
or agent has been up.  This will also be a good way to test that the daemon is 
running without fetching a full status.
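
A minimal sketch of what such a response could carry (hypothetical class and field names, for illustration only; the real implementation may differ):

{code:java}
// Hypothetical payload for an uptime request; not the actual Trogdor code.
public final class UptimeResponse {
    private final long serverStartMs; // when the coordinator or agent process started
    private final long nowMs;         // current wall-clock time on the daemon

    public UptimeResponse(long serverStartMs, long nowMs) {
        this.serverStartMs = serverStartMs;
        this.nowMs = nowMs;
    }

    // Uptime is just the difference, which doubles as a cheap liveness check.
    public long uptimeMs() {
        return nowMs - serverStartMs;
    }
}
{code}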



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7755) Kubernetes - Kafka clients are resolving DNS entries only one time

2019-01-07 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7755.
---
   Resolution: Fixed
Fix Version/s: 2.1.1
   2.2.0

> Kubernetes - Kafka clients are resolving DNS entries only one time
> --
>
> Key: KAFKA-7755
> URL: https://issues.apache.org/jira/browse/KAFKA-7755
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
> Environment: Kubernetes
>Reporter: Loïc Monney
>Priority: Blocker
> Fix For: 2.2.0, 2.1.1
>
> Attachments: pom.xml
>
>
> *Introduction*
>  Since 2.1.0, Kafka clients support multiple DNS-resolved IP addresses and 
> fall back to the next address if the first one fails. This change was 
> introduced by https://issues.apache.org/jira/browse/KAFKA-6863. However, this 
> DNS resolution is now performed only once by the clients. That is not a 
> problem if all brokers have fixed IP addresses, but it is definitely an issue 
> when Kafka brokers run on top of Kubernetes. New Kubernetes pods receive 
> different IP addresses, so as soon as all brokers have been restarted, 
> clients will no longer be able to reconnect to any broker.
> *Impact*
>  Everyone running Kafka 2.1 or later on top of Kubernetes is impacted when a 
> rolling restart is performed.
> *Root cause*
>  Since https://issues.apache.org/jira/browse/KAFKA-6863, Kafka clients 
> resolve DNS entries only once.
> *Proposed solution*
>  In 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java#L368]
>  Kafka clients should perform the DNS resolution again when all IP addresses 
> have been "used" (when _index_ is back to 0)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-411: Add option to make Kafka Connect task client ID values unique

2019-01-07 Thread Paul Davidson
Thanks all. I've submitted a new PR with a possible implementation:
https://github.com/apache/kafka/pull/6097. Note I did not include the group
ID as part of the default client ID, mainly to avoid the connector name
appearing twice by default. As noted in the original Jira (
https://issues.apache.org/jira/browse/KAFKA-5061), leaving out the group ID
could lead to naming conflicts if multiple Connect clusters run against the
same Kafka cluster. This would probably not be a problem for many (including
us), as metrics exporters can usually be configured to include a cluster ID
and guarantee uniqueness. I will be interested to hear your thoughts on this.

Paul



On Mon, Jan 7, 2019 at 10:27 AM Ryanne Dolan  wrote:

> I'd also prefer to avoid the new configuration property if possible. Seems
> like a lighter touch without it.
>
> Ryanne
>
> On Sun, Jan 6, 2019 at 7:25 PM Paul Davidson 
> wrote:
>
> > Hi Konstantine,
> >
> > Thanks for your feedback!  I think my reply to Ewen covers most of your
> > points, and I mostly agree.  If there is general agreement that changing
> > the default behavior is preferable to a config change I will update my PR
> > to use  that approach.
> >
> > Paul
> >
> > On Fri, Jan 4, 2019 at 5:55 PM Konstantine Karantasis <
> > konstant...@confluent.io> wrote:
> >
> > > Hi Paul.
> > >
> > > I second Ewen and I intended to give similar feedback:
> > >
> > > 1) Can we avoid a config altogether?
> > > 2) If we prefer to add a config anyways, can we use a set of allowed
> > values
> > > instead of a boolean, even if initially these values are only two? As
> the
> > > discussion on Jira highlights, there is a potential for more naming
> > > conventions in the future, even if now the extra functionality doesn't
> > seem
> > > essential. It's not optimal to have to deprecate a config instead of
> just
> > > extending its set of values.
> > > 3) I agree, the config name sounds too general. How about
> > > "client.ids.naming.policy" or "client.ids.naming" if you want two more
> > > options?
> > >
> > > Konstantine
> > >
> > > On Fri, Jan 4, 2019 at 7:38 AM Ewen Cheslack-Postava <
> e...@confluent.io>
> > > wrote:
> > >
> > > > Hi Paul,
> > > >
> > > > Thanks for the KIP. A few comments.
> > > >
> > > > To me, biggest question here is if we can fix this behavior without
> > > adding
> > > > a config. In particular, today, we don't even set the client.id for
> > the
> > > > producer and consumer at all, right? The *only* way it is set is if
> you
> > > > include an override in the worker config, but in that case you need
> to
> > be
> > > > explicitly opting in with a `producer.` or `consumer.` prefix, i.e.
> the
> > > > settings are `producer.client.id` and `consumer.client.id`.
> > Otherwise, I
> > > > think we're getting the default behavior where we generate unique,
> > > > per-process IDs, i.e. via this logic
> > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L662-L664
> > > >
> > > > If that's the case, would it maybe be possible to compatibly change
> the
> > > > default to use task IDs in the client ID, but only if we don't see an
> > > > existing override from the worker config? This would only change the
> > > > behavior when someone is using the default, but since the default
> would
> > > > just use what is effectively a random ID that is useless for
> monitoring
> > > > metrics, presumably this wouldn't affect any existing users. I think
> > that
> > > > would avoid having to introduce the config, give better out of the
> box
> > > > behavior, and still be a safe, compatible change to make.
> > > >
> > > >
> > > > Other than that, just two minor comments. On the config naming, not
> > sure
> > > > about a better name, but I think the config name could be a bit
> clearer
> > > if
> > > > we need to have it. Maybe something including "task" like
> > > > "task.based.client.ids" or something like that (or change the type to
> > be
> > > an
> > > > enum and make it something like task.client.ids=[default|task] and
> > leave
> > > it
> > > > open for extension in the future if needed).
> > > >
> > > > Finally, you have this:
> > > >
> > > > *"Allow overriding client.id  on a per-connector
> > > > basis"*
> > > > >
> > > > > This is a much more complex change, and would require individual
> > > > > connectors to be updated to support the change. In contrast, the
> > > proposed
> > > > > approach would immediately allow detailed consumer/producer
> > monitoring
> > > > for
> > > > > all existing connectors.
> > > > >
> > > >
> > > > I don't think this is quite accurate. I think the reason to reject is
> > > that
> > > > for your particular requirement for metrics, it simply doesn't give
> > > enough
> > > > granularity (there's only one value per entire connector), but it
> > doesn't
> > > > require any changes to connectors. The framework allocates all of
> these
> > > and
> > > > there are 

Jenkins build is back to normal : kafka-trunk-jdk8 #3289

2019-01-07 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-411: Add option to make Kafka Connect task client ID values unique

2019-01-07 Thread Ryanne Dolan
I'd also prefer to avoid the new configuration property if possible. Seems
like a lighter touch without it.

Ryanne

On Sun, Jan 6, 2019 at 7:25 PM Paul Davidson 
wrote:

> Hi Konstantine,
>
> Thanks for your feedback!  I think my reply to Ewen covers most of your
> points, and I mostly agree.  If there is general agreement that changing
> the default behavior is preferable to a config change I will update my PR
> to use  that approach.
>
> Paul
>
> On Fri, Jan 4, 2019 at 5:55 PM Konstantine Karantasis <
> konstant...@confluent.io> wrote:
>
> > Hi Paul.
> >
> > I second Ewen and I intended to give similar feedback:
> >
> > 1) Can we avoid a config altogether?
> > 2) If we prefer to add a config anyways, can we use a set of allowed
> values
> > instead of a boolean, even if initially these values are only two? As the
> > discussion on Jira highlights, there is a potential for more naming
> > conventions in the future, even if now the extra functionality doesn't
> seem
> > essential. It's not optimal to have to deprecate a config instead of just
> > extending its set of values.
> > 3) I agree, the config name sounds too general. How about
> > "client.ids.naming.policy" or "client.ids.naming" if you want two more
> > options?
> >
> > Konstantine
> >
> > On Fri, Jan 4, 2019 at 7:38 AM Ewen Cheslack-Postava 
> > wrote:
> >
> > > Hi Paul,
> > >
> > > Thanks for the KIP. A few comments.
> > >
> > > To me, biggest question here is if we can fix this behavior without
> > adding
> > > a config. In particular, today, we don't even set the client.id for
> the
> > > producer and consumer at all, right? The *only* way it is set is if you
> > > include an override in the worker config, but in that case you need to
> be
> > > explicitly opting in with a `producer.` or `consumer.` prefix, i.e. the
> > > settings are `producer.client.id` and `consumer.client.id`.
> Otherwise, I
> > > think we're getting the default behavior where we generate unique,
> > > per-process IDs, i.e. via this logic
> > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L662-L664
> > >
> > > If that's the case, would it maybe be possible to compatibly change the
> > > default to use task IDs in the client ID, but only if we don't see an
> > > existing override from the worker config? This would only change the
> > > behavior when someone is using the default, but since the default would
> > > just use what is effectively a random ID that is useless for monitoring
> > > metrics, presumably this wouldn't affect any existing users. I think
> that
> > > would avoid having to introduce the config, give better out of the box
> > > behavior, and still be a safe, compatible change to make.
> > >
> > >
> > > Other than that, just two minor comments. On the config naming, not
> sure
> > > about a better name, but I think the config name could be a bit clearer
> > if
> > > we need to have it. Maybe something including "task" like
> > > "task.based.client.ids" or something like that (or change the type to
> be
> > an
> > > enum and make it something like task.client.ids=[default|task] and
> leave
> > it
> > > open for extension in the future if needed).
> > >
> > > Finally, you have this:
> > >
> > > *"Allow overriding client.id  on a per-connector
> > > basis"*
> > > >
> > > > This is a much more complex change, and would require individual
> > > > connectors to be updated to support the change. In contrast, the
> > proposed
> > > > approach would immediately allow detailed consumer/producer
> monitoring
> > > for
> > > > all existing connectors.
> > > >
> > >
> > > I don't think this is quite accurate. I think the reason to reject is
> > that
> > > for your particular requirement for metrics, it simply doesn't give
> > enough
> > > granularity (there's only one value per entire connector), but it
> doesn't
> > > require any changes to connectors. The framework allocates all of these
> > and
> > > there are already framework-defined config values that all connectors
> > share
> > > (some for only sinks or sources), so the framework can handle all of
> this
> > > without changes to connectors. Further, with connector-specific
> > overrides,
> > > you could get task-specific values if interpolation were supported in
> the
> > > config value (as we now do with managed secrets). For example, it could
> > > support something like client.id=connector-${taskId} and the task ID
> > would
> > > be substituted automatically into the string.
> > >
> > > I don't necessarily like that solution (seems complicated and not a
> great
> > > user experience), but it could work.
> > >
> > > -Ewen
> > >
> > >
> > >
> > >
> > > On Thu, Dec 20, 2018 at 5:05 PM Paul Davidson <
> pdavid...@salesforce.com>
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I would like to start a discussion around the following KIP:
> > > > *
> > > >
> > >
> >
> 

Re: [DISCUSS] KIP-382: MirrorMaker 2.0

2019-01-07 Thread Ryanne Dolan
Hi Ewen, thanks for the questions.

> On the ACL management, can you explain how things are supposed to work...

There are two types of topics in play here: regular topics and remote
topics. MM2 replicates regular source topics -> remote topics.

MM2 doesn't create or modify regular topics, but it fully owns and manages
remote topics, including their ACL. MM2 automatically syncs ACL changes
from source topic to remote topic (but not the other way around), s.t. if
an operator changes the ACL on a source topic, the corresponding remote
topic is updated.

Only MM2 can write to remote topics, and their ACLs are configured to
enforce this. Additionally, any READ rules for a source topic are
propagated to the remote topic. This is important for consumer
migration/failover to work reliably -- a failed-over consumer must have
access to the replicated data in a foreign cluster. Keep in mind they are
the same records after all!

Where there is an A -> B replication flow, a principal with read access to
Cluster A's topic1 will also have read access to Cluster B's A.topic1 (a
remote topic). However, that does NOT mean that the same principal will
automatically have access to Cluster B's topic1, since topic1 is not a
remote topic. This is because the records in Cluster A's topic1 are NOT the
same as the records in Cluster B's topic1, and in fact they may have vastly
different access control requirements.

Consider the common arrangement where an organization has multiple Kafka
clusters for prod vs staging, internet/DMZ vs intranet, etc. You might want
to use MM2 to replicate a topic "foo" from prod to staging, for example. In
this case, the topic will show up in the staging cluster as "prod.foo". MM2
will make sure that any principal that can read "foo" in prod can also read
"prod.foo" in staging, since it's the same principal and the same data. You
don't have to manually create or configure "prod.foo" -- you just tell MM2
to replicate "foo" from prod to staging.

In this example, MM2 does not touch anything in the prod cluster -- it just
reads from "foo". (In fact, it doesn't write to prod at all, not even
offsets). And importantly, any changes to staging topics don't affect
anything in prod.

> is this purely for a mirroring but not DR and failover cases

DR (failover/failback, and client migration in general) is the primary
motivation for the MM2 design. ACL sync in particular exists to ensure
clients can migrate between clusters and still have access to the same data.

> In particular, the rules outlined state that only MM2 would be able to
write on the new cluster

Only MM2 can write to _remote topics_ (on any cluster). That says nothing
of normal topics.

> at some point you need to adjust ACLs for the failed-over apps to write

It depends. WRITE access is not sync'd across clusters by MM2, so you may
need some other mechanism to manage that. This limitation is by design --
it's unsafe and generally undesirable to apply write access across clusters.

Consider the prod vs staging example again. If you are replicating "foo"
from prod -> staging, you want app1 to have access to both prod's "foo" and
staging's "prod.foo", since this is the same principal and the same data,
just on separate clusters. But that doesn't mean you want prod apps to
write to staging, nor staging apps to write to prod. This is probably the
whole reason you have staging vs prod in the first place! Instead, you will
want to be deliberate when promoting an application from staging to prod,
which may involve manually granting WRITE access to prod topics.

> how things are supposed to work when you need to migrate to the new
cluster

If you have a set of clusters with MM2 syncing topics between them
("active/active"), you can migrate consumers among them using
RemoteClusterUtils, which will figure out the new offsets, partitions, and
topic names for you. The topic names will be different after migration.
There are two main scenarios: 1) a consumer is subscribed to a normal topic
only, e.g. "topic1", 2) a consumer is aggregating across topics from
multiple clusters, e.g. "topic1" but also "B.topic1", "C.topic1"...

In (1), migrating a consumer from cluster A -> cluster B will result in the
subscription being changed from "topic1" to "A.topic1". These are the same
records in the same order, and the consumer can safely resume processing
from the latest checkpoint.

In (2), you get:

topic1 -> A.topic1 (cuz B's A.topic1 came from A's topic1)
B.topic1 -> topic1 (cuz A's B.topic1 came from B's topic1)
C.topic1 -> C.topic1 (cuz this topic is remote on both A and B)
...and so on.

RemoteClusterUtils does this logic for you. It's the same simple algorithm
for any sort of migration, including failover and subsequent failback.
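
A hedged sketch of that mapping for a consumer moving from cluster A to cluster B, assuming cluster aliases prefix remote topics with a "." separator (illustration only; renameForMigration is a hypothetical helper, and the actual RemoteClusterUtils API may differ):

{code:java}
// Simplified illustration of the topic rename rule described above. It assumes
// remote topics are "<clusterAlias>.<topic>" and ignores that real topic names
// may themselves contain dots.
class MigrationTopicRenamer {
    static String renameForMigration(String topic, String sourceAlias, String targetAlias) {
        String targetPrefix = targetAlias + ".";
        if (topic.startsWith(targetPrefix)) {
            // e.g. B.topic1 seen on A came from B's topic1, so on B it is just topic1.
            return topic.substring(targetPrefix.length());
        } else if (topic.contains(".")) {
            // e.g. C.topic1 is a remote topic on both clusters; the name is unchanged.
            return topic;
        } else {
            // e.g. topic1 on A shows up on B as the remote topic A.topic1.
            return sourceAlias + "." + topic;
        }
    }
}
{code}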

For _producers_, migration is very simple -- the topic name does not
change. If you migrate a producer from cluster A -> cluster B, it still
produces to "topic1", only it is now cluster B's topic1. This captures the
fact that the 

Re: [VOTE] KIP-228 Negative record timestamp support

2019-01-07 Thread Guozhang Wang
I think it is true: currently the timestamp field can only be non-negative or
-1, and any other value will be rejected; this proposal allows negative
values but does not change the semantics of the current values (non-negative
and -1).

So for current users, if they still only use non-negative or -1 values,
they should not notice this change at all; and they should not be using
other negative values anyways since their app would fail due to rejected
records.



Guozhang
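
To make that concrete, a minimal sketch of the semantics described above (illustrative only; not actual Kafka validation code):

{code:java}
// Illustration of the timestamp semantics discussed above, not real Kafka code.
class TimestampSemantics {
    static final long NO_TIMESTAMP = -1L; // the existing ConsumerRecord.NO_TIMESTAMP sentinel

    // Current behavior: only non-negative values or the -1 sentinel are accepted.
    static boolean acceptedToday(long timestampMs) {
        return timestampMs >= 0 || timestampMs == NO_TIMESTAMP;
    }

    // Under the proposal, other negative values become legal timestamps as well,
    // while -1 keeps its existing "no timestamp" meaning.
}
{code}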


On Mon, Dec 17, 2018 at 6:41 PM Gwen Shapira  wrote:

> Guozhang,
>
> Can you speak to the following note under "impact on existing users":
> "No impact on current users, they should update their infrastructure
> in that order: Broker, Consumers, Producers."
>
> I think this isn't true, but if it is - we need to fix the proposal
> and make sure we allow upgrade at any order (as is usual for Kafka
> since version 0.10.0.0)
>
> On Sun, Dec 9, 2018 at 9:21 PM Guozhang Wang  wrote:
> >
> > Hi folks,
> >
> > Thanks for your replies! Just to clarify, the proposal itself does not
> introduce any additional fields in the message format (some new attributes
> are mentioned in the Rejected Alternatives though), so my understand is
> that we do not need to increment the magic byte version of the message
> itself.
> >
> > Also, ConsumerRecord.NO_TIMESTAMP (-1) is still used as "no timestamp",
> and as we've discussed before it is a good trade-off to say "we do not have
> a way to express Wednesday, December 31, 1969 11:59:59.999". I.e. we are
> extending the timestamp to have other negative values than -1, but we did
> not change the semantics of the value -1 itself. So I think although we do bump
> up the request protocol version for semantics changes, it is not necessary
> for this case (please correct me if there are cases that would not work for
> it).
> >
> > I do agree with Magnus's point that for ListOffsetsRequest, we should
> consider using different values than -1 / -2 to indicate `EARLIEST / LATEST
> TIMESTAMP` now since for example timestamp -2 does have a meaningful
> semantics now, and hence its protocol version would need bump as we change
> its field. And I think a single byte indicating the type as (EARLIEST,
> LATEST, ACTUAL_TIMESTAMP_VALUE) should be sufficient, but I'll leave it to
> Konstandin to decide if he wants to do this KIP or do it in another
> follow-up KIP.
> >
> >
> > Guozhang
> >
> >
> > On Fri, Dec 7, 2018 at 5:06 PM Jun Rao  wrote:
> >>
> >> Hi, Konstandin,
> >>
> >> Thanks for the KIP. I agree with Magnus on the protocol version
> changes. As
> >> for the sentinel value, currently ConsumerRecord.NO_TIMESTAMP (-1) is
> used
> >> for V0 message format. For compatibility, it seems that we still need to
> >> preserve that.
> >>
> >> Jun
> >>
> >> On Thu, Dec 6, 2018 at 2:32 AM Magnus Edenhill 
> wrote:
> >>
> >> > Sorry for getting in the game this late, and on the wrong thread!
> >> >
> >> > I think negative timestamps makes sense and is a good addition,
> >> > but I have a couple of concerns with the proposal:
> >> >
> >> >  1. I believe any change to the protocol format or semantics require a
> >> > protocol bump, in this case for ProduceRequest, FetchRequest,
> >> > ListOffsetsRequest.
> >> >  2. ListOffsetsRequest should be changed to allow logical (END,
> BEGINNING)
> >> > and absolute lookups without special treatment of two absolute values
> as
> >> > logical (-1, -2), this seems like a hack and will require application
> logic
> >> > to avoid these timestamps, that's leaky abstraction.
> >> >  Perhaps add a new field `int8 LookupType = { BEGINNING=-2,
> END=-1,
> >> > TIMESTAMP=0 }`: the broker will either look up using the absolute
> >> > Timestamp, or logical offset value, depending on the value of
> LookupType.
> >> >  3. With the added Attribute for extended timestamp, do we really
> need to
> >> > have a sentinel value for an unset timestamp (-1 or Long.MIN_VALUE)?
> >> >  To make the logic simpler I suggest the attribute is renamed to
> just
> >> > Timestamp, and if the Timestamp attribute is set, the Timestamp field
> is
> >> > always a proper timestamp. If the bit is not set, no timestamp was
> >> > provided.
> >> >
> >> >  /Magnus
> >> >
> >> >
> >> >
> >> > Den tors 6 dec. 2018 kl 08:06 skrev Gwen Shapira :
> >> >
> >> > > I may be missing something, but why are we using an attribute for
> >> > > this? IIRC, we normally bump protocol version to indicate semantic
> >> > > changes. If I understand correctly, not using an attribute will
> allow
> >> > > us to not change the message format (just the protocol), which makes
> >> > > the upgrade significantly easier (since we don't up/down convert).
> >> > >
> >> > > Another thing I don't understand: The compatibility map indicates
> that
> >> > > NO_TIMESTAMP is now Long.MIN_VALUE, but a bit above that you say
> that
> >> > > -1 semantics does not change.
> >> > >
> >> > > Last: At around version 1.0 we decide to completely avoid changes
> that
> >> > > require a 

[jira] [Resolved] (KAFKA-7768) Java doc link 404

2019-01-07 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7768.
--
   Resolution: Fixed
Fix Version/s: 2.1.1
   2.2.0

> Java doc link 404 
> --
>
> Key: KAFKA-7768
> URL: https://issues.apache.org/jira/browse/KAFKA-7768
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 2.1.0
>Reporter: Slim Ouertani
>Priority: Critical
> Fix For: 2.2.0, 2.1.1
>
>
> The official documentation link example 
> [https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html]
>  (with no release reference) does not refer to a valid Javadoc page such as 
> [https://kafka.apache.org/javadoc/org/apache/kafka/streams/state/StreamsMetadata.html].
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7791) Not log retriable exceptions as errors

2019-01-07 Thread Yaroslav Klymko (JIRA)
Yaroslav Klymko created KAFKA-7791:
--

 Summary: Not log retriable exceptions as errors
 Key: KAFKA-7791
 URL: https://issues.apache.org/jira/browse/KAFKA-7791
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.1.0
Reporter: Yaroslav Klymko


Background: I've spotted tons of Kafka-related errors in logs; after 
investigation I found out that those are harmless because they are retried.
Hence I propose not to log retriable exceptions as errors.

Examples of what I've seen in logs:
 * Offset commit failed on partition .. at offset ..: The request timed out.
 * Offset commit failed on partition .. at offset ..: The coordinator is 
loading and hence can't process requests.
 * Offset commit failed on partition .. at offset ..: This is not the correct 
coordinator.
 * Offset commit failed on partition .. at offset ..: This server does not host 
this topic-partition.

 

Here is an attempt to fix this: https://github.com/apache/kafka/pull/5904
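
A hedged sketch of what the proposed behavior amounts to (a hypothetical helper; the linked PR may do this differently):

{code:java}
import org.apache.kafka.common.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical illustration: retriable failures are logged at WARN, since they
// are retried automatically, and ERROR is reserved for non-retriable failures.
class CommitFailureLogger {
    private static final Logger log = LoggerFactory.getLogger(CommitFailureLogger.class);

    void logCommitFailure(String topicPartition, long offset, Exception error) {
        if (error instanceof RetriableException) {
            log.warn("Offset commit failed on partition {} at offset {}: {} (will retry)",
                    topicPartition, offset, error.getMessage());
        } else {
            log.error("Offset commit failed on partition {} at offset {}",
                    topicPartition, offset, error);
        }
    }
}
{code}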



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

2019-01-07 Thread ChienHsing Wu
Hi guys,

I am not sure what to do next in this KIP process. Could anyone please 
help/advise me on what to do next? 

Thanks, CH

-Original Message-
From: ChienHsing Wu  
Sent: Wednesday, January 02, 2019 12:55 PM
To: dev@kafka.apache.org
Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption 
Across Partitions in KafkaConsumer

Hi Colin,

Setting max.partition.fetch.bytes was discussed in the ticket. It's not as 
desirable if the message size is highly variable. Also, this decreases the 
efficiency of network communication.

In the case you mentioned below where a consumer can get messages from A, B, C 
and D but the consumer currently only has messages from A, B and C, the 
proposed change will NOT wait until some messages from D arrives to start 
returning messages; it will just serve those from A, B and C. It will include 
those from D when they are available. That IS the current behavior. The 
proposed change does not impose a strict round robin pattern.

The original KIP-41 discussed "Ensuring Fair Consumption", which means it 
originally intended to take fairness into account in the consumer code; the 
proposed change takes the current algorithm closer to that goal, IMHO. I could 
implement that logic at the caller side, but that would mean each library user 
would need to know the inner workings of the consumer code and implement the 
logic on their own. Though as a first-timer here, I do appreciate the complexity 
and functionality in the client library, and feel that we'd be better off as a 
community implementing the logic in the library so the complexity is hidden 
from library users.

Thanks, CH

-Original Message-
From: Colin McCabe 
Sent: Saturday, December 22, 2018 3:53 AM
To: dev@kafka.apache.org
Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption 
Across Partitions in KafkaConsumer

Hi ChienHsing Wu,

Maybe I'm misunderstanding something, but I'm not sure I see the need for a KIP 
here.  You can just set max.partition.fetch.bytes to a very small value.  That 
will cause Kafka to fetch only one message from each partition.  This will give 
you the round robin behavior you want.

Alternately, if you don't want to change max.partition.fetch.bytes, you could 
do your own buffering to get round robin behavior.  Keep a buffer of messages 
from partition A, B, C, and D and hold back the messages from A, B, and C until 
one from D arrives, so that the A B C D A B C D... etc. order always repeats.

best,
Colin
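
A rough sketch of the caller-side buffering Colin describes above (a hypothetical helper, not part of the KafkaConsumer API or the proposed patch; it simply interleaves whatever has already been fetched, one record per partition per pass):

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical caller-side buffer that hands back records partition by partition,
// so one busy partition cannot starve the others in a single poll() result.
class RoundRobinBuffer<K, V> {
    private final Map<TopicPartition, Deque<ConsumerRecord<K, V>>> buffers = new LinkedHashMap<>();

    void add(ConsumerRecords<K, V> records) {
        for (ConsumerRecord<K, V> record : records) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            buffers.computeIfAbsent(tp, ignored -> new ArrayDeque<>()).add(record);
        }
    }

    // One pass takes at most one record per buffered partition: A B C D A B C D ...
    List<ConsumerRecord<K, V>> nextRound() {
        List<ConsumerRecord<K, V>> round = new ArrayList<>();
        for (Deque<ConsumerRecord<K, V>> queue : buffers.values()) {
            if (!queue.isEmpty())
                round.add(queue.poll());
        }
        return round;
    }
}
{code}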


On Wed, Dec 19, 2018, at 09:00, ChienHsing Wu wrote:
> Looking back at the email thread, I think one of the comments from 
> Mayuresh was the question of whether a KIP is needed for this change, since 
> the KafkaConsumer does not guarantee the end user any order, and so there are 
> no changes to the contracts with users.
> 
> I entered KIP based on suggestions from the attached email when going 
> through code contribution process. I am not sure what to do next in 
> this KIP process. Could anyone please help/advise me on what to do next?
> 
> Thanks!
> 
> CH
> 
> -Original Message-
> From: ChienHsing Wu 
> Sent: Wednesday, December 12, 2018 1:05 PM
> To: dev@kafka.apache.org
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Good to know that, Thanks! 
> 
> Nonetheless, that introduces additional complexity at the client side 
> for a common expectation to more or less receive records in a fair 
> fashion.
> 
> CH
> 
> -Original Message-
> From: Mayuresh Gharat 
> Sent: Wednesday, December 12, 2018 12:55 PM
> To: dev@kafka.apache.org
> Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Hi ChienHsing,
> 
> We are actually working on buffering the already fetched data for 
> paused topicPartitions, so ideally it should not have any effect on 
> performance.
> Associated jira : 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org
> _jira_browse_KAFKA-2D7548=DwIFaQ=ZgVRmm3mf2P1-XDAyDsu4A=Az03wMrb
> L9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I=7eC1W-f8nKkXMGJti3n0zF4qDV0af8y5
> uOWVIftTJ-U=_ERDVQqqt9Grnxt7DDO_gC9CvpD_ylhH8ZoHLwSXEpU=
> 
> Thanks,
> 
> Mayuresh
> 
> On Wed, Dec 12, 2018 at 6:01 AM ChienHsing Wu  wrote:
> 
> > Hi Mayuresh,
> >
> > Thanks for the input!
> >
> > Pausing and resuming are cumbersome and have some undesirable 
> > performance impact since pausing will in effect clean up the 
> > completed fetch and resuming will call the broker to retrieve again.
> >
> > The way I changed the code was just to parse the completed fetch 
> > earlier and ensure the retrieval order is the same as the completed 
> > fetch queue.
> > I did make code changes to take the following into account in the Fetcher class.
> >
> > 1) exception handling
> > 2) ensure the parsed partitions are not included in 
> > fetchablePartitions
> > 3) clear buffer when not in the newly assigned partitions in 
> > 

[jira] [Resolved] (KAFKA-7695) Cannot override StreamsPartitionAssignor in configuration

2019-01-07 Thread Dmitry Buykin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Buykin resolved KAFKA-7695.
--
Resolution: Not A Bug

> Cannot override StreamsPartitionAssignor in configuration 
> --
>
> Key: KAFKA-7695
> URL: https://issues.apache.org/jira/browse/KAFKA-7695
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Dmitry Buykin
>Priority: Major
>  Labels: configuration
>
> Cannot override StreamsPartitionAssignor by changing property 
> partition.assignment.strategy in KStreams 2.0.1 because the streams are 
> crashing inside KafkaClientSupplier.getGlobalConsumer. This GlobalConsumer 
> works only with RangeAssignor, which is configured by default.
> Could be reproduced by setting up
> `props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> StreamsPartitionAssignor.class.getName());`
> For me it looks like a bug.
> Opened a discussion here 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1543395977453700
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[VOTE] - KIP-213 (new vote) - Simplified and revised.

2019-01-07 Thread Adam Bellemare
Hi All

I would like to call a new vote on KIP-213. The design has changed
substantially. Perhaps more importantly, the KIP and associated
documentation has been greatly simplified. I know this KIP has been on the
mailing list for a long time, but the help from John Roesler and Guozhang
Wang have helped put it into a much better state. I would appreciate any
feedback or votes.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable



Thank you

Adam Bellemare


[jira] [Created] (KAFKA-7790) Trogdor - Does not time out tasks in time

2019-01-07 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-7790:
--

 Summary: Trogdor - Does not time out tasks in time
 Key: KAFKA-7790
 URL: https://issues.apache.org/jira/browse/KAFKA-7790
 Project: Kafka
  Issue Type: Improvement
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski


All Trogdor task specifications have a defined `startMs` and `durationMs`. 
Under conditions of task failure and restarts, it is intuitive to assume that a 
task would not be re-run after a certain time period.

Let's best illustrate the issue with an example:
{code:java}
startMs = 12PM; durationMs = 1hour;
# 12:02 - Coordinator schedules a task to run on agent-0
# 12:45 - agent-0 process dies. Coordinator's heartbeats to agent-0 fail.
# 12:47 - agent-0 comes back up. Coordinator's heartbeats pass and it 
re-schedules tasks that are not running in agent-0
# 13:20 - agent-0 process dies.
# 13:22 - agent-0 comes back up. Coordinator re-schedules task{code}
This can result in an endless loop of task rescheduling. If there are more 
tasks scheduled on agent-0 (e.g. a task scheduled to start every hour), we can 
end up in a scenario where we overwhelm the agent with tasks that we would 
rather have dropped.
h2. Changes


We propose that the Trogdor Coordinator does not re-schedule a task if the 
current time of re-scheduling is greater than the start time of the task and 
its duration combined. More specifically:
{code:java}
if (currentTimeMs < startTimeMs + durationTimeMs)
  scheduleTask()
else
  failTask(){code}
 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7789) SSL-related unit tests hang when run on Fedora 29

2019-01-07 Thread Tom Bentley (JIRA)
Tom Bentley created KAFKA-7789:
--

 Summary: SSL-related unit tests hang when run on Fedora 29
 Key: KAFKA-7789
 URL: https://issues.apache.org/jira/browse/KAFKA-7789
 Project: Kafka
  Issue Type: Bug
Reporter: Tom Bentley
Assignee: Tom Bentley


Various SSL-related unit tests (such as {{SslSelectorTest}}) hang when executed 
on Fedora 29. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-395: Encypt-then-MAC Delegation token metadata

2019-01-07 Thread Attila Sasvári
Manikumar, Satish. Thanks for the review! As I understand, you are not in
favor of this KIP, and I do agree that having a pluggable mechanism for
sensitive data / metadata is preferable/more future-proof.
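
For context on the construction in the KIP's title, a generic encrypt-then-MAC sketch (AES-CBC plus HMAC-SHA256; purely illustrative and not the KIP's proposed broker code):

{code:java}
import javax.crypto.Cipher;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.IvParameterSpec;

import java.nio.ByteBuffer;
import java.security.SecureRandom;

// Generic encrypt-then-MAC pattern: encrypt first, then MAC the IV and ciphertext.
// The output is IV || ciphertext || tag; verification recomputes the MAC before decrypting.
class EncryptThenMac {
    static byte[] seal(byte[] plaintext, SecretKey encryptionKey, SecretKey macKey) throws Exception {
        byte[] iv = new byte[16];
        new SecureRandom().nextBytes(iv);

        Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
        cipher.init(Cipher.ENCRYPT_MODE, encryptionKey, new IvParameterSpec(iv));
        byte[] ciphertext = cipher.doFinal(plaintext);

        // The MAC covers the IV and the ciphertext, never the plaintext.
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(macKey);
        mac.update(iv);
        byte[] tag = mac.doFinal(ciphertext);

        return ByteBuffer.allocate(iv.length + ciphertext.length + tag.length)
                .put(iv).put(ciphertext).put(tag).array();
    }
}
{code}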

On Wed, Dec 12, 2018 at 8:12 AM Satish Duggana 
wrote:

> Agree with Manikumar on having pluggable mechanism for entities
> required/created for delegation token mechanism. I will cover that as
> part of KAFKA-7694.
>
> Thanks,
> Satish.
> On Tue, Dec 11, 2018 at 12:35 PM Manikumar 
> wrote:
> >
> > Hi,
> >
> > Thanks for the KIP.
> >
> > Currently, master/secret key is stored as plain text in server.properties
> > config file.
> > Using master secret key as shared secret is again a security risk. We
> have
> > raised KAFKA-7694
> > to implement a ZooKeeper based master/secret key management to automate
> > secret key rotation.
> >
> > As you mentioned in the alternatives sections, it is good to have
> pluggable
> > mechanism for
> > token storage and master key generation. We can implement pluggable
> > interfaces for token storage
> > and master key generation as part of KAFKA-7694. This will provide us out
> > of the box implementation
> > using ZooKeeper and pluggable interfaces for custom implementations.
> >
> > What do you think?
> >
> > Thanks,
> > Manikumar
> >
> > On Sat, Dec 1, 2018 at 9:37 PM Attila Sasvári 
> wrote:
> >
> > > Hi All,
> > >
> > > I have a proposal to allow Kafka brokers to encrypt sensitive metadata
> > > information about delegation tokens.
> > >
> > > As of now, delegation token metadata is stored in an unencrypted
> format in
> > > Zookeeper. Having the possibility to encrypt-then-MAC token information
> > > would be beneficial in Kafka installations where Zookeeper is not on a
> > > private network.
> > >
> > > Please take a look at
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-395%3A+Encypt-then-MAC+Delegation+token+metadata
> > > and let me know what you think.
> > >
> > > - Attila
> > >
>


Build failed in Jenkins: kafka-trunk-jdk8 #3288

2019-01-07 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] MINOR: Remove sleep calls and ignore annotation from streams upgrade

--
[...truncated 4.50 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 

Re: [Discuss] KIP-389: Enforce group.max.size to cap member metadata growth

2019-01-07 Thread Stanislav Kozlovski
Hey there,

Per Gwen's comments, I slightly reworked the motivation section. Let me
know if it's any better now

I completely agree that it would be best if we added a recommended
number for a typical consumer group size. The problem is that measuring the
CPU usage and rebalance times of consumer groups is tricky. We can update
the KIP with memory guidelines (e.g. 1 consumer in a group uses X memory,
therefore 100 use Y).
I fear that the most useful recommendations though would be knowing the CPU
impact of large consumer groups and the rebalance times. That is,
unfortunately, tricky to test and measure.

@Boyang, you had mentioned some numbers from Pinterest. If available to
you, would you be comfortable sharing the number of consumers you are using
in a group and maybe the potential time it takes to rebalance it?

I'd appreciate any anecdotes regarding consumer group sizes from the
community

Best,
Stanislav

On Thu, Jan 3, 2019 at 1:59 AM Boyang Chen  wrote:

> Thanks Gwen for the suggestion! +1 on the guidance of defining
> group.max.size. I guess a sample formula would be:
> 2 * (# of brokers * average metadata cache size * 80%) / (# of consumer
> groups * size of a single member metadata)
>
> if we assumed non-skewed partition assignment and pretty fair consumer
> group consumption. The "2" is the 95 percentile of normal distribution and
> 80% is just to buffer some memory capacity which are both open to
> discussion. This config should be useful for Kafka platform team to make
> sure one extreme large consumer group won't bring down the whole cluster.
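
Plugging purely hypothetical numbers into that formula: with 10 brokers, 50 MB of usable metadata cache each, 100 consumer groups, and roughly 10 KB of metadata per member, the cap would be about 2 * (10 * 50 MB * 0.8) / (100 * 10 KB) = 800 members per group. The exact numbers matter less than the shape: the cap grows with broker memory and shrinks as the number of groups or the per-member metadata size grows.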
>
> What do you think?
>
> Best,
> Boyang
>
> 
> From: Gwen Shapira 
> Sent: Thursday, January 3, 2019 2:59 AM
> To: dev
> Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member
> metadata growth
>
> Sorry for joining the fun late, but I think the problem we are solving
> evolved a bit in the thread, and I'd like to have better understanding
> of the problem before voting :)
>
> Both KIP and discussion assert that large groups are a problem, but
> they are kinda inconsistent regarding why they are a problem and whose
> problem they are...
> 1. The KIP itself states that the main issue with large groups are
> long rebalance times. Per my understanding, this is mostly a problem
> for the application that consumes data, but not really a problem for
> the brokers themselves, so broker admins probably don't and shouldn't
> care about it. Also, my understanding is that this is a problem for
> consumer groups, but not necessarily a problem for other group types.
> 2. The discussion highlights the issue of "run away" groups that
> essentially create tons of members needlessly and use up lots of
> broker memory. This is something the broker admins will care about a
> lot. And is also a problem for every group that uses coordinators and
> not just consumers. And since the memory in question is the metadata
> cache, it probably has the largest impact on Kafka Streams
> applications, since they have lots of metadata.
>
> The solution proposed makes the most sense in the context of #2, so
> perhaps we should update the motivation section of the KIP to reflect
> that.
>
> The reason I'm probing here is that in my opinion we have to give our
> users some guidelines on what a reasonable limit is (otherwise, how
> will they know?). Calculating the impact of group-size on rebalance
> time in order to make good recommendations will take a significant
> effort. On the other hand, informing users regarding the memory
> footprint of a consumer in a group and using that to make a reasonable
> suggestion isn't hard.
>
> Gwen
>
>
> On Sun, Dec 30, 2018 at 12:51 PM Stanislav Kozlovski
>  wrote:
> >
> > Thanks Boyang,
> >
> > If there aren't any more thoughts on the KIP I'll start a vote thread in
> > the new year
> >
> > On Sat, Dec 29, 2018 at 12:58 AM Boyang Chen 
> wrote:
> >
> > > Yep Stanislav, that's what I'm proposing, and your explanation makes
> sense.
> > >
> > > Boyang
> > >
> > > 
> > > From: Stanislav Kozlovski 
> > > Sent: Friday, December 28, 2018 7:59 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member
> > > metadata growth
> > >
> > > Hey there everybody, let's work on wrapping this discussion up.
> > >
> > > @Boyang, could you clarify what you mean by
> > > > One more question is whether you feel we should enforce group size
> cap
> > > statically or on runtime?
> > > Is that related to the option of enabling this config via the dynamic
> > > broker config feature?
> > >
> > > Regarding that - I feel it's useful to have and I also think it might
> not
> > > introduce additional complexity. As long as we handle the config being
> > > changed midway through a rebalance (via using the old value) we should
> be
> > > good to go.
> > >
> > > On Wed, Dec 12, 2018 at 4:12 PM Stanislav Kozlovski <
> > > stanis...@confluent.io>