[GitHub] [kafka] showuon closed pull request #9791: KAFKA-10873: ignore warning messages if connector/task start failed
showuon closed pull request #9791: URL: https://github.com/apache/kafka/pull/9791 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #9791: KAFKA-10873: ignore warning messages if connector/task start failed
showuon commented on pull request #9791: URL: https://github.com/apache/kafka/pull/9791#issuecomment-769632410 @kkonstantine, thanks for the comments. Sounds good to me. Closing this PR. Thank you.
[jira] [Assigned] (KAFKA-12246) Remove redundant suppression in KafkaAdminClient
[ https://issues.apache.org/jira/browse/KAFKA-12246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-12246: -- Assignee: YI-CHEN WANG > Remove redundant suppression in KafkaAdminClient > > > Key: KAFKA-12246 > URL: https://issues.apache.org/jira/browse/KAFKA-12246 > Project: Kafka > Issue Type: Improvement >Reporter: YI-CHEN WANG >Assignee: YI-CHEN WANG >Priority: Minor > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on a change in pull request #10000: KAFKA-9274: handle TimeoutException on task reset
mjsax commented on a change in pull request #1: URL: https://github.com/apache/kafka/pull/1#discussion_r566621619 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -227,6 +230,27 @@ public void initializeIfNeeded() { } } +private void initOffsetsIfNeeded(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) { +final Map<TopicPartition, OffsetAndMetadata> committed = mainConsumer.committed(resetOffsetsForPartitions); +for (final Map.Entry<TopicPartition, OffsetAndMetadata> committedEntry : committed.entrySet()) { +final OffsetAndMetadata offsetAndMetadata = committedEntry.getValue(); +if (offsetAndMetadata != null) { +mainConsumer.seek(committedEntry.getKey(), offsetAndMetadata); +resetOffsetsForPartitions.remove(committedEntry.getKey()); +} +} + +if (!resetOffsetsForPartitions.isEmpty()) { Review comment: This `if` is not strictly required; however, it allows us to just pass `null` as `offsetResetter` in tests, so it might be worth keeping.
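The seek-or-reset logic in this hunk can be sketched without Kafka on the classpath. This is a minimal illustration under stated assumptions, not the `StreamTask` code: partitions are plain strings, committed offsets are a `Map<String, Long>` where a `null` value means "nothing committed", and the hypothetical `seeks` map stands in for `mainConsumer.seek(...)`. It also shows why the `if` guard lets a test pass a `null` resetter.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-in for the offset-initialization step; not Kafka code.
class OffsetInitSketch {
    // Partitions that still need a starting position.
    final Set<String> resetOffsetsForPartitions = new HashSet<>();
    // Positions we "seek" to; stands in for mainConsumer.seek(...).
    final Map<String, Long> seeks = new HashMap<>();

    void initOffsetsIfNeeded(Map<String, Long> committed, Consumer<Set<String>> offsetResetter) {
        for (Map.Entry<String, Long> entry : committed.entrySet()) {
            Long offset = entry.getValue();
            if (offset != null) {
                // A committed offset exists: resume from it and stop treating the partition as "to reset".
                seeks.put(entry.getKey(), offset);
                resetOffsetsForPartitions.remove(entry.getKey());
            }
        }
        // Only partitions without a committed offset fall back to the reset policy.
        // The guard also means a null resetter is never invoked when nothing is left.
        if (!resetOffsetsForPartitions.isEmpty()) {
            offsetResetter.accept(resetOffsetsForPartitions);
        }
    }
}
```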
[GitHub] [kafka] mjsax commented on a change in pull request #10000: KAFKA-9274: handle TimeoutException on task reset
mjsax commented on a change in pull request #1: URL: https://github.com/apache/kafka/pull/1#discussion_r566621365 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -227,6 +230,27 @@ public void initializeIfNeeded() { } } +private void initOffsetsIfNeeded(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) { Review comment: I was considering merging this method into `initMetadata()`, but that might convolute different code paths. We should execute this method only rarely anyway, so I don't think calling `mainConsumer.committed` twice in those rare cases is a concern. Let me know what you think.
[GitHub] [kafka] chia7712 commented on pull request #10000: KAFKA-9274: handle TimeoutException on task reset
chia7712 commented on pull request #1: URL: https://github.com/apache/kafka/pull/1#issuecomment-769626443 congratulations on PR 10,000 :)
[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order
tombentley commented on pull request #9441: URL: https://github.com/apache/kafka/pull/9441#issuecomment-769621007 Thanks, and no worries about the wait.
[GitHub] [kafka] guozhangwang commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order
guozhangwang commented on pull request #9441: URL: https://github.com/apache/kafka/pull/9441#issuecomment-769618984 Yes! I will review it again. Thanks for hanging in there, and my apologies... Reviewing has always been a bit overwhelming for me :)
[GitHub] [kafka] guozhangwang commented on pull request #9268: KAFKA-10442; Add transaction admin APIs for KIP-664
guozhangwang commented on pull request #9268: URL: https://github.com/apache/kafka/pull/9268#issuecomment-769618616 Sorry man...
[jira] [Commented] (KAFKA-10793) Race condition in FindCoordinatorFuture permanently severs connection to group coordinator
[ https://issues.apache.org/jira/browse/KAFKA-10793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17274187#comment-17274187 ] Guozhang Wang commented on KAFKA-10793: --- cc [~hachikuji] [~ijuma] Hopefully we nailed it this time! :) > Race condition in FindCoordinatorFuture permanently severs connection to > group coordinator > -- > > Key: KAFKA-10793 > URL: https://issues.apache.org/jira/browse/KAFKA-10793 > Project: Kafka > Issue Type: Bug > Components: consumer, streams >Affects Versions: 2.5.0 >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Critical > Fix For: 2.8.0, 2.7.1 > > > Pretty much as soon as we started actively monitoring the > _last-rebalance-seconds-ago_ metric in our Kafka Streams test environment, we > started seeing something weird. Every so often one of the StreamThreads (i.e. a > single Consumer instance) would appear to permanently fall out of the group, > as evidenced by a monotonically increasing _last-rebalance-seconds-ago._ We > inject artificial network failures every few hours at most, so the group > rebalances quite often. But the one consumer never rejoins, with no other > symptoms (besides a slight drop in throughput since the remaining threads had > to take over this member's work). We're confident that the problem exists in > the client layer, since the logs confirmed that the unhealthy consumer was > still calling poll. It was also calling Consumer#committed in its main poll > loop, which was consistently failing with a TimeoutException. > When I attached a remote debugger to an instance experiencing this issue, the > network client's connection to the group coordinator (the one that uses > MAX_VALUE - node.id as the coordinator id) was in the DISCONNECTED state. But > for some reason it never tried to re-establish this connection, although it > did successfully connect to that same broker through the "normal" connection > (i.e. the one that just uses node.id). 
> The tl;dr is that the AbstractCoordinator's FindCoordinatorRequest has failed > (presumably due to a disconnect), but the _findCoordinatorFuture_ is non-null, > so a new request is never sent. This shouldn't be possible since the > FindCoordinatorResponseHandler is supposed to clear the > _findCoordinatorFuture_ when the future is completed. But somehow that didn't > happen, so the consumer continues to assume there's still a FindCoordinator > request in flight and never even notices that it's dropped out of the group. > These are the only confirmed findings so far; however, we have some guesses > which I'll leave in the comments. Note that we only noticed this due to the > newly added _last-rebalance-seconds-ago_ metric, and there's no reason to > believe this bug hasn't been flying under the radar since the Consumer's > inception.
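The invariant in the tl;dr (at most one FindCoordinator request in flight, and the in-flight marker must be cleared whenever that request completes, successfully or not) can be sketched with `CompletableFuture`. This is a hypothetical illustration, not the `AbstractCoordinator` code and not the fix from PR #9671: `CoordinatorLookupSketch`, its `sender` supplier and the integer node id are invented for the example. If the clearing callback were skipped, `lookupCoordinator()` would keep returning the stale future and never send a new request, which matches the symptom described in the ticket.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical model of "one coordinator lookup in flight at a time"; not Kafka code.
class CoordinatorLookupSketch {
    private final Supplier<CompletableFuture<Integer>> sender; // sends a lookup, yields a node id
    private CompletableFuture<Integer> findCoordinatorFuture;  // non-null while a lookup is in flight
    int requestsSent = 0;

    CoordinatorLookupSketch(Supplier<CompletableFuture<Integer>> sender) {
        this.sender = sender;
    }

    synchronized CompletableFuture<Integer> lookupCoordinator() {
        if (findCoordinatorFuture != null) {
            // A request is believed to be in flight: reuse it instead of sending another.
            return findCoordinatorFuture;
        }
        requestsSent++;
        CompletableFuture<Integer> future = sender.get();
        findCoordinatorFuture = future;
        // Clear the marker on success AND failure. If the future is already complete,
        // this callback runs immediately, so we return the local reference below.
        future.whenComplete((nodeId, error) -> clear(future));
        return future;
    }

    private synchronized void clear(CompletableFuture<Integer> completed) {
        // Identity check: a late callback must not clear a newer request's future.
        if (findCoordinatorFuture == completed) {
            findCoordinatorFuture = null;
        }
    }
}
```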
[GitHub] [kafka] hachikuji commented on a change in pull request #10001: MINOR: AbstractCoordinatorTest should close coordinator explicitly
hachikuji commented on a change in pull request #10001: URL: https://github.com/apache/kafka/pull/10001#discussion_r566613107 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ## @@ -102,6 +104,11 @@ private final String leaderId = "leaderId"; private final int defaultGeneration = -1; +@AfterEach +public void closeCoordinator() { +Utils.closeQuietly(coordinator, "close coordinator"); Review comment: While we're at it, shall we close `consumerClient`?
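The suggestion amounts to releasing every resource a test created, even if one `close()` fails. Below is a stdlib-only sketch of that teardown; `closeAllQuietly` is a hypothetical helper loosely modeled on the `Utils.closeQuietly(coordinator, "close coordinator")` call in the diff. In the real test it would be called from the `@AfterEach` method with both `coordinator` and `consumerClient`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical teardown helper; not Kafka's Utils.closeQuietly.
class TeardownSketch {
    // Closes each resource in order, collecting failures instead of throwing,
    // so one failing close() does not leak the resources after it.
    static List<String> closeAllQuietly(AutoCloseable... resources) {
        List<String> failures = new ArrayList<>();
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue; // the test may not have created every resource
            }
            try {
                resource.close();
            } catch (Exception e) {
                failures.add(e.getMessage());
            }
        }
        return failures;
    }
}
```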
[GitHub] [kafka] guozhangwang commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
guozhangwang commented on pull request #9671: URL: https://github.com/apache/kafka/pull/9671#issuecomment-769616246 Awesome!!
[GitHub] [kafka] chia7712 opened a new pull request #10001: MINOR: AbstractCoordinatorTest should close coordinator explicitly
chia7712 opened a new pull request #10001: URL: https://github.com/apache/kafka/pull/10001 I noticed this issue while digging into some flaky tests with a JVM profiler. `AbstractCoordinatorTest` does not close the coordinator, so it can leave a lot of idle heartbeat threads running in the following tests. ``` "kafka-coordinator-heartbeat-thread | dummy-group" #239 daemon prio=5 os_prio=0 cpu=4.40ms elapsed=29.26s tid=0x7f4798c34000 nid=0x11b6 in Object.wait() [0x7f471dbf5000] java.lang.Thread.State: TIMED_WAITING (on object monitor) at java.lang.Object.wait(java.base@11.0.9.1/Native Method) - waiting on at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1398) - waiting to re-lock in wait() <0x8152b250> (a org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator) "kafka-coordinator-heartbeat-thread | dummy-group" #240 daemon prio=5 os_prio=0 cpu=4.15ms elapsed=29.16s tid=0x7f4798c36800 nid=0x11b7 in Object.wait() [0x7f471d7f4000] java.lang.Thread.State: TIMED_WAITING (on object monitor) at java.lang.Object.wait(java.base@11.0.9.1/Native Method) - waiting on at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1398) - waiting to re-lock in wait() <0x8152e9c0> (a org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator) "kafka-coordinator-heartbeat-thread | dummy-group" #242 daemon prio=5 os_prio=0 cpu=0.23ms elapsed=29.04s tid=0x7f4798c39000 nid=0x11b9 in Object.wait() [0x7f471d3f3000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(java.base@11.0.9.1/Native Method) - waiting on at java.lang.Object.wait(java.base@11.0.9.1/Object.java:328) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1355) - waiting to re-lock in wait() <0x815107f8> (a org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator) 
"kafka-coordinator-heartbeat-thread | dummy-group" #244 daemon prio=5 os_prio=0 cpu=3.62ms elapsed=29.03s tid=0x7f4798c3b000 nid=0x11bb in Object.wait() [0x7f471cff2000] java.lang.Thread.State: TIMED_WAITING (on object monitor) at java.lang.Object.wait(java.base@11.0.9.1/Native Method) - waiting on at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1398) - waiting to re-lock in wait() <0x815330b0> (a org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator) "kafka-coordinator-heartbeat-thread | dummy-group" #245 daemon prio=5 os_prio=0 cpu=4.09ms elapsed=28.93s tid=0x7f4798c3d800 nid=0x11bc in Object.wait() [0x7f471cbf1000] java.lang.Thread.State: TIMED_WAITING (on object monitor) at java.lang.Object.wait(java.base@11.0.9.1/Native Method) - waiting on at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1398) - waiting to re-lock in wait() <0x815387e8> (a org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator) "kafka-coordinator-heartbeat-thread | dummy-group" #246 daemon prio=5 os_prio=0 cpu=4.14ms elapsed=28.83s tid=0x7f4798c3f000 nid=0x11bd in Object.wait() [0x7f471c7f] java.lang.Thread.State: TIMED_WAITING (on object monitor) at java.lang.Object.wait(java.base@11.0.9.1/Native Method) - waiting on at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1398) - waiting to re-lock in wait() <0x815389d8> (a org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator) "kafka-coordinator-heartbeat-thread | dummy-group" #247 daemon prio=5 os_prio=0 cpu=4.08ms elapsed=28.72s tid=0x7f4798c41800 nid=0x11be in Object.wait() [0x7f46e3ffe000] java.lang.Thread.State: TIMED_WAITING (on object monitor) at java.lang.Object.wait(java.base@11.0.9.1/Native Method) - waiting on at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1398) - waiting to re-lock in wait() <0x81538bc8> (a org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator) ``` I haven't observed a relationship between this issue and the flaky tests. However, it seems to me that explicitly releasing idle resources is always a good pattern. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI bu
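One way to confirm a leak like the one in this thread dump is to look for live threads by name once the tests have finished. The stdlib-only sketch below does that with `Thread.getAllStackTraces()`. The name prefix is taken from the dump; `liveThreadsNamed` and the simulated heartbeat thread in the check are illustrative assumptions, not Kafka test utilities.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical leak check; not part of Kafka's test utilities.
class ThreadLeakSketch {
    static final String HEARTBEAT_PREFIX = "kafka-coordinator-heartbeat-thread";

    // Returns the names of live threads whose name starts with the given prefix.
    static List<String> liveThreadsNamed(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(Thread::isAlive)
                .map(Thread::getName)
                .filter(name -> name.startsWith(prefix))
                .collect(Collectors.toList());
    }
}
```

Run after each test class (for example from an `@AfterAll` hook), a non-empty result points at coordinators that were never closed; closing them after each test, as the PR proposes, should keep such threads from accumulating.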
[jira] [Commented] (KAFKA-12169) Consumer can not know paritions change when client leader restart with static membership protocol
[ https://issues.apache.org/jira/browse/KAFKA-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17274181#comment-17274181 ] Guozhang Wang commented on KAFKA-12169: --- [~zoushengfu] by "restart with unknown member id", do you mean you bounced the client leader at the same time? Also, which broker version are you running? If you did bounce the leader and the brokers are on older versions (i.e. on older versions the broker would not trigger a rebalance on non-leader joins with different metadata), there might indeed be a race condition if the leader is bounced at the same time, for example: T0: the topic's partition metadata changes from 1000 to 2000 partitions, but the change has not yet been propagated to the consumer group leader. T1: The leader is bounced and rejoins the group with a known instance.id. By then its metadata already shows 2000 partitions, but the group coordinator still gives it the old assignment, which only contains 1000 partitions. T2: From then on, the leader does not try to resend the join-group request, because its "join-group metadata snapshot" is the same as the refreshed metadata. > Consumer can not know paritions change when client leader restart with static > membership protocol > - > > Key: KAFKA-12169 > URL: https://issues.apache.org/jira/browse/KAFKA-12169 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.5.1, 2.6.1 >Reporter: zou shengfu >Priority: Major > > Background: > Kafka consumer services run with static membership and cooperative rebalance > protocol on Kubernetes, and services often restart because of operations. When > we increased the topic's partitions from 1000 to 2000, the client leader restarted > with an unknown member id at the same time, and we found the consumers did not > trigger a rebalance and still consumed 1000 partitions
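The T0 to T2 timeline can be reduced to a small model of the leader's rejoin check. This is a hypothetical sketch of the reasoning in the comment, not the consumer's actual coordinator logic: the leader compares its join-time metadata snapshot with refreshed metadata. Because the snapshot was taken after the partition count had already reached 2000, no difference is detected, even though the assignment it received only covers 1000 partitions.

```java
// Hypothetical model of the leader-side check in the T0-T2 timeline; not Kafka code.
class LeaderRejoinSketch {
    final int snapshotPartitionCount; // metadata captured when the leader (re)joined (T1)
    final int assignedPartitionCount; // partitions covered by the assignment it received

    LeaderRejoinSketch(int snapshotPartitionCount, int assignedPartitionCount) {
        this.snapshotPartitionCount = snapshotPartitionCount;
        this.assignedPartitionCount = assignedPartitionCount;
    }

    // The leader only rejoins when refreshed metadata differs from its join-time snapshot.
    boolean needsRejoin(int refreshedPartitionCount) {
        return refreshedPartitionCount != snapshotPartitionCount;
    }

    // Partitions that exist but are not covered by the current assignment.
    int unassignedPartitions(int refreshedPartitionCount) {
        return refreshedPartitionCount - assignedPartitionCount;
    }
}
```

With `new LeaderRejoinSketch(2000, 1000)`, `needsRejoin(2000)` is `false` while `unassignedPartitions(2000)` is `1000`, which is the stuck state described at T2.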
[GitHub] [kafka] mjsax opened a new pull request #10000: KAFKA-9274: handle TimeoutException on task reset
mjsax opened a new pull request #1: URL: https://github.com/apache/kafka/pull/1 - part of KIP-572
[GitHub] [kafka] hachikuji opened a new pull request #9999: MINOR: Ensure `InterBrokerSendThread` closes `NetworkClient`
hachikuji opened a new pull request #: URL: https://github.com/apache/kafka/pull/ We should ensure `NetworkClient` is closed properly when `InterBrokerSendThread` is shut down. Also use `initiateShutdown` instead of `wakeup()` to alert the polling thread. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
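The shutdown pattern the PR describes (signal the loop to stop, wake the blocked poll, and close the client on the way out) can be sketched with plain Java threads. Everything here is hypothetical: `FakeNetworkClient` stands in for `NetworkClient`, and `initiateShutdown()` only mirrors the name used in the PR, not the implementation of Kafka's `ShutdownableThread` or `InterBrokerSendThread`.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in for a network client whose poll() blocks until woken up or timed out.
class FakeNetworkClient implements AutoCloseable {
    private final Semaphore wakeups = new Semaphore(0);
    volatile boolean closed = false;

    void poll(long timeoutMs) throws InterruptedException {
        wakeups.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
    }

    void wakeup() {
        wakeups.release();
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical send thread; not Kafka's InterBrokerSendThread.
class SendThreadSketch extends Thread {
    private final AtomicBoolean running = new AtomicBoolean(true);
    final FakeNetworkClient client = new FakeNetworkClient();

    SendThreadSketch() {
        super("send-thread-sketch");
    }

    @Override
    public void run() {
        try {
            while (running.get()) {
                client.poll(10_000);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Release the client however the loop exits.
            client.close();
        }
    }

    // Flip the flag first, then wake the poller so it observes the flag promptly.
    void initiateShutdown() {
        running.set(false);
        client.wakeup();
    }
}
```

Calling only `wakeup()` would return from one `poll()` but leave the loop running; combining the flag, the wakeup and a `finally` close is what lets the thread exit and release the client.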
[jira] [Updated] (KAFKA-12169) Consumer can not know paritions change when client leader restart with static membership protocol
[ https://issues.apache.org/jira/browse/KAFKA-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-12169: -- Summary: Consumer can not know paritions change when client leader restart with static membership protocol (was: Consumer can not know paritions chage when client leader restart with static membership protocol)
[GitHub] [kafka] kkonstantine commented on pull request #9791: KAFKA-10873: ignore warning messages if connector/task start failed
kkonstantine commented on pull request #9791: URL: https://github.com/apache/kafka/pull/9791#issuecomment-769611695 The former, I guess. I don't see a strong case for skipping the messages based on the number of occurrences. It seems we can use this information in the log, and that it should be present in abnormal circumstances (maybe with a varying severity).
[GitHub] [kafka] guozhangwang commented on pull request #9969: MINOR: updated upgrade and architecture for KIP-663, KIP-696, and KIP-671
guozhangwang commented on pull request #9969: URL: https://github.com/apache/kafka/pull/9969#issuecomment-769608173 LGTM.
[GitHub] [kafka] g1geordie commented on pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…
g1geordie commented on pull request #9906: URL: https://github.com/apache/kafka/pull/9906#issuecomment-769603195 @chia7712 Thanks for your patch. I followed the style and changed `MemoryRecordsBuilderTest`. Can you help me take a look? :)
[GitHub] [kafka] guozhangwang commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size
guozhangwang commented on a change in pull request #9253: URL: https://github.com/apache/kafka/pull/9253#discussion_r566597020 ## File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala ## @@ -41,8 +43,19 @@ object Serdes { implicit def Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]] implicit def JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer() - implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.TimeWindowedSerde[T] = -new WindowedSerdes.TimeWindowedSerde[T](tSerde) + implicit def timeWindowedSerde[T](implicit inner: Serde[T]): Serde[Windowed[T]] = +new JSerdes.WrapperSerde[Windowed[T]]( + new TimeWindowedSerializer[T](inner.serializer), + new TimeWindowedDeserializer[T](inner.deserializer) { +override def deserialize(topic: String, data: Array[Byte]): Windowed[T] = { Review comment: Thinking about this once again: if users did use the `timeWindowedSerde`, then we would log a WARN on each record, which would soon flood the log files. So instead of logging in the `deserialize` function, maybe it's better to just log once in `timeWindowedSerde` itself? ## File path: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/SerdesUnitTest.scala ## @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.scala + +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender +import org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde +import org.junit.Assert.assertFalse +import org.junit.Test + +class SerdesUnitTest { + + @Test + def shouldLogMessageWhenTimeWindowedSerdeIsUsed(): Unit = { + +Serdes.timeWindowedSerde(new TimeWindowedSerde[String]()) +val appender = LogCaptureAppender.createAndRegister() +val warning = appender.getMessages() +assertFalse("There should be a warning about TimeWindowedDeserializer", warning.isEmpty) Review comment: Personally, I'm not a big fan of adding test coverage for deprecated functions just to make sure the expected warning is generated :) Also, is this test correct? Currently `Serdes.timeWindowedSerde` itself does not log any warning; we only log when `deserialize` is called, right (see my other comment)? Anyway, I'm actually okay if you'd like to just remove this test. On the other hand, we should also try to avoid a WARN at per-record granularity.
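The suggestion to warn when the serde is created, rather than inside `deserialize`, can be illustrated with a stdlib-only sketch. It is hypothetical: warnings are recorded in a list instead of going to a real logger, the warning text is invented, and `WarnOnceSketch.timeWindowedDeserializer` is an illustrative name rather than the Scala `Serdes` code under review. Deserializing any number of records adds no further warnings.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a deprecated serde factory; "warnings" replaces a real logger.
class WarnOnceSketch {
    static final List<String> warnings = new ArrayList<>();

    // Emit the warning when the deserializer is created, not on the per-record path.
    static Function<byte[], String> timeWindowedDeserializer() {
        warnings.add("WARN: deprecated serde in use (hypothetical message)");
        // The per-record function itself never logs.
        return data -> new String(data, StandardCharsets.UTF_8);
    }
}
```

If the factory itself may be called many times, an `AtomicBoolean` guard (`if (warned.compareAndSet(false, true))`) around the warning would reduce it further, to once per JVM.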
[GitHub] [kafka] hachikuji opened a new pull request #9998: KAFKA-12250; Add metadata record serde for KIP-631
hachikuji opened a new pull request #9998: URL: https://github.com/apache/kafka/pull/9998 This patch adds a `RecordSerde` implementation for the metadata record format expected by KIP-631. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-12250) Add metadata record serde logic for KIP-631
Jason Gustafson created KAFKA-12250: --- Summary: Add metadata record serde logic for KIP-631 Key: KAFKA-12250 URL: https://issues.apache.org/jira/browse/KAFKA-12250 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson KIP-631 specifies the schema for records written to the log. We need to write an instance of `RecordSerde` for this format.
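To give a feel for what such a serde has to do, here is a stdlib-only sketch of framing a record with a small header of unsigned varints followed by the payload. The header layout (frame version, record type key, record version) and the frame version value `0` are assumptions made for illustration; consult KIP-631 and the PR for the actual format. `MetadataFrameSketch` is a hypothetical name, not the `RecordSerde` interface.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical framing: [frameVersion][apiKey][version] as unsigned varints, then the payload.
class MetadataFrameSketch {
    static final int FRAME_VERSION = 0; // assumed value, for illustration only

    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        // 7 bits per byte; the high bit marks "more bytes follow".
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            value |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return value;
    }

    static byte[] write(int apiKey, int version, byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(FRAME_VERSION, out);
        writeUnsignedVarint(apiKey, out);
        writeUnsignedVarint(version, out);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    // Returns {apiKey, version}; the buffer is left positioned at the start of the payload.
    static int[] readHeader(ByteBuffer buf) {
        int frameVersion = readUnsignedVarint(buf);
        if (frameVersion != FRAME_VERSION) {
            throw new IllegalArgumentException("Unsupported frame version " + frameVersion);
        }
        return new int[] {readUnsignedVarint(buf), readUnsignedVarint(buf)};
    }
}
```

Varints keep the header to a few bytes for small type keys and versions; the key `300` in the check below is chosen only because it needs two varint bytes.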
[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
rohitrmd commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r566587951 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,7 +45,6 @@ private boolean initialized; protected ProcessorRecordContext recordContext; protected ProcessorNode currentNode; -private long currentSystemTimeMs; Review comment: @mjsax, do you recommend adding a method to AbstractProcessorContext to set cachedSystemTimeMs (setCachedSystemTimeMs), or do you want me to add setSystemTimeMs back to [InternalProcessorContext](https://github.com/apache/kafka/pull/9744/files#diff-34daeb287c7e79c8ccd757daa4e17d6ab585d54844f6e5e8676853762a08cedcL49) and set the system time as it was done before?
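Both options in the question amount to a cached wall-clock time that the owning thread refreshes and processors only read. The stdlib-only sketch below shows the first option, a setter on the context called once per loop iteration, so every processor in that iteration sees the same value. `ContextSketch`, `LoopSketch`, `setCachedSystemTimeMs` and `currentSystemTimeMs` are hypothetical names for illustration, not the code in the PR.

```java
import java.util.function.LongSupplier;

// Hypothetical context exposing a wall-clock time cached by the owning thread.
class ContextSketch {
    private long cachedSystemTimeMs;

    // Called by the (hypothetical) stream thread once per loop iteration.
    void setCachedSystemTimeMs(long timeMs) {
        cachedSystemTimeMs = timeMs;
    }

    // Processors read the cached value instead of querying the clock themselves.
    long currentSystemTimeMs() {
        return cachedSystemTimeMs;
    }
}

class LoopSketch {
    // One iteration: refresh the cached time once, then let each "processor" read it.
    static long[] runIteration(ContextSketch context, LongSupplier clock, int processors) {
        context.setCachedSystemTimeMs(clock.getAsLong());
        long[] seen = new long[processors];
        for (int i = 0; i < processors; i++) {
            seen[i] = context.currentSystemTimeMs();
        }
        return seen;
    }
}
```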
[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
rohitrmd commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r566587951 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,7 +45,6 @@ private boolean initialized; protected ProcessorRecordContext recordContext; protected ProcessorNode currentNode; -private long currentSystemTimeMs; Review comment: @mjsax, do you recommend adding a method to AbstractProcessorContext to set cachedSystemTimeMs (setCachedSystemTimeMs), or do you want me to add setSystemTimeMs back to [InternalProcessorContext](https://github.com/apache/kafka/pull/9744/files#diff-34daeb287c7e79c8ccd757daa4e17d6ab585d54844f6e5e8676853762a08cedcL49)?
[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
rohitrmd commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r566587951 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,7 +45,6 @@ private boolean initialized; protected ProcessorRecordContext recordContext; protected ProcessorNode currentNode; -private long currentSystemTimeMs; Review comment: @mjsax, do you recommend adding a method to AbstractProcessorContext to set cachedSystemTimeMs (setCachedSystemTimeMs)?
[GitHub] [kafka] hachikuji commented on a change in pull request #9967: KAFKA-12236; New meta.properties logic for KIP-500
hachikuji commented on a change in pull request #9967: URL: https://github.com/apache/kafka/pull/9967#discussion_r566587032 ## File path: core/src/main/scala/kafka/server/Server.scala ## @@ -46,6 +46,22 @@ object Server { new Metrics(metricConfig, reporters, time, true, metricsContext) } + def initializeMetrics( +config: KafkaConfig, +time: Time, +metaProps: MetaProperties Review comment: @chia7712 We need the clusterId from `MetaProperties`. I modified this to pass it through directly to avoid the duplicate methods.
[GitHub] [kafka] mjsax commented on a change in pull request #9997: KAFKA-9274: Add timeout handling for `StreamPartitioner`
mjsax commented on a change in pull request #9997: URL: https://github.com/apache/kafka/pull/9997#discussion_r566586589 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -714,19 +720,34 @@ public boolean process(final long wallClockTime) { if (recordInfo.queue().size() == maxBufferedSize) { mainConsumer.resume(singleton(partition)); } -} catch (final StreamsException e) { -throw e; + +record = null; +} catch (final TimeoutException timeoutException) { +if (!eosEnabled) { +throw timeoutException; +} else { +record = null; +throw new TaskCorruptedException(Collections.singletonMap(id, changelogPartitions())); Review comment: Note that we don't trigger `task.timeout.ms` for this case at the moment. Because we need to restore state, which might take some time, it is questionable whether we should trigger `task.timeout.ms` for this case or not. Cf. `TaskManager#process()`, which catches `TimeoutException` and triggers the timeout, but does not catch `TaskCorruptedException`.
[GitHub] [kafka] mjsax commented on a change in pull request #9997: KAFKA-9274: Add timeout handling for `StreamPartitioner`
mjsax commented on a change in pull request #9997: URL: https://github.com/apache/kafka/pull/9997#discussion_r566580878 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -714,19 +720,34 @@ public boolean process(final long wallClockTime) { if (recordInfo.queue().size() == maxBufferedSize) { mainConsumer.resume(singleton(partition)); } -} catch (final StreamsException e) { -throw e; + +record = null; +} catch (final TimeoutException timeoutException) { +if (!eosEnabled) { +throw timeoutException; +} else { +record = null; +throw new TaskCorruptedException(Collections.singletonMap(id, changelogPartitions())); Review comment: For this case, we don't need the cached record, as we need to reset the task anyway to clean up potentially "corrupted" state stores.
[GitHub] [kafka] mjsax commented on a change in pull request #9997: KAFKA-9274: Add timeout handling for `StreamPartitioner`
mjsax commented on a change in pull request #9997: URL: https://github.com/apache/kafka/pull/9997#discussion_r566580717 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -663,18 +666,21 @@ public boolean isProcessable(final long wallClockTime) { */ @SuppressWarnings("unchecked") public boolean process(final long wallClockTime) { -if (!isProcessable(wallClockTime)) { -return false; -} +if (record == null) { Review comment: If we have a cached record, it implies we failed to process it before -- thus we don't pull a new record from the buffer but retry processing the cached record.
[GitHub] [kafka] mjsax commented on a change in pull request #9997: KAFKA-9274: Add timeout handling for `StreamPartitioner`
mjsax commented on a change in pull request #9997: URL: https://github.com/apache/kafka/pull/9997#discussion_r566580568 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -99,6 +100,7 @@ private final InternalProcessorContext processorContext; private final RecordQueueCreator recordQueueCreator; +private StampedRecord record; Review comment: We pull this variable out from the method to use it as a "cache" -- if we fail on send(), we can process the record a second time.
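The record-caching approach discussed in these comments on PR #9997 can be sketched as follows. All names here (`RetryingTask`, `SendTimeoutException`, `TaskCorruptedSketchException`) are hypothetical stand-ins for `StreamTask`, Kafka's `TimeoutException`, and `TaskCorruptedException`; the `Consumer` plays the role of processing plus `send()`. This is an illustration of the pattern, not the PR's code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical stand-in for Kafka's TimeoutException.
class SendTimeoutException extends RuntimeException {
    SendTimeoutException(final String message) { super(message); }
}

// Hypothetical stand-in for TaskCorruptedException.
class TaskCorruptedSketchException extends RuntimeException {
    TaskCorruptedSketchException(final String message, final Throwable cause) { super(message, cause); }
}

// Hypothetical task that keeps a failed record in a field so the next call retries it.
class RetryingTask<R> {
    private final Deque<R> buffer = new ArrayDeque<>();
    private final Consumer<R> processor; // may throw SendTimeoutException, e.g. on send()
    private final boolean eosEnabled;
    private R cachedRecord;              // non-null only if the previous attempt failed

    RetryingTask(final Consumer<R> processor, final boolean eosEnabled) {
        this.processor = processor;
        this.eosEnabled = eosEnabled;
    }

    void add(final R r) { buffer.add(r); }

    R cachedRecord() { return cachedRecord; }

    boolean process() {
        if (cachedRecord == null) {      // nothing cached: pull the next record
            cachedRecord = buffer.poll();
            if (cachedRecord == null) {
                return false;            // buffer is empty
            }
        }
        try {
            processor.accept(cachedRecord);
            cachedRecord = null;         // success: clear the cache
            return true;
        } catch (final SendTimeoutException e) {
            if (!eosEnabled) {
                throw e;                 // keep the record cached; the next call retries it
            }
            cachedRecord = null;         // EOS: the task is reset anyway, so drop the cache
            throw new TaskCorruptedSketchException("task must be reset", e);
        }
    }
}
```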
[GitHub] [kafka] mjsax opened a new pull request #9997: KAFKA-9274: Add timeout handling for `StreamPartitioner`
mjsax opened a new pull request #9997: URL: https://github.com/apache/kafka/pull/9997 - part of KIP-572 When a custom `StreamPartitioner` is used, we need to get the number of partitions of output topics from the producer. This `partitionsFor(topic)` call may throw a `TimeoutException` that we now handle gracefully.
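Here is a hedged sketch of the lookup this PR description refers to. The partition count for an output topic is fetched once and cached, and a timeout during the fetch leaves nothing cached, so the next call retries. `PartitionCountCache` and its `ToIntFunction` lookup are hypothetical; in Kafka Streams the count would come from the producer's `partitionsFor(topic)`, and the modulo rule below is illustrative rather than Kafka's default murmur2-based partitioning.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToIntFunction;

// Hypothetical helper: caches the partition count per output topic. The lookup
// stands in for asking the producer for the topic's partitions and may throw a
// (runtime) timeout exception while metadata is unavailable.
class PartitionCountCache {
    private final Map<String, Integer> counts = new HashMap<>();
    private final ToIntFunction<String> lookup;

    PartitionCountCache(final ToIntFunction<String> lookup) {
        this.lookup = lookup;
    }

    // If the lookup throws, computeIfAbsent records no mapping and rethrows,
    // so a later call simply retries the lookup.
    int numPartitions(final String topic) {
        return counts.computeIfAbsent(topic, lookup::applyAsInt);
    }

    // Illustrative key-to-partition rule for a custom partitioner.
    int partition(final String topic, final Object key) {
        return Math.floorMod(key.hashCode(), numPartitions(topic));
    }
}
```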
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r566576655 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -133,6 +152,72 @@ private void configurePermissions(final File file) { } } +/** + * @return true if the state directory was successfully locked + */ +private boolean lockStateDirectory() { +final File lockFile = new File(stateDir, LOCK_FILE_NAME); +try { +stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE); +stateDirLock = tryLock(stateDirLockChannel); +} catch (final IOException e) { +log.error("Unable to lock the state directory due to unexpected exception", e); +throw new ProcessorStateException("Failed to lock the state directory during startup", e); +} + +return stateDirLock != null; +} + +public UUID initializeProcessId() { Review comment: Oh my god. I hate json lol
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r566559281 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -133,6 +152,72 @@ private void configurePermissions(final File file) { } } +/** + * @return true if the state directory was successfully locked + */ +private boolean lockStateDirectory() { +final File lockFile = new File(stateDir, LOCK_FILE_NAME); +try { +stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE); +stateDirLock = tryLock(stateDirLockChannel); +} catch (final IOException e) { +log.error("Unable to lock the state directory due to unexpected exception", e); +throw new ProcessorStateException("Failed to lock the state directory during startup", e); +} + +return stateDirLock != null; +} + +public UUID initializeProcessId() { Review comment: I'll try it out with json. If we do use json, then we don't even need the version number, right? As long as we only ever add fields, it should always be compatible. At least that's my understanding; I'm not a json expert.
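For readers unfamiliar with the locking step in `lockStateDirectory()`, here is a minimal standalone version using `java.nio`'s `FileChannel.tryLock()`, which returns `null` when another process already holds the lock. The class name and the lock file name used in the test are hypothetical. Note that file locks arbitrate between processes: a second `tryLock()` on the same file from within the same JVM throws `OverlappingFileLockException` rather than returning `null`.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper mirroring the shape of lockStateDirectory() in the diff.
class StateDirLockSketch {
    // Returns the lock, or null if another process already holds it.
    static FileLock tryLockDirectory(final Path stateDir, final String lockFileName) throws IOException {
        final FileChannel channel = FileChannel.open(
            stateDir.resolve(lockFileName), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        final FileLock lock;
        try {
            lock = channel.tryLock();
        } catch (final IOException | RuntimeException e) {
            channel.close(); // don't leak the channel if locking fails outright
            throw e;
        }
        if (lock == null) {
            channel.close(); // another process owns the directory; release our handle
        }
        return lock;
    }
}
```

The caller must keep the returned lock (and its channel) open for as long as it owns the directory; closing the channel releases the lock.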
[GitHub] [kafka] tang7526 commented on pull request #9991: MINOR: Reorder the modifiers and Replace Map.get with Map.computeIfAbsent
tang7526 commented on pull request #9991: URL: https://github.com/apache/kafka/pull/9991#issuecomment-769547448 > @tang7526 Could you merge trunk to trigger QA again? Done
[GitHub] [kafka] chia7712 commented on pull request #9991: MINOR: Reorder the modifiers and Replace Map.get with Map.computeIfAbsent
chia7712 commented on pull request #9991: URL: https://github.com/apache/kafka/pull/9991#issuecomment-769546117 @tang7526 Could you merge trunk to trigger QA again?
[GitHub] [kafka] dengziming commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
dengziming commented on a change in pull request #9769: URL: https://github.com/apache/kafka/pull/9769#discussion_r566550612 ## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ## @@ -1879,6 +1879,109 @@ class KafkaApisTest { assertTrue(response.topicsByError(Errors.UNKNOWN_TOPIC_OR_PARTITION).isEmpty) } + @Test + def testUnauthorizedTopicMetadataRequest(): Unit = { + +// 1. Set up broker information +val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) +val broker = new UpdateMetadataBroker() + .setId(0) + .setRack("rack") + .setEndpoints(Seq( +new UpdateMetadataEndpoint() + .setHost("broker0") + .setPort(9092) + .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) + .setListener(plaintextListener.value) + ).asJava) + +// 2. Set up authorizer +val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) +val unauthorizedTopic = "unauthorized-topic" +val authorizedTopic = "authorized-topic" + +val expectedActions = Seq( + new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL), 1, true, true), + new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1, true, true) +) + +val expectedAuthorizeResult = Seq(AuthorizationResult.DENIED, AuthorizationResult.ALLOWED).asJava + +EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(expectedActions.asJava))) + .andReturn(expectedAuthorizeResult) + .times(2) + +// 3. 
Set up MetadataCache +val authorizedTopicId = Uuid.randomUuid(); +val unauthorizedTopicId = Uuid.randomUuid(); + +val topicIds = new util.HashMap[String, Uuid]() +topicIds.put(authorizedTopic, authorizedTopicId) +topicIds.put(unauthorizedTopic, unauthorizedTopicId) + +def createDummyPartitionStates(topic: String) = { + new UpdateMetadataPartitionState() +.setTopicName(topic) +.setPartitionIndex(0) +.setControllerEpoch(1) +.setLeader(0) +.setLeaderEpoch(1) +.setReplicas(Collections.singletonList(0)) +.setZkVersion(0) +.setIsr(Collections.singletonList(0)) +} + +// Send UpdateMetadataReq to update MetadataCache +val partitionStates = Seq(unauthorizedTopic, authorizedTopic).map(createDummyPartitionStates) + +val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0, + 0, 0, partitionStates.asJava, Seq(broker).asJava, topicIds).build() +metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest) + +// 4. Send TopicMetadataReq using topicId +val capturedMetadataByTopicIdResp = expectNoThrottling() +EasyMock.replay(clientRequestQuotaManager, requestChannel, authorizer) + +val metadataReqByTopicId = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopicId, unauthorizedTopicId)).build() +createKafkaApis(authorizer = Some(authorizer)).handleTopicMetadataRequest(buildRequest(metadataReqByTopicId, plaintextListener)) +val metadataByTopicIdResp = readResponse(metadataReqByTopicId, capturedMetadataByTopicIdResp).asInstanceOf[MetadataResponse] + +val metadataByTopicId = metadataByTopicIdResp.data().topics().asScala.groupBy(_.topicId()).view.mapValues(_.head).toMap Review comment: Thank you, Done.
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r566550049 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -133,6 +152,72 @@ private void configurePermissions(final File file) { } } +/** + * @return true if the state directory was successfully locked + */ +private boolean lockStateDirectory() { +final File lockFile = new File(stateDir, LOCK_FILE_NAME); +try { +stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE); +stateDirLock = tryLock(stateDirLockChannel); +} catch (final IOException e) { +log.error("Unable to lock the state directory due to unexpected exception", e); +throw new ProcessorStateException("Failed to lock the state directory during startup", e); +} + +return stateDirLock != null; +} + +public UUID initializeProcessId() { +if (!hasPersistentStores) { +return UUID.randomUUID(); +} + +if (!lockStateDirectory()) { +log.error("Unable to obtain lock as state directory is already locked by another process"); +throw new StreamsException("Unable to initialize state, this can happen if multiple instances of " + + "Kafka Streams are running in the same state directory"); +} + +final File processFile = new File(stateDir, PROCESS_FILE_NAME); +try { +if (processFile.exists()) { +try (final BufferedReader reader = Files.newBufferedReader(processFile.toPath())) { +// only field in version 0 is the UUID +final int version = Integer.parseInt(reader.readLine()); +if (version > 0) { Review comment: Oh yeah definitely, thanks
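The versioned format implied by the `initializeProcessId()` diff (a version number on the first line, and the UUID as the only field in version 0) can be sketched like this. `ProcessIdFileSketch` and the file name in the test are hypothetical, and rejecting newer versions is only one possible policy, since the diff cuts off before showing what the real code does when `version > 0`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.UUID;

// Hypothetical version-0 layout: line 1 = format version, line 2 = the process UUID.
class ProcessIdFileSketch {
    static final int CURRENT_VERSION = 0;

    // Returns the persisted UUID if the file exists, otherwise creates and persists a new one.
    static UUID readOrCreate(final Path processFile) {
        try {
            if (Files.exists(processFile)) {
                try (BufferedReader reader = Files.newBufferedReader(processFile, StandardCharsets.UTF_8)) {
                    final int version = Integer.parseInt(reader.readLine().trim());
                    if (version > CURRENT_VERSION) {
                        // One possible policy (assumption): refuse formats we don't understand.
                        throw new IllegalStateException("Unsupported process file version " + version);
                    }
                    return UUID.fromString(reader.readLine().trim());
                }
            }
            final UUID processId = UUID.randomUUID();
            Files.write(processFile,
                Arrays.asList(Integer.toString(CURRENT_VERSION), processId.toString()),
                StandardCharsets.UTF_8);
            return processId;
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A JSON layout, as discussed in the earlier comment, would trade the explicit version line for tolerance of added fields; the line-based version above needs the version number to know how many lines to expect.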
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r566549210 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1208,24 +1227,28 @@ private Thread shutdownHelper(final boolean error) { } private boolean close(final long timeoutMs) { -if (state == State.ERROR) { -log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped."); +if (state == State.ERROR || state == State.NOT_RUNNING) { +log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state); return true; } -if (state == State.PENDING_ERROR) { -log.info("Streams client is in PENDING_ERROR, all resources are being closed and the client will be stopped."); -if (waitOnState(State.ERROR, timeoutMs)) { +if (state == State.PENDING_ERROR || state == State.PENDING_SHUTDOWN) { +log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state); +if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) { log.info("Streams client stopped to ERROR completely"); return true; +} else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) { +log.info("Streams client stopped to NOT_RUNNING completely"); +return true; } else { -log.info("Streams client cannot transition to ERROR completely within the timeout"); +log.warn("Streams client cannot transition to {}} completely within the timeout", state); Review comment: Ah good catch
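The pending-to-terminal relationship that the revised `close(long)` handles with paired branches can also be expressed as a lookup, which reads the state once and names the target state directly. This is a sketch with hypothetical types: `ClientState` is a reduced stand-in for `KafkaStreams.State`, `CloseOutcome` is invented for illustration, and the `BiPredicate` stands in for `waitOnState`.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.BiPredicate;

// Reduced, hypothetical stand-in for KafkaStreams.State.
enum ClientState { RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

enum CloseOutcome { ALREADY_STOPPED, STOPPED_AFTER_WAIT, TIMED_OUT, START_SHUTDOWN }

class CloseSketch {
    // Each pending state is heading towards exactly one terminal state.
    private static final Map<ClientState, ClientState> TERMINAL_FOR_PENDING = new EnumMap<>(ClientState.class);
    static {
        TERMINAL_FOR_PENDING.put(ClientState.PENDING_SHUTDOWN, ClientState.NOT_RUNNING);
        TERMINAL_FOR_PENDING.put(ClientState.PENDING_ERROR, ClientState.ERROR);
    }

    // waitOnState.test(target, timeoutMs) stands in for blocking until target is reached or the timeout expires.
    static CloseOutcome check(final ClientState state, final long timeoutMs,
                              final BiPredicate<ClientState, Long> waitOnState) {
        if (state == ClientState.ERROR || state == ClientState.NOT_RUNNING) {
            return CloseOutcome.ALREADY_STOPPED;
        }
        final ClientState target = TERMINAL_FOR_PENDING.get(state);
        if (target == null) {
            return CloseOutcome.START_SHUTDOWN; // e.g. RUNNING: the caller begins a normal close
        }
        return waitOnState.test(target, timeoutMs) ? CloseOutcome.STOPPED_AFTER_WAIT : CloseOutcome.TIMED_OUT;
    }
}
```

With this shape, a timeout message can log `target` using a single `{}` placeholder, which reports the state that was not reached rather than the pending one.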
[GitHub] [kafka] ijuma commented on a change in pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true
ijuma commented on a change in pull request #9589: URL: https://github.com/apache/kafka/pull/9589#discussion_r566540886 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java ## @@ -256,6 +257,80 @@ public void testWorkerConfigs() { "secret2", bProps.get("producer.ssl.key.password")); } +@Test +public void testClusterPairsWithDefaultSettings() { +MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps( +"clusters", "a, b, c")); +// implicit configuration associated +// a->b.enabled=false +// a->b.emit.heartbeat.enabled=true +// a->c.enabled=false +// a->c.emit.heartbeat.enabled=true +// b->a.enabled=false +// b->a.emit.heartbeat.enabled=true +// b->c.enabled=false +// b->c.emit.heartbeat.enabled=true +// c->a.enabled=false +// c->a.emit.heartbeat.enabled=true +// c->b.enabled=false +// c->b.emit.heartbeat.enabled=true +List clusterPairs = mirrorConfig.clusterPairs(); +assertEquals("clusterPairs count should match all combinations count", Review comment: I fixed the build via https://github.com/apache/kafka/commit/bf4afae8f53471ab6403cbbfcd2c4e427bdd4568
[GitHub] [kafka] aloknnikhil opened a new pull request #9996: KAFKA-12249: Add client-side Decommission Broker RPC
aloknnikhil opened a new pull request #9996: URL: https://github.com/apache/kafka/pull/9996 ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [x] Verify test coverage and CI build status - [x] Verify documentation (including upgrade notes)
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true
aloknnikhil commented on a change in pull request #9589: URL: https://github.com/apache/kafka/pull/9589#discussion_r566536740 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java ## @@ -256,6 +257,80 @@ public void testWorkerConfigs() { "secret2", bProps.get("producer.ssl.key.password")); } +@Test +public void testClusterPairsWithDefaultSettings() { +MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps( +"clusters", "a, b, c")); +// implicit configuration associated +// a->b.enabled=false +// a->b.emit.heartbeat.enabled=true +// a->c.enabled=false +// a->c.emit.heartbeat.enabled=true +// b->a.enabled=false +// b->a.emit.heartbeat.enabled=true +// b->c.enabled=false +// b->c.emit.heartbeat.enabled=true +// c->a.enabled=false +// c->a.emit.heartbeat.enabled=true +// c->b.enabled=false +// c->b.emit.heartbeat.enabled=true +List clusterPairs = mirrorConfig.clusterPairs(); +assertEquals("clusterPairs count should match all combinations count", Review comment: @twobeeb Could you migrate these to use the new JUnit 5 `assertEquals` API? Looks like the build actually failed here - https://github.com/apache/kafka/runs/1782294502
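With the implicit defaults listed in that test, `clusters = a, b, c` yields every ordered pair of distinct clusters, that is n * (n - 1) = 6 pairs. A hypothetical helper computing those pairs might look like this; it only illustrates the count the test asserts and is not the `MirrorMakerConfig.clusterPairs()` implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: every ordered (source, target) pair of distinct cluster aliases.
class ClusterPairsSketch {
    static List<String> clusterPairs(final String clustersConfig) {
        final List<String> clusters = new ArrayList<>();
        for (final String alias : clustersConfig.split(",")) {
            final String trimmed = alias.trim();
            if (!trimmed.isEmpty()) {
                clusters.add(trimmed);
            }
        }
        final List<String> pairs = new ArrayList<>();
        for (final String source : clusters) {
            for (final String target : clusters) {
                if (!source.equals(target)) {
                    pairs.add(source + "->" + target);
                }
            }
        }
        return pairs; // n clusters yield n * (n - 1) pairs
    }
}
```

On the migration requested in that comment: JUnit 4's `Assert.assertEquals(String message, long expected, long actual)` takes the message first, whereas JUnit 5's `Assertions.assertEquals(expected, actual, String message)` takes it last, so the message argument has to move to the end.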
[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
junrao commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r566532815 ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * SnapshottableHashTable implements a hash table that supports creating point-in-time + * snapshots. Each snapshot is immutable once it is created; the past cannot be changed. + * We handle divergences between the current state and historical state by copying a + * reference to elements that have been deleted or overwritten into the snapshot tiers + * in which they still exist. Each tier has its own hash table. + * + * In order to retrieve an object from epoch E, we only have to check two tiers: the + * current tier, and the tier associated with the snapshot from epoch E. This design + * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy + * references into multiple snapshot tiers sometimes when altering the current state. 
+ * In general, we don't expect there to be many snapshots at any given point in time, + * though. We expect to use about 2 snapshots at most. + * + * The current tier's data is stored in the fields inherited from BaseHashTable. It + * would be conceptually simpler to have a separate BaseHashTable object, but since Java + * doesn't have value types, subclassing is the only way to avoid another pointer + * indirection and the associated extra memory cost. + * + * In contrast, the data for snapshot tiers is stored in the Snapshot object itself. + * We access it by looking up our object reference in the Snapshot's IdentityHashMap. + * This design ensures that we can remove snapshots in O(1) time, simply by deleting the + * Snapshot object from the SnapshotRegistry. + * + * As mentioned before, an element only exists in a snapshot tier if the element was + * overwritten or removed from a later tier. If there are no changes between then and + * now, there is no data at all stored for the tier. We don't even store a hash table + * object for a tier unless there is at least one change between then and now. + * + * The class hierarchy looks like this: + * + *Revertable BaseHashTable + * ↑ ↑ + * SnapshottableHashTable → SnapshotRegistry → Snapshot + * ↑ ↑ + * TimelineHashSet TimelineHashMap + * + * BaseHashTable is a simple hash table that uses separate chaining. The interface is + * pretty bare-bones since this class is not intended to be used directly by end-users. + * + * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over + * snapshots. This is the core of the snapshotted hash table code and handles the + * tiering. + * + * TimelineHashSet and TimelineHashMap are mostly wrappers around this + * SnapshottableHashTable class. They implement standard Java APIs for Set and Map, + * respectively. 
There's a fair amount of boilerplate for this, but it's necessary so + * that timeline data structures can be used while writing idiomatic Java code. + * The accessor APIs have two versions -- one that looks at the current state, and one + * that looks at a historical snapshotted state. Mutation APIs only ever mutate thte + * current state. + * + * One very important feature of SnapshottableHashTable is that we support iterating + * over a snapshot even while changes are being made to the current state. See the + * Javadoc for the iterator for more information about how this is accomplished. + * + * All of these classes require external synchronization, and don't support null keys or + * values. + */ +class SnapshottableHashTable +extends BaseHashTable implements Revertable { +interface ElementWithStartEpoch { +void setStartEpoch(long startEpoch); +long startEpoch(); +} + +static class HashTier { +private final int size; +private BaseH
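A minimal sketch of the tiering idea described in this Javadoc, assuming a simplified epoch model: a snapshot taken at epoch E sees every write made at an epoch less than or equal to E, and `createSnapshot()` advances the current epoch. `SnapshottableMapSketch` is hypothetical. It uses plain `java.util` maps instead of `BaseHashTable` and a `SnapshotRegistry`, and has no iterator support, but it keeps the two properties the Javadoc highlights: a read at epoch E checks only two tiers, and deleting a snapshot is a single map removal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not the Kafka timeline classes.
class SnapshottableMapSketch<K, V> {
    // A stored value plus the epoch at which it was written.
    private static final class Element<T> {
        final T value;
        final long startEpoch;
        Element(final T value, final long startEpoch) { this.value = value; this.startEpoch = startEpoch; }
    }

    private final Map<K, Element<V>> current = new HashMap<>();
    // One tier per snapshot epoch, holding only elements overwritten or removed since that snapshot.
    private final TreeMap<Long, Map<K, Element<V>>> tiers = new TreeMap<>();
    private long currentEpoch = 0;

    long createSnapshot() {
        final long epoch = currentEpoch++;
        tiers.put(epoch, new HashMap<>());
        return epoch;
    }

    // Safe because displaced elements were copied into every tier that could see them.
    void deleteSnapshot(final long epoch) { tiers.remove(epoch); }

    void put(final K key, final V value) { preserve(key, current.put(key, new Element<>(value, currentEpoch))); }

    void remove(final K key) { preserve(key, current.remove(key)); }

    // Copy a displaced element into every snapshot tier in which it was still visible.
    private void preserve(final K key, final Element<V> old) {
        if (old == null) return;
        for (final Map<K, Element<V>> tier : tiers.tailMap(old.startEpoch, true).values()) {
            tier.putIfAbsent(key, old);
        }
    }

    V get(final K key) {
        final Element<V> e = current.get(key);
        return e == null ? null : e.value;
    }

    // Checks only two tiers: the snapshot's own tier, then the current tier.
    V get(final K key, final long epoch) {
        final Map<K, Element<V>> tier = tiers.get(epoch);
        if (tier == null) throw new IllegalArgumentException("No snapshot at epoch " + epoch);
        Element<V> e = tier.get(key);
        if (e == null) {
            e = current.get(key);
            if (e != null && e.startEpoch > epoch) e = null; // written after the snapshot
        }
        return e == null ? null : e.value;
    }
}
```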
[jira] [Commented] (KAFKA-10853) Replication protocol deficiencies with workloads requiring high durability guarantees
[ https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17274114#comment-17274114 ] Kyle Ambroff-Kao commented on KAFKA-10853: -- I've been discussing this with my team at LinkedIn and so far we don't see a better alternative than the largestAckedOffset + require acks=all in a topic-level config. Your suggestions have been really helpful but I think our original approach still sounds better. We might just prototype and test this out in our Kafka fork. > Replication protocol deficiencies with workloads requiring high durability > guarantees > - > > Key: KAFKA-10853 > URL: https://issues.apache.org/jira/browse/KAFKA-10853 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0 >Reporter: Kyle Ambroff-Kao >Priority: Major > > *tl;dr: The definition of ISR and the consistency model from the perspective > of the producer seem a bit out of sync* > We have many systems in production that trade off availability in order to > provide stronger consistency guarantees. Most of these configurations look > like this: > Topic configuration: > * replication factor 3 > * min.insync.replicas=2 > * unclean.leader.election.enable=false > Producer configuration: > * acks=all > Broker configuration: > * replica.lag.time.max.ms=10000 > So the goal here is to reduce the chance of ever dropping a message that the > leader has acknowledged to the producer. > This works great, except that we've found some situations in production where > we are forced to enable unclean leader election to recover, which we never > want to do. These situations all seem totally avoidable with some small > tweaks to the replication protocol. > *A scenario we've seen many times* > The following sequence of events is in time order: A replica set for a > topic-partition TP with leader L and replicas R1 and R2. All three replicas > are in ISR.
> # Producer sends ProduceRequest R with acks=all that contains a message > batch to the leader L. > # L receives R and appends the batch it contains to the active segment of TP > but does not ack to the producer yet because the request was acks=all > # A storage fault occurs on L which makes all IOPS take a long time but > doesn't cause a hard failure. > # R1 and R2 send follower fetch requests to L which are infinitely delayed > due to the storage fault on L. > # 10 seconds after receiving the batch and appending it to the log, L > shrinks the ISR, removing R1 and R2. This is because ISR is defined as at > most replica.lag.time.max.ms milliseconds behind the log append time of the > leader end offset. The leader end offset is a message that has not been > replicated yet. > The storage fault example in step 3 could easily be another kind of fault. > Say for example, L is partitioned from R1 and R2 but not from ZooKeeper or > the producer. > The producer never receives acknowledgement of the ProduceRequest because the > min.insync.replicas constraint was never satisfied. So in terms of data > consistency, everything is working fine. > The problem is recovering from this situation. If the fault on L is not a > temporary blip, then L needs to be replaced. But since L shrank the ISR, the > only way that leadership can move to either R1 or R2 is to set > unclean.leader.election.enable=true. > This works but it is a potentially unsafe way to recover and move leadership. > It would be better to have other options. > *Recovery could be automatic in this scenario.* > If you think about it, from the perspective of the producer, the write was > not acknowledged, and therefore, L, R1 and R2 are actually in-sync. So it > should actually be totally safe for leadership to transition to either R1 or > R2. > It seems that the producer and the leader don't have fully compatible > definitions for what it means for the replica set to be in-sync. If the
If the > leader L used different rules for defining ISR, it could allow self-healing > in this or similar scenarios, since the ISR would not shrink. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
junrao commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r566532467 ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * SnapshottableHashTable implements a hash table that supports creating point-in-time + * snapshots. Each snapshot is immutable once it is created; the past cannot be changed. + * We handle divergences between the current state and historical state by copying a + * reference to elements that have been deleted or overwritten into the snapshot tiers + * in which they still exist. Each tier has its own hash table. + * + * In order to retrieve an object from epoch E, we only have to check two tiers: the + * current tier, and the tier associated with the snapshot from epoch E. This design + * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy + * references into multiple snapshot tiers sometimes when altering the current state. 
+ * In general, we don't expect there to be many snapshots at any given point in time, + * though. We expect to use about 2 snapshots at most. + * + * The current tier's data is stored in the fields inherited from BaseHashTable. It + * would be conceptually simpler to have a separate BaseHashTable object, but since Java + * doesn't have value types, subclassing is the only way to avoid another pointer + * indirection and the associated extra memory cost. + * + * In contrast, the data for snapshot tiers is stored in the Snapshot object itself. + * We access it by looking up our object reference in the Snapshot's IdentityHashMap. + * This design ensures that we can remove snapshots in O(1) time, simply by deleting the + * Snapshot object from the SnapshotRegistry. + * + * As mentioned before, an element only exists in a snapshot tier if the element was + * overwritten or removed from a later tier. If there are no changes between then and + * now, there is no data at all stored for the tier. We don't even store a hash table + * object for a tier unless there is at least one change between then and now. + * + * The class hierarchy looks like this: + * + *Revertable BaseHashTable + * ↑ ↑ + * SnapshottableHashTable → SnapshotRegistry → Snapshot + * ↑ ↑ + * TimelineHashSet TimelineHashMap + * + * BaseHashTable is a simple hash table that uses separate chaining. The interface is + * pretty bare-bones since this class is not intended to be used directly by end-users. + * + * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over + * snapshots. This is the core of the snapshotted hash table code and handles the + * tiering. + * + * TimelineHashSet and TimelineHashMap are mostly wrappers around this + * SnapshottableHashTable class. They implement standard Java APIs for Set and Map, + * respectively. 
There's a fair amount of boilerplate for this, but it's necessary so + * that timeline data structures can be used while writing idiomatic Java code. + * The accessor APIs have two versions -- one that looks at the current state, and one + * that looks at a historical snapshotted state. Mutation APIs only ever mutate the + * current state. + * + * One very important feature of SnapshottableHashTable is that we support iterating + * over a snapshot even while changes are being made to the current state. See the + * Javadoc for the iterator for more information about how this is accomplished. + * + * All of these classes require external synchronization, and don't support null keys or + * values. + */ +class SnapshottableHashTable +extends BaseHashTable implements Revertable { +interface ElementWithStartEpoch { +void setStartEpoch(long startEpoch); +long startEpoch(); +} + +static class HashTier { +private final int size; +private BaseH
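The tiering described in the Javadoc quoted above can be illustrated with a much smaller model. This is a hypothetical, simplified sketch, not Kafka's implementation: the class `TieredSnapshotMap` and its methods are illustrative names, `java.util.HashMap` stands in for `BaseHashTable`, and iteration, revert, and the `SnapshotRegistry` are omitted. Each snapshot epoch gets a lazily created tier, a value is copied into a tier only when it is overwritten or removed while that snapshot can still see it, and a read at epoch E consults only the tier for E and the current tier, using a per-value start epoch to hide values written after E.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;

/**
 * Hypothetical, simplified sketch of snapshot tiering (NOT Kafka's
 * SnapshottableHashTable). Requires external synchronization; no null keys/values.
 */
class TieredSnapshotMap<K, V> {
    /** A current-tier value plus the first snapshot epoch in which it is visible. */
    private static final class Slot<T> {
        final T value;
        final long startEpoch;

        Slot(T value, long startEpoch) {
            this.value = value;
            this.startEpoch = startEpoch;
        }
    }

    private final Map<K, Slot<V>> current = new HashMap<>();
    private final TreeSet<Long> snapshotEpochs = new TreeSet<>();
    // Per-snapshot tier, created only on the first change after that snapshot.
    private final Map<Long, Map<K, V>> tiers = new HashMap<>();
    private long latestEpoch = -1;

    void createSnapshot(long epoch) {
        if (epoch <= latestEpoch) {
            throw new IllegalArgumentException("snapshot epochs must increase");
        }
        latestEpoch = epoch;
        snapshotEpochs.add(epoch); // no tier storage until something changes
    }

    void deleteSnapshot(long epoch) {
        snapshotEpochs.remove(epoch);
        tiers.remove(epoch); // drops the whole tier without touching current state
    }

    V put(K key, V value) {
        Objects.requireNonNull(key);
        Objects.requireNonNull(value);
        // A value written now is invisible in every snapshot taken so far.
        Slot<V> old = current.put(key, new Slot<>(value, latestEpoch + 1));
        return preserve(key, old);
    }

    V remove(K key) {
        return preserve(key, current.remove(key));
    }

    /** Reads the current state. */
    V get(K key) {
        Slot<V> slot = current.get(key);
        return slot == null ? null : slot.value;
    }

    /** Reads the state as of snapshot {@code epoch}, consulting at most two tiers. */
    V get(K key, long epoch) {
        if (!snapshotEpochs.contains(epoch)) {
            throw new IllegalArgumentException("no snapshot at epoch " + epoch);
        }
        Map<K, V> tier = tiers.get(epoch);
        if (tier != null && tier.containsKey(key)) {
            return tier.get(key);
        }
        Slot<V> slot = current.get(key);
        return (slot != null && slot.startEpoch <= epoch) ? slot.value : null;
    }

    /** Copies an overwritten/removed value into every snapshot tier that still sees it. */
    private V preserve(K key, Slot<V> old) {
        if (old == null) {
            return null;
        }
        for (long epoch : snapshotEpochs.tailSet(old.startEpoch, true)) {
            tiers.computeIfAbsent(epoch, e -> new HashMap<>()).putIfAbsent(key, old.value);
        }
        return old.value;
    }
}
```

Copying a reference into every snapshot tier whose epoch is at least the value's start epoch is the write-side cost the Javadoc mentions; in exchange, a historical read never walks more than two tiers, and deleting a snapshot just discards its tier.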
[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
junrao commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r566532372 ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashMap.java ## @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.AbstractCollection; +import java.util.AbstractSet; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * This is a hash map which can be snapshotted. + * + * See {@link SnapshottableHashTable} for more details about the implementation. + * + * This class requires external synchronization. Null keys and values are not supported. + * + * @param <K> The key type of the map. + * @param <V> The value type of the map.
+ */ +public class TimelineHashMap +extends SnapshottableHashTable> +implements Map { +static class TimelineHashMapEntry +implements SnapshottableHashTable.ElementWithStartEpoch, Map.Entry { +private final K key; +private final V value; +private long startEpoch; + +TimelineHashMapEntry(K key, V value) { +this.key = key; +this.value = value; +this.startEpoch = Long.MAX_VALUE; +} + +@Override +public K getKey() { +return key; +} + +@Override +public V getValue() { +return value; +} + +@Override +public V setValue(V value) { +// This would be inefficient to support since we'd need a back-reference +// to the enclosing map in each Entry object. There would also be +// complications if this entry object was sourced from a historical iterator; +// we don't support modifying the past. Since we don't really need this API, +// let's just not support it. +throw new UnsupportedOperationException(); +} + +@Override +public void setStartEpoch(long startEpoch) { +this.startEpoch = startEpoch; +} + +@Override +public long startEpoch() { +return startEpoch; +} + +@SuppressWarnings("unchecked") +@Override +public boolean equals(Object o) { +if (!(o instanceof TimelineHashMapEntry)) return false; +TimelineHashMapEntry other = (TimelineHashMapEntry) o; +return key.equals(other.key); +} + +@Override +public int hashCode() { +return key.hashCode(); +} +} + +public TimelineHashMap(SnapshotRegistry snapshotRegistry, int expectedSize) { +super(snapshotRegistry, expectedSize); +} + +@Override +public int size() { +return size(Long.MAX_VALUE); +} + +public int size(long epoch) { +return snapshottableSize(epoch); +} + +@Override +public boolean isEmpty() { +return isEmpty(Long.MAX_VALUE); +} + +public boolean isEmpty(long epoch) { +return snapshottableSize(epoch) == 0; +} + +@Override +public boolean containsKey(Object key) { +return containsKey(key, Long.MAX_VALUE); +} + +public boolean containsKey(Object key, long epoch) { +return snapshottableGet(new TimelineHashMapEntry<>(key, null), 
epoch) != null; +} + +@Override +public boolean containsValue(Object value) { +Iterator> iter = entrySet().iterator(); +while (iter.hasNext()) { +Entry e = iter.next(); +if (value.equals(e.getValue())) { +return true; +} +} +return false; +} + +@Override +public V get(Object key) { +return get(key, Long.MAX_VALUE); +} + +public V get(Object key, long epoch) { +Entry entry = +snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch); +if (entry == null) { +return null; +} +return entry.getValue(); +} + +@Override +public V put(K key, V value) { +Objects.requireNonNull(key); +
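`containsKey` and `get` in the quoted diff look entries up by building a probe `TimelineHashMapEntry` with a `null` value; this works because the entry's `equals` and `hashCode` consider only the key. A minimal sketch of the same probe trick follows. The names `KeyOnlyEntry` and `ProbeLookupMap` are hypothetical, and a plain `HashMap` that maps each stored entry to itself stands in for Kafka's custom `BaseHashTable`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical entry whose identity is its key alone; value may be null for probes. */
final class KeyOnlyEntry<K, V> {
    final K key;
    final V value;

    KeyOnlyEntry(K key, V value) {
        this.key = Objects.requireNonNull(key);
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof KeyOnlyEntry)) return false;
        return key.equals(((KeyOnlyEntry<?, ?>) o).key);
    }

    @Override
    public int hashCode() {
        return key.hashCode();
    }
}

/** Hypothetical map that stores entries and finds them with key-only probes. */
final class ProbeLookupMap<K, V> {
    // Each stored entry maps to itself, so a lookup can return the stored instance.
    private final Map<KeyOnlyEntry<K, V>, KeyOnlyEntry<K, V>> table = new HashMap<>();

    V put(K key, V value) {
        KeyOnlyEntry<K, V> e = new KeyOnlyEntry<>(key, Objects.requireNonNull(value));
        // Remove first so both the map key and the map value are the new entry.
        KeyOnlyEntry<K, V> old = table.remove(e);
        table.put(e, e);
        return old == null ? null : old.value;
    }

    V get(K key) {
        // Probe with a null value: equals/hashCode only look at the key.
        KeyOnlyEntry<K, V> found = table.get(new KeyOnlyEntry<K, V>(key, null));
        return found == null ? null : found.value;
    }

    boolean containsKey(K key) {
        return table.containsKey(new KeyOnlyEntry<K, V>(key, null));
    }
}
```

The same key-only equality is also why the quoted `setValue` cannot simply mutate an entry in place: the entry object is shared with snapshot tiers that must not change.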
[jira] [Created] (KAFKA-12249) KIP-500: Add client-side Decommission Broker RPC
Alok Nikhil created KAFKA-12249: --- Summary: KIP-500: Add client-side Decommission Broker RPC Key: KAFKA-12249 URL: https://issues.apache.org/jira/browse/KAFKA-12249 Project: Kafka Issue Type: Task Components: clients, core Affects Versions: 2.8.0 Reporter: Alok Nikhil Assignee: Alok Nikhil
[GitHub] [kafka] aloknnikhil commented on pull request #9994: KAFKA-12248: Add BrokerHeartbeat/BrokerRegistration RPCs for KIP-500
aloknnikhil commented on pull request #9994: URL: https://github.com/apache/kafka/pull/9994#issuecomment-769516430 @hachikuji Fixed it up to merge only the Broker-Controller RPCs (Heartbeat/Registration)
[jira] [Created] (KAFKA-12248) KIP-500: Add broker heartbeat and registration RPCs
Alok Nikhil created KAFKA-12248: --- Summary: KIP-500: Add broker heartbeat and registration RPCs Key: KAFKA-12248 URL: https://issues.apache.org/jira/browse/KAFKA-12248 Project: Kafka Issue Type: Task Components: core Affects Versions: 2.8.0 Reporter: Alok Nikhil Assignee: Alok Nikhil For KIP-500, we need to support broker heartbeats and registration with the Quorum Controller (as described in KIP-631). This task tracks the addition of the RPC message types to support these.
[jira] [Comment Edited] (KAFKA-7263) Container exception java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
[ https://issues.apache.org/jira/browse/KAFKA-7263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17274089#comment-17274089 ] Jot Zhao edited comment on KAFKA-7263 at 1/29/21, 1:32 AM: --- [~soodvarun25] this issue has been fixed by KAFKA-8104 and the PR is [here|https://github.com/apache/kafka/pull/7460] was (Author: jot.zhao): [~soodvarun25] this issue has been fixed by KAFKA-8104 and the PR is [this|https://github.com/apache/kafka/pull/7460] > Container exception java.lang.IllegalStateException: Coordinator selected > invalid assignment protocol: null > --- > > Key: KAFKA-7263 > URL: https://issues.apache.org/jira/browse/KAFKA-7263 > Project: Kafka > Issue Type: Bug >Reporter: laomei >Priority: Major > Fix For: 2.4.0 > > > We are using spring-kafka and we get an infinite loop error in > ConsumerCoordinator.java; > kafka cluster version: 1.0.0 > kafka-client version: 1.0.0 > > 2018-08-08 15:24:46,120 ERROR > [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer] > - Container exception > java.lang.IllegalStateException: Coordinator selected invalid assignment > protocol: null > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:217) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103) > at > org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:556) > at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.lang.Thread.run(Thread.java:745) > 2018-08-08 15:24:46,132 INFO > [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer] > - Consumer stopped > 2018-08-08 15:24:46,230 INFO > [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer] > - Consumer stopped > 2018-08-08 15:24:46,234 INFO [org.springfram
[jira] [Comment Edited] (KAFKA-7263) Container exception java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
[ https://issues.apache.org/jira/browse/KAFKA-7263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17274089#comment-17274089 ] Jot Zhao edited comment on KAFKA-7263 at 1/29/21, 1:32 AM: --- [~soodvarun25] this issue has been fixed by KAFKA-8104 and the PR is [this|https://github.com/apache/kafka/pull/7460] was (Author: jot.zhao): [~soodvarun25] this issue has been fixed by [KAFKA-8104|https://issues.apache.org/jira/browse/KAFKA-8104] and the PR is [this|https://github.com/apache/kafka/pull/7460]
[jira] [Commented] (KAFKA-7263) Container exception java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
[ https://issues.apache.org/jira/browse/KAFKA-7263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17274089#comment-17274089 ] Jot Zhao commented on KAFKA-7263: - [~soodvarun25] this issue has been fixed by [KAFKA-8104|https://issues.apache.org/jira/browse/KAFKA-8104] and the PR is [this|https://github.com/apache/kafka/pull/7460]
[GitHub] [kafka] hachikuji merged pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
hachikuji merged pull request #9985: URL: https://github.com/apache/kafka/pull/9985
[GitHub] [kafka] sudoa commented on pull request #9995: Add log about broker info when getting UNKNOWN_TOPIC_OR_PARTITION error
sudoa commented on pull request #9995: URL: https://github.com/apache/kafka/pull/9995#issuecomment-769495037 done by mistake
[GitHub] [kafka] sudoa closed pull request #9995: Add log about broker info when getting UNKNOWN_TOPIC_OR_PARTITION error
sudoa closed pull request #9995: URL: https://github.com/apache/kafka/pull/9995
[GitHub] [kafka] sudoa opened a new pull request #9995: Add log about broker info when getting UNKNOWN_TOPIC_OR_PARTITION error
sudoa opened a new pull request #9995: URL: https://github.com/apache/kafka/pull/9995 Description: This patch adds logging in `NetworkClient` that prints metadata info, such as leader id and epoch, for partitions whose metadata fetch fails with an UNKNOWN_TOPIC_OR_PARTITION error. This will help with a Venice investigation: https://jira01.corp.linkedin.com:8443/browse/LIKAFKA-33540 Testing: N/A, since the change only adds logs.
[GitHub] [kafka] hachikuji commented on pull request #9994: MINOR: Add BrokerHeartbeat/BrokerRegistration/DecommissionBroker RPCs for KIP-500
hachikuji commented on pull request #9994: URL: https://github.com/apache/kafka/pull/9994#issuecomment-769479261 Thanks for the PR. Just to make this a little more manageable, could we split this into two PRs? One for the register/heartbeat controller APIs? And a separate one for decommission, which is a client API. Also, it would be nice to have JIRAs since these are significant additions. Thanks!
[GitHub] [kafka] aloknnikhil opened a new pull request #9994: MINOR: Add BrokerHeartbeat/BrokerRegistration/DecommissionBroker RPCs for KIP-500
aloknnikhil opened a new pull request #9994: URL: https://github.com/apache/kafka/pull/9994 ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [x] Verify test coverage and CI build status - [x] Verify documentation (including upgrade notes)
[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete
jolshan commented on a change in pull request #9684: URL: https://github.com/apache/kafka/pull/9684#discussion_r566487288 ## File path: core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala ## @@ -15,12 +15,13 @@ * limitations under the License. */ -package kafka.server +package unit.kafka.server Review comment: Got it.
[GitHub] [kafka] ableegoldman commented on pull request #9984: KAFKA-12247: add timeout and static group rebalance to remove thread
ableegoldman commented on pull request #9984: URL: https://github.com/apache/kafka/pull/9984#issuecomment-769472481 FYI ``` 15:42:08 Execution failed for task ':streams:checkstyleMain'. ```
[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
cmccabe commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r566481305 ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashSet.java ## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; + +/** + * This is a hash set which can be snapshotted. + * + * See {@link SnapshottableHashTable} for more details about the implementation. + * + * This class requires external synchronization. Null values are not supported. + * + * @param <T> The value type of the set.
+ */ +public class TimelineHashSet +extends SnapshottableHashTable> +implements Set { +static class TimelineHashSetEntry +implements SnapshottableHashTable.ElementWithStartEpoch { +private final T value; +private long startEpoch; + +TimelineHashSetEntry(T value) { +this.value = value; +this.startEpoch = Long.MAX_VALUE; +} + +public T getValue() { +return value; +} + +@Override +public void setStartEpoch(long startEpoch) { +this.startEpoch = startEpoch; +} + +@Override +public long startEpoch() { +return startEpoch; +} + +@SuppressWarnings("unchecked") +@Override +public boolean equals(Object o) { +if (!(o instanceof TimelineHashSetEntry)) return false; +TimelineHashSetEntry other = (TimelineHashSetEntry) o; +return value.equals(other.value); +} + +@Override +public int hashCode() { +return value.hashCode(); +} +} + +public TimelineHashSet(SnapshotRegistry snapshotRegistry, int expectedSize) { +super(snapshotRegistry, expectedSize); +} + +@Override +public int size() { +return size(Long.MAX_VALUE); +} + +public int size(long epoch) { +return snapshottableSize(epoch); +} + +@Override +public boolean isEmpty() { +return isEmpty(Long.MAX_VALUE); +} + +public boolean isEmpty(long epoch) { +return snapshottableSize(epoch) == 0; +} + +@Override +public boolean contains(Object key) { +return contains(key, Long.MAX_VALUE); +} + +public boolean contains(Object object, long epoch) { +return snapshottableGet(new TimelineHashSetEntry<>(object), epoch) != null; +} + +final class ValueIterator implements Iterator { +private final Iterator> iter; + +ValueIterator(long epoch) { +this.iter = snapshottableIterator(epoch); +} + +@Override +public boolean hasNext() { +return iter.hasNext(); +} + +@Override +public T next() { +return iter.next().value; +} + +@Override +public void remove() { +iter.remove(); +} +} + +@Override +public Iterator iterator() { +return iterator(Long.MAX_VALUE); +} + +public Iterator iterator(long epoch) { +return new ValueIterator(epoch); +} + +@Override 
+public Object[] toArray() { +Object[] result = new Object[size()]; +Iterator iter = iterator(); +int i = 0; +while (iter.hasNext()) { +result[i++] = iter.next(); +} +return result; +} + +@SuppressWarnings("unchecked") +@Override +public R[] toArray(R[] a) { +int size = size(); +if (size <= a.length) { +Iterator iter = iterator(); +int i = 0; +while (iter.hasNext()) { +a[i++] = (R) iter.next(); +} +while (i < a.length) { +a[i++] = null; +} +return a; +} else { +return (R[]) toArray(); +} +} + +@Override +public boolean add(T newValue) { +Objects.requireNonNull(newValue); +return snapshottableAddUnlessPresent(new TimelineHa
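For comparison with the `toArray(R[] a)` override quoted above: the `java.util.Collection.toArray(T[])` contract says that when the supplied array is too small, a new array with the runtime type of the supplied array is returned, and when the array has room to spare, the element immediately after the last one is set to `null`. A minimal sketch of that contract over any `Collection` follows; the helper name `ToArraySketch.toArray` is hypothetical. It allocates with `java.lang.reflect.Array.newInstance`, because an `Object[]` cast to `R[]` is still an `Object[]` at runtime.

```java
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

/** Hypothetical helper illustrating the Collection.toArray(T[]) contract. */
final class ToArraySketch {
    @SuppressWarnings("unchecked")
    static <R> R[] toArray(Collection<?> c, R[] a) {
        int size = c.size();
        // Reuse the caller's array if it fits; otherwise allocate one with the
        // same runtime component type (e.g. String[] rather than Object[]).
        R[] result = size <= a.length
            ? a
            : (R[]) Array.newInstance(a.getClass().getComponentType(), size);
        Iterator<?> iter = c.iterator();
        for (int i = 0; i < size; i++) {
            result[i] = (R) iter.next();
        }
        // Only the slot right after the last element is nulled, per the contract.
        if (result.length > size) {
            result[size] = null;
        }
        return result;
    }
}
```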
[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
cmccabe commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r566481190 ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashMap.java ## @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.AbstractCollection; +import java.util.AbstractSet; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * This is a hash map which can be snapshotted. + * + * See {@link SnapshottableHashTable} for more details about the implementation. + * + * This class requires external synchronization. Null keys and values are not supported. + * + * @param <K> The key type of the map. + * @param <V> The value type of the map.
+ */ +public class TimelineHashMap +extends SnapshottableHashTable> +implements Map { +static class TimelineHashMapEntry +implements SnapshottableHashTable.ElementWithStartEpoch, Map.Entry { +private final K key; +private final V value; +private long startEpoch; + +TimelineHashMapEntry(K key, V value) { +this.key = key; +this.value = value; +this.startEpoch = Long.MAX_VALUE; +} + +@Override +public K getKey() { +return key; +} + +@Override +public V getValue() { +return value; +} + +@Override +public V setValue(V value) { +// This would be inefficient to support since we'd need a back-reference +// to the enclosing map in each Entry object. There would also be +// complications if this entry object was sourced from a historical iterator; +// we don't support modifying the past. Since we don't really need this API, +// let's just not support it. +throw new UnsupportedOperationException(); +} + +@Override +public void setStartEpoch(long startEpoch) { +this.startEpoch = startEpoch; +} + +@Override +public long startEpoch() { +return startEpoch; +} + +@SuppressWarnings("unchecked") +@Override +public boolean equals(Object o) { +if (!(o instanceof TimelineHashMapEntry)) return false; +TimelineHashMapEntry other = (TimelineHashMapEntry) o; +return key.equals(other.key); +} + +@Override +public int hashCode() { +return key.hashCode(); +} +} + +public TimelineHashMap(SnapshotRegistry snapshotRegistry, int expectedSize) { +super(snapshotRegistry, expectedSize); +} + +@Override +public int size() { +return size(Long.MAX_VALUE); +} + +public int size(long epoch) { +return snapshottableSize(epoch); +} + +@Override +public boolean isEmpty() { +return isEmpty(Long.MAX_VALUE); +} + +public boolean isEmpty(long epoch) { +return snapshottableSize(epoch) == 0; +} + +@Override +public boolean containsKey(Object key) { +return containsKey(key, Long.MAX_VALUE); +} + +public boolean containsKey(Object key, long epoch) { +return snapshottableGet(new TimelineHashMapEntry<>(key, null), 
epoch) != null; +} + +@Override +public boolean containsValue(Object value) { +Iterator> iter = entrySet().iterator(); +while (iter.hasNext()) { +Entry e = iter.next(); +if (value.equals(e.getValue())) { +return true; +} +} +return false; +} + +@Override +public V get(Object key) { +return get(key, Long.MAX_VALUE); +} + +public V get(Object key, long epoch) { +Entry entry = +snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch); +if (entry == null) { +return null; +} +return entry.getValue(); +} + +@Override +public V put(K key, V value) { +Objects.requireNonNull(key); +
[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
cmccabe commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r566481017 ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * SnapshottableHashTable implements a hash table that supports creating point-in-time + * snapshots. Each snapshot is immutable once it is created; the past cannot be changed. + * We handle divergences between the current state and historical state by copying a + * reference to elements that have been deleted or overwritten into the snapshot tiers + * in which they still exist. Each tier has its own hash table. + * + * In order to retrieve an object from epoch E, we only have to check two tiers: the + * current tier, and the tier associated with the snapshot from epoch E. This design + * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy + * references into multiple snapshot tiers sometimes when altering the current state. 
+ * In general, we don't expect there to be many snapshots at any given point in time, + * though. We expect to use about 2 snapshots at most. + * + * The current tier's data is stored in the fields inherited from BaseHashTable. It + * would be conceptually simpler to have a separate BaseHashTable object, but since Java + * doesn't have value types, subclassing is the only way to avoid another pointer + * indirection and the associated extra memory cost. + * + * In contrast, the data for snapshot tiers is stored in the Snapshot object itself. + * We access it by looking up our object reference in the Snapshot's IdentityHashMap. + * This design ensures that we can remove snapshots in O(1) time, simply by deleting the + * Snapshot object from the SnapshotRegistry. + * + * As mentioned before, an element only exists in a snapshot tier if the element was + * overwritten or removed from a later tier. If there are no changes between then and + * now, there is no data at all stored for the tier. We don't even store a hash table + * object for a tier unless there is at least one change between then and now. + * + * The class hierarchy looks like this: + * + *Revertable BaseHashTable + * ↑ ↑ + * SnapshottableHashTable → SnapshotRegistry → Snapshot + * ↑ ↑ + * TimelineHashSet TimelineHashMap + * + * BaseHashTable is a simple hash table that uses separate chaining. The interface is + * pretty bare-bones since this class is not intended to be used directly by end-users. + * + * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over + * snapshots. This is the core of the snapshotted hash table code and handles the + * tiering. + * + * TimelineHashSet and TimelineHashMap are mostly wrappers around this + * SnapshottableHashTable class. They implement standard Java APIs for Set and Map, + * respectively. 
There's a fair amount of boilerplate for this, but it's necessary so + * that timeline data structures can be used while writing idiomatic Java code. + * The accessor APIs have two versions -- one that looks at the current state, and one + * that looks at a historical snapshotted state. Mutation APIs only ever mutate the + * current state. + * + * One very important feature of SnapshottableHashTable is that we support iterating + * over a snapshot even while changes are being made to the current state. See the + * Javadoc for the iterator for more information about how this is accomplished. + * + * All of these classes require external synchronization, and don't support null keys or + * values. + */ +class SnapshottableHashTable +extends BaseHashTable implements Revertable { +interface ElementWithStartEpoch { +void setStartEpoch(long startEpoch); +long startEpoch(); +} + +static class HashTier { +private final int size; +private Base
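The tiering scheme in this Javadoc can be illustrated with a much smaller model. The sketch below is not Kafka's SnapshottableHashTable. It assumes a plain `HashMap` for the current tier, a `TreeMap` of lazily allocated per-snapshot delta maps keyed by epoch, and a hypothetical class name `TieredSnapshotMap`. Before an entry is overwritten or removed, a reference to it is copied into every snapshot tier that can still see it, so a read at epoch E only checks tier E and the current tier.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical, simplified model of the tiering idea; not Kafka's SnapshottableHashTable.
final class TieredSnapshotMap<K, V> {
    // An immutable value plus the epoch in which it became current.
    private static final class Versioned<T> {
        final T value;
        final long startEpoch;

        Versioned(T value, long startEpoch) {
            this.value = value;
            this.startEpoch = startEpoch;
        }
    }

    // Current tier: always holds the latest state.
    private final Map<K, Versioned<V>> current = new HashMap<>();
    // Snapshot tiers keyed by epoch; a null value means "no changes since this snapshot".
    private final TreeMap<Long, Map<K, Versioned<V>>> tiers = new TreeMap<>();
    private long curEpoch = 0;

    void createSnapshot(long epoch) {
        if (epoch < curEpoch) {
            throw new IllegalArgumentException("Epoch " + epoch + " is below current epoch " + curEpoch);
        }
        tiers.put(epoch, null); // the delta map is allocated only when something changes
        curEpoch = epoch + 1;
    }

    void deleteSnapshot(long epoch) {
        // Other tiers are unaffected, because each tier holds its own references.
        tiers.remove(epoch);
    }

    V put(K key, V value) {
        Objects.requireNonNull(key);
        Objects.requireNonNull(value);
        Versioned<V> old = current.put(key, new Versioned<>(value, curEpoch));
        preserve(key, old);
        return old == null ? null : old.value;
    }

    V remove(K key) {
        Versioned<V> old = current.remove(key);
        preserve(key, old);
        return old == null ? null : old.value;
    }

    // Latest state: only the current tier is consulted.
    V get(K key) {
        Versioned<V> e = current.get(key);
        return e == null ? null : e.value;
    }

    // State as of snapshot `epoch`: check that snapshot's tier, then the current tier.
    V get(K key, long epoch) {
        if (!tiers.containsKey(epoch)) {
            throw new IllegalArgumentException("No snapshot at epoch " + epoch);
        }
        Map<K, Versioned<V>> delta = tiers.get(epoch);
        Versioned<V> e = delta == null ? null : delta.get(key);
        if (e != null) {
            return e.value; // overwritten or removed after this snapshot was taken
        }
        e = current.get(key);
        // Only visible if it was written at or before the snapshot epoch.
        return (e != null && e.startEpoch <= epoch) ? e.value : null;
    }

    // Copy a reference to the displaced entry into every tier that can still see it.
    private void preserve(K key, Versioned<V> old) {
        if (old == null) {
            return;
        }
        for (Map.Entry<Long, Map<K, Versioned<V>>> tier : tiers.tailMap(old.startEpoch, true).entrySet()) {
            Map<K, Versioned<V>> delta = tier.getValue();
            if (delta == null) {
                delta = new HashMap<>();
                tier.setValue(delta);
            }
            delta.putIfAbsent(key, old);
        }
    }
}
```

Deleting a snapshot in this sketch is a single `TreeMap` removal (logarithmic, rather than the O(1) removal the Javadoc gets by storing tier data in the Snapshot object), and no other tier needs rewriting.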
[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: KAFKA-12247: add timeout and static group rebalance to remove thread
ableegoldman commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r566478398 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -69,6 +73,7 @@ import org.apache.kafka.streams.state.internals.QueryableStoreProvider; import org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter; import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider; +import org.apache.kafka.common.errors.TimeoutException; Review comment: nit: put this import above with the others (IDE often misplaces these since we follow a weird import ordering in places) ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -180,6 +182,31 @@ public void shouldRemoveStreamThread() throws Exception { } } +@Test +public void shouldRemoveStreamThreadWithStaticMembership() throws Exception { +properties.put("group.instance.id", "test"); Review comment: ```suggestion properties.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "member-A"); ```
[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
cmccabe commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r566469662 ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashMap.java ## @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.AbstractCollection; +import java.util.AbstractSet; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * This is a hash map which can be snapshotted. + * + * See {@link SnapshottableHashTable} for more details about the implementation. + * + * This class requires external synchronization. Null keys and values are not supported. + * + * @param <K> The key type of the set. + * @param <V> The value type of the set.
+ */ +public class TimelineHashMap +extends SnapshottableHashTable> +implements Map { +static class TimelineHashMapEntry +implements SnapshottableHashTable.ElementWithStartEpoch, Map.Entry { +private final K key; +private final V value; +private long startEpoch; + +TimelineHashMapEntry(K key, V value) { +this.key = key; +this.value = value; +this.startEpoch = Long.MAX_VALUE; +} + +@Override +public K getKey() { +return key; +} + +@Override +public V getValue() { +return value; +} + +@Override +public V setValue(V value) { +// This would be inefficient to support since we'd need a back-reference +// to the enclosing map in each Entry object. There would also be +// complications if this entry object was sourced from a historical iterator; +// we don't support modifying the past. Since we don't really need this API, +// let's just not support it. +throw new UnsupportedOperationException(); +} + +@Override +public void setStartEpoch(long startEpoch) { +this.startEpoch = startEpoch; +} + +@Override +public long startEpoch() { +return startEpoch; +} + +@SuppressWarnings("unchecked") +@Override +public boolean equals(Object o) { +if (!(o instanceof TimelineHashMapEntry)) return false; +TimelineHashMapEntry other = (TimelineHashMapEntry) o; +return key.equals(other.key); +} + +@Override +public int hashCode() { +return key.hashCode(); +} +} + +public TimelineHashMap(SnapshotRegistry snapshotRegistry, int expectedSize) { +super(snapshotRegistry, expectedSize); +} + +@Override +public int size() { +return size(Long.MAX_VALUE); +} + +public int size(long epoch) { +return snapshottableSize(epoch); +} + +@Override +public boolean isEmpty() { +return isEmpty(Long.MAX_VALUE); +} + +public boolean isEmpty(long epoch) { +return snapshottableSize(epoch) == 0; +} + +@Override +public boolean containsKey(Object key) { +return containsKey(key, Long.MAX_VALUE); +} + +public boolean containsKey(Object key, long epoch) { +return snapshottableGet(new TimelineHashMapEntry<>(key, null), 
epoch) != null; +} + +@Override +public boolean containsValue(Object value) { +Iterator> iter = entrySet().iterator(); +while (iter.hasNext()) { +Entry e = iter.next(); +if (value.equals(e.getValue())) { +return true; +} +} +return false; +} + +@Override +public V get(Object key) { +return get(key, Long.MAX_VALUE); +} + +public V get(Object key, long epoch) { +Entry entry = +snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch); +if (entry == null) { +return null; +} +return entry.getValue(); +} + +@Override +public V put(K key, V value) { +Objects.requireNonNull(key); +
[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
cmccabe commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r566469311 ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashMap.java ## @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.AbstractCollection; +import java.util.AbstractSet; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * This is a hash map which can be snapshotted. + * + * See {@link SnapshottableHashTable} for more details about the implementation. + * + * This class requires external synchronization. Null keys and values are not supported. + * + * @param <K> The key type of the set. + * @param <V> The value type of the set.
+ */ +public class TimelineHashMap +extends SnapshottableHashTable> +implements Map { +static class TimelineHashMapEntry +implements SnapshottableHashTable.ElementWithStartEpoch, Map.Entry { +private final K key; +private final V value; +private long startEpoch; + +TimelineHashMapEntry(K key, V value) { +this.key = key; +this.value = value; +this.startEpoch = Long.MAX_VALUE; +} + +@Override +public K getKey() { +return key; +} + +@Override +public V getValue() { +return value; +} + +@Override +public V setValue(V value) { +// This would be inefficient to support since we'd need a back-reference +// to the enclosing map in each Entry object. There would also be +// complications if this entry object was sourced from a historical iterator; +// we don't support modifying the past. Since we don't really need this API, +// let's just not support it. +throw new UnsupportedOperationException(); +} + +@Override +public void setStartEpoch(long startEpoch) { +this.startEpoch = startEpoch; +} + +@Override +public long startEpoch() { +return startEpoch; +} + +@SuppressWarnings("unchecked") +@Override +public boolean equals(Object o) { +if (!(o instanceof TimelineHashMapEntry)) return false; +TimelineHashMapEntry other = (TimelineHashMapEntry) o; +return key.equals(other.key); +} + +@Override +public int hashCode() { +return key.hashCode(); +} +} + +public TimelineHashMap(SnapshotRegistry snapshotRegistry, int expectedSize) { +super(snapshotRegistry, expectedSize); +} + +@Override +public int size() { +return size(Long.MAX_VALUE); +} + +public int size(long epoch) { +return snapshottableSize(epoch); +} + +@Override +public boolean isEmpty() { +return isEmpty(Long.MAX_VALUE); +} + +public boolean isEmpty(long epoch) { +return snapshottableSize(epoch) == 0; +} + +@Override +public boolean containsKey(Object key) { +return containsKey(key, Long.MAX_VALUE); +} + +public boolean containsKey(Object key, long epoch) { +return snapshottableGet(new TimelineHashMapEntry<>(key, null), 
epoch) != null; +} + +@Override +public boolean containsValue(Object value) { +Iterator> iter = entrySet().iterator(); +while (iter.hasNext()) { +Entry e = iter.next(); +if (value.equals(e.getValue())) { +return true; +} +} +return false; +} + +@Override +public V get(Object key) { +return get(key, Long.MAX_VALUE); +} + +public V get(Object key, long epoch) { +Entry entry = +snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch); +if (entry == null) { +return null; +} +return entry.getValue(); +} + +@Override +public V put(K key, V value) { +Objects.requireNonNull(key); +
[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
cmccabe commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r566468092 ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshotRegistry.java ## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.stream.Collectors; + +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +/** + * A registry containing snapshots of timeline data structures. + * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time. + * Therefore, we use ArrayLists here rather than a data structure with higher overhead. + */ +public class SnapshotRegistry { +private final Logger log; + +/** + * The current epoch. All snapshot epochs are lower than this number. + */ +private long curEpoch; + +/** + * An ArrayList of snapshots, kept in sorted order. 
+ */ +private final ArrayList<Snapshot> snapshots; + +public SnapshotRegistry(long startEpoch) { +this(new LogContext(), startEpoch); +} + +public SnapshotRegistry(LogContext logContext, long startEpoch) { +this.log = logContext.logger(SnapshotRegistry.class); +this.curEpoch = startEpoch; +this.snapshots = new ArrayList<>(5); +} + +/** + * Returns an iterator that moves through snapshots from the lowest to the highest epoch. + */ +public Iterator<Snapshot> snapshots() { +return snapshots.iterator(); +} + +/** + * Gets the snapshot for a specific epoch. + */ +public Snapshot get(long epoch) { +for (Snapshot snapshot : snapshots) { +if (snapshot.epoch() == epoch) { +return snapshot; +} +} +throw new RuntimeException("No snapshot for epoch " + epoch); +} + +/** + * Creates a new snapshot at the given epoch. + * + * @param epoch The epoch to create the snapshot at. The current epoch + * will be advanced to one past this epoch. + */ +public Snapshot createSnapshot(long epoch) { +if (epoch < curEpoch) { +throw new RuntimeException("Can't create a new snapshot at epoch " + epoch + +" because the current epoch is " + curEpoch); +} +Snapshot snapshot = new Snapshot(epoch); +snapshots.add(snapshot); +curEpoch = epoch + 1; +log.debug("Creating snapshot {}", epoch); +return snapshot; +} + +/** + * Deletes the snapshot with the given epoch. + * + * @param epoch The epoch of the snapshot to delete. + */ +public void deleteSnapshot(long epoch) { +Iterator<Snapshot> iter = snapshots.iterator(); +while (iter.hasNext()) { +Snapshot snapshot = iter.next(); +if (snapshot.epoch() == epoch) { +log.debug("Deleting snapshot {}", epoch); +iter.remove(); +return; +} +} +throw new RuntimeException(String.format( +"No snapshot at epoch %d found. Snapshot epochs are %s.", epoch, +snapshots.stream().map(snapshot -> String.valueOf(snapshot.epoch())). +collect(Collectors.joining(", ")))); +} + +/** + * Reverts the state of all data structures to the state at the given epoch.
+ * + * @param epoch The epoch of the snapshot to revert to. + */ +public void revertToSnapshot(long epoch) { Review comment: It does do that. I'll add a comment.
[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
cmccabe commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r566468092 ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshotRegistry.java ## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.stream.Collectors; + +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +/** + * A registry containing snapshots of timeline data structures. + * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time. + * Therefore, we use ArrayLists here rather than a data structure with higher overhead. + */ +public class SnapshotRegistry { +private final Logger log; + +/** + * The current epoch. All snapshot epochs are lower than this number. + */ +private long curEpoch; + +/** + * An ArrayList of snapshots, kept in sorted order. 
+ */ +private final ArrayList<Snapshot> snapshots; + +public SnapshotRegistry(long startEpoch) { +this(new LogContext(), startEpoch); +} + +public SnapshotRegistry(LogContext logContext, long startEpoch) { +this.log = logContext.logger(SnapshotRegistry.class); +this.curEpoch = startEpoch; +this.snapshots = new ArrayList<>(5); +} + +/** + * Returns an iterator that moves through snapshots from the lowest to the highest epoch. + */ +public Iterator<Snapshot> snapshots() { +return snapshots.iterator(); +} + +/** + * Gets the snapshot for a specific epoch. + */ +public Snapshot get(long epoch) { +for (Snapshot snapshot : snapshots) { +if (snapshot.epoch() == epoch) { +return snapshot; +} +} +throw new RuntimeException("No snapshot for epoch " + epoch); +} + +/** + * Creates a new snapshot at the given epoch. + * + * @param epoch The epoch to create the snapshot at. The current epoch + * will be advanced to one past this epoch. + */ +public Snapshot createSnapshot(long epoch) { +if (epoch < curEpoch) { +throw new RuntimeException("Can't create a new snapshot at epoch " + epoch + +" because the current epoch is " + curEpoch); +} +Snapshot snapshot = new Snapshot(epoch); +snapshots.add(snapshot); +curEpoch = epoch + 1; +log.debug("Creating snapshot {}", epoch); +return snapshot; +} +
+/** + * Deletes the snapshot with the given epoch. + * + * @param epoch The epoch of the snapshot to delete. + */ +public void deleteSnapshot(long epoch) { +Iterator<Snapshot> iter = snapshots.iterator(); +while (iter.hasNext()) { +Snapshot snapshot = iter.next(); +if (snapshot.epoch() == epoch) { +log.debug("Deleting snapshot {}", epoch); +iter.remove(); +return; +} +} +throw new RuntimeException(String.format( +"No snapshot at epoch %d found. Snapshot epochs are %s.", epoch, +snapshots.stream().map(snapshot -> String.valueOf(snapshot.epoch())). +collect(Collectors.joining(", ")))); +} + +/** + * Reverts the state of all data structures to the state at the given epoch.
+ * + * @param epoch The epoch of the snapshot to revert to. + */ +public void revertToSnapshot(long epoch) { Review comment: It does do that. I'll add a line to the JavaDoc.
[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
cmccabe commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r566467584 ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * SnapshottableHashTable implements a hash table that supports creating point-in-time + * snapshots. Each snapshot is immutable once it is created; the past cannot be changed. + * We handle divergences between the current state and historical state by copying a + * reference to elements that have been deleted or overwritten into the snapshot tiers + * in which they still exist. Each tier has its own hash table. + * + * In order to retrieve an object from epoch E, we only have to check two tiers: the + * current tier, and the tier associated with the snapshot from epoch E. This design + * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy + * references into multiple snapshot tiers sometimes when altering the current state. 
+ * In general, we don't expect there to be many snapshots at any given point in time, + * though. We expect to use about 2 snapshots at most. + * + * The current tier's data is stored in the fields inherited from BaseHashTable. It + * would be conceptually simpler to have a separate BaseHashTable object, but since Java + * doesn't have value types, subclassing is the only way to avoid another pointer + * indirection and the associated extra memory cost. + * + * In contrast, the data for snapshot tiers is stored in the Snapshot object itself. + * We access it by looking up our object reference in the Snapshot's IdentityHashMap. + * This design ensures that we can remove snapshots in O(1) time, simply by deleting the + * Snapshot object from the SnapshotRegistry. + * + * As mentioned before, an element only exists in a snapshot tier if the element was + * overwritten or removed from a later tier. If there are no changes between then and + * now, there is no data at all stored for the tier. We don't even store a hash table + * object for a tier unless there is at least one change between then and now. + * + * The class hierarchy looks like this: + * + *Revertable BaseHashTable + * ↑ ↑ + * SnapshottableHashTable → SnapshotRegistry → Snapshot + * ↑ ↑ + * TimelineHashSet TimelineHashMap + * + * BaseHashTable is a simple hash table that uses separate chaining. The interface is + * pretty bare-bones since this class is not intended to be used directly by end-users. + * + * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over + * snapshots. This is the core of the snapshotted hash table code and handles the + * tiering. + * + * TimelineHashSet and TimelineHashMap are mostly wrappers around this + * SnapshottableHashTable class. They implement standard Java APIs for Set and Map, + * respectively. 
There's a fair amount of boilerplate for this, but it's necessary so + * that timeline data structures can be used while writing idiomatic Java code. + * The accessor APIs have two versions -- one that looks at the current state, and one + * that looks at a historical snapshotted state. Mutation APIs only ever mutate the + * current state. + * + * One very important feature of SnapshottableHashTable is that we support iterating + * over a snapshot even while changes are being made to the current state. See the + * Javadoc for the iterator for more information about how this is accomplished. + * + * All of these classes require external synchronization, and don't support null keys or + * values. + */ +class SnapshottableHashTable +extends BaseHashTable implements Revertable { +interface ElementWithStartEpoch { +void setStartEpoch(long startEpoch); +long startEpoch(); +} + +static class HashTier { +private final int size; +private Base
[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r566463935 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -983,48 +999,88 @@ private FetchResponseData tryCompleteFetchRequest( FetchRequestData.FetchPartition request, long currentTimeMs ) { -Optional errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch()); -if (errorOpt.isPresent()) { -return buildEmptyFetchResponse(errorOpt.get(), Optional.empty()); -} +try { +Optional errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch()); +if (errorOpt.isPresent()) { +return buildEmptyFetchResponse(errorOpt.get(), Optional.empty()); +} -long fetchOffset = request.fetchOffset(); -int lastFetchedEpoch = request.lastFetchedEpoch(); -LeaderState state = quorum.leaderStateOrThrow(); -Optional divergingEpochOpt = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch); - -if (divergingEpochOpt.isPresent()) { -Optional divergingEpoch = -divergingEpochOpt.map(offsetAndEpoch -> new FetchResponseData.EpochEndOffset() -.setEpoch(offsetAndEpoch.epoch) -.setEndOffset(offsetAndEpoch.offset)); -return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, divergingEpoch, state.highWatermark()); -} else { -LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED); +long fetchOffset = request.fetchOffset(); +int lastFetchedEpoch = request.lastFetchedEpoch(); +LeaderState state = quorum.leaderStateOrThrow(); +ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch); + +final Records records; +if (validatedOffsetAndEpoch.type() == ValidatedFetchOffsetAndEpoch.Type.VALID) { +LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED); -if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) { -onUpdateLeaderHighWatermark(state, currentTimeMs); +if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) { 
+onUpdateLeaderHighWatermark(state, currentTimeMs); +} + +records = info.records; +} else { +records = MemoryRecords.EMPTY; } -return buildFetchResponse(Errors.NONE, info.records, Optional.empty(), state.highWatermark()); +return buildFetchResponse(Errors.NONE, records, validatedOffsetAndEpoch, state.highWatermark()); +} catch (Exception e) { +logger.error("Caught unexpected error in fetch completion of request {}", request, e); +return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR, Optional.empty()); } } /** * Check whether a fetch offset and epoch is valid. Return the diverging epoch, which * is the largest epoch such that subsequent records are known to diverge. */ -private Optional validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) { -if (fetchOffset == 0 && lastFetchedEpoch == 0) { -return Optional.empty(); +private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) { +if (log.startOffset() == 0 && fetchOffset == 0) { +if (lastFetchedEpoch != 0) { +logger.warn( +"Replica sent a zero fetch offset ({}) but the last fetched epoch ({}) was not zero", +fetchOffset, +lastFetchedEpoch +); +} +return ValidatedFetchOffsetAndEpoch.valid(new OffsetAndEpoch(fetchOffset, lastFetchedEpoch)); Review comment: Done.
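The Javadoc in this hunk describes the diverging epoch as the largest epoch such that subsequent records are known to diverge. A simplified Java sketch of that check, modeled on KIP-595 fetch reconciliation as we understand it, follows. The class `FetchDivergenceCheck`, its `Result` type, and the epoch-history map (epoch to first offset, assumed to start with epoch 0 at offset 0) are all hypothetical. The sketch mirrors the removed `fetchOffset == 0 && lastFetchedEpoch == 0` shortcut and ignores the log-start-offset and snapshot handling this PR adds.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of a leader's epoch history; not Kafka's API.
final class FetchDivergenceCheck {
    // Outcome of validating a follower's (fetchOffset, lastFetchedEpoch).
    static final class Result {
        final boolean valid;
        final int divergingEpoch;      // meaningful only when !valid
        final long divergingEndOffset; // meaningful only when !valid

        Result(boolean valid, int divergingEpoch, long divergingEndOffset) {
            this.valid = valid;
            this.divergingEpoch = divergingEpoch;
            this.divergingEndOffset = divergingEndOffset;
        }
    }

    // Leader epoch -> first offset written in that epoch (assumed to include epoch 0 at offset 0).
    private final TreeMap<Integer, Long> epochStartOffsets;
    private final long logEndOffset;

    FetchDivergenceCheck(Map<Integer, Long> epochStartOffsets, long logEndOffset) {
        this.epochStartOffsets = new TreeMap<>(epochStartOffsets);
        this.logEndOffset = logEndOffset;
    }

    Result validate(long fetchOffset, int lastFetchedEpoch) {
        if (fetchOffset == 0 && lastFetchedEpoch == 0) {
            return new Result(true, -1, -1L); // empty follower log: nothing can diverge
        }
        // Largest leader epoch that is <= the follower's last fetched epoch.
        Integer epoch = epochStartOffsets.floorKey(lastFetchedEpoch);
        if (epoch == null) {
            throw new IllegalStateException("Epoch history does not cover epoch " + lastFetchedEpoch);
        }
        // That epoch ends where the next one starts, or at the log end offset if it is the latest.
        Integer nextEpoch = epochStartOffsets.higherKey(epoch);
        long endOffset = nextEpoch == null ? logEndOffset : epochStartOffsets.get(nextEpoch);
        if (epoch != lastFetchedEpoch || endOffset < fetchOffset) {
            // The follower has records the leader never wrote in this epoch.
            return new Result(false, epoch, endOffset);
        }
        return new Result(true, -1, -1L);
    }
}
```

For example, with epochs {0: 0, 2: 10, 5: 25} and a log end offset of 40, a follower at (offset 15, epoch 3) is told to diverge at epoch 2, end offset 25, because the leader never had epoch 3.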
[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r566463607 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -147,18 +223,102 @@ class KafkaMetadataLog( } override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = { -FileRawSnapshotWriter.create(log.dir.toPath, snapshotId) +// Do not let the state machine create snapshots older than the latest snapshot +latestSnapshotId().ifPresent { latest => + if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) { +// Since snapshots are less than the high-watermark absolute offset comparison is okay. +throw new IllegalArgumentException( + s"Attempting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)" +) + } +} + +FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this)) } override def readSnapshot(snapshotId: raft.OffsetAndEpoch): Optional[RawSnapshotReader] = { try { - Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) + if (snapshotIds.contains(snapshotId)) { +Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) + } else { +Optional.empty() + } +} catch { + case _: NoSuchFileException => +Optional.empty() +} + } + + override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = { +try { + Optional.of(snapshotIds.last) } catch { - case e: NoSuchFileException => Optional.empty() + case _: NoSuchElementException => +Optional.empty() +} + } + + override def oldestSnapshotId(): Optional[raft.OffsetAndEpoch] = { +oldestSnapshotId + } + + override def onSnapshotFrozen(snapshotId: raft.OffsetAndEpoch): Unit = { +snapshotIds.add(snapshotId) + } + + override def deleteToNewOldestSnapshotId(logStartSnapshotId: raft.OffsetAndEpoch): Boolean = { Review comment: Picked `deleteBeforeSnapshot`.
## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -113,6 +160,22 @@ class KafkaMetadataLog( log.truncateTo(offset) } + override def maybeTruncateFullyToLatestSnapshot(): Boolean = { Review comment: Done.
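The snapshot-id bookkeeping in the KafkaMetadataLog hunk of #9816 (the `createSnapshot` guard, `latestSnapshotId` via `snapshotIds.last`, and `onSnapshotFrozen`) can be modeled in Java roughly as follows. This is a hypothetical sketch: `SnapshotIdTracker` and `SnapshotId` are stand-in names, and the `TreeSet` ordering (offset, then epoch) is an assumption rather than the actual ordering of Kafka's `OffsetAndEpoch`.

```java
import java.util.Comparator;
import java.util.Optional;
import java.util.TreeSet;

// Hypothetical Java model of the snapshot-id bookkeeping; not Kafka's KafkaMetadataLog.
final class SnapshotIdTracker {
    // Simplified stand-in for raft.OffsetAndEpoch.
    static final class SnapshotId {
        final long offset;
        final int epoch;

        SnapshotId(long offset, int epoch) {
            this.offset = offset;
            this.epoch = epoch;
        }

        @Override
        public String toString() {
            return "(offset=" + offset + ", epoch=" + epoch + ")";
        }
    }

    // Frozen snapshot ids, ordered by offset and then epoch (an assumed ordering).
    private final TreeSet<SnapshotId> frozen = new TreeSet<>(
        Comparator.<SnapshotId>comparingLong(id -> id.offset).thenComparingInt(id -> id.epoch));

    Optional<SnapshotId> latestSnapshotId() {
        return frozen.isEmpty() ? Optional.empty() : Optional.of(frozen.last());
    }

    // Same condition as the diff: reject an id whose epoch or offset is below the latest one.
    void checkCanCreate(SnapshotId id) {
        latestSnapshotId().ifPresent(latest -> {
            if (latest.epoch > id.epoch || latest.offset > id.offset) {
                throw new IllegalArgumentException("Attempting to create a snapshot " + id
                    + " that is not greater than the latest snapshot " + latest);
            }
        });
    }

    // Called once a snapshot has been fully written.
    void onSnapshotFrozen(SnapshotId id) {
        frozen.add(id);
    }
}
```

Because the condition only rejects ids with a strictly smaller epoch or offset, an id equal to the latest snapshot id passes the guard.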
[GitHub] [kafka] hachikuji merged pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true
hachikuji merged pull request #9589: URL: https://github.com/apache/kafka/pull/9589
[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r566460558 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -1910,7 +1931,7 @@ class Log(@volatile private var _dir: File, in the header. */ appendInfo.firstOffset match { -case Some(firstOffset) => roll(Some(firstOffset)) +case Some(firstOffset) => roll(Some(firstOffset.messageOffset)) Review comment: Done.
[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
cmccabe commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r566457016 ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * SnapshottableHashTable implements a hash table that supports creating point-in-time + * snapshots. Each snapshot is immutable once it is created; the past cannot be changed. + * We handle divergences between the current state and historical state by copying a + * reference to elements that have been deleted or overwritten into the snapshot tiers + * in which they still exist. Each tier has its own hash table. + * + * In order to retrieve an object from epoch E, we only have to check two tiers: the + * current tier, and the tier associated with the snapshot from epoch E. This design + * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy + * references into multiple snapshot tiers sometimes when altering the current state. 
+ * In general, we don't expect there to be many snapshots at any given point in time, + * though. We expect to use about 2 snapshots at most. + * + * The current tier's data is stored in the fields inherited from BaseHashTable. It + * would be conceptually simpler to have a separate BaseHashTable object, but since Java + * doesn't have value types, subclassing is the only way to avoid another pointer + * indirection and the associated extra memory cost. + * + * In contrast, the data for snapshot tiers is stored in the Snapshot object itself. + * We access it by looking up our object reference in the Snapshot's IdentityHashMap. + * This design ensures that we can remove snapshots in O(1) time, simply by deleting the + * Snapshot object from the SnapshotRegistry. + * + * As mentioned before, an element only exists in a snapshot tier if the element was + * overwritten or removed from a later tier. If there are no changes between then and + * now, there is no data at all stored for the tier. We don't even store a hash table + * object for a tier unless there is at least one change between then and now. + * + * The class hierarchy looks like this: + * + *Revertable BaseHashTable + * ↑ ↑ + * SnapshottableHashTable → SnapshotRegistry → Snapshot + * ↑ ↑ + * TimelineHashSet TimelineHashMap + * + * BaseHashTable is a simple hash table that uses separate chaining. The interface is + * pretty bare-bones since this class is not intended to be used directly by end-users. + * + * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over + * snapshots. This is the core of the snapshotted hash table code and handles the + * tiering. + * + * TimelineHashSet and TimelineHashMap are mostly wrappers around this + * SnapshottableHashTable class. They implement standard Java APIs for Set and Map, + * respectively. 
There's a fair amount of boilerplate for this, but it's necessary so + * that timeline data structures can be used while writing idiomatic Java code. + * The accessor APIs have two versions -- one that looks at the current state, and one + * that looks at a historical snapshotted state. Mutation APIs only ever mutate the + * current state. + * + * One very important feature of SnapshottableHashTable is that we support iterating + * over a snapshot even while changes are being made to the current state. See the + * Javadoc for the iterator for more information about how this is accomplished. + * + * All of these classes require external synchronization, and don't support null keys or + * values. + */ +class SnapshottableHashTable +extends BaseHashTable implements Revertable { +interface ElementWithStartEpoch { +void setStartEpoch(long startEpoch); +long startEpoch(); +} + +static class HashTier { +private final int size; +private Base
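The tiering described in this Javadoc can be modeled in a few dozen lines. This is a toy sketch, not the PR's implementation. TieredSnapshotMap is a hypothetical name, and it uses java.util maps instead of BaseHashTable. A TreeMap keyed by epoch stands in for SnapshotRegistry, so deleting a snapshot here costs O(log n) rather than the O(1) removal described above. Like the real classes, it is single-threaded and does not support null keys or values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model of snapshot tiers: overwritten or removed references are copied into
// every snapshot tier in which they were still visible.
class TieredSnapshotMap<K, V> {
    // A value plus the epoch at which it entered the current tier.
    private static final class Versioned<T> {
        final T value;
        final long startEpoch;

        Versioned(T value, long startEpoch) {
            this.value = value;
            this.startEpoch = startEpoch;
        }
    }

    private final Map<K, Versioned<V>> current = new HashMap<>();
    // Snapshot epoch -> references displaced after that snapshot was taken.
    private final TreeMap<Long, Map<K, Versioned<V>>> tiers = new TreeMap<>();
    private long curEpoch = 0;

    void createSnapshot(long epoch) {
        if (epoch < curEpoch) {
            throw new IllegalArgumentException("Epoch " + epoch + " is below current epoch " + curEpoch);
        }
        tiers.put(epoch, new HashMap<>());
        curEpoch = epoch + 1;
    }

    // Each tier holds its own copies, so dropping one tier never affects another.
    void deleteSnapshot(long epoch) {
        tiers.remove(epoch);
    }

    void put(K key, V value) {
        preserve(key, current.put(key, new Versioned<>(value, curEpoch)));
    }

    void remove(K key) {
        preserve(key, current.remove(key));
    }

    // Copy the displaced reference into every tier whose epoch is at or after its start epoch.
    private void preserve(K key, Versioned<V> displaced) {
        if (displaced == null) {
            return;
        }
        for (Map<K, Versioned<V>> tier : tiers.tailMap(displaced.startEpoch, true).values()) {
            tier.putIfAbsent(key, displaced);
        }
    }

    V get(K key) {
        Versioned<V> v = current.get(key);
        return v == null ? null : v.value;
    }

    // Reading at epoch E checks only two tiers: E's snapshot tier, then the current tier.
    V get(K key, long epoch) {
        Map<K, Versioned<V>> tier = tiers.get(epoch);
        if (tier == null) {
            throw new IllegalArgumentException("No snapshot for epoch " + epoch);
        }
        Versioned<V> v = tier.get(key);
        if (v == null) {
            v = current.get(key);
            if (v != null && v.startEpoch > epoch) {
                v = null; // inserted after the snapshot, so not visible at epoch E
            }
        }
        return v == null ? null : v.value;
    }
}
```

The startEpoch field plays the role that ElementWithStartEpoch appears to play in the quoted code. An element in the current tier is visible at snapshot E only if it entered the current tier at or before E.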
[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
cmccabe commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r566454521 ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ## @@ -0,0 +1,410 @@
[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
cmccabe commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r566452138 ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ## @@ -0,0 +1,410 @@
[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
cmccabe commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r566451017 ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshotRegistry.java ## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.stream.Collectors; + +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +/** + * A registry containing snapshots of timeline data structures. + * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time. + * Therefore, we use ArrayLists here rather than a data structure with higher overhead. + */ +public class SnapshotRegistry { +private final Logger log; + +/** + * The current epoch. All snapshot epochs are lower than this number. + */ +private long curEpoch; + +/** + * An ArrayList of snapshots, kept in sorted order. 
+ */ +private final ArrayList snapshots; + +public SnapshotRegistry(long startEpoch) { +this(new LogContext(), startEpoch); +} + +public SnapshotRegistry(LogContext logContext, long startEpoch) { +this.log = logContext.logger(SnapshotRegistry.class); +this.curEpoch = startEpoch; +this.snapshots = new ArrayList<>(5); +} + +/** + * Returns an iterator that moves through snapshots from the lowest to the highest epoch. + */ +public Iterator snapshots() { +return snapshots.iterator(); +} + +/** + * Gets the snapshot for a specific epoch. + */ +public Snapshot get(long epoch) { +for (Snapshot snapshot : snapshots) { +if (snapshot.epoch() == epoch) { +return snapshot; +} +} +throw new RuntimeException("No snapshot for epoch " + epoch); +} + +/** + * Creates a new snapshot at the given epoch. + * + * @param epoch The epoch to create the snapshot at. The current epoch + * will be advanced to one past this epoch. + */ +public Snapshot createSnapshot(long epoch) { Review comment: We want the epoch to be equal to the offset in the metadata log. So, it needs to be provided externally. (For example, a snapshot at epoch 123 would have the state of the metadata data structures after replaying all the metadata records from 0 to 123.)
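The reply above says the caller supplies the epoch, and that the epoch equals a metadata log offset. Here is a minimal sketch of that contract. MetadataSnapshotRegistry is a hypothetical class that tracks only epochs; the real SnapshotRegistry stores Snapshot objects. The sketch also assumes that an epoch below the current one is rejected.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical registry where the snapshot epoch is an externally supplied
// metadata log offset, following the review reply above.
class MetadataSnapshotRegistry {
    // Every snapshot epoch is lower than this value.
    private long curEpoch;
    // Few snapshots are expected at a time, so a sorted ArrayList is enough.
    private final List<Long> snapshotEpochs = new ArrayList<>();

    MetadataSnapshotRegistry(long startEpoch) {
        this.curEpoch = startEpoch;
    }

    long curEpoch() {
        return curEpoch;
    }

    // The caller passes the offset of the last metadata record it replayed.
    void createSnapshot(long epoch) {
        if (epoch < curEpoch) { // assumption: epochs never move backwards
            throw new IllegalArgumentException(
                "Cannot create a snapshot at epoch " + epoch + " below the current epoch " + curEpoch);
        }
        snapshotEpochs.add(epoch); // appending keeps the list sorted
        curEpoch = epoch + 1;      // advance to one past the snapshot epoch
    }

    // Linear scan, matching the small-number-of-snapshots assumption.
    boolean hasSnapshot(long epoch) {
        return snapshotEpochs.contains(epoch);
    }
}
```

Replaying records 0 through 123 and then calling createSnapshot(123) leaves the current epoch at 124. This matches the "one past this epoch" wording in the quoted Javadoc.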
[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
cmccabe commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r566450389 ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ## @@ -0,0 +1,410 @@
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete
rajinisivaram commented on a change in pull request #9684: URL: https://github.com/apache/kafka/pull/9684#discussion_r566442295 ## File path: core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala ## @@ -15,12 +15,13 @@ * limitations under the License. */ -package kafka.server +package unit.kafka.server Review comment: We use `kafka.server` for the other unit tests in this directory, so we should do the same here (in Scala, the package name doesn't need to match the directory structure).
[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r566430269 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -983,48 +999,88 @@ private FetchResponseData tryCompleteFetchRequest( FetchRequestData.FetchPartition request, long currentTimeMs ) { -Optional errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch()); -if (errorOpt.isPresent()) { -return buildEmptyFetchResponse(errorOpt.get(), Optional.empty()); -} +try { +Optional errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch()); +if (errorOpt.isPresent()) { +return buildEmptyFetchResponse(errorOpt.get(), Optional.empty()); +} -long fetchOffset = request.fetchOffset(); -int lastFetchedEpoch = request.lastFetchedEpoch(); -LeaderState state = quorum.leaderStateOrThrow(); -Optional divergingEpochOpt = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch); - -if (divergingEpochOpt.isPresent()) { -Optional divergingEpoch = -divergingEpochOpt.map(offsetAndEpoch -> new FetchResponseData.EpochEndOffset() -.setEpoch(offsetAndEpoch.epoch) -.setEndOffset(offsetAndEpoch.offset)); -return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, divergingEpoch, state.highWatermark()); -} else { -LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED); +long fetchOffset = request.fetchOffset(); +int lastFetchedEpoch = request.lastFetchedEpoch(); +LeaderState state = quorum.leaderStateOrThrow(); +ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch); + +final Records records; +if (validatedOffsetAndEpoch.type() == ValidatedFetchOffsetAndEpoch.Type.VALID) { +LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED); -if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) { -onUpdateLeaderHighWatermark(state, currentTimeMs); +if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) { 
+onUpdateLeaderHighWatermark(state, currentTimeMs); +} + +records = info.records; +} else { +records = MemoryRecords.EMPTY; } -return buildFetchResponse(Errors.NONE, info.records, Optional.empty(), state.highWatermark()); +return buildFetchResponse(Errors.NONE, records, validatedOffsetAndEpoch, state.highWatermark()); +} catch (Exception e) { +logger.error("Caught unexpected error in fetch completion of request {}", request, e); +return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR, Optional.empty()); } } /** * Check whether a fetch offset and epoch is valid. Return the diverging epoch, which * is the largest epoch such that subsequent records are known to diverge. */ -private Optional validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) { -if (fetchOffset == 0 && lastFetchedEpoch == 0) { -return Optional.empty(); +private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) { +if (log.startOffset() == 0 && fetchOffset == 0) { +if (lastFetchedEpoch != 0) { +logger.warn( +"Replica sent a zero fetch offset ({}) but the last fetched epoch ({}) was not zero", +fetchOffset, +lastFetchedEpoch +); +} +return ValidatedFetchOffsetAndEpoch.valid(new OffsetAndEpoch(fetchOffset, lastFetchedEpoch)); } -OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(lastFetchedEpoch) -.orElse(new OffsetAndEpoch(-1L, -1)); -if (endOffsetAndEpoch.epoch != lastFetchedEpoch || endOffsetAndEpoch.offset < fetchOffset) { -return Optional.of(endOffsetAndEpoch); + +OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(lastFetchedEpoch).orElseThrow(() -> { +return new IllegalStateException( +String.format( +"Expected to find an end offset for epoch %s since it must be less than the current epoch %s", +lastFetchedEpoch, +quorum.epoch() +) +); +}); + +if (log.oldestSnapshotId().isPresent() && +((fetchOffset < log.startOffset()) || + (fetchOffset == log.startOffset() && lastFetchedEpoch != log.oldestSnapshotId().get().epoch) || +
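For reference, the pre-change validateFetchOffsetAndEpoch logic in the removed lines can be restated as a standalone method. This is a sketch. FetchValidation is a hypothetical wrapper, and the log lookup is passed in as an IntFunction. That function is assumed to return the largest epoch not above the requested one, together with its end offset. The final `return Optional.empty()` is implied by the Javadoc; the truncated hunk does not show it. The snapshot-aware replacement is cut off in this message, so it is not reproduced here.

```java
import java.util.Optional;
import java.util.function.IntFunction;

// Standalone restatement of the removed validateFetchOffsetAndEpoch logic.
class FetchValidation {
    static final class OffsetAndEpoch {
        final long offset;
        final int epoch;

        OffsetAndEpoch(long offset, int epoch) {
            this.offset = offset;
            this.epoch = epoch;
        }
    }

    // Returns the diverging end offset and epoch, or empty if the fetch position is consistent.
    // endOffsetForEpoch is an assumed stand-in for log.endOffsetForEpoch.
    static Optional<OffsetAndEpoch> divergingEpoch(
            long fetchOffset,
            int lastFetchedEpoch,
            IntFunction<Optional<OffsetAndEpoch>> endOffsetForEpoch) {
        // A replica with an empty log fetches from offset 0 with epoch 0.
        if (fetchOffset == 0 && lastFetchedEpoch == 0) {
            return Optional.empty();
        }
        OffsetAndEpoch end = endOffsetForEpoch.apply(lastFetchedEpoch)
            .orElse(new OffsetAndEpoch(-1L, -1));
        // Diverged if the leader never had that epoch, or that epoch ended before fetchOffset.
        if (end.epoch != lastFetchedEpoch || end.offset < fetchOffset) {
            return Optional.of(end);
        }
        return Optional.empty();
    }
}
```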
[GitHub] [kafka] wangwalton opened a new pull request #9993: Schema Exception Interface fix
wangwalton opened a new pull request #9993: URL: https://github.com/apache/kafka/pull/9993 Bug fix. I'm passing in a SchemaBuilder as the schema, and this always throws an exception because the check does an equals comparison. I need to pass in a builder because my schema may need to represent a graph, and the only way to represent that is with SchemaBuilder, since ConnectSchema is not mutable. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
aloknnikhil commented on a change in pull request #9985: URL: https://github.com/apache/kafka/pull/9985#discussion_r566406761 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -99,7 +99,7 @@ public final InetSocketAddress address; public InetAddressSpec(InetSocketAddress address) { -if (address.equals(NON_ROUTABLE_ADDRESS)) { +if (address != null && address.equals(NON_ROUTABLE_ADDRESS)) { Review comment: Right. Makes sense. Fixed.
[GitHub] [kafka] hachikuji commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
hachikuji commented on a change in pull request #9985: URL: https://github.com/apache/kafka/pull/9985#discussion_r566405275 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -99,7 +99,7 @@ public final InetSocketAddress address; public InetAddressSpec(InetSocketAddress address) { -if (address.equals(NON_ROUTABLE_ADDRESS)) { +if (address != null && address.equals(NON_ROUTABLE_ADDRESS)) { Review comment: Currently our equals method assumes non-null addresses. I can't think of a case where we would want a null address.
[GitHub] [kafka] lct45 commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
lct45 commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r566398284 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -133,6 +152,72 @@ private void configurePermissions(final File file) { } } +/** + * @return true if the state directory was successfully locked + */ +private boolean lockStateDirectory() { +final File lockFile = new File(stateDir, LOCK_FILE_NAME); +try { +stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE); +stateDirLock = tryLock(stateDirLockChannel); +} catch (final IOException e) { +log.error("Unable to lock the state directory due to unexpected exception", e); +throw new ProcessorStateException("Failed to lock the state directory during startup", e); +} + +return stateDirLock != null; +} + +public UUID initializeProcessId() { +if (!hasPersistentStores) { +return UUID.randomUUID(); +} + +if (!lockStateDirectory()) { +log.error("Unable to obtain lock as state directory is already locked by another process"); +throw new StreamsException("Unable to initialize state, this can happen if multiple instances of " + + "Kafka Streams are running in the same state directory"); +} + +final File processFile = new File(stateDir, PROCESS_FILE_NAME); +try { +if (processFile.exists()) { +try (final BufferedReader reader = Files.newBufferedReader(processFile.toPath())) { +// only field in version 0 is the UUID +final int version = Integer.parseInt(reader.readLine()); +if (version > 0) { Review comment: Should we compare against `PROCESS_FILE_VERSION` here instead of hard-coding `0`, on the off chance anyone needs to increment it?
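The suggestion above amounts to tying the version check to a single constant. Here is a minimal sketch built around a hypothetical ProcessFile helper. It assumes a two-line file layout, with the version on the first line and the UUID on the second. The diff only shows that the version is read first and that the UUID is the only version-0 field. Rejecting a newer version with an exception is also an assumption.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

// Hypothetical process-file helper; names and the two-line layout are assumptions.
class ProcessFile {
    static final int PROCESS_FILE_VERSION = 0;

    static void write(Path file, UUID processId) throws IOException {
        List<String> lines = Arrays.asList(Integer.toString(PROCESS_FILE_VERSION), processId.toString());
        Files.write(file, lines, StandardCharsets.UTF_8);
    }

    static UUID read(Path file) throws IOException {
        List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        int version = Integer.parseInt(lines.get(0).trim());
        // Compare against the constant, so bumping the version is a one-line change.
        if (version > PROCESS_FILE_VERSION) {
            throw new IllegalStateException("Unsupported process file version " + version);
        }
        return UUID.fromString(lines.get(1).trim());
    }

    // Reuse the persisted id if present; otherwise generate and persist a new one.
    static UUID loadOrCreate(Path file) throws IOException {
        if (Files.exists(file)) {
            return read(file);
        }
        UUID processId = UUID.randomUUID();
        write(file, processId);
        return processId;
    }
}
```

Calling loadOrCreate twice on the same file returns the same UUID, which is the stable-processId-across-restarts behavior the PR title describes.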
[GitHub] [kafka] mumrah commented on pull request #9872: KAFKA-10759: ARM support for Kafka
mumrah commented on pull request #9872: URL: https://github.com/apache/kafka/pull/9872#issuecomment-769382560 Ok, the ARM build compiles at least https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9992/1/console I'll try running the unit test suite next.
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566394059

## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -99,7 +99,7 @@ public final InetSocketAddress address;
     public InetAddressSpec(InetSocketAddress address) {
-        if (address.equals(NON_ROUTABLE_ADDRESS)) {
+        if (address != null && address.equals(NON_ROUTABLE_ADDRESS)) {

Review comment:
Yeah, I thought of that. But would there ever be a case where we'd need to construct the address spec with a null? Probably not. Might as well use null for the address spec itself at that point, I guess. Fair enough, will change.
[GitHub] [kafka] hachikuji commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
hachikuji commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566390451

## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -99,7 +99,7 @@ public final InetSocketAddress address;
     public InetAddressSpec(InetSocketAddress address) {
-        if (address.equals(NON_ROUTABLE_ADDRESS)) {
+        if (address != null && address.equals(NON_ROUTABLE_ADDRESS)) {

Review comment:
We also don't want to accept null addresses, right? I was thinking we could do this:

```java
if (address == null || address.equals(NON_ROUTABLE_ADDRESS)) {
    throw new IllegalArgumentException("Invalid address " + address);
}
```
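Folding the null check into the same guard gives a constructor that rejects both null and the non-routable address before any field is assigned. The Java sketch below is self-contained, so it redeclares simplified versions of `AddressSpec` and `InetAddressSpec`. The wrapper class `RaftConfigSketch` is hypothetical, and the value of `NON_ROUTABLE_ADDRESS` (`0.0.0.0:0`) is an assumption taken from the `KafkaConfigTest` case in this PR.

```java
import java.net.InetSocketAddress;
import java.util.Objects;

// Hypothetical, simplified stand-in for the RaftConfig types under review.
final class RaftConfigSketch {
    // Assumed value: the PR's test treats 0.0.0.0:0 as non-routable.
    static final InetSocketAddress NON_ROUTABLE_ADDRESS = new InetSocketAddress("0.0.0.0", 0);

    interface AddressSpec {
    }

    static final class InetAddressSpec implements AddressSpec {
        final InetSocketAddress address;

        InetAddressSpec(final InetSocketAddress address) {
            // Checking null first avoids a NullPointerException from address.equals(...).
            if (address == null || address.equals(NON_ROUTABLE_ADDRESS)) {
                throw new IllegalArgumentException("Invalid address " + address);
            }
            this.address = address;
        }

        @Override
        public boolean equals(final Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return address.equals(((InetAddressSpec) obj).address);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(address);
        }
    }

    private RaftConfigSketch() {
    }
}
```

Because the constructor guarantees `address` is non-null, `equals` can dereference it without a further check.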
[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size
lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r564906839

## File path: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/SerdesUnitTest.scala
##
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender
+import org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde
+import org.junit.Assert.assertFalse
+import org.junit.Test
+
+class SerdesUnitTest {
+
+  @Test
+  def shouldLogMessageWhenTimeWindowedSerdeIsUsed(): Unit = {
+
+    Serdes.timeWindowedSerde(new TimeWindowedSerde[String]())
+    val appender = LogCaptureAppender.createAndRegister()
+    val warning = appender.getMessages()
+    assertFalse("There should be a warning about TimeWindowedDeserializer", warning.isEmpty)

Review comment:
This test also appears to fail locally, since it uses the constructor we deprecate in this PR, and from a quick search it doesn't look like Scala suppresses these warnings as easily as Java does. Any thoughts on whether this test is vital, or on a way to make it work?

cc @guozhangwang: if I suppress warnings, the test fails (: Thoughts on viability?
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r566379864

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
         }
     }
 
+    /**
+     * @return true if the state directory was successfully locked
+     */
+    private boolean lockStateDirectory() {
+        final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+        try {
+            stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+            stateDirLock = tryLock(stateDirLockChannel);

Review comment:
Nope, we just hold it until the KafkaStreams instance is closed.
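Holding the lock for the lifetime of the instance can be modeled as a resource that acquires the lock once and releases it only on `close()`. The Java sketch below uses `FileChannel.tryLock`. `StateDirLockSketch` is a hypothetical name, and treating `OverlappingFileLockException` (a lock already held by this JVM) as "not acquired" is an assumption about what the `tryLock` helper in the diff does, since its body is not shown.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: the lock is taken once at startup and held until close().
final class StateDirLockSketch implements AutoCloseable {
    private final FileChannel channel;
    private final FileLock lock;

    private StateDirLockSketch(final FileChannel channel, final FileLock lock) {
        this.channel = channel;
        this.lock = lock;
    }

    // Returns null if another process, or this JVM, already holds the lock.
    static StateDirLockSketch tryAcquire(final Path lockFile) {
        FileChannel channel = null;
        try {
            channel = FileChannel.open(lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            final FileLock lock = tryLock(channel);
            if (lock != null) {
                return new StateDirLockSketch(channel, lock);
            }
            // Caveat from the FileLock javadoc: on some platforms, closing any channel
            // on the file can release every lock this JVM holds on it.
            channel.close();
            return null;
        } catch (final IOException e) {
            closeQuietly(channel);
            throw new UncheckedIOException("Failed to lock " + lockFile, e);
        }
    }

    // Assumed behavior: a lock already held inside this JVM counts as "not acquired".
    private static FileLock tryLock(final FileChannel channel) throws IOException {
        try {
            return channel.tryLock();
        } catch (final OverlappingFileLockException e) {
            return null;
        }
    }

    private static void closeQuietly(final FileChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (final IOException ignored) {
            // Nothing useful to do while already handling a failure.
        }
    }

    boolean isHeld() {
        return lock.isValid();
    }

    @Override
    public void close() {
        if (!channel.isOpen()) {
            return; // already closed; release() would throw ClosedChannelException
        }
        try {
            lock.release();
            channel.close();
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Keeping the channel open for as long as the lock is needed matters because closing the channel also releases the lock.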
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566379088

## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -89,8 +92,23 @@ private final int appendLingerMs;
     private final Map voterConnections;
-    public static abstract class AddressSpec {
-        public abstract InetSocketAddress address();
+    public interface AddressSpec {
+    }
+
+    public static class InetAddressSpec implements AddressSpec {
+        public final InetSocketAddress address;
+
+        public InetAddressSpec(InetSocketAddress address) {
+            if (address.equals(NON_ROUTABLE_ADDRESS)) {

Review comment:
Good catch! Added.
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566378930

## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -102,31 +120,23 @@ public boolean equals(Object obj) {
             return false;
         }
-        final AddressSpec that = (AddressSpec) obj;
-        return that.address().equals(address());
+        final InetAddressSpec that = (InetAddressSpec) obj;
+        return that.address.equals(address);
     }
 }
-public static class InetAddressSpec extends AddressSpec {
-    private final InetSocketAddress address;
-
-    public InetAddressSpec(InetSocketAddress address) {
-        if (address.equals(UNROUTABLE_ADDRESS)) {
-            throw new IllegalArgumentException("Address not routable");
-        }
-        this.address = address;
+public static class UnknownAddressSpec implements AddressSpec {
+    private UnknownAddressSpec() {

Review comment:
Yeah, that's actually true. It should work fine for the map equals test case containing an instance of UnknownAddressSpec. Removed.
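With a private constructor, the class can only ever have one shared instance, so `Object`'s identity-based `equals`/`hashCode` already give the right answer, including when two voter maps are compared. Below is a minimal Java sketch. The wrapper class, the `unknownVoters` helper, and placing `UNKNOWN_ADDRESS_SPEC_INSTANCE` inside `UnknownAddressSpec` are assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the RaftConfig types under review.
final class UnknownAddressSpecSketch {
    interface AddressSpec {
    }

    static final class UnknownAddressSpec implements AddressSpec {
        // The private constructor means this is the only instance that can exist,
        // so Object's identity-based equals/hashCode are sufficient.
        static final UnknownAddressSpec UNKNOWN_ADDRESS_SPEC_INSTANCE = new UnknownAddressSpec();

        private UnknownAddressSpec() {
        }
    }

    // Hypothetical helper: builds a voter map whose addresses are not yet known.
    static Map<Integer, AddressSpec> unknownVoters(final int... ids) {
        final Map<Integer, AddressSpec> voters = new HashMap<>();
        for (final int id : ids) {
            voters.put(id, UnknownAddressSpec.UNKNOWN_ADDRESS_SPEC_INSTANCE);
        }
        return voters;
    }

    private UnknownAddressSpecSketch() {
    }
}
```

Two maps built this way compare equal because `Map.equals` calls `equals` on each value, and every value is the same object.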
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566378383

## File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##
@@ -1001,6 +1001,12 @@ class KafkaConfigTest {
     assertEquals(expectedVoters, raftConfig.quorumVoterConnections())
   }
+  @Test
+  def testNonRoutableAddressSpecException(): Unit = {
+    assertThrows(classOf[IllegalArgumentException],
+      () => new InetAddressSpec(new InetSocketAddress("0.0.0.0", 0)))

Review comment:
Yeah, the only thing this tests is that `InetAddressSpec` never accepts `0.0.0.0:0` as a parameter. We could remove it instead.
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566377621

## File path: core/src/main/scala/kafka/raft/RaftManager.scala
##
@@ -126,9 +126,13 @@ class KafkaRaftManager[T](
       case spec: InetAddressSpec => {
         netChannel.updateEndpoint(voterAddressEntry.getKey, spec)
       }
+      case _: UnknownAddressSpec => {

Review comment:
Ah, makes sense. Removed.
[GitHub] [kafka] hachikuji commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
hachikuji commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566373256

## File path: core/src/main/scala/kafka/raft/RaftManager.scala
##
@@ -126,9 +126,13 @@ class KafkaRaftManager[T](
       case spec: InetAddressSpec => {
         netChannel.updateEndpoint(voterAddressEntry.getKey, spec)
       }
+      case _: UnknownAddressSpec => {

Review comment:
nit: the braces in these `case` matches are not needed

## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -102,31 +120,23 @@ public boolean equals(Object obj) {
             return false;
         }
-        final AddressSpec that = (AddressSpec) obj;
-        return that.address().equals(address());
+        final InetAddressSpec that = (InetAddressSpec) obj;
+        return that.address.equals(address);
     }
 }
-public static class InetAddressSpec extends AddressSpec {
-    private final InetSocketAddress address;
-
-    public InetAddressSpec(InetSocketAddress address) {
-        if (address.equals(UNROUTABLE_ADDRESS)) {
-            throw new IllegalArgumentException("Address not routable");
-        }
-        this.address = address;
+public static class UnknownAddressSpec implements AddressSpec {
+    private UnknownAddressSpec() {

Review comment:
Since we have a private constructor, I think we will only have the `UNKNOWN_ADDRESS_SPEC_INSTANCE` instance. So can we rely on the default equals/hashcode?

## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -89,8 +92,23 @@ private final int appendLingerMs;
     private final Map voterConnections;
-    public static abstract class AddressSpec {
-        public abstract InetSocketAddress address();
+    public interface AddressSpec {
+    }
+
+    public static class InetAddressSpec implements AddressSpec {
+        public final InetSocketAddress address;
+
+        public InetAddressSpec(InetSocketAddress address) {
+            if (address.equals(NON_ROUTABLE_ADDRESS)) {

Review comment:
Maybe worth adding a null check here?

## File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##
@@ -1001,6 +1001,12 @@ class KafkaConfigTest {
     assertEquals(expectedVoters, raftConfig.quorumVoterConnections())
   }
+  @Test
+  def testNonRoutableAddressSpecException(): Unit = {
+    assertThrows(classOf[IllegalArgumentException],
+      () => new InetAddressSpec(new InetSocketAddress("0.0.0.0", 0)))

Review comment:
nit: I still think `RaftConfigTest` is a better home for this. It doesn't involve `KafkaConfig` at all. We could also just delete the test since it probably isn't buying us much.
[jira] [Updated] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
[ https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-8940:
------------------------------------------
    Description: 
The test does not properly account for windowing. See this comment for full details.

We can patch this test by fixing the timestamps of the input data to avoid crossing over a window boundary, or account for this when verifying the output. Since we have access to the input data it should be possible to compute whether/when we do cross a window boundary, and adjust the expected output accordingly

  was: The test does not properly account for windowing. See [this comment|https://issues.apache.org/jira/browse/KAFKA-8940?focusedCommentId=17214850&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17214850] for full details

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -----------------------------------------------------------------
>
> Key: KAFKA-8940
> URL: https://issues.apache.org/jira/browse/KAFKA-8940
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Reporter: Guozhang Wang
> Priority: Major
> Labels: flaky-test, newbie++
>
> The test does not properly account for windowing. See this comment for full
> details.
> We can patch this test by fixing the timestamps of the input data to avoid
> crossing over a window boundary, or account for this when verifying the
> output. Since we have access to the input data it should be possible to
> compute whether/when we do cross a window boundary, and adjust the expected
> output accordingly
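One way to compute whether, and where, the input crosses a window boundary is to map each timestamp to the start of the window containing it and find the first record whose window start differs from the first record's. The Java sketch below assumes tumbling (non-overlapping) windows aligned to the epoch and non-negative timestamps. The ticket does not state the smoke test's actual window configuration, and `WindowBoundarySketch` is a hypothetical name.

```java
import java.util.List;

// Hypothetical helper for checking test input against tumbling-window boundaries.
final class WindowBoundarySketch {
    // Start of the epoch-aligned tumbling window containing the timestamp (assumes timestampMs >= 0).
    static long windowStart(final long timestampMs, final long windowSizeMs) {
        if (windowSizeMs <= 0) {
            throw new IllegalArgumentException("Window size must be positive");
        }
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Index of the first timestamp that lands in a different window than the first one, or -1 if none does.
    static int firstBoundaryCrossing(final List<Long> timestampsMs, final long windowSizeMs) {
        if (timestampsMs.isEmpty()) {
            return -1;
        }
        final long firstWindow = windowStart(timestampsMs.get(0), windowSizeMs);
        for (int i = 1; i < timestampsMs.size(); i++) {
            if (windowStart(timestampsMs.get(i), windowSizeMs) != firstWindow) {
                return i;
            }
        }
        return -1;
    }

    private WindowBoundarySketch() {
    }
}
```

A test could use the returned index either to shift the input timestamps so they stay in one window, or to split the expected aggregates at that record.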
[jira] [Updated] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
[ https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-8940:
------------------------------------------
    Description: The test does not properly account for windowing. See [this comment|https://issues.apache.org/jira/browse/KAFKA-8940?focusedCommentId=17214850&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17214850] for full details  (was: The test does not properly account for windowing. See)

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -----------------------------------------------------------------
>
> Key: KAFKA-8940
> URL: https://issues.apache.org/jira/browse/KAFKA-8940
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Reporter: Guozhang Wang
> Priority: Major
> Labels: flaky-test, newbie++
>
> The test does not properly account for windowing. See [this
> comment|https://issues.apache.org/jira/browse/KAFKA-8940?focusedCommentId=17214850&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17214850]
> for full details
[jira] [Updated] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
[ https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-8940:
------------------------------------------
    Description: The test does not properly account for windowing. See  (was: I lost the screen shot unfortunately... it reports the set of expected records does not match the received records.)

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -----------------------------------------------------------------
>
> Key: KAFKA-8940
> URL: https://issues.apache.org/jira/browse/KAFKA-8940
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Reporter: Guozhang Wang
> Priority: Major
> Labels: flaky-test, newbie++
>
> The test does not properly account for windowing. See
[jira] [Updated] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
[ https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-8940:
------------------------------------------
    Labels: flaky-test newbie++  (was: flaky-test)

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -----------------------------------------------------------------
>
> Key: KAFKA-8940
> URL: https://issues.apache.org/jira/browse/KAFKA-8940
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Reporter: Guozhang Wang
> Priority: Major
> Labels: flaky-test, newbie++
>
> I lost the screen shot unfortunately... it reports the set of expected
> records does not match the received records.
[GitHub] [kafka] mumrah opened a new pull request #9992: KAFKA-10759 Add ARM build stage
mumrah opened a new pull request #9992: URL: https://github.com/apache/kafka/pull/9992 Copy of https://github.com/apache/kafka/pull/9872, but opened by a committer so Jenkins will actually apply the Jenkinsfile changes.
[GitHub] [kafka] mumrah commented on pull request #9872: KAFKA-10759: ARM support for Kafka
mumrah commented on pull request #9872: URL: https://github.com/apache/kafka/pull/9872#issuecomment-769342004 I believe we have Jenkins configured so it will only take Jenkinsfile changes in a PR if they are from a committer. I'll open a PR shortly to see if this works. @ijuma looks like a few different ARM images are provided by Apache Infra: https://cwiki.apache.org/confluence/display/INFRA/ci-builds.apache.org