[jira] [Commented] (KAFKA-12261) Splitting partition causes message loss for consumers with auto.offset.reset=latest
[ https://issues.apache.org/jira/browse/KAFKA-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286901#comment-17286901 ]

Haruki Okada commented on KAFKA-12261:
--
Yes, I've reconsidered this and changed the description of the issue. I agree with keeping the default setting. May I submit a patch refining AUTO_OFFSET_RESET_DOC?

> Splitting partition causes message loss for consumers with auto.offset.reset=latest
> ---
>
> Key: KAFKA-12261
> URL: https://issues.apache.org/jira/browse/KAFKA-12261
> Project: Kafka
> Issue Type: Improvement
> Affects Versions: 2.7.0
> Reporter: Haruki Okada
> Assignee: Luke Chen
> Priority: Minor
>
> As of now, auto.offset.reset of ConsumerConfig is "latest" by default.
>
> This can be a pitfall that causes message loss when we split a topic's partitions, as follows.
> Say we have a topic-X which has only 1 partition.
> # Split topic-X into 2 partitions with kafka-topics.sh --alter --topic topic-X --partitions 2 (topic-X-1 is added).
> # The producer learns that new partitions were added by refreshing metadata, and starts producing to topic-X-1.
> # A bit later, the consumer learns that new partitions were added, triggers a consumer rebalance, and starts consuming topic-X-1.
> ** Upon starting consumption, it resets its offset to the log-end offset.
> If the producer sent several records before step 3, they may never be delivered to the consumer.
>
> This behavior isn't desirable in most cases, so it should at least be documented in AUTO_OFFSET_RESET_DOC.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
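The race in the quoted steps can be illustrated with a toy, in-memory model. This is a sketch, not Kafka code: `OffsetResetModel`, `PartitionLog`, and `consumedCount` are hypothetical names, and the model assumes the consumer group has no committed offset for the new partition, so the reset policy alone decides where consumption starts.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (not Kafka code) of the KAFKA-12261 race: records are appended to a
 * newly added partition before the consumer group learns about that partition.
 */
final class OffsetResetModel {

    /** A single in-memory partition log; offsets are list indices. */
    static final class PartitionLog {
        private final List<String> records = new ArrayList<>();

        void append(String value) {
            records.add(value);
        }

        long logEndOffset() {
            return records.size();
        }

        List<String> readFrom(long offset) {
            return records.subList((int) offset, records.size());
        }
    }

    /**
     * Returns how many records the consumer eventually sees from the new partition.
     *
     * @param resetPolicy    "earliest" or "latest" (mirrors auto.offset.reset values)
     * @param producedBefore records produced before the consumer's rebalance (step 3)
     * @param producedAfter  records produced after the consumer was assigned the partition
     */
    static int consumedCount(String resetPolicy, int producedBefore, int producedAfter) {
        PartitionLog newPartition = new PartitionLog();

        // Step 2: the producer refreshed its metadata first and already writes here.
        for (int i = 0; i < producedBefore; i++) {
            newPartition.append("early-" + i);
        }

        // Step 3: no committed offset exists, so the reset policy picks the start position.
        long startOffset;
        switch (resetPolicy) {
            case "earliest":
                startOffset = 0L; // log start offset of a brand-new partition
                break;
            case "latest":
                startOffset = newPartition.logEndOffset(); // skips everything already written
                break;
            default:
                throw new IllegalArgumentException("unknown policy: " + resetPolicy);
        }

        for (int i = 0; i < producedAfter; i++) {
            newPartition.append("late-" + i);
        }
        return newPartition.readFrom(startOffset).size();
    }
}
```

With 5 records produced before the rebalance and 3 after, `consumedCount("latest", 5, 3)` returns 3 while `consumedCount("earliest", 5, 3)` returns 8: the 5 early records are exactly the ones the issue describes as lost.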
[jira] [Commented] (KAFKA-12261) Splitting partition causes message loss for consumers with auto.offset.reset=latest
[ https://issues.apache.org/jira/browse/KAFKA-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286892#comment-17286892 ]

Luke Chen commented on KAFKA-12261:
---
I think we should document it, but not change the default setting to *earliest*.
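Keeping the default means applications that cannot tolerate this gap have to opt in explicitly. A minimal sketch, assuming a plain Java client configured through `java.util.Properties`; the class name `EarliestConsumerProps`, the broker address, and the group id are illustrative placeholders. The string keys are the standard consumer config names (also exposed as constants on `ConsumerConfig`).

```java
import java.util.Properties;

/**
 * Builds consumer settings that explicitly opt into "earliest" instead of relying
 * on the default of "latest". EarliestConsumerProps is a hypothetical helper.
 */
final class EarliestConsumerProps {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Only consulted when the group has no committed offset for a partition
        // (or the committed offset is out of range), e.g. a partition that was
        // just added with kafka-topics.sh --alter.
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

The resulting `Properties` would then be passed to a `KafkaConsumer` constructor; the trade-off is that a brand-new group also starts from the beginning of every existing partition.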
[jira] [Created] (KAFKA-12346) punctuate is called at twice the duration passed as the first argument to Processor.Schedule (with PunctuationType.WALL_CLOCK_TIME)
Arindam Ray created KAFKA-12346:
---

Summary: punctuate is called at twice the duration passed as the first argument to Processor.Schedule (with PunctuationType.WALL_CLOCK_TIME)
Key: KAFKA-12346
URL: https://issues.apache.org/jira/browse/KAFKA-12346
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.7.0
Reporter: Arindam Ray

A stream transform called with the idiom below causes punctuate to be called at twice the interval passed as the first argument to context.schedule:

{code:scala}
.transform(new TransformerSupplier[String, TimeStampedString, KeyValue[String, TimeStampedString]]() {
  override def get(): Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] =
    new Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] {
      override def init(context: ProcessorContext): Unit = {
        val store = context.getStateStore(stateStoreName)
          .asInstanceOf[KeyValueStore[String, ValueAndTimestamp[TimeStampedString]]]
        context.schedule(scanFrequency, PunctuationType.WALL_CLOCK_TIME, new Punctuator {
          override def punctuate(timestamp: Long): Unit = {
            logger.info(s"Punctuate invoked with timestamp : ${Instant.ofEpochMilli(timestamp)}")
          }
        })
      }

      override def transform(key: String, value: TimeStampedString): KeyValue[String, TimeStampedString] = {
        // no need to return anything here, the Punctuator will emit the records when necessary
        null
      }

      override def close(): Unit = {}
    }
},
  /**
   * register that this Transformer needs to be connected to our state store.
   */
  stateStoreName
)
{code}
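To quantify the reported behavior, the timestamps logged by the `Punctuator` above can be compared with the configured `scanFrequency`. This is a hypothetical helper for analyzing such logs, not part of Kafka Streams; it assumes the timestamps (epoch milliseconds) were collected in ascending order from successive `punctuate` calls.

```java
import java.util.List;

/**
 * Hypothetical diagnostic helper: compares the observed spacing of punctuate()
 * timestamps with the interval that was passed to schedule().
 */
final class PunctuationIntervalCheck {

    /** Mean gap between consecutive (ascending) timestamps, in milliseconds. */
    static double meanGapMillis(List<Long> timestamps) {
        if (timestamps.size() < 2) {
            throw new IllegalArgumentException("need at least two punctuations");
        }
        long first = timestamps.get(0);
        long last = timestamps.get(timestamps.size() - 1);
        return (double) (last - first) / (timestamps.size() - 1);
    }

    /** Observed mean gap divided by the configured interval: ~1.0 is expected, ~2.0 matches the report. */
    static double ratioToConfigured(List<Long> timestamps, long configuredMillis) {
        return meanGapMillis(timestamps) / configuredMillis;
    }
}
```

For example, punctuations logged at 0, 2000, 4000, and 6000 ms with a configured interval of 1000 ms give a ratio of 2.0, which is the symptom described in the report.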
[jira] [Updated] (KAFKA-12261) Splitting partition causes message loss for consumers with auto.offset.reset=latest
[ https://issues.apache.org/jira/browse/KAFKA-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Haruki Okada updated KAFKA-12261:
-
Description:
As of now, auto.offset.reset of ConsumerConfig is "latest" by default.

This can be a pitfall that causes message loss when we split a topic's partitions, as follows.
Say we have a topic-X which has only 1 partition.
# Split topic-X into 2 partitions with kafka-topics.sh --alter --topic topic-X --partitions 2 (topic-X-1 is added).
# The producer learns that new partitions were added by refreshing metadata, and starts producing to topic-X-1.
# A bit later, the consumer learns that new partitions were added, triggers a consumer rebalance, and starts consuming topic-X-1.
** Upon starting consumption, it resets its offset to the log-end offset.
If the producer sent several records before step 3, they may never be delivered to the consumer.

This behavior isn't desirable in most cases, so it should at least be documented in AUTO_OFFSET_RESET_DOC.

was:
As of now, auto.offset.reset of ConsumerConfig is "latest" by default.

This can be a pitfall that causes message loss when we split a topic's partitions, as follows.
Say we have a topic-X which has only 1 partition.
# Split topic-X into 2 partitions with kafka-topics.sh --alter --topic topic-X --partitions 2 (topic-X-1 is added).
# The producer learns that new partitions were added by refreshing metadata, and starts producing to topic-X-1.
# A bit later, the consumer learns that new partitions were added, triggers a consumer rebalance, and starts consuming topic-X-1.
** Upon starting consumption, it resets its offset to the log-end offset.
If the producer sent several records before step 3, they may never be delivered to the consumer.

This behavior isn't desirable in most cases, so auto.offset.reset should be set to "earliest" by default to avoid this pitfall.
[GitHub] [kafka] chia7712 commented on a change in pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException
chia7712 commented on a change in pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#discussion_r578965629

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
## @@ -366,6 +346,42 @@ private void readToLogEnd() {
 }
 }
+// Visible for testing
+Map readEndOffsets(Set assignment) {
+log.trace("Reading to end of offset log");
+
+Map endOffsets;

Review comment:
Unused variable.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12261) Splitting partition causes message loss for consumers with auto.offset.reset=latest
[ https://issues.apache.org/jira/browse/KAFKA-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen reassigned KAFKA-12261:
-
Assignee: Luke Chen
[jira] [Updated] (KAFKA-12273) InterBrokerSendThread#pollOnce throws FatalExitError even though it is shutdown correctly
[ https://issues.apache.org/jira/browse/KAFKA-12273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai updated KAFKA-12273:
---
Fix Version/s: 3.0.0

> InterBrokerSendThread#pollOnce throws FatalExitError even though it is shutdown correctly
> -
>
> Key: KAFKA-12273
> URL: https://issues.apache.org/jira/browse/KAFKA-12273
> Project: Kafka
> Issue Type: Test
> Reporter: Chia-Ping Tsai
> Assignee: Chia-Ping Tsai
> Priority: Major
> Fix For: 3.0.0, 2.8.0
>
> Kafka tests sometimes shut down Gradle with a non-zero exit code. One of the root causes is that InterBrokerSendThread#pollOnce encounters DisconnectException while NetworkClient is closing. DisconnectException should be viewed as an "expected" error, since we close the client ourselves. In other words, InterBrokerSendThread#pollOnce should swallow it.
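One way to express "swallow the disconnect only when we caused it" is to consult the thread's own shutdown flag before deciding. A minimal sketch, assuming a single `running` flag: `ShutdownAwareSendLoop`, `Poller`, and `DisconnectedWhileClosing` are hypothetical stand-ins, not the real `InterBrokerSendThread`, `NetworkClient`, or `DisconnectException` types.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Sketch of a send-thread poll step that treats a disconnect caused by its own
 * shutdown as expected. All types here are hypothetical stand-ins.
 */
final class ShutdownAwareSendLoop {

    /** Stand-in for the exception raised when the network client is closed mid-poll. */
    static final class DisconnectedWhileClosing extends RuntimeException {
        DisconnectedWhileClosing(String message) {
            super(message);
        }
    }

    /** Stand-in for one round of network I/O. */
    interface Poller {
        void pollOnce();
    }

    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Poller poller;

    ShutdownAwareSendLoop(Poller poller) {
        this.poller = poller;
    }

    void initiateShutdown() {
        running.set(false);
    }

    /** Returns true if the iteration ended normally or with an expected shutdown disconnect. */
    boolean doWork() {
        try {
            poller.pollOnce();
            return true;
        } catch (DisconnectedWhileClosing e) {
            if (!running.get()) {
                // We closed the client ourselves, so this disconnect is expected: swallow it.
                return true;
            }
            // A disconnect while still running is a genuine error; propagate it.
            throw e;
        }
    }
}
```

Scoping the swallow to the shutdown path keeps unexpected disconnects visible while the thread is still meant to be running.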
[jira] [Assigned] (KAFKA-12336) custom stream naming does not work while calling stream[K, V](topicPattern: Pattern) API with named Consumed parameter
[ https://issues.apache.org/jira/browse/KAFKA-12336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

GeordieMai reassigned KAFKA-12336:
--
Assignee: GeordieMai

> custom stream naming does not work while calling stream[K, V](topicPattern: Pattern) API with named Consumed parameter
> ---
>
> Key: KAFKA-12336
> URL: https://issues.apache.org/jira/browse/KAFKA-12336
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.7.0
> Reporter: Ramil Israfilov
> Assignee: GeordieMai
> Priority: Minor
> Labels: easy-fix, newbie
>
> In our Scala application I am trying to implement custom naming for Kafka Streams application nodes.
> We are using topicPattern for our stream source.
> Here is the API I am calling:
>
> {code:scala}
> val topicsPattern="t-[A-Za-z0-9-].suffix"
> val operations: KStream[MyKey, MyValue] =
>   builder.stream[MyKey, MyValue](Pattern.compile(topicsPattern))(
>     Consumed.`with`[MyKey, MyValue].withName("my-fancy-name")
>   )
> {code}
> Despite providing Consumed with a custom name, the topology description still shows "KSTREAM-SOURCE-00" as the name of our stream source.
> This is not a problem if I just use a topic name, but our application needs to get messages from a set of topics selected by matching topic names against a pattern.
> After checking the Kafka code, I see that org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder (on line 103) has a bug:
> {code:java}
> public KStream stream(final Pattern topicPattern,
>                       final ConsumedInternal consumed) {
>     final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
>     final StreamSourceNode streamPatternSourceNode = new StreamSourceNode<>(name, topicPattern, consumed);
> {code}
> The node name construction does not take the name of the consumed parameter into account.
> For example, the code for the other stream API call, which takes a topic name, does it correctly:
> {code:java}
> final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
> {code}
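The fix the report points to is resolving the pattern-based source name the same way the topic-name overload does: use the name from `Consumed` if one was given, otherwise generate one. A minimal, self-contained model of that fallback rule, assuming a per-builder counter; `SourceNodeNaming` is a hypothetical class, and the `KSTREAM-SOURCE-` prefix with a zero-padded suffix only mimics the generated names seen in topology descriptions (the padding width is an assumption).

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical model of the naming rule requested in the report: prefer the
 * user-supplied name, otherwise generate "<prefix><counter>".
 */
final class SourceNodeNaming {

    private final AtomicInteger index = new AtomicInteger(0);

    /** Mirrors the intent of NamedInternal#orElseGenerateWithPrefix (illustrative only). */
    String orElseGenerateWithPrefix(String userProvidedName, String prefix) {
        if (userProvidedName != null && !userProvidedName.isEmpty()) {
            return userProvidedName;
        }
        // Zero-padded counter; the exact width is an assumption of this sketch.
        return prefix + String.format("%010d", index.getAndIncrement());
    }
}
```

With this rule, `withName("my-fancy-name")` would surface as `my-fancy-name` in the topology description, and only unnamed sources consume counter values.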
[GitHub] [kafka] tinawenqiao commented on pull request #9235: KAFKA-10449: Add some important parameter desc in connect-distributed.properties
tinawenqiao commented on pull request #9235:
URL: https://github.com/apache/kafka/pull/9235#issuecomment-781848357

> Thanks for the PR @tinawenqiao.
>
> It would be nice to also mention that `rest.port` and `rest.host.name` are deprecated in their descriptions. Could you add something like `DEPRECATED: only used when listeners is not set. Use listeners instead. ` to their descriptions in `WorkerConfigs`?
>
> Thanks

Thanks for pointing this out. A new patch is ready.
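The suggested description implies a simple precedence: `listeners` wins, and the deprecated `rest.host.name`/`rest.port` pair is only a fallback. A sketch of that precedence under stated assumptions: `RestListenerResolver` is hypothetical (not Connect's `WorkerConfig` or REST server code), the default port of 8083 is assumed, and only a single listener string is handled.

```java
import java.util.Properties;

/**
 * Hypothetical sketch of the precedence in the suggested doc text: "listeners"
 * wins; rest.host.name / rest.port are only consulted when it is not set.
 */
final class RestListenerResolver {

    // Assumed default REST port for Connect workers.
    static final int DEFAULT_REST_PORT = 8083;

    static String resolve(Properties workerProps) {
        String listeners = workerProps.getProperty("listeners");
        if (listeners != null && !listeners.trim().isEmpty()) {
            return listeners.trim();
        }
        // DEPRECATED fallback: only used when listeners is not set.
        String host = workerProps.getProperty("rest.host.name", "");
        int port = Integer.parseInt(
                workerProps.getProperty("rest.port", String.valueOf(DEFAULT_REST_PORT)));
        return "http://" + host + ":" + port;
    }
}
```

Under this rule a `rest.port` value is silently ignored once `listeners` is present, which is why flagging the old settings as deprecated in their descriptions helps users.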
[GitHub] [kafka] mjsax commented on pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out
mjsax commented on pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#issuecomment-781840391

Updated.
[GitHub] [kafka] mjsax commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out
mjsax commented on a change in pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#discussion_r578937359

## File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
## @@ -32,6 +33,7 @@ public final int topicGroupId;
 /** The ID of the partition. */
 public final int partition;
+public Task task;

Review comment:
Passing the task into the `RecordCollector` also introduced a cyclic dependency, as we pass the collector into the task. But getting the changelog partitions within `handleCorrupted` makes sense -- it's actually even cleaner to begin with.
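The design mjsax settles on can be sketched as: the exception carries only task ids, and the handler looks the tasks up itself when it needs their changelog partitions, so `TaskId` never holds a `Task` reference. This is a hypothetical model, not Kafka Streams code; `CorruptedTaskHandling`, its nested `Task`, and `CorruptedException` are stand-ins, and ids and partitions are plain strings for brevity.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch: the exception carries only task ids, and the handler
 * resolves changelog partitions itself, avoiding a TaskId -> Task back-reference.
 */
final class CorruptedTaskHandling {

    /** Stand-in for a task that knows its changelog partitions. */
    static final class Task {
        final String id;
        final Set<String> changelogPartitions;

        Task(String id, Set<String> changelogPartitions) {
            this.id = id;
            this.changelogPartitions = changelogPartitions;
        }
    }

    /** Stand-in for TaskCorruptedException: ids only, no Task references. */
    static final class CorruptedException extends RuntimeException {
        final Set<String> corruptedTaskIds;

        CorruptedException(Set<String> corruptedTaskIds) {
            super("corrupted tasks: " + corruptedTaskIds);
            this.corruptedTaskIds = corruptedTaskIds;
        }
    }

    private final Map<String, Task> tasksById = new HashMap<>();

    void register(Task task) {
        tasksById.put(task.id, task);
    }

    /** Resolves changelog partitions at handling time instead of storing tasks on the id. */
    Set<String> handleCorrupted(CorruptedException e) {
        Set<String> toReset = new HashSet<>();
        for (String id : e.corruptedTaskIds) {
            Task task = tasksById.get(id);
            if (task != null) {
                toReset.addAll(task.changelogPartitions);
            }
        }
        return toReset;
    }
}
```

Because the lookup happens in the component that already owns the tasks, neither the id type nor the record collector needs to know about tasks, which removes the cycle.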
[GitHub] [kafka] hachikuji commented on pull request #10157: MINOR: Raft request thread should discover api versions
hachikuji commented on pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#issuecomment-781825379

> Okay. I guess we need another KIP that explains how to upgrade a cluster and to determine when it is safe to enable the RaftClient on all of the brokers of a cluster.

Yes, a KIP describing the upgrade process will be necessary. I think API version negotiation is probably the least of our concerns 😆.
[GitHub] [kafka] rhauch commented on pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException
rhauch commented on pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#issuecomment-781813200

@chia7712, for what it's worth, I still think your #10152 PR is an important fix.
[GitHub] [kafka] rhauch commented on pull request #10152: KAFKA-12339 Starting new connector cluster with new internal topics e…
rhauch commented on pull request #10152:
URL: https://github.com/apache/kafka/pull/10152#issuecomment-781812825

Thanks, @chia7712. I've reviewed much of this, and it seems straightforward, though I plan to review it more thoroughly tomorrow (~12 hours). I've asked a few others who might be more familiar with the admin client code to also take a look.
[GitHub] [kafka] rhauch commented on a change in pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException
rhauch commented on a change in pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#discussion_r578916881

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
## @@ -390,7 +410,11 @@ public void run() {
 log.trace("Finished read to end log for topic {}", topic);
 } catch (TimeoutException e) {
 log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
-"This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+ "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+continue;
+} catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {

Review comment:
The previous `readToLogEnd()` that used the consumer did not have any special exception handling in the `start()` method. Any problem during startup seems like it should propagate up and result in the worker failing, especially when coupled with your fix in #10152.

I added this here because retriable exceptions should not stop the KafkaBasedLog's thread.
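The behavior rhauch describes, where retriable failures are logged and retried while anything else ends the work thread, can be sketched as follows. This is a hypothetical model, not `KafkaBasedLog`'s code: `RetryingReadLoop` and `RetriableFailure` are stand-ins for the real thread and Kafka's `RetriableException` hierarchy, and the `maxAttempts` bound exists only so the sketch terminates; the thread in the diff simply `continue`s its loop.

```java
/**
 * Hypothetical sketch of a work-thread step that keeps going on retriable
 * failures and lets every other exception propagate.
 */
final class RetryingReadLoop {

    /** Stand-in for RetriableException and its connect counterpart. */
    static class RetriableFailure extends RuntimeException {
        RetriableFailure(String message) {
            super(message);
        }
    }

    /**
     * Runs readToEnd until it completes without a retriable failure.
     * Non-retriable exceptions propagate, which would end the thread.
     *
     * @return the number of attempts it took
     */
    static int runUntilCaughtUp(Runnable readToEnd, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                readToEnd.run();
                return attempt;
            } catch (RetriableFailure e) {
                // Log and retry on the next iteration instead of stopping the thread.
                System.err.println("Retrying after retriable error: " + e.getMessage());
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }
}
```

Keeping the retry inside the background thread, while letting `start()` fail fast, matches the split described in the review comment: transient broker problems after startup are tolerated, but problems during startup surface to the worker.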
[GitHub] [kafka] rhauch commented on a change in pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException
rhauch commented on a change in pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#discussion_r578919850

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
## @@ -366,6 +348,44 @@ private void readToLogEnd() {
 }
 }
+// Visible for testing
+Map readEndOffsets(Set assignment) {
+log.trace("Reading to end of offset log");
+
+Map endOffsets;
+// Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
+// That is because it's possible that the consumer is already blocked waiting for new records to appear, when
+// the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
+// one more record becomes available, meaning we can't even check whether we're at the end offset.
+// Since all we're trying to do here is get the end offset, we should use the supplied admin client
+// (if available)
+// (which prevents 'consumer.endOffsets(...)'
+// from
+
+// Deprecated constructors do not provide an admin supplier, so the admin is potentially null.
+if (useAdminForListOffsets) {
+// Use the admin client to immediately find the end offsets for the assigned topic partitions.
+// Unlike using the consumer
+try {
+endOffsets = admin.endOffsets(assignment);

Review comment:
I did see a way of reducing the number of returns to 2, and removing one of the duplicated lines. Thanks!
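A lookup with exactly two return statements, as discussed in this thread, might look like the following: prefer the admin client, and fall back to the consumer when no admin is available or the broker rejects the request as unsupported. This is a hypothetical sketch, not the code from PR #10158; `EndOffsetReader`, `OffsetSource`, and `UnsupportedVersion` are stand-ins, and partitions are plain strings.

```java
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of an end-offset lookup with two return statements:
 * prefer the admin client, fall back to the consumer.
 */
final class EndOffsetReader {

    /** Stand-in for UnsupportedVersionException. */
    static final class UnsupportedVersion extends RuntimeException {
        UnsupportedVersion(String message) {
            super(message);
        }
    }

    /** Minimal stand-in for anything that can list end offsets. */
    interface OffsetSource {
        Map<String, Long> endOffsets(Set<String> partitions);
    }

    private OffsetSource admin;          // may be null, e.g. with deprecated constructors
    private final OffsetSource consumer;

    EndOffsetReader(OffsetSource admin, OffsetSource consumer) {
        this.admin = admin;
        this.consumer = consumer;
    }

    Map<String, Long> readEndOffsets(Set<String> assignment) {
        if (admin != null) {
            try {
                return admin.endOffsets(assignment);
            } catch (UnsupportedVersion e) {
                // Older brokers: stop trying the admin and use the consumer from now on.
                admin = null;
            }
            // Any other exception (timeouts, retriable errors) is not caught and propagates.
        }
        return consumer.endOffsets(assignment);
    }

    boolean usesAdmin() {
        return admin != null;
    }
}
```

Clearing the admin reference after an unsupported-version failure avoids repeating a request that is known to fail on every subsequent read.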
[GitHub] [kafka] rhauch commented on pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException
rhauch commented on pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#issuecomment-781810306

Thanks for the quick review, @chia7712. I've incorporated several of your suggestions, and added comments/responses on the others.

BTW, a run of the relevant Connect distributed system tests passed with this branch: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4390/
[GitHub] [kafka] rhauch commented on a change in pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException
rhauch commented on a change in pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#discussion_r578917177

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
## @@ -366,6 +348,44 @@ private void readToLogEnd() {
 }
 }
+// Visible for testing
+Map readEndOffsets(Set assignment) {
+log.trace("Reading to end of offset log");
+
+Map endOffsets;
+// Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
+// That is because it's possible that the consumer is already blocked waiting for new records to appear, when
+// the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
+// one more record becomes available, meaning we can't even check whether we're at the end offset.
+// Since all we're trying to do here is get the end offset, we should use the supplied admin client
+// (if available)
+// (which prevents 'consumer.endOffsets(...)'
+// from
+
+// Deprecated constructors do not provide an admin supplier, so the admin is potentially null.
+if (useAdminForListOffsets) {
+// Use the admin client to immediately find the end offsets for the assigned topic partitions.
+// Unlike using the consumer
+try {
+endOffsets = admin.endOffsets(assignment);

Review comment:
I wanted to avoid 3 separate return statements.
[GitHub] [kafka] chia7712 merged pull request #10154: MINOR: Added missing import to kafka.py
chia7712 merged pull request #10154:
URL: https://github.com/apache/kafka/pull/10154
[GitHub] [kafka] chia7712 commented on pull request #10154: MINOR: Added missing import to kafka.py
chia7712 commented on pull request #10154:
URL: https://github.com/apache/kafka/pull/10154#issuecomment-781801914

Ran `downgrade_test` with this patch locally, and it passed. Will merge this patch to trunk and 2.8.
[GitHub] [kafka] hachikuji commented on pull request #10157: MINOR: Raft request thread should discover api versions
hachikuji commented on pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#issuecomment-781799540

@jsancio I may be misunderstanding your question, but API versions are negotiated internally in `NetworkClient`. If there are no compatible versions, the client will raise an unsupported version error. It would be useful to have at least one test case to verify the expected behavior. At a glance, I could not find any logic that checks the returned `ClientResponse` for version errors.
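The negotiation mentioned here can be modeled as intersecting two version ranges: pick the highest version both sides support, or fail when the ranges do not overlap. A simplified sketch, not `NetworkClient`'s implementation: `ApiVersionNegotiation` and its `UnsupportedVersion` exception are hypothetical stand-ins, and it assumes each side advertises one contiguous `[min, max]` range per API.

```java
/**
 * Hypothetical model of API version negotiation: choose the highest version in
 * the overlap of the client's and broker's supported ranges, or fail.
 */
final class ApiVersionNegotiation {

    /** Stand-in for UnsupportedVersionException. */
    static final class UnsupportedVersion extends RuntimeException {
        UnsupportedVersion(String message) {
            super(message);
        }
    }

    static short negotiate(short clientMin, short clientMax, short brokerMin, short brokerMax) {
        short low = (short) Math.max(clientMin, brokerMin);
        short high = (short) Math.min(clientMax, brokerMax);
        if (low > high) {
            // No overlap: this is the case a test should show surfacing as an error.
            throw new UnsupportedVersion("no common version: client [" + clientMin + ", " + clientMax
                    + "], broker [" + brokerMin + ", " + brokerMax + "]");
        }
        return high;
    }
}
```

A test of the kind requested in the comment would exercise the no-overlap branch and assert that the caller observes the version error rather than silently dropping the request.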
[GitHub] [kafka] chia7712 commented on a change in pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException
chia7712 commented on a change in pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#discussion_r578907382

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
## @@ -366,6 +348,44 @@ private void readToLogEnd() {
 }
 }
+// Visible for testing
+Map readEndOffsets(Set assignment) {
+log.trace("Reading to end of offset log");
+
+Map endOffsets;
+// Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
+// That is because it's possible that the consumer is already blocked waiting for new records to appear, when
+// the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
+// one more record becomes available, meaning we can't even check whether we're at the end offset.
+// Since all we're trying to do here is get the end offset, we should use the supplied admin client
+// (if available)
+// (which prevents 'consumer.endOffsets(...)'
+// from
+
+// Deprecated constructors do not provide an admin supplier, so the admin is potentially null.
+if (useAdminForListOffsets) {
+// Use the admin client to immediately find the end offsets for the assigned topic partitions.
+// Unlike using the consumer
+try {
+endOffsets = admin.endOffsets(assignment);

Review comment:
How about `return admin.endOffsets(assignment);`?

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
## @@ -390,7 +410,11 @@ public void run() {
 log.trace("Finished read to end log for topic {}", topic);
 } catch (TimeoutException e) {
 log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
-"This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+ "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+continue;
+} catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {

Review comment:
`readToLogEnd` is called by `start`. Should we add similar exception handling there?

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
## @@ -321,29 +325,7 @@ private void readToLogEnd() {
 log.trace("Reading to end of offset log");

Review comment:
Redundant log message: `readEndOffsets(Set)` has a similar log.

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
## @@ -651,6 +651,10 @@ public Config describeTopicConfig(String topic) {
 * @param partitions the topic partitions
 * @return the map of offset for each topic partition, or an empty map if the supplied partitions
 * are null or empty
+ * @throws UnsupportedVersionException if the admin client cannot read end offsets
+ * @throws TimeoutException if the offset metadata could not be fetched before the amount of time allocated
+ * by {@code request.timeout.ms} expires, and this call can be retried
+ * @throws LeaderNotAvailableException if the leader was not available and this call can be retried
 * @throws RetriableException if a retriable error occurs, the operation takes too long, or the

Review comment:
The phrase `the operation takes too long` is no longer accurate, since `TimeoutException` is not wrapped in `RetriableException` anymore.
[GitHub] [kafka] dengziming commented on pull request #10147: MINOR: Add raft resigned state metric name
dengziming commented on pull request #10147:
URL: https://github.com/apache/kafka/pull/10147#issuecomment-781799200

@guozhangwang @hachikuji Hello, PTAL.
[jira] [Resolved] (KAFKA-12258) Change the BatchAccumulator to split records into batches
[ https://issues.apache.org/jira/browse/KAFKA-12258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-12258. - Resolution: Fixed > Change the BatchAccumulator to split records into batches > - > > Key: KAFKA-12258 > URL: https://issues.apache.org/jira/browse/KAFKA-12258 > Project: Kafka > Issue Type: Sub-task > Components: controller, core >Affects Versions: 2.8.0 >Reporter: Alok Nikhil >Assignee: Jose Armando Garcia Sancio >Priority: Major > Labels: kip-500 > > Modify the `BatchAccumulator.append` contract to support splitting a batch of > records whose size is greater than the maximum allowed size (of 1048576 > currently) into batches to avoid RaftClient failures such as > {code:java} > leader=0, leaderEpoch=0, partitionEpoch=0) at version 0), > ApiMessageAndVersion(TopicRecord(name='topic-BEHRW', > topicId=6cRudOGO3yqlsu48RwyPSw) at version 0), > ApiMessageAndVersion(PartitionRecord(partitionId=0, > topicId=6cRudOGO3yqlsu48RwyPSw, replicas=[1, 2, 0], isr=[1, 2, 0], > removingReplicas=null, addingReplicas=null, leader=1, leaderEpoch=0, > partitionEpoch=0) at version 0)] is 1088890, which exceeds the maximum > allowed batch size of 1048576 Jan 30 00:13:40 ip-10-0-0-254 > kafka-server-start.sh[633637]: at > org.apache.kafka.raft.internals.BatchAccumulator.append(BatchAccumulator.java:110) > Jan 30 00:13:40 ip-10-0-0-254 kafka-server-start.sh[633637]: at > org.apache.kafka.raft.KafkaRaftClient.scheduleAppend(KafkaRaftClient.java:1885) > Jan 30 00:13:40 ip-10-0-0-254 kafka-server-start.sh[633637]: at > org.apache.kafka.raft.metadata.MetaLogRaftShim.scheduleWrite(MetaLogRaftShim.java:60) > Jan 30 00:13:40 ip-10-0-0-254 kafka-server-start.sh[633637]: at > org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:406) > Jan 30 00:13:40 ip-10-0-0-254 kafka-server-start.sh[633637]: at > org.apache.kafka.common.utils.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:117) > Jan 30 00:13:40 ip-10-0-0-254 kafka-server-start.sh[633637]: at > org.apache.kafka.common.utils.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:192) > Jan 30 00:13:40 ip-10-0-0-254 kafka-server-start.sh[633637]: at > org.apache.kafka.common.utils.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:165) > Jan 30 00:13:40 ip-10-0-0-254 kafka-server-start.sh[633637]: at > java.base/java.lang.Thread.run(Thread.java:834) Jan 30 00:13:40 ip-10-0-0-254 > kafka-server-start.sh[633637]: [2021-01-30 00:13:40,277] INFO [Controller > 3000] Reverting to snapshot 2232 (org.apache.kafka.timeline.SnapshotRegistry) > {code} > *Example use-case*: Creating 10,000 topics in a single API call -- This message was sent by Atlassian Jira (v8.3.4#803005)
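The change requested above amounts to packing records into consecutive batches that each stay under the size cap, so that a large request such as creating 10,000 topics is appended as several batches rather than failing. A minimal sketch in Java, assuming records are represented only by their serialized sizes and ignoring batch-header overhead (which the real `BatchAccumulator` must also account for); `BatchSplitSketch` and `splitIntoBatches` are hypothetical names, not the Raft API:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BatchSplitSketch {
    // Maximum batch size quoted in the error message above (1 MiB).
    static final int MAX_BATCH_SIZE_BYTES = 1048576;

    // Greedily packs record sizes into consecutive batches whose total stays
    // within maxBytes. A single record larger than maxBytes cannot be split.
    static List<List<Integer>> splitIntoBatches(List<Integer> recordSizes, int maxBytes) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentBytes = 0; // long avoids int overflow while summing
        for (int size : recordSizes) {
            if (size > maxBytes) {
                throw new IllegalArgumentException(
                    "Record of " + size + " bytes exceeds the maximum batch size of " + maxBytes);
            }
            if (!current.isEmpty() && currentBytes + size > maxBytes) {
                batches.add(current); // close the full batch and start a new one
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Ten records totalling 1088890 bytes, the size rejected in the trace above.
        List<Integer> sizes = Collections.nCopies(10, 108889);
        List<List<Integer>> batches = splitIntoBatches(sizes, MAX_BATCH_SIZE_BYTES);
        System.out.println(batches.size() + " batches"); // 2 batches: 9 records + 1 record
    }
}
```

Greedy packing preserves record order, which matters for a log; it does not try to minimize the number of batches beyond that.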
[GitHub] [kafka] hachikuji merged pull request #10063: KAFKA-12258: Add support for splitting appending records
hachikuji merged pull request #10063: URL: https://github.com/apache/kafka/pull/10063 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on pull request #10063: KAFKA-12258: Add support for splitting appending records
jsancio commented on pull request #10063: URL: https://github.com/apache/kafka/pull/10063#issuecomment-781790693 The following tests failed:
```
core:
ConfigCommandTest.shouldFailIfUnresolvableHost()
ConfigCommandTest.shouldFailIfUnresolvableHost()
ConfigCommandTest.shouldFailIfUnresolvableHostUsingZookeeper()
ConfigCommandTest.shouldFailIfUnresolvableHostUsingZookeeper()
DynamicConfigChangeTest.testIpHandlerUnresolvableAddress()
DynamicConfigChangeTest.testIpHandlerUnresolvableAddress()
DynamicConfigTest.shouldFailIpConfigsWithBadHost()
DynamicConfigTest.shouldFailIpConfigsWithBadHost()
```
[GitHub] [kafka] jsancio commented on pull request #10157: MINOR: Raft request thread should discover api versions
jsancio commented on pull request #10157: URL: https://github.com/apache/kafka/pull/10157#issuecomment-781784110

> We do not plan to rely on the IBP in order to determine API versions for raft requests. Instead, we want to discover them through the ApiVersions API. This patch enables the flag to do so.

How or where do we check that the connected nodes support the versions we require? Is that coming in a future PR?
[GitHub] [kafka] dengziming opened a new pull request #10159: KAFKA-12338: Consolidate MetadataRecordSerde and MetadataParser serial/deserial code
dengziming opened a new pull request #10159: URL: https://github.com/apache/kafka/pull/10159 *More detailed description of your change* The serialization and deserialization logic is duplicated between the two classes, except that `MetadataRecordSerde` has an extra `DEFAULT_FRAME_VERSION`. As a result, any change to the serialization format of metadata records has to be made in two classes, which should not be necessary. *Summary of testing strategy (including rationale)* Unit tests. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jsancio commented on pull request #10063: KAFKA-12258: Add support for splitting appending records
jsancio commented on pull request #10063: URL: https://github.com/apache/kafka/pull/10063#issuecomment-781778478 Running the following commands:
```
./gradlew -version

Gradle 6.8.1

Build time:   2021-01-22 13:20:08 UTC
Revision:     31f14a87d93945024ab7a78de84102a3400fa5b2

Kotlin:       1.4.20
Groovy:       2.5.12
Ant:          Apache Ant(TM) version 1.10.9 compiled on September 27 2020
JVM:          1.8.0_282 (Private Build 25.282-b08)
OS:           Linux 5.8.0-7642-generic amd64

./gradlew -PscalaVersion=2.12 clean compileJava compileScala compileTestJava compileTestScala spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat --profile --no-daemon --continue -PxmlSpotBugsReport=true
./gradlew -PscalaVersion=2.13 clean compileJava compileScala compileTestJava compileTestScala spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat --profile --no-daemon --continue -PxmlSpotBugsReport=true
./gradlew -PscalaVersion=2.12 unitTest integrationTest --profile --no-daemon --continue -PtestLoggingEvents=started,passed,skipped,failed -PignoreFailures=true -PmaxTestRetries=1 -PmaxTestRetryFailures=5
```
[GitHub] [kafka] mjsax commented on pull request #10091: KAFKA-9524: increase retention time for window and grace periods longer than one day
mjsax commented on pull request #10091: URL: https://github.com/apache/kafka/pull/10091#issuecomment-781777265 Thanks for the PR @MarcoLotz! And congrats on your first code contribution! Thanks for reviewing @vcrfxia! Merged to `trunk` and cherry-picked to the `2.8` branch.
[jira] [Resolved] (KAFKA-9524) Default window retention does not consider grace period
[ https://issues.apache.org/jira/browse/KAFKA-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-9524. Fix Version/s: 2.8.0 Resolution: Fixed > Default window retention does not consider grace period > --- > > Key: KAFKA-9524 > URL: https://issues.apache.org/jira/browse/KAFKA-9524 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.4.0 >Reporter: Michael Bingham >Assignee: Marco Lotz >Priority: Minor > Fix For: 2.8.0 > > > In a windowed aggregation, if you specify a window size larger than the > default window retention (1 day), Streams will implicitly set retention > accordingly to accommodate windows of that size. For example, > {code:java} > .windowedBy(TimeWindows.of(Duration.ofDays(20))) > {code} > In this case, Streams will implicitly set window retention to 20 days, and no > exceptions will occur. > However, if you also include a non-zero grace period on the window, such as: > {code:java} > .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5))) > {code} > In this case, Streams will still implicitly set the window retention to 20 days > (not 20 days + 5 minutes grace), and an exception will be thrown: > {code:java} > Exception in thread "main" java.lang.IllegalArgumentException: The retention > period of the window store KSTREAM-KEY-SELECT-02 must be no smaller > than its window size plus the grace period. Got size=[172800], > grace=[30], retention=[172800]{code} > Ideally, Streams should include the grace period when implicitly setting window > retention.
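The rule the issue asks for is that the implicit retention must be at least the window size plus the grace period, falling back to the 1-day default when that is already large enough. A minimal sketch of that rule, assuming a 1-day default; `implicitRetention` is a hypothetical helper and not the Kafka Streams code merged in #10091:

```java
import java.time.Duration;

public class WindowRetentionSketch {
    // Default window retention mentioned in the issue (1 day).
    static final Duration DEFAULT_RETENTION = Duration.ofDays(1);

    // Picks an implicit retention that always covers window size + grace,
    // and never goes below the default.
    static Duration implicitRetention(Duration windowSize, Duration grace) {
        Duration required = windowSize.plus(grace);
        return required.compareTo(DEFAULT_RETENTION) > 0 ? required : DEFAULT_RETENTION;
    }

    public static void main(String[] args) {
        // The failing case from the issue: a 20-day window with 5 minutes of grace.
        Duration retention = implicitRetention(Duration.ofDays(20), Duration.ofMinutes(5));
        System.out.println(retention); // PT480H5M, i.e. 20 days + 5 minutes
    }
}
```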
[jira] [Updated] (KAFKA-9524) Default window retention does not consider grace period
[ https://issues.apache.org/jira/browse/KAFKA-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9524: --- Issue Type: Improvement (was: Bug)
[GitHub] [kafka] rhauch opened a new pull request #10158: KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException
rhauch opened a new pull request #10158: URL: https://github.com/apache/kafka/pull/10158 Refactored the KafkaBasedLog logic to read end offsets into a separate method to make it easier to test. Also changed the TopicAdmin.endOffsets method to throw the original UnsupportedVersionException, LeaderNotAvailableException, and TimeoutException rather than wrapping them, to better conform with the consumer method and how the KafkaBasedLog retries those exceptions. Added new tests to verify various scenarios and errors. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
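Throwing the original exception rather than a wrapper lets callers catch the specific types they know how to retry. A minimal sketch of the unwrapping pattern using only the standard library, assuming a `CompletableFuture` in place of the Admin client's `KafkaFuture` and `UnsupportedOperationException` as a stand-in for Kafka's `UnsupportedVersionException`; `getUnwrapped` is a hypothetical helper, not the `TopicAdmin` code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class UnwrapSketch {
    // Waits for the future and rethrows the original unchecked failure instead of
    // the ExecutionException wrapper, so callers can catch specific exception
    // types (for example, ones they know how to retry).
    static <T> T getUnwrapped(CompletableFuture<T> future) throws InterruptedException {
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause; // keep the original type
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            // Checked exceptions cannot be rethrown as-is from this signature.
            throw new IllegalStateException("Unexpected checked exception", cause);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Long> endOffset = new CompletableFuture<>();
        // Stand-in for org.apache.kafka.common.errors.UnsupportedVersionException.
        endOffset.completeExceptionally(new UnsupportedOperationException("ListOffsets not supported"));
        try {
            getUnwrapped(endOffset);
        } catch (UnsupportedOperationException e) {
            System.out.println("caught original exception: " + e.getMessage());
        }
    }
}
```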
[GitHub] [kafka] jsancio commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records
jsancio commented on a change in pull request #10063: URL: https://github.com/apache/kafka/pull/10063#discussion_r578886362
## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
## @@ -307,4 +309,38 @@ public int writeRecord(
         recordOutput.writeVarint(0);
         return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
     }
+
+    private int batchHeaderSizeInBytes() {
+        return AbstractRecords.recordBatchHeaderSizeInBytes(
+            RecordBatch.MAGIC_VALUE_V2,
+            compressionType
+        );
+    }
+
+    private int bytesNeededForRecords(
+        Collection records,
+        ObjectSerializationCache serializationCache
+    ) {
+        long expectedNextOffset = nextOffset;
+        int bytesNeeded = 0;
+        for (T record : records) {
+            if (expectedNextOffset - baseOffset >= Integer.MAX_VALUE) {
+                return Integer.MAX_VALUE;
Review comment: Updated it to use `Math.addExact`.
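`Math.addExact` throws `ArithmeticException` on `int` overflow instead of silently wrapping to a negative number, which matters when summing many record sizes. A minimal sketch of how such a sum might saturate to `Integer.MAX_VALUE` (so any size check treats it as too large), assuming sizes arrive as a list of ints; `bytesNeeded` is hypothetical and not the merged `BatchBuilder` code:

```java
import java.util.Arrays;
import java.util.List;

public class BytesNeededSketch {
    // Sums record sizes with Math.addExact so that overflow is detected instead
    // of wrapping. On overflow, returns Integer.MAX_VALUE, which any batch-size
    // check will treat as "too large".
    static int bytesNeeded(List<Integer> recordSizes) {
        int total = 0;
        try {
            for (int size : recordSizes) {
                total = Math.addExact(total, size);
            }
        } catch (ArithmeticException overflow) {
            return Integer.MAX_VALUE;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(bytesNeeded(Arrays.asList(100, 200, 300)));        // 600
        System.out.println(bytesNeeded(Arrays.asList(Integer.MAX_VALUE, 1))); // 2147483647
        System.out.println(Integer.MAX_VALUE + 1);                            // -2147483648 (plain + wraps)
    }
}
```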
[jira] [Updated] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state
[ https://issues.apache.org/jira/browse/KAFKA-12345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alok Nikhil updated KAFKA-12345: Affects Version/s: 2.8.0 > KIP-500: AlterIsrManager crashes on broker idle-state > - > > Key: KAFKA-12345 > URL: https://issues.apache.org/jira/browse/KAFKA-12345 > Project: Kafka > Issue Type: Task > Components: core >Affects Versions: 2.8.0 >Reporter: Alok Nikhil >Priority: Major > Labels: kip-500 > > Occasionally, a scheduler thread on a broker crashes with this stack > > {code:java} > [2021-02-19 01:04:24,683] ERROR Uncaught exception in scheduled task > 'send-alter-isr' (kafka.utils.KafkaScheduler) > java.lang.NullPointerException > at kafka.server.AlterIsrManagerImpl.sendRequest(AlterIsrManager.scala:117) > at > kafka.server.AlterIsrManagerImpl.propagateIsrChanges(AlterIsrManager.scala:85) > at > kafka.server.AlterIsrManagerImpl.$anonfun$start$1(AlterIsrManager.scala:66) > at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) > at > java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:834){code} > > After that the broker is unable to fetch any records from any other broker > (and vice versa) > {code:java} > [2021-02-19 01:05:07,000] INFO [ReplicaFetcher replicaId=0, leaderId=4, > fetcherId=0] Error sending fetch request (sessionId=164432409 > 2, epoch=957) to node 4: (org.apache.kafka.clients.FetchSessionHandler) > java.io.IOException: Connection to 4 was disconnected before the response > was read > at > 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:215) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:313) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:139) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:138) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:121) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code}
[jira] [Commented] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state
[ https://issues.apache.org/jira/browse/KAFKA-12345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286821#comment-17286821 ] Alok Nikhil commented on KAFKA-12345: - Hi [~dengziming]. I have updated the Kafka version and the priority. This is part of the KIP-500 merge, so it's not an active issue (KIP-500 is still in development and is not the default mode the broker operates in).
[jira] [Updated] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state
[ https://issues.apache.org/jira/browse/KAFKA-12345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alok Nikhil updated KAFKA-12345: Priority: Minor (was: Major)
[GitHub] [kafka] showuon edited a comment on pull request #10118: KAFKA-10192: increase starting up waiting time
showuon edited a comment on pull request #10118: URL: https://github.com/apache/kafka/pull/10118#issuecomment-781768574 > Fine by me, flaky tests suck. Indeed! Thanks, @C0urante @kkonstantine, could you check this PR? Thanks.
[GitHub] [kafka] hachikuji commented on a change in pull request #10129: KAFKA-10817; Add clusterId validation to Fetch handling
hachikuji commented on a change in pull request #10129: URL: https://github.com/apache/kafka/pull/10129#discussion_r578873790
## File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
## @@ -354,7 +355,8 @@
         "Requested position is not greater than or equal to zero, and less than the size of the snapshot.",
         PositionOutOfRangeException::new),
     UNKNOWN_TOPIC_ID(100, "This server does not host this topic ID.", UnknownTopicIdException::new),
-    DUPLICATE_BROKER_REGISTRATION(101, "This broker ID is already in use.", DuplicateBrokerRegistrationException::new);
+    DUPLICATE_BROKER_REGISTRATION(101, "This broker ID is already in use.", DuplicateBrokerRegistrationException::new),
+    INVALID_CLUSTER_ID(102, "The supplied cluster id is not valid.", InvalidClusterIdException::new);
Review comment: I couldn't find any existing `INVALID*` error code that seems to fit this case. Usually "invalid" is reserved for cases where the field is structurally invalid. For example, `INVALID_GROUP_ID` is used when the groupid is empty in APIs where we require it to be non-empty. The closest similar case is `INVALID_PRODUCER_ID_MAPPING`. We are going to add an `INCONSISTENT_TOPIC_ID` in https://github.com/apache/kafka/pull/10143. Perhaps that is enough cover here? The usage is similar: the request indicates an id which does not match the local state.
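The naming convention in the review distinguishes a structurally invalid field from a well-formed value that disagrees with local state. A minimal sketch of a cluster ID check that follows that split, assuming an empty string counts as structurally invalid and a missing (null) ID is accepted; the `Outcome` names and `validate` are hypothetical, not the error codes or handler code merged in #10129:

```java
public class ClusterIdCheckSketch {
    // Hypothetical outcomes following the review's convention:
    // INVALID_* for a structurally bad value, INCONSISTENT_* for a
    // well-formed value that does not match the receiver's local state.
    enum Outcome { NONE, INVALID_CLUSTER_ID, INCONSISTENT_CLUSTER_ID }

    static Outcome validate(String requestClusterId, String localClusterId) {
        if (requestClusterId == null) {
            return Outcome.NONE; // assumed: an omitted cluster ID is accepted
        }
        if (requestClusterId.isEmpty()) {
            return Outcome.INVALID_CLUSTER_ID; // assumed structural check
        }
        return requestClusterId.equals(localClusterId)
            ? Outcome.NONE
            : Outcome.INCONSISTENT_CLUSTER_ID;
    }

    public static void main(String[] args) {
        String local = "cluster-a"; // hypothetical cluster ID
        System.out.println(validate(null, local));        // NONE
        System.out.println(validate("", local));          // INVALID_CLUSTER_ID
        System.out.println(validate("cluster-b", local)); // INCONSISTENT_CLUSTER_ID
    }
}
```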
[GitHub] [kafka] hachikuji opened a new pull request #10157: MINOR: Raft request thread should discover api versions
hachikuji opened a new pull request #10157: URL: https://github.com/apache/kafka/pull/10157 We do not plan to rely on the IBP in order to determine API versions for raft requests. Instead, we want to discover them through the ApiVersions API. This patch enables the flag to do so. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
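Discovering versions through ApiVersions generally means intersecting the locally supported version range of an API with the range the peer advertises, then using the highest common version; if the ranges do not overlap, there is no usable version. A minimal sketch of that rule, with hypothetical names and ranges, not the `NetworkClient` implementation:

```java
import java.util.OptionalInt;

public class ApiVersionSketch {
    // Returns the highest version supported by both sides, or empty when the
    // local range [localMin, localMax] and the range advertised by the peer in
    // its ApiVersions response [remoteMin, remoteMax] do not overlap.
    static OptionalInt highestCommonVersion(int localMin, int localMax, int remoteMin, int remoteMax) {
        int low = Math.max(localMin, remoteMin);
        int high = Math.min(localMax, remoteMax);
        return low <= high ? OptionalInt.of(high) : OptionalInt.empty();
    }

    public static void main(String[] args) {
        // Hypothetical ranges: the local node supports v0-12, the peer advertises v0-11.
        System.out.println(highestCommonVersion(0, 12, 0, 11)); // OptionalInt[11]
        // The local node requires v12, but the peer only goes up to v11: no usable version.
        System.out.println(highestCommonVersion(12, 12, 0, 11)); // OptionalInt.empty
    }
}
```

An empty result is the case jsancio's question is about: some component has to detect it and fail or wait, rather than send a request the peer cannot parse.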
[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
junrao commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r578876029 ## File path: metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java ## @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException; +import org.apache.kafka.common.errors.StaleBrokerEpochException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.BrokerRegistrationRequestData; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metadata.BrokerRegistration; +import org.apache.kafka.metadata.BrokerRegistrationReply; +import org.apache.kafka.metadata.FeatureMapAndEpoch; +import org.apache.kafka.metadata.VersionRange; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + + +public class ClusterControlManager { Review comment: Could we add some comments to this class? ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -0,0 +1,900 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectLeadersRequestData; +import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectLe
[GitHub] [kafka] mjsax merged pull request #10091: KAFKA-9524: increase retention time for window and grace periods longer than one day
mjsax merged pull request #10091: URL: https://github.com/apache/kafka/pull/10091
[GitHub] [kafka] hachikuji commented on a change in pull request #10129: KAFKA-10817; Add clusterId validation to Fetch handling
hachikuji commented on a change in pull request #10129: URL: https://github.com/apache/kafka/pull/10129#discussion_r578876834
## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
## @@ -1751,6 +1768,7 @@ private FetchRequestData buildFetchRequest() {
         });
         return request
             .setMaxWaitMs(fetchMaxWaitMs)
+            .setClusterId(clusterId.toString())
Review comment: One small detail which is probably ok. The clusterId field in the fetch schema is not currently marked as ignorable. That should be ok since it is only used in the raft implementation, which can guarantee that we will have version 12 and above. On the other hand, I don't see any harm in making the field ignorable since we are accepting a null value anyway. Is it worth changing that?
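The ignorable question comes down to what the serializer does when a field is set but the negotiated version predates it: an ignorable field is silently dropped, while a non-ignorable field with a non-default value is an error. A minimal sketch of that decision, assuming the field first appears in version 12 (as the comment implies) and that null is its default; `shouldWriteClusterId` is hypothetical and uses `IllegalStateException` as a stand-in for Kafka's `UnsupportedVersionException`:

```java
public class IgnorableFieldSketch {
    // Assumed: the clusterId field first appears in Fetch version 12.
    static final int CLUSTER_ID_MIN_VERSION = 12;

    // Returns true to write clusterId, false to omit it; throws when a
    // non-default (non-null) value would be lost from a non-ignorable field.
    static boolean shouldWriteClusterId(int version, String clusterId, boolean ignorable) {
        if (version >= CLUSTER_ID_MIN_VERSION) {
            return true;
        }
        if (clusterId != null && !ignorable) {
            // Stand-in for org.apache.kafka.common.errors.UnsupportedVersionException.
            throw new IllegalStateException(
                "Attempted to write a non-default clusterId at version " + version);
        }
        return false; // null (the default) or ignorable: drop the field quietly
    }

    public static void main(String[] args) {
        System.out.println(shouldWriteClusterId(12, "cluster-a", false)); // true
        System.out.println(shouldWriteClusterId(11, "cluster-a", true));  // false
        try {
            shouldWriteClusterId(11, "cluster-a", false);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```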
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
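The two review points in PR #10129 (tolerating a null clusterId, and keeping "invalid" for structurally bad values while using an "inconsistent"-style code for ids that do not match local state) can be sketched as a small classifier. This is a hypothetical illustration only: `ClusterIdCheck`, its `Result` enum, and the `INCONSISTENT_CLUSTER_ID` name are assumptions for the sketch, not entries in Kafka's `Errors` enum or code from `KafkaRaftClient`. Treating the empty string as the "structurally invalid" case is also an assumption, by analogy with `INVALID_GROUP_ID`.

```java
import java.util.Objects;

// Hypothetical sketch: how a fetch handler might classify the clusterId carried by a request.
// Class, enum, and error names are illustrative assumptions, not Kafka's actual API.
final class ClusterIdCheck {

    enum Result {
        NONE,                    // accept the request
        INVALID_CLUSTER_ID,      // structurally bad value (here: the empty string)
        INCONSISTENT_CLUSTER_ID  // well-formed, but does not match the local cluster id
    }

    private final String localClusterId;

    ClusterIdCheck(String localClusterId) {
        this.localClusterId = Objects.requireNonNull(localClusterId, "localClusterId");
    }

    Result validate(String requestClusterId) {
        // A null id is tolerated, as for a request that does not set the field.
        if (requestClusterId == null) {
            return Result.NONE;
        }
        // Structural problem: the "invalid" family of errors.
        if (requestClusterId.isEmpty()) {
            return Result.INVALID_CLUSTER_ID;
        }
        // Mismatch with local state: the "inconsistent" family of errors.
        if (!requestClusterId.equals(localClusterId)) {
            return Result.INCONSISTENT_CLUSTER_ID;
        }
        return Result.NONE;
    }
}
```

Keeping the null check first is what makes an ignorable (possibly absent) field harmless: an absent value is accepted rather than being reported as either kind of error.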
[GitHub] [kafka] dengziming commented on pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start
dengziming commented on pull request #10021: URL: https://github.com/apache/kafka/pull/10021#issuecomment-781762837 ping @hachikuji. Retest this, please.
[jira] [Comment Edited] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state
[ https://issues.apache.org/jira/browse/KAFKA-12345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286815#comment-17286815 ] dengziming edited comment on KAFKA-12345 at 2/19/21, 2:02 AM: -- Hello, please provide the Kafka version you are using, or the revision number if you are using the dev branch of Kafka, for convenience. was (Author: dengziming): Hello, please provide the Kafka version you are using, or the revision number if you are using the dev branch of Kafka. > KIP-500: AlterIsrManager crashes on broker idle-state > - > > Key: KAFKA-12345 > URL: https://issues.apache.org/jira/browse/KAFKA-12345 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Alok Nikhil >Priority: Major > Labels: kip-500 > > Occasionally, a scheduler thread on a broker crashes with this stack > > {code:java} > [2021-02-19 01:04:24,683] ERROR Uncaught exception in scheduled task > 'send-alter-isr' (kafka.utils.KafkaScheduler) > java.lang.NullPointerException > at kafka.server.AlterIsrManagerImpl.sendRequest(AlterIsrManager.scala:117) > at > kafka.server.AlterIsrManagerImpl.propagateIsrChanges(AlterIsrManager.scala:85) > at > kafka.server.AlterIsrManagerImpl.$anonfun$start$1(AlterIsrManager.scala:66) > at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) > at > java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:834){code} > > After that the broker is unable to fetch any records from any other broker > (and vice versa) > {code:java} > [2021-02-19 01:05:07,000] INFO 
[ReplicaFetcher replicaId=0, leaderId=4, > fetcherId=0] Error sending fetch request (sessionId=164432409 > 2, epoch=957) to node 4: (org.apache.kafka.clients.FetchSessionHandler) > java.io.IOException: Connection to 4 was disconnected before the response > was read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:215) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:313) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:139) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:138) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:121) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state
[ https://issues.apache.org/jira/browse/KAFKA-12345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286815#comment-17286815 ] dengziming commented on KAFKA-12345: Hello, please provide the Kafka version you are using, or the revision number if you are using the dev branch of Kafka.
[GitHub] [kafka] hachikuji commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records
hachikuji commented on a change in pull request #10063: URL: https://github.com/apache/kafka/pull/10063#discussion_r578870759

## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ##
@@ -298,28 +342,30 @@ public void close() {
public final List records;
public final MemoryRecords data;
private final MemoryPool pool;
-private final ByteBuffer buffer;
+// Buffer that was allocated by the MemoryPool (pool). This may not be the buffer used in
+// the MemoryRecords (data) object.
+private final ByteBuffer pooledBuffer;
private CompletedBatch(
long baseOffset,
List records,
MemoryRecords data,
MemoryPool pool,
-ByteBuffer buffer
+ByteBuffer pooledBuffer

Review comment: nit: might be worth using the same name in `BatchBuilder`. Currently we use `initialBuffer`.

## File path: raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java ##
@@ -164,47 +170,84 @@ public void testUnflushedBuffersReleasedByClose() {
@Test
public void testSingleBatchAccumulation() {
-int leaderEpoch = 17;
-long baseOffset = 157;
-int lingerMs = 50;
-int maxBatchSize = 512;
-
-Mockito.when(memoryPool.tryAllocate(maxBatchSize))
-.thenReturn(ByteBuffer.allocate(maxBatchSize));
-
-BatchAccumulator acc = buildAccumulator(
-leaderEpoch,
-baseOffset,
-lingerMs,
-maxBatchSize
-);
-
-List records = asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
-assertEquals(baseOffset, acc.append(leaderEpoch, records.subList(0, 1)));
-assertEquals(baseOffset + 2, acc.append(leaderEpoch, records.subList(1, 3)));
-assertEquals(baseOffset + 5, acc.append(leaderEpoch, records.subList(3, 6)));
-assertEquals(baseOffset + 7, acc.append(leaderEpoch, records.subList(6, 8)));
-assertEquals(baseOffset + 8, acc.append(leaderEpoch, records.subList(8, 9)));
-
-time.sleep(lingerMs);
-assertTrue(acc.needsDrain(time.milliseconds()));
-
-List> batches = acc.drain();
-assertEquals(1, batches.size());
-assertFalse(acc.needsDrain(time.milliseconds()));
-assertEquals(Long.MAX_VALUE - time.milliseconds(), acc.timeUntilDrain(time.milliseconds()));
-
-BatchAccumulator.CompletedBatch batch = batches.get(0);
-assertEquals(records, batch.records);
-assertEquals(baseOffset, batch.baseOffset);
+asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
+int leaderEpoch = 17;
+long baseOffset = 157;
+int lingerMs = 50;
+int maxBatchSize = 512;
+
+Mockito.when(memoryPool.tryAllocate(maxBatchSize))
+.thenReturn(ByteBuffer.allocate(maxBatchSize));
+
+BatchAccumulator acc = buildAccumulator(
+leaderEpoch,
+baseOffset,
+lingerMs,
+maxBatchSize
+);
+
+List records = asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
+assertEquals(baseOffset, appender.call(acc, leaderEpoch, records.subList(0, 1)));
+assertEquals(baseOffset + 2, appender.call(acc, leaderEpoch, records.subList(1, 3)));
+assertEquals(baseOffset + 5, appender.call(acc, leaderEpoch, records.subList(3, 6)));
+assertEquals(baseOffset + 7, appender.call(acc, leaderEpoch, records.subList(6, 8)));
+assertEquals(baseOffset + 8, appender.call(acc, leaderEpoch, records.subList(8, 9)));
+
+time.sleep(lingerMs);
+assertTrue(acc.needsDrain(time.milliseconds()));
+
+List> batches = acc.drain();
+assertEquals(1, batches.size());
+assertFalse(acc.needsDrain(time.milliseconds()));
+assertEquals(Long.MAX_VALUE - time.milliseconds(), acc.timeUntilDrain(time.milliseconds()));
+
+BatchAccumulator.CompletedBatch batch = batches.get(0);
+assertEquals(records, batch.records);
+assertEquals(baseOffset, batch.baseOffset);
+});
}
@Test
public void testMultipleBatchAccumulation() {
+asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
+int leaderEpoch = 17;
+long baseOffset = 157;
+int lingerMs = 50;
+int maxBatchSize = 256;
+
+Mockito.when(memoryPool.tryAllocate(maxBatchSize))
+.thenReturn(ByteBuffer.allocate(maxBatchSize));
+
+BatchAccumulator acc = buildAccumulator(
+leaderEpoch,
+baseOffset,
+lingerMs,
+maxBatchSize
+);
+
+// Append entries until we have 4 batches to drain (3 completed, 1 building)
+
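The test diff runs each scenario with both `APPEND` and `APPEND_ATOMIC` and checks that an append returns the offset of the last record it added (for example `baseOffset + 2` after appending "b" and "c" on top of "a"). The contract can be modelled in miniature as below. This is a hypothetical toy, not `BatchAccumulator`: capacity is counted in records rather than bytes, there is no memory pool or linger/drain logic, and the atomic semantics ("all records in one batch, or reject") are an assumption based on the PR title.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of non-atomic vs atomic appends.
// Batch capacity is counted in records, not bytes, to keep the example small.
final class ToyAccumulator {
    final List<List<String>> batches = new ArrayList<>();
    private final int maxRecordsPerBatch;
    private long nextOffset;

    ToyAccumulator(long baseOffset, int maxRecordsPerBatch) {
        this.nextOffset = baseOffset;
        this.maxRecordsPerBatch = maxRecordsPerBatch;
    }

    // Non-atomic append: records may be split across several batches.
    // Returns the offset assigned to the last appended record.
    long append(List<String> records) {
        requireNonEmpty(records);
        for (String record : records) {
            List<String> last = lastBatch();
            if (last == null || last.size() == maxRecordsPerBatch) {
                last = new ArrayList<>();
                batches.add(last);
            }
            last.add(record);
            nextOffset++;
        }
        return nextOffset - 1;
    }

    // Atomic append: all records go into a single batch, opening a new one if the
    // current batch lacks room; rejects inputs that could never fit in one batch.
    long appendAtomic(List<String> records) {
        requireNonEmpty(records);
        if (records.size() > maxRecordsPerBatch) {
            throw new IllegalArgumentException("records do not fit in a single batch");
        }
        List<String> last = lastBatch();
        if (last == null || last.size() + records.size() > maxRecordsPerBatch) {
            last = new ArrayList<>();
            batches.add(last);
        }
        last.addAll(records);
        nextOffset += records.size();
        return nextOffset - 1;
    }

    private List<String> lastBatch() {
        return batches.isEmpty() ? null : batches.get(batches.size() - 1);
    }

    private static void requireNonEmpty(List<String> records) {
        if (records.isEmpty()) {
            throw new IllegalArgumentException("records must not be empty");
        }
    }
}
```

With a large batch limit both methods return the same offsets, which is consistent with the single-batch test passing for either appender; the two only diverge once a batch fills up.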
[GitHub] [kafka] hachikuji commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records
hachikuji commented on a change in pull request #10063: URL: https://github.com/apache/kafka/pull/10063#discussion_r578870321

## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java ##
@@ -307,4 +309,38 @@ public int writeRecord(
recordOutput.writeVarint(0);
return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
}
+
+private int batchHeaderSizeInBytes() {
+return AbstractRecords.recordBatchHeaderSizeInBytes(
+RecordBatch.MAGIC_VALUE_V2,
+compressionType
+);
+}
+
+private int bytesNeededForRecords(
+Collection records,
+ObjectSerializationCache serializationCache
+) {
+long expectedNextOffset = nextOffset;
+int bytesNeeded = 0;
+for (T record : records) {
+if (expectedNextOffset - baseOffset >= Integer.MAX_VALUE) {
+return Integer.MAX_VALUE;

Review comment: Since we are handling this case by raising an exception, is it worth checking for overflow of `bytesNeeded` as well?
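The overflow check suggested in the review can be sketched by accumulating in a `long` and saturating at `Integer.MAX_VALUE`, the same sentinel `bytesNeededForRecords` already returns when the offset delta is too large, so an overflowing byte count would take the same "too large" path. `ByteBudget`, `bytesNeeded`, and `ensureFits` are hypothetical names, record sizes are passed in as plain integers instead of being computed from serialized records, and sizes are assumed non-negative.

```java
import java.util.List;

// Hypothetical helper illustrating a saturating byte count for a set of records.
final class ByteBudget {

    // Returns the total size, or Integer.MAX_VALUE if the total would not fit in an int.
    // Assumes every size is non-negative.
    static int bytesNeeded(List<Integer> recordSizes) {
        long total = 0;
        for (int size : recordSizes) {
            // Cannot overflow a long: total < Integer.MAX_VALUE before each addition.
            total += size;
            if (total >= Integer.MAX_VALUE) {
                return Integer.MAX_VALUE; // saturate instead of wrapping to a negative int
            }
        }
        return (int) total;
    }

    // Example caller: reject the append when the records cannot fit in one batch.
    static void ensureFits(List<Integer> recordSizes, int maxBatchSizeInBytes) {
        int needed = bytesNeeded(recordSizes);
        if (needed > maxBatchSizeInBytes) {
            throw new IllegalArgumentException(
                "Records need " + needed + " bytes but the batch limit is " + maxBatchSizeInBytes);
        }
    }
}
```

A plain `int` accumulator would instead wrap around: two records of 2^30 bytes each would sum to a negative number and could slip past a `needed > max` check.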
[jira] [Commented] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state
[ https://issues.apache.org/jira/browse/KAFKA-12345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286810#comment-17286810 ] Alok Nikhil commented on KAFKA-12345: - Adding a bit more context: there seems to have been a controller quorum voting event just before this crash, which may be related.
{code:java}
[2021-02-19 01:04:20,721] INFO [Controller 3000] Fenced broker: FenceBrokerRecord(id=3, epoch=2) (org.apache.kafka.controller.ClusterControlManager)
[2021-02-19 01:04:21,354] INFO [BrokerMetadataListener id=0] Tell Replica Manager to handle records (kafka.server.metadata.BrokerMetadataListener)
[2021-02-19 01:04:22,097] INFO [RaftManager broker=0,controller=3000] Completed transition to Unattached(epoch=2, voters=[3000, 3001, 3002, 3003, 3004], electionTimeoutMs=690) (org.apache.kafka.raft.QuorumState)
[2021-02-19 01:04:22,203] INFO [RaftManager broker=0,controller=3000] Completed transition to Unattached(epoch=3, voters=[3000, 3001, 3002, 3003, 3004], electionTimeoutMs=584) (org.apache.kafka.raft.QuorumState)
[2021-02-19 01:04:22,507] INFO [RaftManager broker=0,controller=3000] Completed transition to Unattached(epoch=4, voters=[3000, 3001, 3002, 3003, 3004], electionTimeoutMs=280) (org.apache.kafka.raft.QuorumState)
[2021-02-19 01:04:22,626] INFO [RaftManager broker=0,controller=3000] Completed transition to Unattached(epoch=5, voters=[3000, 3001, 3002, 3003, 3004], electionTimeoutMs=161) (org.apache.kafka.raft.QuorumState)
[2021-02-19 01:04:22,626] INFO [RaftManager broker=0,controller=3000] Completed transition to Voted(epoch=5, votedId=3004, voters=[3000, 3001, 3002, 3003, 3004], electionTimeoutMs=520) (org.apache.kafka.raft.QuorumState)
[2021-02-19 01:04:22,631] INFO [RaftManager broker=0,controller=3000] Completed transition to FollowerState(fetchTimeoutMs=2000, epoch=5, leaderId=3004, voters=[3000, 3001, 3002, 3003, 3004]) (org.apache.kafka.raft.QuorumState)
{code}
[jira] [Updated] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state
[ https://issues.apache.org/jira/browse/KAFKA-12345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alok Nikhil updated KAFKA-12345: Description: Occasionally, a scheduler thread on a broker crashes with this stack {code:java} [2021-02-19 01:04:24,683] ERROR Uncaught exception in scheduled task 'send-alter-isr' (kafka.utils.KafkaScheduler) java.lang.NullPointerException at kafka.server.AlterIsrManagerImpl.sendRequest(AlterIsrManager.scala:117) at kafka.server.AlterIsrManagerImpl.propagateIsrChanges(AlterIsrManager.scala:85) at kafka.server.AlterIsrManagerImpl.$anonfun$start$1(AlterIsrManager.scala:66) at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834){code} After that the broker is unable to fetch any records from any other broker (and vice versa) {code:java} [2021-02-19 01:05:07,000] INFO [ReplicaFetcher replicaId=0, leaderId=4, fetcherId=0] Error sending fetch request (sessionId=164432409 2, epoch=957) to node 4: (org.apache.kafka.clients.FetchSessionHandler) java.io.IOException: Connection to 4 was disconnected before the response was read at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110) at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:215) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:313) at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:139) at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:138) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:121) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code} was: Occasionally, a scheduler thread on a broker crashes with this stack ``` [2021-02-19 01:04:24,683] ERROR Uncaught exception in scheduled task 'send-alter-isr' (kafka.utils.KafkaScheduler) java.lang.NullPointerException at kafka.server.AlterIsrManagerImpl.sendRequest(AlterIsrManager.scala:117) at kafka.server.AlterIsrManagerImpl.propagateIsrChanges(AlterIsrManager.scala:85) at kafka.server.AlterIsrManagerImpl.$anonfun$start$1(AlterIsrManager.scala:66) at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) ``` After that the broker is unable to fetch any records from any other broker (and vice versa) ``` [2021-02-19 01:05:07,000] INFO [ReplicaFetcher replicaId=0, leaderId=4, fetcherId=0] Error sending fetch request (sessionId=164432409 2, epoch=957) to node 4: (org.apache.kafka.clients.FetchSessionHandler) java.io.IOException: Connection to 4 was disconnected before the response was read at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110) at 
kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:215) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:313) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:139) at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:138) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:121) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) ```
[jira] [Commented] (KAFKA-10847) Avoid spurious left/outer join results in stream-stream join
[ https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286808#comment-17286808 ] Guozhang Wang commented on KAFKA-10847: --- [~spena][~vcrfxia][~mjsax] I'd like to dump some more thoughts here regarding the design:

1) Regarding `fetchAll`: intuitively, even though we call this function on the newly added bookkeeping stores for records that have not found a match yet, the expiration frequency is high, so the scan should only return a few records each time. Within RocksDB, a range scan is usually executed as 1) find the starting point based on the lower bound, 2) scan through the SST tables, stopping at the upper bound. Of these two steps, the time spent finding the starting point should be roughly the same as the time spent on a single-key lookup, and since the calling frequency is high, the `low` and `high` timestamp bounds should be pretty close (maybe just a couple of milliseconds apart), which means step 2) should also stop fairly quickly. We could check whether this is indeed the case: when running the benchmark, first break down the latency into multiple stages: 1) time spent reading / writing in the window store, 2) time spent range-fetching in the expiration store, 3) time spent deleting records in the expiration store, 4) the join operator itself. Then, within 2), we can plot a chart where the x-axis is the number of records returned and the y-axis is the e2e latency of getting the iterator and looping through it. If it shows that, even with zero or one record returned, there's still a large constant latency cost, we may then consider 2) below.

2) If 1) shows that a range-fetch in RocksDB has a constant amortized cost even with very few records returned, then instead of trying to expire records from the expiration store on each input record, we only fetch-and-delete from the expiration store periodically. We should then be careful about the period: if it is too infrequent, we may introduce a much longer output emission latency, plus risk blocking a thread from calling consumer.poll().

3) Another idea for the expiration store: instead of using two physical stores, one for each side, we could use a single store for both sides, with the key prefixed by the logical stream (e.g. a single-bit prefix, "0" for left and "1" for right). That way we have one fewer physical store, and since the records from each side are clustered in the underlying layout, the range query should not be impacted much.

4) One more idea for the expiration store: instead of using the sequence id to disable deduplication, we could remove it from the key to save 4 bytes per record, and instead serialize the value as a list, assuming that in practice most values are singleton lists, which we can serialize with very little byte overhead. The downside is that instead of a blind write to append, we need a get followed by a write, though the get should return null most of the time. I'm hoping that with RocksDB's Bloom filter, such a get would be efficient enough to be worth the tradeoff.

> Avoid spurious left/outer join results in stream-stream join > - > > Key: KAFKA-10847 > URL: https://issues.apache.org/jira/browse/KAFKA-10847 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Sergio Peña >Priority: Major > > KafkaStreams follows an eager execution model, i.e., it never buffers input > records but processes them right away. For left/outer stream-stream join, > this implies that left/outer join results might be emitted before the window > end (or window close) time is reached. Thus, a record that will be an > inner-join result might produce an eager (and spurious) left/outer join > result. > We should change the implementation of the join to not emit eager left/outer > join results, but instead delay the emission of such results until after the window > grace period has passed.
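Ideas 3) and 4) from Guozhang Wang's comment can be illustrated together with a small in-memory sketch: one shared expiration store whose keys carry a side prefix, values stored as lists instead of de-duplicating with a sequence id, and expiration done as a bounded fetch-and-delete range scan. Everything here is an assumption for illustration, not Kafka Streams code: `SharedExpirationStore` is hypothetical, a `TreeMap` with unsigned lexicographic byte ordering stands in for RocksDB's default bytewise key ordering, values are plain strings rather than serialized bytes, and timestamps are assumed non-negative so their big-endian encoding sorts correctly.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a single expiration store shared by both join sides.
final class SharedExpirationStore {
    static final byte LEFT = 0;
    static final byte RIGHT = 1;

    // Unsigned lexicographic byte order, mimicking RocksDB's default bytewise comparator.
    private static final Comparator<byte[]> BYTEWISE = Arrays::compareUnsigned;

    private final NavigableMap<byte[], List<String>> store = new TreeMap<>(BYTEWISE);

    // Key layout (idea 3): [1-byte side prefix][8-byte big-endian timestamp].
    // Assumes timestamps >= 0 so that byte order matches numeric order.
    static byte[] key(byte side, long timestamp) {
        return ByteBuffer.allocate(1 + Long.BYTES).put(side).putLong(timestamp).array();
    }

    // Idea 4: no sequence id in the key. Records with the same side and timestamp share a key
    // and the value is a list, so an append is a get followed by a write.
    void put(byte side, long timestamp, String value) {
        byte[] k = key(side, timestamp);
        List<String> existing = store.get(k); // expected to be null most of the time
        List<String> updated = existing == null ? new ArrayList<>() : new ArrayList<>(existing);
        updated.add(value);
        store.put(k, updated);
    }

    // Fetch-and-delete every entry of one side with timestamp < upTo (upTo >= 0). Keys of the
    // same side are contiguous, so this is a single bounded range scan.
    List<String> expire(byte side, long upTo) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<byte[], List<String>>> it =
            store.subMap(key(side, 0L), true, key(side, upTo), false).entrySet().iterator();
        while (it.hasNext()) {
            expired.addAll(it.next().getValue());
            it.remove();
        }
        return expired;
    }

    int keyCount() {
        return store.size();
    }
}
```

Idea 2) would then amount to calling `expire` from a periodic punctuation rather than on every input record; how the period trades emission latency against scan cost is exactly what the benchmark in idea 1) is meant to measure.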
[jira] [Updated] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state
[ https://issues.apache.org/jira/browse/KAFKA-12345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alok Nikhil updated KAFKA-12345: Description: Occasionally, a scheduler thread on a broker crashes with this stack ``` [2021-02-19 01:04:24,683] ERROR Uncaught exception in scheduled task 'send-alter-isr' (kafka.utils.KafkaScheduler) java.lang.NullPointerException at kafka.server.AlterIsrManagerImpl.sendRequest(AlterIsrManager.scala:117) at kafka.server.AlterIsrManagerImpl.propagateIsrChanges(AlterIsrManager.scala:85) at kafka.server.AlterIsrManagerImpl.$anonfun$start$1(AlterIsrManager.scala:66) at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) ``` After that the broker is unable to fetch any records from any other broker (and vice versa) ``` [2021-02-19 01:05:07,000] INFO [ReplicaFetcher replicaId=0, leaderId=4, fetcherId=0] Error sending fetch request (sessionId=164432409 2, epoch=957) to node 4: (org.apache.kafka.clients.FetchSessionHandler) java.io.IOException: Connection to 4 was disconnected before the response was read at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110) at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:215) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:313) at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:139) at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:138) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:121) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) ``` was: Occasionally, a scheduler thread on a broker crashes with this stack [2021-02-19 01:04:24,683] ERROR Uncaught exception in scheduled task 'send-alter-isr' (kafka.utils.KafkaScheduler) java.lang.NullPointerException at kafka.server.AlterIsrManagerImpl.sendRequest(AlterIsrManager.scala:117) at kafka.server.AlterIsrManagerImpl.propagateIsrChanges(AlterIsrManager.scala:85) at kafka.server.AlterIsrManagerImpl.$anonfun$start$1(AlterIsrManager.scala:66) at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) After that the broker is unable to fetch any records from any other broker (and vice versa) [2021-02-19 01:05:07,000] INFO [ReplicaFetcher replicaId=0, leaderId=4, fetcherId=0] Error sending fetch request (sessionId=164432409 2, epoch=957) to node 4: (org.apache.kafka.clients.FetchSessionHandler) java.io.IOException: Connection to 4 was disconnected before the response was read at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110) at 
kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:215) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:313) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:139) at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:138) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:121) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[jira] [Created] (KAFKA-12345) KIP-500: AlterIsrManager crashes on broker idle-state
Alok Nikhil created KAFKA-12345:
---
Summary: KIP-500: AlterIsrManager crashes on broker idle-state
Key: KAFKA-12345
URL: https://issues.apache.org/jira/browse/KAFKA-12345
Project: Kafka
Issue Type: Task
Components: core
Reporter: Alok Nikhil

Occasionally, a scheduler thread on a broker crashes with this stack:

[2021-02-19 01:04:24,683] ERROR Uncaught exception in scheduled task 'send-alter-isr' (kafka.utils.KafkaScheduler)
java.lang.NullPointerException
    at kafka.server.AlterIsrManagerImpl.sendRequest(AlterIsrManager.scala:117)
    at kafka.server.AlterIsrManagerImpl.propagateIsrChanges(AlterIsrManager.scala:85)
    at kafka.server.AlterIsrManagerImpl.$anonfun$start$1(AlterIsrManager.scala:66)
    at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

After that the broker is unable to fetch any records from any other broker (and vice versa):

[2021-02-19 01:05:07,000] INFO [ReplicaFetcher replicaId=0, leaderId=4, fetcherId=0] Error sending fetch request (sessionId=164432409 2, epoch=957) to node 4: (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Connection to 4 was disconnected before the response was read
    at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110)
    at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:215)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:313)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:139)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:138)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:121)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
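The first trace is logged by `KafkaScheduler` as an "Uncaught exception in scheduled task", which suggests the scheduler catches and logs the task's exception. As background: with a plain JDK `ScheduledExecutorService`, the `scheduleAtFixedRate` Javadoc states that if any execution of a periodic task throws, subsequent executions are suppressed. The sketch below contrasts the two behaviours; `GuardedTasks`, `guarded`, and `runsObserved` are hypothetical names, this is not `KafkaScheduler`'s code, and it says nothing about the root cause of the NullPointerException or of the fetch failures.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class GuardedTasks {
    private GuardedTasks() {}

    /** Wraps a periodic task so an exception from one run is logged instead of cancelling later runs. */
    static Runnable guarded(String name, Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                // Swallowing here keeps the executor rescheduling the task.
                System.err.println("Uncaught exception in scheduled task '" + name + "': " + t);
            }
        };
    }

    /**
     * Schedules a task every 10 ms that throws a NullPointerException on its first run only,
     * waits until {@code wanted} runs have started or {@code timeoutMs} elapses,
     * and returns how many runs started.
     */
    static int runsObserved(boolean wrap, int wanted, long timeoutMs) {
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch started = new CountDownLatch(wanted);
        Runnable task = () -> {
            int n = runs.incrementAndGet();
            started.countDown();
            if (n == 1) {
                throw new NullPointerException("simulated missing state on first run");
            }
        };
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.scheduleAtFixedRate(wrap ? guarded("demo", task) : task, 0, 10, TimeUnit.MILLISECONDS);
            started.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }
}
```

Without the wrapper the task runs exactly once; with it, the failing first run is logged and later runs proceed. Whether continuing after such a failure is desirable depends on whether the failed run left shared state consistent.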
[GitHub] [kafka] cmccabe commented on pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#issuecomment-781745956

> I think we need to handle preferred leader election in a special way. For example, if the assigned replicas are 1,2,3, isr is 2,3 and the current leader is 3, when doing preferred leader election, we want to keep the leader as 3 instead of changing it to 2.

Hmm, wouldn't we want to switch the leader to 2 in that case, since 2 is more preferred?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
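To make the two positions in this exchange concrete, here is a sketch of two hypothetical election policies (neither is claimed to be the quorum controller's code). `strict` only moves leadership to the first assigned replica, and only if it is in the ISR, otherwise it keeps the current leader; this gives the outcome the quoted comment expects. `mostPreferredInSync` picks the earliest in-sync replica in assignment order, which is what the reply suggests.

```java
import java.util.List;
import java.util.Set;

final class PreferredLeaderPolicies {
    private PreferredLeaderPolicies() {}

    /** Moves leadership only to the first assigned replica, and only if it is in the ISR. */
    static int strict(List<Integer> replicas, Set<Integer> isr, int currentLeader) {
        int preferred = replicas.get(0); // assumes a non-empty assignment
        return isr.contains(preferred) ? preferred : currentLeader;
    }

    /** Picks the earliest replica in assignment order that is in the ISR. */
    static int mostPreferredInSync(List<Integer> replicas, Set<Integer> isr, int currentLeader) {
        for (int replica : replicas) {
            if (isr.contains(replica)) {
                return replica;
            }
        }
        return currentLeader; // no in-sync replica: leave leadership unchanged
    }
}
```

For replicas [1, 2, 3], ISR {2, 3}, and current leader 3, `strict` keeps 3 while `mostPreferredInSync` returns 2. The trade-off is an extra leadership change (and client metadata churn) that does not reach the truly preferred replica.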
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r578857828

## File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java ##
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.Listener;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metalog.LocalLogManagerTestEnv;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.controller.ConfigurationControlManagerTest.BROKER0;
+import static org.apache.kafka.controller.ConfigurationControlManagerTest.CONFIGS;
+import static org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 40)
+public class QuorumControllerTest {
+    private static final Logger log =
+        LoggerFactory.getLogger(QuorumControllerTest.class);
+
+    /**
+     * Test creating a new QuorumController and closing it.
+     */
+    @Test
+    public void testCreateAndClose() throws Throwable {
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
+            try (QuorumControllerTestEnv controlEnv =
+                new QuorumControllerTestEnv(logEnv, __ -> { })) {
+            }
+        }
+    }
+
+    /**
+     * Test setting some configuration values and reading them back.
+     */
+    @Test
+    public void testConfigurationOperations() throws Throwable {
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
+            try (QuorumControllerTestEnv controlEnv =
+                new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
+                testConfigurationOperations(controlEnv.activeController());
+            }
+        }
+    }
+
+    private void testConfigurationOperations(QuorumController controller) throws Throwable {
+        assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE),
+            controller.incrementalAlterConfigs(Collections.singletonMap(
+                BROKER0, Collections.singletonMap("baz", entry(SET, "123"))), true).get());
+        assertEquals(Collections.singletonMap(BROKER0,
+            new ResultOrError<>(Collections.emptyMap())),
+            controller.describeConfigs(Collections.singletonMap(
+                BROKER0, Collections.emptyList())).get());
+        assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE),
+            controller.incrementalAlterConfigs(Collections.singletonMap(
+                BROKER0, Collections.singletonMap("baz", entry(SET, "123"))), false).get());
+        assertEquals(Collections.singletonMap(BROKER0, new ResultOrError<>(Collections.
+            singletonMap("baz", "123"))),
+            controller.describeConfigs(Collections.singletonMap(
+                BROKER0, Collections.emptyList())).get
[jira] [Resolved] (KAFKA-12232) Distinguish API scope by broker/controller
[ https://issues.apache.org/jira/browse/KAFKA-12232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-12232.
Resolution: Duplicate

> Distinguish API scope by broker/controller
>
> Key: KAFKA-12232
> URL: https://issues.apache.org/jira/browse/KAFKA-12232
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jason Gustafson
> Priority: Major
>
> After KIP-500, not all APIs will be available on all listeners. Specifically,
> there are controller-only APIs which are only accessible on the controller
> listener (e.g. the Raft APIs). In general, we have three API scopes:
> client: must be exposed on client listener
> broker: must be exposed on inter-broker listener
> controller: must be exposed on controller listener
> These categories are not mutually exclusive. The `Fetch` API is required on
> all listeners as an example, so we need a way to represent the scope as a set
> in `ApiKeys`.
> We should also put some thought into how this scope is reflected through the
> ApiVersions API. I think it makes sense to only advertise APIs that can be
> handled. For example, if the controller does not have a handler for the
> `FindCoordinator` API, then it doesn't make sense to advertise it.
> Potentially we could be even more restrictive when it comes to the
> inter-broker APIs. For example, we might not need to advertise
> `WriteTxnMarkers` on client-only listeners since a client should never use
> this API. Alternatively, we can make it simple and just identify APIs by
> controller, broker, or both.
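The idea of representing an API's scope as a set, and advertising only the APIs in a listener's scope, can be sketched with an `EnumSet` per API. `ApiScopes`, `Scope`, `ScopedApi`, and the scope assigned to each API are hypothetical illustrations built from the examples in the description (Fetch on every listener, a Raft API such as Vote on the controller listener only, FindCoordinator for clients, WriteTxnMarkers inter-broker); this is not Kafka's `ApiKeys`.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class ApiScopes {
    private ApiScopes() {}

    /** Listener scopes named in the issue. */
    enum Scope { CLIENT, BROKER, CONTROLLER }

    /** Each API carries the set of scopes it must be exposed on (illustrative assignments). */
    enum ScopedApi {
        FETCH(EnumSet.allOf(Scope.class)),
        FIND_COORDINATOR(EnumSet.of(Scope.CLIENT)),
        WRITE_TXN_MARKERS(EnumSet.of(Scope.BROKER)),
        VOTE(EnumSet.of(Scope.CONTROLLER));

        final Set<Scope> scopes;

        ScopedApi(Set<Scope> scopes) {
            this.scopes = scopes;
        }
    }

    /** APIs an ApiVersions-style response would advertise on the given listener, in declaration order. */
    static List<ScopedApi> advertisedOn(Scope listener) {
        return Arrays.stream(ScopedApi.values())
            .filter(api -> api.scopes.contains(listener))
            .collect(Collectors.toList());
    }
}
```

Because scopes are sets, "not mutually exclusive" needs no special casing: `FETCH` simply lists all three.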
[jira] [Commented] (KAFKA-12278) Keep api versions consistent with api scope
[ https://issues.apache.org/jira/browse/KAFKA-12278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286803#comment-17286803 ]

Jason Gustafson commented on KAFKA-12278:

[~tombentley] Haha, I was about to write something about great minds thinking alike when I noticed that I filed the original. Since we merged this one, I'll close the other as a duplicate.

> Keep api versions consistent with api scope
>
> Key: KAFKA-12278
> URL: https://issues.apache.org/jira/browse/KAFKA-12278
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Priority: Major
>
> With KIP-500, some APIs are only accessible by the broker and some are only
> accessible by the controller. We need a better way to indicate the scope of
> the API so that we can keep it consistent with the `ApiVersions` API.
> Basically we have the following scopes:
> - zk broker (e.g. LeaderAndIsr)
> - kip-500 broker (e.g. DecommissionBroker)
> - kip-500 controller (e.g. Envelope)
> These categories are not mutually exclusive. For example, the `Fetch` API
> must be exposed in all scopes. We could go even further by distinguishing an
> inter-broker scope, but that is probably not needed for now.
[jira] [Resolved] (KAFKA-12278) Keep api versions consistent with api scope
[ https://issues.apache.org/jira/browse/KAFKA-12278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-12278.
Resolution: Fixed

> Keep api versions consistent with api scope
>
> Key: KAFKA-12278
> URL: https://issues.apache.org/jira/browse/KAFKA-12278
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Priority: Major
>
> With KIP-500, some APIs are only accessible by the broker and some are only
> accessible by the controller. We need a better way to indicate the scope of
> the API so that we can keep it consistent with the `ApiVersions` API.
> Basically we have the following scopes:
> - zk broker (e.g. LeaderAndIsr)
> - kip-500 broker (e.g. DecommissionBroker)
> - kip-500 controller (e.g. Envelope)
> These categories are not mutually exclusive. For example, the `Fetch` API
> must be exposed in all scopes. We could go even further by distinguishing an
> inter-broker scope, but that is probably not needed for now.
[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
junrao commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r578851481

## File path: metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java ##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.FeatureMap;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+
+
+public class FeatureControlManager {
+    /**
+     * The features supported by this controller's software.
+     */
+    private final Map supportedFeatures;
+
+    /**
+     * Maps feature names to finalized version ranges.
+     */
+    private final TimelineHashMap finalizedVersions;
+
+    /**
+     * The latest feature epoch.
+     */
+    private final TimelineHashSet epoch;
+
+    FeatureControlManager(Map supportedFeatures,
+                          SnapshotRegistry snapshotRegistry) {
+        this.supportedFeatures = supportedFeatures;
+        this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.epoch = new TimelineHashSet<>(snapshotRegistry, 0);
+    }
+
+    ControllerResult> updateFeatures(
+            Map updates, Set downgradeables,
+            Map> brokerFeatures) {
+        TreeMap results = new TreeMap<>();
+        List records = new ArrayList<>();
+        for (Entry entry : updates.entrySet()) {
+            results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(),
+                downgradeables.contains(entry.getKey()), brokerFeatures, records));
+        }
+        return new ControllerResult<>(records, results);
+    }
+
+    private ApiError updateFeature(String featureName,
+                                   VersionRange newRange,
+                                   boolean downgradeable,
+                                   Map> brokerFeatures,
+                                   List records) {
+        if (newRange.min() <= 0) {
+            return new ApiError(Errors.INVALID_UPDATE_VERSION,
+                "The lower value for the new range cannot be less than 1.");
+        }
+        if (newRange.max() <= 0) {
+            return new ApiError(Errors.INVALID_UPDATE_VERSION,
+                "The upper value for the new range cannot be less than 1.");
+        }
+        VersionRange localRange = supportedFeatures.get(featureName);

Review comment:
Yes, that's the problem. From a consistency perspective, it seems that we should use the supported features from either all controller nodes or none.
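The review point is that checking only the active controller's `supportedFeatures` can accept an update that another controller could not honour after a failover. A sketch of the "all controller nodes" option, assuming every controller's supported range is known (which, per the reply in this thread, is not currently available): intersect the ranges and require the requested range to fall inside the intersection. `FeatureRangeCheck` and its `Range` class are hypothetical and are not Kafka's `VersionRange`.

```java
import java.util.List;
import java.util.Optional;

final class FeatureRangeCheck {
    private FeatureRangeCheck() {}

    /** Hypothetical inclusive version range [min, max]. */
    static final class Range {
        final int min;
        final int max;

        Range(int min, int max) {
            this.min = min;
            this.max = max;
        }

        boolean contains(Range other) {
            return min <= other.min && other.max <= max;
        }
    }

    /** Intersection of every controller's supported range, or empty if there are none or they do not overlap. */
    static Optional<Range> commonSupport(List<Range> perController) {
        int lo = Integer.MIN_VALUE;
        int hi = Integer.MAX_VALUE;
        for (Range r : perController) {
            lo = Math.max(lo, r.min);
            hi = Math.min(hi, r.max);
        }
        return (perController.isEmpty() || lo > hi) ? Optional.empty() : Optional.of(new Range(lo, hi));
    }

    /** Returns null if the requested range is acceptable, otherwise a reason. */
    static String validate(Range requested, List<Range> perController) {
        if (requested.min <= 0) {
            return "The lower value for the new range cannot be less than 1.";
        }
        if (requested.max <= 0) {
            return "The upper value for the new range cannot be less than 1.";
        }
        if (requested.min > requested.max) {
            // Extra check added in this sketch; it does not appear in the diff.
            return "The lower value cannot exceed the upper value.";
        }
        Optional<Range> common = commonSupport(perController);
        if (!common.isPresent() || !common.get().contains(requested)) {
            return "The range is not supported by every controller.";
        }
        return null;
    }
}
```

With controllers supporting [1, 3] and [1, 2], a request for [1, 2] passes and [1, 3] is rejected, regardless of which controller happens to be active.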
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r578850804

## File path: metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java ##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+
+public final class MockControllerMetrics implements ControllerMetrics {

Review comment:
It's used in unit tests.
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r578850598

## File path: metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java ##
@@ -0,0 +1,136 @@
+        VersionRange localRange = supportedFeatures.get(featureName);

Review comment:
Hmm... right now, we don't have a good way of finding out what features the other controllers support. Maybe we will have to think more about this when we support rolling upgrade in kip-500.
[GitHub] [kafka] jsancio commented on pull request #10138: KAFKA-12331: Use LEO for the base offset of LeaderChangeMessage batch
jsancio commented on pull request #10138:
URL: https://github.com/apache/kafka/pull/10138#issuecomment-781734293

We got the following failures:
```
core:
ConfigCommandTest.shouldFailIfUnresolvableHost()
ConfigCommandTest.shouldFailIfUnresolvableHostUsingZookeeper()
DelegationTokenCommandTest.testDelegationTokenRequests()
MetricsTest.testMetrics()
DynamicConfigChangeTest.testIpHandlerUnresolvableAddress()
DynamicConfigTest.shouldFailIpConfigsWithBadHost()
```
[jira] [Resolved] (KAFKA-12331) KafkaRaftClient should use the LEO when appending LeaderChangeMessage
[ https://issues.apache.org/jira/browse/KAFKA-12331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-12331.
Resolution: Fixed

> KafkaRaftClient should use the LEO when appending LeaderChangeMessage
>
> Key: KAFKA-12331
> URL: https://issues.apache.org/jira/browse/KAFKA-12331
> Project: Kafka
> Issue Type: Sub-task
> Components: replication
> Reporter: Jose Armando Garcia Sancio
> Assignee: Jose Armando Garcia Sancio
> Priority: Major
>
> KafkaMetadataLog's appendAsLeader expects the base offset to match the LEO.
> This is enforced when KafkaRaftClient uses the BatchAccumulator to create
> batches. When creating the control batch for the LeaderChangeMessage the
> KafkaRaftClient doesn't use the BatchAccumulator and instead creates the
> batch with the default base offset of 0.
> This causes the validation in KafkaMetadataLog to fail with the following
> exception:
> {code:java}
> kafka.common.UnexpectedAppendOffsetException: Unexpected offset in append to
> @metadata-0. First offset 0 is less than the next offset 5. First 10 offsets
> in append: ArrayBuffer(0), last offset in append: 0. Log start offset = 0
> at kafka.log.Log.append(Log.scala:1217)
> at kafka.log.Log.appendAsLeader(Log.scala:1092)
> at kafka.raft.KafkaMetadataLog.appendAsLeader(KafkaMetadataLog.scala:92)
> at org.apache.kafka.raft.KafkaRaftClient.appendAsLeader(KafkaRaftClient.java:1158)
> at org.apache.kafka.raft.KafkaRaftClient.appendLeaderChangeMessage(KafkaRaftClient.java:449)
> at org.apache.kafka.raft.KafkaRaftClient.onBecomeLeader(KafkaRaftClient.java:409)
> at org.apache.kafka.raft.KafkaRaftClient.maybeTransitionToLeader(KafkaRaftClient.java:463)
> at org.apache.kafka.raft.KafkaRaftClient.handleVoteResponse(KafkaRaftClient.java:663)
> at org.apache.kafka.raft.KafkaRaftClient.handleResponse(KafkaRaftClient.java:1530)
> at org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:1652)
> at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2183)
> at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:52)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
> {code}
> We should make the following changes:
> # Fix MockLog to perform similar validation as KafkaMetadataLog::appendAsLeader
> # Use the LEO when creating the control batch for the LeaderChangeMessage
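The first proposed change is for the test log to enforce the same invariant as `KafkaMetadataLog::appendAsLeader`: a leader append must start exactly at the log end offset. A minimal sketch of that check, using a hypothetical `OffsetCheckingLog` that stores one string per offset; it is not Kafka's `MockLog`, and it throws `IllegalStateException` rather than `UnexpectedAppendOffsetException`.

```java
import java.util.ArrayList;
import java.util.List;

final class OffsetCheckingLog {
    private final List<String> records = new ArrayList<>();

    /** Log end offset: the offset the next appended record will receive. */
    long endOffset() {
        return records.size();
    }

    /** Appends a batch as leader; baseOffset must equal the current LEO. Returns the batch's last offset. */
    long appendAsLeader(long baseOffset, List<String> batch) {
        if (batch.isEmpty()) {
            throw new IllegalArgumentException("Batch must contain at least one record");
        }
        if (baseOffset != endOffset()) {
            throw new IllegalStateException("Unexpected offset in append: first offset " + baseOffset
                + " does not match the next offset " + endOffset());
        }
        records.addAll(batch);
        return endOffset() - 1;
    }
}
```

With five records already in the log, a control batch built with the default base offset 0 is rejected (matching the failure in the trace), while one built with `endOffset()` as its base offset is appended at offset 5. Having the test double enforce this is what lets unit tests catch the bug instead of only integration runs against the real log.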
[GitHub] [kafka] hachikuji merged pull request #10138: KAFKA-12331: Use LEO for the base offset of LeaderChangeMessage batch
hachikuji merged pull request #10138: URL: https://github.com/apache/kafka/pull/10138
[GitHub] [kafka] jolshan edited a comment on pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests
jolshan edited a comment on pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#issuecomment-781732199

Ran tests locally with newest code from trunk. Here are the tests that failed:
```
ConsumerBounceTest.testClose()
ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
```
These are tests I've found to be flaky running locally in the past.
[GitHub] [kafka] jolshan edited a comment on pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests
jolshan edited a comment on pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#issuecomment-781732199

Ran tests locally with newest code from trunk. Here are the tests that failed:
```
ConsumerBounceTest.testClose()
ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
```
These are tests I've found to be flaky running locally.
[GitHub] [kafka] jolshan commented on pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests
jolshan commented on pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#issuecomment-781732199

Ran tests locally with newest code from trunk. Here are the tests that failed:
ConsumerBounceTest.testClose()
ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
These are tests I've found to be flaky running locally.
[GitHub] [kafka] jsancio commented on pull request #10138: KAFKA-12331: Use LEO for the base offset of LeaderChangeMessage batch
jsancio commented on pull request #10138:
URL: https://github.com/apache/kafka/pull/10138#issuecomment-781730638

Ran the following commands locally:
```
$ ./gradlew -version
Gradle 6.8.1
Build time: 2021-01-22 13:20:08 UTC
Revision: 31f14a87d93945024ab7a78de84102a3400fa5b2
Kotlin: 1.4.20
Groovy: 2.5.12
Ant: Apache Ant(TM) version 1.10.9 compiled on September 27 2020
JVM: 1.8.0_282 (Private Build 25.282-b08)
OS: Linux 5.8.0-7642-generic amd64

./gradlew -PscalaVersion=2.12 clean compileJava compileScala compileTestJava compileTestScala spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat --profile --no-daemon --continue -PxmlSpotBugsReport=true
./gradlew -PscalaVersion=2.13 clean compileJava compileScala compileTestJava compileTestScala spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat --profile --no-daemon --continue -PxmlSpotBugsReport=true
./gradlew -PscalaVersion=2.12 unitTest integrationTest --profile --no-daemon --continue -PtestLoggingEvents=started,passed,skipped,failed -PignoreFailures=true -PmaxParallelForks=2 -PmaxTestRetries=1 -PmaxTestRetryFailures=5
```
[GitHub] [kafka] cmccabe closed pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes
cmccabe closed pull request #10066: URL: https://github.com/apache/kafka/pull/10066
[GitHub] [kafka] cmccabe commented on pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes
cmccabe commented on pull request #10066: URL: https://github.com/apache/kafka/pull/10066#issuecomment-781727318 pushed
[GitHub] [kafka] mjsax commented on pull request #10134: TRIVIAL: fix JavaDocs formatting
mjsax commented on pull request #10134: URL: https://github.com/apache/kafka/pull/10134#issuecomment-781715946 Merged to `trunk` and cherry-picked to `2.8` branch.
[GitHub] [kafka] mjsax merged pull request #10134: TRIVIAL: fix JavaDocs formatting
mjsax merged pull request #10134: URL: https://github.com/apache/kafka/pull/10134
[jira] [Commented] (KAFKA-10192) Flaky test BlockingConnectorTest#testBlockInConnectorStop
[ https://issues.apache.org/jira/browse/KAFKA-10192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286776#comment-17286776 ]

Matthias J. Sax commented on KAFKA-10192:

Failed in setup: [https://github.com/apache/kafka/pull/10134/checks?check_run_id=1915517987]
{quote}
{{org.opentest4j.AssertionFailedError: Condition not met within timeout 3. Worker did not complete startup in time ==> expected: but was:
    at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
    at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
    at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193)
    at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
    at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
    at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
    at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
    at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
    at org.apache.kafka.connect.integration.BlockingConnectorTest.setup(BlockingConnectorTest.java:133)}}
{quote}

> Flaky test BlockingConnectorTest#testBlockInConnectorStop
>
> Key: KAFKA-10192
> URL: https://issues.apache.org/jira/browse/KAFKA-10192
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Boyang Chen
> Assignee: Luke Chen
> Priority: Major
> Fix For: 2.6.0, 2.7.0
>
> h3. [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1218/]
> h3. Error Message
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not
> execute PUT request. Error response: \{"error_code":500,"message":"Request
> timed out"}
> h3. Stacktrace
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not
> execute PUT request. Error response: \{"error_code":500,"message":"Request
> timed out"}
> at org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.putConnectorConfig(EmbeddedConnectCluster.java:346)
> at org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.configureConnector(EmbeddedConnectCluster.java:300)
> at org.apache.kafka.connect.integration.BlockingConnectorTest.createConnectorWithBlock(BlockingConnectorTest.java:185)
> at org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStop(BlockingConnectorTest.java:140)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:564)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
> at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
> at
[jira] [Commented] (KAFKA-10579) Flaky test connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy
[ https://issues.apache.org/jira/browse/KAFKA-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286774#comment-17286774 ] Matthias J. Sax commented on KAFKA-10579: - Different test method for same test: [https://github.com/apache/kafka/pull/10134/checks?check_run_id=1915520915] {{org.opentest4j.AssertionFailedError: Condition not met within timeout 3. Didn't find the topics [connect-storage-topic-connect-cluster-1, connect-config-topic-connect-cluster-1, connect-offset-topic-connect-cluster-1] ==> expected: but was: at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertTopicsExist(EmbeddedConnectClusterAssertions.java:163) at org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithDefaultSettings(InternalTopicsIntegrationTest.java:81)}} > Flaky test > connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy > > > Key: KAFKA-10579 > URL: https://issues.apache.org/jira/browse/KAFKA-10579 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: A. 
Sophie Blee-Goldman >Priority: Major > Labels: flaky-test > > > {{java.lang.NullPointerException > at > java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936) > at org.reflections.Store.getAllIncluding(Store.java:82) > at org.reflections.Store.getAll(Store.java:93) > at org.reflections.Reflections.getSubTypesOf(Reflections.java:404) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:340) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209) > at > org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61) > at > org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91) > at > org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:167) > at > org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy(InternalTopicsIntegrationTest.java:260)}} > {{}} > https://github.com/apache/kafka/pull/9280/checks?check_run_id=1214776222{{}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
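Both setup failures above are timeouts from `TestUtils.waitForCondition`, which, according to the stack traces, delegates to `retryOnExceptionWithTimeout`. It keeps polling a condition until the condition holds or a deadline passes, and then fails with the supplied message. The following is a minimal sketch of that polling pattern. `PollingWait` and its parameters are hypothetical and are not Kafka's `TestUtils` signature. The actual timeout values are truncated in the logs above.

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper illustrating the waitForCondition pattern; not Kafka's TestUtils.
final class PollingWait {
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, long pollMs, String message) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        RuntimeException lastError = null;
        while (true) {
            try {
                // Check first, so a condition that already holds returns immediately.
                if (condition.getAsBoolean()) {
                    return;
                }
            } catch (RuntimeException e) {
                // Treat an exception as "not yet" and keep retrying until the deadline.
                lastError = e;
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError("Condition not met within timeout " + timeoutMs + ". " + message, lastError);
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting: " + message, e);
            }
        }
    }
}
```

With a setup like this, a slow worker startup on a loaded CI machine shows up as the same "Condition not met within timeout" failure, even when nothing is functionally wrong.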
[jira] [Updated] (KAFKA-12336) custom stream naming does not work while calling stream[K, V](topicPattern: Pattern) API with named Consumed parameter
[ https://issues.apache.org/jira/browse/KAFKA-12336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-12336: Labels: easy-fix newbie (was: ) > custom stream naming does not work while calling stream[K, V](topicPattern: > Pattern) API with named Consumed parameter > --- > > Key: KAFKA-12336 > URL: https://issues.apache.org/jira/browse/KAFKA-12336 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0 >Reporter: Ramil Israfilov >Priority: Major > Labels: easy-fix, newbie > > In our Scala application I am trying to implement custom naming for Kafka > Streams application nodes. > We are using topicPattern for our stream source. > Here is the API I am calling: > > {code:java} > val topicsPattern="t-[A-Za-z0-9-].suffix" > val operations: KStream[MyKey, MyValue] = > builder.stream[MyKey, MyValue](Pattern.compile(topicsPattern))( > Consumed.`with`[MyKey, MyValue].withName("my-fancy-name") > ) > {code} > Even though I am providing Consumed with a custom name, the topology > description still shows "KSTREAM-SOURCE-00" as the name of our stream source. > This is not a problem if I just use a topic name. But our application needs > to get messages from a set of topics based on topic-name pattern matching. > After checking the Kafka code, I see that > org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder (on line > 103) has a bug: > {code:java} > public <K, V> KStream<K, V> stream(final Pattern topicPattern, > final ConsumedInternal<K, V> consumed) { > final String name = newProcessorName(KStreamImpl.SOURCE_NAME); > final StreamSourceNode<K, V> streamPatternSourceNode = new > StreamSourceNode<>(name, topicPattern, consumed); > {code} > The node name construction does not take into account the name of the Consumed > parameter. 
> For example, the code for the other stream API call, the one that takes a topic name, does it > correctly: > {code:java} > final String name = new > NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, > KStreamImpl.SOURCE_NAME); > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
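The difference between the two quoted code paths can be shown in isolation. The pattern-based overload always generates a name. The topic-based overload prefers the name from `Consumed` and only falls back to a generated one. This sketch is hypothetical: `SourceNaming` only mimics `newProcessorName` and `NamedInternal.orElseGenerateWithPrefix`, and the ten-digit zero-padded suffix is an assumption about the format of generated names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the source-naming logic; not Kafka's internal API.
final class SourceNaming {
    static final String SOURCE_NAME = "KSTREAM-SOURCE-";
    private final AtomicInteger index = new AtomicInteger(0);

    // Mimics newProcessorName(prefix): always generates a name (assumed 10-digit padding).
    String newProcessorName(String prefix) {
        return prefix + String.format("%010d", index.getAndIncrement());
    }

    // Behavior reported for the pattern-based stream(): the user-supplied name is ignored.
    String patternSourceNameAsReported(String userName) {
        return newProcessorName(SOURCE_NAME);
    }

    // Suggested behavior, analogous to NamedInternal(consumed.name()).orElseGenerateWithPrefix(...):
    // use the user-supplied name if present, otherwise generate one.
    String patternSourceNameFixed(String userName) {
        return userName != null ? userName : newProcessorName(SOURCE_NAME);
    }
}
```

With this logic, `patternSourceNameFixed("my-fancy-name")` returns `my-fancy-name`, and a `null` name still yields a generated `KSTREAM-SOURCE-...` name.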
[jira] [Updated] (KAFKA-12336) custom stream naming does not work while calling stream[K, V](topicPattern: Pattern) API with named Consumed parameter
[ https://issues.apache.org/jira/browse/KAFKA-12336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-12336: Priority: Minor (was: Major)
[jira] [Updated] (KAFKA-12328) Expose TaskId partition number
[ https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-12328: Summary: Expose TaskId partition number (was: Find out partition of a store iterator) > Expose TaskId partition number > -- > > Key: KAFKA-12328 > URL: https://issues.apache.org/jira/browse/KAFKA-12328 > Project: Kafka > Issue Type: Wish > Components: streams >Reporter: fml2 >Priority: Major > Labels: needs-kip > > This question was posted [on > Stack Overflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over] > and got an answer, but the solution is quite complicated, hence this ticket. > > In my Kafka Streams application, I have a task that sets up a scheduled (by > wall-clock time) punctuator. The punctuator iterates over the entries of a > store and does something with them, like this: > {code:java} > var store = context().getStateStore("MyStore"); > var iter = store.all(); > while (iter.hasNext()) { >var entry = iter.next(); >// ... do something with the entry > } > // Print a summary (now): N entries processed > // Print a summary (wish): N entries processed in partition P > {code} > Is it possible to find out which partition the punctuator operates on? The > Javadoc for {{ProcessorContext.partition()}} states that this method > returns {{-1}} within punctuators. > I've read [Kafka Streams: Punctuate vs > Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process] > and the answers there. I understand that a task is, in general, not tied > to a particular partition. But an iterator should be tied to one, IMO. > How can I find out the partition? > Or is my assumption that a particular instance of a store iterator is tied to > a partition wrong? > What I need it for: I'd like to include the partition number in some log > messages. 
For now, I have several nearly identical log messages stating that > the punctuator does this and that. To make those messages "unique", > I'd like to include the partition number in them. > Since I'm working with a single store here (which might be partitioned), I > assume that every single execution of the punctuator is bound to a single > partition of that store. > > It would be cool if there were a method {{iterator.partition}} (or similar) > to get this information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12328) Find out partition of a store iterator
[ https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-12328: Labels: needs-kip (was: )
[jira] [Commented] (KAFKA-12328) Find out partition of a store iterator
[ https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286771#comment-17286771 ] Matthias J. Sax commented on KAFKA-12328: - Personally, I am open to exposing the partition number from the task ID. It would require a KIP, though. I guess we can leave this ticket open and see if there is more interest.
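Until a dedicated API exists, one workaround in the spirit of the comment above is to derive the partition from the task ID, which a punctuator can get from `ProcessorContext.taskId()`. This sketch assumes that the string form of the task ID is `<subtopologyId>_<partition>` (for example `0_3`). `TaskIdPartition` is a hypothetical helper. Depending on the Kafka Streams version, the partition may also be readable directly from the `TaskId` object.

```java
// Hypothetical helper: extracts the partition number from a task ID string.
// Assumes the "<subtopologyId>_<partition>" format, e.g. "0_3".
final class TaskIdPartition {
    static int partitionOf(String taskId) {
        int sep = taskId.lastIndexOf('_');
        if (sep < 0) {
            throw new IllegalArgumentException("Unexpected task id format: " + taskId);
        }
        return Integer.parseInt(taskId.substring(sep + 1));
    }

    // Builds the kind of summary line the reporter wants to log from a punctuator.
    static String summary(String taskId, long processed) {
        return processed + " entries processed in partition " + partitionOf(taskId);
    }
}
```

In a punctuator, the argument would be `context.taskId().toString()`. Because each task owns its own instance of the store, this identifies the store partition being iterated.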
[GitHub] [kafka] cmccabe commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes
cmccabe commented on a change in pull request #10066: URL: https://github.com/apache/kafka/pull/10066#discussion_r578826004 ## File path: checkstyle/import-control.xml ## @@ -99,6 +100,7 @@ + Review comment: Hmm. Let's revisit this after 2.8. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes
cmccabe commented on a change in pull request #10066: URL: https://github.com/apache/kafka/pull/10066#discussion_r578825895 ## File path: core/src/main/scala/kafka/server/ApiVersionManager.scala ## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.api.ApiVersion +import kafka.network +import kafka.network.RequestChannel +import org.apache.kafka.common.message.ApiMessageType.ListenerType +import org.apache.kafka.common.message.ApiVersionsResponseData +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.requests.ApiVersionsResponse + +import scala.jdk.CollectionConverters._ + +trait ApiVersionManager { + def listenerType: ListenerType + def enabledApis: collection.Set[ApiKeys] + def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse + def isApiEnabled(apiKey: ApiKeys): Boolean = enabledApis.contains(apiKey) + def newRequestMetrics: RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis) +} + +object ApiVersionManager { + def apply( +listenerType: ListenerType, +config: KafkaConfig, +forwardingManager: Option[ForwardingManager], Review comment: ok This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
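The `ApiVersionManager` trait in the diff derives `isApiEnabled` from `enabledApis`, and the PR title asks that the exposed API versions stay consistent within a listener scope. In other words, a listener should advertise exactly the APIs it accepts. Here is a sketch of that idea. The `Listener` and `Api` enums and their mapping are hypothetical placeholders for `ListenerType` and `ApiKeys`, not Kafka's real API sets.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins for ListenerType and ApiKeys; the real enums are much larger.
enum Listener { BROKER, CONTROLLER }
enum Api { PRODUCE, FETCH, API_VERSIONS, BROKER_HEARTBEAT }

final class ListenerScopedApis {
    private final Map<Listener, Set<Api>> enabled = new EnumMap<>(Listener.class);

    ListenerScopedApis() {
        // Placeholder mapping for illustration only.
        enabled.put(Listener.BROKER, EnumSet.of(Api.PRODUCE, Api.FETCH, Api.API_VERSIONS));
        enabled.put(Listener.CONTROLLER, EnumSet.of(Api.API_VERSIONS, Api.BROKER_HEARTBEAT));
    }

    // Same check as the trait's default isApiEnabled: membership in the listener's enabled set.
    boolean isApiEnabled(Listener listener, Api api) {
        return enabled.getOrDefault(listener, EnumSet.noneOf(Api.class)).contains(api);
    }

    // The ApiVersions response for a listener is built from the same set,
    // so what is advertised and what is accepted cannot drift apart.
    Set<Api> advertised(Listener listener) {
        return EnumSet.copyOf(enabled.get(listener));
    }
}
```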
[GitHub] [kafka] hachikuji commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell
hachikuji commented on a change in pull request #10094: URL: https://github.com/apache/kafka/pull/10094#discussion_r578823423 ## File path: bin/kafka-metadata-shell.sh ## @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.shell.MetadataShell "$@" Review comment: Actually, it looks like this is documented in the KIP. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell
hachikuji commented on a change in pull request #10094: URL: https://github.com/apache/kafka/pull/10094#discussion_r578818603 ## File path: bin/kafka-metadata-shell.sh ## @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.shell.MetadataShell "$@" Review comment: Or maybe an alternative is to put this under bin/metadata This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12344) Support SlidingWindows in the Scala API
[ https://issues.apache.org/jira/browse/KAFKA-12344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-12344: Affects Version/s: 2.7.0 > Support SlidingWindows in the Scala API > --- > > Key: KAFKA-12344 > URL: https://issues.apache.org/jira/browse/KAFKA-12344 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.7.0 >Reporter: Leah Thomas >Assignee: Leah Thomas >Priority: Major > > In KIP-450, we implemented sliding windows for the Java API but left out a few > crucial methods that sliding windows need in order to work through the Scala API. We > need to add those methods so that the Scala API can fully leverage sliding windows. -- This message was sent by Atlassian Jira (v8.3.4#803005)
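Some background on KIP-450: a sliding window is anchored on record timestamps rather than on a fixed grid. Two records fall in the same window when their timestamps differ by at most the configured time difference, inclusive. The sketch below counts, for each record at time t, the records in [t - timeDifference, t]. It is a hypothetical in-memory illustration that ignores grace periods, the additional windows KIP-450 creates just after each record, and the Scala wrapper that this ticket is about.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of sliding-window membership; not the Kafka Streams implementation.
final class SlidingWindowSketch {
    // For each record timestamp t, counts records whose timestamps fall in [t - timeDifferenceMs, t],
    // with both bounds inclusive.
    static List<Long> windowEndingAtRecordCounts(long[] timestampsMs, long timeDifferenceMs) {
        List<Long> counts = new ArrayList<>();
        for (long t : timestampsMs) {
            long count = 0;
            for (long other : timestampsMs) {
                if (other >= t - timeDifferenceMs && other <= t) {
                    count++;
                }
            }
            counts.add(count);
        }
        return counts;
    }
}
```

For example, timestamps 0, 5, 10 and 16 with a 10 ms time difference give counts 1, 2, 3 and 2. The last window, [6, 16], contains only 10 and 16.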
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r578817527 ## File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java ## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.config.ConfigDef.ConfigKey; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigResource.Type; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.metadata.ConfigRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; + +import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND; + +public class ConfigurationControlManager { +private final Logger log; +private final SnapshotRegistry snapshotRegistry; +private final Map configDefs; +private final TimelineHashMap> configData; + +ConfigurationControlManager(LogContext logContext, +SnapshotRegistry snapshotRegistry, +Map configDefs) { +this.log = logContext.logger(ConfigurationControlManager.class); +this.snapshotRegistry = snapshotRegistry; +this.configDefs = configDefs; +this.configData = new TimelineHashMap<>(snapshotRegistry, 0); +} + +/** + * Determine the result of applying a batch of incremental configuration changes. Note + * that this method does not change the contents of memory. It just generates a + * result, that you can replay later if you wish using replay(). + * + * Note that there can only be one result per ConfigResource. 
So if you try to modify + * several keys and one modification fails, the whole ConfigKey fails and nothing gets + * changed. + * + * @param configChanges Maps each resource to a map from config keys to + * operation data. + * @return The result. + */ +ControllerResult> incrementalAlterConfigs( +Map>> configChanges) { +List outputRecords = new ArrayList<>(); +Map outputResults = new HashMap<>(); +for (Entry>> resourceEntry : +configChanges.entrySet()) { +incrementalAlterConfigResource(resourceEntry.getKey(), +resourceEntry.getValue(), +outputRecords, +outputResults); +} +return new ControllerResult<>(outputRecords, outputResults); +} + +private void incrementalAlterConfigResource(ConfigResource configResource, +Map> keysToOps, +List outputRecords, +Map outputResults) { +ApiError error = checkConfigResource(configResource); +if (error.isFailure()) { +outputResults.put(configResource, error); +return; +} +List newRecords = new ArrayList<>(); +for (Entry> keysToOpsEntry : keysToOps.entrySet()) { +String key = keysToOpsEntry.getKey(); +String currentValue = null; +TimelineHashMap currentConfigs = configData.get(configResource); +if (currentConfigs != null) { +currentValue = currentConfigs.get(key); +} +String newValue = currentValue; +Entry opTypeAndNewValue = keysToOpsEntry.getValue(); +OpType
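The Javadoc in this hunk describes two properties. First, computing the result does not change in-memory state: it only produces records that can be replayed later. Second, there is one result per resource, so a single failing key fails the whole resource. Here is a simplified sketch of both properties. `IncrementalConfigSketch` is hypothetical. It models only "set" and "delete" (a null value), and it treats an unknown key as the failure case. The real code also handles operation types such as `APPEND` and uses Kafka's own record and error types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simplification; not Kafka's ConfigurationControlManager API.
final class IncrementalConfigSketch {
    // One pending config change; a null newValue means the key is deleted.
    static final class Change {
        final String resource;
        final String key;
        final String newValue;

        Change(String resource, String key, String newValue) {
            this.resource = resource;
            this.key = key;
            this.newValue = newValue;
        }
    }

    // Validates every requested change and returns the records to apply later.
    // No state is mutated here; per-resource outcomes are written into 'results'.
    static List<Change> incrementalAlter(Map<String, Map<String, String>> requested,
                                         Set<String> validKeys,
                                         Map<String, String> results) {
        List<Change> output = new ArrayList<>();
        for (Map.Entry<String, Map<String, String>> resource : requested.entrySet()) {
            List<Change> pending = new ArrayList<>();
            String error = null;
            for (Map.Entry<String, String> op : resource.getValue().entrySet()) {
                if (!validKeys.contains(op.getKey())) {
                    error = "Unknown config key: " + op.getKey();
                    break;
                }
                pending.add(new Change(resource.getKey(), op.getKey(), op.getValue()));
            }
            if (error != null) {
                // One bad key fails the whole resource: none of its changes are emitted.
                results.put(resource.getKey(), error);
            } else {
                output.addAll(pending);
                results.put(resource.getKey(), "OK");
            }
        }
        return output;
    }

    // Applies previously generated records, playing the role of replay() in the Javadoc.
    static void replay(Map<String, Map<String, String>> state, List<Change> changes) {
        for (Change c : changes) {
            Map<String, String> configs = state.computeIfAbsent(c.resource, r -> new HashMap<>());
            if (c.newValue == null) {
                configs.remove(c.key);
            } else {
                configs.put(c.key, c.newValue);
            }
        }
    }
}
```

Separating "compute records" from "replay records" keeps the active controller's state changes identical to what standbys see when they replay the metadata log.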
[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code
cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r578817611 ## File path: metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java ## @@ -0,0 +1,597 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.message.BrokerHeartbeatRequestData; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.metadata.UsableBroker; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.TreeSet; +import java.util.function.Function; +import java.util.function.Supplier; + +import static org.apache.kafka.controller.BrokerControlState.FENCED; +import static org.apache.kafka.controller.BrokerControlState.CONTROLLED_SHUTDOWN; +import static org.apache.kafka.controller.BrokerControlState.SHUTDOWN_NOW; +import static org.apache.kafka.controller.BrokerControlState.UNFENCED; + + +/** + * The BrokerHeartbeatManager manages all the soft state associated with broker heartbeats. + * Soft state is state which does not appear in the metadata log. This state includes + * things like the last time each broker sent us a heartbeat, and whether the broker is + * trying to perform a controlled shutdown. + * + * Only the active controller has a BrokerHeartbeatManager, since only the active + * controller handles broker heartbeats. Standby controllers will create a heartbeat + * manager as part of the process of activating. This design minimizes the size of the + * metadata partition by excluding heartbeats from it. However, it does mean that after + * a controller failover, we may take some extra time to fence brokers, since the new + * active controller does not know when the last heartbeats were received from each. + */ +public class BrokerHeartbeatManager { +static class BrokerHeartbeatState { +/** + * The broker ID. 
+ */ +private final int id; + +/** + * The last time we received a heartbeat from this broker, in monotonic nanoseconds. + * When this field is updated, we also may have to update the broker's position in + * the unfenced list. + */ +long lastContactNs; + +/** + * The last metadata offset which this broker reported. When this field is updated, + * we may also have to update the broker's position in the active set. + */ +long metadataOffset; + +/** + * The offset at which the broker should complete its controlled shutdown, or -1 + * if the broker is not performing a controlled shutdown. When this field is + * updated, we also have to update the broker's position in the shuttingDown set. + */ +private long controlledShutDownOffset; + +/** + * The previous entry in the unfenced list, or null if the broker is not in that list. + */ +private BrokerHeartbeatState prev; + +/** + * The next entry in the unfenced list, or null if the broker is not in that list. + */ +private BrokerHeartbeatState next; + +BrokerHeartbeatState(int id) { +this.id = id; +this.lastContactNs = 0; +this.prev = null; +this.next = null; +this.metadataOffset = -1; +this.controlledShutDownOffset = -1; +} + +/** + * Returns the broker ID. + */ +int id() { +return id; +} + +/** + * Returns true only if the broker is fenced. + */ +boolean fenced() { +return prev == null; +} + +/** + * Returns true only if the broker is in controlled shutdown state. + */ +boolean shuttingDown() { +return controlledShutDownOffset >= 0; +
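The class Javadoc describes heartbeat "soft state" that only the active controller keeps. Unfenced brokers are held in a list ordered by last contact time, so stale brokers can be found by looking at the oldest entries. Here is a minimal sketch of that ordering, assuming a single fixed session timeout and timestamps that never decrease. `HeartbeatTracker` is hypothetical, and it swaps the hand-written doubly linked list in the diff for an insertion-ordered `LinkedHashMap`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of heartbeat soft state; not Kafka's BrokerHeartbeatManager.
final class HeartbeatTracker {
    private final long sessionTimeoutNs;
    // Unfenced brokers, ordered from least to most recently heard from
    // (assuming nowNs values passed to touch() never decrease).
    private final LinkedHashMap<Integer, Long> lastContactNs = new LinkedHashMap<>();

    HeartbeatTracker(long sessionTimeoutNs) {
        this.sessionTimeoutNs = sessionTimeoutNs;
    }

    // Records a heartbeat; removing and re-inserting moves the broker to the tail.
    void touch(int brokerId, long nowNs) {
        lastContactNs.remove(brokerId);
        lastContactNs.put(brokerId, nowNs);
    }

    // A broker absent from the unfenced list is fenced, mirroring fenced() returning prev == null.
    boolean fenced(int brokerId) {
        return !lastContactNs.containsKey(brokerId);
    }

    // Fences every broker whose last heartbeat is at least one session timeout old.
    // Because entries are ordered by last contact, the scan stops at the first fresh one.
    List<Integer> fenceStaleBrokers(long nowNs) {
        List<Integer> fencedNow = new ArrayList<>();
        Iterator<Map.Entry<Integer, Long>> it = lastContactNs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Long> entry = it.next();
            if (nowNs - entry.getValue() < sessionTimeoutNs) {
                break;
            }
            fencedNow.add(entry.getKey());
            it.remove();
        }
        return fencedNow;
    }
}
```

Keeping the list ordered makes each expiration check proportional to the number of brokers actually being fenced, not to the size of the cluster.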
[jira] [Updated] (KAFKA-12344) Support SlidingWindows in the Scala API
[ https://issues.apache.org/jira/browse/KAFKA-12344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-12344: Labels: (was: streams) > Support SlidingWindows in the Scala API > --- > > Key: KAFKA-12344 > URL: https://issues.apache.org/jira/browse/KAFKA-12344 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Leah Thomas >Assignee: Leah Thomas >Priority: Major > > In KIP-450 we implemented sliding windows for the Java API but left out a few > crucial methods to allow sliding windows to work through the Scala API. We > need to add those methods to make the Scala API fully leverage sliding windows. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12344) Support SlidingWindows in the Scala API
[ https://issues.apache.org/jira/browse/KAFKA-12344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-12344: Component/s: streams > Support SlidingWindows in the Scala API > --- > > Key: KAFKA-12344 > URL: https://issues.apache.org/jira/browse/KAFKA-12344 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Leah Thomas >Assignee: Leah Thomas >Priority: Major > Labels: streams > > In KIP-450 we implemented sliding windows for the Java API but left out a few > crucial methods to allow sliding windows to work through the Scala API. We > need to add those methods to make the Scala API fully leverage sliding windows.
[GitHub] [kafka] hachikuji commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell
hachikuji commented on a change in pull request #10094: URL: https://github.com/apache/kafka/pull/10094#discussion_r578813351 ## File path: metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java ## @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metalog; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.queue.EventQueue; +import org.apache.kafka.queue.KafkaEventQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +/** + * The LocalLogManager is a test implementation that relies on the contents of memory. 
+ */ +public final class LocalLogManager implements MetaLogManager, AutoCloseable { Review comment: We have this class already checked in under `metadata/src/test/java`. If it needs to be here, can we just move it? ## File path: settings.gradle ## @@ -29,6 +29,7 @@ include 'clients', 'log4j-appender', 'metadata', 'raft', +'shell', Review comment: I liked @mumrah's suggestion to call this module `metashell`. ## File path: core/src/main/scala/kafka/server/Server.scala ## @@ -29,6 +29,7 @@ trait Server { } object Server { + val metadataTopicName = "@metadata" Review comment: Can we use `KafkaRaftServer.MetadataTopic` and remove this? ## File path: bin/kafka-metadata-shell.sh ## @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.shell.MetadataShell "$@" Review comment: The shell basically becomes a public api with this. I thought I recalled that we were going to do a separate KIP? An alternative would be to locate this under `shell/bin`. Or maybe we can print a message when the tool starts out which emphasizes that this is an experimental tool without any compatibility guarantees. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on pull request #10155: Fix Raft broker restart issue when offset partitions are deferred
rondagostino commented on pull request #10155: URL: https://github.com/apache/kafka/pull/10155#issuecomment-781694442 Successfully ran `./gradlew build` after commenting out these two flaky tests, **both of which passed locally when run individually afterwards**: `ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()` `ConsumerBounceTest.testClose()`
[jira] [Created] (KAFKA-12344) Support SlidingWindows in the Scala API
Leah Thomas created KAFKA-12344: --- Summary: Support SlidingWindows in the Scala API Key: KAFKA-12344 URL: https://issues.apache.org/jira/browse/KAFKA-12344 Project: Kafka Issue Type: Improvement Reporter: Leah Thomas Assignee: Leah Thomas In KIP-450 we implemented sliding windows for the Java API but left out a few crucial methods to allow sliding windows to work through the Scala API. We need to add those methods to make the Scala API fully leverage sliding windows.
[GitHub] [kafka] abbccdda opened a new pull request #10156: KAFKA-10345 (WIP): File watch store reloading
abbccdda opened a new pull request #10156: URL: https://github.com/apache/kafka/pull/10156 Add a file-based store reloading mechanism that supports both file-watch-triggered reloading and time-based reloading, running in a separate thread. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
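The PR description mentions combining file-watch triggering with time-based reloading on a separate thread. Below is a minimal sketch of that combination using the JDK's `java.nio.file.WatchService` and a `ScheduledExecutorService`. It assumes the "store" is a single file on disk, such as a keystore. `FileReloader`, `checkOnce`, and the reload callback are hypothetical names. This is not the implementation in #10156.

```java
import java.io.IOException;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.FileTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: reload a file when a WatchService event names it, and also
// when a periodic check sees a new modification time (a fallback for missed events).
class FileReloader implements AutoCloseable {
    private final Path file;
    private final Consumer<Path> onReload;
    private final WatchService watcher;
    private final ScheduledExecutorService scheduler;
    private FileTime lastModified;

    FileReloader(Path file, Consumer<Path> onReload) throws IOException {
        this.file = file.toAbsolutePath();
        this.onReload = onReload;
        this.watcher = FileSystems.getDefault().newWatchService();
        // WatchService watches directories, so register the file's parent directory.
        this.file.getParent().register(watcher,
                StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY);
        this.lastModified = Files.getLastModifiedTime(this.file);
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "file-reloader");
            t.setDaemon(true);
            return t;
        });
    }

    // Run checkOnce() on the background thread every periodMs milliseconds.
    void start(long periodMs) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                checkOnce();
            } catch (RuntimeException e) {
                // Swallow so the schedule survives; an escaping exception cancels future runs.
            }
        }, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    // Drains pending watch events, then reloads if an event named the file or the
    // modification time changed. Returns true if a reload happened.
    synchronized boolean checkOnce() {
        boolean eventForFile = false;
        try {
            WatchKey key;
            while ((key = watcher.poll()) != null) {
                for (WatchEvent<?> event : key.pollEvents()) {
                    // OVERFLOW events may have a null context; equals(null) is false.
                    if (file.getFileName().equals(event.context())) {
                        eventForFile = true;
                    }
                }
                key.reset();
            }
        } catch (ClosedWatchServiceException e) {
            return false;
        }
        FileTime current;
        try {
            current = Files.getLastModifiedTime(file);
        } catch (IOException e) {
            return false; // file may be briefly missing (e.g. mid-replace); retry next run
        }
        if (!eventForFile && current.equals(lastModified)) {
            return false;
        }
        lastModified = current;
        onReload.accept(file);
        return true;
    }

    @Override
    public void close() throws IOException {
        scheduler.shutdownNow();
        watcher.close();
    }
}
```

Calling `start(periodMs)` runs `checkOnce()` on a daemon thread. The modification-time comparison acts as the time-based fallback. `WatchService` can drop events (reported as `OVERFLOW`), and on some platforms it is implemented by polling and delivers events late. The periodic check still catches changes in both cases.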
[GitHub] [kafka] ijuma merged pull request #10146: HOTFIX: fix Scala 2.12 build error caused by ControllerApisTest.scala
ijuma merged pull request #10146: URL: https://github.com/apache/kafka/pull/10146
[GitHub] [kafka] rondagostino commented on pull request #10146: HOTFIX: fix Scala 2.12 build error caused by ControllerApisTest.scala
rondagostino commented on pull request #10146: URL: https://github.com/apache/kafka/pull/10146#issuecomment-781651604 Successfully confirmed this fix locally as per comment in #10155.
[GitHub] [kafka] rondagostino commented on pull request #10155: Fix Raft broker restart issue when offset partitions are deferred
rondagostino commented on pull request #10155: URL: https://github.com/apache/kafka/pull/10155#issuecomment-781650728 I successfully compiled locally. Compiling with `-PscalaVersion=2.12` failed locally but then succeeded after applying the one-liner fix from https://github.com/apache/kafka/pull/10146.
[GitHub] [kafka] ijuma commented on pull request #10113: MINOR: Add KIP-500 BrokerServer and ControllerServer
ijuma commented on pull request #10113: URL: https://github.com/apache/kafka/pull/10113#issuecomment-781640866 @cmccabe Looks like the commit message is not as helpful as the PR description. We typically copy the PR description to the commit message.
[GitHub] [kafka] ijuma commented on pull request #10127: MINOR: Remove unused LeaderAndIsrResponse.partitions() since it has been replaced with partitionErrors()
ijuma commented on pull request #10127: URL: https://github.com/apache/kafka/pull/10127#issuecomment-781639872 The implementation that was removed was more efficient than the new implementation. Is there a reason for that?