Re: [DISCUSS] KIP-595: A Raft Protocol for the Metadata Quorum

2020-06-09 Thread Boyang Chen
Hey all,

Thanks for the great discussions so far. I'm posting some KIP updates from
our working group discussion:

1. We will be changing the core RPCs from a single-raft API to multi-raft.
This means all protocols will be "batched" in the first version, but the KIP
itself only illustrates the design for a single metadata topic partition.
The reason is to "keep the door open" for future extensions of this module,
such as a sharded controller or general quorum-based topic replication
beyond the current Kafka replication protocol.

2. We will piggy-back on the current Kafka Fetch API instead of inventing a
new FetchQuorumRecords RPC. The motivation is about the same as #1, and it
also makes the integration work easier by not letting two similar RPCs
diverge.

3. In the EndQuorumEpoch protocol, instead of only sending the request to
the most caught-up voter, we shall broadcast the information to all voters,
with a voter list sorted in descending order of their corresponding
replicated offsets. In this way, the top voter will become a candidate
immediately, while the other voters wait for an exponential back-off before
triggering elections. This helps ensure that the top voter gets elected,
while still allowing an election to eventually happen if the top voter is
not responsive.
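For illustration only, a minimal sketch of how a voter might derive its
election back-off from its rank in the sorted successor list it received in
EndQuorumEpoch; the class and constant names here are hypothetical and not
part of the KIP:

    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;

    final class ElectionBackoff {
        private static final long BASE_BACKOFF_MS = 50;   // assumed base delay
        private static final long MAX_BACKOFF_MS = 1_000; // assumed cap

        // preferredSuccessors is sorted by replicated offset, descending.
        static long backoffMs(List<Integer> preferredSuccessors, int localBrokerId) {
            int rank = preferredSuccessors.indexOf(localBrokerId);
            if (rank <= 0) {
                return 0; // top voter (or not in the list) starts an election immediately
            }
            long exp = Math.min(MAX_BACKOFF_MS,
                    BASE_BACKOFF_MS * (1L << Math.min(rank, 10)));
            // jitter so lower-ranked voters do not all fire at once
            return exp + ThreadLocalRandom.current().nextLong(BASE_BACKOFF_MS);
        }
    }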

Please see the updated KIP and post any questions or concerns on the
mailing thread.

Boyang

On Fri, May 8, 2020 at 5:26 PM Jun Rao  wrote:

> Hi, Guozhang and Jason,
>
> Thanks for the reply. A couple of more replies.
>
> 102. Still not sure about this. How is the tombstone issue addressed for the
> non-voters and the observers? They can die at any point and restart at an
> arbitrary later time, and the advancing of the firstDirty offset and the
> removal of the tombstone can happen independently.
>
> 106. I agree that it would be less confusing if we used "epoch" instead of
> "leader epoch" consistently.
>
> Jun
>
> On Thu, May 7, 2020 at 4:04 PM Guozhang Wang  wrote:
>
> > Thanks Jun. Further replies are in-lined.
> >
> > On Mon, May 4, 2020 at 11:58 AM Jun Rao  wrote:
> >
> > > Hi, Guozhang,
> > >
> > > Thanks for the reply. A few more replies inlined below.
> > >
> > > On Sun, May 3, 2020 at 6:33 PM Guozhang Wang 
> wrote:
> > >
> > > > Hello Jun,
> > > >
> > > > Thanks for your comments! I'm replying inline below:
> > > >
> > > > On Fri, May 1, 2020 at 12:36 PM Jun Rao  wrote:
> > > >
> > > > > 101. Bootstrapping related issues.
> > > > > 101.1 Currently, we support auto broker id generation. Is this supported
> > > > > for bootstrap brokers?
> > > > >
> > > >
> > > > The voter ids would just be the broker ids. "bootstrap.servers" would be
> > > > similar to what client configs have today, where "quorum.voters" would be
> > > > pre-defined config values.
> > > >
> > > >
> > > My question was on the auto-generated broker id. Currently, the broker can
> > > choose to have its broker id auto-generated. The generation is done through
> > > ZK to guarantee uniqueness. Without ZK, it's not clear how the broker id is
> > > auto-generated. "quorum.voters" also can't be set statically if broker ids
> > > are auto-generated.
> > >
> > Jason has explained some ideas that we've discussed so far. The reason we
> > intentionally did not include them yet is that we feel they are outside the
> > scope of KIP-595. Under the umbrella of KIP-500 we should definitely
> > address them, though.
> >
> > At a high level, our belief is that "joining a quorum" and "joining (or,
> > more specifically, registering brokers in) the cluster" would be de-coupled
> > a bit, where the former should be completed before we do the latter. More
> > specifically, assuming the quorum is already up and running, after the newly
> > started broker finds the leader of the quorum, it can send a specific
> > RegisterBroker request including its listener / protocol / etc., and upon
> > handling it the leader can send back the uniquely generated broker id to
> > the new broker, while also executing the "startNewBroker" callback as the
> > controller.
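For illustration only, a minimal sketch of what such a registration flow
might look like; RegisterBrokerRequest/Response and the method names below
are hypothetical and not part of KIP-595:

    // Hypothetical shapes for the registration flow described above; none of
    // these classes exist in Kafka today.
    record RegisterBrokerRequest(String listeners, String securityProtocol, String rack) { }
    record RegisterBrokerResponse(int assignedBrokerId, short errorCode) { }

    final class QuorumLeader {
        private int nextBrokerId = 0;

        // Handled only by the current quorum leader (acting as controller):
        // allocate a unique id and run the start-new-broker bookkeeping.
        synchronized RegisterBrokerResponse handle(RegisterBrokerRequest request) {
            int brokerId = nextBrokerId++;      // uniqueness without ZooKeeper
            startNewBroker(brokerId, request);  // controller-side callback
            return new RegisterBrokerResponse(brokerId, (short) 0);
        }

        private void startNewBroker(int brokerId, RegisterBrokerRequest request) {
            // e.g. append a broker-registration record to the metadata log
        }
    }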
> >
> >
> > >
> > > > > 102. Log compaction. One weak spot of log compaction is for the consumer
> > > > > to deal with deletes. When a key is deleted, it's retained as a tombstone
> > > > > first and then physically removed. If a client misses the tombstone
> > > > > (because it's physically removed), it may not be able to update its
> > > > > metadata properly. The way we solve this in Kafka is based on a
> > > > > configuration (log.cleaner.delete.retention.ms) and we expect a consumer
> > > > > having seen an old key to finish reading the deletion tombstone within
> > > > > that time. There is no strong guarantee for that since a broker could be
> > > > > down for a long time. It would be better if we can have a more reliable
> > > > > way of dealing with deletes.
> > > > >
> > > >
> > > > We propose to capture this in the "FirstDirtyOffset" field 

Build failed in Jenkins: kafka-trunk-jdk11 #1556

2020-06-09 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9716; Clarify meaning of compression rate metrics (#8664)

[github] KAFKA-9849: Fix issue with worker.unsync.backoff.ms creating zombie


--
[...truncated 4.90 MB...]

[truncated Gradle test output: kafka.controller.ReplicaStateMachineTest and
kafka.controller.ControllerIntegrationTest tests reported as STARTED/PASSED]

Jenkins build is back to normal : kafka-trunk-jdk8 #4629

2020-06-09 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-06-09 Thread Sagar
Hi John,

You rightly pointed out that the devil is in the details :). I will start
with the implementation to get a sense of it.

Here are my thoughts on the core challenge that you pointed out. The
key-value store abstractions exposed via the state store DSL APIs make it
possible for the end user to define generic key types. However, it is the
Serdes that convert those generic keys/values into the format in which the
state store stores them, which for all practical purposes is byte arrays. I
think that if the prefix-type serde converts the prefix to the same internal
storage representation (a byte array) as the keys, then we should be able to
do a prefix scan.
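To make that concrete, here is a minimal sketch of scanning serialized keys
by a serialized prefix. It is not the proposed API; it assumes a plain
in-memory TreeMap standing in for the byte-array-keyed store and uses Kafka's
Bytes wrapper only for its lexicographic comparison:

    import java.util.NavigableMap;
    import java.util.SortedMap;
    import java.util.TreeMap;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.common.utils.Bytes;

    final class PrefixScanSketch<K, P> {
        private final NavigableMap<Bytes, byte[]> store = new TreeMap<>();
        private final Serializer<K> keySerializer;
        private final Serializer<P> prefixSerializer;

        PrefixScanSketch(Serializer<K> keySerializer, Serializer<P> prefixSerializer) {
            this.keySerializer = keySerializer;
            this.prefixSerializer = prefixSerializer;
        }

        void put(K key, byte[] value) {
            store.put(Bytes.wrap(keySerializer.serialize(null, key)), value);
        }

        // All entries whose serialized key starts with the serialized prefix.
        SortedMap<Bytes, byte[]> prefixScan(P prefix) {
            byte[] from = prefixSerializer.serialize(null, prefix);
            byte[] to = upperBound(from);
            return to == null
                    ? store.tailMap(Bytes.wrap(from), true)
                    : store.subMap(Bytes.wrap(from), true, Bytes.wrap(to), false);
        }

        // Smallest byte array strictly greater than every array starting with `prefix`.
        private static byte[] upperBound(byte[] prefix) {
            byte[] bound = prefix.clone();
            for (int i = bound.length - 1; i >= 0; i--) {
                if (bound[i] != (byte) 0xFF) {
                    bound[i]++;
                    return java.util.Arrays.copyOf(bound, i + 1);
                }
            }
            return null; // prefix was all 0xFF bytes: scan to the end of the store
        }
    }

As long as the key serializer and the prefix serializer produce the same
binary layout for the shared leading bytes, this works regardless of the
generic key type.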

Regarding other databases, I have worked a bit with Redis, which also
provides a scan operator using glob-style pattern matching (it's more
general than a prefix scan, but a prefix scan can easily be expressed with
it):

https://redis.io/commands/scan#the-match-option

Typically Redis works with binary-safe strings, so the prefix type and the
actual key type are the same.

Thanks!
Sagar.



On Wed, Jun 10, 2020 at 1:41 AM John Roesler  wrote:

> Hi Sagar,
>
> Thanks for the reply. I agree that your UUID example illustrates the
> problem I was pointing out.
>
> Yes, I think that experimenting with the API in the PR is probably the
> best way to make progress (as opposed to just thinking in terms of
> design on the wiki) because with this kind of thing, the devil is often
> in the details.
>
> To clarify what I meant by that last statement, I see the core challenge
> here as deriving from the fact that we have a key/value store with
> generically typed keys, with a separate component (the serde) that
> turns those typed keys into bytes for storage. In contrast, RocksDB
> can easily offer a "prefix scan" operation because the key type is
> always just a byte array, so "prefix" is a very natural concept to offer
> in the API. Other key/value stores force the keys to always be strings,
> which also makes it easy to define a prefix scan.
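For context, a minimal sketch (not Streams code) of how natural this is
against RocksDB's own Java API, assuming an already-opened RocksDB handle
where keys are just bytes:

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksIterator;

    final class RocksDbPrefixScan {
        static void scan(RocksDB db, byte[] prefix) {
            try (RocksIterator it = db.newIterator()) {
                // Seek to the first key >= prefix, then read until keys stop
                // starting with the prefix bytes.
                for (it.seek(prefix); it.isValid(); it.next()) {
                    byte[] key = it.key();
                    if (key.length < prefix.length
                            || !Arrays.equals(Arrays.copyOf(key, prefix.length), prefix)) {
                        break;
                    }
                    System.out.println(new String(key, StandardCharsets.UTF_8)
                            + " -> " + it.value().length + " value bytes");
                }
            }
        }
    }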
>
> My question is whether there are other databases that offer both:
> 1. generically typed keys (as opposed to just bytes, just strings, etc)
> 2. prefix scans
> And, if so, what the API looks like.
>
> Thanks,
> -John
>
> On Tue, Jun 9, 2020, at 11:51, Sagar wrote:
> > Hi John,
> >
> > Thanks for the response. For starters, for our use case, we tweaked our
> > keys etc to avoid prefix scans. So, we are good there.
> >
> > Regarding the KIP, I see what you mean when you say that the same key type
> > for the prefix won't work. For example, continuing with the UUID example
> > that you gave, let's say one of the UUIDs is
> > 123e4567-e89b-12d3-a456-426614174000, and with a prefix scan we want to
> > fetch all keys starting with 123. There's already a UUIDSerde, so if the
> > keys have been stored with that, then using UUIDSerde for prefixes won't
> > help -- I am not sure the UUIDSerializer would even work for 123.
> >
> > So, that indicates that we will need to provide a new prefix key type
> > serializer. Having said that, how it will be stitched together and finally
> > exposed via the APIs is something that is still open. This is something
> > you have also brought up in the earlier emails. If it makes sense, I can
> > modify my PR to go along these lines. Please let me know what you think.
> >
> > Lastly, I didn't understand this line of yours: *It might help if there
> > are other typed key/value stores to compare APIs with.*
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Thu, Jun 4, 2020 at 6:03 AM John Roesler  wrote:
> >
> > > Hi Sagar,
> > >
> > > Thanks for the question, and sorry for muddying the water.
> > >
> > > I meant the Bytes/byte[] thing as advice for how you all can solve your
> > > problem in the mean time, while we work through this KIP. I don’t think
> > > it’s relevant for the KIP itself.
> > >
> > > I think the big issue here is what the type of the prefix should be in the
> > > method signature. Using the same type as the key makes sense sometimes,
> > > but not other times. I'm not sure what the best way around this might be.
> > > It might help if there are other typed key/value stores to compare APIs
> > > with.
> > >
> > > Thanks,
> > > John
> > >
> > > On Mon, Jun 1, 2020, at 09:58, Sagar wrote:
> > > > Hi John,
> > > >
> > > > Just to add to my previous email (and sorry for the spam): if we consider
> > > > using Bytes/byte[] and manually invoking the serdes, could you provide
> > > > examples where the same Serde for keys won't work for the prefix types?
> > > > As far as my understanding goes, the prefix seek would depend upon the
> > > > ordering of the keys, e.g. lexicographic. As long as the binary format is
> > > > consistent for both the keys and the prefixes, would it not ensure the
> > > > ability to search in that same ordering space? This is from my limited
> > > > understanding, so any concrete examples would be helpful...
> > 

Re: [VOTE] KIP-572: Improve timeouts and retires in Kafka Streams

2020-06-09 Thread Guozhang Wang
Sounds fair to me. I'm +1 on the KIP.

On Thu, Jun 4, 2020 at 5:26 PM Matthias J. Sax  wrote:

> Guozhang,
>
> what you propose makes sense, but this seems to get into implementation
> detail territory already?
>
> Thus, if there are no further change requests to the KIP wiki page
> itself, I would like to proceed with the VOTE.
>
>
> -Matthias
>
> On 5/20/20 12:30 PM, Guozhang Wang wrote:
> > Thanks Matthias,
> >
> > I agree with you on all the bullet points above. Regarding the admin-client
> > outer-loop retries inside the partition assignor, I think we should treat the
> > error codes from those two blocking calls differently:
> >
> > Describe-topic:
> > * unknown-topic (3): add this topic to the to-be-created topic list.
> > * leader-not-available (5): do not try to create; retry in the outer loop.
> > * request-timeout: break the current loop and retry in the outer loop.
> > * others: fatal error.
> >
> > Create-topic:
> > * topic-already-exists: retry in the outer loop to validate that
> > num.partitions matches the expectation.
> > * request-timeout: break the current loop and retry in the outer loop.
> > * others: fatal error.
> >
> > And in the outer loop, I think we can have a global timer for the whole
> > "assign()" function, not only for the internal-topic-manager, and the timer
> > can be hard-coded with, e.g., half of the rebalance.timeout to get rid of
> > the `retries`; if we cannot complete the assignment before the timeout runs
> > out, we can return just a partial assignment (e.g., if there are two tasks
> > but we can only get the topic metadata for one of them, then just do the
> > assignment for that one only) while encoding in the error-code field a
> > request for another rebalance.
> >
> > Guozhang
> >
> >
> >
> > On Mon, May 18, 2020 at 7:26 PM Matthias J. Sax 
> wrote:
> >
> >> No worries Guozhang, any feedback is always very welcome! My reply is
> >> going to be a little longer... Sorry.
> >>
> >>
> >>
> >>> 1) There are some inconsistent statements in the proposal regarding what
> >>> to deprecate:
> >>
> >> The proposal of the KIP is to deprecate `retries` for producer, admin,
> >> and Streams. Maybe the confusion is about the dependency of those
> >> settings within Streams, and that we handle the deprecation somewhat
> >> differently for plain clients vs Streams:
> >>
> >> For plain producer/admin the default `retries` is set to MAX_VALUE. The
> >> config will be deprecated but still be respected.
> >>
> >> For Streams, the default `retries` is set to zero; however, this default
> >> does _not_ affect the embedded producer/admin clients -- both
> >> clients stay on their own default of MAX_VALUE.
> >>
> >> Currently, this introduces the issue that if a user wants to increase
> >> Streams retries, she might by accident reduce the embedded client
> >> retries, too. To avoid this issue, she would need to set
> >>
> >> retries=100
> >> producer.retries=MAX_VALUE
> >> admin.retries=MAX_VALUE
> >>
> >> This KIP will fix this issue only in the long term though, i.e., when
> >> `retries` is finally removed. Short term, using `retries` in
> >> StreamsConfig would still affect the embedded clients, but Streams, as
> >> well as both clients, would log a WARN message. This preserves backward
> >> compatibility.
> >>
> >> Within Streams, `retries` is ignored and the new `task.timeout.ms` is
> >> used instead. This increases the default resilience of Kafka Streams
> >> itself. We could also achieve this by still respecting `retries` and
> >> changing its default value. However, because we deprecate `retries`, it
> >> seems better to just ignore it and switch to the new config directly.
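As a minimal illustration (not the actual StreamsConfig code) of the
precedence described above, with hypothetical helper and constant names:

    import java.util.Map;

    final class EmbeddedClientRetries {
        // Effective `retries` for an embedded client: an explicit
        // `producer.retries` / `admin.retries` override wins, a bare `retries`
        // set by the user is still honored (with a WARN), otherwise the
        // client default of Integer.MAX_VALUE applies.
        static int effectiveRetries(Map<String, Object> streamsConfig, String clientPrefix) {
            Object override = streamsConfig.get(clientPrefix + "retries"); // e.g. "producer.retries"
            if (override != null) {
                return Integer.parseInt(override.toString());
            }
            Object bare = streamsConfig.get("retries");
            if (bare != null) {
                System.err.println("WARN: `retries` is deprecated; use `"
                        + clientPrefix + "retries` or rely on `task.timeout.ms`");
                return Integer.parseInt(bare.toString());
            }
            return Integer.MAX_VALUE;
        }
    }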
> >>
> >> I updated the KIPs with some more details.
> >>
> >>
> >>
> >>> 2) We should also document the related behavior change in the
> >>> PartitionAssignor that uses AdminClient.
> >>
> >> This is actually a good point. Originally, I looked into this only
> >> briefly, but it raised an interesting question on how to handle it.
> >>
> >> Note that `TimeoutExceptions` are currently not handled in this retry
> >> loop. Also note that the default retries value for other errors would be
> >> MAX_VALUE by default (inherited from `AdminClient#retries` as mentioned
> >> already by Guozhang).
> >>
> >> Applying the new `task.timeout.ms` config does not seem to be
> >> appropriate because the AdminClient is used during a rebalance in the
> >> leader. We could introduce a new config just for this case, but it seems
> >> to be a little bit too much. Furthermore, the group-coordinator applies
> >> a timeout on the leader anyway: if the assignment is not computed within
> >> the timeout, the leader is removed from the group and another rebalance
> >> is triggered.
> >>
> >> Overall, we make multiple admin client calls and thus we should keep
> >> some retry logic and not just rely on the admin client internal retries,
> >> as those would fall short when retrying different interleaved calls. We could
> >> jus

[jira] [Created] (KAFKA-10135) Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager

2020-06-09 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10135:
---

 Summary: Extract Task#executeAndMaybeSwallow to be a general 
utility function into TaskManager
 Key: KAFKA-10135
 URL: https://issues.apache.org/jira/browse/KAFKA-10135
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


We have a couple of cases where we need to swallow an exception during 
operations in both the Task class and the TaskManager class. This utility method 
should be generalized at least to the TaskManager level. See the discussion comment 
[here|https://github.com/apache/kafka/pull/8833#discussion_r437697665].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Jenkins build is back to normal : kafka-trunk-jdk14 #204

2020-06-09 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-2.5-jdk8 #148

2020-06-09 Thread Apache Jenkins Server
See 


Changes:

[konstantine] KAFKA-9849: Fix issue with worker.unsync.backoff.ms creating 
zombie


--
[...truncated 3.18 MB...]

[truncated Gradle test output: org.apache.kafka.streams.TopologyTestDriverTest
tests reported as STARTED/PASSED]

Build failed in Jenkins: kafka-trunk-jdk8 #4628

2020-06-09 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9849: Fix issue with worker.unsync.backoff.ms creating zombie


--
[...truncated 6.26 MB...]
[truncated Gradle test output: org.apache.kafka.streams.test.OutputVerifierTest
tests reported as STARTED/PASSED]

Re: [VOTE] KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-09 Thread Jun Rao
Hi, Colin,

Good point. Maybe something like THROTTLING_QUOTA_VIOLATED will make this clear.

Hi, David,

We added a new quota name in the KIP. You chose not to bump up the version
of DESCRIBE_CLIENT_QUOTAS and ALTER_CLIENT_QUOTAS, which seems ok since the
quota name is represented as a string. However, the new quota name can be
used in client tools for setting and listing the quota (
https://kafka.apache.org/documentation/#quotas). Could you document how the
new name will be used in those tools?

Thanks,

Jun

On Tue, Jun 9, 2020 at 3:37 PM Colin McCabe  wrote:

> On Tue, Jun 9, 2020, at 05:06, David Jacot wrote:
> > Hi Colin,
> >
> > Thank you for your feedback.
> >
> > Jun has summarized the situation pretty well. Thanks Jun! I would like to
> > complement it with the following points:
> >
> > 1. Indeed, when the quota is exceeded, the broker will reject the topic
> > creations, partition creations and topic deletions that exceed it, with
> > the new QUOTA_VIOLATED error. The ThrottleTimeMs field will be populated
> > accordingly to let the client know how long it must wait.
> >
> > 2. I do agree that we actually want a mechanism to apply back pressure
> > to the clients. The KIP basically proposes a mechanism to control and to
> > limit the rate of operations before entering the controller. I think that
> > it is similar to your thinking but is enforced based on a defined rate
> > instead of relying on the number of pending items in the controller.
> >
> > 3. You mentioned an alternative idea in your comments that, if I understood
> > correctly, would bound the queue to limit the overload and reject work if
> > the queue is full. I have been thinking about this as well but I don't
> > think that it works well in our case.
> > - The first reason is the one mentioned by Jun. We actually want to be able
> > to limit specific clients (the misbehaving ones) in a multi-tenant
> > environment.
> > - The second reason is that, at least in our current implementation, the
> > length of the queue is not really a good characteristic to estimate the
> > load. Coming back to your example of the CreateTopicsRequest: they create a
> > path in ZK for each newly created topic, which triggers a ChangeTopic event
> > in the controller. That single event could be for a single topic in some
> > cases or for a thousand topics in others.
> > These two reasons aside, bounding the queue also introduces a knob which
> > requires some tuning and thus suffers from all the points you mentioned as
> > well, doesn't it? The value may be right for one version but not for
> > another.
> >
> > 4. Regarding the integration with KIP-500. The implementation of this KIP
> > will span across the ApiLayer and the AdminManager. To be honest, we
> > can influence the implementation to work best with what you have in mind
> > for the future controller. If you could shed some more light on the future
> > internal architecture, I can take this into account during the
> > implementation.
> >
>
> Hi David,
>
> Good question.  The approach we are thinking of is that in the future,
> topic creation will be a controller RPC.  In other words, rather than
> changing ZK and waiting for the controller code to notice, we'll go through
> the controller code (which may change ZK, or may do something else in the
> ZK-free environment).
>
> I don't think there is a good way to write this in the short term that
> avoids having to rewrite in the long term.  That's probably OK though, as
> long as we keep in mind that we'll need to.
>
> >
> > 5. Regarding KIP-590. For the create topics request, create partitions
> > request, and delete topics request, we are lucky enough to have directed
> > all of them to the controller already, so we are fine with doing the
> > admission in the broker which hosts the controller. If we were to throttle
> > more operations in the future, I believe that we can continue to do it
> > centrally while leveraging the principal that is forwarded to account for
> > the right tenant. The reason why I would like to keep it central is that
> > we are talking about sparse (or bursty) workloads here, so distributing
> > the quota among all the brokers makes little sense.
> >
>
> Right.  The main requirement here is that the quota must operate based on
> principal names rather than KafkaPrincipal objects.  We had a long
> discussion about KIP-590 and eventually concluded that we didn't want to
> make KafkaPrincipal serializable (at least not yet) so what would be
> forwarded is just a string, not the principal object.
>
> >
> > 6. Regarding the naming of the new error code. BUSY sounds too generic to
> > me so I would rather prefer a specific error code. The main reason being
> > that we may be able to reuse it in the future for other quotas. That being
> > said, we can find another name. QUOTA_EXHAUSTED? I don't feel too strongly
> > about the naming. I wonder what the others think about this.
> >
>
> Think about if you're someone

Re: [DISCUSS] KIP-590: Redirect Zookeeper Mutation Protocols to The Controller

2020-06-09 Thread Jun Rao
Hi, Boyang,

Thanks for the KIP. Just a few comments on the metrics.

1. It would be useful to document the full JMX metric names (package, type,
etc) of the new metrics. Also, for rates on the server side, we
typically use Yammer Meter.

2. For num-messages-redirected-rate, would num-requests-redirected-rate be
better?

3. num-client-forwarding-to-controller-rate: Is that based on clientId,
client IP, client request version, or something else? How do you plan to
implement that, since it seems to require tracking the current unique client
set somehow? An alternative approach is to maintain a
num-requests-redirected-rate metric with a client tag.
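As a minimal sketch of the "rate metric with a client tag" idea, using the
client-side Kafka Metrics API purely for illustration (the server side would
use a Yammer Meter, as noted above); the group name and sensor naming are
assumptions:

    import java.util.Map;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Rate;

    final class RedirectMetricsSketch {
        private final Metrics metrics = new Metrics();

        // Register one tagged rate sensor per client id (call once per client),
        // then call sensor.record() for every redirected request.
        Sensor registerRedirectRate(String clientId) {
            Sensor sensor = metrics.sensor("num-requests-redirected-" + clientId);
            MetricName name = metrics.metricName(
                    "num-requests-redirected-rate",
                    "controller-redirect-metrics",   // group name is an assumption
                    "rate of redirected requests",
                    Map.of("client-id", clientId));
            sensor.add(name, new Rate());
            return sensor;
        }
    }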

Jun



On Mon, Jun 8, 2020 at 9:36 AM Boyang Chen 
wrote:

> Hey there,
>
> If no more questions are raised, I will go ahead and start the vote shortly.
>
> On Thu, Jun 4, 2020 at 12:39 PM Boyang Chen 
> wrote:
>
> > Hey there,
> >
> > bumping this thread for any further KIP-590 discussion, since it's been
> > quiet for a while.
> >
> > Boyang
> >
> > On Thu, May 21, 2020 at 10:31 AM Boyang Chen  >
> > wrote:
> >
> >> Thanks David, I agree the wording here is not clear, and the fellow
> >> broker should just send a new CreateTopicRequest in this case.
> >>
> >> In the meantime, we had some offline discussion about the Envelope API as
> >> well. Although it provides certain privileges like data embedding and
> >> principal embedding, it creates a security hole by letting a malicious
> >> user impersonate any forwarding broker, thus pretending to be any admin
> >> user. Passing the principal around also increases the vulnerability
> >> compared with other standard ways such as passing a verified token, which
> >> is unfortunately not fully supported in Kafka security yet.
> >>
> >> So for the security concerns, we are abandoning the Envelope approach and
> >> falling back to just forwarding the raw admin requests. The authentication
> >> will happen on the receiving broker side instead of on the controller, so
> >> that we are able to strip off the principal fields and only include the
> >> principal in the header as an optional field for audit logging purposes.
> >> Furthermore, we shall propose adding a separate endpoint for
> >> broker-controller communication, for which secure connections should be
> >> recommended, so that a malicious client could not pretend to be a broker
> >> and perform impersonation.
> >>
> >> Let me know your thoughts.
> >>
> >> Best,
> >> Boyang
> >>
> >> On Tue, May 19, 2020 at 12:17 AM David Jacot 
> wrote:
> >>
> >>> Hi Boyang,
> >>>
> >>> I've got another question regarding the auto topic creation case. The KIP
> >>> says: "Currently the target broker shall just utilize its own ZK client to
> >>> create internal topics, which is disallowed in the bridge release. For
> >>> above scenarios, non-controller broker shall just forward a
> >>> CreateTopicRequest to the controller instead and let controller take care
> >>> of the rest, while waiting for the response in the meantime." There will
> >>> be no request to forward in this case, right? Instead, a
> >>> CreateTopicsRequest is created and sent to the controller node.
> >>>
> >>> When the CreateTopicsRequest is sent as a side effect of the
> >>> MetadataRequest, it would be good to know the principal and the clientId
> >>> in the controller (quota, audit, etc.). Do you plan to use the Envelope
> >>> API for this case as well, or to call the regular API directly? Another
> >>> way to phrase it would be: shall the internal CreateTopicsRequest be sent
> >>> with the original metadata (principal, clientId, etc.) of the
> >>> MetadataRequest, or as an admin request?
> >>>
> >>> Best,
> >>> David
> >>>
> >>> On Fri, May 8, 2020 at 2:04 AM Guozhang Wang 
> wrote:
> >>>
> >>> > Just to add a bit more FYI here related to the last question from David:
> >>> > in KIP-595, while implementing the new requests we are also adding a
> >>> > "KafkaNetworkChannel" which is used for brokers to send vote / fetch
> >>> > records, so besides the discussion on listeners I think
> >>> > implementation-wise we can also consider consolidating a lot of those
> >>> > into the same call-trace as well -- of course this is not related to
> >>> > public APIs so maybe it just needs to be coordinated among developments:
> >>> >
> >>> > 1. Broker -> Controller: ISR Change, Topic Creation, Admin Redirect
> >>> > (KIP-497).
> >>> > 2. Controller -> Broker: LeaderAndISR / MetadataUpdate; though these are
> >>> > likely going to be deprecated post KIP-500.
> >>> > 3. Txn Coordinator -> Broker: TxnMarker
> >>> >
> >>> >
> >>> > Guozhang
> >>> >
> >>> > On Wed, May 6, 2020 at 8:58 PM Boyang Chen <
> reluctanthero...@gmail.com
> >>> >
> >>> > wrote:
> >>> >
> >>> > > Hey David,
> >>> > >
> >>> > > thanks for the feedbacks!
> >>> > >
> >>> > > On Wed, May 6, 2020 at 2:06 AM David Jacot 
> >>> wrote:
> >>> > >
> >>> > > > Hi Boyang,
> >>> > >

[jira] [Created] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-06-09 Thread Sean Guo (Jira)
Sean Guo created KAFKA-10134:


 Summary: High CPU issue during rebalance in Kafka consumer after 
upgrading to 2.5
 Key: KAFKA-10134
 URL: https://issues.apache.org/jira/browse/KAFKA-10134
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.5.0
Reporter: Sean Guo


We want to utilize the new rebalance protocol to mitigate the stop-the-world 
effect during rebalances, as our tasks are long-running tasks.

But after the upgrade, when we kill an instance while there is some 
load (long-running tasks >30s), the CPU goes sky-high. It reads ~700% 
in our metrics, so several threads appear to be in a tight loop.
{noformat}
"executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
[0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
at {noformat}
By debugging into the code, we found that the clients appear to be in a loop 
trying to find the coordinator.

I also tried the old rebalance protocol with the new version; the issue still 
exists, but the CPU goes back to normal when the rebalance is done.

I also tried the same on 2.4.1, which does not seem to have this issue. So it 
seems related to something that changed between 2.4.1 and 2.5.0.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10133) Cannot compress messages in destination cluster with MM2

2020-06-09 Thread Steve Jacobs (Jira)
Steve Jacobs created KAFKA-10133:


 Summary: Cannot compress messages in destination cluster with MM2
 Key: KAFKA-10133
 URL: https://issues.apache.org/jira/browse/KAFKA-10133
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.4.1, 2.5.0, 2.4.0
 Environment: kafka 2.5.0 deployed via the strimzi operator 0.18
Reporter: Steve Jacobs


When configuring MirrorMaker 2 using Kafka Connect, it is not possible to 
configure things such that messages are compressed in the destination cluster. 
Dump-log output shows that batching is occurring, but no compression. If this is 
possible, then this is a documentation bug, because I can find no documentation 
on how to do this.

baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: -1 
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false 
isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 magic: 
2 compresscodec: NONE crc: 1811507259 isvalid: true

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: kafka-2.5-jdk8 #147

2020-06-09 Thread Apache Jenkins Server
See 


Changes:

[konstantine] KAFKA-9848: Avoid triggering scheduled rebalance delay when task


--
[...truncated 5.92 MB...]

[truncated Gradle test output: org.apache.kafka.streams.TopologyTestDriverTest
tests reported as STARTED/PASSED]

Re: [VOTE] KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-09 Thread Colin McCabe
On Tue, Jun 9, 2020, at 05:06, David Jacot wrote:
> Hi Colin,
> 
> Thank you for your feedback.
> 
> Jun has summarized the situation pretty well. Thanks Jun! I would like to
> complement it with the following points:
> 
> 1. Indeed, when the quota is exceeded, the broker will reject the topic
> creations, partition creations and topic deletions that exceed it, with
> the new QUOTA_VIOLATED error. The ThrottleTimeMs field will
> be populated accordingly to let the client know how long it must wait.
> 
> 2. I do agree that we actually want a mechanism to apply back pressure
> to the clients. The KIP basically proposes a mechanism to control and to
> limit the rate of operations before entering the controller. I think that
> it is similar to your thinking but is enforced based on a defined rate
> instead of relying on the number of pending items in the controller.
> 
> 3. You mentioned an alternative idea in your comments that, if I understood
> correctly, would bound the queue to limit the overload and reject work if
> the queue is full. I have been thinking about this as well but I don't think
> that it works well in our case.
> - The first reason is the one mentioned by Jun. We actually want to be able
> to limit specific clients (the misbehaving ones) in a multi-tenant
> environment.
> - The second reason is that, at least in our current implementation, the
> length of the queue is not really a good characteristic to estimate the load.
> Coming back to your example of the CreateTopicsRequest: they create a path
> in ZK for each newly created topic, which triggers a ChangeTopic event in
> the controller. That single event could be for a single topic in some cases
> or for a thousand topics in others.
> These two reasons aside, bounding the queue also introduces a knob which
> requires some tuning and thus suffers from all the points you mentioned as
> well, doesn't it? The value may be right for one version but not for another.
> 
> 4. Regarding the integration with KIP-500. The implementation of this KIP
> will span across the ApiLayer and the AdminManager. To be honest, we
> can influence the implementation to work best with what you have in mind
> for the future controller. If you could shed some more light on the future
> internal architecture, I can take this into account during the
> implementation.
>

Hi David,

Good question.  The approach we are thinking of is that in the future, topic 
creation will be a controller RPC.  In other words, rather than changing ZK and 
waiting for the controller code to notice, we'll go through the controller code 
(which may change ZK, or may do something else in the ZK-free environment).

I don't think there is a good way to write this in the short term that avoids 
having to rewrite in the long term.  That's probably OK though, as long as we 
keep in mind that we'll need to.

> 
> 5. Regarding KIP-590. For the create topics request, create partitions
> request, and delete topics request, we are lucky enough to have directed
> all of them to the controller already, so we are fine with doing the
> admission in the broker which hosts the controller. If we were to throttle
> more operations in the future, I believe that we can continue to do it
> centrally while leveraging the principal that is forwarded to account for
> the right tenant. The reason why I would like to keep it central is that we
> are talking about sparse (or bursty) workloads here, so distributing the
> quota among all the brokers makes little sense.
> 

Right.  The main requirement here is that the quota must operate based on 
principal names rather than KafkaPrincipal objects.  We had a long discussion 
about KIP-590 and eventually concluded that we didn't want to make 
KafkaPrincipal serializable (at least not yet) so what would be forwarded is 
just a string, not the principal object.

>
> 6. Regarding the naming of the new error code. BUSY sounds too generic to
> me so I would rather prefer a specific error code. The main reason being
> that we may be able to reuse it in the future for other quotas. That being 
> said,
> we can find another name. QUOTA_EXHAUSTED? I don't feel too strongly about
> the naming. I wonder what the others think about this.
> 

Think about it from the perspective of someone writing software that uses the 
Kafka client. Let's say you try to create some topics and get back 
QUOTA_VIOLATED. What does it mean? Maybe it means that you tried to create too 
many topics in too short a time (violating the controller throttle). Or maybe 
it means that you exceeded a quota on the number of partitions you are allowed 
to create (let's assume that someone did a follow-on KIP for that, which reuses 
this error code).

But now you have a dilemma.  If the controller was just busy, then trying again 
is the right thing to do.  If there is some other quota you violated (number of 
partitions, number of topics, etc.) then retrying is wasteful and will not 
accomplish anyt
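To illustrate the client author's dilemma, here is a minimal sketch; the
exception name and its throttleTimeMs() accessor are hypothetical stand-ins
for whatever the KIP finally settles on:

    // Hypothetical exception; stands in for the throttling error discussed above.
    class ThrottlingQuotaViolatedException extends RuntimeException {
        private final int throttleTimeMs;
        ThrottlingQuotaViolatedException(int throttleTimeMs) { this.throttleTimeMs = throttleTimeMs; }
        int throttleTimeMs() { return throttleTimeMs; }
    }

    final class TopicCreator {
        // With a dedicated throttling error the client knows the request is safe
        // to retry after backing off; a hard limit (max partitions, max topics,
        // etc.) would surface as a different, non-retriable error and simply
        // propagate out of this loop.
        static void createWithRetry(Runnable createTopics) throws InterruptedException {
            while (true) {
                try {
                    createTopics.run();
                    return;
                } catch (ThrottlingQuotaViolatedException e) {
                    Thread.sleep(e.throttleTimeMs());  // retriable: back off and retry
                }
            }
        }
    }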

Build failed in Jenkins: kafka-trunk-jdk14 #203

2020-06-09 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9716; Clarify meaning of compression rate metrics (#8664)

[github] KAFKA-9849: Fix issue with worker.unsync.backoff.ms creating zombie


--
[...truncated 3.15 MB...]

[truncated Gradle test output: org.apache.kafka.streams.TopologyTestDriverTest
tests reported as STARTED/PASSED]

[jira] [Created] (KAFKA-10132) Kafka Connect JMX MBeans with String values have type double

2020-06-09 Thread Tom Malaher (Jira)
Tom Malaher created KAFKA-10132:
---

 Summary: Kafka Connect JMX MBeans with String values have type 
double
 Key: KAFKA-10132
 URL: https://issues.apache.org/jira/browse/KAFKA-10132
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.5.0
Reporter: Tom Malaher


There are quite a few metrics available for source/sink connectors. Many of 
them are numeric (JMX type "double"), but a few attributes have string values 
and are nevertheless still tagged as "double".

For example:

Bean: kafka.connect:connector=my-source,type=connector-metrics Attribute: status

The Attribute Description says: "The status of the connector task. One of 
'unassigned', 'running', 'paused', 'failed', or 'destroyed'."

The value is currently "running" on my instance.

This causes difficulty for anything that tries to introspect the JMX attribute 
metadata and then parse/display the data.

See also 
[https://stackoverflow.com/questions/50291157/which-jmx-metric-should-be-used-to-monitor-the-status-of-a-connector-in-kafka-co]
 where this problem is mentioned in one of the answers (dating back to 2018).

The attribute metadata should be updated to indicate the correct type.

I suspect the problem lies at line 220 of 
`org.apache.kafka.common.metrics.JmxReporter` (in version 2.5.0), where a 
hardcoded `double.class.getName()` is used as the MBean attribute type even for 
metrics whose values are Strings.
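
A quick way to see the mismatch is to introspect the attribute metadata and 
compare it with the runtime value type. This is only a sketch: it assumes it 
runs inside the Connect worker JVM (or is adapted to a remote JMX connection) 
and uses the ObjectName from the example above.

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanAttributeInfo;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class ConnectorStatusTypeCheck {
        public static void main(String[] args) throws Exception {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName name =
                new ObjectName("kafka.connect:connector=my-source,type=connector-metrics");
            for (MBeanAttributeInfo attr : server.getMBeanInfo(name).getAttributes()) {
                Object value = server.getAttribute(name, attr.getName());
                // For "status" this prints a declared type of "double" but an actual
                // value of type java.lang.String.
                System.out.printf("%s declared=%s actual=%s%n",
                    attr.getName(), attr.getType(),
                    value == null ? "null" : value.getClass().getName());
            }
        }
    }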



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10131) Minimize use of --zookeeper flag in ducktape tests

2020-06-09 Thread Vinoth Chandar (Jira)
Vinoth Chandar created KAFKA-10131:
--

 Summary: Minimize use of --zookeeper flag in ducktape tests
 Key: KAFKA-10131
 URL: https://issues.apache.org/jira/browse/KAFKA-10131
 Project: Kafka
  Issue Type: Improvement
  Components: system tests
Reporter: Vinoth Chandar
Assignee: Vinoth Chandar


Get the ducktape tests working without the --zookeeper flag (except for scram).

(Note: When doing compat testing we'll still use the old flags.)

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Jenkins build is back to normal : kafka-2.6-jdk8 #31

2020-06-09 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-trunk-jdk8 #4627

2020-06-09 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9716; Clarify meaning of compression rate metrics (#8664)


--
[...truncated 3.13 MB...]
org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureSinkTopicNamesIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureSinkTopicNamesIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureInternalTopicNamesIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureInternalTopicNamesIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] PASSED

> Task :streams:upgrade-system-tests-0100:spotbugsMain NO-SOURCE
> Task :streams:upgrade-system-tests-0100:test
> Task :streams:upgrade-system-tests-0101:compileJava NO-SOURCE
> Task :streams:upgrade-system-tests-0101:processResources NO-SOURCE
> Task :streams:upgrade-system-tests-0101:classes UP-TO-DATE
> Task :streams:upgrade-system-tests-0101:checkstyleMain NO-SOURCE
> Task :streams:upgrade-system-tests-0101:compileTestJava
> Task :streams:upgrade-system-tests-0101:processTestResources NO-SOURCE
> Task :streams:upgrade-system-te

Re: [VOTE] KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-09 Thread Jun Rao
Hi, David,

Sounds good then.

Thanks,

Jun

On Tue, Jun 9, 2020 at 10:59 AM David Jacot  wrote:

> Hi Jun,
>
> Both are already in the KIP; see the "New Broker Configurations" chapter. I
> think that we need them in order to be able to define a different burst for
> the new quota.
>
> Best,
> David
>
> On Tue, Jun 9, 2020 at 7:48 PM Jun Rao  wrote:
>
> > Hi, David,
> >
> > Another thing. Should we add controller.quota.window.size.seconds and
> > controller.quota.window.num
> > or just reuse the existing quota.window.size.seconds and quota.window.num
> > that are used for other types of quotas?
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Jun 9, 2020 at 10:30 AM Jun Rao  wrote:
> >
> > > Hi, David,
> > >
> > > Thanks for the KIP. The name QUOTA_VIOLATED sounds reasonable to me. +1
> > on
> > > the KIP.
> > >
> > > Jun
> > >
> > > On Tue, Jun 9, 2020 at 5:07 AM David Jacot 
> wrote:
> > >
> > >> Hi Colin,
> > >>
> > >> Thank you for your feedback.
> > >>
> > >> Jun has summarized the situation pretty well. Thanks Jun! I would like
> > to
> > >> complement it with the following points:
> > >>
> > >> 1. Indeed, when the quota is exceeded, the broker will reject the topic
> > >> creations, partition creations, and topic deletions that exceed it, using
> > >> the new QUOTA_VIOLATED error. The ThrottleTimeMs field will
> > >> be populated accordingly to let the client know how long it must wait.
> > >>
> > >> 2. I do agree that we actually want a mechanism to apply back pressure
> > >> to the clients. The KIP basically proposes a mechanism to control and
> to
> > >> limit the rate of operations before entering the controller. I think
> > that
> > >> it is
> > >> similar to your thinking but is enforced based on a defined quota
> > instead
> > >> of relying on the number of pending items in the controller.
> > >>
> > >> 3. You mentioned an alternative idea in your comments that, if I
> > >> understood
> > >> correctly, would bound the queue to limit the overload and reject work
> > if
> > >> the
> > >> queue is full. I have been thinking about this as well but I don't
> think
> > >> that it
> > >> works well in our case.
> > >> - The first reason is the one mentioned by Jun. We actually want to be
> > >> able
> > >> to limit specific clients (the misbehaving ones) in a multi-tenant
> > >> environment.
> > >> - The second reason is that, at least in our current implementation,
> the
> > >> length of
> > >> the queue is not really a good characteristic to estimate the load.
> > >> Coming back to your example of the CreateTopicsRequest: it creates a path
> > >> in ZK for each newly created topic, which triggers a ChangeTopic event in
> > >> the controller. That single event could be for a single topic in some
> > >> cases or for a thousand topics in others.
> > >> These two reasons aside, bounding the queue also introduces a knob which
> > >> requires some tuning and thus suffers from all the points you mentioned as
> > >> well, doesn't it? A value that works for one version may not work for
> > >> another.
> > >>
> > >> 4. Regarding the integration with KIP-500. The implementation of this
> > KIP
> > >> will span across the ApiLayer and the AdminManager. To be honest, we
> > >> can influence the implementation to work best with what you have in
> mind
> > >> for the future controller. If you could shed some more light on the
> > future
> > >> internal architecture, I can take this into account during the
> > >> implementation.
> > >>
> > >> 5. Regarding KIP-590. For the create topics request, create partitions
> > >> request,
> > >> and delete topics request, we are lucky enough to have directed all of
> > >> them
> > >> to
> > >> the controller already so we are fine with doing the admission in the
> > >> broker
> > >> which hosts the controller. If we were to throttle more operations in
> > the
> > >> future,
> > >> I believe that we can continue to do it centrally while leveraging the
> > >> principal
> > >> that is forwarded to account for the right tenant. The reason why I
> > would
> > >> like
> > >> to keep it central is that we are talking about sparse (or bursty)
> > >> workloads here
> > >> so distributing the quota among all the brokers makes little sense.
> > >>
> > >> 6. Regarding the naming of the new error code. BUSY sounds too generic
> > to
> > >> me so I would rather prefer a specific error code. The main reason
> being
> > >> that
> > >> we may be able to reuse it in the future for other quotas. That being
> > >> said,
> > >> we
> > >> can find another name. QUOTA_EXHAUSTED? I don't feel too strongly
> about
> > >> the naming. I wonder what the others think about this.
> > >>
> > >> Voilà. I hope that I have addressed all your questions/points and I am
> > >> sorry for
> > >> the lengthy email.
> > >>
> > >> Regards,
> > >> David
> > >>
> > >>
> > >> On Tue, Jun 9, 2020 at 2:13 AM Colin McCabe 
> wrote:
> > >>
> > >> > On Mon, Jun 8, 2020, at 14:41, Jun Rao wrote:

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-06-09 Thread John Roesler
Hi Sagar,

Thanks for the reply. I agree that your UUID example illustrates the
problem I was pointing out.

Yes, I think that experimenting with the API in the PR is probably the
best way to make progress (as opposed to just thinking in terms of
design on the wiki) because with this kind of thing, the devil is often
in the details.

To clarify what I meant by that last statement, I see the core challenge
here as deriving from the fact that we have a key/value store with
generically typed keys, with a separate component (the serde) that
turns those typed keys into bytes for storage. In contrast, RocksDB 
can easily offer a "prefix scan" operation because the key type is
always just a byte array, so "prefix" is a very natural concept to offer
in the API. Other key/value stores force the keys to always be strings,
which also makes it easy to define a prefix scan.

My question is whether there are other databases that offer both:
1. generically typed keys (as opposed to just bytes, just strings, etc)
2. prefix scans
And, if so, what the API looks like.

Thanks,
-John

On Tue, Jun 9, 2020, at 11:51, Sagar wrote:
> Hi John,
> 
> Thanks for the response. For starters, for our use case, we tweaked our
> keys etc to avoid prefix scans. So, we are good there.
> 
> Regarding the KIP, I see what you mean when you say that the same key type
> for prefix won't work. For example, continuing with the UUID example that
> you gave, let's say one of the UUIDs
> is 123e4567-e89b-12d3-a456-426614174000, and with a prefix scan we want to
> fetch all keys starting with 123. There's already a UUIDSerde so if the
> keys have been stored with that, then using UUIDSerde for prefixes won't
> help; I am not sure if the UUIDSerializer would even work for 123.
> 
> So, that indicates that we will need to provide a new prefix key type
> serializer. Having said that, how it will be stitched together and finally
> exposed using the APIs is something that is up for questioning. This is
> something you have also brought up in the earlier emails. If it
> makes sense, I can modify my PR to go along these lines. Please let me know
> what you think.
> 
> Lastly, I didn't understand this line of yours: *It might help if there are
> other typed key/value stores to compare APIs with.*
> 
> Thanks!
> Sagar.
> 
> 
> On Thu, Jun 4, 2020 at 6:03 AM John Roesler  wrote:
> 
> > Hi Sagar,
> >
> > Thanks for the question, and sorry for muddying the water.
> >
> > I meant the Bytes/byte[] thing as advice for how you all can solve your
> > problem in the mean time, while we work through this KIP. I don’t think
> > it’s relevant for the KIP itself.
> >
> > I think the big issue here is what the type of the prefix should be in the
> > method signature. Using the same type as the key makes sense some times,
> > but not other times. I’m not sure what the best way around this might be.
> > It might help if there are other typed key/value stores to compare APIs
> > with.
> >
> > Thanks,
> > John
> >
> > On Mon, Jun 1, 2020, at 09:58, Sagar wrote:
> > > Hi John,
> > >
> > > Just to add to my previous email(and sorry for the spam), if we consider
> > > using Bytes/byte[] and manually invoke the serdes, if you could provide
> > > examples where the same Serde for keys won't work for the prefix types.
> > As
> > > far as my understanding goes, the prefix seek would depend upon ordering
> > of
> > > the keys like lexicographic. As long as the binary format is consistent
> > for
> > > both the keys and the prefixes would it not ensure the ability to search
> > in
> > > that same ordering space? This is from my limited understanding so any
> > > concrete examples would be helpful...
> > >
> > > Also, you mentioned about the creation of dummy values to indicate prefix
> > > values, do you mean this line:
> > >
> > >
> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java#L91
> > > This
> > > is where the prefix key is built and used for searching .
> > >
> > > Thanks!
> > > Sagar.
> > >
> > > On Mon, Jun 1, 2020 at 11:42 AM Sagar  wrote:
> > >
> > > > Hi John,
> > > >
> > > > Thank you. I think it makes sense to modify the KIP to add the
> > > > prefixScan() as part of the existing interfaces and add the new mixin
> > > > behaviour as Rejected alternatives. I am not very aware of other stores
> > > > apart from keyValueStore so is it fine if I keep it there for now?
> > > >
> > > > Regarding the type definition of types I will try and think about some
> > > > alternatives and share if I get any.
> > > >
> > > > Thanks!
> > > > Sagar.
> > > >
> > > >
> > > > On Sun, May 31, 2020 at 1:55 AM John Roesler 
> > wrote:
> > > >
> > > >> Hi Sagar,
> > > >>
> > > >> Thanks for the response. Your use case makes sense to me; I figured it
> > > >> must be something like that.
> > > >>
> > > >> On a pragmatic level, in the near term, you might consider basically
> > >

Build failed in Jenkins: kafka-trunk-jdk8 #4626

2020-06-09 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9848: Avoid triggering scheduled rebalance delay when task


--
[...truncated 3.13 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureInternalTopicNamesIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureInternalTopicNamesIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis STARTED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis PASSED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime STARTED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime PASSED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep PASSED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnIsOpen 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnIsOpen 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnName 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnName 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWindowStartTimestampWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWindowStartTimestampWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldReturnIsPersistent STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldReturnIsPersistent PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardClose 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardClose 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardFlush 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardFlush 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardInit 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardInit 
PASSED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testToString STARTED

org.apache.kafka.streams.test.TestRecordTest > testToString PASSED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords STARTED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords PASSED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
STARTED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
PASSED

org.apache.kafka.streams.test.TestRecordTest > tes

Re: [VOTE] KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-09 Thread David Jacot
Hi Jun,

Both are already in the KIP; see the "New Broker Configurations" chapter. I
think that we need them in order to be able to define a different burst for
the new quota.

Best,
David

On Tue, Jun 9, 2020 at 7:48 PM Jun Rao  wrote:

> Hi, David,
>
> Another thing. Should we add controller.quota.window.size.seconds and
> controller.quota.window.num
> or just reuse the existing quota.window.size.seconds and quota.window.num
> that are used for other types of quotas?
>
> Thanks,
>
> Jun
>
> On Tue, Jun 9, 2020 at 10:30 AM Jun Rao  wrote:
>
> > Hi, David,
> >
> > Thanks for the KIP. The name QUOTA_VIOLATED sounds reasonable to me. +1
> on
> > the KIP.
> >
> > Jun
> >
> > On Tue, Jun 9, 2020 at 5:07 AM David Jacot  wrote:
> >
> >> Hi Colin,
> >>
> >> Thank you for your feedback.
> >>
> >> Jun has summarized the situation pretty well. Thanks Jun! I would like
> to
> >> complement it with the following points:
> >>
> >> 1. Indeed, when the quota is exceeded, the broker will reject the topic
> >> creations, partition creations, and topic deletions that exceed it, using
> >> the new QUOTA_VIOLATED error. The ThrottleTimeMs field will
> >> be populated accordingly to let the client know how long it must wait.
> >>
> >> 2. I do agree that we actually want a mechanism to apply back pressure
> >> to the clients. The KIP basically proposes a mechanism to control and to
> >> limit the rate of operations before entering the controller. I think
> that
> >> it is
> >> similar to your thinking but is enforced based on a defined quota
> instead
> >> of relying on the number of pending items in the controller.
> >>
> >> 3. You mentioned an alternative idea in your comments that, if I
> >> understood
> >> correctly, would bound the queue to limit the overload and reject work
> if
> >> the
> >> queue is full. I have been thinking about this as well but I don't think
> >> that it
> >> works well in our case.
> >> - The first reason is the one mentioned by Jun. We actually want to be
> >> able
> >> to limit specific clients (the misbehaving ones) in a multi-tenant
> >> environment.
> >> - The second reason is that, at least in our current implementation, the
> >> length of
> >> the queue is not really a good characteristic to estimate the load.
> >> Coming back to your example of the CreateTopicsRequest: it creates a path in
> >> ZK for each newly created topic, which triggers a ChangeTopic event in the
> >> controller. That single event could be for a single topic in some cases or
> >> for a thousand topics in others.
> >> These two reasons aside, bounding the queue also introduces a knob which
> >> requires some tuning and thus suffers from all the points you mentioned as
> >> well, doesn't it? A value that works for one version may not work for
> >> another.
> >>
> >> 4. Regarding the integration with KIP-500. The implementation of this
> KIP
> >> will span across the ApiLayer and the AdminManager. To be honest, we
> >> can influence the implementation to work best with what you have in mind
> >> for the future controller. If you could shed some more light on the
> future
> >> internal architecture, I can take this into account during the
> >> implementation.
> >>
> >> 5. Regarding KIP-590. For the create topics request, create partitions
> >> request,
> >> and delete topics request, we are lucky enough to have directed all of
> >> them
> >> to
> >> the controller already so we are fine with doing the admission in the
> >> broker
> >> which hosts the controller. If we were to throttle more operations in
> the
> >> future,
> >> I believe that we can continue to do it centrally while leveraging the
> >> principal
> >> that is forwarded to account for the right tenant. The reason why I
> would
> >> like
> >> to keep it central is that we are talking about sparse (or bursty)
> >> workloads here
> >> so distributing the quota among all the brokers makes little sense.
> >>
> >> 6. Regarding the naming of the new error code. BUSY sounds too generic
> to
> >> me so I would rather prefer a specific error code. The main reason being
> >> that
> >> we may be able to reuse it in the future for other quotas. That being
> >> said,
> >> we
> >> can find another name. QUOTA_EXHAUSTED? I don't feel too strongly about
> >> the naming. I wonder what the others think about this.
> >>
> >> Voilà. I hope that I have addressed all your questions/points and I am
> >> sorry for
> >> the lengthy email.
> >>
> >> Regards,
> >> David
> >>
> >>
> >> On Tue, Jun 9, 2020 at 2:13 AM Colin McCabe  wrote:
> >>
> >> > On Mon, Jun 8, 2020, at 14:41, Jun Rao wrote:
> >> > > Hi, Colin,
> >> > >
> >> > > Thanks for the comment. You brought up several points.
> >> > >
> >> > > 1. Should we set up a per user quota? To me, it does seem we need
> some
> >> > sort
> >> > > of a quota. When the controller runs out of resources, ideally, we
> >> only
> >> > > want to penalize the bad behaving applications, instead of every
> >> > > application. To do that, 

Re: [VOTE] KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-09 Thread Jun Rao
Hi, David,

Another thing. Should we add controller.quota.window.size.seconds and
controller.quota.window.num
or just reuse the existing quota.window.size.seconds and quota.window.num
that are used for other types of quotas?

Thanks,

Jun

On Tue, Jun 9, 2020 at 10:30 AM Jun Rao  wrote:

> Hi, David,
>
> Thanks for the KIP. The name QUOTA_VIOLATED sounds reasonable to me. +1 on
> the KIP.
>
> Jun
>
> On Tue, Jun 9, 2020 at 5:07 AM David Jacot  wrote:
>
>> Hi Colin,
>>
>> Thank you for your feedback.
>>
>> Jun has summarized the situation pretty well. Thanks Jun! I would like to
>> complement it with the following points:
>>
>> 1. Indeed, when the quota is exceeded, the broker will reject the topic
>> creations, partition creations, and topic deletions that exceed it, using
>> the new QUOTA_VIOLATED error. The ThrottleTimeMs field will
>> be populated accordingly to let the client know how long it must wait.
>>
>> 2. I do agree that we actually want a mechanism to apply back pressure
>> to the clients. The KIP basically proposes a mechanism to control and to
>> limit the rate of operations before entering the controller. I think that
>> it is
>> similar to your thinking but is enforced based on a defined quota instead
>> of relying on the number of pending items in the controller.
>>
>> 3. You mentioned an alternative idea in your comments that, if I
>> understood
>> correctly, would bound the queue to limit the overload and reject work if
>> the
>> queue is full. I have been thinking about this as well but I don't think
>> that it
>> works well in our case.
>> - The first reason is the one mentioned by Jun. We actually want to be
>> able
>> to limit specific clients (the misbehaving ones) in a multi-tenant
>> environment.
>> - The second reason is that, at least in our current implementation, the
>> length of
>> the queue is not really a good characteristic to estimate the load.
>> Coming back to your example of the CreateTopicsRequest: it creates a path in
>> ZK for each newly created topic, which triggers a ChangeTopic event in the
>> controller. That single event could be for a single topic in some cases or
>> for a thousand topics in others.
>> These two reasons aside, bounding the queue also introduces a knob which
>> requires some tuning and thus suffers from all the points you mentioned as
>> well, doesn't it? A value that works for one version may not work for
>> another.
>>
>> 4. Regarding the integration with KIP-500. The implementation of this KIP
>> will span across the ApiLayer and the AdminManager. To be honest, we
>> can influence the implementation to work best with what you have in mind
>> for the future controller. If you could shed some more light on the future
>> internal architecture, I can take this into account during the
>> implementation.
>>
>> 5. Regarding KIP-590. For the create topics request, create partitions
>> request,
>> and delete topics request, we are lucky enough to have directed all of
>> them
>> to
>> the controller already so we are fine with doing the admission in the
>> broker
>> which hosts the controller. If we were to throttle more operations in the
>> future,
>> I believe that we can continue to do it centrally while leveraging the
>> principal
>> that is forwarded to account for the right tenant. The reason why I would
>> like
>> to keep it central is that we are talking about sparse (or bursty)
>> workloads here
>> so distributing the quota among all the brokers makes little sense.
>>
>> 6. Regarding the naming of the new error code. BUSY sounds too generic to
>> me so I would rather prefer a specific error code. The main reason being
>> that
>> we may be able to reuse it in the future for other quotas. That being
>> said,
>> we
>> can find another name. QUOTA_EXHAUSTED? I don't feel too strongly about
>> the naming. I wonder what the others think about this.
>>
>> Voilà. I hope that I have addressed all your questions/points and I am
>> sorry for
>> the lengthy email.
>>
>> Regards,
>> David
>>
>>
>> On Tue, Jun 9, 2020 at 2:13 AM Colin McCabe  wrote:
>>
>> > On Mon, Jun 8, 2020, at 14:41, Jun Rao wrote:
>> > > Hi, Colin,
>> > >
>> > > Thanks for the comment. You brought up several points.
>> > >
>> > > 1. Should we set up a per user quota? To me, it does seem we need some
>> > sort
>> > > of a quota. When the controller runs out of resources, ideally, we
>> only
>> > > want to penalize the bad behaving applications, instead of every
>> > > application. To do that, we will need to know what each application is
>> > > entitled to and the per user quota is intended to capture that.
>> > >
>> > > 2. How easy is it to configure a quota? The following is how an admin
>> > > typically sets up a quota in our existing systems. Pick a generous
>> > default
>> > > per user quota works for most applications. For the few resource
>> > intensive
>> > > applications, customize a higher quota for them. Reserve enough
>> resources
>> > > in anticipation that a single (

[jira] [Created] (KAFKA-10130) Rewrite ZkData struct with auto-generated protocol

2020-06-09 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10130:
---

 Summary: Rewrite ZkData struct with auto-generated protocol
 Key: KAFKA-10130
 URL: https://issues.apache.org/jira/browse/KAFKA-10130
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


ZkData.scala includes a couple of versioned data structs, such as 
BrokerIdZNode. Evolving a JSON struct by hand is error-prone compared with our 
automated RPC framework. The benefit of this rewrite outweighs the trouble IMHO.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-09 Thread Jun Rao
Hi, David,

Thanks for the KIP. The name QUOTA_VIOLATED sounds reasonable to me. +1 on
the KIP.

Jun

On Tue, Jun 9, 2020 at 5:07 AM David Jacot  wrote:

> Hi Colin,
>
> Thank you for your feedback.
>
> Jun has summarized the situation pretty well. Thanks Jun! I would like to
> complement it with the following points:
>
> 1. Indeed, when the quota is exceeded, the broker will reject the topic
> creations, partition creations, and topic deletions that exceed it, using
> the new QUOTA_VIOLATED error. The ThrottleTimeMs field will
> be populated accordingly to let the client know how long it must wait.
>
> 2. I do agree that we actually want a mechanism to apply back pressure
> to the clients. The KIP basically proposes a mechanism to control and to
> limit the rate of operations before entering the controller. I think that
> it is
> similar to your thinking but is enforced based on a defined quota instead
> of relying on the number of pending items in the controller.
>
> 3. You mentioned an alternative idea in your comments that, if I understood
> correctly, would bound the queue to limit the overload and reject work if
> the
> queue is full. I have been thinking about this as well but I don't think
> that it
> works well in our case.
> - The first reason is the one mentioned by Jun. We actually want to be able
> to limit specific clients (the misbehaving ones) in a multi-tenant
> environment.
> - The second reason is that, at least in our current implementation, the
> length of
> the queue is not really a good characteristic to estimate the load.
> Coming back to your example of the CreateTopicsRequest: it creates a path in
> ZK for each newly created topic, which triggers a ChangeTopic event in the
> controller. That single event could be for a single topic in some cases or
> for a thousand topics in others.
> These two reasons aside, bounding the queue also introduces a knob which
> requires some tuning and thus suffers from all the points you mentioned as
> well, doesn't it? A value that works for one version may not work for
> another.
>
> 4. Regarding the integration with KIP-500. The implementation of this KIP
> will span across the ApiLayer and the AdminManager. To be honest, we
> can influence the implementation to work best with what you have in mind
> for the future controller. If you could shed some more light on the future
> internal architecture, I can take this into account during the
> implementation.
>
> 5. Regarding KIP-590. For the create topics request, create partitions
> request,
> and delete topics request, we are lucky enough to have directed all of them
> to
> the controller already so we are fine with doing the admission in the
> broker
> which hosts the controller. If we were to throttle more operations in the
> future,
> I believe that we can continue to do it centrally while leveraging the
> principal
> that is forwarded to account for the right tenant. The reason why I would
> like
> to keep it central is that we are talking about sparse (or bursty)
> workloads here
> so distributing the quota among all the brokers makes little sense.
>
> 6. Regarding the naming of the new error code. BUSY sounds too generic to
> me so I would rather prefer a specific error code. The main reason being
> that
> we may be able to reuse it in the future for other quotas. That being said,
> we
> can find another name. QUOTA_EXHAUSTED? I don't feel too strongly about
> the naming. I wonder what the others think about this.
>
> Voilà. I hope that I have addressed all your questions/points and I am
> sorry for
> the lengthy email.
>
> Regards,
> David
>
>
> On Tue, Jun 9, 2020 at 2:13 AM Colin McCabe  wrote:
>
> > On Mon, Jun 8, 2020, at 14:41, Jun Rao wrote:
> > > Hi, Colin,
> > >
> > > Thanks for the comment. You brought up several points.
> > >
> > > 1. Should we set up a per user quota? To me, it does seem we need some
> > sort
> > > of a quota. When the controller runs out of resources, ideally, we only
> > > want to penalize the bad behaving applications, instead of every
> > > application. To do that, we will need to know what each application is
> > > entitled to and the per user quota is intended to capture that.
> > >
> > > 2. How easy is it to configure a quota? The following is how an admin
> > > typically sets up a quota in our existing systems. Pick a generous
> > default
> > > per user quota works for most applications. For the few resource
> > intensive
> > > applications, customize a higher quota for them. Reserve enough
> resources
> > > in anticipation that a single (or a few) application will exceed the
> > quota
> > > at a given time.
> > >
> >
> > Hi Jun,
> >
> > Thanks for the response.
> >
> > Maybe I was too pessimistic about the ability of admins to configure a
> > useful quota here.  I do agree that it would be nice to have the ability
> to
> > set different quotas for different users, as you mentioned.
> >
> > >
> > > 3. How should the quota be defined? In the discussion thr

Re: KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-09 Thread Jun Rao
Hi, David,

Thanks for the explanation. The KIP looks good to me now.

Jun

On Tue, Jun 9, 2020 at 4:27 AM David Jacot  wrote:

> Hi Jun,
>
> 40. Yes, ThrottleTimeMs is set when the error code is set to QuotaViolated.
> This
> is required to let the client know how long it must wait. This is explained
> in the
> "Handling of new/old clients".
>
> Best,
> David
>
> On Mon, Jun 8, 2020 at 9:29 PM Jun Rao  wrote:
>
> > Hi, David,
> >
> > Thanks for the updated KIP. Another minor comment below.
> >
> > 40. For the new `QUOTA_VIOLATED` error in the response to
> > CreateTopics/CreatePartitions/DeleteTopics, could you clarify
> > whether ThrottleTimeMs is set when the error code is set to
> QUOTA_VIOLATED?
> >
> > Jun
> >
> > On Mon, Jun 8, 2020 at 9:32 AM David Jacot  wrote:
> >
> > > Hi Jun,
> > >
> > > 30. The rate is accumulated at the partition level. Let me clarify this
> > in
> > > the KIP.
> > >
> > > Best,
> > > David
> > >
> > > On Sat, Jun 6, 2020 at 2:37 AM Anna Povzner  wrote:
> > >
> > > > Hi David,
> > > >
> > > > The KIP looks good to me. I am going to the voting thread...
> > > >
> > > > Hi Jun,
> > > >
> > > > Yes, exactly. That's a separate thing from this KIP, so working on
> the
> > > fix.
> > > >
> > > > Thanks,
> > > > Anna
> > > >
> > > > On Fri, Jun 5, 2020 at 4:36 PM Jun Rao  wrote:
> > > >
> > > > > Hi, Anna,
> > > > >
> > > > > Thanks for the comment. For the problem that you described, perhaps
> > we
> > > > need
> > > > > to make the quota checking and recording more atomic?
> > > > >
> > > > > Hi, David,
> > > > >
> > > > > Thanks for the updated KIP.  Looks good to me now. Just one minor
> > > comment
> > > > > below.
> > > > >
> > > > > 30. controller_mutations_rate: For topic creation and deletion, is
> > the
> > > > rate
> > > > > accumulated at the topic or partition level? It would be useful to
> > make
> > > > it
> > > > > clear in the wiki.
> > > > >
> > > > > Jun
> > > > >
> > > > > On Fri, Jun 5, 2020 at 7:23 AM David Jacot 
> > > wrote:
> > > > >
> > > > > > Hi Anna and Jun,
> > > > > >
> > > > > > You are right. We should allocate up to the quota for each old
> > > sample.
> > > > > >
> > > > > > I have revamped the Throttling Algorithm section to better
> explain
> > > our
> > > > > > thought process and the token bucket inspiration.
> > > > > >
> > > > > > I have also added a chapter with few guidelines about how to
> define
> > > > > > the quota. There is no magic formula for this but I give few
> > > insights.
> > > > > > I don't have specific numbers that can be used out of the box so
> I
> > > > > > think that it is better to not put any for the time being. We can
> > > > always
> > > > > > complement later on in the documentation.
> > > > > >
> > > > > > Please, take a look and let me know what you think.
> > > > > >
> > > > > > Cheers,
> > > > > > David
> > > > > >
> > > > > > On Fri, Jun 5, 2020 at 8:37 AM Anna Povzner 
> > > wrote:
> > > > > >
> > > > > > > Hi David and Jun,
> > > > > > >
> > > > > > > I dug a bit deeper into the Rate implementation, and wanted to
> > > > confirm
> > > > > > that
> > > > > > > I do believe that the token bucket behavior is better for the
> > > reasons
> > > > > we
> > > > > > > already discussed but wanted to summarize. The main difference
> > > > between
> > > > > > Rate
> > > > > > > and token bucket is that the Rate implementation allows a burst
> > by
> > > > > > > borrowing from the future, whereas a token bucket allows a
> burst
> > by
> > > > > using
> > > > > > > accumulated tokens from the previous idle period. Using
> > accumulated
> > > > > > tokens
> > > > > > > smoothes out the rate measurement in general. Configuring a
> large
> > > > burst
> > > > > > > requires configuring a large quota window, which causes long
> > delays
> > > > for
> > > > > > > bursty workload, due to borrowing credits from the future.
> > Perhaps
> > > it
> > > > > is
> > > > > > > useful to add a summary in the beginning of the Throttling
> > > Algorithm
> > > > > > > section?
> > > > > > >
> > > > > > > In my previous email, I mentioned the issue we observed with
> the
> > > > > > bandwidth
> > > > > > > quota, where a low quota (1MB/s per broker) was limiting
> > bandwidth
> > > > > > visibly
> > > > > > > below the quota. I thought it was strictly the issue with the
> > Rate
> > > > > > > implementation as well, but I found a root cause to be
> different
> > > but
> > > > > > > amplified by the Rate implementation (long throttle delays of
> > > > requests
> > > > > > in a
> > > > > > > burst). I will describe it here for completeness using the
> > > following
> > > > > > > example:
> > > > > > >
> > > > > > >-
> > > > > > >
> > > > > > >Quota = 1MB/s, default window size and number of samples
> > > > > > >-
> > > > > > >
> > > > > > >Suppose there are 6 connections (maximum 6 outstanding
> > > requests),
> > > > > and
> > > > > > >each produce request is 5MB. If all requests arrive in a
> > burst,
> > > > the
> > > 

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-06-09 Thread Sagar
Hi John,

Thanks for the response. For starters, for our use case, we tweaked our
keys etc to avoid prefix scans. So, we are good there.

Regarding the KIP, I see what you mean when you say that the same key type
for prefix won't work. For example, continuing with the UUID example that
you gave, let's say one of the UUIDs
is 123e4567-e89b-12d3-a456-426614174000, and with a prefix scan we want to
fetch all keys starting with 123. There's already a UUIDSerde so if the
keys have been stored with that, then using UUIDSerde for prefixes won't
help; I am not sure if the UUIDSerializer would even work for 123.
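
As a quick check of that last point (a standalone sketch, not part of the KIP): 
the key's own serializer cannot even represent the prefix, because "123" is not 
a valid UUID.

    import java.util.UUID;

    public class UuidPrefixProblem {
        public static void main(String[] args) {
            // A full key parses fine...
            UUID key = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
            System.out.println(key);
            // ...but the prefix is not a UUID, so a UUID-typed serializer never sees it.
            UUID prefix = UUID.fromString("123"); // throws IllegalArgumentException
            System.out.println(prefix);
        }
    }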

So, that indicates that we will need to provide a new prefix key type
serializer. Having said that, how it will be stitched together and finally
exposed using the APIs is something that is up for questioning. This is
something you have also brought up in the earlier emails. If it
makes sense, I can modify my PR to go along these lines. Please let me know
what you think.

Lastly, I didn't understand this line of yours: *It might help if there are
other typed key/value stores to compare APIs with.*

Thanks!
Sagar.


On Thu, Jun 4, 2020 at 6:03 AM John Roesler  wrote:

> Hi Sagar,
>
> Thanks for the question, and sorry for muddying the water.
>
> I meant the Bytes/byte[] thing as advice for how you all can solve your
> problem in the mean time, while we work through this KIP. I don’t think
> it’s relevant for the KIP itself.
>
> I think the big issue here is what the type of the prefix should be in the
> method signature. Using the same type as the key makes sense some times,
> but not other times. I’m not sure what the best way around this might be.
> It might help if there are other typed key/value stores to compare APIs
> with.
>
> Thanks,
> John
>
> On Mon, Jun 1, 2020, at 09:58, Sagar wrote:
> > Hi John,
> >
> > Just to add to my previous email(and sorry for the spam), if we consider
> > using Bytes/byte[] and manually invoke the serdes, if you could provide
> > examples where the same Serde for keys won't work for the prefix types.
> As
> > far as my understanding goes, the prefix seek would depend upon ordering
> of
> > the keys like lexicographic. As long as the binary format is consistent
> for
> > both the keys and the prefixes would it not ensure the ability to search
> in
> > that same ordering space? This is from my limited understanding so any
> > concrete examples would be helpful...
> >
> > Also, you mentioned about the creation of dummy values to indicate prefix
> > values, do you mean this line:
> >
> >
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java#L91
> > This
> > is where the prefix key is built and used for searching .
> >
> > Thanks!
> > Sagar.
> >
> > On Mon, Jun 1, 2020 at 11:42 AM Sagar  wrote:
> >
> > > Hi John,
> > >
> > > Thank you. I think it makes sense to modify the KIP to add the
> > > prefixScan() as part of the existing interfaces and add the new mixin
> > > behaviour as Rejected alternatives. I am not very aware of other stores
> > > apart from keyValueStore so is it fine if I keep it there for now?
> > >
> > > Regarding the type definition of types I will try and think about some
> > > alternatives and share if I get any.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > >
> > > On Sun, May 31, 2020 at 1:55 AM John Roesler 
> wrote:
> > >
> > >> Hi Sagar,
> > >>
> > >> Thanks for the response. Your use case makes sense to me; I figured it
> > >> must be something like that.
> > >>
> > >> On a pragmatic level, in the near term, you might consider basically
> > >> doing the same thing we did in KIP-213. If you swap out the store
> types for
> > >> Byte/byte[] and “manually” invoke the serdes in your own logic, then
> you
> > >> can use the same algorithm we did to derive the range scan boundaries
> from
> > >> your desired prefix.
> > >>
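
A sketch of that boundary derivation (assuming keys are compared in unsigned 
lexicographic byte order, as RocksDB does; this is not the KIP's API):

    import java.util.Arrays;

    public class PrefixRange {
        // Smallest byte array strictly greater than every key that starts with
        // `prefix`. Returns null when the prefix is all 0xFF, i.e. scan to the
        // end of the store.
        static byte[] upperBound(byte[] prefix) {
            byte[] bound = Arrays.copyOf(prefix, prefix.length);
            for (int i = bound.length - 1; i >= 0; i--) {
                if (bound[i] != (byte) 0xFF) {
                    bound[i]++;
                    return Arrays.copyOf(bound, i + 1);
                }
            }
            return null;
        }
    }

A prefix scan is then range(prefix, upperBound(prefix)) on the Bytes-keyed 
store, filtering out a key exactly equal to the bound if the store treats the 
upper end of the range as inclusive.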
> > >> For the actual KIP, it seems like we would need significant design
> > >> improvements to be able to do any mixins, so I think we should favor
> > >> proposing either to just add to the existing interfaces or to create
> brand
> > >> new interfaces, as appropriate, for now. Given that prefix can be
> converted
> > >> to a range query at a low level, I think we can probably explore
> adding
> > >> prefix to the existing interfaces with a default implementation.
> > >>
> > >> It seems like that just leaves the question of how to define the type
> of
> > >> the prefix. To be honest, I don’t have any great ideas here. Are you
> able
> > >> to generate some creative solutions, Sagar?
> > >>
> > >> Thanks,
> > >> John
> > >>
> > >> On Tue, May 26, 2020, at 06:42, Sagar wrote:
> > >> > Hi John,
> > >> >
> > >> > Thanks for the detailed reply. I was a bit crammed with work last
> week
> > >> so
> > >> > couldn't respond earlier so apologies for that.
> > >> >
> > >> > First of all, thanks for the context that both you and Adam 

Build failed in Jenkins: kafka-2.6-jdk8 #30

2020-06-09 Thread Apache Jenkins Server
See 


Changes:

[bill] MINOR: fix HTML markup (#8823)


--
[...truncated 3.13 MB...]
org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecord STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecord PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecord STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecord PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > shouldAdvanceTime 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > shouldAdvanceTime 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps
 STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps
 PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairs 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicNameAndTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicNameAndTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairsAndCustomTimestamps
 STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairsAndCustomTimestamps
 PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairsWithCustomTimestampAndIncrementsAndNotAdvanceTime
 STARTED

org.apache.ka

[GitHub] [kafka-site] mjsax commented on pull request #268: MINOR: Make contact.html more clear

2020-06-09 Thread GitBox


mjsax commented on pull request #268:
URL: https://github.com/apache/kafka-site/pull/268#issuecomment-640962104


   Thanks for the PR!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka-site] mjsax merged pull request #268: MINOR: Make contact.html more clear

2020-06-09 Thread GitBox


mjsax merged pull request #268:
URL: https://github.com/apache/kafka-site/pull/268


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka-site] scott-confluent opened a new pull request #269: Kafka nav and hompeage redesign

2020-06-09 Thread GitBox


scott-confluent opened a new pull request #269:
URL: https://github.com/apache/kafka-site/pull/269


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9923) Join window store duplicates can be compacted in changelog

2020-06-09 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-9923.

Resolution: Not A Problem

[~cadonna] pointed out that this actually isn't a problem/has been fixed by 
KAFKA-5804. We could certainly stand to do some cleaning up around the 
duplicates handling but at least we aren't losing data!

> Join window store duplicates can be compacted in changelog 
> ---
>
> Key: KAFKA-9923
> URL: https://issues.apache.org/jira/browse/KAFKA-9923
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Bruno Cadonna
>Priority: Blocker
> Fix For: 2.6.0
>
>
> Stream-stream joins use the regular `WindowStore` implementation but with 
> `retainDuplicates` set to true. To allow for duplicates while using the same 
> unique-key underlying stores we just wrap the key with an incrementing 
> sequence number before inserting it.
> This wrapping occurs at the innermost layer of the store hierarchy, which 
> means the duplicates must first pass through the changelogging layer. At this 
> point the keys are still identical. So, we end up sending the records to the 
> changelog without distinct keys and therefore may lose the older of the 
> duplicates during compaction.

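For illustration only, here is a minimal sketch of the duplicate-retention idea
described above: the serialized key is wrapped with an incrementing sequence
number so otherwise-identical keys stay distinct. The class and method names are
hypothetical stand-ins, not Kafka Streams' actual internals.

import java.nio.ByteBuffer;

// Hypothetical helper: appends a 4-byte sequence number to the serialized key so
// that duplicate keys become distinct and are not removed by log compaction.
public final class SequencedKeySketch {
    private int seqnum = 0;

    public byte[] wrap(byte[] serializedKey) {
        return ByteBuffer.allocate(serializedKey.length + Integer.BYTES)
                .put(serializedKey)
                .putInt(seqnum++)
                .array();
    }
}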


--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10129) Fail the QA if there is javadoc error

2020-06-09 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-10129:
--

 Summary: Fail the QA if there is javadoc error
 Key: KAFKA-10129
 URL: https://issues.apache.org/jira/browse/KAFKA-10129
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


from [~hachikuji] 
(https://github.com/apache/kafka/pull/8660#pullrequestreview-425856179)

{quote}
One other question I had is whether we should consider making doc failures also 
fail the build?
{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Jenkins build is back to normal : kafka-trunk-jdk11 #1554

2020-06-09 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : kafka-trunk-jdk8 #4625

2020-06-09 Thread Apache Jenkins Server
See 




[jira] [Resolved] (KAFKA-9724) Consumer wrongly ignores fetched records "since it no longer has valid position"

2020-06-09 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-9724.
-
Fix Version/s: 2.6.0
 Assignee: David Arthur
   Resolution: Fixed

> Consumer wrongly ignores fetched records "since it no longer has valid 
> position"
> 
>
> Key: KAFKA-9724
> URL: https://issues.apache.org/jira/browse/KAFKA-9724
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 2.4.0
>Reporter: Oleg Muravskiy
>Assignee: David Arthur
>Priority: Major
> Fix For: 2.6.0
>
> Attachments: consumer.log.xz
>
>
> After upgrading kafka-client to 2.4.0 (while brokers are still at 2.2.0) 
> consumers in a consumer group intermittently stop progressing on assigned 
> partitions, even when there are messages to consume. This is not a permanent 
> condition, as they progress from time to time, but become slower with time, 
> and catch up after restart.
> Here is a sample of 3 consecutive ignored fetches:
> {noformat}
> 2020-03-15 12:08:58,440 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,541 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required 
> protocol version (introduced in Kafka 2.3)
> 2020-03-15 12:08:58,549 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - 
> Updating last seen epoch from null to 62 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,557 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,652 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required 
> protocol version (introduced in Kafka 2.3)
> 2020-03-15 12:08:58,659 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - 
> Updating last seen epoch from null to 62 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,659 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Fetch READ_UNCOMMITTED at offset 538065584 for partition mrt-rrc10-6 returned 
> fetch data (error=NONE, highWaterMark=538065631, lastStableOffset = 
> 538065631, logStartOffset = 485284547, preferredReadReplica = absent, 
> abortedTransactions = null, recordsSizeInBytes=16380)
> 2020-03-15 12:08:58,659 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Ignoring fetched records for partition mrt-rrc10-6 since it no longer has 
> valid position
> 2020-03-15 12:08:58,665 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required 
> protocol version (introduced in Kafka 2.3)
> 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Added READ_UNCOMMITTED fetch request for partition mrt-rrc10-6 at position 
> FetchPosition{offset=538065584, offsetEpoch=Optional[62], 
> currentLeader=LeaderAndEpoch{leader=node03.kafka:9092 (id: 3 rack: null), 
> epoch=-1}} to node node03.kafka:9092 (id: 3 rack: null)
> 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), 
> implied=(mrt-rrc10-6, mrt-rrc22-7, mrt-rrc10-1)) to broker node03.kafka:9092 
> (id: 3 rack: null)
> 2020-03-15 12:08:58,770 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - 
> Updating last seen epoch from null to 62 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,770 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Fetch READ_UNCOMMITTED at offset 538065584 for partition mrt-rrc10-6 returned 
> fetch data (error=NONE, highWaterMark=538065727, lastStableOffset = 
> 538065727, logStartOffset = 485284547, preferredReadReplica = absent, 
> abortedTransactions = null, recordsSizeInBytes=51864)
> 2020-03-15 12:08:58,770 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Ignoring fetched records for partition mrt-rrc10-6 since it no longer has 
> valid position
> 2020-03-15 12:08:58,808 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> {noformat}
> After which consumer makes progress:
> {noformat}
> 2020-03-15 12:08:58,871 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc10-6, mrt-rrc22-7] since the

Re: [VOTE] KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-09 Thread David Jacot
Hi Colin,

Thank you for your feedback.

Jun has summarized the situation pretty well. Thanks Jun! I would like to
complement it with the following points:

1. Indeed, when the quota is exceeded, the broker will reject the topic
creations, partition creations and topic deletions that exceed it with the
new QUOTA_VIOLATED error. The ThrottleTimeMs field will be populated
accordingly to let the client know how long it must wait.

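(As a rough illustration of how a client could honor those two fields, here is
a hedged sketch; the response type and accessors below are hypothetical
stand-ins, not the real AdminClient API, which handles retries internally.)

// Hypothetical response shape, used only to show the intended client behavior.
interface QuotaViolatedResponse {
    String errorCode();    // e.g. "QUOTA_VIOLATED"
    long throttleTimeMs(); // how long the client must wait before retrying
}

final class QuotaAwareRetry {
    static void submitWithBackoff(java.util.function.Supplier<QuotaViolatedResponse> call)
            throws InterruptedException {
        while (true) {
            QuotaViolatedResponse response = call.get();
            if (!"QUOTA_VIOLATED".equals(response.errorCode())) {
                return; // accepted, or failed for an unrelated reason
            }
            // The broker rejected the operation; wait the advertised time and retry.
            Thread.sleep(response.throttleTimeMs());
        }
    }
}
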
2. I do agree that we actually want a mechanism to apply back pressure
to the clients. The KIP basically proposes a mechanism to control and to
limit the rate of operations before entering the controller. I think that
it is similar to your thinking but is enforced based on a defined quota
instead of relying on the number of pending items in the controller.

3. You mentioned an alternative idea in your comments that, if I understood
correctly, would bound the queue to limit the overload and reject work if
the queue is full. I have been thinking about this as well but I don't
think that it works well in our case.
- The first reason is the one mentioned by Jun. We actually want to be able
to limit specific clients (the misbehaving ones) in a multi-tenant
environment.
- The second reason is that, at least in our current implementation, the
length of the queue is not really a good characteristic to estimate the
load. Coming back to your example of the CreateTopicsRequest: it creates a
path in ZK for each newly created topic, which triggers a ChangeTopic event
in the controller. That single event could be for a single topic in some
cases or for a thousand topics in others.
These two reasons aside, bounding the queue also introduces a knob which
requires some tuning and thus suffers from all the points you mentioned as
well, doesn't it? A value that works for one version may not work for
another.

4. Regarding the integration with KIP-500. The implementation of this KIP
will span across the ApiLayer and the AdminManager. To be honest, we
can influence the implementation to work best with what you have in mind
for the future controller. If you could shed some more light on the future
internal architecture, I can take this into account during the
implementation.

5. Regarding KIP-590. For the create topics request, create partitions
request, and delete topics request, we are lucky enough to have directed
all of them to the controller already, so we are fine with doing the
admission in the broker which hosts the controller. If we were to throttle
more operations in the future, I believe that we can continue to do it
centrally while leveraging the principal that is forwarded to account for
the right tenant. The reason why I would like to keep it central is that
we are talking about sparse (or bursty) workloads here, so distributing
the quota among all the brokers makes little sense.

6. Regarding the naming of the new error code. BUSY sounds too generic to
me, so I would rather prefer a specific error code, the main reason being
that we may be able to reuse it in the future for other quotas. That being
said, we can find another name. QUOTA_EXHAUSTED? I don't feel too strongly
about the naming. I wonder what the others think about this.

Voilà. I hope that I have addressed all your questions/points and I am
sorry for the lengthy email.

Regards,
David


On Tue, Jun 9, 2020 at 2:13 AM Colin McCabe  wrote:

> On Mon, Jun 8, 2020, at 14:41, Jun Rao wrote:
> > Hi, Colin,
> >
> > Thanks for the comment. You brought up several points.
> >
> > 1. Should we set up a per user quota? To me, it does seem we need some
> sort
> > of a quota. When the controller runs out of resources, ideally, we only
> > want to penalize the bad behaving applications, instead of every
> > application. To do that, we will need to know what each application is
> > entitled to and the per user quota is intended to capture that.
> >
> > 2. How easy is it to configure a quota? The following is how an admin
> > typically sets up a quota in our existing systems. Pick a generous
> default
> > per user quota works for most applications. For the few resource
> intensive
> > applications, customize a higher quota for them. Reserve enough resources
> > in anticipation that a single (or a few) application will exceed the
> quota
> > at a given time.
> >
>
> Hi Jun,
>
> Thanks for the response.
>
> Maybe I was too pessimistic about the ability of admins to configure a
> useful quota here.  I do agree that it would be nice to have the ability to
> set different quotas for different users, as you mentioned.
>
> >
> > 3. How should the quota be defined? In the discussion thread, we debated
> > between a usage based model vs a rate based model. Dave and Anna argued
> for
> > the rate based model mostly because it's simpler to implement.
> >
>
> I'm trying to think more about how this integrates with our plans for
> KIP-500.  When we get rid of ZK, we will have to handle this in the
> controller itself, rather than in the AdminManager.  That impl

Re: KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-09 Thread David Jacot
Hi Jun,

40. Yes, ThrottleTimeMs is set when the error code is set to QUOTA_VIOLATED.
This is required to let the client know how long it must wait. This is
explained in the "Handling of new/old clients" section.

Best,
David

On Mon, Jun 8, 2020 at 9:29 PM Jun Rao  wrote:

> Hi, David,
>
> Thanks for the updated KIP. Another minor comment below.
>
> 40. For the new `QUOTA_VIOLATED` error in the response to
> CreateTopics/CreatePartitions/DeleteTopics, could you clarify
> whether ThrottleTimeMs is set when the error code is set to QUOTA_VIOLATED?
>
> Jun
>
> On Mon, Jun 8, 2020 at 9:32 AM David Jacot  wrote:
>
> > Hi Jun,
> >
> > 30. The rate is accumulated at the partition level. Let me clarify this
> > in the KIP.
> >
> > Best,
> > David
> >
> > On Sat, Jun 6, 2020 at 2:37 AM Anna Povzner  wrote:
> >
> > > Hi David,
> > >
> > > The KIP looks good to me. I am going to the voting thread...
> > >
> > > Hi Jun,
> > >
> > > Yes, exactly. That's a separate thing from this KIP, so working on the
> > fix.
> > >
> > > Thanks,
> > > Anna
> > >
> > > On Fri, Jun 5, 2020 at 4:36 PM Jun Rao  wrote:
> > >
> > > > Hi, Anna,
> > > >
> > > > Thanks for the comment. For the problem that you described, perhaps
> we
> > > need
> > > > to make the quota checking and recording more atomic?
> > > >
> > > > Hi, David,
> > > >
> > > > Thanks for the updated KIP.  Looks good to me now. Just one minor
> > comment
> > > > below.
> > > >
> > > > 30. controller_mutations_rate: For topic creation and deletion, is
> the
> > > rate
> > > > accumulated at the topic or partition level? It would be useful to
> make
> > > it
> > > > clear in the wiki.
> > > >
> > > > Jun
> > > >
> > > > On Fri, Jun 5, 2020 at 7:23 AM David Jacot 
> > wrote:
> > > >
> > > > > Hi Anna and Jun,
> > > > >
> > > > > You are right. We should allocate up to the quota for each old
> > sample.
> > > > >
> > > > > I have revamped the Throttling Algorithm section to better explain
> > our
> > > > > thought process and the token bucket inspiration.
> > > > >
> > > > > I have also added a chapter with few guidelines about how to define
> > > > > the quota. There is no magic formula for this but I give few
> > insights.
> > > > > I don't have specific numbers that can be used out of the box so I
> > > > > think that it is better to not put any for the time being. We can
> > > always
> > > > > complement later on in the documentation.
> > > > >
> > > > > Please, take a look and let me know what you think.
> > > > >
> > > > > Cheers,
> > > > > David
> > > > >
> > > > > On Fri, Jun 5, 2020 at 8:37 AM Anna Povzner 
> > wrote:
> > > > >
> > > > > > Hi David and Jun,
> > > > > >
> > > > > > I dug a bit deeper into the Rate implementation, and wanted to
> > > confirm
> > > > > that
> > > > > > I do believe that the token bucket behavior is better for the
> > reasons
> > > > we
> > > > > > already discussed but wanted to summarize. The main difference
> > > between
> > > > > Rate
> > > > > > and token bucket is that the Rate implementation allows a burst
> by
> > > > > > borrowing from the future, whereas a token bucket allows a burst
> by
> > > > using
> > > > > > accumulated tokens from the previous idle period. Using
> accumulated
> > > > > tokens
> > > > > > smoothes out the rate measurement in general. Configuring a large
> > > burst
> > > > > > requires configuring a large quota window, which causes long
> delays
> > > for
> > > > > > bursty workload, due to borrowing credits from the future.
> Perhaps
> > it
> > > > is
> > > > > > useful to add a summary in the beginning of the Throttling
> > Algorithm
> > > > > > section?
> > > > > >
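As a minimal, hedged sketch of the token bucket behavior described above: idle
periods accumulate credit up to a fixed capacity, and a negative balance
translates into a throttle delay. This is an illustration only, not Kafka's
quota code; the parameter names are assumptions.

public final class TokenBucketSketch {
    private final double quotaPerSec; // tokens replenished per second
    private final double capacity;    // maximum burst (accumulated credit)
    private double tokens;
    private long lastRefillNanos;

    public TokenBucketSketch(double quotaPerSec, double capacity) {
        this.quotaPerSec = quotaPerSec;
        this.capacity = capacity;
        this.tokens = capacity; // an idle period starts with full credit
        this.lastRefillNanos = System.nanoTime();
    }

    // Returns the throttle delay in ms for a request of the given cost (0 if admitted).
    public synchronized long record(double cost) {
        refill();
        tokens -= cost; // tokens may go negative; the debt defines the delay
        return tokens >= 0 ? 0L : (long) Math.ceil((-tokens / quotaPerSec) * 1000.0);
    }

    private void refill() {
        long now = System.nanoTime();
        double elapsedSec = (now - lastRefillNanos) / 1_000_000_000.0;
        tokens = Math.min(capacity, tokens + elapsedSec * quotaPerSec);
        lastRefillNanos = now;
    }
}
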
> > > > > > In my previous email, I mentioned the issue we observed with the
> > > > > bandwidth
> > > > > > quota, where a low quota (1MB/s per broker) was limiting
> bandwidth
> > > > > visibly
> > > > > > below the quota. I thought it was strictly the issue with the
> Rate
> > > > > > implementation as well, but I found a root cause to be different
> > but
> > > > > > amplified by the Rate implementation (long throttle delays of
> > > requests
> > > > > in a
> > > > > > burst). I will describe it here for completeness using the
> > following
> > > > > > example:
> > > > > >
> > > > > >-
> > > > > >
> > > > > >Quota = 1MB/s, default window size and number of samples
> > > > > >-
> > > > > >
> > > > > >Suppose there are 6 connections (maximum 6 outstanding
> > requests),
> > > > and
> > > > > >each produce request is 5MB. If all requests arrive in a
> burst,
> > > the
> > > > > > last 4
> > > > > >requests (20MB over 10MB allowed in a window) may get the same
> > > > > throttle
> > > > > >time if they are processed concurrently. We record the rate
> > under
> > > > the
> > > > > > lock,
> > > > > >but then calculate throttle time separately after that. So,
> for
> > > each
> > > > > >request, the observed rate could be 3MB/s, and each request
> gets
> > > > > > throttle
> > > > > >delay = 20 seconds (instead

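For reference, the arithmetic behind the 20-second delay in the example above,
assuming the standard rate-based throttle computation
(delay = (observed rate - quota) / quota * window), with the numbers taken from
the example: 3 MB/s observed, 1 MB/s quota, roughly a 10-second window.

final class ThrottleDelayExample {
    // delay = (observed - quota) / quota * windowSeconds
    static double throttleDelaySeconds(double observedRate, double quota, double windowSeconds) {
        return (observedRate - quota) / quota * windowSeconds;
    }

    public static void main(String[] args) {
        System.out.println(throttleDelaySeconds(3.0, 1.0, 10.0)); // prints 20.0
    }
}
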
Build failed in Jenkins: kafka-trunk-jdk11 #1553

2020-06-09 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10063; UnsupportedOperation when querying cleaner metrics after

[github] MINOR: Fix fetch session epoch comment in `FetchRequest.json` (#8802)


--
[...truncated 1.89 MB...]

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testProduceConsumeWithWildcardAcls STARTED

kafka.api.TransactionsTest > testFencingOnSendOffsets PASSED

kafka.api.TransactionsTest > testFencingOnAddPartitions STARTED

kafka.api.TransactionsTest > testFencingOnAddPartitions PASSED

kafka.api.TransactionsTest > testFencingOnTransactionExpiration STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testProduceConsumeWithWildcardAcls PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoConsumeWithDescribeAclViaSubscribe STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoConsumeWithDescribeAclViaSubscribe PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoConsumeWithoutDescribeAclViaAssign STARTED

kafka.api.TransactionsTest > testFencingOnTransactionExpiration PASSED

kafka.api.TransactionsTest > testDelayedFetchIncludesAbortedTransaction STARTED

kafka.api.TransactionsTest > testDelayedFetchIncludesAbortedTransaction PASSED

kafka.api.TransactionsTest > testOffsetMetadataInSendOffsetsToTransaction 
STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoConsumeWithoutDescribeAclViaAssign PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > testNoGroupAcl STARTED

kafka.api.TransactionsTest > testOffsetMetadataInSendOffsetsToTransaction PASSED

kafka.api.TransactionsTest > testConsecutivelyRunInitTransactions STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > testNoGroupAcl PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > testNoProduceWithDescribeAcl 
STARTED

kafka.api.TransactionsTest > testConsecutivelyRunInitTransactions PASSED

kafka.api.TransactionsTest > testReadCommittedConsumerShouldNotSeeUndecidedData 
STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > testNoProduceWithDescribeAcl 
PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED

kafka.api.TransactionsTest > testReadCommittedConsumerShouldNotSeeUndecidedData 
PASSED

kafka.api.TransactionsTest > testFencingOnSend STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testProduceConsumeViaSubscribe STARTED

kafka.api.TransactionsTest > testFencingOnSend PASSED

kafka.api.TransactionsTest > testFencingOnCommit STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testProduceConsumeViaSubscribe PASSED

kafka.api.SaslSslAdminIntegrationTest > testAclDescribe STARTED

kafka.api.TransactionsTest > testFencingOnCommit PASSED

kafka.api.TransactionsTest > testMultipleMarkersOneLeader STARTED

kafka.api.TransactionsTest > testMultipleMarkersOneLeader PASSED

kafka.api.TransactionsTest > testCommitTransactionTimeout STARTED

kafka.api.TransactionsTest > testCommitTransactionTimeout PASSED

kafka.api.UserClientIdQuotaTest > testProducerConsumerOverrideLowerQuota STARTED

kafka.api.SaslSslAdminIntegrationTest > testAclDescribe PASSED

kafka.api.SaslSslAdminIntegrationTest > 
testLegacyAclOpsNeverAffectOrReturnPrefixed STARTED

kafka.api.UserClientIdQuotaTest > testProducerConsumerOverrideLowerQuota PASSED

kafka.api.UserClientIdQuotaTest > testProducerConsumerOverrideUnthrottled 
STARTED

kafka.api.UserClientIdQuotaTest > testProducerConsumerOverrideUnthrottled PASSED

kafka.api.UserClientIdQuotaTest > testThrottledProducerConsumer STARTED

kafka.api.UserClientIdQuotaTest > testThrottledProducerConsumer PASSED

kafka.api.UserClientIdQuotaTest > testQuotaOverrideDelete STARTED

kafka.api.SaslSslAdminIntegrationTest > 
testLegacyAclOpsNeverAffectOrReturnPrefixed PASSED

kafka.api.SaslSslAdminIntegrationTest > 
testCreateTopicsResponseMetadataAndConfig STARTED

kafka.api.SaslSslAdminIntegrationTest > 
testCreateTopicsResponseMetadataAndConfig PASSED

kafka.api.SaslSslAdminIntegrationTest > testAttemptToCreateInvalidAcls STARTED

kafka.api.UserClientIdQuotaTest > testQuotaOverrideDelete PASSED

kafka.api.UserClientIdQuotaTest > testThrottledRequest STARTED

kafka.api.UserClientIdQuotaTest > testThrottledRequest PASSED

kafka.metrics.KafkaTimerTest > testKafkaTimer STARTED

kafka.metrics.KafkaTimerTest > testKafkaTimer PASSED

kafka.api.SaslSslAdminIntegrationTest > testAttemptToCreateInvalidAcls PASSED

kafka.api.SaslSslAdminIntegrationTest > testAclAuthorizationDenied STARTED

kafka.api.SaslSslAdminIntegrationTest > testAclAuthorizationDenied PASSED

kafka.api.SaslSslAdminIntegrationTest > testAclOperations STARTED

kafka.api.SaslSslAdminIntegrationTest > testAclOperations PASSED

kafka.api.SaslSslAdminIntegrationTest > testAclOperations2 STARTED

kafka.ap

[jira] [Created] (KAFKA-10128) MM2 - Delete topics when config sync is enabled

2020-06-09 Thread Karthik (Jira)
Karthik created KAFKA-10128:
---

 Summary: MM2 - Delete topics when config sync is enabled
 Key: KAFKA-10128
 URL: https://issues.apache.org/jira/browse/KAFKA-10128
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect, mirrormaker
Affects Versions: 2.5.0
Reporter: Karthik


Topics deleted in one region are not being deleted in the other region, and
with that they are being recreated in the case of an active-active deployment

 

Logs:

*Test Check delete Topic*

 

*Delete Command*

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --delete 
--topic im115

 

*Source cluster - Before*

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list

__consumer_offsets

heartbeats

im115

im32

mm2-configs.us-east.internal

mm2-offset-syncs.us-east.internal

mm2-offsets.us-east.internal

mm2-status.us-east.internal

us-east.checkpoints.internal

us-east.heartbeats

us-east.im115

us-east.us-east-offsets

us-west-offsets

 

*Source cluster -* *After delete*

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list

__consumer_offsets

heartbeats

im32

mm2-configs.us-east.internal

mm2-offset-syncs.us-east.internal

mm2-offsets.us-east.internal

mm2-status.us-east.internal

us-east.checkpoints.internal

us-east.heartbeats

us-east.im115

us-east.us-east-offsets

us-west-offsets

 

*Dest Cluster - Before*

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.3.115:9092 --list 
    __consumer_offsets

heartbeats

im115

mm2-configs.us-west.internal

mm2-offset-syncs.us-west.internal

mm2-offsets.us-west.internal

mm2-status.us-west.internal

us-east-offsets

us-west.checkpoints.internal

us-west.heartbeats

us-west.im115

us-west.im32

us-west.us-west-offsets


*Dest Cluster -* *After Delete*

*Did not delete*

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.3.115:9092 --list 
__consumer_offsets
heartbeats
im115
mm2-configs.us-west.internal
mm2-offset-syncs.us-west.internal
mm2-offsets.us-west.internal
mm2-status.us-west.internal
us-east-offsets
us-west.checkpoints.internal
us-west.heartbeats
us-west.im115
us-west.im32
us-west.us-west-offsets

 

With that, after the config refresh interval, the deleted topic is being
replicated back on the source cluster

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list
__consumer_offsets
heartbeats
im115
im32
mm2-configs.us-east.internal
mm2-offset-syncs.us-east.internal
mm2-offsets.us-east.internal
mm2-status.us-east.internal
us-east.checkpoints.internal
us-east.heartbeats
us-east.im115
us-east.us-east-offsets
us-west-offsets



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-06-09 Thread Satish Duggana
We did not want to add many implementation details in the KIP. But we
have decided to add them to the KIP as an appendix or sub-sections
(including the follower fetch protocol) to describe the flow with the
main cases. That will answer most of the queries. I will update this
mail thread when the respective sections are updated.

Thanks,
Satish.

On Sat, Jun 6, 2020 at 7:49 PM Alexandre Dupriez
 wrote:
>
> Hi Satish,
>
> A couple of questions specific to the section "Follower
> Requests/Replication", pages 16:17 in the design document [1].
>
> 900. It is mentioned that followers fetch auxiliary states from the
> remote storage.
>
> 900.a Does the consistency model of the external storage impacts reads
> of leader epochs and other auxiliary data?
>
> 900.b What are the benefits of using a mechanism to store and access
> the leader epochs which is different from other metadata associated to
> tiered segments? What are the benefits of retrieving this information
> on-demand from the follower rather than relying on propagation via the
> topic __remote_log_metadata? What are the advantages over using a
> dedicated control structure (e.g. a new record type) propagated via
> this topic? Since in the document, different control paths are
> operating in the system, how are the metadata stored in
> __remote_log_metadata [which also include the epoch of the leader
> which offloaded a segment] and the remote auxiliary states, kept in
> sync?
>
> 900.c A follower can encounter an OFFSET_MOVED_TO_TIERED_STORAGE. Is
> this in response to a Fetch or OffsetForLeaderEpoch request?
>
> 900.d What happens if, after a follower encountered an
> OFFSET_MOVED_TO_TIERED_STORAGE response, its attempts to retrieve
> leader epochs fail (for instance, because the remote storage is
> temporarily unavailable)? Does the follower fallbacks to a mode where
> it ignores tiered segments, and applies truncation using only locally
> available information? What happens when access to the remote storage
> is restored? How is the replica lineage inferred by the remote leader
> epochs reconciled with the follower's replica lineage, which has
> evolved? Does the follower remember fetching auxiliary states failed
> in the past and attempt reconciliation? Is there a plan to offer
> different strategies in this scenario, configurable via configuration?
>
> 900.e Is the leader epoch cache offloaded with every segment? Or when
> a new checkpoint is detected? If that information is not always
> offloaded to avoid duplicating data, how does the remote storage
> satisfy the request to retrieve it?
>
> 900.f Since the leader epoch cache covers the entire replica lineage,
> what happens if, after a leader epoch cache file is offloaded with a
> given segment, the local epoch cache is truncated [not necessarily for
> a range of offset included in tiered segments]? How are remote and
> local leader epoch caches kept consistent?
>
> 900.g Consumer can also use leader epochs (e.g. to enable fencing to
> protect against stale leaders). What differences would there be
> between consumer and follower fetches? Especially, would consumers
> also fetch leader epoch information from the remote storage?
>
> 900.h Assume a newly elected leader of a topic-partition detects more
> recent segments are available in the external storage, with epochs >
> its local epoch. Does it ignore these segments and their associated
> epoch-to-offset vectors? Or try to reconstruct its local replica
> lineage based on the data remotely available?
>
> Thanks,
> Alexandre
>
> [1] 
> https://docs.google.com/document/d/18tnobSas3mKFZFr8oRguZoj_tkD_sGzivuLRlMloEMs/edit?usp=sharing
>
> Le jeu. 4 juin 2020 à 19:55, Satish Duggana  a 
> écrit :
> >
> > Hi Jun,
> > Please let us know if you have any comments on "transactional support"
> > and "follower requests/replication" mentioned in the wiki.
> >
> > Thanks,
> > Satish.
> >
> > On Tue, Jun 2, 2020 at 9:25 PM Satish Duggana  
> > wrote:
> > >
> > > Thanks Jun for your comments.
> > >
> > > >100. It would be useful to provide more details on how those apis are 
> > > >used. Otherwise, it's kind of hard to really assess whether the new apis 
> > > >are sufficient/redundant. A few examples below.
> > >
> > > We will update the wiki and let you know.
> > >
> > > >100.1 deleteRecords seems to only advance the logStartOffset in Log. How 
> > > >does that trigger the deletion of remote log segments?
> > >
> > > RLMTask for leader partition periodically checks whether there are
> > > remote log segments earlier to logStartOffset and the respective
> > > remote log segment metadata and data are deleted by using RLMM and
> > > RSM.
> > >
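For illustration, a very rough sketch of the periodic deletion flow described
above: a task on the leader removes remote segments that fall entirely below
logStartOffset. The interfaces below are hypothetical stand-ins, not the
RemoteLogMetadataManager/RemoteStorageManager APIs proposed in KIP-405.

import java.util.List;

// Hypothetical stand-ins used only to illustrate the cleanup flow.
interface RemoteSegmentInfo { long endOffset(); }
interface MetadataManagerStub {
    List<RemoteSegmentInfo> listSegments(String topicPartition);
    void deleteSegmentMetadata(RemoteSegmentInfo segment);
}
interface StorageManagerStub { void deleteSegmentData(RemoteSegmentInfo segment); }

final class RemoteLogCleanupSketch {
    // Invoked periodically for partitions where this broker is the leader.
    static void cleanup(String topicPartition, long logStartOffset,
                        MetadataManagerStub rlmm, StorageManagerStub rsm) {
        for (RemoteSegmentInfo segment : rlmm.listSegments(topicPartition)) {
            if (segment.endOffset() < logStartOffset) {
                rsm.deleteSegmentData(segment);      // remove the remote data
                rlmm.deleteSegmentMetadata(segment); // then its metadata entry
            }
        }
    }
}
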
> > > >100.2 stopReplica with deletion is used in 2 cases (a) replica 
> > > >reassignment; (b) topic deletion. We only want to delete the tiered 
> > > >metadata in the second case. Also, in the second case, who initiates the 
> > > >deletion of the remote segment since the leader may not exist?
> > >
> > > Right, it is deleted only

[jira] [Created] (KAFKA-10127) kafka cluster not recovering - Shrinking ISR continously

2020-06-09 Thread Youssef BOUZAIENNE (Jira)
Youssef BOUZAIENNE created KAFKA-10127:
--

 Summary: kafka cluster not recovering - Shrinking ISR  continously
 Key: KAFKA-10127
 URL: https://issues.apache.org/jira/browse/KAFKA-10127
 Project: Kafka
  Issue Type: Bug
  Components: replication, zkclient
Affects Versions: 2.4.1
 Environment: using kafka version 2.4.1 and zookeeper version 3.5.7
Reporter: Youssef BOUZAIENNE


We are actually facing an issue from time to time where our Kafka cluster
goes into a weird state where we see the following log repeating:

 

[2020-06-06 08:35:48,117] INFO [Partition test broker=1002] Cached zkVersion 
620 not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2020-06-06 08:35:48,117] INFO [Partition test broker=1002] Shrinking ISR from 
1006,1002 to 1002. Leader: (highWatermark: 3222733572, endOffset: 3222741893). 
Out of sync replicas: (brokerId: 1006, endOffset: 3222733572). 
(kafka.cluster.Partition)

 

 

Before that, our ZooKeeper session expired, which led us to that state.

 

After we increased these two values we encounter the issue less frequently,
but it still appears from time to time and the only solution is a restart
of the Kafka service on all brokers:

zookeeper.session.timeout.ms=18000

replica.lag.time.max.ms=3

 

Any help on that please

 

  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10126) Remove unused options in ConsumerPerformance

2020-06-09 Thread jiamei xie (Jira)
jiamei xie created KAFKA-10126:
--

 Summary: Remove unused options in ConsumerPerformance
 Key: KAFKA-10126
 URL: https://issues.apache.org/jira/browse/KAFKA-10126
 Project: Kafka
  Issue Type: Bug
Reporter: jiamei xie
Assignee: jiamei xie


Options numThreadsOpt and numFetchersOpt are unused in ConsumerPerformance.
It's a waste of time to test performance vs. the number of threads, so
removing them is needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Broker side round robin on topic partitions when receiving messages

2020-06-09 Thread Vinicius Scheidegger
Anyone?

On Fri, Jun 5, 2020 at 2:42 PM Vinicius Scheidegger <
vinicius.scheideg...@gmail.com> wrote:

> Does anyone know how I could perform load balancing to distribute the
> messages equally to all consumers within the same consumer group when
> there are multiple producers?
>
> Is this a conceptual flaw in Kafka (was it not designed for equal
> distribution with multiple producers), or am I missing something?
> I've asked on Stack Overflow, on the Kafka users mailing list, here (on Kafka
> Devs) and on Slack - and still have no definitive answer (actually most of
> the time I got no answer at all)
>
> Would something like this even be possible in the way Kafka is currently
> designed?
> How does proposing for a KIP work?
>
> Thanks,
>
>
>
> On Thu, May 28, 2020, 3:44 PM Vinicius Scheidegger <
> vinicius.scheideg...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm trying to understand a little bit more about how Kafka works.
>> I have a design with multiple producers writing to a single topic and
>> multiple consumers in a single Consumer Group consuming messages from this
>> topic.
>>
>> My idea is to distribute the messages from all producers equally. From
>> reading the documentation I understood that the partition is always
>> selected by the producer. Is that correct?
>>
>> I'd also like to know if there is an out of the box option to assign the
>> partition via a round robin *on the broker side *to guarantee equal
>> distribution of the load - if possible to each consumer, but if not
>> possible, at least to each partition.
>>
>> If my understanding is correct, it looks like in a multiple-producer
>> scenario there is a lack of support from Kafka regarding load balancing and
>> customers have to either stick to the hash of the key (random distribution,
>> although it would guarantee the same key goes to the same partition) or they
>> have to create their own logic on the producer side (i.e. by sharing memory)
>>
>> Am I missing something?
>>
>> Thank you,
>>
>> Vinicius Scheidegger
>>
>
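
For what it's worth, a hedged sketch of the closest out-of-the-box option today:
partitioning is decided on the producer side, and Kafka 2.4+ ships
org.apache.kafka.clients.producer.RoundRobinPartitioner, which cycles keyless
records across partitions independently per producer (there is no broker-side
coordination across producers). The bootstrap address and topic name below are
placeholders.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;

public class RoundRobinProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Each producer cycles through partitions on its own; this spreads load
        // evenly per producer but is not a global round robin across producers.
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("example-topic", null, "message-" + i));
            }
        }
    }
}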