[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092652543

## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:

@@ -414,34 +414,51 @@ private <T> T getConfiguredInstance(Object klass, Class<T> t, Map<String, Object> configPairs) {
     public <T> T getConfiguredInstance(String key, Class<T> t) {
-        return getConfiguredInstance(key, t, Collections.emptyMap());
+        T configuredInstance = null;
+
+        try {
+            configuredInstance = getConfiguredInstance(key, t, Collections.emptyMap());
+        } catch (Exception e) {
+            maybeClose(configuredInstance, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
+            throw e;
+        }
+
+        return configuredInstance;
     }

     /**
      * Get a configured instance of the given class specified by the given configuration key. If the object implements
      * Configurable configure it using the configuration.
      *
      * @param key             The configuration key for the class
      * @param t               The interface the class should implement
      * @param configOverrides override origin configs
      * @return A configured instance of the class
      */
     public <T> T getConfiguredInstance(String key, Class<T> t, Map<String, Object> configOverrides) {
         Class<?> c = getClass(key);
+        T configuredInstance = null;

-        return getConfiguredInstance(c, t, originals(configOverrides));
+        try {
+            configuredInstance = getConfiguredInstance(c, t, originals(configOverrides));
+        } catch (Exception e) {

Review Comment:
   I went ahead and extended the try/catch to enclose the `o = Utils.newInstance((String) klass, t);` call as well.
[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092652543

## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (same hunk as quoted above):

Review Comment:
   I've updated accordingly. However, this assumes no resource leakage within the constructor.
[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-14663, KAFKA-12566: Fix MM2 causing negative downstream lag
gharris1727 commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1411301405

After #13181 is merged, I'll rebase and remove my fairness patch.
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13181: KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method
gharris1727 commented on code in PR #13181:
URL: https://github.com/apache/kafka/pull/13181#discussion_r1092643889

## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:

@@ -87,6 +90,7 @@ public MirrorSourceTask() {}
     @Override
     public void start(Map<String, String> props) {
         MirrorSourceTaskConfig config = new MirrorSourceTaskConfig(props);
+        pendingOffsetSyncs.clear();

Review Comment:
   nit: findbugs doesn't complain about this not being synchronized? you must be on better terms with it than I am.

## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:

@@ -194,41 +200,63 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) {
         TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
         long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
         long downstreamOffset = metadata.offset();
-        maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        // We may be able to immediately publish an offset sync that we've queued up here
+        firePendingOffsetSyncs();
     }

-    // updates partition state and sends OffsetSync if necessary
-    private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset,
-            long downstreamOffset) {
+    // updates partition state and queues up OffsetSync if necessary
+    private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset,
+            long downstreamOffset) {
         PartitionState partitionState =
             partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
         if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
-                partitionState.reset();
+            OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+            synchronized (this) {
+                pendingOffsetSyncs.put(topicPartition, offsetSync);
             }
+            partitionState.reset();
         }
     }

-    // sends OffsetSync record upstream to internal offsets topic
-    private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
-            long downstreamOffset) {
-        if (!outstandingOffsetSyncs.tryAcquire()) {
-            // Too many outstanding offset syncs.
-            return false;
+    private void firePendingOffsetSyncs() {
+        while (true) {
+            OffsetSync pendingOffsetSync;
+            synchronized (this) {
+                Iterator<OffsetSync> syncIterator = pendingOffsetSyncs.values().iterator();
+                if (!syncIterator.hasNext()) {
+                    // Nothing to sync
+                    log.debug("No more pending offset syncs");

Review Comment:
   nit: these log statements could be spammy if they're called on every commitRecord, wdyt about `trace`?
[GitHub] [kafka] C0urante commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
C0urante commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092632585

## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (same hunk as quoted above):

Review Comment:
   If we do the try/catch here, it doesn't work; `configuredInstance` is guaranteed to be null in the catch block. I was thinking of something like this:

   ```java
   if (o instanceof Configurable) {
       try {
           ((Configurable) o).configure(configPairs);
       } catch (Exception e) {
           maybeClose(o, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
           throw e;
       }
   }
   ```

   being added to [these lines](https://github.com/apache/kafka/blob/7d61d4505a16f09b85f5eb37adeff9c3534ec02c/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L406-L407) in the other variant of `getConfiguredInstance`.
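A note on the `maybeClose` helper that both comments above rely on: it is part of the PR itself, and the exact implementation may differ, but a minimal sketch of the close-on-failure pattern being discussed (using only the existing `Utils.closeQuietly` utility) would look roughly like this:

```java
import org.apache.kafka.common.utils.Utils;

public final class CloseOnFailureExample {
    // Hypothetical helper illustrating the pattern discussed in PR #13168; the
    // real maybeClose(...) may differ. The intent: if construction/configuration
    // fails partway, release anything the instance has already acquired.
    static void maybeClose(Object instance, String name) {
        if (instance instanceof AutoCloseable) {
            // closeQuietly logs and swallows secondary failures so the original
            // exception from configure() still propagates to the caller.
            Utils.closeQuietly((AutoCloseable) instance, name);
        }
    }
}
```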
[GitHub] [kafka] C0urante opened a new pull request, #13181: KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method
C0urante opened a new pull request, #13181:
URL: https://github.com/apache/kafka/pull/13181

The [Jira](https://issues.apache.org/jira/browse/KAFKA-14610) contains a more detailed description of the motivation for this change. To summarize: with bursty topic partitions, offset-sync starvation can occur, since MM2 limits the number of in-flight syncs at any one time.

The fix here is to enqueue the latest applicable offset for any to-be-synced topic partition in the `commitRecord` method, immediately fire off all syncs that can be published in that method, and then periodically re-check for new syncs in `commit` in case there are syncs that could not be published earlier due to, e.g., a burst of throughput on a topic partition.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
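The queue-then-fire pattern the PR body describes can be illustrated with a condensed, self-contained sketch. All class, field, and method names below are illustrative, not the PR's actual code:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import org.apache.kafka.common.TopicPartition;

public class OffsetSyncQueueSketch {
    // Keying the pending syncs by partition means a burst of commits for one
    // partition collapses into a single pending sync: only the latest survives.
    private final Map<TopicPartition, long[]> pendingOffsetSyncs = new LinkedHashMap<>();
    // Bounds the number of in-flight syncs, as MM2 does today.
    private final Semaphore outstandingOffsetSyncs = new Semaphore(10);

    // Called from commitRecord(): remember only the newest upstream/downstream pair.
    synchronized void queueSync(TopicPartition tp, long upstream, long downstream) {
        pendingOffsetSyncs.put(tp, new long[]{upstream, downstream});
    }

    // Called from both commitRecord() and commit(): drain whatever the
    // semaphore allows; leftovers stay queued for the next commit() pass.
    void firePendingSyncs() {
        while (true) {
            TopicPartition tp;
            long[] offsets;
            synchronized (this) {
                Iterator<Map.Entry<TopicPartition, long[]>> it = pendingOffsetSyncs.entrySet().iterator();
                if (!it.hasNext()) {
                    return; // nothing left to publish
                }
                Map.Entry<TopicPartition, long[]> entry = it.next();
                tp = entry.getKey();
                offsets = entry.getValue();
                it.remove();
            }
            if (!outstandingOffsetSyncs.tryAcquire()) {
                // Too many in flight; re-queue (without clobbering a newer
                // sync queued concurrently) and retry on the next commit().
                synchronized (this) {
                    pendingOffsetSyncs.putIfAbsent(tp, offsets);
                }
                return;
            }
            publish(tp, offsets[0], offsets[1]);
        }
    }

    private void publish(TopicPartition tp, long upstream, long downstream) {
        // The producer send of the offset-sync record would go here; its
        // completion callback releases outstandingOffsetSyncs.
        outstandingOffsetSyncs.release();
    }
}
```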
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13025: KAFKA-14299: Fix pause and resume with state updater
guozhangwang commented on code in PR #13025:
URL: https://github.com/apache/kafka/pull/13025#discussion_r1092598270

## streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:

@@ -190,7 +190,7 @@ public void clearTaskTimeout() {
     @Override
     public boolean commitNeeded() {
-        throw new UnsupportedOperationException("This task is read-only");
+        return task.commitNeeded();

Review Comment:
   Why do we need to change these two functions?

## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:

@@ -258,21 +269,27 @@ private List<TaskAndAction> getTasksAndActions() {
     }

     private void addTask(final Task task) {
+        final TaskId taskId = task.id();
         if (isStateless(task)) {
             addToRestoredTasks((StreamTask) task);
-            log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
+            log.info("Stateless active task " + taskId + " was added to the restored tasks of the state updater");
+        } else if (topologyMetadata.isPaused(taskId.topologyName())) {
+            pausedTasks.put(taskId, task);

Review Comment:
   I'm wondering if this complexity is necessary, since we do not make strict ordering guarantees for paused topologies -- i.e. it's okay to still be processing those tasks for a while after the `pause()` call is triggered. Is it really a correctness or concurrency issue?

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:

@@ -1534,7 +1540,13 @@ Set<TaskId> standbyTaskIds() {
     Map<TaskId, Task> allTasks() {
         // not bothering with an unmodifiable map, since the tasks themselves are mutable, but
         // if any outside code modifies the map or the tasks, it would be a severe transgression.
-        return tasks.allTasksPerId();
+        if (stateUpdater != null) {
+            final Map<TaskId, Task> ret = stateUpdater.getTasks().stream().collect(Collectors.toMap(Task::id, x -> x));
+            ret.putAll(tasks.allTasksPerId());

Review Comment:
   I've changed the func name slightly in another PR, so if that PR is merged we need to do a slight rebase/conflict resolution, just FYI.
[jira] [Comment Edited] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR
[ https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682775#comment-17682775 ]

Calvin Liu edited comment on KAFKA-14139 at 1/31/23 10:38 PM:
----------------------------------------------------------------
Hi [~adupriez], thanks for checking this issue. I have been working on a KIP for this one for a while. It will have protocol changes to include the broker epoch in the AlterPartition and Fetch requests. Will share more details when the KIP is published. Sorry I did not assign the ticket to myself earlier. Can you assign the ticket to me?

was (Author: JIRAUSER298384):
Hi [~adupriez], thanks for checking this issue. I have been working on a KIP for this one for a while. It will have protocol changes to include the broker epoch in the AlterPartition and Fetch requests. Will share more details when the KIP is published.

> Replaced disk can lead to loss of committed data even with non-empty ISR
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-14139
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14139
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Alexandre Dupriez
>            Priority: Major
>             Fix For: 3.5.0
>
> We have been thinking about disk failure cases recently. Suppose that a disk has failed and the user needs to restart the disk from an empty state. The concern is whether this can lead to the unnecessary loss of committed data.
> For normal topic partitions, removal from the ISR during controlled shutdown buys us some protection. After the replica is restarted, it must prove its state to the leader before it can be added back to the ISR. And it cannot become a leader until it does so.
> An obvious exception to this is when the replica is the last member in the ISR. In this case, the disk failure itself has compromised the committed data, so some amount of loss must be expected.
> We have been considering other scenarios in which the loss of one disk can lead to data loss even when there are replicas remaining which have all of the committed entries. One such scenario is this:
> Suppose we have a partition with two replicas: A and B. Initially A is the leader and it is the only member of the ISR.
> # Broker B catches up to A, so A attempts to send an AlterPartition request to the controller to add B into the ISR.
> # Before the AlterPartition request is received, replica B has a hard failure.
> # The current controller successfully fences broker B. It takes no action on this partition since B is already out of the ISR.
> # Before the controller receives the AlterPartition request to add B, it also fails.
> # While the new controller is initializing, suppose that replica B finishes startup, but the disk has been replaced (all of the previous state has been lost).
> # The new controller sees the registration from broker B first.
> # Finally, the AlterPartition from A arrives which adds B back into the ISR even though it has an empty log.
> (Credit for coming up with this scenario goes to [~junrao].)
> I tested this in KRaft and confirmed that this sequence is possible (even if perhaps unlikely). There are a few ways we could have potentially detected the issue. First, perhaps the leader should have bumped the leader epoch on all partitions when B was fenced. Then the inflight AlterPartition would be doomed no matter when it arrived.
> Alternatively, we could have relied on the broker epoch to distinguish the dead broker's state from that of the restarted broker. This could be done by including the broker epoch in both the `Fetch` request and in `AlterPartition`.
> Finally, perhaps even normal Kafka replication should be using a unique identifier for each disk so that we can reliably detect when it has changed. For example, something like what was proposed for the metadata quorum here: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes].
[jira] [Comment Edited] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR
[ https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682775#comment-17682775 ]

Calvin Liu edited comment on KAFKA-14139 at 1/31/23 10:13 PM:
----------------------------------------------------------------
Hi [~adupriez], thanks for checking this issue. I have been working on a KIP for this one for a while. It will have protocol changes to include the broker epoch in the AlterPartition and Fetch requests. Will share more details when the KIP is published.

was (Author: JIRAUSER298384):
Hi [~adupriez], thanks for checking this issue. I have been working on a KIP for this one for a while. Including the broker epoch in the AlterPartition and Fetch request is preferable. Will share more details when the KIP is published.
[jira] [Comment Edited] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR
[ https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682775#comment-17682775 ]

Calvin Liu edited comment on KAFKA-14139 at 1/31/23 10:01 PM:
----------------------------------------------------------------
Hi [~adupriez], thanks for checking this issue. I have been working on a KIP for this one for a while. Including the broker epoch in the AlterPartition and Fetch request is preferable. Will share more details when the KIP is published.

was (Author: JIRAUSER298384):
Hi [~adupriez], thanks for checking this issue. I have been working on a KIP for this one for a while. It is almost done, will keep you posted.
[jira] [Commented] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR
[ https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682775#comment-17682775 ]

Calvin Liu commented on KAFKA-14139:
------------------------------------
Hi [~adupriez], thanks for checking this issue. I have been working on a KIP for this one for a while. It is almost done, will keep you posted.
[GitHub] [kafka] vvcephei merged pull request #12879: KAFKA-14409: Clean ProcessorParameters from casting
vvcephei merged PR #12879:
URL: https://github.com/apache/kafka/pull/12879
[GitHub] [kafka] vvcephei commented on pull request #12879: KAFKA-14409: Clean ProcessorParameters from casting
vvcephei commented on PR #12879:
URL: https://github.com/apache/kafka/pull/12879#issuecomment-1411120341

There was only one test failure, which was in an unrelated component:
`[Build / JDK 8 and Scala 2.12 / kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12879/4/testReport/kafka.server/DynamicBrokerReconfigurationTest/Build___JDK_8_and_Scala_2_12___testTrustStoreAlter_String__quorum_kraft/)`
[GitHub] [kafka] mumrah opened a new pull request, #13180: Add a summarizer for the metadata migration
mumrah opened a new pull request, #13180:
URL: https://github.com/apache/kafka/pull/13180

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] vamossagar12 commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils
vamossagar12 commented on PR #13158:
URL: https://github.com/apache/kafka/pull/13158#issuecomment-146347

Thanks @fvaleri, I made the changes.
[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092516181

## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:

@@ -168,6 +169,7 @@ public class KafkaConsumerTest {
     private final int defaultApiTimeoutMs = 60000;
     private final int requestTimeoutMs = defaultApiTimeoutMs / 2;
     private final int heartbeatIntervalMs = 1000;
+    private final int targetInterceptor = 3;

Review Comment:
   Just an observation: I see that the variable `private final int throttleMs = 10;` is likewise used in only one test case.
[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092513735

## clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:

@@ -54,6 +58,12 @@ public void testConfiguredInstances() {
     testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
     testInvalidInputs("test1,test2");
     testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
+    testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+        + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+        + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR, org.apache.kafka.test.MockConsumerInterceptor.class);
+    testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+        + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+        + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR, org.apache.kafka.test.MockProducerInterceptor.class);

Review Comment:
   @C0urante Done!
[jira] [Comment Edited] (KAFKA-14487) Move LogManager to storage module
[ https://issues.apache.org/jira/browse/KAFKA-14487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682769#comment-17682769 ]

Sagar Rao edited comment on KAFKA-14487 at 1/31/23 9:22 PM:
------------------------------------------------------------
I started looking at this. It depends on https://issues.apache.org/jira/browse/KAFKA-14603. There's also a Pool.scala class which needs to be migrated, and which seems to be used in multiple places. I am not aware of an existing Java equivalent of it. Should we look to migrate this class to Java?

was (Author: sagarrao):
I started looking at this. It depends on https://issues.apache.org/jira/browse/KAFKA-14603. There's also a Pool.scala class which needs to be migrated, and which seems to be used in multiple places. I am not aware of an existing Java equivalent of it. I can start migrating that first if needed.

> Move LogManager to storage module
> ---------------------------------
>
>                 Key: KAFKA-14487
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14487
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Ismael Juma
>            Assignee: Sagar Rao
>            Priority: Major
[jira] [Commented] (KAFKA-14487) Move LogManager to storage module
[ https://issues.apache.org/jira/browse/KAFKA-14487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682769#comment-17682769 ]

Sagar Rao commented on KAFKA-14487:
-----------------------------------
I started looking at this. It depends on https://issues.apache.org/jira/browse/KAFKA-14603. There's also a Pool.scala class which needs to be migrated, and which seems to be used in multiple places. I am not aware of an existing Java equivalent of it. I can start migrating that first if needed.
[GitHub] [kafka] pprovenzano commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on PR #13114:
URL: https://github.com/apache/kafka/pull/13114#issuecomment-1411037660

I just updated AlterUserScramCredentialsRequestTest to work in KRaft after finding it while looking into the tests you wanted. It covers a lot of what you want tested in ScramControlManager and ControllerApis, given that it is an end-to-end test.
[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic
cmccabe commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1092446989

## core/src/main/scala/kafka/network/SocketServer.scala:

@@ -573,7 +591,20 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   private val listenBacklogSize = config.socketListenBacklogSize
   private val nioSelector = NSelector.open()
-  private[network] val serverChannel = openServerSocket(endPoint.host, endPoint.port, listenBacklogSize)
+
+  // If the port is configured as 0, we are using a random (ephemeral) port, so we need to open
+  // the socket before we can find out what port we have. If it is set to a nonzero value, defer
+  // opening the socket until we start the Acceptor. The reason for deferring the socket opening
+  // is so that systems which assume that the socket being open indicates readiness are not
+  // confused.
+  private[network] var serverChannel: ServerSocketChannel = _
+  private[network] val localPort: Int = if (endPoint.port != 0) {

Review Comment:
   good idea. let me wait for this test run to complete then I'll add a log message
[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic
cmccabe commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1092445406

## core/src/main/scala/kafka/network/SocketServer.scala:

@@ -585,23 +616,35 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   private var currentProcessorIndex = 0
   private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
   private var started = false
-  private[network] val startFuture = new CompletableFuture[Void]()
+  private[network] val startedFuture = new CompletableFuture[Void]()

   val thread = KafkaThread.nonDaemon(
     s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
     this)

-  startFuture.thenRun(() => synchronized {
-    if (!shouldRun.get()) {
-      debug(s"Ignoring start future for ${endPoint.listenerName} since the acceptor has already been shut down.")
-    } else {
+  def start(): Unit = synchronized {
+    try {
+      if (!shouldRun.get()) {
+        throw new ClosedChannelException()

Review Comment:
   yeah, that's right. For example, if there was some error during broker startup, we would call shutdown and potentially shut down the SocketServer before the Acceptors had been started.
[GitHub] [kafka] mumrah commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic
mumrah commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1092435602

## core/src/main/scala/kafka/network/SocketServer.scala (the `start()` / `ClosedChannelException` hunk quoted above):

Review Comment:
   How does this happen? If the kafka server is shutdown before the socket server is able to finish starting?

## core/src/main/scala/kafka/network/SocketServer.scala (the deferred `serverChannel` / `localPort` hunk quoted above):

Review Comment:
   It might be good to log when we are opening the socket server early.
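The deferred-bind behavior under discussion in this thread can be sketched in plain Java (the real code is the Scala `Acceptor` above; names here are illustrative, not Kafka's actual API):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class DeferredBindSketch {
    private ServerSocketChannel serverChannel;
    private final int configuredPort;
    private final int localPort;

    DeferredBindSketch(String host, int configuredPort, int backlog) throws IOException {
        this.configuredPort = configuredPort;
        if (configuredPort == 0) {
            // Port 0 means "pick an ephemeral port": we must bind now just to
            // learn which port the OS assigned.
            serverChannel = open(host, 0, backlog);
            localPort = serverChannel.socket().getLocalPort();
        } else {
            // Fixed port: defer the bind until start(), so that an open socket
            // really does mean "ready to accept traffic".
            localPort = configuredPort;
        }
    }

    void start(String host, int backlog) throws IOException {
        if (serverChannel == null) {
            serverChannel = open(host, configuredPort, backlog);
        }
    }

    private static ServerSocketChannel open(String host, int port, int backlog) throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port), backlog);
        return channel;
    }
}
```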
[GitHub] [kafka] guozhangwang commented on pull request #12654: KAFKA-10575: Add onRestoreSuspended to StateRestoreListener
guozhangwang commented on PR #12654:
URL: https://github.com/apache/kafka/pull/12654#issuecomment-1410999341

@mjsax I tried to resolve the long commit history but it seems not possible, so I created a new PR (with the same branch name) here: https://github.com/apache/kafka/pull/13179

it's ready for another look.
[GitHub] [kafka] guozhangwang opened a new pull request, #13179: KAFKA-10575: Add onRestoreSuspended to StateRestoreListener
guozhangwang opened a new pull request, #13179:
URL: https://github.com/apache/kafka/pull/13179

1. Add the new API (default impl is empty) to StateRestoreListener.
2. Update related unit tests.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
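For readers unfamiliar with the listener, a minimal implementation showing where the new callback sits next to the existing ones might look like this. The exact `onRestoreSuspended` signature below is assumed from the PR description and the pattern of `onRestoreEnd`, not copied from the PR:

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

public class LoggingRestoreListener implements StateRestoreListener {
    @Override
    public void onRestoreStart(TopicPartition tp, String storeName, long startingOffset, long endingOffset) {
        System.out.printf("Restore of %s (%s) starting at %d%n", storeName, tp, startingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition tp, String storeName, long batchEndOffset, long numRestored) {
        System.out.printf("Restored %d records of %s up to offset %d%n", numRestored, storeName, batchEndOffset);
    }

    @Override
    public void onRestoreEnd(TopicPartition tp, String storeName, long totalRestored) {
        System.out.printf("Restore of %s complete: %d records%n", storeName, totalRestored);
    }

    @Override
    public void onRestoreSuspended(TopicPartition tp, String storeName, long totalRestored) {
        // New callback from KAFKA-10575: fired when restoration is suspended
        // (e.g. the task migrates away) rather than completed. The PR gives it
        // an empty default impl, so existing implementations keep compiling.
        System.out.printf("Restore of %s suspended after %d records%n", storeName, totalRestored);
    }
}
```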
[GitHub] [kafka] guozhangwang closed pull request #12654: KAFKA-10575: Add onRestoreSuspended to StateRestoreListener
guozhangwang closed pull request #12654: KAFKA-10575: Add onRestoreSuspended to StateRestoreListener
URL: https://github.com/apache/kafka/pull/12654
[GitHub] [kafka] C0urante commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
C0urante commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1092418826

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:

@@ -711,9 +752,32 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }

-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     */
+    private void sendPrivileged(String key, byte[] value) throws ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)));
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     */
+    private void sendPrivileged(List<ProducerKeyValue> keyValues) throws ExecutionException, InterruptedException, TimeoutException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
+            keyValues.forEach(
+                keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+            );
+
+            for (Future<RecordMetadata> future : producerFutures) {
+                future.get(READ_WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS);

Review Comment:
   This gives us 30 seconds for each write instead of 30 seconds total, and it doesn't take into account reading to the end of the log after writes have finished. Considering this is all taking place on the herder's tick thread, we should probably care about the difference. We might be able to use the [Timer class](https://github.com/apache/kafka/blob/eb7f490159c924ca0f21394d58366c257998f52e/clients/src/main/java/org/apache/kafka/common/utils/Timer.java) to simplify some of this logic.

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:

@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
     }

     private void sendPrivileged(String key, byte[] value) {
+        sendPrivileged(key, value, null);
+    }
+
+    private void sendPrivileged(String key, byte[] value, Callback callback) {
         if (!usesFencableWriter) {
-            configLog.send(key, value);

Review Comment:
   Oof, that's a lot of context 😄

   Thinking about it some more, I'm hesitant to make significant changes to the exception mapper without a KIP, since it's a crucial part of our API and there may be automated tooling (like K8s operators) built around the current behavior, and adding full stack traces and/or caused-by chains could make things less readable and human-friendly, especially for new users.

   Asking people to check the worker logs isn't a terrible solution, though it might be a bit tricky to make sure that that message reaches users regardless of whether EOS source support is enabled or disabled. An alternative could be to handle this case specially by instantiating an exception whose message contains information on its cause. For example, the message could be `"Failed to write task configs to Kafka. Caused by org.apache.kafka.common.errors.AuthorizationException: Not authorized to access topics: connect-configs"`.

   Regardless, this can and probably should be left as a follow-up since it's its own can of worms and doesn't have to block this PR.
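The `Timer` suggestion in the first review comment amounts to sharing one deadline across all pending futures instead of granting each its own full timeout. A minimal sketch, assuming only the public `org.apache.kafka.common.utils.Timer` API (not the actual follow-up change):

```java
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

public class SharedDeadlineSketch {
    static <T> void awaitAll(List<Future<T>> futures, long totalTimeoutMs) throws Exception {
        Timer timer = Time.SYSTEM.timer(totalTimeoutMs);
        for (Future<T> future : futures) {
            // Each get() only receives whatever budget is left, so the whole
            // batch is bounded by totalTimeoutMs rather than N * totalTimeoutMs.
            future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
            timer.update(); // refresh elapsed time before the next wait
        }
        // Whatever remains on the timer could then bound the read-to-end of the log.
    }
}
```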
[jira] [Created] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow
Greg Harris created KAFKA-14666:
-----------------------------------

             Summary: MM2 should translate consumer group offsets behind replication flow
                 Key: KAFKA-14666
                 URL: https://issues.apache.org/jira/browse/KAFKA-14666
             Project: Kafka
          Issue Type: Improvement
          Components: mirrormaker
    Affects Versions: 3.5.0
            Reporter: Greg Harris

MirrorMaker2 includes an offset translation feature which can translate the offsets for an upstream consumer group to a corresponding downstream consumer group. It does this by keeping a topic of offset syncs to correlate upstream and downstream offsets, and it translates any source offsets which are ahead of the replication flow.

However, if a replication flow is closer to the end of a topic than the consumer group, then the offset translation feature will refuse to translate the offset, for correctness reasons. This is because the MirrorCheckpointTask only keeps the latest offset correlation between source and target; it does not have sufficient information to translate older offsets.

The workarounds for this issue are to:
1. Pause the replication flow occasionally to allow the source to get ahead of MM2.
2. Increase offset.lag.max to delay offset syncs, increasing the window for translation to happen. With the fix for KAFKA-12468, this will also increase the lag of applications that are ahead of the replication flow, so this is a tradeoff.

Instead, the MirrorCheckpointTask should provide correct and best-effort translation for consumer groups behind the replication flow by keeping additional state, or by re-reading the offset-syncs topic. This should be a substantial improvement for use cases where applications have a higher latency to commit than the replication flow, or where applications are reading from the earliest offset.
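To make the constraint concrete: with only the latest offset sync retained, translation is a simple shift that is only valid at or ahead of the sync point. A toy model of this (not MM2's actual code; names are illustrative):

```java
import java.util.OptionalLong;

public class OffsetTranslationSketch {
    // Latest correlated pair observed on the offset-syncs topic.
    private final long lastSyncUpstreamOffset;
    private final long lastSyncDownstreamOffset;

    OffsetTranslationSketch(long upstream, long downstream) {
        this.lastSyncUpstreamOffset = upstream;
        this.lastSyncDownstreamOffset = downstream;
    }

    OptionalLong translate(long upstreamConsumerOffset) {
        if (upstreamConsumerOffset < lastSyncUpstreamOffset) {
            // The group is behind the replication flow: with only the latest
            // sync retained, there is no correct downstream offset to emit.
            return OptionalLong.empty();
        }
        // At or ahead of the sync point, shift by the distance past the sync.
        return OptionalLong.of(lastSyncDownstreamOffset + (upstreamConsumerOffset - lastSyncUpstreamOffset));
    }
}
```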
[GitHub] [kafka] guozhangwang commented on pull request #12654: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener
guozhangwang commented on PR #12654:
URL: https://github.com/apache/kafka/pull/12654#issuecomment-1410947553

> Why has this PR 392 commits?

I created this branch / PR around Oct. 2022, and then I rebased on top of trunk. My local branch is in the right place but it seems GitHub cannot recognize the rebase. That should be okay, since when merging it would be squashed into a single commit anyways.
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12654: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener
guozhangwang commented on code in PR #12654:
URL: https://github.com/apache/kafka/pull/12654#discussion_r1092384524

## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:

@@ -197,6 +198,49 @@ public void shouldNotRegisterStoreWithoutMetadata() {
         () -> changelogReader.register(new TopicPartition("ChangelogWithoutStoreMetadata", 0), stateManager));
     }

+    @Test
+    public void shouldSupportUnregisterChangelogBeforeCompletion() {

Review Comment:
   I can add that.
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12654: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener
guozhangwang commented on code in PR #12654:
URL: https://github.com/apache/kafka/pull/12654#discussion_r1092383808

## streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java:

@@ -37,6 +37,9 @@
  * These two interfaces serve different restoration purposes and users should not try to implement both of them in a single
  * class during state store registration.
  *
+ * Also note that standby tasks restoration process are not monitored via this interface, since a standby task keep

Review Comment:
   Ack.

## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:

@@ -988,6 +988,18 @@ public void unregister(final Collection<TopicPartition> revokedChangelogs) {
             if (changelogMetadata != null) {
                 if (!changelogMetadata.state().equals(ChangelogState.REGISTERED)) {
                     revokedInitializedChangelogs.add(partition);
+
+                    // if the changelog is still in REGISTERED, it means it has not initialized and started
+                    // restoring yet, and hence the corresponding onRestoreStart was not called; in this case
+                    // we should not call onRestorePaused either

Review Comment:
   Ouch, Ack!
[jira] [Assigned] (KAFKA-12468) Initial offsets are copied from source to target cluster
[ https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Harris reassigned KAFKA-12468:
-----------------------------------
    Assignee: Greg Harris

> Initial offsets are copied from source to target cluster
> --------------------------------------------------------
>
>                 Key: KAFKA-12468
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12468
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.7.0
>            Reporter: Bart De Neuter
>            Assignee: Greg Harris
>            Priority: Major
>
> We have an active-passive setup where the 3 connectors from mirror maker 2 (heartbeat, checkpoint and source) are running on a dedicated Kafka connect cluster on the target cluster.
> Offset syncing is enabled as specified by KIP-545. But when activated, it seems the offsets from the source cluster are initially copied to the target cluster without translation. This causes a negative lag for all synced consumer groups. Only when we reset the offsets for each topic/partition on the target cluster and produce a record on the topic/partition in the source, the sync starts working correctly.
> I would expect that the consumer groups are synced but that the current offsets of the source cluster are not copied to the target cluster.
> This is the configuration we are currently using:
> Heartbeat connector:
> {code:xml}
> {
>   "name": "mm2-mirror-heartbeat",
>   "config": {
>     "name": "mm2-mirror-heartbeat",
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "",
>     "target.cluster.bootstrap.servers": "",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "1",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
> Checkpoint connector:
> {code:xml}
> {
>   "name": "mm2-mirror-checkpoint",
>   "config": {
>     "name": "mm2-mirror-checkpoint",
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "",
>     "target.cluster.bootstrap.servers": "",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "40",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
> Source connector:
> {code:xml}
> {
>   "name": "mm2-mirror-source",
>   "config": {
>     "name": "mm2-mirror-source",
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "",
>     "target.cluster.bootstrap.servers": "",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "40",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
[jira] [Assigned] (KAFKA-12566) Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication
[ https://issues.apache.org/jira/browse/KAFKA-12566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-12566: --- Assignee: Greg Harris (was: Luke Chen) > Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication > - > > Key: KAFKA-12566 > URL: https://issues.apache.org/jira/browse/KAFKA-12566 > Project: Kafka > Issue Type: Test > Components: mirrormaker, unit tests >Reporter: Matthias J. Sax >Assignee: Greg Harris >Priority: Critical > Labels: flaky-test > > > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 2. > Offsets not translated downstream to primary cluster. ==> expected: > but was: at > org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:289) > {code} > {{LOGs}} > {quote}[2021-03-26 03:28:06,157] ERROR Could not check connector state info. > (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420) > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not > read connector state. Error response: \{"error_code":404,"message":"No status > found for connector MirrorSourceConnector"} at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:479) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:285) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:470) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:227){quote} > and > {quote}[2021-03-26 03:30:41,524] ERROR [MirrorHeartbeatConnector|task-0] > Graceful stop of task MirrorHeartbeatConnector-0 failed. 
> (org.apache.kafka.connect.runtime.Worker:866) [2021-03-26 03:30:41,527] ERROR > [MirrorHeartbeatConnector|task-0] > WorkerSourceTask\{id=MirrorHeartbeatConnector-0} failed to send record to > heartbeats: (org.apache.kafka.connect.runtime.WorkerSourceTask:372) > org.apache.kafka.common.KafkaException: Producer is closed forcefully. at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:737) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:282) > at java.lang.Thread.run(Thread.java:748) [2021-03-26 03:30:42,248] ERROR > [MirrorHeartbeatConnector|task-0] > WorkerSourceTask\{id=MirrorHeartbeatConnector-0} Failed to flush, timed out > while waiting for producer to flush outstanding 1 messages > (org.apache.kafka.connect.runtime.WorkerSourceTask:512){quote} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-12566) Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication
[ https://issues.apache.org/jira/browse/KAFKA-12566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682707#comment-17682707 ] Greg Harris commented on KAFKA-12566: - Hey [~showuon] I'm going to assign this issue to me, as I had to do some test stabilization in this area as part of [https://github.com/apache/kafka/pull/13178] . > Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication > - > > Key: KAFKA-12566 > URL: https://issues.apache.org/jira/browse/KAFKA-12566 > Project: Kafka > Issue Type: Test > Components: mirrormaker, unit tests >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > > > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 2. > Offsets not translated downstream to primary cluster. ==> expected: > but was: at > org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:289) > {code} > {{LOGs}} > {quote}[2021-03-26 03:28:06,157] ERROR Could not check connector state info. > (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420) > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not > read connector state. Error response: \{"error_code":404,"message":"No status > found for connector MirrorSourceConnector"} at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:479) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:285) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:470) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:227){quote} > and > {quote}[2021-03-26 03:30:41,524] ERROR [MirrorHeartbeatConnector|task-0] > Graceful stop of task MirrorHeartbeatConnector-0 failed. 
> (org.apache.kafka.connect.runtime.Worker:866) [2021-03-26 03:30:41,527] ERROR > [MirrorHeartbeatConnector|task-0] > WorkerSourceTask\{id=MirrorHeartbeatConnector-0} failed to send record to > heartbeats: (org.apache.kafka.connect.runtime.WorkerSourceTask:372) > org.apache.kafka.common.KafkaException: Producer is closed forcefully. at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:737) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:282) > at java.lang.Thread.run(Thread.java:748) [2021-03-26 03:30:42,248] ERROR > [MirrorHeartbeatConnector|task-0] > WorkerSourceTask\{id=MirrorHeartbeatConnector-0} Failed to flush, timed out > while waiting for producer to flush outstanding 1 messages > (org.apache.kafka.connect.runtime.WorkerSourceTask:512){quote} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] guozhangwang commented on pull request #13167: KAFKA-14650: Synchronize access to tasks inside task manager
guozhangwang commented on PR #13167: URL: https://github.com/apache/kafka/pull/13167#issuecomment-1410914374 @lucasbru You're right! I think this issue exists even before #12397. After thinking about it a bit, and looking ahead to IQ needing to access the Tasks as well as the StateUpdater (which is already synchronized), I'll just synchronize access to the two maps instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
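For context, a minimal sketch of the approach described in the comment above — serializing all reads and writes of the two task maps behind one lock so a rebalance thread and an IQ thread never observe the maps mid-update. The class and member names here are hypothetical, not the actual TaskManager code from #13167:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: every access to the two maps is funneled through
// methods that synchronize on a single shared lock object.
class TasksRegistry {
    private final Object tasksLock = new Object();
    private final Map<String, Object> activeTasks = new HashMap<>();
    private final Map<String, Object> standbyTasks = new HashMap<>();

    void putActiveTask(String id, Object task) {
        synchronized (tasksLock) {
            activeTasks.put(id, task);
        }
    }

    Object activeTask(String id) {
        synchronized (tasksLock) {
            return activeTasks.get(id);
        }
    }

    Map<String, Object> snapshotAllTasks() {
        synchronized (tasksLock) {
            // Defensive copy: callers get a consistent snapshot of both maps.
            Map<String, Object> all = new HashMap<>(activeTasks);
            all.putAll(standbyTasks);
            return all;
        }
    }
}
```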
[GitHub] [kafka] gharris1727 opened a new pull request, #13178: KAFKA-12468, KAFKA-14663, KAFKA-12566: Fix MM2 causing negative downstream lag
gharris1727 opened a new pull request, #13178: URL: https://github.com/apache/kafka/pull/13178 This PR addresses three distinct but closely related issues: 1. [KAFKA-12468](https://issues.apache.org/jira/browse/KAFKA-12468) "Initial offsets are copied from source to target cluster" "Mirror Maker 2 Negative Offsets" 2. [KAFKA-14663](https://issues.apache.org/jira/browse/KAFKA-14663) "High throughput topics can starve low-throughput MM2 offset syncs" 3. [KAFKA-12566](https://issues.apache.org/jira/browse/KAFKA-12566) "Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication" The primary issue being addressed here is the incorrect translation of offsets, the title issue. The [MM2 KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0) does not discuss the offset translation mechanism in detail, so I'll summarize the mechanism as it currently exists on trunk: 1. Records are mirrored from source topic-partition to target topic-partition by the MirrorSourceTask 2. MirrorSourceTask will (occasionally) emit OffsetSync messages to an Offset Syncs topic. Offset syncs contain the upstream and downstream offset of an emitted data record. 3. The MirrorCheckpointTask will consume from the offset syncs topic, and maintain an in-memory copy of the latest offset sync seen for each topic-partition (in OffsetSyncStore) 4. Periodically the MirrorCheckpointTask will poll consumer group offsets for the source topic, and use its in-memory copy of the latest offset sync to translate upstream offsets to downstream offsets. 5. This is done by measuring the 'distance' between the MM2 offset sync and the upstream consumer group, and then assuming that the same distance applies in the downstream topic. Step (5) is correct when assuming that every *offset* from the source topic has already been reproduced in the downstream topic. However, this assumption is violated when offsets are not present, which can happen for a variety of reasons, including: 1. Transaction markers take an offset but will never be emitted as records from the consumer 2. Records are dropped by SMTs and will never be emitted to the target topic 3. The source topic has been compacted and some offsets will never be emitted by the consumer 4. MM2 replication is lagging behind an upstream consumer group and some records have not been replicated yet In any of these conditions, an upstream offset may be translated to a downstream offset which is beyond the corresponding record in the downstream topic. Consider the following concrete example of situation (4) **resulting in negative lag**: 1. Source topic `A` has 1000 records, all with contiguous offsets 2. An upstream consumer group `cg` is at the end of the log, offset 1000. 3. MM2 begins replicating the topic, and writes 500 upstream records to the target topic `target.A`, and writes offset-syncs correlating (`A`, 500) with (`target.A`, 500). 4. MM2 checkpoint reads `cg` offset 1000, translates the offset to 500 + (1000-500) = 1000, and writes to `target.cg` 5. Someone checks the `target.cg` offset for `target.A` and observes that the group offset is 1000, the topic end offset is 500, and the lag is -500. And the following concrete example of situation (1) **resulting in undelivered data**: 1. Source topic `A` has 1000 records, all emitted with a transactional producer. 2. The 1000 records are interleaved with 1000 commit markers at every other offset. 3. An upstream consumer group `cg` is in the middle of the topic, at offset 1000. 4. 
MM2 begins replicating the topic, and writes 1000 records to the target topic `target.A`, and writes offset-syncs correlating (`A`, 500) with (`target.A`, 250), in addition to other offset-syncs. 5. MM2 checkpoint reads the `cg` offset 1000, translates the offset to 250 + (1000 - 500) = 750, and writes to `target.cg` 6. A system fails over from `cg` to `target.cg` and someone notices that the `cg` application read records 0-500, `target.cg` application read 750-1000, but no consumer ever received offsets 500-750. This PR adds a test that replicates transactional data, as in situation (1). It asserts that whenever an offset is translated, it does not pass the end of the downstream topic, and cannot cause negative lag. In addition the tests are strengthened to require the offset syncs to be emitted up to the end of the topic, requiring a fix for the offset-syncs topic starvation issue. This also exposed a number of mistakes and flakiness in the existing tests, so this PR also stabilizes the tests to make them useful for validating the negative offsets fix. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated
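To make the arithmetic in step (5) concrete, here is a hedged sketch of the distance-based translation the PR description walks through — illustrative only, not the actual OffsetSyncStore code:

```java
// An offset sync pairs one upstream offset with one downstream offset.
final class OffsetSync {
    final long upstreamOffset;
    final long downstreamOffset;

    OffsetSync(long upstreamOffset, long downstreamOffset) {
        this.upstreamOffset = upstreamOffset;
        this.downstreamOffset = downstreamOffset;
    }
}

final class NaiveTranslation {
    // Trunk's assumption: every upstream offset exists downstream, so the
    // distance past the sync point is the same in both topics.
    static long translate(OffsetSync sync, long upstreamGroupOffset) {
        long distance = upstreamGroupOffset - sync.upstreamOffset;
        return sync.downstreamOffset + distance;
    }

    public static void main(String[] args) {
        // Situation (4): only 500 of 1000 records replicated so far.
        OffsetSync lagging = new OffsetSync(500, 500);
        // Prints 1000, past the downstream end offset of 500 -> lag of -500.
        System.out.println(translate(lagging, 1000));

        // Situation (1): commit markers consume every other upstream offset.
        OffsetSync transactional = new OffsetSync(500, 250);
        // Prints 750, but the correct downstream offset is 500.
        System.out.println(translate(transactional, 1000));
    }
}
```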
[GitHub] [kafka] dajac commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
dajac commented on code in PR #12972: URL: https://github.com/apache/kafka/pull/12972#discussion_r1092320492 ## core/src/main/scala/kafka/server/ApiVersionManager.scala: ## @@ -86,14 +100,7 @@ class DefaultApiVersionManager( finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava, finalizedFeatures.epoch, controllerApiVersions.orNull, -listenerType) - } - - override def enabledApis: collection.Set[ApiKeys] = { Review Comment: Replaced the method by a val. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
dajac commented on code in PR #12972: URL: https://github.com/apache/kafka/pull/12972#discussion_r1092319133 ## clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json: ## @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 68, + "type": "response", + "name": "ConsumerGroupHeartbeatResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED + // - NOT_COORDINATOR + // - COORDINATOR_NOT_AVAILABLE + // - COORDINATOR_LOAD_IN_PROGRESS + // - INVALID_REQUEST + // - UNKNOWN_MEMBER_ID + // - FENCED_MEMBER_EPOCH + // - UNSUPPORTED_ASSIGNOR + // - UNRELEASED_INSTANCE_ID + // - GROUP_MAX_SIZE_REACHED Review Comment: done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14661) Upgrade Zookeeper to 3.8.1
[ https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682678#comment-17682678 ] Ismael Juma commented on KAFKA-14661: - 3.6.x supported zk 3.4 clients, so this change has compatibility impact - i.e. we probably need a KIP. The second question is: what server versions are supported by 3.8.x clients? > Upgrade Zookeeper to 3.8.1 > --- > > Key: KAFKA-14661 > URL: https://issues.apache.org/jira/browse/KAFKA-14661 > Project: Kafka > Issue Type: Improvement > Components: packaging >Reporter: Divij Vaidya >Assignee: Christo Lolov >Priority: Blocker > Fix For: 3.5.0, 3.4.1, 3.3.3 > > > Current Zk version (3.6.x) supported by Apache Kafka has been EOL since > December 2022 [1] > Users of Kafka are facing regulatory hurdles because of using a dependency > which is EOL, hence, I would suggest to upgrade this in all upcoming releases > (including patch releases of 3.3.x and 3.4.x versions). > Some things to consider while upgrading (as pointed by [~ijuma] at [2]): > # If we upgrade the zk server to 3.8.1, what is the impact on the zk > clients. That is, what's the earliest zk client version that is supported by > the 3.8.x server? > # We need to ensure there are no regressions (particularly on the stability > front) when it comes to this upgrade. It would be good for someone to stress > test the system a bit with the new version and check if all works well. > [1] [https://zookeeper.apache.org/releases.html] > [2][https://github.com/apache/kafka/pull/12620#issuecomment-1409028650] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] guozhangwang commented on pull request #13025: KAFKA-14299: Fix pause and resume with state updater
guozhangwang commented on PR #13025: URL: https://github.com/apache/kafka/pull/13025#issuecomment-1410815034 ack, will take a look asap. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
C0urante commented on code in PR #13168: URL: https://github.com/apache/kafka/pull/13168#discussion_r1092133488 ## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ## @@ -476,14 +479,22 @@ public List getConfiguredInstances(List classNames, Class t, M return objects; Map configPairs = originals(); configPairs.putAll(configOverrides); -for (Object klass : classNames) { -Object o = getConfiguredInstance(klass, t, configPairs); -objects.add(t.cast(o)); + +try { +for (Object klass : classNames) { +Object o = getConfiguredInstance(klass, t, configPairs); +objects.add(t.cast(o)); +} +} catch (Exception e) { +for (Object object : objects) { +Utils.closeQuietly((AutoCloseable) object, "AutoCloseable object constructed and configured during failed call to getConfiguredInstances"); Review Comment: We still need an `instanceof` check here: ```suggestion if (object instanceof AutoCloseable) { Utils.closeQuietly((AutoCloseable) object, "AutoCloseable object constructed and configured during failed call to getConfiguredInstances"); } ``` ## clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java: ## @@ -137,6 +137,7 @@ import static org.mockito.Mockito.when; public class KafkaProducerTest { +private final int targetInterceptor = 3; Review Comment: Nit: If we're not using this field in any other test, probably makes more sense to inline directly into the test case it's used in. ## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java: ## @@ -168,6 +169,7 @@ public class KafkaConsumerTest { private final int defaultApiTimeoutMs = 6; private final int requestTimeoutMs = defaultApiTimeoutMs / 2; private final int heartbeatIntervalMs = 1000; +private final int targetInterceptor = 3; Review Comment: Nit: If we're not using this field in any other test, probably makes more sense to inline directly into the test case it's used in. ## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ## @@ -476,14 +479,22 @@ public List getConfiguredInstances(List classNames, Class t, M return objects; Map configPairs = originals(); configPairs.putAll(configOverrides); -for (Object klass : classNames) { -Object o = getConfiguredInstance(klass, t, configPairs); -objects.add(t.cast(o)); + +try { +for (Object klass : classNames) { +Object o = getConfiguredInstance(klass, t, configPairs); +objects.add(t.cast(o)); +} +} catch (Exception e) { +for (Object object : objects) { +Utils.closeQuietly((AutoCloseable) object, "AutoCloseable object constructed and configured during failed call to getConfiguredInstances"); Review Comment: Actually, come to think of it, we might also want to invoke `close` on objects created in `getConfiguredInstance` if they throw an exception from `configure` [here](https://github.com/apache/kafka/blob/6c98544a964b40ede6bbe1b3440f8e5db96a4ad6/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L407). Perhaps we could pull this out into a reusable method and use it both here and there? Thinking something like: ```java private static void maybeClose(Object object, String name) { if (object instanceof AutoCloseable) Utils.closeQuietly(object, name); } ``` ## clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java: ## @@ -45,6 +47,8 @@ public class AbstractConfigTest { + + Review Comment: We don't need this change; it's fine as-is. 
## clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java: ## @@ -54,6 +58,12 @@ public void testConfiguredInstances() { testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter"); testInvalidInputs("test1,test2"); testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,"); +testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", " ++ TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", " ++ TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR, org.apache.kafka.test.MockConsumerInterceptor.class); +testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", " ++ TestInterceptorConfig.ORG_APACHE_KAF
[GitHub] [kafka] lucasbru commented on pull request #13025: KAFKA-14299: Fix pause and resume with state updater
lucasbru commented on PR #13025: URL: https://github.com/apache/kafka/pull/13025#issuecomment-1410798829 @guozhangwang also wanted to have a look (note that I didn't address Brunos comments yet) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools
mimaison commented on code in PR #13136: URL: https://github.com/apache/kafka/pull/13136#discussion_r1088957590 ## checkstyle/import-control.xml: ## @@ -407,7 +407,8 @@ - + + Review Comment: Can we keep the trailing spaces like all other entries? ## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ## @@ -1033,7 +1033,7 @@ object ConsumerGroupCommand extends Logging { val describeOpt = parser.accepts("describe", DescribeDoc) val allGroupsOpt = parser.accepts("all-groups", AllGroupsDoc) val deleteOpt = parser.accepts("delete", DeleteDoc) -val timeoutMsOpt = parser.accepts("timeout", TimeoutMsDoc) +val timeoutMsOpt: OptionSpec[Long] = parser.accepts("timeout", TimeoutMsDoc) Review Comment: Why is this PR touching this file? ## tools/src/test/java/org/apache/kafka/tools/JmxCommandTest.java: ## @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.Exit; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.remote.JMXConnectorServer; +import javax.management.remote.JMXConnectorServerFactory; +import javax.management.remote.JMXServiceURL; + +import java.lang.management.ManagementFactory; +import java.net.ServerSocket; +import java.rmi.registry.LocateRegistry; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Tag("integration") +public class JmxCommandTest { +private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure(); + +private static JMXConnectorServer jmxAgent; +private static String jmxUrl; + +@BeforeAll +public static void beforeAll() throws Exception { +int port = findRandomOpenPortOnAllLocalInterfaces(); +jmxAgent = startJmxAgent(port); +jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://:%d/jmxrmi", port); +} + +@AfterAll +public static void afterAll() throws Exception { +jmxAgent.stop(); +} + +@BeforeEach +public void beforeEach() { +Exit.setExitProcedure(exitProcedure); +} + +@AfterEach +public void afterEach() { +Exit.resetExitProcedure(); +} + +@Test +public void kafkaVersion() { +String out = executeAndGetOut("--version"); +assertNormalExit(); +assertEquals(AppInfoParser.getVersion(), out); +} + +@Test +public void 
unrecognizedOption() { +String err = executeAndGetErr("--foo"); +assertCommandFailure(); +assertTrue(err.contains("UnrecognizedOptionException")); +assertTrue(err.contains("foo")); +} + +@Test +public void missingRequired() { +String err = executeAndGetErr("--reporting-interval"); +assertCommandFailure(); +assertTrue(err.contains("OptionMissingRequiredArgumentException")); +assertTrue(err.contains("reporting-interval")); +} + +@Test +public void invalidJmxUrl() { +String err = executeAndGetErr("--jmx-url", String.format("localhost:")); Review Comment: We don't need `String.format()` here ## tools/src/main/java/org/apache/kafka/tools/JmxTool.java: ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with +
[jira] [Updated] (KAFKA-14664) Raft idle ratio is inaccurate
[ https://issues.apache.org/jira/browse/KAFKA-14664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-14664: Description: The `poll-idle-ratio-avg` metric is intended to track how idle the raft IO thread is. When completely idle, it should measure 1. When saturated, it should measure 0. The problem with the current measurements is that they are treated equally with respect to time. For example, say we poll twice with the following durations: Poll 1: 2s Poll 2: 0s Assume that the busy time is negligible, so 2s passes overall. In the first measurement, 2s is spent waiting, so we compute and record a ratio of 1.0. In the second measurement, no time passes, and we record 0.0. The idle ratio is then computed as the average of these two values ((1.0 + 0.0) / 2 = 0.5), which suggests that the process was busy for 1s, which overestimates the true busy time. Instead, we should sum up the time waiting over the full interval. 2s passes total here and 2s is idle, so we should compute 1.0. was: The `poll-idle-ratio-avg` metric is intended to track how idle the raft IO thread is. When completely idle, it should measure 1. When saturated, it should measure 0. The problem with the current measurements is that they are treated equally with respect to time. For example, say we poll twice with the following durations: Poll 1: 2s Poll 2: 0s Assume that the busy time is negligible, so 2s passes overall. In the first measurement, 2s is spent waiting, so we compute and record a ratio of 1.0. In the second measurement, no time passes, and we record 0.0. The idle ratio is then computed as the average of these two values ((1.0 + 0.0) / 2 = 0.5), which suggests that the process was busy for 1s. Instead, we should sum up the time waiting over the full interval. 2s passes total here and 2s is idle, so we should compute 1.0. > Raft idle ratio is inaccurate > - > > Key: KAFKA-14664 > URL: https://issues.apache.org/jira/browse/KAFKA-14664 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > The `poll-idle-ratio-avg` metric is intended to track how idle the raft IO > thread is. When completely idle, it should measure 1. When saturated, it > should measure 0. The problem with the current measurements is that they are > treated equally with respect to time. For example, say we poll twice with the > following durations: > Poll 1: 2s > Poll 2: 0s > Assume that the busy time is negligible, so 2s passes overall. > In the first measurement, 2s is spent waiting, so we compute and record a > ratio of 1.0. In the second measurement, no time passes, and we record 0.0. > The idle ratio is then computed as the average of these two values ((1.0 + 0.0) > / 2 = 0.5), which suggests that the process was busy for 1s, which > overestimates the true busy time. > Instead, we should sum up the time waiting over the full interval. 2s passes > total here and 2s is idle, so we should compute 1.0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
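A short illustrative sketch of the two computations described in the ticket — hypothetical code, not the actual KafkaRaftMetrics implementation:

{code:java}
public final class IdleRatioExample {
    public static void main(String[] args) {
        // Two polls: the first waits 2s, the second returns immediately.
        // Busy time is negligible, so 2s of wall-clock time passes overall.
        double poll1IdleSeconds = 2.0;
        double poll2IdleSeconds = 0.0;

        // Current behavior: each poll's ratio is weighted equally,
        // regardless of how long the poll actually lasted.
        double perPollAverage = (1.0 + 0.0) / 2; // 0.5 -- overstates busy time

        // Proposed behavior: weight by time, i.e. total idle / total elapsed.
        double totalElapsedSeconds = 2.0;
        double timeWeighted = (poll1IdleSeconds + poll2IdleSeconds) / totalElapsedSeconds; // 1.0

        System.out.printf("per-poll avg = %.1f, time-weighted = %.1f%n", perPollAverage, timeWeighted);
    }
}
{code}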
[GitHub] [kafka] guozhangwang commented on pull request #13164: MINOR: Fix scaladoc warnings
guozhangwang commented on PR #13164: URL: https://github.com/apache/kafka/pull/13164#issuecomment-1410744946 LGTM, Merged to trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #13164: MINOR: Fix scaladoc warnings
guozhangwang merged PR #13164: URL: https://github.com/apache/kafka/pull/13164 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on a diff in pull request #13120: MINOR: Connect Javadocs improvements
yashmayya commented on code in PR #13120: URL: https://github.com/apache/kafka/pull/13120#discussion_r1092188566 ## connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.java: ## @@ -23,25 +23,25 @@ import java.util.List; /** - * An interface for enforcing a policy on overriding of client configs via the connector configs. - * - * Common use cases are ability to provide principal per connector, sasl.jaas.config + * An interface for enforcing a policy on overriding of Kafka client configs via the connector configs. + * + * Common use cases are ability to provide principal per connector, sasl.jaas.config * and/or enforcing that the producer/consumer configurations for optimizations are within acceptable ranges. */ public interface ConnectorClientConfigOverridePolicy extends Configurable, AutoCloseable { /** - * Worker will invoke this while constructing the producer for the SourceConnectors, DLQ for SinkConnectors and the consumer for the - * SinkConnectors to validate if all of the overridden client configurations are allowed per the - * policy implementation. This would also be invoked during the validation of connector configs via the Rest API. - * + * Workers will invoke this while constructing producer for SourceConnectors, DLQs for SinkConnectors and + * consumers for SinkConnectors to validate if all of the overridden client configurations are allowed per the Review Comment: Makes sense ## connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigRequest.java: ## @@ -44,25 +44,25 @@ public ConnectorClientConfigRequest( } /** - * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SOURCE}. - * Provides Config with prefix {@code consumer.override.} for {@link ConnectorType#SINK}. - * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SINK} for DLQ. - * Provides Config with prefix {@code admin.override.} for {@link ConnectorType#SINK} for DLQ. + * Provides Config with prefix "{@code producer.override.}" for {@link ConnectorType#SOURCE}. + * Provides Config with prefix "{@code consumer.override.}" for {@link ConnectorType#SINK}. + * Provides Config with prefix "{@code producer.override.}" for {@link ConnectorType#SINK} for DLQ. + * Provides Config with prefix "{@code admin.override.}" for {@link ConnectorType#SINK} for DLQ. Review Comment: Thanks, that's a great call out! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
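To ground the prefixes quoted in the review above, here is a hedged sketch of a connector configuration exercising them — the connector name, class, and override values are made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public final class OverrideExample {
    public static void main(String[] args) {
        Map<String, String> connectorConfig = new HashMap<>();
        connectorConfig.put("name", "my-source");
        connectorConfig.put("connector.class", "org.example.MySourceConnector");
        // For a source connector, keys with the "producer.override." prefix
        // carry the client config overrides that the worker validates against
        // the configured ConnectorClientConfigOverridePolicy.
        connectorConfig.put("producer.override.linger.ms", "500");
        connectorConfig.put("producer.override.compression.type", "lz4");
        System.out.println(connectorConfig);
    }
}
```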
[GitHub] [kafka] jeqo opened a new pull request, #13177: [KAFKA-14441] Benchmark performance impact of metrics library
jeqo opened a new pull request, #13177: URL: https://github.com/apache/kafka/pull/13177 https://issues.apache.org/jira/browse/KAFKA-14441 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)
[ https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682648#comment-17682648 ] Matthias J. Sax commented on KAFKA-14660: - The original PR did not make sense: if totalCapacity were really zero, there would be a bug, and just setting it to 1 does not sound right. I did already merge a new PR that just raises an exception for this case, and thus avoids the divide-by-zero. This should resolve the issue. > Divide by zero security vulnerability (sonatype-2019-0422) > -- > > Key: KAFKA-14660 > URL: https://issues.apache.org/jira/browse/KAFKA-14660 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.2 >Reporter: Andy Coates >Assignee: Matthias J. Sax >Priority: Minor > Fix For: 3.5.0 > > > Looks like Sonatype has picked up a "Divide by Zero" issue reported in a PR > and, because the PR was never merged, is now reporting it as a security > vulnerability in the latest Kafka Streams library. > > See: > * [Vulnerability: > sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven&component-name=org.apache.kafka%2Fkafka-streams&utm_source=ossindex-client&utm_medium=integration&utm_content=1.7.0)] > * [Original PR]([https://github.com/apache/kafka/pull/7414]) > > While it looks from the comments made by [~mjsax] and [~bbejeck] that the > divide-by-zero is not really an issue, the fact that it's now being reported > as a vulnerability is, especially with regulators. > PITA, but we should consider either getting this vulnerability removed > (Google wasn't very helpful in providing info on how to do this), or fixed > (Again, not sure how to tag the fix as fixing this issue). One option may > just be to reopen the PR and merge (and then fix forward by switching it to > throw an exception). -- This message was sent by Atlassian Jira (v8.20.10#820010)
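A minimal sketch of the fail-fast guard described in the comment above, with hypothetical names — the merged PR's actual exception type and message may differ:

{code:java}
public final class CapacityMath {
    // If totalCapacity is zero the caller's bookkeeping is broken, so fail
    // fast instead of silently substituting 1 and hiding the bug.
    static double fractionUsed(double used, double totalCapacity) {
        if (totalCapacity == 0) {
            throw new IllegalStateException("Total capacity must not be zero");
        }
        return used / totalCapacity;
    }

    public static void main(String[] args) {
        System.out.println(fractionUsed(3.0, 4.0)); // 0.75
    }
}
{code}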
[jira] [Commented] (KAFKA-14584) Move StateChangeLogMerger to tools
[ https://issues.apache.org/jira/browse/KAFKA-14584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682643#comment-17682643 ] Federico Valeri commented on KAFKA-14584: - Users have to migrate from "kafka-run-class.sh kafka.tools.StateChangeLogMerger" to "kafka-run-class.sh org.apache.kafka.tools.StateChangeLogMerger". > Move StateChangeLogMerger to tools > -- > > Key: KAFKA-14584 > URL: https://issues.apache.org/jira/browse/KAFKA-14584 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Federico Valeri >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14584) Move StateChangeLogMerger to tools
[ https://issues.apache.org/jira/browse/KAFKA-14584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Federico Valeri updated KAFKA-14584: Fix Version/s: 3.5.0 > Move StateChangeLogMerger to tools > -- > > Key: KAFKA-14584 > URL: https://issues.apache.org/jira/browse/KAFKA-14584 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Federico Valeri >Priority: Major > Fix For: 3.5.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14582) Move JmxTool to tools
[ https://issues.apache.org/jira/browse/KAFKA-14582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Federico Valeri updated KAFKA-14582: Fix Version/s: 3.5.0 > Move JmxTool to tools > - > > Key: KAFKA-14582 > URL: https://issues.apache.org/jira/browse/KAFKA-14582 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Federico Valeri >Priority: Major > Fix For: 3.5.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14582) Move JmxTool to tools
[ https://issues.apache.org/jira/browse/KAFKA-14582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682641#comment-17682641 ] Federico Valeri commented on KAFKA-14582: - Users have to migrate from "kafka-run-class.sh kafka.tools.JmxTool" to "kafka-run-class.sh org.apache.kafka.tools.JmxTool". > Move JmxTool to tools > - > > Key: KAFKA-14582 > URL: https://issues.apache.org/jira/browse/KAFKA-14582 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Federico Valeri >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (KAFKA-14553) RecordAccumulator hangs in infinite NOP loop
[ https://issues.apache.org/jira/browse/KAFKA-14553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur reopened KAFKA-14553: -- > RecordAccumulator hangs in infinite NOP loop > > > Key: KAFKA-14553 > URL: https://issues.apache.org/jira/browse/KAFKA-14553 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.3.1 > Environment: - Spring Boot 3.0.1 > - Spring Cloud 2022.0.0 > Versions of dependencies are defined in boms of SB and SC: > - micrometer-tracing-bridge-brave 1.0.0 > - zipkin-reporter-brave 2.16.3 > - zipkin-sender-kafka 2.16.3 >Reporter: Viczai Gábor >Assignee: Luke Chen >Priority: Minor > Fix For: 3.4.0, 3.3.2 > > > *Summary:* > There is an infinite loop in RecordAccumulator if stickyBatchSize is > configured to be 0 in BuiltinPartitioner. > (Which is the default case when using KafkaSender's default Builder.) > *Details:* > The infinite loop is caused by this while: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L293 > and this continue particularly: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L316 > because the partitionChanged() call in the condition always returns true if > batchSize is 0. > So program flow never reaches this point: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L318 > Thus no span data is ever sent to Kafka. > The problematic line in partitionChanged() is when it calls an update on the > BuiltInPartitioner: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L242 > which in fact always updates the partition because of this condition: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218 > therefore the next condition in RecordAccumulator will evaluate to true also: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L243 > thus returning 'true' and forcing the 'continue' in the while(true) loop. > *Suggested fix:* > I think these conditions should be changed: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218 > The equal signs should be removed from the conditions: > {code}if (producedBytes > stickyBatchSize && enableSwitch || producedBytes > > stickyBatchSize * 2) {{code} > (Btw: line 213 also needs this modification.) > *Note:* > The problem arises because KafkaSender sets the batchSize to 0. > https://github.com/openzipkin/zipkin-reporter-java/blob/2.16.3/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L88 > *Workaround:* > Simply set the batch size greater than zero. 
> {code:java}@Configuration > public class SenderConfiguration { > @Bean > KafkaSender kafkaSender() { > Properties overrides = new Properties(); > overrides.put(ProducerConfig.BATCH_SIZE_CONFIG, 1); > return KafkaSender.newBuilder() > .bootstrapServers("localhost:9092") > .topic("zipkin") > .overrides(overrides) > .build(); > } > }{code} > *Using:* > - Spring Boot 3.0.1 > - Spring Cloud 2022.0.0 > pom.xml (fragment): > {code:xml} > <dependency> > <groupId>org.springframework.boot</groupId> > <artifactId>spring-boot-autoconfigure</artifactId> > </dependency> > <dependency> > <groupId>org.springframework.boot</groupId> > <artifactId>spring-boot-starter-actuator</artifactId> > </dependency> > <dependency> > <groupId>io.micrometer</groupId> > <artifactId>micrometer-registry-prometheus</artifactId> > </dependency> > <dependency> > <groupId>io.micrometer</groupId> > <artifactId>micrometer-tracing-bridge-brave</artifactId> > </dependency> > <dependency> > <groupId>io.zipkin.reporter2</groupId> > <artifactId>zipkin-reporter-brave</artifactId> > </dependency> > <dependency> > <groupId>io.zipkin.reporter2</groupId> > <artifactId>zipkin-sender-kafka</artifactId> > </dependency> > {code} > Everything is on default settings, except a KafkaSender is explicitly > created as illustrated above. (No autoconfiguration available for Kafka > sender.) -- This message was sent by Atlassian Jira (v8.20.10#820010)
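As a quick illustration of why the quoted `>=` comparisons pin the partitioner when stickyBatchSize is 0 — a simplified sketch of the condition, not the actual BuiltInPartitioner code:

{code:java}
public final class StickyBatchExample {
    // Simplified form of the condition quoted from BuiltInPartitioner.
    static boolean shouldChangePartition(long producedBytes, long stickyBatchSize, boolean enableSwitch) {
        return producedBytes >= stickyBatchSize && enableSwitch
            || producedBytes >= stickyBatchSize * 2;
    }

    public static void main(String[] args) {
        // With stickyBatchSize == 0, producedBytes >= 0 is always true, so the
        // partition "changes" on every append and RecordAccumulator keeps
        // hitting its continue branch forever.
        System.out.println(shouldChangePartition(0, 0, false));  // true
        System.out.println(shouldChangePartition(42, 0, false)); // true
        // With a positive stickyBatchSize the loop can make progress:
        System.out.println(shouldChangePartition(0, 1, false));  // false
    }
}
{code}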
[jira] [Updated] (KAFKA-14553) RecordAccumulator hangs in infinite NOP loop
[ https://issues.apache.org/jira/browse/KAFKA-14553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-14553: - Fix Version/s: (was: 3.4.0) > RecordAccumulator hangs in infinite NOP loop > > > Key: KAFKA-14553 > URL: https://issues.apache.org/jira/browse/KAFKA-14553 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.3.1 > Environment: - Spring Boot 3.0.1 > - Spring Cloud 2022.0.0 > Versions of dependencies are defined in boms of SB and SC: > - micrometer-tracing-bridge-brave 1.0.0 > - zipkin-reporter-brave 2.16.3 > - zipkin-sender-kafka 2.16.3 >Reporter: Viczai Gábor >Assignee: Luke Chen >Priority: Minor > Fix For: 3.3.2 > > > *Summary:* > There is an infinite loop in RecordAccumulator if stickyBatchSize is > configured to be 0 in BuiltinPartitioner. > (Which is the default case when using KafkaSender's default Builder.) > *Details:* > The infinite loop is caused by this while: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L293 > and this continue particularly: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L316 > because the partitionChanged() call in the condition always returns true if > batchSize is 0. > So program flow never reaches this point: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L318 > Thus no span data is ever sent to Kafka. > The problematic line in partitionChanged() is when it calls an update on the > BuiltInPartitioner: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L242 > which in fact always updates the partition because of this condition: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218 > therefore the next condition in RecordAccumulator will evaluate to true also: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L243 > thus returning 'true' and forcing the 'continue' in the while(true) loop. > *Suggested fix:* > I think these conditions should be changed: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218 > The equal signs should be removed from the conditions: > {code}if (producedBytes > stickyBatchSize && enableSwitch || producedBytes > > stickyBatchSize * 2) {{code} > (Btw: line 213 also needs this modification.) > *Note:* > The problem arises because KafkaSender sets the batchSize to 0. > https://github.com/openzipkin/zipkin-reporter-java/blob/2.16.3/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L88 > *Workaround:* > Simply set the batch size greater than zero. 
> {code:java}@Configuration > public class SenderConfiguration { > @Bean > KafkaSender kafkaSender() { > Properties overrides = new Properties(); > overrides.put(ProducerConfig.BATCH_SIZE_CONFIG, 1); > return KafkaSender.newBuilder() > .bootstrapServers("localhost:9092") > .topic("zipkin") > .overrides(overrides) > .build(); > } > }{code} > *Using:* > - Spring Boot 3.0.1 > - Spring Cloud 2022.0.0 > pom.xml (fragment): > {code:xml} > <dependency> > <groupId>org.springframework.boot</groupId> > <artifactId>spring-boot-autoconfigure</artifactId> > </dependency> > <dependency> > <groupId>org.springframework.boot</groupId> > <artifactId>spring-boot-starter-actuator</artifactId> > </dependency> > <dependency> > <groupId>io.micrometer</groupId> > <artifactId>micrometer-registry-prometheus</artifactId> > </dependency> > <dependency> > <groupId>io.micrometer</groupId> > <artifactId>micrometer-tracing-bridge-brave</artifactId> > </dependency> > <dependency> > <groupId>io.zipkin.reporter2</groupId> > <artifactId>zipkin-reporter-brave</artifactId> > </dependency> > <dependency> > <groupId>io.zipkin.reporter2</groupId> > <artifactId>zipkin-sender-kafka</artifactId> > </dependency> > {code} > Everything is on default settings, except a KafkaSender is explicitly > created as illustrated above. (No autoconfiguration available for Kafka > sender.) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14553) RecordAccumulator hangs in infinite NOP loop
[ https://issues.apache.org/jira/browse/KAFKA-14553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur resolved KAFKA-14553. -- Resolution: Duplicate > RecordAccumulator hangs in infinite NOP loop > > > Key: KAFKA-14553 > URL: https://issues.apache.org/jira/browse/KAFKA-14553 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.3.1 > Environment: - Spring Boot 3.0.1 > - Spring Cloud 2022.0.0 > Versions of dependencies are defined in boms of SB and SC: > - micrometer-tracing-bridge-brave 1.0.0 > - zipkin-reporter-brave 2.16.3 > - zipkin-sender-kafka 2.16.3 >Reporter: Viczai Gábor >Assignee: Luke Chen >Priority: Minor > Fix For: 3.4.0, 3.3.2 > > > *Summary:* > There is an infinite loop in RecordAccumulator if stickyBatchSize is > configured to be 0 in BuiltinPartitioner. > (Which is the default case when using KafkaSender's default Builder.) > *Details:* > The infinite loop is caused by this while: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L293 > and this continue particularly: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L316 > because the partitionChanged() call in the condition always returns true if > batchSize is 0. > So program flow never reaches this point: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L318 > Thus no span data is ever sent to Kafka. > The problematic line in partitionChanged() is when it calls an update on the > BuiltInPartitioner: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L242 > which in fact always updates the partition because of this condition: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218 > therefore the next condition in RecordAccumulator will evaluate to true also: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L243 > thus returning 'true' and forcing the 'continue' in the while(true) loop. > *Suggested fix:* > I think these conditions should be changed: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218 > The equal signs should be removed from the conditions: > {code}if (producedBytes > stickyBatchSize && enableSwitch || producedBytes > > stickyBatchSize * 2) {{code} > (Btw: line 213 also needs this modification.) > *Note:* > The problem arises because KafkaSender sets the batchSize to 0. > https://github.com/openzipkin/zipkin-reporter-java/blob/2.16.3/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L88 > *Workaround:* > Simply set the batch size greater than zero. 
> {code:java}@Configuration > public class SenderConfiguration { > @Bean > KafkaSender kafkaSender() { > Properties overrides = new Properties(); > overrides.put(ProducerConfig.BATCH_SIZE_CONFIG, 1); > return KafkaSender.newBuilder() > .bootstrapServers("localhost:9092") > .topic("zipkin") > .overrides(overrides) > .build(); > } > }{code} > *Using:* > - Spring Boot 3.0.1 > - Spring Cloud 2022.0.0 > pom.xml (fragment): > {code:xml} > <dependency> > <groupId>org.springframework.boot</groupId> > <artifactId>spring-boot-autoconfigure</artifactId> > </dependency> > <dependency> > <groupId>org.springframework.boot</groupId> > <artifactId>spring-boot-starter-actuator</artifactId> > </dependency> > <dependency> > <groupId>io.micrometer</groupId> > <artifactId>micrometer-registry-prometheus</artifactId> > </dependency> > <dependency> > <groupId>io.micrometer</groupId> > <artifactId>micrometer-tracing-bridge-brave</artifactId> > </dependency> > <dependency> > <groupId>io.zipkin.reporter2</groupId> > <artifactId>zipkin-reporter-brave</artifactId> > </dependency> > <dependency> > <groupId>io.zipkin.reporter2</groupId> > <artifactId>zipkin-sender-kafka</artifactId> > </dependency> > {code} > Everything is on default settings, except a KafkaSender is explicitly > created as illustrated above. (No autoconfiguration available for Kafka > sender.) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-14553) RecordAccumulator hangs in infinite NOP loop
[ https://issues.apache.org/jira/browse/KAFKA-14553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur closed KAFKA-14553. Assignee: Luke Chen > RecordAccumulator hangs in infinite NOP loop > > > Key: KAFKA-14553 > URL: https://issues.apache.org/jira/browse/KAFKA-14553 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.3.1 > Environment: - Spring Boot 3.0.1 > - Spring Cloud 2022.0.0 > Versions of dependencies are defined in boms of SB and SC: > - micrometer-tracing-bridge-brave 1.0.0 > - zipkin-reporter-brave 2.16.3 > - zipkin-sender-kafka 2.16.3 >Reporter: Viczai Gábor >Assignee: Luke Chen >Priority: Minor > Fix For: 3.4.0, 3.3.2 > > > *Summary:* > There is an infinite loop in RecordAccumulator, if stickyBatchSize is > configured to be 0 in BuiltinPartitioner. > (Which is the default case when using KafkaSender's default Builder.) > *Details:* > The infinite loop is caused by this while: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L293 > and this continue particularly: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L316 > because the partitionChanged() call in the condition always return true if > batchSize is 0. > So program flow never reaches this point: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L318 > Thus no span data sent to Kafka ever. > The problematic line in partitionChanged() is when it calls an update on the > BuiltInPartitioner: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L242 > which in fact always updates the partition because of this condition: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218 > therefore the next confdition in RecordAccumulator will evaluate to true also: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L243 > thus returning 'true' and forcing the 'continue' in the while(true) loop. > *Suggested fix:* > I think these conditions should be changed: > https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218 > The equal signs should be removed from the conditions: > {code}if (producedBytes > stickyBatchSize && enableSwitch || producedBytes > > stickyBatchSize * 2) {{code} > (Btw: line 213 also needs this modification.) > *Note:* > The problem arises because KafkaSender sets the batchSize to 0. > https://github.com/openzipkin/zipkin-reporter-java/blob/2.16.3/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L88 > *Workaround:* > Simply set the batch size greater than zero. 
> {code:java}
> @Configuration
> public class SenderConfiguration {
>     @Bean
>     KafkaSender kafkaSender() {
>         Properties overrides = new Properties();
>         overrides.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
>         return KafkaSender.newBuilder()
>             .bootstrapServers("localhost:9092")
>             .topic("zipkin")
>             .overrides(overrides)
>             .build();
>     }
> }
> {code}
> *Using:*
> - Spring Boot 3.0.1
> - Spring Cloud 2022.0.0
> pom.xml (fragment):
> {code:xml}
> <dependency>
>     <groupId>org.springframework.boot</groupId>
>     <artifactId>spring-boot-autoconfigure</artifactId>
> </dependency>
> <dependency>
>     <groupId>org.springframework.boot</groupId>
>     <artifactId>spring-boot-starter-actuator</artifactId>
> </dependency>
> <dependency>
>     <groupId>io.micrometer</groupId>
>     <artifactId>micrometer-registry-prometheus</artifactId>
> </dependency>
> <dependency>
>     <groupId>io.micrometer</groupId>
>     <artifactId>micrometer-tracing-bridge-brave</artifactId>
> </dependency>
> <dependency>
>     <groupId>io.zipkin.reporter2</groupId>
>     <artifactId>zipkin-reporter-brave</artifactId>
> </dependency>
> <dependency>
>     <groupId>io.zipkin.reporter2</groupId>
>     <artifactId>zipkin-sender-kafka</artifactId>
> </dependency>
> {code}
> Everything is on default settings, except that a KafkaSender is explicitly created as illustrated above. (No autoconfiguration is available for the Kafka sender.)
-- This message was sent by Atlassian Jira (v8.20.10#820010)
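To make the failure mode concrete, here is a minimal, self-contained sketch of the switch condition the report points at. This is a simplification, not the upstream code: `producedBytes`, `stickyBatchSize`, and `enableSwitch` mirror the names referenced in the linked BuiltInPartitioner lines.

{code:java}
public class StickySwitchDemo {
    public static void main(String[] args) {
        int stickyBatchSize = 0;   // what zipkin's KafkaSender effectively configures
        boolean enableSwitch = true;

        for (int producedBytes = 0; producedBytes <= 2; producedBytes++) {
            // Pre-fix condition with '>=': when stickyBatchSize == 0 it already
            // holds at producedBytes == 0, so every pass decides to switch the
            // partition and RecordAccumulator's while(true) loop never progresses.
            boolean buggy = (producedBytes >= stickyBatchSize && enableSwitch)
                    || producedBytes >= stickyBatchSize * 2;
            // Suggested fix with '>': producedBytes == 0 no longer triggers a switch.
            boolean fixed = (producedBytes > stickyBatchSize && enableSwitch)
                    || producedBytes > stickyBatchSize * 2;
            System.out.printf("bytes=%d buggy=%b fixed=%b%n", producedBytes, buggy, fixed);
        }
    }
}
{code}

With stickyBatchSize = 0 the pre-fix condition prints true on every iteration before a single byte is produced, which is exactly the always-true partitionChanged() path described in the report.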
[jira] [Comment Edited] (KAFKA-14661) Upgrade Zookeeper to 3.8.1
[ https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682621#comment-17682621 ]
Divij Vaidya edited comment on KAFKA-14661 at 1/31/23 3:43 PM:
---
From the Zk 3.8.1 release notes [1]: ZooKeeper clients from 3.5.x onwards are fully compatible with 3.8.x servers. Apache Kafka (AK) upgraded to Zk v3.5.x starting with the AK v2.4.0 release [2], which means that clients from Apache Kafka 2.4.0 onwards will be able to communicate with servers running Zk 3.8.x.
[1] [https://zookeeper.apache.org/releases.html]
[2] https://issues.apache.org/jira/browse/KAFKA-8634

was (Author: divijvaidya):
From the Zk 3.8.1 release notes [1]: ZooKeeper clients from 3.5.x onwards are fully compatible with 3.8.x servers. Apache Kafka upgraded to Zk v3.5.x starting with the 2.4.0 release [2], which means that clients from Apache Kafka 2.4.0 onwards will be able to communicate with servers running 3.8.x Zk.
[1] [https://zookeeper.apache.org/releases.html]
[2] https://issues.apache.org/jira/browse/KAFKA-8634

> Upgrade Zookeeper to 3.8.1
> --------------------------
>
> Key: KAFKA-14661
> URL: https://issues.apache.org/jira/browse/KAFKA-14661
> Project: Kafka
> Issue Type: Improvement
> Components: packaging
> Reporter: Divij Vaidya
> Assignee: Christo Lolov
> Priority: Blocker
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
> The current Zk version (3.6.x) supported by Apache Kafka has been EOL since December 2022 [1].
> Users of Kafka are facing regulatory hurdles because they depend on a library that is EOL; hence, I would suggest upgrading this in all upcoming releases (including patch releases of the 3.3.x and 3.4.x lines).
> Some things to consider while upgrading (as pointed out by [~ijuma] at [2]):
> # If we upgrade the zk server to 3.8.1, what is the impact on the zk clients? That is, what's the earliest zk client version that is supported by the 3.8.x server?
> # We need to ensure there are no regressions (particularly on the stability front) when it comes to this upgrade. It would be good for someone to stress test the system a bit with the new version and check that all works well.
> [1] [https://zookeeper.apache.org/releases.html]
> [2] [https://github.com/apache/kafka/pull/12620#issuecomment-1409028650]
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14661) Upgrade Zookeeper to 3.8.1
[ https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682621#comment-17682621 ]
Divij Vaidya edited comment on KAFKA-14661 at 1/31/23 3:43 PM:
---
From the Zk 3.8.1 release notes [1]: ZooKeeper clients from 3.5.x onwards are fully compatible with 3.8.x servers. Apache Kafka upgraded to Zk v3.5.x starting with the 2.4.0 release [2], which means that clients from Apache Kafka 2.4.0 onwards will be able to communicate with servers running 3.8.x Zk.
[1] [https://zookeeper.apache.org/releases.html]
[2] https://issues.apache.org/jira/browse/KAFKA-8634

was (Author: divijvaidya):
From the Zk 3.8.1 release notes [1]: ZooKeeper clients from 3.5.x onwards are fully compatible with 3.8.x servers. Apache Kafka upgraded to 3.5.x starting with the 2.4.0 release [2], which means that clients from Apache Kafka 2.4.0 onwards will be able to communicate with servers running 3.8.x Zk.
[1] [https://zookeeper.apache.org/releases.html]
[2] https://issues.apache.org/jira/browse/KAFKA-8634

> Upgrade Zookeeper to 3.8.1
> --------------------------
>
> Key: KAFKA-14661
> URL: https://issues.apache.org/jira/browse/KAFKA-14661
> Project: Kafka
> Issue Type: Improvement
> Components: packaging
> Reporter: Divij Vaidya
> Assignee: Christo Lolov
> Priority: Blocker
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
> The current Zk version (3.6.x) supported by Apache Kafka has been EOL since December 2022 [1].
> Users of Kafka are facing regulatory hurdles because they depend on a library that is EOL; hence, I would suggest upgrading this in all upcoming releases (including patch releases of the 3.3.x and 3.4.x lines).
> Some things to consider while upgrading (as pointed out by [~ijuma] at [2]):
> # If we upgrade the zk server to 3.8.1, what is the impact on the zk clients? That is, what's the earliest zk client version that is supported by the 3.8.x server?
> # We need to ensure there are no regressions (particularly on the stability front) when it comes to this upgrade. It would be good for someone to stress test the system a bit with the new version and check that all works well.
> [1] [https://zookeeper.apache.org/releases.html]
> [2] [https://github.com/apache/kafka/pull/12620#issuecomment-1409028650]
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14661) Upgrade Zookeeper to 3.8.1
[ https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682621#comment-17682621 ]
Divij Vaidya commented on KAFKA-14661:
--
From the Zk 3.8.1 release notes [1]: ZooKeeper clients from 3.5.x onwards are fully compatible with 3.8.x servers. Apache Kafka upgraded to 3.5.x starting with the 2.4.0 release [2], which means that clients from Apache Kafka 2.4.0 onwards will be able to communicate with servers running 3.8.x Zk.
[1] [https://zookeeper.apache.org/releases.html]
[2] https://issues.apache.org/jira/browse/KAFKA-8634

> Upgrade Zookeeper to 3.8.1
> --------------------------
>
> Key: KAFKA-14661
> URL: https://issues.apache.org/jira/browse/KAFKA-14661
> Project: Kafka
> Issue Type: Improvement
> Components: packaging
> Reporter: Divij Vaidya
> Assignee: Christo Lolov
> Priority: Blocker
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
> The current Zk version (3.6.x) supported by Apache Kafka has been EOL since December 2022 [1].
> Users of Kafka are facing regulatory hurdles because they depend on a library that is EOL; hence, I would suggest upgrading this in all upcoming releases (including patch releases of the 3.3.x and 3.4.x lines).
> Some things to consider while upgrading (as pointed out by [~ijuma] at [2]):
> # If we upgrade the zk server to 3.8.1, what is the impact on the zk clients? That is, what's the earliest zk client version that is supported by the 3.8.x server?
> # We need to ensure there are no regressions (particularly on the stability front) when it comes to this upgrade. It would be good for someone to stress test the system a bit with the new version and check that all works well.
> [1] [https://zookeeper.apache.org/releases.html]
> [2] [https://github.com/apache/kafka/pull/12620#issuecomment-1409028650]
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)
[ https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682618#comment-17682618 ]
Jochen Schalanda commented on KAFKA-14646:
--
[~mjsax] Unfortunately we already "solved" the issue by recreating the respective topologies with new internal topic names, so we cannot try out your suggestion.

> SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)
> ---------------------------------------------------------------------------------
>
> Key: KAFKA-14646
> URL: https://issues.apache.org/jira/browse/KAFKA-14646
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.3.0, 3.3.1, 3.3.2
> Environment: Kafka Streams 3.2.3 (before update)
> Kafka Streams 3.3.2 (after update)
> Java 17 (Eclipse Temurin 17.0.5), Linux x86_64
> Reporter: Jochen Schalanda
> Priority: Major
> Fix For: 3.4.0, 3.3.3
>
> Hey folks,
>
> we've just updated an application from *_Kafka Streams 3.2.3 to 3.3.2_* and started getting the following exceptions:
> {code:java}
> org.apache.kafka.common.errors.UnsupportedVersionException: SubscriptionWrapper is of an incompatible version. {code}
> After a quick look through the code, this exception is potentially thrown in two places:
> * [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java#L73-L78]
> ** Here the check was changed in Kafka 3.3.x: [https://github.com/apache/kafka/commit/9dd25ecd9ce17e608c6aba98e0422b26ed133c12]
> * [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99]
> ** Here the check wasn't changed.
>
> Is it possible that the second check in {{SubscriptionStoreReceiveProcessorSupplier}} was forgotten?
>
> Any hints on how to resolve this issue without a downgrade?
> Since this only affects 2 of 15 topologies in the application, I'm hesitant to just downgrade to Kafka 3.2.3 again, since the internal topics might already have been updated to use the "new" version of {{SubscriptionWrapper}}.
>
> Related discussion in the Confluent Community Slack: [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1674497054507119]
> h2. Stack trace
> {code:java}
> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_8, processor=XXX-joined-changed-fk-subscription-registration-source, topic=topic.rev7-XXX-joined-changed-fk-subscription-registration-topic, partition=8, offset=12297976, stacktrace=org.apache.kafka.common.errors.UnsupportedVersionException: SubscriptionWrapper is of an incompatible version.
> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750)
> at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
> at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
> at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:770)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
> {code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
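A schematic contrast of the two checks may help readers follow the report. The snippet below is a hedged reconstruction, not quoted source: class and method names are abbreviated, the version constant is a stand-in, and the exact comparison in the unchanged supplier is inferred from the report's description.

{code:java}
import org.apache.kafka.common.errors.UnsupportedVersionException;

public class SubscriptionVersionChecks {
    static final byte CURRENT_VERSION = 2; // stand-in for the wrapper version bumped in 3.3.x

    // SubscriptionJoinForeignProcessorSupplier, as updated in 3.3.x: tolerant of
    // any older wrapper version, rejecting only versions newer than it knows.
    static void joinSideCheck(byte wrapperVersion) {
        if (wrapperVersion > CURRENT_VERSION)
            throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
    }

    // SubscriptionStoreReceiveProcessorSupplier, per the report left unchanged: an
    // exact-match style check like this rejects records written by the older 3.2.3
    // code during a rolling upgrade, producing the reported exception.
    static void receiveSideCheck(byte wrapperVersion) {
        if (wrapperVersion != CURRENT_VERSION)
            throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
    }
}
{code}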
[GitHub] [kafka] divijvaidya commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
divijvaidya commented on PR #13078: URL: https://github.com/apache/kafka/pull/13078#issuecomment-1410523645

@anatasiavela the proposal (using ConcurrentHashMap for producers) impacts this change. Let me try to explain why.

Currently you are using `producerIdCount` and updating it on every add/delete to the `producers` map. The alternative was to simply call `producers.size()` whenever we want to compute the metric. But you didn't choose this path because `producers` is accessed by multiple threads at the same time and we usually guard access to it via a lock. Hence, if you wanted to call `producers.size()` you would have to acquire a lock, and that is not optimal for a mere metric calculation. Hence, your approach made sense.

BUT if you change `producers` to a ConcurrentHashMap, you wouldn't have to acquire a lock to call `producers.size()` when computing the metric. Without acquiring a lock, the risk is that another thread may be mutating the `producers` map at the same time; as per the discussion above in the PR, that is acceptable to us. Hence, you can simply change `producers` to a ConcurrentHashMap and remove the logic that updates `producerIdCount` on every add/remove. This simplifies the code greatly.

Let me know if that makes sense.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
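A hedged sketch of the suggested shape follows; the real class keeps richer per-producer state, so `ProducerState` here is a placeholder type, not the actual entry class.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ProducerIdCountSketch {
    static class ProducerState { } // placeholder for the real per-producer entry

    // ConcurrentHashMap makes size() safe to read without the state lock.
    private final Map<Long, ProducerState> producers = new ConcurrentHashMap<>();

    void addProducer(long producerId, ProducerState state) {
        producers.put(producerId, state);  // no manual producerIdCount++ needed
    }

    void removeProducer(long producerId) {
        producers.remove(producerId);      // no manual producerIdCount-- needed
    }

    // Metric gauge: size() is a weakly consistent snapshot, which the PR
    // discussion deems acceptable for a mere metric.
    int producerIdCount() {
        return producers.size();
    }
}
{code}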
[jira] [Resolved] (KAFKA-14656) Brokers rejecting LISR during ZK migration
[ https://issues.apache.org/jira/browse/KAFKA-14656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Arthur resolved KAFKA-14656.
--
Resolution: Fixed

> Brokers rejecting LISR during ZK migration
> ------------------------------------------
>
> Key: KAFKA-14656
> URL: https://issues.apache.org/jira/browse/KAFKA-14656
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.4.0
> Reporter: David Arthur
> Assignee: David Arthur
> Priority: Blocker
> Fix For: 3.4.0
>
> During the ZK migration, the KRaft controller sends controller RPCs to the ZK brokers (LISR, UMR, etc.). Since the migration can begin immediately after a ZK broker starts up with migration enabled, it is possible that this broker is not yet seen as alive by the rest of the brokers. This is because the KRaft controller takes over before the ZK controller can send out a UMR that includes the restarted broker.
>
> The result is that parts of the LISR sent by KRaft immediately after the metadata migration are rejected by brokers because the leader appears to be offline.
>
> The fix for this is to send a UMR to all brokers after the migration with the set of alive brokers.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] Cerchie commented on a diff in pull request #13161: Kafka 14128
Cerchie commented on code in PR #13161: URL: https://github.com/apache/kafka/pull/13161#discussion_r1092035905 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java: ## @@ -538,6 +544,8 @@ protected Map getNumPartitions(final Set topics, tempUnknownTopics.add(topicName); log.debug("The leader of topic {} is not available.\n" + "Error message was: {}", topicName, cause.toString()); +} else if (cause instanceof TimeoutException) { +throw new RuntimeException(); Review Comment: Thank you -- I removed the placeholder exception and the existing block, and replaced them with the correct behavior. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Cerchie commented on a diff in pull request #13161: Kafka 14128
Cerchie commented on code in PR #13161: URL: https://github.com/apache/kafka/pull/13161#discussion_r1092034974 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java: ## @@ -466,7 +469,10 @@ public Set makeReady(final Map topics) { topicName) ); } -} else { +} else if (cause instanceof TimeoutException) { Review Comment: ah -- that's right. done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Cerchie commented on a diff in pull request #13161: Kafka 14128
Cerchie commented on code in PR #13161: URL: https://github.com/apache/kafka/pull/13161#discussion_r1092027837 ## clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java: ## @@ -160,7 +160,7 @@ private void maybeThrowCancellationException(Throwable cause) { * Waits if necessary for this future to complete, and then returns its result. */ @Override -public T get() throws InterruptedException, ExecutionException { +public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { Review Comment: I'm not sure what you mean here -- for more on the problem, read https://issues.apache.org/jira/browse/KAFKA-14128 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Cerchie commented on a diff in pull request #13161: Kafka 14128
Cerchie commented on code in PR #13161: URL: https://github.com/apache/kafka/pull/13161#discussion_r1092026440 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java: ## @@ -521,7 +524,7 @@ protected Map getNumPartitions(final Set topics, for (final Map.Entry> topicFuture : futures.entrySet()) { final String topicName = topicFuture.getKey(); try { -final TopicDescription topicDescription = topicFuture.getValue().get(); +final TopicDescription topicDescription = topicFuture.getValue().get(Long.parseLong(DEFAULT_API_TIMEOUT_MS_CONFIG), TimeUnit.MILLISECONDS); Review Comment: removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
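One pitfall in this thread is worth spelling out: `DEFAULT_API_TIMEOUT_MS_CONFIG` is the configuration *key* (the string "default.api.timeout.ms"), so calling `Long.parseLong` on it can only throw a `NumberFormatException`; a bounded `get` needs the configured *value*. Below is a hedged sketch of the intended pattern, where `apiTimeoutMs` is a hypothetical parameter standing in for however the configured value is wired through.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedGetSketch {
    // Wait for a future with an upper bound instead of a potentially unbounded get().
    static <T> T getWithTimeout(Future<T> future, long apiTimeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        return future.get(apiTimeoutMs, TimeUnit.MILLISECONDS); // bounded wait
    }

    public static void main(String[] args) throws Exception {
        Future<String> description = CompletableFuture.completedFuture("topic: 3 partitions");
        System.out.println(getWithTimeout(description, 30_000L));
    }
}
{code}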
[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
beardt commented on code in PR #13168: URL: https://github.com/apache/kafka/pull/13168#discussion_r1092017609 ## clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java: ## @@ -55,6 +58,11 @@ public void configure(Map configs) { Object clientIdValue = configs.get(ConsumerConfig.CLIENT_ID_CONFIG); if (clientIdValue == null) throw new ConfigException("Mock consumer interceptor expects configuration " + ProducerConfig.CLIENT_ID_CONFIG); + +CONFIG_COUNT.incrementAndGet(); +if (CONFIG_COUNT.get() == THROW_CONFIG_EXCEPTION_THRESHOLD.get()) { +throw new ConfigException("Kafka producer creation failed. Failure may not have cleaned up listener thread resource."); Review Comment: Done! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dpcollins-google commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals
dpcollins-google commented on code in PR #13162: URL: https://github.com/apache/kafka/pull/13162#discussion_r1091834852 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel, * @param length The number of bytes to write * @throws IOException For any errors writing to the output */ -public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException { +public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException { if (buffer.hasArray()) { out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length); } else { -int pos = buffer.position(); -for (int i = pos; i < length + pos; i++) -out.writeByte(buffer.get(i)); +Channels.newChannel(out).write(buffer); Review Comment: Sure. In a particular workload, this code path was about 30% of CPU usage in flamegraphs. It is now 2-3% after a local patch. This hasn't been discussed on the dev list - it's just an attempt to upstream a small performance improvement. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
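For readers outside the PR, here is a hedged, self-contained sketch of the before/after. The signature mirrors the diff; the caveat in the comments is an assumption worth verifying against the final patch: a bulk channel write consumes the buffer's position and writes remaining() bytes, while the old per-byte loop honored `length` and left the position untouched.

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;

public class BulkWriteSketch {
    static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
        if (buffer.hasArray()) {
            // Heap buffer: one bulk array write, as before.
            out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
        } else {
            // Direct buffer: a single channel write replaces the per-byte
            // writeByte() loop that dominated the flamegraphs.
            Channels.newChannel(out).write(buffer);
        }
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer direct = ByteBuffer.allocateDirect(4);
        direct.putInt(42);
        direct.flip();
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        writeTo(new DataOutputStream(sink), direct, direct.remaining());
        System.out.println(sink.size() + " bytes written"); // prints: 4 bytes written
    }
}
{code}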
[GitHub] [kafka] lucasbru commented on pull request #13164: MINOR: Fix scaladoc warnings
lucasbru commented on PR #13164: URL: https://github.com/apache/kafka/pull/13164#issuecomment-1410168606 @mjsax can we merge this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fvaleri commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
fvaleri commented on PR #13095: URL: https://github.com/apache/kafka/pull/13095#issuecomment-1410076642 > @mimaison , I updated the system test to point to the new class. That one place seemed to be the only one relevant in this case. Do you have a test run output that shows it works and that the run time is similar? You can look at what I did for the JmxTool migration, which is also used by system tests (STs). I would also suggest discarding the first run of such a test, because the test framework needs to start a bunch of containers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
vamossagar12 commented on PR #13095: URL: https://github.com/apache/kafka/pull/13095#issuecomment-141003 @mimaison , I updated the system test to point to the new class. That one place seemed to be the only one relevant in this case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Hangleton commented on a diff in pull request #13176: MINOR: some ZK migration code cleanups.
Hangleton commented on code in PR #13176: URL: https://github.com/apache/kafka/pull/13176#discussion_r1091621015 ## core/src/main/scala/kafka/migration/MigrationPropagator.scala: ## @@ -79,6 +81,18 @@ class MigrationPropagator( _image = image } + /** + * A very expensive function that creates a map with an entry for every partition that exists, from + * (topic name, partition index) to partition registration. + */ + def materializePartitions(topicsImage: TopicsImage): util.Map[TopicPartition, PartitionRegistration] = { Review Comment: Curious - what is the reason for using `util.Map` instead of one of the Scala map data structures? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] urbandan commented on a diff in pull request #13163: KAFKA-14653: MirrorMakerConfig using raw properties instead of post-r…
urbandan commented on code in PR #13163: URL: https://github.com/apache/kafka/pull/13163#discussion_r1091581987 ## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ## @@ -246,13 +246,22 @@ public Map originals(Map configOverrides) { */ public Map originalsStrings() { Map copy = new RecordingMap<>(); +copyAsStrings(originals, copy); +return copy; +} + +/** + * Ensures that all values of a map are strings, and copies them to another map. + * @param originals The map to validate. + * @param copy The target to copy to. + */ +protected static void copyAsStrings(Map originals, Map copy) { Review Comment: Originally, I had this inlined in the MMConfig ctor, but didn't want to duplicate code - anyway, I'll inline it again, that's safer -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
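For context, here is a hedged sketch of the kind of string-validating copy being discussed. It is a minimal standalone version; the real AbstractConfig works with its RecordingMap and its own exception wording, so treat the names below as illustrative.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class CopyAsStringsSketch {
    // Copy a raw config map into a Map<String, String>, failing fast on any
    // non-string value rather than deferring a ClassCastException to some
    // far-away read site.
    static Map<String, String> copyAsStrings(Map<String, ?> originals) {
        Map<String, String> copy = new HashMap<>(originals.size());
        for (Map.Entry<String, ?> entry : originals.entrySet()) {
            if (!(entry.getValue() instanceof String))
                throw new ClassCastException(
                        "Non-string value found in original settings for key " + entry.getKey());
            copy.put(entry.getKey(), (String) entry.getValue());
        }
        return copy;
    }
}
{code}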
[GitHub] [kafka] urbandan commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2
urbandan commented on code in PR #13137: URL: https://github.com/apache/kafka/pull/13137#discussion_r1091577828 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java: ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.mirror.MirrorMaker; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; +import static org.apache.kafka.test.TestUtils.waitForCondition; + +@Tag("integration") +public class DedicatedMirrorIntegrationTest { + +private static final Logger log = LoggerFactory.getLogger(DedicatedMirrorIntegrationTest.class); + +private static final int TOPIC_CREATION_TIMEOUT_MS = 30_000; +private static final int TOPIC_REPLICATION_TIMEOUT_MS = 30_000; + +private Map kafkaClusters; +private Map mirrorMakers; + +@BeforeEach +public void setup() { +kafkaClusters = new HashMap<>(); +mirrorMakers = new HashMap<>(); +} + +@AfterEach +public void teardown() throws Throwable { +AtomicReference shutdownFailure = new AtomicReference<>(); +mirrorMakers.forEach((name, mirrorMaker) -> +Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure) +); +kafkaClusters.forEach((name, kafkaCluster) -> +Utils.closeQuietly(kafkaCluster::stop, "Embedded Kafka cluster '" + name + "'", shutdownFailure) +); +if (shutdownFailure.get() != null) { +throw shutdownFailure.get(); +} +} + +private EmbeddedKafkaCluster startKafkaCluster(String name, int numBrokers, Properties brokerProperties) { +if (kafkaClusters.containsKey(name)) +throw new IllegalStateException("Cannot register multiple Kafka clusters with the same name"); + +EmbeddedKafkaCluster result = new EmbeddedKafkaCluster(numBrokers, brokerProperties); +kafkaClusters.put(name, result); + +result.start(); + +return result; +} + +private MirrorMaker startMirrorMaker(String name, Map mmProps) { +if 
(mirrorMakers.containsKey(name)) +throw new IllegalStateException("Cannot register multiple MirrorMaker nodes with the same name"); + +MirrorMaker result = new MirrorMaker(mmProps); +mirrorMakers.put(name, result); + +result.start(); + +return result; +} + +/** + * Test that a multi-node dedicated cluster is able to dynamically detect new topics at runtime Review Comment: Thanks for the clarification. I think using the EOS endpoints should be enough, as I don't really have any ideas on how to make sure that the task config update is triggered. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #13142: KAFKA-14491: [2/N] Refactor RocksDB store open iterator management
mjsax merged PR #13142: URL: https://github.com/apache/kafka/pull/13142 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #13143: KAFKA-14491: [3/N] Add logical key value segments
mjsax commented on PR #13143: URL: https://github.com/apache/kafka/pull/13143#issuecomment-1409929859 Merged the other PR -- can you rebase this one? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org