[GitHub] [kafka] huxihx commented on pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work
huxihx commented on pull request #9051: URL: https://github.com/apache/kafka/pull/9051#issuecomment-662260819 @rajinisivaram Please review this patch. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162450#comment-17162450 ] Jerry Wei commented on KAFKA-10134: --- [~guozhang] refer to the attached file (consumer5.log.2020-07-22.log); the last re-join happened at "10:52:41.247 [pool-1-thread-1] INFO o.a.k.c.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer5, groupId=consumerGroupId] (Re-)joining group" because one consumer was trying to join. Interestingly, this run looks better than before, but it still spent more than one minute.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.5.0
> Reporter: Sean Guo
> Assignee: Guozhang Wang
> Priority: Blocker
> Fix For: 2.6.0, 2.5.1
> Attachments: consumer5.log.2020-07-22.log
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world effect during rebalances, as our tasks are long-running. But after the upgrade, when we kill an instance to trigger a rebalance under load (some tasks run longer than 30 s), CPU usage goes sky-high: it reads ~700% in our metrics, so several threads must be in a tight loop. We have several consumer threads consuming from different partitions during the rebalance. This is reproducible with both the new CooperativeStickyAssignor and the old eager rebalance protocol. The difference is that with the old eager protocol the high CPU usage drops once the rebalance is done, whereas with the cooperative one the consumer threads seem stuck on something and cannot finish the rebalance, so the high CPU usage does not drop until we stop our load. A small load without long-running tasks does not cause continuously high CPU usage either, as the rebalance can finish in that case.
>
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable [0x7fe119aab000]
>    java.lang.Thread.State: RUNNABLE
>        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241)
>        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>
> By debugging into the code we found that the clients appear to be in a loop looking for the coordinator. I also tried the old rebalance protocol on the new version; the issue still exists, but CPU returns to normal when the rebalance is done. I also tried the same on 2.4.1, which does not seem to have this issue, so it appears related to something that changed between 2.4.1 and 2.5.0.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
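The report describes threads spinning in a loop while looking for the coordinator. As an illustration of why such a loop burns CPU, here is a minimal, hypothetical sketch (not Kafka's actual client code; all names are illustrative) of a coordinator-discovery loop where the backoff between attempts is what keeps CPU usage bounded:

```python
# Hypothetical sketch: a lookup loop without backoff spins at ~100% CPU;
# sleeping between failed attempts bounds the cost of retrying.

def find_coordinator(lookup, backoff_ms, max_attempts, sleep):
    """Poll `lookup()` until it returns a coordinator, sleeping between tries."""
    for _ in range(max_attempts):
        coordinator = lookup()
        if coordinator is not None:
            return coordinator
        # Without this sleep, the loop becomes the tight loop seen in the
        # thread dump above.
        sleep(backoff_ms / 1000.0)
    raise TimeoutError("coordinator not found")

# Usage with a fake lookup that succeeds on the third attempt:
slept = []
answers = iter([None, None, "broker-1"])
coord = find_coordinator(lambda: next(answers),
                         backoff_ms=50, max_attempts=5, sleep=slept.append)
```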
[jira] [Updated] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jerry Wei updated KAFKA-10134: -- Attachment: (was: consumer4.log.2020-07-22.log)

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> (full issue description quoted in the first KAFKA-10134 notification above)

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jerry Wei updated KAFKA-10134: -- Attachment: consumer5.log.2020-07-22.log

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> (full issue description quoted in the first KAFKA-10134 notification above)

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jerry Wei updated KAFKA-10134: -- Attachment: consumer4.log.2020-07-22.log

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> (full issue description quoted in the first KAFKA-10134 notification above)

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9343) Add ps command for Kafka and zookeeper process on z/OS.
[ https://issues.apache.org/jira/browse/KAFKA-9343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuo Zhang resolved KAFKA-9343. ---

> Add ps command for Kafka and zookeeper process on z/OS.
>
> Key: KAFKA-9343
> URL: https://issues.apache.org/jira/browse/KAFKA-9343
> Project: Kafka
> Issue Type: Task
> Components: tools
> Affects Versions: 2.4.0
> Environment: z/OS, OS/390
> Reporter: Shuo Zhang
> Priority: Major
> Labels: OS/390, z/OS
> Fix For: 2.4.2, 2.5.0
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> Note: since the final change scope changed, I changed the summary and description.
> The existing method of checking for the Kafka process on other platforms is not applicable to z/OS; on z/OS, the best keyword we can use is the JOBNAME:
>
> PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
> -->
> PIDS=$(ps -A -o pid,jobname,comm | grep -i $JOBNAME | grep java | grep -v grep | awk '{print $1}')
>
> The same applies to the zookeeper process.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10269) AdminClient ListOffsetsResultInfo/timestamp is always -1
[ https://issues.apache.org/jira/browse/KAFKA-10269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162444#comment-17162444 ] huxihx commented on KAFKA-10269: [~d-t-w] Thanks for reporting, and feel free to take this ticket.

> AdminClient ListOffsetsResultInfo/timestamp is always -1
>
> Key: KAFKA-10269
> URL: https://issues.apache.org/jira/browse/KAFKA-10269
> Project: Kafka
> Issue Type: Bug
> Components: admin
> Affects Versions: 2.5.0
> Reporter: Derek Troy-West
> Priority: Minor
>
> When using AdminClient/listOffsets, the resulting ListOffsetsResultInfos appear to always have a timestamp of -1. I've run listOffsets against live clusters with multiple Kafka versions (from 1.0 to 2.5), with both CreateTime and LogAppendTime for message.timestamp.type; every result has a -1 timestamp. E.g.:
>
> org.apache.kafka.clients.admin.ListOffsetsResult$ListOffsetsResultInfo#0x5c3a771
> ListOffsetsResultInfo(offset=23016, timestamp=-1, leaderEpoch=Optional[0])

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] huxihx opened a new pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work
huxihx opened a new pull request #9051: URL: https://github.com/apache/kafka/pull/9051 https://issues.apache.org/jira/browse/KAFKA-10268 Currently, ConfigCommand's `--delete-config` API does not restore a config to its default value, whether at the per-broker level or the broker-default level. The Admin.incrementalAlterConfigs API runs into the same problem. This patch fixes it by removing the corresponding config from the `newConfig` properties when reconfiguring dynamic broker config. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
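The core idea of the fix can be sketched in a few lines. This is an illustrative model with hypothetical names, not the actual Scala code in the patch: a broker's effective config is its static defaults overlaid with dynamic overrides, and deleting a dynamic override must drop the key from the merged properties so the default value becomes visible again.

```python
# Hypothetical model of dynamic broker config resolution.

def effective_config(static_defaults, dynamic_overrides, deleted_keys=()):
    """Merge dynamic overrides over static defaults, honoring deletions."""
    deleted = set(deleted_keys)
    # The bug the patch addresses: if deleted keys are not removed here,
    # the stale dynamic value lingers after --delete-config.
    remaining = {k: v for k, v in dynamic_overrides.items() if k not in deleted}
    new_config = dict(static_defaults)
    new_config.update(remaining)
    return new_config

static = {"log.retention.ms": "604800000"}   # static broker default (7 days)
dynamic = {"log.retention.ms": "60000"}      # dynamic override (1 minute)

# Deleting the dynamic entry should restore the static default.
restored = effective_config(static, dynamic, deleted_keys=["log.retention.ms"])
```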
[GitHub] [kafka] jeffkbkim opened a new pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks
jeffkbkim opened a new pull request #9050: URL: https://github.com/apache/kafka/pull/9050 JIRA: https://issues.apache.org/jira/browse/KAFKA-10193 * add `preempt(): Unit` method for all `ControllerEvent` so that all events (and future events) must implement it * for events that have callbacks, move the preemption from individual methods to `preempt()` * add preemption for `ApiPartitionReassignment`and `ListPartitionReassignments` * add integration tests: 1. test whether `preempt()` is called when controller shuts down 2. test whether the events with callbacks have the correct error response (`NOT_CONTROLLER`) * explicit typing for `ControllerEvent` methods ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
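The preemption mechanism described in the PR can be illustrated with a small sketch. All names here are hypothetical (this is not Kafka's actual ControllerEventManager): every queued event implements preempt(), and on controller shutdown any pending callback-bearing events are preempted with a NOT_CONTROLLER-style error instead of being silently dropped.

```python
# Hypothetical sketch of controller event preemption on shutdown.
from collections import deque

NOT_CONTROLLER = "NOT_CONTROLLER"

class ControllerEvent:
    def process(self): ...
    def preempt(self):
        """Default: events without callbacks need no preemption work."""

class ListPartitionReassignments(ControllerEvent):
    def __init__(self, callback):
        self.callback = callback
    def process(self):
        self.callback(None)            # success path
    def preempt(self):
        # Tell the caller this node is no longer the controller.
        self.callback(NOT_CONTROLLER)

class EventQueue:
    def __init__(self):
        self.events = deque()
    def put(self, event):
        self.events.append(event)
    def shutdown(self):
        # Preempt (rather than drop) everything still queued.
        while self.events:
            self.events.popleft().preempt()

# Usage: an event queued but never processed gets an error response.
responses = []
queue = EventQueue()
queue.put(ListPartitionReassignments(responses.append))
queue.shutdown()
```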
[GitHub] [kafka] mjsax commented on pull request #9047: KAFKA-9274: remove `retries` for global tasks
mjsax commented on pull request #9047: URL: https://github.com/apache/kafka/pull/9047#issuecomment-662192375 @vvcephei Updated this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r458483259

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
## @@ -612,72 +617,521 @@ public boolean lockGlobalState() throws IOException {
             }
         }

-    @SuppressWarnings("deprecation") // TODO revisit in follow up PR
     @Test
-    public void shouldRetryWhenEndOffsetsThrowsTimeoutException() {
-        final int retries = 2;
+    public void shouldNotRetryWhenEndOffsetsThrowsTimeoutExceptionAndTaskTimeoutIsZero() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
         consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
             @Override
-            public synchronized Map endOffsets(final Collection partitions) {
+            public synchronized Map endOffsets(final Collection partitions) {
                 numberOfCalls.incrementAndGet();
-                throw new TimeoutException();
+                throw new TimeoutException("KABOOM!");
             }
         };
+        initializeConsumer(0, 0, t1, t2, t3, t4);

         streamsConfig = new StreamsConfig(new Properties() {
             {
                 put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
                 put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
                 put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-                put(StreamsConfig.RETRIES_CONFIG, retries);
+                put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 0L);
             }
         });

-        try {
-            new GlobalStateManagerImpl(
-                new LogContext("mock"),
-                topology,
-                consumer,
-                stateDirectory,
-                stateRestoreListener,
-                streamsConfig);
-        } catch (final StreamsException expected) {
-            assertEquals(numberOfCalls.get(), retries);
-        }
+        stateManager = new GlobalStateManagerImpl(
+            new LogContext("mock"),
+            time,
+            topology,
+            consumer,
+            stateDirectory,
+            stateRestoreListener,
+            streamsConfig
+        );
+        processorContext.setStateManger(stateManager);
+        stateManager.setGlobalProcessorContext(processorContext);
+
+        final StreamsException expected = assertThrows(
+            StreamsException.class,
+            () -> stateManager.initialize()
+        );
+        final Throwable cause = expected.getCause();
+        assertThat(cause, instanceOf(TimeoutException.class));
+        assertThat(cause.getMessage(), equalTo("KABOOM!"));
+
+        assertEquals(numberOfCalls.get(), 1);
     }

-    @SuppressWarnings("deprecation") // TODO revisit in follow up PR
     @Test
-    public void shouldRetryWhenPartitionsForThrowsTimeoutException() {
-        final int retries = 2;
+    public void shouldRetryAtLeastOnceWhenEndOffsetsThrowsTimeoutException() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
         consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
             @Override
-            public synchronized List partitionsFor(final String topic) {
+            public synchronized Map endOffsets(final Collection partitions) {
+                time.sleep(100L);
                 numberOfCalls.incrementAndGet();
-                throw new TimeoutException();
+                throw new TimeoutException("KABOOM!");
             }
         };
+        initializeConsumer(0, 0, t1, t2, t3, t4);

         streamsConfig = new StreamsConfig(new Properties() {
             {
                 put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
                 put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
                 put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-                put(StreamsConfig.RETRIES_CONFIG, retries);
+                put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1L);
             }
         });

-        try {
-            new GlobalStateManagerImpl(
-                new LogContext("mock"),
-                topology,
-                consumer,
-                stateDirectory,
-                stateRestoreListener,
-                streamsConfig);
-        } catch (final StreamsException expected) {
-            assertEquals(numberOfCalls.get(), retries);
-        }
+        stateManager = new GlobalStateManagerImpl(
+            new LogContext("mock"),
+            time,
+            topology,
+            consumer,
+            stateDirectory,
+            stateRestoreListener,
+            streamsConfig
+        );
+        processorContext.setStateManger(stateManager);
+        stateManager.setGlobalProcessorContext(processorContext);
+
+        final TimeoutException expected = assertThrows(
+            TimeoutException.class,
+            () -> stateManager.in
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r458483023 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ## (same diff hunk as quoted in the previous review comment)
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r458482846 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ## (same diff hunk as quoted in the first review comment above) Review comment: Third TODO This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r458482803

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
## @@ -131,11 +135,40 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
         final Set changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
+
+        long deadlineMs = NO_DEADLINE;
+        final List storesToInitialize = new LinkedList<>(globalStateStores);
+
+        while (!storesToInitialize.isEmpty()) {
+            // we remove and add back on failure to round-robin through all stores
+            final StateStore stateStore = storesToInitialize.remove(0);
             globalStoreNames.add(stateStore.name());
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
-            stateStore.init(globalProcessorContext, stateStore);
+
+            try {
+                stateStore.init(globalProcessorContext, stateStore);
+                deadlineMs = NO_DEADLINE;
+            } catch (final RetryableErrorException retryableException) {
+                if (taskTimeoutMs == 0L) {
+                    throw new StreamsException(retryableException.getCause());

Review comment: If we rethrow, we get rid of the `RetryableErrorException` and pass in the original root cause, i.e., the `TimeoutException`.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
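The retry pattern in this hunk can be sketched compactly. This is a hedged illustration with hypothetical names (not the actual Java code): stores that fail with a retryable error are re-queued round-robin, a deadline derived from task.timeout.ms bounds how long retries may continue, and a timeout of 0 means "fail immediately on the first retryable error".

```python
# Hypothetical sketch of round-robin store initialization with a task timeout.

class RetryableError(Exception):
    pass

def initialize_stores(stores, init, task_timeout_ms, now_ms):
    """`init(store)` raises RetryableError on transient failures;
    `now_ms()` is a monotonic clock in milliseconds."""
    pending = list(stores)
    deadline_ms = None  # no deadline until the first failure
    while pending:
        store = pending.pop(0)
        try:
            init(store)
            deadline_ms = None  # progress made: reset the deadline
        except RetryableError as e:
            if task_timeout_ms == 0:
                # Fail fast, surfacing the root cause rather than the wrapper.
                raise RuntimeError(e.__cause__ or e)
            if deadline_ms is None:
                deadline_ms = now_ms() + task_timeout_ms
            elif now_ms() >= deadline_ms:
                raise TimeoutError(f"could not initialize {store!r} in time")
            pending.append(store)  # re-queue; round-robin to the next store

# Usage with a fake clock and a store that succeeds on its second attempt:
clock = [0]
attempts = {"s1": 0}
def fake_init(store):
    clock[0] += 10
    attempts[store] += 1
    if store == "s1" and attempts[store] < 2:
        raise RetryableError("transient")

initialize_stores(["s1"], fake_init, task_timeout_ms=100, now_ms=lambda: clock[0])
```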
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r458482408 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ## @@ -612,72 +617,521 @@ public boolean lockGlobalState() throws IOException { } } -@SuppressWarnings("deprecation") // TODO revisit in follow up PR Review comment: Second TODO This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r458482045

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
## @@ -58,30 +60,33 @@
  * of Global State Stores. There is only ever 1 instance of this class per Application Instance.
  */
 public class GlobalStateManagerImpl implements GlobalStateManager {
+    private final static long NO_DEADLINE = -1L;
+
     private final Logger log;
+    private final Time time;
     private final Consumer globalConsumer;
     private final File baseDir;
     private final StateDirectory stateDirectory;
     private final Set globalStoreNames = new HashSet<>();
     private final FixedOrderMap> globalStores = new FixedOrderMap<>();
     private final StateRestoreListener stateRestoreListener;
     private InternalProcessorContext globalProcessorContext;
-    private final int retries;
-    private final long retryBackoffMs;
     private final Duration pollTime;
+    private final long taskTimeoutMs;
     private final Set globalNonPersistentStoresTopics = new HashSet<>();
     private final OffsetCheckpoint checkpointFile;
     private final Map checkpointFileCache;
     private final Map storeToChangelogTopic;
     private final List globalStateStores;

-    @SuppressWarnings("deprecation") // TODO: remove in follow up PR when `RETRIES` is removed

Review comment: This PR fixes 3 TODOs from the first PR. (The other two are in the test -- also added a comment to the original PR that links to this PR as reference.)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r458481794 ## File path: streams/src/main/java/org/apache/kafka/streams/errors/RetryableErrorException.java ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public class RetryableErrorException extends StreamsException { Review comment: Added new exception type as requested.
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r458481626 ## File path: docs/upgrade.html ## @@ -23,8 +23,8 @@ Notable changes in 2 The configuration parameter retries is deprecated for the producer, admin, and Kafka Streams clients via https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams";>KIP-572. -You should use the producer's delivery.timeout.ms, admin's default.api.timeout.ms, and -Kafka Streams' new task.timeout.ms parameters instead. +You should use the producer's delivery.timeout.ms and max.block.ms, admin's Review comment: As above. ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ## @@ -357,7 +357,7 @@ /** {@code commit.interval.ms} */ @SuppressWarnings("WeakerAccess") public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms"; -private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor." + +private static final String COMMIT_INTERVAL_MS_DOC = "The frequency in milliseconds with which to save the position of the processor." + Review comment: Same fixes as in the docs.
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r458481521 ## File path: docs/streams/upgrade-guide.html ## @@ -95,11 +95,12 @@ Streams API -The configuration parameter retries is deprecated in favor of a the new parameter task.timeout.ms. Review comment: Fixed some typos and added a reference to the `max.block.ms` config.
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r458481381 ## File path: docs/streams/developer-guide/config-streams.html ## @@ -326,13 +321,18 @@ bootstrap.serversstate.cleanup.delay.ms Low The amount of time in milliseconds to wait before deleting state when a partition has migrated. -600000 milliseconds +600000 milliseconds (10 minutes) state.dir High Directory location for state stores. /tmp/kafka-streams + task.timeout.ms Review comment: Forgot to add the new config in the first PR
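The `task.timeout.ms` config added to the docs in this hunk comes from KIP-572 and replaces the deprecated `retries` for Kafka Streams. A minimal sketch of setting it in application code, using plain string keys so the snippet stays self-contained; the broker address is a placeholder and the 300000 ms (5 minute) value reflects the default described in the KIP, not something taken from this thread:

```java
import java.util.Properties;

public class StreamsTimeoutConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "my-app");            // required Streams config
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        // KIP-572: replaces the deprecated `retries`; bounds how long a task
        // may retry internal operations before the error is surfaced
        props.put("task.timeout.ms", "300000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("task.timeout.ms"));
    }
}
```

These properties would then be passed to the `KafkaStreams` constructor as usual.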
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r458481315 ## File path: docs/streams/developer-guide/config-streams.html ## @@ -308,15 +308,10 @@ bootstrap.serversThe replication factor for changelog topics and repartition topics created by the application. 1 - retries Review comment: Forgot to remove `retries` in the first PR.
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r458481231 ## File path: docs/streams/developer-guide/config-streams.html ## @@ -203,7 +203,7 @@ bootstrap.serverscommit.interval.ms Low -The frequency with which to save the position (offsets in source topics) of tasks. +The frequency in milliseconds with which to save the position (offsets in source topics) of tasks. Review comment: Added a couple of side fixes for the docs.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting
ableegoldman commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r458282421 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -454,6 +456,41 @@ public void flush() { } } +public void flushCache() { +RuntimeException firstException = null; +// attempting to flush the stores +if (!stores.isEmpty()) { +log.debug("Flushing all store caches registered in the state manager: {}", stores); +for (final StateStoreMetadata metadata : stores.values()) { +final StateStore store = metadata.stateStore; + +try { +// buffer should be flushed to send all records to changelog +if (store instanceof TimeOrderedKeyValueBuffer) { +store.flush(); +} else if (store instanceof CachedStateStore) { +((CachedStateStore) store).flushCache(); +} +log.trace("Flushed cache or buffer {}", store.name()); +} catch (final RuntimeException exception) { +if (firstException == null) { +// do NOT wrap the error if it is actually caused by Streams itself +if (exception instanceof StreamsException) +firstException = exception; +else +firstException = new ProcessorStateException( Review comment: nit: can you use braces here? 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -520,20 +557,27 @@ void transitionTaskType(final TaskType newType, final LogContext logContext) { log.debug("Transitioning state manager for {} task {} to {}", oldType, taskId, newType); } -@Override -public void checkpoint(final Map writtenOffsets) { -// first update each state store's current offset, then checkpoint -// those stores that are only logged and persistent to the checkpoint file +void updateChangelogOffsets(final Map writtenOffsets) { for (final Map.Entry entry : writtenOffsets.entrySet()) { final StateStoreMetadata store = findStore(entry.getKey()); if (store != null) { store.setOffset(entry.getValue()); log.debug("State store {} updated to written offset {} at changelog {}", -store.stateStore.name(), store.offset, store.changelogPartition); +store.stateStore.name(), store.offset, store.changelogPartition); } } +} + +@Override +public void checkpoint(final Map writtenOffsets) { +// first update each state store's current offset, then checkpoint +// those stores that are only logged and persistent to the checkpoint file +// TODO: we still need to keep it as part of the checkpoint for global tasks; this could be +// removed though when we consolidate global tasks / state managers into this one Review comment: >TODO: we still need to keep it as part of the checkpoint for global tasks Took me a while to realize that "it" refers to the argument here -- can you clarify the comment? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -466,15 +458,8 @@ public void postCommit() { */ partitionGroup.clear(); Review comment: This is also maybe beyond the scope of this PR, but it seems like there's no reason to do things like this anymore. 
Specifically, today we enforce the order `suspend` -> `pre/postCommit` -> `close` where `suspend` only closes the topology and we only use the `SUSPENDED` state to enforce that we suspended before closing. Why not swap the order so that we `pre/postCommit` -> `suspend` -> `close` and then we can move this call from `postCommit` to `suspend` where it makes more sense. Thoughts? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ## @@ -49,6 +61,31 @@ this.stateDirectory = stateDirectory; } +protected void initializeCheckpoint() { +// we will delete the local checkpoint file after registering the state stores and loading them into the +// state manager, therefore we should initialize the snapshot as empty to indicate over-write checkpoint needed +offsetSnapshotSinceLastFlush = Collections.emptyMap(); +} + +/** + * The following exceptions maybe thrown from the state manager flushing call + * + * @throws TaskMigratedException recoverable error sending changelog records that would cause the task to be removed +
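The `flushCache` hunk earlier in this review uses a common aggregation idiom: keep flushing the remaining stores even when one fails, remember only the first exception, and surface it afterwards, so one bad store does not prevent the others from flushing. A stripped-down, hypothetical sketch of that idiom (illustrative names, not the Streams API):

```java
import java.util.List;

public class FirstExceptionAggregator {
    // Run every flush action; remember the first failure but keep going.
    // Returns null if everything succeeded; the caller rethrows otherwise.
    public static RuntimeException flushAll(List<Runnable> flushes) {
        RuntimeException first = null;
        for (Runnable flush : flushes) {
            try {
                flush.run();
            } catch (RuntimeException e) {
                if (first == null) {
                    first = e; // later failures are dropped (or could be logged)
                }
            }
        }
        return first;
    }

    public static void main(String[] args) {
        java.util.List<String> flushed = new java.util.ArrayList<>();
        java.util.List<Runnable> flushes = new java.util.ArrayList<>();
        flushes.add(() -> flushed.add("store-1"));
        flushes.add(() -> { throw new IllegalStateException("boom"); });
        flushes.add(() -> flushed.add("store-3"));
        RuntimeException first = flushAll(flushes);
        System.out.println(flushed + " firstError=" + first.getMessage());
    }
}
```

The real code additionally wraps non-Streams exceptions in a `ProcessorStateException`, which is the detail the "use braces" nit above is about.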
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
mumrah commented on a change in pull request #9008: URL: https://github.com/apache/kafka/pull/9008#discussion_r458382935 ## File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java ## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.protocol; + +import org.apache.kafka.common.network.ByteBufferSend; +import org.apache.kafka.common.network.Send; +import org.apache.kafka.common.record.BaseRecords; +import org.apache.kafka.common.utils.ByteUtils; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.function.Consumer; + +/** + * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer + * of data from a record-set's file channel to the eventual socket channel. + * + * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array + * according to the format specified in {@link DataOutput}. 
When a call is made to writeRecords, any previously written + * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes, + * another Send is passed to the consumer which wraps the underlying record-set's transfer logic. + * + * For example, + * + * + * recordsWritable.writeInt(10); + * recordsWritable.writeRecords(records1); + * recordsWritable.writeInt(20); + * recordsWritable.writeRecords(records2); + * recordsWritable.writeInt(30); + * recordsWritable.flush(); + * + * + * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any + * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is + * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}. + * + * @see org.apache.kafka.common.requests.FetchResponse + */ +public class RecordsWriter implements Writable { +private final String dest; +private final Consumer sendConsumer; +private final ByteArrayOutputStream byteArrayOutputStream; +private final DataOutput output; + +public RecordsWriter(String dest, Consumer sendConsumer) { +this.dest = dest; +this.sendConsumer = sendConsumer; +this.byteArrayOutputStream = new ByteArrayOutputStream(); +this.output = new DataOutputStream(this.byteArrayOutputStream); +} + +@Override +public void writeByte(byte val) { +writeQuietly(() -> output.writeByte(val)); +} + +@Override +public void writeShort(short val) { +writeQuietly(() -> output.writeShort(val)); +} + +@Override +public void writeInt(int val) { +writeQuietly(() -> output.writeInt(val)); +} + +@Override +public void writeLong(long val) { +writeQuietly(() -> output.writeLong(val)); + +} + +@Override +public void writeDouble(double val) { +writeQuietly(() -> ByteUtils.writeDouble(val, output)); + +} + +@Override +public void writeByteArray(byte[] arr) { +writeQuietly(() -> output.write(arr)); +} + +@Override +public void 
writeUnsignedVarint(int i) { +writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output)); +} + +@Override +public void writeByteBuffer(ByteBuffer src) { +writeQuietly(() -> output.write(src.array(), src.position(), src.remaining())); +} + +@FunctionalInterface +private interface IOExceptionThrowingRunnable { +void run() throws IOException; +} + +private void writeQuietly(IOExceptionThrowingRunnable runnable) { +try { +runnable.run(); +} catch (IOException e) { +throw new RuntimeException("Writable encountered an IO error", e); +} +} + +@Override +public void writeRecords(BaseRecords records) { +flush(); +sendConsumer.accept(records.toSend(dest)); +} + +/** + * Flush any pending bytes as a ByteBufferSend and reset the buffer + *
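The javadoc above describes the key behavior: primitive writes accumulate in a byte buffer, and each `writeRecords` call first flushes the pending bytes as their own `Send` before emitting the record-set's `Send`. A self-contained sketch of that buffering discipline, using strings in place of `Send` objects (all names here are illustrative, not the actual Kafka types):

```java
import java.io.ByteArrayOutputStream;
import java.util.function.Consumer;

// Mimics RecordsWriter's flushing discipline: buffered primitive writes are
// emitted as one chunk whenever a record-set (or a final flush) arrives.
public class ChunkedWriter {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final Consumer<String> sendConsumer;

    public ChunkedWriter(Consumer<String> sendConsumer) {
        this.sendConsumer = sendConsumer;
    }

    public void writeInt(int val) {
        buffer.write(val); // real code writes 4 big-endian bytes; one is enough here
    }

    public void writeRecords(String records) {
        flush();                      // pending bytes become their own chunk first
        sendConsumer.accept(records); // record-set transfers independently
    }

    public void flush() {
        if (buffer.size() > 0) {
            sendConsumer.accept("bytes[" + buffer.size() + "]");
            buffer.reset();
        }
    }

    public static void main(String[] args) {
        java.util.List<String> sends = new java.util.ArrayList<>();
        ChunkedWriter w = new ChunkedWriter(sends::add);
        w.writeInt(10);
        w.writeRecords("records1");
        w.writeInt(20);
        w.writeRecords("records2");
        w.writeInt(30);
        w.flush();
        System.out.println(sends); // 5 sends, matching the javadoc example
    }
}
```

As the javadoc warns, forgetting the final `flush()` would silently drop the trailing bytes.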
[GitHub] [kafka] abbccdda commented on a change in pull request #9012: KAFKA-10270: A broker to controller channel manager
abbccdda commented on a change in pull request #9012: URL: https://github.com/apache/kafka/pull/9012#discussion_r458356891 ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit} + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import kafka.network.RequestChannel +import kafka.utils.Logging +import org.apache.kafka.clients._ +import org.apache.kafka.common.requests.AbstractRequest +import org.apache.kafka.common.utils.{LogContext, Time} +import org.apache.kafka.common.Node +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.AbstractRequest.NoOpRequestBuilder +import org.apache.kafka.common.security.JaasContext + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * This class manages the connection between a broker and the controller. 
It runs a single + * {@link BrokerToControllerRequestThread} which uses the broker's metadata cache as its own metadata to find + * and connect to the controller. The channel is async and runs the network connection in the background. + * The maximum number of in-flight requests are set to one to ensure orderly response from the controller, therefore + * care must be taken to not block on outstanding requests for too long. + */ +class BrokerToControllerChannelManager(metadataCache: kafka.server.MetadataCache, + time: Time, + metrics: Metrics, + config: KafkaConfig, + threadNamePrefix: Option[String] = None) extends Logging { + private val requestQueue = new LinkedBlockingQueue[BrokerToControllerQueueItem] + private val logContext = new LogContext(s"[broker-${config.brokerId}-to-controller] ") + private val manualMetadataUpdater = new ManualMetadataUpdater() + private val requestThread = newRequestThread + + def start(): Unit = { +requestThread.start() + } + + def shutdown(): Unit = { +requestThread.shutdown() +requestThread.awaitShutdown() + } + + private[server] def newRequestThread = { +val brokerToControllerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName) +val brokerToControllerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol) + +val networkClient = { + val channelBuilder = ChannelBuilders.clientChannelBuilder( +brokerToControllerSecurityProtocol, +JaasContext.Type.SERVER, +config, +brokerToControllerListenerName, +config.saslMechanismInterBrokerProtocol, +time, +config.saslInterBrokerHandshakeRequestEnable, +logContext + ) + val selector = new Selector( +NetworkReceive.UNLIMITED, +Selector.NO_IDLE_TIMEOUT_MS, +metrics, +time, +"BrokerToControllerChannel", +Map("BrokerId" -> config.brokerId.toString).asJava, +false, +channelBuilder, +logContext + ) + new NetworkClient( +selector, +manualMetadataUpdater, +config.brokerId.toString, +1, +0, +0, 
+Selectable.USE_DEFAULT_BUFFER_SIZE, +Selectable.USE_DEFAULT_BUFFER_SIZE, +config.requestTimeoutMs, +config.connectionSetupTimeoutMs, +config.connectionSetupTimeoutMaxMs, +ClientDnsLookup.DEFAULT, +time, +false, +new ApiVersions, +logContext + ) +} +val threadName = threadNamePrefix match { + case None => s"broker-${config.brokerId}-to-controller-send-thread" + case Some(name) => s"$name:broker-${config.brokerId}-to-controller-send-thread" +} + +new BrokerToControllerRequestThread(networkClient, manualMetadataUpdater, requestQueue, metadataCache, config, + brokerToControllerListene
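The manager's single request thread with at most one in-flight request boils down to a simple queue discipline: drain one request at a time, handling each response before the next send, so responses always come back in submission order. A minimal, hypothetical sketch (stdlib only, no Kafka types; the request names are invented):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// One queue, one drain loop, one request in flight at a time:
// completion order matches submission order by construction.
public class OrderedRequestQueue {
    private final Queue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> completed = new ArrayList<>();

    public void enqueue(String request) {
        queue.add(request);
    }

    // In the real manager this loop runs on a dedicated send thread;
    // here we drain synchronously for clarity.
    public void drain() {
        String request;
        while ((request = queue.poll()) != null) {      // the single in-flight request
            completed.add(request + ":done");           // "response" handled before next send
        }
    }

    public List<String> completed() {
        return completed;
    }

    public static void main(String[] args) {
        OrderedRequestQueue q = new OrderedRequestQueue();
        q.enqueue("request-1");
        q.enqueue("request-2");
        q.drain();
        System.out.println(q.completed());
    }
}
```

The trade-off noted in the javadoc follows directly: with only one request in flight, a slow controller response stalls everything queued behind it.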
[GitHub] [kafka] mjsax commented on a change in pull request #8864: KAFKA-9274: Mark `retries` config as deprecated and add new `task.timeout.ms` config
mjsax commented on a change in pull request #8864: URL: https://github.com/apache/kafka/pull/8864#discussion_r458346124 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ## @@ -612,6 +612,7 @@ public boolean lockGlobalState() throws IOException { } } +@SuppressWarnings("deprecation") // TODO revisit in follow up PR Review comment: Addressed via #9047 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ## @@ -645,6 +646,7 @@ public void shouldRetryWhenEndOffsetsThrowsTimeoutException() { } } +@SuppressWarnings("deprecation") // TODO revisit in follow up PR Review comment: Addressed via #9047
[GitHub] [kafka] mjsax commented on a change in pull request #8864: KAFKA-9274: Mark `retries` config as deprecated and add new `task.timeout.ms` config
mjsax commented on a change in pull request #8864: URL: https://github.com/apache/kafka/pull/8864#discussion_r458345947 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -74,6 +74,7 @@ private final OffsetCheckpoint checkpointFile; private final Map checkpointFileCache; +@SuppressWarnings("deprecation") // TODO: remove in follow up PR when `RETRIES` is removed Review comment: Addressed via #9047
[jira] [Created] (KAFKA-10298) Replace Windows with a proper interface
John Roesler created KAFKA-10298: Summary: Replace Windows with a proper interface Key: KAFKA-10298 URL: https://issues.apache.org/jira/browse/KAFKA-10298 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Assignee: John Roesler See POC: [https://github.com/apache/kafka/pull/9031] Presently, windowed aggregations in KafkaStreams fall into two categories:
* Windows
** TimeWindows
** UnlimitedWindows
** JoinWindows
* SessionWindows
Unfortunately, Windows is an abstract class instead of an interface, and it forces some fields onto its implementations. This has led to a number of problems over the years, but so far we have been able to live with them. However, as we consider adding new implementations to this side of the hierarchy, the damage will spread. See KIP-450, for example. We should take the opportunity now to correct the issue by introducing an interface and deprecating Windows itself. Then, we can implement new features cleanly and maybe remove Windows in the 3.0 release. -- This message was sent by Atlassian Jira (v8.3.4#803005)
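The ticket's core complaint — an abstract class forces its fields onto every implementation, while an interface lets each implementation declare only the state it needs — can be illustrated with a tiny hypothetical sketch (these are not the actual Streams types):

```java
// Abstract class: every subclass inherits this field, meaningful or not.
abstract class WindowsBase {
    protected long maintainDurationMs; // forced onto all implementations
    abstract long size();
}

// Interface: implementations keep only the state they actually need.
interface WindowDefinition {
    long size();
}

class FixedWindow implements WindowDefinition {
    private final long sizeMs;
    FixedWindow(long sizeMs) { this.sizeMs = sizeMs; }
    public long size() { return sizeMs; }
}

public class WindowHierarchySketch {
    public static void main(String[] args) {
        WindowDefinition w = new FixedWindow(5_000L);
        System.out.println(w.size());
    }
}
```

Deprecating the abstract class in favor of such an interface is what lets new window types (as in KIP-450) be added without inheriting irrelevant fields.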
[jira] [Issue Comment Deleted] (KAFKA-7106) Remove segment/segmentInterval from Window definition
[ https://issues.apache.org/jira/browse/KAFKA-7106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-7106: Comment: was deleted (was: guozhangwang commented on pull request #7670: KAFKA-7106: Not hide the stack trace for ApiException URL: https://github.com/apache/kafka/pull/7670 Unit tests passed locally, do not think it has any compatibility breakage. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) ) > Remove segment/segmentInterval from Window definition > - > > Key: KAFKA-7106 > URL: https://issues.apache.org/jira/browse/KAFKA-7106 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Blocker > Labels: needs-kip > Fix For: 3.0.0 > > > Currently, Window configures segment and segmentInterval properties, but > these aren't truly properties of a window in general. > Rather, they are properties of the particular implementation that we > currently have: a segmented store. Therefore, these properties should be > moved to configure only that implementation. > > This may be related to KAFKA-4730, since an in-memory window store wouldn't > necessarily need to be segmented.
[GitHub] [kafka] mjsax merged pull request #8864: KAFKA-9274: Mark `retries` config as deprecated and add new `task.timeout.ms` config
mjsax merged pull request #8864: URL: https://github.com/apache/kafka/pull/8864
[GitHub] [kafka] mjsax commented on pull request #8864: KAFKA-9274: Mark `retries` config as deprecated and add new `task.timeout.ms` config
mjsax commented on pull request #8864: URL: https://github.com/apache/kafka/pull/8864#issuecomment-662055495 Thanks @vvcephei -- I don't think we need tickets. I will use this PR itself as a reference to make sure we address all TODOs that were added.
[GitHub] [kafka] jamiealquiza commented on pull request #1596: KAFKA-1543: Change replication factor during partition map generation
jamiealquiza commented on pull request #1596: URL: https://github.com/apache/kafka/pull/1596#issuecomment-662051029 Found this issue while browsing for something else, but fwiw my [topicmappr](https://github.com/DataDog/kafka-kit/tree/master/cmd/topicmappr) cli tool does exactly this: `topicmappr rebuild --topics --brokers -1 --replication n`. Creates a compatible file that can now be assigned or otherwise worked with.
[GitHub] [kafka] abbccdda opened a new pull request #9049: MINOR: fix scala warnings
abbccdda opened a new pull request #9049: URL: https://github.com/apache/kafka/pull/9049 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-10271) Performance regression while fetching a key from a single partition
[ https://issues.apache.org/jira/browse/KAFKA-10271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dima R updated KAFKA-10271: --- Summary: Performance regression while fetching a key from a single partition (was: Performance degradation while fetching a key from a single partition) > Performance regression while fetching a key from a single partition > --- > > Key: KAFKA-10271 > URL: https://issues.apache.org/jira/browse/KAFKA-10271 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0, 2.6.0, 2.5.1 >Reporter: Dima R >Assignee: Dima R >Priority: Major > Labels: KAFKA-10030, KAFKA-9445, KIP-562 > Fix For: 2.5.2, 2.6.1 > > Attachments: 9020.png > > > This is a follow-up bug for KAFKA-10030 > StreamThreadStateStoreProvider loops excessively over calls to > internalTopologyBuilder.topicGroups(), which is synchronized, causing > significant performance degradation for the caller, especially when the store > has many partitions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
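The pattern behind the regression and its fix can be sketched as follows. This is a hypothetical stand-in, not Kafka's actual code: the names `StoreProviderSketch`, `topicGroups`, and `countStores` are illustrative. The point is simply that a synchronized, expensive call hoisted out of a per-partition loop is invoked once instead of once per partition, so lock contention no longer scales with partition count.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the fix pattern: hoist a synchronized, expensive call
// out of a per-partition loop. Names are illustrative, not Kafka's actual code.
class StoreProviderSketch {
    int expensiveCalls = 0;

    // Stands in for internalTopologyBuilder.topicGroups(): synchronized and costly.
    synchronized Map<String, List<String>> topicGroups() {
        expensiveCalls++;
        return Collections.singletonMap("group-1", Arrays.asList("topic-0", "topic-1"));
    }

    // Before the fix, the synchronized call was effectively made once per
    // partition; here it is made once and the result reused inside the loop.
    int countStores(List<Integer> partitions) {
        final Map<String, List<String>> groups = topicGroups(); // hoisted: one call total
        int found = 0;
        for (int partition : partitions) {
            if (groups.containsKey("group-1")) {
                found++;
            }
        }
        return found;
    }
}
```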
[GitHub] [kafka] kkonstantine opened a new pull request #9048: WIP: Replace org.reflections with classgraph for class scanning in Connect
kkonstantine opened a new pull request #9048: URL: https://github.com/apache/kafka/pull/9048 https://github.com/classgraph/classgraph is a more recent and more actively developed classpath scanning library. I'll be doing some testing and performance benchmarking before I open this PR for general review. Opening as draft atm. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-10297) Don't use deprecated producer config `retries`
[ https://issues.apache.org/jira/browse/KAFKA-10297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10297: Description: In the 2.7.0 release, producer config `retries` gets deprecated via KIP-572. Connect is still using this config, which needs to be fixed (cf [https://github.com/apache/kafka/pull/8864/files#r439685920]) {quote}Btw: @hachikuji raised a concern about this issue, too: https://github.com/apache/kafka/pull/8864#pullrequestreview-443424531 > I just had one question about the proposal. Using retries=0 in the producer > allows the user to have "at-most-once" delivery. This allows the application > to implement its own retry logic for example. Do we still have a way to do > this once this configuration is gone? So maybe we need to do some follow-up work in the `Producer` to make it work for Connect. But I would defer this to the follow-up work. My original thought was that setting `delivery.timeout.ms := request.timeout.ms` might prevent internal retries. But we need to verify this. It was also brought to my attention that this might not work if the network disconnects -- only `retries=0` would prevent re-opening the connection, but a low timeout would not prevent retries. In KIP-572, we proposed for Kafka Streams itself to treat `task.timeout.ms = 0` as "no retries" -- maybe we can do a similar thing for the producer? There is also `max.block.ms` that we should consider. Unfortunately, I am not an expert on the `Producer`. But I would like to move forward with KIP-572 for now and am happy to help resolve those questions. {quote} was: In the 2.7.0 release, producer config `retries` gets deprecated via KIP-572. Connect is still using this config, which needs to be fixed.
> Don't use deprecated producer config `retries` > -- > > Key: KAFKA-10297 > URL: https://issues.apache.org/jira/browse/KAFKA-10297 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.7.0 >Reporter: Matthias J. Sax >Priority: Blocker > Fix For: 2.7.0 > > > In the 2.7.0 release, producer config `retries` gets deprecated via KIP-572. > Connect is still using this config, which needs to be fixed (cf > [https://github.com/apache/kafka/pull/8864/files#r439685920]) > {quote}Btw: @hachikuji raised a concern about this issue, too: > https://github.com/apache/kafka/pull/8864#pullrequestreview-443424531 > > I just had one question about the proposal. Using retries=0 in the producer > > allows the user to have "at-most-once" delivery. This allows the > > application to implement its own retry logic for example. Do we still have > > a way to do this once this configuration is gone? > So maybe we need to do some follow-up work in the `Producer` to make it work > for Connect. But I would defer this to the follow-up work. > My original thought was that setting `delivery.timeout.ms := > request.timeout.ms` might prevent internal retries. But we need to verify this. It > was also brought to my attention that this might not work if the network > disconnects -- only `retries=0` would prevent re-opening the connection, but a > low timeout would not prevent retries. > In KIP-572, we proposed for Kafka Streams itself to treat `task.timeout.ms = > 0` as "no retries" -- maybe we can do a similar thing for the producer? > There is also `max.block.ms` that we should consider. Unfortunately, I am not > an expert on the `Producer`. But I would like to move forward with KIP-572 > for now and am happy to help resolve those questions. > {quote}
[GitHub] [kafka] mjsax commented on a change in pull request #8864: KAFKA-9274: Mark `retries` config as deprecated and add new `task.timeout.ms` config
mjsax commented on a change in pull request #8864: URL: https://github.com/apache/kafka/pull/8864#discussion_r458266432 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java ## @@ -148,6 +148,7 @@ public KafkaStatusBackingStore(Time time, Converter converter) { this.statusTopic = statusTopic; } +@SuppressWarnings("deprecation") Review comment: Btw: @hachikuji raised a concern about this issue, too: https://github.com/apache/kafka/pull/8864#pullrequestreview-443424531 > I just had one question about the proposal. Using retries=0 in the producer allows the user to have "at-most-once" delivery. This allows the application to implement its own retry logic for example. Do we still have a way to do this once this configuration is gone? So maybe we need to do some follow-up work in the `Producer` to make it work for Connect. But I would defer this to the follow-up work. My original thought was that setting `delivery.timeout.ms := request.timeout.ms` might prevent internal retries. But we need to verify this. It was also brought to my attention that this might not work if the network disconnects -- only `retries=0` would prevent re-opening the connection, but a low timeout would not prevent retries. In KIP-572, we proposed for Kafka Streams itself to treat `task.timeout.ms = 0` as "no retries" -- maybe we can do a similar thing for the producer? There is also `max.block.ms` that we should consider. Unfortunately, I am not an expert on the `Producer`. But I would like to move forward with KIP-572 for now and am happy to help resolve those questions.
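The two producer configurations contrasted in this thread can be written down concretely. The sketch below uses plain string keys so it stands alone without the kafka-clients dependency; whether the timeout-based variant actually suppresses all internal retries (e.g. across network disconnects) is precisely the open question in the comment above, so treat this as an illustration, not a verified recipe.

```java
import java.util.Properties;

// Sketch of the two producer configurations discussed above. Plain string keys
// stand in for the real ProducerConfig constants; values are illustrative.
public class RetryConfigSketch {
    public static Properties atMostOnce() {
        Properties props = new Properties();
        // Today's only sure way to get "no retries" -- but `retries` is the
        // very config being deprecated by KIP-572.
        props.setProperty("retries", "0");
        return props;
    }

    public static Properties timeoutBased() {
        Properties props = new Properties();
        // Proposed alternative: leave no time budget for a second attempt by
        // making the delivery timeout equal a single request's timeout.
        // Per the comment above, this may NOT hold across disconnects.
        props.setProperty("request.timeout.ms", "30000");
        props.setProperty("delivery.timeout.ms", "30000");
        return props;
    }
}
```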
[GitHub] [kafka] rajinisivaram commented on pull request #8933: KAFKA-10163; Throttle Create Topic, Create Partition and Delete Topic Operations (KIP-599, Part I, Broker Changes)
rajinisivaram commented on pull request #8933: URL: https://github.com/apache/kafka/pull/8933#issuecomment-661986912 retest this please
[jira] [Comment Edited] (KAFKA-7516) Client (Producer and/or Consumer) crashes during initialization on Android
[ https://issues.apache.org/jira/browse/KAFKA-7516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162167#comment-17162167 ] Paulo César edited comment on KAFKA-7516 at 7/21/20, 4:48 PM: -- Has there been any progress on this issue? was (Author: pccesar): Has there been any progress on this issue? > Client (Producer and/or Consumer) crashes during initialization on Android > -- > > Key: KAFKA-7516 > URL: https://issues.apache.org/jira/browse/KAFKA-7516 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.0.0 >Reporter: alex kamenetsky >Priority: Major > > Attempt to incorporate kafka client (both Producer and Consumer) on Android > Dalvik fails during initialization stage: Dalvik doesn't support > javax.management (JMX).
[jira] [Commented] (KAFKA-7516) Client (Producer and/or Consumer) crashes during initialization on Android
[ https://issues.apache.org/jira/browse/KAFKA-7516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162167#comment-17162167 ] Paulo César commented on KAFKA-7516: Has there been any progress on this issue? > Client (Producer and/or Consumer) crashes during initialization on Android > -- > > Key: KAFKA-7516 > URL: https://issues.apache.org/jira/browse/KAFKA-7516 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.0.0 >Reporter: alex kamenetsky >Priority: Major > > Attempt to incorporate kafka client (both Producer and Consumer) on Android > Dalvik fails during initialization stage: Dalvik doesn't support > javax.management (JMX).
[GitHub] [kafka] rajinisivaram commented on pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…
rajinisivaram commented on pull request #8768: URL: https://github.com/apache/kafka/pull/8768#issuecomment-661935552 There is one test failure in `kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota` that is related to the PR.
[GitHub] [kafka] vvcephei commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks
vvcephei commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r458173651 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -184,32 +217,21 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta log.info("Restoring state for global store {}", store.name()); final List topicPartitions = topicPartitionsForStore(store); -Map highWatermarks = null; -int attempts = 0; -while (highWatermarks == null) { -try { -highWatermarks = globalConsumer.endOffsets(topicPartitions); -} catch (final TimeoutException retryableException) { -if (++attempts > retries) { -log.error("Failed to get end offsets for topic partitions of global store {} after {} retry attempts. " + -"You can increase the number of retries via configuration parameter `retries`.", -store.name(), -retries, -retryableException); -throw new StreamsException(String.format("Failed to get end offsets for topic partitions of global store %s after %d retry attempts. " + -"You can increase the number of retries via configuration parameter `retries`.", store.name(), retries), -retryableException); -} -log.debug("Failed to get end offsets for partitions {}, backing off for {} ms to retry (attempt {} of {})", -topicPartitions, -retryBackoffMs, -attempts, -retries, -retryableException); -Utils.sleep(retryBackoffMs); -} +final Map highWatermarks; +try { +highWatermarks = globalConsumer.endOffsets(topicPartitions); +} catch (final TimeoutException retryableException) { +log.debug( +"Failed to get end offsets for partitions {}. The broker may be transiently unavailable at the moment. Will retry.", +topicPartitions, +retryableException Review comment: The varargs version of `debug` does _not_ take a cause at the end. This exception will not be logged. You have to use the version that only takes `(String, Exception)`. 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -184,32 +217,21 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta log.info("Restoring state for global store {}", store.name()); final List topicPartitions = topicPartitionsForStore(store); -Map highWatermarks = null; -int attempts = 0; -while (highWatermarks == null) { -try { -highWatermarks = globalConsumer.endOffsets(topicPartitions); -} catch (final TimeoutException retryableException) { -if (++attempts > retries) { -log.error("Failed to get end offsets for topic partitions of global store {} after {} retry attempts. " + -"You can increase the number of retries via configuration parameter `retries`.", -store.name(), -retries, -retryableException); -throw new StreamsException(String.format("Failed to get end offsets for topic partitions of global store %s after %d retry attempts. " + -"You can increase the number of retries via configuration parameter `retries`.", store.name(), retries), -retryableException); -} -log.debug("Failed to get end offsets for partitions {}, backing off for {} ms to retry (attempt {} of {})", -topicPartitions, -retryBackoffMs, -attempts, -retries, -retryableException); -Utils.sleep(retryBackoffMs); -} +final Map highWatermarks; +try { +highWatermarks = globalConsumer.endOffsets(topicPartitions); +} catch (final TimeoutException retryableException) { +log.debug( +"Failed to get end offsets for partitions {}. The broker may be transiently unavailable at the moment. Will retry.", +topicPartitions, +retryableException +); + +// handled in `GlobalStateMangerImpl#initialize()` +throw retryableException; Review comment: Minor note: it's confusing to track down exceptions when they are re-thrown like this, because the stacktrace would only reference L223. Even though the
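The overload distinction raised in the first review comment above can be made concrete with a minimal stand-in. This mimics only the shape of an SLF4J-style logger (a varargs format method vs. a fixed-arity `(String, Throwable)` overload); it is not SLF4J itself, and the capture behavior shown is that of this sketch as the reviewer describes it, not a claim about any particular logging backend.

```java
// Minimal stand-in for the two logger overloads contrasted in the review.
// Java overload resolution prefers the fixed-arity (String, Throwable) method
// when both apply, but a call with extra format arguments can only bind to
// the varargs method.
class MiniLogger {
    String lastMessage;
    Throwable lastCause;

    // Varargs overload: in this sketch the trailing Throwable is treated as
    // just another format argument and is NOT recorded as the cause.
    void debug(String format, Object... args) {
        lastMessage = format;
        lastCause = null;
    }

    // Fixed-arity overload: the Throwable IS recorded as the cause.
    void debug(String message, Throwable cause) {
        lastMessage = message;
        lastCause = cause;
    }
}
```

With three arguments, a call like `debug("failed for {}", partitions, e)` can only bind to the varargs overload, whereas `debug("failed for " + partitions, e)` binds to the fixed-arity one -- which is the reviewer's point.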
[GitHub] [kafka] vvcephei commented on pull request #9047: KAFKA-9274: remove `retries` for global tasks
vvcephei commented on pull request #9047: URL: https://github.com/apache/kafka/pull/9047#issuecomment-661924465 Test failures: ``` kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/core/build/reports/testOutput/kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs.test.stdout kafka.api.PlaintextAdminIntegrationTest > testAlterReplicaLogDirs FAILED org.scalatest.exceptions.TestFailedException: only 0 messages are produced within timeout after replica movement. Producer future Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for 1 ms.)) at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) at org.scalatest.Assertions.fail(Assertions.scala:1091) at org.scalatest.Assertions.fail$(Assertions.scala:1087) at org.scalatest.Assertions$.fail(Assertions.scala:1389) at kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs(PlaintextAdminIntegrationTest.scala:289) ``` ``` org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest.testTopicsAreCreatedWhenAutoCreateTopicsIsEnabledAtTheBroker failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.12/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest.testTopicsAreCreatedWhenAutoCreateTopicsIsEnabledAtTheBroker.test.stdout org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest > testTopicsAreCreatedWhenAutoCreateTopicsIsEnabledAtTheBroker FAILED java.lang.NullPointerException at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936) at org.reflections.Store.getAllIncluding(Store.java:82) at org.reflections.Store.getAll(Store.java:93) at 
org.reflections.Reflections.getSubTypesOf(Reflections.java:404) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:340) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209) at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61) at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91) at org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50) at org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:167) at org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.startConnect(EmbeddedConnectCluster.java:253) at org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.start(EmbeddedConnectCluster.java:137) at org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest.testTopicsAreCreatedWhenAutoCreateTopicsIsEnabledAtTheBroker(SourceConnectorsIntegrationTest.java:104) ```
[GitHub] [kafka] rajinisivaram merged pull request #8954: MINOR; Move quota integration tests to using the new quota API.
rajinisivaram merged pull request #8954: URL: https://github.com/apache/kafka/pull/8954
[GitHub] [kafka] rajinisivaram commented on pull request #8954: MINOR; Move quota integration tests to using the new quota API.
rajinisivaram commented on pull request #8954: URL: https://github.com/apache/kafka/pull/8954#issuecomment-661906846 Connect test failure unrelated, merging to trunk.
[GitHub] [kafka] urbandan commented on pull request #8957: KAFKA-5235: GetOffsetShell: support for multiple topics and consumer configuration override
urbandan commented on pull request #8957: URL: https://github.com/apache/kafka/pull/8957#issuecomment-661892966 ok to test
[jira] [Commented] (KAFKA-7025) Android client support
[ https://issues.apache.org/jira/browse/KAFKA-7025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162049#comment-17162049 ] Ismael Juma commented on KAFKA-7025: [~Pccesar] What Android API do you provide support for? The version distribution in May looked like: !image-2020-07-21-06-42-03-140.png! [Source: https://www.androidauthority.com/android-version-distribution-748439/|https://www.androidauthority.com/android-version-distribution-748439/] Usage for versions older than Oreo is decreasing, as expected, but they still account for slightly under 40% of users. > Android client support > -- > > Key: KAFKA-7025 > URL: https://issues.apache.org/jira/browse/KAFKA-7025 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.1 >Reporter: Martin Vysny >Priority: Major > Attachments: image-2020-07-21-06-42-03-140.png > > > Adding kafka-clients:1.0.0 (also 1.0.1 and 1.1.0) to an Android project would > make the compilation fail: com.android.tools.r8.ApiLevelException: > MethodHandle.invoke and MethodHandle.invokeExact are only supported starting > with Android O (--min-api 26) > > Would it be possible to make the kafka-clients backward compatible with > reasonable Android API (say, 4.4.x) please?
[jira] [Updated] (KAFKA-7025) Android client support
[ https://issues.apache.org/jira/browse/KAFKA-7025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-7025: --- Attachment: image-2020-07-21-06-42-03-140.png > Android client support > -- > > Key: KAFKA-7025 > URL: https://issues.apache.org/jira/browse/KAFKA-7025 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.1 >Reporter: Martin Vysny >Priority: Major > Attachments: image-2020-07-21-06-42-03-140.png > > > Adding kafka-clients:1.0.0 (also 1.0.1 and 1.1.0) to an Android project would > make the compilation fail: com.android.tools.r8.ApiLevelException: > MethodHandle.invoke and MethodHandle.invokeExact are only supported starting > with Android O (--min-api 26) > > Would it be possible to make the kafka-clients backward compatible with > reasonable Android API (say, 4.4.x) please?
[GitHub] [kafka] ijuma commented on pull request #9026: KAFKA-10274; Consistent timeouts in transactions_test
ijuma commented on pull request #9026: URL: https://github.com/apache/kafka/pull/9026#issuecomment-661865679 Is this ready to be merged?
[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-661862318 @junrao Could you run system tests on both trunk and this PR? The tests that fail frequently on my local machine are listed below. 1. transactions_test.py #9026 2. group_mode_transactions_test.py #9026 3. security_rolling_upgrade_test.py #9021 4. streams_standby_replica_test.py (I'm still digging into it) Also, I have filed tickets for the other failed system tests.
[GitHub] [kafka] serjchebotarev commented on a change in pull request #9028: KAFKA-10035: Safer conversion of consumer timeout parameters
serjchebotarev commented on a change in pull request #9028: URL: https://github.com/apache/kafka/pull/9028#discussion_r458064223 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java ## @@ -68,52 +92,224 @@ public void after() throws Exception { } @Test -public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception { - super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic(); +public void shouldNotAllowToResetWhileStreamsIsRunning() { +final String appID = getTestId() + "-not-reset-during-runtime"; Review comment: Done! :smile:
[GitHub] [kafka] omkreddy commented on pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...
omkreddy commented on pull request #9022: URL: https://github.com/apache/kafka/pull/9022#issuecomment-661829854 ok to test
[GitHub] [kafka] dajac commented on pull request #8968: KAFKA-10164; Throttle Create Topic, Create Partition and Delete Topic Operations (KIP-599, Part II, Admin Changes)
dajac commented on pull request #8968: URL: https://github.com/apache/kafka/pull/8968#issuecomment-661828919 @rajinisivaram Thanks for the review. I have updated the PR to address your feedback. Regarding the test case that you suggested, I have added test cases but I am not 100% sure that they cover what you had in mind. Please let me know if I have misunderstood your suggestion.
[GitHub] [kafka] dajac commented on a change in pull request #8968: KAFKA-10164; Throttle Create Topic, Create Partition and Delete Topic Operations (KIP-599, Part II, Admin Changes)
dajac commented on a change in pull request #8968: URL: https://github.com/apache/kafka/pull/8968#discussion_r458059973 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsOptions.java ## @@ -30,6 +30,7 @@ public class CreatePartitionsOptions extends AbstractOptions { private boolean validateOnly = false; +private boolean retryQuotaViolatedException = true; Review comment: `retryOnQuotaViolation` sounds good to me.
[jira] [Commented] (KAFKA-7025) Android client support
[ https://issues.apache.org/jira/browse/KAFKA-7025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161978#comment-17161978 ] Paulo César commented on KAFKA-7025: Has there been any progress on this issue? > Android client support > -- > > Key: KAFKA-7025 > URL: https://issues.apache.org/jira/browse/KAFKA-7025 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.1 >Reporter: Martin Vysny >Priority: Major > > Adding kafka-clients:1.0.0 (also 1.0.1 and 1.1.0) to an Android project would > make the compilation fail: com.android.tools.r8.ApiLevelException: > MethodHandle.invoke and MethodHandle.invokeExact are only supported starting > with Android O (--min-api 26) > > Would it be possible to make the kafka-clients backward compatible with > reasonable Android API (say, 4.4.x) please?
[jira] [Issue Comment Deleted] (KAFKA-7025) Android client support
[ https://issues.apache.org/jira/browse/KAFKA-7025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Paulo César updated KAFKA-7025: --- Comment: was deleted (was: Have progress in this issue? * [|https://issues.apache.org/jira/secure/AddComment!default.jspa?id=13164934]) > Android client support > -- > > Key: KAFKA-7025 > URL: https://issues.apache.org/jira/browse/KAFKA-7025 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.1 >Reporter: Martin Vysny >Priority: Major > > Adding kafka-clients:1.0.0 (also 1.0.1 and 1.1.0) to an Android project would > make the compilation fail: com.android.tools.r8.ApiLevelException: > MethodHandle.invoke and MethodHandle.invokeExact are only supported starting > with Android O (--min-api 26) > > Would it be possible to make the kafka-clients backward compatible with > reasonable Android API (say, 4.4.x) please?
[jira] [Commented] (KAFKA-7025) Android client support
[ https://issues.apache.org/jira/browse/KAFKA-7025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161979#comment-17161979 ] Paulo César commented on KAFKA-7025: Has there been any progress on this issue? > Android client support > -- > > Key: KAFKA-7025 > URL: https://issues.apache.org/jira/browse/KAFKA-7025 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.1 >Reporter: Martin Vysny >Priority: Major > > Adding kafka-clients:1.0.0 (also 1.0.1 and 1.1.0) to an Android project would > make the compilation fail: com.android.tools.r8.ApiLevelException: > MethodHandle.invoke and MethodHandle.invokeExact are only supported starting > with Android O (--min-api 26) > > Would it be possible to make the kafka-clients backward compatible with > reasonable Android API (say, 4.4.x) please?
[GitHub] [kafka] omkreddy closed pull request #9046: KAFKA-9432:(follow-up) Set `configKeys` to null in `describeConfigs()` to make it backward compatible with older Kafka versions.
omkreddy closed pull request #9046: URL: https://github.com/apache/kafka/pull/9046
[GitHub] [kafka] omkreddy commented on pull request #9046: KAFKA-9432:(follow-up) Set `configKeys` to null in `describeConfigs()` to make it backward compatible with older Kafka versions.
omkreddy commented on pull request #9046: URL: https://github.com/apache/kafka/pull/9046#issuecomment-661811555 @rajinisivaram Thanks for the review. Merging to trunk.
[GitHub] [kafka] rajinisivaram commented on a change in pull request #8968: KAFKA-10164; Throttle Create Topic, Create Partition and Delete Topic Operations (KIP-599, Part II, Admin Changes)
rajinisivaram commented on a change in pull request #8968: URL: https://github.com/apache/kafka/pull/8968#discussion_r457953384 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsOptions.java ## @@ -48,4 +49,21 @@ public CreatePartitionsOptions validateOnly(boolean validateOnly) { this.validateOnly = validateOnly; return this; } -} \ No newline at end of file + +/** + * Set the retry QuotaViolatedException to indicate whether QuotaViolatedException Review comment: Several comments refer to QuotaViolatedException which is not an actual exception ## File path: clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsOptions.java ## @@ -30,6 +30,7 @@ public class CreatePartitionsOptions extends AbstractOptions { private boolean validateOnly = false; +private boolean retryQuotaViolatedException = true; Review comment: Since the exception being retried is called ThrottlingQuotaExceededException, maybe this should say `retryOnQuotaViolation` or something like that without referring to the exception?
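The renaming discussed in this review could look like the following. This is a hypothetical sketch in the fluent style of Kafka's admin `*Options` classes; the actual field name, getter, and default in the merged PR may differ.

```java
// Hypothetical sketch of an admin options class with the flag named so that it
// does not reference a specific exception type, as suggested in the review.
class CreatePartitionsOptionsSketch {
    private boolean retryOnQuotaViolation = true; // assumed default: retry while throttled

    // Fluent setter in the same style as validateOnly(...), returning `this`
    // so options can be chained.
    public CreatePartitionsOptionsSketch retryOnQuotaViolation(boolean retry) {
        this.retryOnQuotaViolation = retry;
        return this;
    }

    public boolean shouldRetryOnQuotaViolation() {
        return retryOnQuotaViolation;
    }
}
```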
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r457821195

## File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -177,15 +214,33 @@
 }
 }
+return new ApiVersionsResponse(
+    createApiVersionsResponseData(
+        throttleTimeMs,
+        Errors.NONE,
+        apiKeys,
+        latestSupportedFeatures,
+        finalizedFeatures,
+        finalizedFeaturesEpoch));
+}
+
+public static ApiVersionsResponseData createApiVersionsResponseData(

Review comment: It calls into a couple of other helper functions. Let us keep it.
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r457986837

## File path: core/src/test/scala/unit/kafka/server/UpdateFinalizedFeaturesTest.scala
@@ -0,0 +1,450 @@
+package kafka.server

Review comment: Done.
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r457809543 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ## @@ -1214,6 +1214,70 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options); +/** + * Describes finalized as well as supported features. By default, the request is issued to any + * broker, but it can be optionally directed only to the controller via DescribeFeaturesOptions + * parameter. + * + * The following exceptions can be anticipated when calling {@code get()} on the future from the + * returned {@link DescribeFeaturesResult}: + * + * {@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the describe operation could finish. + * + * + * @param options the options to use + * + * @return the DescribeFeaturesResult containing the result + */ +DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); + +/** + * Applies specified updates to finalized features. The API is atomic, meaning that if a single + * feature update in the request can't succeed on the controller, then none of the feature + * updates are carried out. This request is issued only to the controller since the API is + * only served by the controller. + * + * The API takes as input a set of FinalizedFeatureUpdate that need to be applied. Each such + * update specifies the finalized feature to be added or updated or deleted, along with the new + * max feature version level value. + * + * Downgrade of feature version level is not a regular operation/intent. It is only allowed + * in the controller if the feature update has the allowDowngrade flag set - setting this flag + * conveys user intent to attempt downgrade of a feature max version level. Note that despite + * the allowDowngrade flag being set, certain downgrades may be rejected by the controller if it + * is deemed impossible. 
+ * Deletion of a finalized feature version is not a regular operation/intent. It is allowed + * only if the allowDowngrade flag is set in the feature update, and, if the max version level + * is set to a value less than 1. + * + * + * The following exceptions can be anticipated when calling {@code get()} on the futures + * obtained from the returned {@link UpdateFinalizedFeaturesResult}: + * + * {@link org.apache.kafka.common.errors.ClusterAuthorizationException} + * If the authenticated user didn't have alter access to the cluster. + * {@link org.apache.kafka.common.errors.InvalidRequestException} + * If the request details are invalid. e.g., a non-existing finalized feature is attempted + * to be deleted or downgraded. + * {@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the updates could finish. It cannot be guaranteed whether + * the updates succeeded or not. + * {@link org.apache.kafka.common.errors.FinalizedFeatureUpdateFailedException} + * If the updates could not be applied on the controller, despite the request being valid. + * This may be a temporary problem. + * + * + * This operation is supported by brokers with version 2.7.0 or higher. + + * @param featureUpdates the set of finalized feature updates + * @param options the options to use + * + * @return the UpdateFinalizedFeaturesResult containing the result + */ +UpdateFinalizedFeaturesResult updateFinalizedFeatures( Review comment: Done. ## File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java ## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import java.util.Objects; +import org.apache.kafka.common.feature.Features; +import org.apache.kafka.common.feature.FinalizedVersionRange; +import org.apache.kafka.common.feature.Supported
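The update semantics spelled out in the javadoc above — all-or-nothing application, downgrades only with the `allowDowngrade` flag, and deletion expressed as `allowDowngrade` plus a max version level below 1 — can be modeled as plain logic. The sketch below is a simplified illustration of those rules, not the controller's actual implementation; all names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the atomic feature-update semantics described in the
// javadoc above. Not Kafka's controller code; names are illustrative.
public class FeatureUpdateModel {
    public record Update(String feature, int maxVersionLevel, boolean allowDowngrade) {}

    // Validates every update first, then applies all of them: if any single
    // update is invalid, none are carried out (the "atomic" guarantee).
    public static Map<String, Integer> apply(Map<String, Integer> finalized, List<Update> updates) {
        for (Update u : updates) {
            Integer current = finalized.get(u.feature());
            boolean isDeletion = u.maxVersionLevel() < 1;
            if (isDeletion && !u.allowDowngrade())
                throw new IllegalArgumentException("deletion requires allowDowngrade: " + u.feature());
            if (current != null && u.maxVersionLevel() < current && !u.allowDowngrade())
                throw new IllegalArgumentException("downgrade requires allowDowngrade: " + u.feature());
        }
        Map<String, Integer> result = new HashMap<>(finalized);
        for (Update u : updates) {
            if (u.maxVersionLevel() < 1) result.remove(u.feature());
            else result.put(u.feature(), u.maxVersionLevel());
        }
        return result;
    }
}
```

Validating everything before mutating anything is what makes the operation atomic: a request mixing one valid and one invalid update leaves the finalized-features map untouched.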
[GitHub] [kafka] rajinisivaram commented on pull request #8933: KAFKA-10163; Throttle Create Topic, Create Partition and Delete Topic Operations (KIP-599, Part I, Broker Changes)
rajinisivaram commented on pull request #8933: URL: https://github.com/apache/kafka/pull/8933#issuecomment-661761899 retest this please
[jira] [Resolved] (KAFKA-10279) Allow dynamic update of certificates with additional SubjectAltNames
[ https://issues.apache.org/jira/browse/KAFKA-10279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-10279. Reviewer: Manikumar Resolution: Fixed > Allow dynamic update of certificates with additional SubjectAltNames > > > Key: KAFKA-10279 > URL: https://issues.apache.org/jira/browse/KAFKA-10279 > Project: Kafka > Issue Type: Improvement > Components: security >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.7.0 > > > At the moment, we don't allow dynamic keystore update in brokers if DN and > SubjectAltNames don't match exactly. This is to ensure that existing clients > and inter-broker communication don't break. Since addition of new entries to > SubjectAltNames will not break any authentication, we should allow that and > just verify that new SubjectAltNames is a superset of the old one. -- This message was sent by Atlassian Jira (v8.3.4#803005)
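The rule described in the issue — accept a dynamic keystore update only when the new certificate's SubjectAltNames are a superset of the old ones, so existing clients and inter-broker connections keep authenticating — reduces to a plain set check. The sketch below is a hypothetical illustration of that rule, not the broker's actual validation code.

```java
import java.util.Set;

// Hypothetical sketch of the superset rule from KAFKA-10279: a new
// certificate may ADD SubjectAltName entries, but must retain every
// entry of the old certificate so no existing connection breaks.
public class SanUpdateCheck {
    public static boolean canUpdateKeystore(Set<String> oldSans, Set<String> newSans) {
        // newSans must contain every old entry; extra entries are fine.
        return newSans.containsAll(oldSans);
    }
}
```

In other words, adding `broker2.example` to a certificate that already covers `broker1.example` is allowed, while dropping `broker1.example` is rejected.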
[GitHub] [kafka] rajinisivaram commented on pull request #9044: KAFKA-10279; Allow dynamic update of certificates with additional SubjectAltNames
rajinisivaram commented on pull request #9044: URL: https://github.com/apache/kafka/pull/9044#issuecomment-661749475 @omkreddy Thanks for the review, merging to trunk.
[GitHub] [kafka] rajinisivaram merged pull request #9044: KAFKA-10279; Allow dynamic update of certificates with additional SubjectAltNames
rajinisivaram merged pull request #9044: URL: https://github.com/apache/kafka/pull/9044
[GitHub] [kafka] rajinisivaram commented on pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…
rajinisivaram commented on pull request #8768: URL: https://github.com/apache/kafka/pull/8768#issuecomment-661729947 ok to test
[GitHub] [kafka] rajinisivaram commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…
rajinisivaram commented on a change in pull request #8768: URL: https://github.com/apache/kafka/pull/8768#discussion_r457944837

## File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
@@ -97,5 +99,13 @@
 public MetricConfig recordLevel(Sensor.RecordingLevel recordingLevel) { return this; }
+public boolean skipReporting() {

Review comment: If we are using a sensor to determine throttle time that is different from the one in Selector, we might want to expose it as a metric anyway. In case of a bug, we want to know this metric, not the one in Selector. Perhaps we could use `connections-accepted` instead of `connections-created`, or something like that. In any case, `skipReporting` seems odd, so as @dajac said, using a Sensor that is not added to the metrics registry may be an option too.
[jira] [Commented] (KAFKA-9724) Consumer wrongly ignores fetched records "since it no longer has valid position"
[ https://issues.apache.org/jira/browse/KAFKA-9724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161857#comment-17161857 ] bright.zhou commented on KAFKA-9724: When will this be merged to Kafka 2.4.x? Is there any plan? > Consumer wrongly ignores fetched records "since it no longer has valid > position" > > > Key: KAFKA-9724 > URL: https://issues.apache.org/jira/browse/KAFKA-9724 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 2.4.0 >Reporter: Oleg Muravskiy >Assignee: David Arthur >Priority: Major > Fix For: 2.6.0, 2.5.1 > > Attachments: consumer.log.xz > > > After upgrading kafka-client to 2.4.0 (while brokers are still at 2.2.0) > consumers in a consumer group intermittently stop progressing on assigned > partitions, even when there are messages to consume. This is not a permanent > condition, as they progress from time to time, but become slower with time, > and catch up after restart. > Here is a sample of 3 consecutive ignored fetches: > {noformat} > 2020-03-15 12:08:58,440 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - > Committed offset 538065584 for partition mrt-rrc10-6 > 2020-03-15 12:08:58,541 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Skipping validation of fetch offsets for partitions [mrt-rrc10-1, > mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required > protocol version (introduced in Kafka 2.3) > 2020-03-15 12:08:58,549 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - > Updating last seen epoch from null to 62 for partition mrt-rrc10-6 > 2020-03-15 12:08:58,557 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - > Committed offset 538065584 for partition mrt-rrc10-6 > 2020-03-15 12:08:58,652 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Skipping validation of fetch offsets for partitions [mrt-rrc10-1, > mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required > protocol version (introduced in Kafka 2.3) > 2020-03-15
12:08:58,659 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - > Updating last seen epoch from null to 62 for partition mrt-rrc10-6 > 2020-03-15 12:08:58,659 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Fetch READ_UNCOMMITTED at offset 538065584 for partition mrt-rrc10-6 returned > fetch data (error=NONE, highWaterMark=538065631, lastStableOffset = > 538065631, logStartOffset = 485284547, preferredReadReplica = absent, > abortedTransactions = null, recordsSizeInBytes=16380) > 2020-03-15 12:08:58,659 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Ignoring fetched records for partition mrt-rrc10-6 since it no longer has > valid position > 2020-03-15 12:08:58,665 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - > Committed offset 538065584 for partition mrt-rrc10-6 > 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Skipping validation of fetch offsets for partitions [mrt-rrc10-1, > mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required > protocol version (introduced in Kafka 2.3) > 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Added READ_UNCOMMITTED fetch request for partition mrt-rrc10-6 at position > FetchPosition{offset=538065584, offsetEpoch=Optional[62], > currentLeader=LeaderAndEpoch{leader=node03.kafka:9092 (id: 3 rack: null), > epoch=-1}} to node node03.kafka:9092 (id: 3 rack: null) > 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), > implied=(mrt-rrc10-6, mrt-rrc22-7, mrt-rrc10-1)) to broker node03.kafka:9092 > (id: 3 rack: null) > 2020-03-15 12:08:58,770 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - > Updating last seen epoch from null to 62 for partition mrt-rrc10-6 > 2020-03-15 12:08:58,770 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Fetch READ_UNCOMMITTED at offset 538065584 for partition mrt-rrc10-6 returned > fetch data 
(error=NONE, highWaterMark=538065727, lastStableOffset = > 538065727, logStartOffset = 485284547, preferredReadReplica = absent, > abortedTransactions = null, recordsSizeInBytes=51864) > 2020-03-15 12:08:58,770 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Ignoring fetched records for partition mrt-rrc10-6 since it no longer has > valid position > 2020-03-15 12:08:58,808 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - > Committed offset 538065584 for partition mrt-rrc10-6 > {noformat} > After which consumer makes progress: > {noformat} > 2020-03-15 12:08:58,871 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Skipping validation of fetch offsets for partitions [mrt-rrc10-1, > mrt-rrc1
[GitHub] [kafka] mjsax opened a new pull request #9047: KAFKA-9274: remove `retries` for global tasks
mjsax opened a new pull request #9047: URL: https://github.com/apache/kafka/pull/9047

- part of KIP-572
- removed the usage of `retries` in `GlobalStateManager`
- instead of `retries`, the new `task.timeout.ms` config is used

Second PR for KIP-572 (cf. #8864)

Call for review @vvcephei
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r457889242

## File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -523,6 +524,8 @@
 public static final String STATE_DIR_CONFIG = "state.dir";
 private static final String STATE_DIR_DOC = "Directory location for state store. This path must be unique for each streams instance sharing the same underlying filesystem.";
+public static final String TASK_TIMEOUT_MS_CONFIG = "task.timeout.ms";

Review comment: This is just added until the first PR is merged, to unblock the work on this PR.
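Once this lands, the new config would be supplied like any other Streams config. A minimal sketch using plain `java.util.Properties` — the `task.timeout.ms` key comes from the diff above; the application id, bootstrap servers, and the 300000 ms value are placeholder assumptions:

```java
import java.util.Properties;

// Sketch: configuring the task-level timeout from KIP-572.
// "task.timeout.ms" is the config key added in the diff above;
// all other values here are placeholders.
public class TaskTimeoutConfigExample {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "my-app");             // placeholder app id
        props.put("bootstrap.servers", "localhost:9092");  // placeholder brokers
        // Replaces the per-client `retries` setting: bounds how long a task
        // may keep retrying a recoverable error before it is raised.
        props.put("task.timeout.ms", "300000"); // e.g. 5 minutes
        return props;
    }
}
```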
[jira] [Commented] (KAFKA-8037) KTable restore may load bad data
[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161796#comment-17161796 ] Matthias J. Sax commented on KAFKA-8037: I believe that the optimization is still worth having. That the input topic is log-compacted is not a crazy assumption IMHO. And in fact, we have had this optimization for a long time and I am not aware of any data loss issues due to incorrect retention configuration. I am also less concerned about Consumed/Materialized. Often users might specify the same serde in both places, but it should be possible to detect this case. Hence, instead of a blind check of whether we get one or two serdes, we should check whether we got the same or different serdes. Similarly for serialization exception handling: if the default one is used, we _know_ that no corrupted data can be in the store and thus it seems safe to just do blind bulk loading – what I obviously don't know is how many people might use the default. I also strongly believe that async serdes should be declared an anti-pattern. Instead of trying to make them work, it might be more reasonable to tell people that they are a bad idea and not supported. In general, I think it makes sense to allow users to opt in/opt out on a per-table basis via the API. I am not sure if we need a `useSourceAsChangelog`. We could also educate people to use `builder.stream().toTable()` – if possible, it seems desirable to not increase the API if we don't need to. For GlobalKTables, or actually global state stores, we might want to reconsider the design: note that for GlobalKTable no intermediate processing happens. Only for global state stores can people plug in a custom Processor (which is currently not allowed to do any actual processing). Having an extra changelog topic to begin with does not really make sense (in contrast to the KTable case).
However, we could make the API simpler: instead of letting users define the "store maintainer Processor", we would always provide it (including for the PAPI) – to allow users to preprocess the data, we allow them to plug in a processor between the source and the store maintainer instead. In the DSL, we never plug in an intermediate processor but only have source+storeMaintainer – if we detect this pattern, we know that the data was not modified and we can do some optimizations during state restore. For all other cases, we need to do the restore as regular processing, as suggested by Guozhang. > KTable restore may load bad data > > > Key: KAFKA-8037 > URL: https://issues.apache.org/jira/browse/KAFKA-8037 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > Labels: pull-request-available > > If an input topic contains bad data, users can specify a > `deserialization.exception.handler` to drop corrupted records on read. > However, this mechanism may be by-passed on restore. Assume a > `builder.table()` call reads and drops a corrupted record. If the table state > is lost and restored from the changelog topic, the corrupted record may be > copied into the store, because on restore plain bytes are copied. > If the KTable is used in a join, an internal `store.get()` call to lookup the > record would fail with a deserialization exception if the value part cannot > be deserialized. > GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for > GlobalKTable case). It's unclear to me atm, how this issue could be addressed > for KTables though. > Note, that user state stores are not affected, because they always have a > dedicated changelog topic (and don't reuse an input topic) and thus the > corrupted record would not be written into the changelog.
[GitHub] [kafka] showuon commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
showuon commented on pull request #9029: URL: https://github.com/apache/kafka/pull/9029#issuecomment-661675952 Thanks @ning2008wisc ! @mimaison , do you have any comments for this PR? Thanks.
[GitHub] [kafka] showuon commented on pull request #8885: KAFKA-8264: decrease the record size for flaky test
showuon commented on pull request #8885: URL: https://github.com/apache/kafka/pull/8885#issuecomment-661675572 hi @omkreddy , it looks like @hachikuji has not been available recently. Do you think we still need other people's comments? I think this change should be pretty straightforward and safe. Thanks.