[GitHub] [kafka] ashmeet13 commented on pull request #12414: KAFKA-14073 Logging the reason for Snapshot
ashmeet13 commented on PR #12414: URL: https://github.com/apache/kafka/pull/12414#issuecomment-1216159486 Hi @dengziming @jsancio, can you help with a review on this PR. If it looks okay to merge I can clean up the PR and remove temporary comments added by me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #12519: [WIP] KAFKA-10199: Handle exceptions from state updater
guozhangwang commented on PR #12519: URL: https://github.com/apache/kafka/pull/12519#issuecomment-1216080732 cc @cadonna would like to hear your comments before I continue adding unit tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater
guozhangwang commented on PR #12466: URL: https://github.com/apache/kafka/pull/12466#issuecomment-1216080804 Merged to trunk, @cadonna please feel free to leave more comments, and I will address them in https://github.com/apache/kafka/pull/12519
[GitHub] [kafka] guozhangwang opened a new pull request, #12519: [WIP] KAFKA-10199: Handle exceptions from state updater
guozhangwang opened a new pull request, #12519: URL: https://github.com/apache/kafka/pull/12519

1. In the state updater, when handling a task-corrupted exception due to an invalid restoring offset, first delete the affected partitions from the checkpoint before reporting it back to the stream thread. This mimics the behavior in the stream thread's StateManager#handleCorruption#closeDirtyAndRevive. It is cleaner to do so inside the restore thread, plus it enables us to optimize by deleting only the corrupted partitions, and not all of them.
2. In the state manager, handle the drained exceptions as follows (this is the same as handling all exceptions from `handleAssignment`): 1) task-migrated: throw all the way to the stream thread as `handleTaskMigrated`; 2) any fatal Streams exception: throw all the way to the stream thread to trigger the exception handler; 3) task-corrupted: throw to the stream thread as `handleCorruption`. Note that for 3), we specially distinguish whether the corrupted tasks are already closed (when they are thrown from `handleAssignment`) or not (when they are thrown from the state updater).

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
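The three-way dispatch described in point 2 can be sketched as follows. This is a minimal illustration only; the class names below are hypothetical stand-ins, not the actual Kafka Streams exception hierarchy or handler signatures.

```java
public class ExceptionClassifier {
    // Hypothetical stand-ins for the Streams exception types named above.
    static class TaskMigratedException extends RuntimeException { }
    static class TaskCorruptedException extends RuntimeException { }

    // Mirrors the dispatch described in the PR text: task-migrated goes to
    // handleTaskMigrated, task-corrupted to handleCorruption, and any other
    // (fatal) exception to the uncaught-exception handler.
    static String classify(RuntimeException e) {
        if (e instanceof TaskMigratedException) {
            return "handleTaskMigrated";
        } else if (e instanceof TaskCorruptedException) {
            return "handleCorruption";
        }
        return "streamsUncaughtExceptionHandler";
    }

    public static void main(String[] args) {
        System.out.println(classify(new TaskCorruptedException())); // prints handleCorruption
    }
}
```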
[GitHub] [kafka] guozhangwang merged pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater
guozhangwang merged PR #12466: URL: https://github.com/apache/kafka/pull/12466
[GitHub] [kafka] hachikuji opened a new pull request, #12518: KAFKA-14167; Completion exceptions should not be translated directly to error codes
hachikuji opened a new pull request, #12518: URL: https://github.com/apache/kafka/pull/12518

There are a few cases in `ControllerApis` where we may see an `ApiException` wrapped as a `CompletionException`. This can happen in `QuorumController.allocateProducerIds`, where the returned future is the result of calling `thenApply` on the future passed to the controller. The danger when this happens is that the `CompletionException` gets passed to `Errors.forException`, which translates it to an `UNKNOWN_SERVER_ERROR`.

At a minimum, I found that the `AllocateProducerIds` and `UpdateFeatures` APIs were affected by this bug, but it is difficult to root out all cases. Interestingly, `DeleteTopics` was not affected as I originally suspected. This is because we have logic in `ApiError.fromThrowable` to check for both `CompletionException` and `ExecutionException` and to pull out the underlying cause. This patch moves this logic out of `ApiError.fromThrowable` and into `Errors.forException` to be sure that we handle all cases where exceptions are converted to error codes.
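The wrapping behavior this PR works around can be reproduced with plain `CompletableFuture`. The sketch below is illustrative, not the Kafka code: `maybeUnwrap` is a hypothetical helper mimicking the described fix of peeling off `CompletionException`/`ExecutionException` before mapping a throwable to an error code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class UnwrapDemo {
    // Hypothetical helper mirroring the fix: unwrap Completion/ExecutionException
    // wrappers so the real cause (e.g. NOT_CONTROLLER) is what gets translated.
    static Throwable maybeUnwrap(Throwable t) {
        if ((t instanceof CompletionException || t instanceof ExecutionException)
                && t.getCause() != null) {
            return t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> inner = new CompletableFuture<>();
        inner.completeExceptionally(new IllegalStateException("NOT_CONTROLLER"));
        try {
            // A dependent stage created via thenApply surfaces the failure
            // wrapped in a CompletionException when joined.
            inner.thenApply(v -> v + 1).join();
        } catch (CompletionException e) {
            System.out.println(maybeUnwrap(e).getMessage()); // prints NOT_CONTROLLER
        }
    }
}
```

Without the unwrap step, code that maps the caught `CompletionException` itself to an error code sees an unknown exception type, hence `UNKNOWN_SERVER_ERROR`.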
[jira] [Updated] (KAFKA-14167) Unexpected UNKNOWN_SERVER_ERROR raised from kraft controller
[ https://issues.apache.org/jira/browse/KAFKA-14167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-14167:
Priority: Blocker (was: Major)

> Unexpected UNKNOWN_SERVER_ERROR raised from kraft controller
>
> Key: KAFKA-14167
> URL: https://issues.apache.org/jira/browse/KAFKA-14167
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Priority: Blocker
> Fix For: 3.3.0
>
> In `ControllerApis`, we have callbacks such as the following after completion:
> {code:java}
> controller.allocateProducerIds(context, allocatedProducerIdsRequest.data)
>   .handle[Unit] { (results, exception) =>
>     if (exception != null) {
>       requestHelper.handleError(request, exception)
>     } else {
>       requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
>         results.setThrottleTimeMs(requestThrottleMs)
>         new AllocateProducerIdsResponse(results)
>       })
>     }
>   }
> {code}
> What I see locally is that the underlying exception that gets passed to `handle` always gets wrapped in a `CompletionException`. When passed to `getErrorResponse`, this error will get converted to `UNKNOWN_SERVER_ERROR`. For example, in this case, a `NOT_CONTROLLER` error returned from the controller would be returned as `UNKNOWN_SERVER_ERROR`. It looks like there are a few APIs that are potentially affected by this bug, such as `DeleteTopics` and `UpdateFeatures`.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14167) Unexpected UNKNOWN_SERVER_ERROR raised from kraft controller
Jason Gustafson created KAFKA-14167:

Summary: Unexpected UNKNOWN_SERVER_ERROR raised from kraft controller
Key: KAFKA-14167
URL: https://issues.apache.org/jira/browse/KAFKA-14167
Project: Kafka
Issue Type: Bug
Reporter: Jason Gustafson

In `ControllerApis`, we have callbacks such as the following after completion:
{code:java}
controller.allocateProducerIds(context, allocatedProducerIdsRequest.data)
  .handle[Unit] { (results, exception) =>
    if (exception != null) {
      requestHelper.handleError(request, exception)
    } else {
      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
        results.setThrottleTimeMs(requestThrottleMs)
        new AllocateProducerIdsResponse(results)
      })
    }
  }
{code}
What I see locally is that the underlying exception that gets passed to `handle` always gets wrapped in a `CompletionException`. When passed to `getErrorResponse`, this error will get converted to `UNKNOWN_SERVER_ERROR`. For example, in this case, a `NOT_CONTROLLER` error returned from the controller would be returned as `UNKNOWN_SERVER_ERROR`. It looks like there are a few APIs that are potentially affected by this bug, such as `DeleteTopics` and `UpdateFeatures`.
[jira] [Updated] (KAFKA-14167) Unexpected UNKNOWN_SERVER_ERROR raised from kraft controller
[ https://issues.apache.org/jira/browse/KAFKA-14167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-14167:
Fix Version/s: 3.3.0

> Unexpected UNKNOWN_SERVER_ERROR raised from kraft controller
>
> Key: KAFKA-14167
> URL: https://issues.apache.org/jira/browse/KAFKA-14167
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Priority: Major
> Fix For: 3.3.0
>
> In `ControllerApis`, we have callbacks such as the following after completion:
> {code:java}
> controller.allocateProducerIds(context, allocatedProducerIdsRequest.data)
>   .handle[Unit] { (results, exception) =>
>     if (exception != null) {
>       requestHelper.handleError(request, exception)
>     } else {
>       requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
>         results.setThrottleTimeMs(requestThrottleMs)
>         new AllocateProducerIdsResponse(results)
>       })
>     }
>   }
> {code}
> What I see locally is that the underlying exception that gets passed to `handle` always gets wrapped in a `CompletionException`. When passed to `getErrorResponse`, this error will get converted to `UNKNOWN_SERVER_ERROR`. For example, in this case, a `NOT_CONTROLLER` error returned from the controller would be returned as `UNKNOWN_SERVER_ERROR`. It looks like there are a few APIs that are potentially affected by this bug, such as `DeleteTopics` and `UpdateFeatures`.
[jira] [Resolved] (KAFKA-14039) Fix KRaft AlterConfigPolicy usage
[ https://issues.apache.org/jira/browse/KAFKA-14039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Arthur resolved KAFKA-14039.
Resolution: Fixed

> Fix KRaft AlterConfigPolicy usage
>
> Key: KAFKA-14039
> URL: https://issues.apache.org/jira/browse/KAFKA-14039
> Project: Kafka
> Issue Type: Bug
> Reporter: David Arthur
> Assignee: David Arthur
> Priority: Major
> Fix For: 3.3.0, 3.4.0
>
> In ConfigurationControlManager, we are currently passing all the configuration values known to the controller down into the AlterConfigPolicy. This does not match the behavior in ZK mode where we only pass configs which were included in the alter configs request. This can lead to different unexpected behavior in custom AlterConfigPolicy implementations.
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater
guozhangwang commented on code in PR #12466: URL: https://github.com/apache/kafka/pull/12466#discussion_r946203610

## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
@@ -296,6 +296,75 @@ public void shouldAddTasksToStateUpdater() { Mockito.verify(stateUpdater).add(task01); }
+@Test
+public void shouldHandleRemovedTasksFromStateUpdater() {

Review Comment: When adding tests I realized we do not need to handle exceptions by closing tasks dirty and then re-throwing, but could simply re-throw directly and delegate to the StreamThread to handle the thrown exception. Will update accordingly.
[GitHub] [kafka] ijuma merged pull request #12515: MINOR: Improve readability of `tryProcessAlterPartition`
ijuma merged PR #12515: URL: https://github.com/apache/kafka/pull/12515
[jira] [Resolved] (KAFKA-13809) FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks
[ https://issues.apache.org/jira/browse/KAFKA-13809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton resolved KAFKA-13809.
Fix Version/s: 3.4.0
Resolution: Fixed

> FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks
>
> Key: KAFKA-13809
> URL: https://issues.apache.org/jira/browse/KAFKA-13809
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Reporter: Daniel Urban
> Assignee: Yash Mayya
> Priority: Major
> Fix For: 3.4.0
>
> The 2 example connectors do not propagate the full connector configuration to the tasks. This makes it impossible to override built-in configs, such as producer/consumer overrides. This causes an issue even when used for testing purposes.
[jira] [Assigned] (KAFKA-13809) FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks
[ https://issues.apache.org/jira/browse/KAFKA-13809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton reassigned KAFKA-13809:
Assignee: Yash Mayya

> FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks
>
> Key: KAFKA-13809
> URL: https://issues.apache.org/jira/browse/KAFKA-13809
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Reporter: Daniel Urban
> Assignee: Yash Mayya
> Priority: Major
>
> The 2 example connectors do not propagate the full connector configuration to the tasks. This makes it impossible to override built-in configs, such as producer/consumer overrides. This causes an issue even when used for testing purposes.
[jira] [Reopened] (KAFKA-13809) FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks
[ https://issues.apache.org/jira/browse/KAFKA-13809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton reopened KAFKA-13809:

> FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks
>
> Key: KAFKA-13809
> URL: https://issues.apache.org/jira/browse/KAFKA-13809
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Reporter: Daniel Urban
> Assignee: Yash Mayya
> Priority: Major
>
> The 2 example connectors do not propagate the full connector configuration to the tasks. This makes it impossible to override built-in configs, such as producer/consumer overrides. This causes an issue even when used for testing purposes.
[GitHub] [kafka] hachikuji opened a new pull request, #12517: KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to non-leader
hachikuji opened a new pull request, #12517: URL: https://github.com/apache/kafka/pull/12517

Currently the server will return `INVALID_REQUEST` if a `DescribeQuorum` request is sent to a node that is not the current leader. In addition to being inconsistent with all of the other leader APIs, this error is treated as fatal by both the forwarding manager and the admin client. Instead, we should return `NOT_LEADER_OR_FOLLOWER` as we do with the other APIs. This error is retriable and we rely on the admin client to retry it after seeing this error.

Note that we could also check for this error in `ForwardingManager` and retry the request from that context, but the current forwarding logic is only set up to handle retries for requests sent to the controller, which is logically distinct from the raft leader. The `DescribeQuorum` API is a bit of an oddball because it is a multi-partition request, but it is always forwarded to the controller since there is only one raft partition. So to handle this case specially, we would have to unwrap the envelope and check the error for the __cluster_metadata partition. It seemed simpler to rely on the admin client to retry.
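The reason the retriable/fatal distinction matters is sketched below: a client loops on a retriable error but stops immediately on a fatal one. This is an illustration with made-up names, not the actual admin client or `org.apache.kafka.common.protocol.Errors` code.

```java
import java.util.Arrays;
import java.util.Iterator;

public class RetryDemo {
    // Illustrative error codes, not the real Kafka Errors enum.
    enum Errors { NONE, NOT_LEADER_OR_FOLLOWER, INVALID_REQUEST }

    static boolean isRetriable(Errors e) {
        // NOT_LEADER_OR_FOLLOWER is retriable; INVALID_REQUEST is treated as fatal.
        return e == Errors.NOT_LEADER_OR_FOLLOWER;
    }

    // Keep resending while the server answers with a retriable error, up to maxAttempts.
    static Errors sendWithRetries(Iterator<Errors> responses, int maxAttempts) {
        Errors last = Errors.NONE;
        for (int i = 0; i < maxAttempts && responses.hasNext(); i++) {
            last = responses.next();
            if (!isRetriable(last)) {
                return last; // success (NONE) or a fatal error: stop immediately
            }
        }
        return last;
    }

    public static void main(String[] args) {
        // Leadership moved once; the retried request then succeeds.
        Iterator<Errors> responses =
            Arrays.asList(Errors.NOT_LEADER_OR_FOLLOWER, Errors.NONE).iterator();
        System.out.println(sendWithRetries(responses, 5)); // prints NONE
    }
}
```

With `INVALID_REQUEST` in place of `NOT_LEADER_OR_FOLLOWER`, the first branch fires and the caller gives up after one attempt, which is the behavior the PR fixes.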
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater
guozhangwang commented on code in PR #12466: URL: https://github.com/apache/kafka/pull/12466#discussion_r946168656

## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
@@ -296,6 +296,75 @@ public void shouldAddTasksToStateUpdater() { Mockito.verify(stateUpdater).add(task01); }
+@Test
+public void shouldHandleRemovedTasksFromStateUpdater() {

Review Comment: This test did verify that `initializeIfNeeded` is called, and the tasks are added back to the state updater.
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater
guozhangwang commented on code in PR #12466: URL: https://github.com/apache/kafka/pull/12466#discussion_r946159951

## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
@@ -296,6 +296,75 @@ public void shouldAddTasksToStateUpdater() { Mockito.verify(stateUpdater).add(task01); }
+@Test
+public void shouldHandleRemovedTasksFromStateUpdater() {
+// tasks to recycle
+final StreamTask task00 = mock(StreamTask.class);
+final StandbyTask task01 = mock(StandbyTask.class);
+final StandbyTask task00Converted = mock(StandbyTask.class);
+final StreamTask task01Converted = mock(StreamTask.class);
+// task to close
+final StreamTask task02 = mock(StreamTask.class);
+// task to update inputs
+final StreamTask task03 = mock(StreamTask.class);
+when(task00.id()).thenReturn(taskId00);
+when(task01.id()).thenReturn(taskId01);
+when(task02.id()).thenReturn(taskId02);
+when(task03.id()).thenReturn(taskId03);
+when(task00.inputPartitions()).thenReturn(taskId00Partitions);
+when(task01.inputPartitions()).thenReturn(taskId01Partitions);
+when(task02.inputPartitions()).thenReturn(taskId02Partitions);
+when(task03.inputPartitions()).thenReturn(taskId03Partitions);
+when(task00.isActive()).thenReturn(true);
+when(task01.isActive()).thenReturn(false);
+when(task02.isActive()).thenReturn(true);
+when(task03.isActive()).thenReturn(true);
+when(task00.state()).thenReturn(State.RESTORING);
+when(task01.state()).thenReturn(State.RUNNING);
+when(task02.state()).thenReturn(State.RESTORING);
+when(task03.state()).thenReturn(State.RESTORING);

Review Comment: Ack. Rebased on your utils and broke it down into multiple tests.
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater
guozhangwang commented on code in PR #12466: URL: https://github.com/apache/kafka/pull/12466#discussion_r946157896 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java: ## @@ -43,35 +43,48 @@ class Tasks { private final Logger log; +// TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks private final Map activeTasksPerId = new TreeMap<>(); private final Map standbyTasksPerId = new TreeMap<>(); // Tasks may have been assigned for a NamedTopology that is not yet known by this host. When that occurs we stash // these unknown tasks until either the corresponding NamedTopology is added and we can create them at last, or // we receive a new assignment and they are revoked from the thread. - -// Tasks may have been assigned but not yet created because: -// 1. They are for a NamedTopology that is yet known by this host. -// 2. They are to be recycled from an existing restoring task yet to be returned from the state updater. -// -// When that occurs we stash these pending tasks until either they are finally clear to be created, -// or they are revoked from a new assignment. 
private final Map> pendingActiveTasksToCreate = new HashMap<>(); private final Map> pendingStandbyTasksToCreate = new HashMap<>(); private final Map> pendingTasksToRecycle = new HashMap<>(); private final Map> pendingTasksToUpdateInputPartitions = new HashMap<>(); private final Set pendingTasksToInit = new HashSet<>(); private final Set pendingTasksToClose = new HashSet<>(); +// TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks private final Map activeTasksPerPartition = new HashMap<>(); Tasks(final LogContext logContext) { this.log = logContext.logger(getClass()); } -void purgePendingTasksToCreate(final Set assignedActiveTasks, final Set assignedStandbyTasks) { -pendingActiveTasksToCreate.keySet().retainAll(assignedActiveTasks); -pendingStandbyTasksToCreate.keySet().retainAll(assignedStandbyTasks); +void purgePendingTasksToCreate() { +pendingActiveTasksToCreate.clear(); +pendingStandbyTasksToCreate.clear(); +} + +Map> drainPendingActiveTasksForTopologies(final Set currentTopologies) { Review Comment: Ack. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java: ## @@ -43,35 +43,48 @@ class Tasks { private final Logger log; +// TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks private final Map activeTasksPerId = new TreeMap<>(); private final Map standbyTasksPerId = new TreeMap<>(); // Tasks may have been assigned for a NamedTopology that is not yet known by this host. When that occurs we stash // these unknown tasks until either the corresponding NamedTopology is added and we can create them at last, or // we receive a new assignment and they are revoked from the thread. - -// Tasks may have been assigned but not yet created because: -// 1. They are for a NamedTopology that is yet known by this host. -// 2. They are to be recycled from an existing restoring task yet to be returned from the state updater. 
-// -// When that occurs we stash these pending tasks until either they are finally clear to be created, -// or they are revoked from a new assignment. private final Map> pendingActiveTasksToCreate = new HashMap<>(); private final Map> pendingStandbyTasksToCreate = new HashMap<>(); private final Map> pendingTasksToRecycle = new HashMap<>(); private final Map> pendingTasksToUpdateInputPartitions = new HashMap<>(); private final Set pendingTasksToInit = new HashSet<>(); private final Set pendingTasksToClose = new HashSet<>(); +// TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks private final Map activeTasksPerPartition = new HashMap<>(); Tasks(final LogContext logContext) { this.log = logContext.logger(getClass()); } -void purgePendingTasksToCreate(final Set assignedActiveTasks, final Set assignedStandbyTasks) { -pendingActiveTasksToCreate.keySet().retainAll(assignedActiveTasks); -pendingStandbyTasksToCreate.keySet().retainAll(assignedStandbyTasks); +void purgePendingTasksToCreate() { +pendingActiveTasksToCreate.clear(); +pendingStandbyTasksToCreate.clear(); +} + +Map> drainPendingActiveTasksForTopologies(final Set currentTopologies) { +final Map> pendingActiveTasksForTopologies = +filterMap(pendingActiveTasksToCreate, t -> currentTopologies.contains(t.getKey().topologyName())); + + pendingActiveTasksToCreate.keySet().removeAll(pendingActiveTasksForTopologies.keySet()); + +return
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater
guozhangwang commented on code in PR #12466: URL: https://github.com/apache/kafka/pull/12466#discussion_r946148995 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -296,6 +296,75 @@ public void shouldAddTasksToStateUpdater() { Mockito.verify(stateUpdater).add(task01); } +@Test +public void shouldHandleRemovedTasksFromStateUpdater() { +// tasks to recycle +final StreamTask task00 = mock(StreamTask.class); +final StandbyTask task01 = mock(StandbyTask.class); +final StandbyTask task00Converted = mock(StandbyTask.class); +final StreamTask task01Converted = mock(StreamTask.class); +// task to close +final StreamTask task02 = mock(StreamTask.class); +// task to update inputs +final StreamTask task03 = mock(StreamTask.class); +when(task00.id()).thenReturn(taskId00); +when(task01.id()).thenReturn(taskId01); +when(task02.id()).thenReturn(taskId02); +when(task03.id()).thenReturn(taskId03); +when(task00.inputPartitions()).thenReturn(taskId00Partitions); +when(task01.inputPartitions()).thenReturn(taskId01Partitions); +when(task02.inputPartitions()).thenReturn(taskId02Partitions); +when(task03.inputPartitions()).thenReturn(taskId03Partitions); +when(task00.isActive()).thenReturn(true); +when(task01.isActive()).thenReturn(false); +when(task02.isActive()).thenReturn(true); +when(task03.isActive()).thenReturn(true); +when(task00.state()).thenReturn(State.RESTORING); +when(task01.state()).thenReturn(State.RUNNING); +when(task02.state()).thenReturn(State.RESTORING); +when(task03.state()).thenReturn(State.RESTORING); +when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01, task02, task03)); + +expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer))) +.andStubReturn(task01Converted); +activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject()); +expectLastCall().times(2); +expect(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), eq(taskId00Partitions))) 
+.andStubReturn(task00Converted);
+expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
+consumer.resume(anyObject());
+expectLastCall().anyTimes();
+replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer);

Review Comment: Yes.. I tried to use Mockito for all and soon realized it would incur a much larger scope.
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater
guozhangwang commented on code in PR #12466: URL: https://github.com/apache/kafka/pull/12466#discussion_r946147457

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
@@ -316,20 +313,22 @@ public void handleAssignment(final Map> activeTasks, classifyTasksWithStateUpdater(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean); }
-tasks.addPendingActiveTasks(pendingTasksToCreate(activeTasksToCreate));
-tasks.addPendingStandbyTasks(pendingTasksToCreate(standbyTasksToCreate));
+tasks.purgePendingTasksToCreate();
+tasks.addPendingActiveTasksToCreate(pendingTasksToCreate(activeTasksToCreate));
+tasks.addPendingStandbyTasksToCreate(pendingTasksToCreate(standbyTasksToCreate));

Review Comment: ack.
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater
guozhangwang commented on code in PR #12466: URL: https://github.com/apache/kafka/pull/12466#discussion_r946143073

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
@@ -656,6 +657,71 @@ boolean tryToCompleteRestoration(final long now, final java.util.function.Consum return allRunning; }
+private void addTaskstoStateUpdater() {
+for (final Task task : tasks.drainPendingTaskToInit()) {
+task.initializeIfNeeded();
+stateUpdater.add(task);
+}
+}
+
+private void handleRemovedTasksFromStateUpdater() {
+final Map taskExceptions = new LinkedHashMap<>();
+final Set tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
+
+for (final Task task : stateUpdater.drainRemovedTasks()) {
+final TaskId taskId = task.id();
+Set inputPartitions;
+if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
+try {
+final Task newTask = task.isActive() ?
+convertActiveToStandby((StreamTask) task, inputPartitions) :
+convertStandbyToActive((StandbyTask) task, inputPartitions);
+newTask.initializeIfNeeded();
+stateUpdater.add(newTask);
+} catch (final RuntimeException e) {
+final String uncleanMessage = String.format("Failed to recycle task %s cleanly. " +
+"Attempting to close remaining tasks before re-throwing:", taskId);
+log.error(uncleanMessage, e);
+
+if (task.state() != State.CLOSED) {
+tasksToCloseDirty.add(task);
+}
+
+taskExceptions.putIfAbsent(taskId, e);
+}
+} else if (tasks.removePendingTaskToClose(task.id())) {
+try {
+task.closeClean();

Review Comment: You're right! Added and also updated unit tests.
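The close-clean-with-dirty-fallback pattern discussed in this review thread can be sketched in isolation. The `Task` interface below is a minimal hypothetical stand-in, not the real Kafka Streams `Task`.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseDemo {
    // Minimal stand-in for the Streams Task interface (hypothetical).
    interface Task {
        String id();
        void closeClean();  // may throw on failure
        void closeDirty();  // best-effort fallback, never throws
    }

    // Attempt a clean close of every task first; if one fails, fall back to a
    // dirty close for it and keep the first exception so it can be re-thrown
    // after all tasks have been handled.
    static RuntimeException closeAll(List<Task> tasks) {
        RuntimeException firstException = null;
        List<Task> toCloseDirty = new ArrayList<>();
        for (Task task : tasks) {
            try {
                task.closeClean();
            } catch (RuntimeException e) {
                toCloseDirty.add(task);
                if (firstException == null) {
                    firstException = e;
                }
            }
        }
        for (Task task : toCloseDirty) {
            task.closeDirty();
        }
        return firstException; // null when every task closed cleanly
    }

    public static void main(String[] args) {
        System.out.println(closeAll(new ArrayList<>())); // prints null
    }
}
```

Deferring the re-throw ensures one failing task does not leave the remaining tasks unclosed.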
[jira] [Updated] (KAFKA-12699) Streams no longer overrides the java default uncaught exception handler
[ https://issues.apache.org/jira/browse/KAFKA-12699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-12699:
Fix Version/s: 3.4.0 (was: 3.3.0)

> Streams no longer overrides the java default uncaught exception handler
>
> Key: KAFKA-12699
> URL: https://issues.apache.org/jira/browse/KAFKA-12699
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.8.0
> Reporter: Walker Carlson
> Assignee: Walker Carlson
> Priority: Minor
> Fix For: 3.4.0
>
> If a user used `Thread.setUncaughtExceptionHandler()` to set the handler for all threads in the runtime, Streams would override that with its own handler. However, since Streams does not use the `Thread` handler anymore, it will no longer do so. This can cause problems if the user does something like `System.exit(1)` in the handler. If using the old handler in Streams, it will still work as it used to.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12508: KAFKA-13888: Addition of Information in DescribeQuorumResponse
hachikuji commented on code in PR #12508: URL: https://github.com/apache/kafka/pull/12508#discussion_r946081969

## raft/src/main/java/org/apache/kafka/raft/LeaderState.java:

## @@ -323,19 +372,30 @@ private static class ReplicaState implements Comparable<ReplicaState> {

```java
        final int nodeId;
        Optional<LogOffsetMetadata> endOffset;
        OptionalLong lastFetchTimestamp;
        OptionalLong lastfetchLeaderLogEndOffset;
```

Review Comment: nit: should be `lastFetchLeaderLogEndOffset`

## core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:

## @@ -778,4 +779,52 @@ class KRaftClusterTest {

```scala
      cluster.close()
    }
  }

  def createAdminClient(cluster: KafkaClusterTestKit, useController: Boolean): Admin = {
    var props: Properties = null
    props = cluster.clientProperties()
    props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
    Admin.create(props)
  }

  @Test
  def testDescribeQuorumRequestToBrokers(): Unit = {
    val cluster = new KafkaClusterTestKit.Builder(
      new TestKitNodes.Builder().
        setNumBrokerNodes(4).
        setNumControllerNodes(3).build()).build()
    try {
      cluster.format
      cluster.startup
      for (i <- 0 to 3) {
        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
          "Broker Never started up")
      }
      val admin = createAdminClient(cluster, false)
      try {
        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
        val quorumInfo = quorumState.quorumInfo.get()

        assertEquals(0, quorumInfo.observers.size)
```

Review Comment: This probably fails now that brokers send their replicaId.

## core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:

## @@ -778,4 +779,52 @@ class KRaftClusterTest {

```scala
        assertEquals(0, quorumInfo.observers.size)
        assertEquals(3, quorumInfo.voters.size)
```

Review Comment: A more straightforward way to assert the voter/observer sets is in `DescribeQuorumRequestTest`:

```scala
val voterData = partitionData.currentVoters.asScala
assertEquals(cluster.controllerIds().asScala, voterData.map(_.replicaId).toSet);

val observerData = partitionData.observers.asScala
assertEquals(cluster.brokerIds().asScala, observerData.map(_.replicaId).toSet);
```

## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:

## @@ -1179,8 +1179,12 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest(

```java
            leaderState.localId(),
            leaderState.epoch(),
            leaderState.highWatermark().isPresent() ?
                leaderState.highWatermark().get().offset : -1,
-           convertToReplicaStates(leaderState.getVoterEndOffsets()),
-           convertToReplicaStates(leaderState.getObserverStates(currentTimeMs))
+           convertToReplicaStates(leaderState.getVoterEndOffsets(),
+               leaderState.getVoterLastFetchTimes(),
```

Review Comment: I think it would be a bit simpler to push creation of the `ReplicaState` object into `LeaderState`:

```java
class LeaderState {
    ...
    List<ReplicaState> voterStates(long currentTimeMs);
    List<ReplicaState> observerStates(long currentTimeMs);
}
```

Then we wouldn't need all the boilerplate methods to create all the different Map variations. Also, then the sanity checks in `convertToReplicaStates` become unnecessary.

## raft/src/main/java/org/apache/kafka/raft/LeaderState.java:

## @@ -217,19 +218,30 @@ public boolean updateLocalState(long fetchTimestamp, LogOffsetMetadata logOffset

```java
     * @param replicaId replica id
     * @param fetchTimestamp fetch timestamp
     * @param logOffsetMetadata new log offset and metadata
     * @param leaderLogEndOffset current log end offset of the leader
     * @return true if the high watermark is updated too
     */
    public boolean
```
[jira] [Assigned] (KAFKA-13940) DescribeQuorum returns INVALID_REQUEST if not handled by leader
[ https://issues.apache.org/jira/browse/KAFKA-13940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-13940: --- Assignee: Jason Gustafson > DescribeQuorum returns INVALID_REQUEST if not handled by leader > --- > > Key: KAFKA-13940 > URL: https://issues.apache.org/jira/browse/KAFKA-13940 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > In `KafkaRaftClient.handleDescribeQuorum`, we currently return > INVALID_REQUEST if the node is not the current raft leader. This is > surprising and doesn't work with our general approach for retrying forwarded > APIs. In `BrokerToControllerChannelManager`, we only retry after > `NOT_CONTROLLER` errors. It would be more consistent with the other Raft APIs > if we returned NOT_LEADER_OR_FOLLOWER, but that also means we need additional > logic in `BrokerToControllerChannelManager` to handle that error and retry > correctly.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12469: KAFKA-13914: Add command line tool kafka-metadata-quorum.sh
hachikuji commented on code in PR #12469: URL: https://github.com/apache/kafka/pull/12469#discussion_r946063842

## core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala:

## @@ -0,0 +1,186 @@

```scala
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.admin

import kafka.tools.TerseFailure
import kafka.utils.Exit
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.{fileType, storeTrue}
import net.sourceforge.argparse4j.inf.Subparsers
import org.apache.kafka.clients._
import org.apache.kafka.clients.admin.{Admin, QuorumInfo}
import org.apache.kafka.common.utils.Utils

import java.io.File
import java.util.Properties
import scala.jdk.CollectionConverters._

/**
 * A tool for describing quorum status
 */
object MetadataQuorumCommand {

  def main(args: Array[String]): Unit = {
    val res = mainNoExit(args)
    Exit.exit(res)
  }

  def mainNoExit(args: Array[String]): Int = {
    val parser = ArgumentParsers
      .newArgumentParser("kafka-metadata-quorum")
      .defaultHelp(true)
      .description("This tool describes kraft metadata quorum status.")
    parser
      .addArgument("--bootstrap-server")
      .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.")
      .required(true)

    parser
      .addArgument("--command-config")
      .`type`(fileType())
      .help("Property file containing configs to be passed to Admin Client.")
    val subparsers = parser.addSubparsers().dest("command")
    addDescribeParser(subparsers)

    var admin: Admin = null
    try {
      val namespace = parser.parseArgsOrFail(args)
      val command = namespace.getString("command")

      val commandConfig = namespace.get[File]("command_config")
      val props = if (commandConfig != null) {
        if (!commandConfig.exists()) {
          throw new TerseFailure(s"Properties file ${commandConfig.getPath} does not exists!")
        }
        Utils.loadProps(commandConfig.getPath)
      } else {
        new Properties()
      }
      props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server"))
      admin = Admin.create(props)

      if (command == "describe") {
        if (namespace.getBoolean("status") && namespace.getBoolean("replication")) {
          throw new TerseFailure(s"Only one of --status or --replication should be specified with describe sub-command")
        } else if (namespace.getBoolean("replication")) {
          handleDescribeReplication(admin)
        } else if (namespace.getBoolean("status")) {
          handleDescribeStatus(admin)
        } else {
          throw new TerseFailure(s"One of --status or --replication must be specified with describe sub-command")
        }
      } else {
        // currently we only support describe
      }
      0
    } catch {
      case e: TerseFailure =>
        Console.err.println(e.getMessage)
        1
    } finally {
      if (admin != null) {
        admin.close()
      }
    }
  }

  def addDescribeParser(subparsers: Subparsers): Unit = {
    val describeParser = subparsers
      .addParser("describe")
      .help("Describe the metadata quorum info")

    val statusArgs = describeParser.addArgumentGroup("Status")
    statusArgs
      .addArgument("--status")
      .help(
        "A short summary of the quorum status and the other provides detailed information about the status of replication.")
      .action(storeTrue())
    val replicationArgs = describeParser.addArgumentGroup("Replication")
    replicationArgs
      .addArgument("--replication")
      .help("Detailed information about the status of replication")
      .action(storeTrue())
  }

  private def handleDescribeReplication(admin: Admin): Unit = {
    val quorumInfo = admin.describeMetadataQuorum().quorumInfo().get()
    val leaderId = quorumInfo.leaderId()
    val leader = quorumInfo.voters().asScala.filter(_.replicaId() == leaderId).head
    // Find proper columns width
    var (maxReplicaIdLen, maxLogEndOffsetLen, maxLagLen, maxLastFetchTimeMsLen,
```
[GitHub] [kafka] C0urante commented on pull request #12410: MINOR: Remove unused ShutdownableThread class and ineffective ThreadedTest classes
C0urante commented on PR #12410: URL: https://github.com/apache/kafka/pull/12410#issuecomment-1215679995

The `ThreadedTest` class was intended to detect uncaught errors on threads other than the main thread used for the test. In many cases (including the `WorkerSinkTaskThreadedTest` suite), this was unnecessary since the tests that inherited from it were all single-threaded. The only exceptions are:
- With the `ExactlyOnceWorkerSourceTaskTest` and `WorkerSourceTaskTest` suites, we were spinning up at most one additional thread per test via `ExecutorService::submit`, and every time we did that, we were already making sure to invoke `get` on the resulting `Future` on the main testing thread, which caused any uncaught errors to fail the test
- With the `DistributedHerderTest` suite, we were also spinning up at most one additional thread per test via `ExecutorService::submit`, but not checking the resulting `Future`. I've added a check in this PR

I toyed with the idea of keeping the `ThreadedTest` class as a reusable utility, but I don't think it'd save us a lot of trouble without also potentially tying our hands and preventing us from doing things like ensuring that a certain thread or task dispatched on a separate thread has completed at a certain point in the test.

The `ShutdownableThread` class (and its test) is not used (effectively) anywhere in the code base and is completely safe to remove.
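The `Future`-checking pattern described in the comment above can be sketched with plain JDK classes (hypothetical names, not the actual Connect test code): `ExecutorService::submit` captures anything thrown on the worker thread, and `Future::get` rethrows it on the calling thread wrapped in an `ExecutionException`, which is what makes the test fail.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureErrorDemo {
    // Returns true iff a failure on the background thread surfaces on the calling thread.
    public static boolean failurePropagates() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            // The submitted task fails with an uncaught error on the worker thread.
            Future<?> future = exec.submit(() -> { throw new AssertionError("background failure"); });
            try {
                future.get();   // rethrows the failure here, wrapped in ExecutionException
                return false;   // unreachable for a failing task
            } catch (ExecutionException e) {
                return e.getCause() instanceof AssertionError;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(failurePropagates()); // true
    }
}
```

This is why checking the `Future` on the main testing thread was enough in the source-task suites, and why the missing check in `DistributedHerderTest` could swallow errors.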
[jira] [Updated] (KAFKA-14165) ConfigTransformer.DEFAULT_PATTERN fails on Windows platform
[ https://issues.apache.org/jira/browse/KAFKA-14165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Andras updated KAFKA-14165: - Description: When a configuration Map which has an entry with a key "someKey" and a value "${file:/F:/properties.txt:fileKey}" is passed to the ConfigTransformer.transform(Map) method, an exception is thrown: 2022-08-12 16:44:21,553 ERROR [org.apa.kaf.com.con.pro.FileConfigProvider] (main) Could not read properties from file /F: java.nio.file.NoSuchFileException: \F at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:85) at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108) at java.base/sun.nio.fs.WindowsFileSystemProvider.newByteChannel(WindowsFileSystemProvider.java:236) at java.base/java.nio.file.Files.newByteChannel(Files.java:380) at java.base/java.nio.file.Files.newByteChannel(Files.java:432) at java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:422) at java.base/java.nio.file.Files.newInputStream(Files.java:160) at java.base/java.nio.file.Files.newBufferedReader(Files.java:2922) at java.base/java.nio.file.Files.newBufferedReader(Files.java:2955) at org.apache.kafka.common.config.provider.FileConfigProvider.reader(FileConfigProvider.java:104) at org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:86) at org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103) This happens because of a colon ":" in the file name, which is wrongfully matched by the ConfigTransformer.DEFAULT_PATTERN in these three groups "file", "/F" and "/properties.txt:fileKey" instead of "file", "/F:/properties.txt" and "fileKey". The ConfigTransformer.DEFAULT_PATTERN should be changed to match the first and the last colon ":" in the expression. 
was: When a configuration Map which has an entry with a key "someKey" and a value "${file:/F:/properties.txt:fileKey}" is passed to the ConfigTransformer.transform(Map) method, an exception is thrown: 2022-08-12 16:44:21,553 ERROR [org.apa.kaf.com.con.pro.FileConfigProvider] (main) Could not read properties from file /F: java.nio.file.NoSuchFileException: \F at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:85) at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108) at java.base/sun.nio.fs.WindowsFileSystemProvider.newByteChannel(WindowsFileSystemProvider.java:236) at java.base/java.nio.file.Files.newByteChannel(Files.java:380) at java.base/java.nio.file.Files.newByteChannel(Files.java:432) at java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:422) at java.base/java.nio.file.Files.newInputStream(Files.java:160) at java.base/java.nio.file.Files.newBufferedReader(Files.java:2922) at java.base/java.nio.file.Files.newBufferedReader(Files.java:2955) at org.apache.kafka.common.config.provider.FileConfigProvider.reader(FileConfigProvider.java:104) at org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:86) at org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103) This happens because of an colon ":" in the file name, which is wrongfully matched by the ConfigTransformer.DEFAULT_PATTERN in these three groups "file", "/F" and "/properties.txt:fileKey" instead of "file", "/F:/properties.txt" and "fileKey". The ConfigTransformer.DEFAULT_PATTERN should be changed to match the first and the last colon ":" in the expression. 
> ConfigTransformer.DEFAULT_PATTERN fails on Windows platform > --- > > Key: KAFKA-14165 > URL: https://issues.apache.org/jira/browse/KAFKA-14165 > Project: Kafka > Issue Type: Bug > Components: config >Affects Versions: 3.2.1 > Environment: Windows 10 OS >Reporter: Gabor Andras >Assignee: Prem Kamal >Priority: Major > > When a configuration Map which has an entry with a key "someKey" and a value > "${file:/F:/properties.txt:fileKey}" is passed to the > ConfigTransformer.transform(Map) method, an exception is thrown: > 2022-08-12 16:44:21,553 ERROR [org.apa.kaf.com.con.pro.FileConfigProvider] > (main) Could not read properties from file /F: > java.nio.file.NoSuchFileException: \F > at > java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:85) > at >
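To illustrate the mis-match described in the ticket, here is a simplified stand-in (these patterns are assumptions for demonstration, not Kafka's exact `DEFAULT_PATTERN`): a reluctant (lazy) middle group splits the value at the first colon after the drive letter, while a pattern that binds the path between the first and last colon yields the intended groups.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PatternDemo {
    // Reluctant middle group -- roughly the shape of the problematic behavior (assumed, not Kafka's exact pattern)
    static final Pattern BUGGY = Pattern.compile("\\$\\{(.*?):(?:(.*?):)?([^}]*?)\\}");
    // Greedy middle group -- binds the path between the first and the last colon instead
    static final Pattern FIXED = Pattern.compile("\\$\\{([^:}]*):(?:([^}]*):)?([^:}]*)\\}");

    static String[] parse(Pattern p, String value) {
        Matcher m = p.matcher(value);
        if (!m.matches()) throw new IllegalArgumentException(value);
        return new String[] { m.group(1), m.group(2), m.group(3) };  // provider, path, key
    }

    public static void main(String[] args) {
        String v = "${file:/F:/properties.txt:fileKey}";
        // Buggy split: the path is cut off at the Windows drive-letter colon
        System.out.println(String.join(" | ", parse(BUGGY, v)));  // file | /F | /properties.txt:fileKey
        // Intended split
        System.out.println(String.join(" | ", parse(FIXED, v)));  // file | /F:/properties.txt | fileKey
    }
}
```

The demo reproduces exactly the three wrong groups from the report ("file", "/F", "/properties.txt:fileKey") and the three intended ones.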
[GitHub] [kafka] C0urante merged pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors
C0urante merged PR #12450: URL: https://github.com/apache/kafka/pull/12450
[GitHub] [kafka] vincent81jiang opened a new pull request, #12516: Validate offsets of input records before writing them file
vincent81jiang opened a new pull request, #12516: URL: https://github.com/apache/kafka/pull/12516 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-14154) Persistent URP after controller soft failure
[ https://issues.apache.org/jira/browse/KAFKA-14154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-14154. - Resolution: Fixed > Persistent URP after controller soft failure > > > Key: KAFKA-14154 > URL: https://issues.apache.org/jira/browse/KAFKA-14154 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 3.3.0 > > > We ran into a scenario where a partition leader was unable to expand the ISR > after a soft controller failover. Here is what happened: > Initial state: leader=1, isr=[1,2], leader epoch=10. Broker 1 is acting as > the current controller. > 1. Broker 1 loses its session in Zookeeper. > 2. Broker 2 becomes the new controller. > 3. During initialization, controller 2 removes 1 from the ISR. So state is > updated: leader=2, isr=[2], leader epoch=11. > 4. Broker 2 receives `LeaderAndIsr` from the new controller with leader > epoch=11. > 5. Broker 2 immediately tries to add replica 1 back to the ISR since it is > still fetching and is caught up. However, the > `BrokerToControllerChannelManager` is still pointed at controller 1, so that > is where the `AlterPartition` is sent. > 6. Controller 1 does not yet realize that it is not the controller, so it > processes the `AlterPartition` request. It sees the leader epoch of 11, which > is higher than what it has in its own context. Following changes to the > `AlterPartition` validation in > [https://github.com/apache/kafka/pull/12032/files,] the controller returns > FENCED_LEADER_EPOCH. > 7. After receiving the FENCED_LEADER_EPOCH from the old controller, the > leader is stuck because it assumes that the error implies that another > LeaderAndIsr request should be sent. > Prior to > [https://github.com/apache/kafka/pull/12032/files|https://github.com/apache/kafka/pull/12032/files,], > the way we handled this case was a little different. 
We only verified that > the leader epoch in the request was at least as large as the current epoch in > the controller context. Anything higher was accepted. The controller would > have attempted to write the updated state to Zookeeper. This update would > have failed because of the controller epoch check, however, we would have > returned NOT_CONTROLLER in this case, which is handled in > `AlterPartitionManager`. > It is tempting to revert the logic, but the risk is in the idempotency check: > [https://github.com/apache/kafka/pull/12032/files#diff-3e042c962e80577a4cc9bbcccf0950651c6b312097a86164af50003c00c50d37L2369.] > If the AlterPartition request happened to match the state inside the old > controller, the controller would consider the update successful and return no > error. But if its state was already stale at that point, then that might > cause the leader to incorrectly assume that the state had been updated. > One way to fix this problem without weakening the validation is to rely on > the controller epoch in `AlterPartitionManager`. When we discover a new > controller, we also discover its epoch, so we can pass that through. The > `LeaderAndIsr` request already includes the controller epoch of the > controller that sent it and we already propagate this through to > `AlterPartition.submit`. Hence all we need to do is verify that the epoch of > the current controller target is at least as large as the one discovered > through the `LeaderAndIsr`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
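The fix proposed at the end of the ticket can be reduced to a one-line comparison (sketched below with hypothetical names, not Kafka's actual classes): the sender refuses to talk to a controller whose epoch is lower than the controller epoch it learned from the `LeaderAndIsr` request, so a stale controller can never answer an `AlterPartition`.

```java
public class ControllerEpochGuard {
    // Hypothetical guard: only send AlterPartition to a target controller whose epoch is
    // at least the controller epoch carried by the LeaderAndIsr request.
    public static boolean safeToSend(int targetControllerEpoch, int epochFromLeaderAndIsr) {
        // A target with a smaller epoch is known to be stale: retry controller discovery instead.
        return targetControllerEpoch >= epochFromLeaderAndIsr;
    }

    public static void main(String[] args) {
        // In the incident above, the channel manager still pointed at controller 1 (epoch 10)
        // while the LeaderAndIsr came from controller 2 (epoch 11).
        System.out.println(safeToSend(10, 11)); // false: would have blocked the stale send
        System.out.println(safeToSend(11, 11)); // true
    }
}
```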
[GitHub] [kafka] clolov commented on pull request #12459: KAFKA-13036: Replace EasyMock and PowerMock with Mockito for RocksDBMetricsRecorderTest
clolov commented on PR #12459: URL: https://github.com/apache/kafka/pull/12459#issuecomment-1215593952 Heya @cadonna and @dplavcic, Divij is taking some time off, so I will aim to provide an update to the comments and the pull request in the next couple of days.
[GitHub] [kafka] hachikuji merged pull request #12514: KAFKA-14154; Kraft controller should return NOT_CONTROLLER if request epochs are ahead
hachikuji merged PR #12514: URL: https://github.com/apache/kafka/pull/12514
[GitHub] [kafka] clolov commented on pull request #12492: KAFKA-14133: Replace EasyMock with Mockito in streams tests
clolov commented on PR #12492: URL: https://github.com/apache/kafka/pull/12492#issuecomment-1215577425 Thank you @cadonna and @dplavcic for your comments. I will aim to address them as soon as possible.
[GitHub] [kafka] clolov commented on pull request #12505: KAFKA-14133: Replace EasyMock with Mockito in streams tests
clolov commented on PR #12505: URL: https://github.com/apache/kafka/pull/12505#issuecomment-1215575745 Hey @dplavcic, thank you for the comments. I will aim to provide an answer in the next couple of days!
[GitHub] [kafka] ableegoldman merged pull request #12510: MINOR: Add setting input partitions for task mocks
ableegoldman merged PR #12510: URL: https://github.com/apache/kafka/pull/12510
[GitHub] [kafka] chia7712 merged pull request #12503: MINOR: Appending value to LIST config should not generate empty string with …
chia7712 merged PR #12503: URL: https://github.com/apache/kafka/pull/12503
[GitHub] [kafka] ijuma opened a new pull request, #12515: MINOR: Improve readability of `tryProcessAlterPartition`
ijuma opened a new pull request, #12515: URL: https://github.com/apache/kafka/pull/12515 After 520f72995d, the subsequent checks ensure that the leader and partition epochs are not less than the current epochs. So, make that explicit. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-14166) Consistent toString implementations for byte arrays in generated messages
Jason Gustafson created KAFKA-14166: --- Summary: Consistent toString implementations for byte arrays in generated messages Key: KAFKA-14166 URL: https://issues.apache.org/jira/browse/KAFKA-14166 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson In the generated `toString()` implementations for message objects (such as protocol RPCs), we are a little inconsistent in how we display types with raw bytes. If the type is `Array[Byte]`, then we use Arrays.toString. If the type is `ByteBuffer` (i.e. when `zeroCopy` is set), then we use the corresponding `ByteBuffer.toString`, which is often not useful. We should try to be consistent. By default, it is probably not useful to print the full array contents, but we might print summary information (e.g. size, checksum).
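A quick JDK-only illustration of the inconsistency the ticket describes (the class name is hypothetical): `Arrays.toString` prints the contents, while `ByteBuffer.toString` only reports buffer bookkeeping.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class BytesToStringDemo {
    public static void main(String[] args) {
        byte[] raw = {1, 2, 3};
        // byte[] fields rendered with Arrays.toString: the contents appear in the log line
        System.out.println(Arrays.toString(raw));            // [1, 2, 3]
        // ByteBuffer's default toString: only position/limit/capacity, no contents
        System.out.println(ByteBuffer.wrap(raw).toString()); // e.g. java.nio.HeapByteBuffer[pos=0 lim=3 cap=3]
    }
}
```

Neither rendering scales to large payloads, which is why the ticket suggests summary information (size, checksum) as the consistent default.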
[GitHub] [kafka] hachikuji opened a new pull request, #12514: KAFKA-14154; Kraft controller should return NOT_CONTROLLER if request epochs are ahead
hachikuji opened a new pull request, #12514: URL: https://github.com/apache/kafka/pull/12514 Similar to https://github.com/apache/kafka/pull/12506. For the Kraft controller, we should return NOT_CONTROLLER if the leader/partition epoch in the request is ahead of the controller. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Assigned] (KAFKA-14165) ConfigTransformer.DEFAULT_PATTERN fails on Windows platform
[ https://issues.apache.org/jira/browse/KAFKA-14165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Prem Kamal reassigned KAFKA-14165: -- Assignee: Prem Kamal > ConfigTransformer.DEFAULT_PATTERN fails on Windows platform > --- > > Key: KAFKA-14165 > URL: https://issues.apache.org/jira/browse/KAFKA-14165 > Project: Kafka > Issue Type: Bug > Components: config >Affects Versions: 3.2.1 > Environment: Windows 10 OS >Reporter: Gabor Andras >Assignee: Prem Kamal >Priority: Major > > When a configuration Map which has an entry with a key "someKey" and a value > "${file:/F:/properties.txt:fileKey}" is passed to the > ConfigTransformer.transform(Map) method, an exception is thrown: > 2022-08-12 16:44:21,553 ERROR [org.apa.kaf.com.con.pro.FileConfigProvider] > (main) Could not read properties from file /F: > java.nio.file.NoSuchFileException: \F > at > java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:85) > at > java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) > at > java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108) > at > java.base/sun.nio.fs.WindowsFileSystemProvider.newByteChannel(WindowsFileSystemProvider.java:236) > at java.base/java.nio.file.Files.newByteChannel(Files.java:380) > at java.base/java.nio.file.Files.newByteChannel(Files.java:432) > at > java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:422) > at java.base/java.nio.file.Files.newInputStream(Files.java:160) > at java.base/java.nio.file.Files.newBufferedReader(Files.java:2922) > at java.base/java.nio.file.Files.newBufferedReader(Files.java:2955) > at > org.apache.kafka.common.config.provider.FileConfigProvider.reader(FileConfigProvider.java:104) > at > org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:86) > at > org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103) > This happens 
because of an colon ":" in the file name, which is wrongfully > matched by the ConfigTransformer.DEFAULT_PATTERN in these three groups > "file", "/F" and "/properties.txt:fileKey" instead of "file", > "/F:/properties.txt" and "fileKey". > The ConfigTransformer.DEFAULT_PATTERN should be changed to match the first > and the last colon ":" in the expression. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14051) KRaft remote controllers do not create metrics reporters
[ https://issues.apache.org/jira/browse/KAFKA-14051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino resolved KAFKA-14051. --- Resolution: Fixed > KRaft remote controllers do not create metrics reporters > > > Key: KAFKA-14051 > URL: https://issues.apache.org/jira/browse/KAFKA-14051 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.3 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Major > > KRaft remote controllers (KRaft nodes with the configuration value > process.roles=controller) do not create the configured metrics reporters > defined by the configuration key metric.reporters. The reason is that > KRaft remote controllers are not wired up for dynamic config changes, and the > creation of the configured metric reporters actually happens during the > wiring up of the broker for dynamic reconfiguration, in the invocation of > DynamicBrokerConfig.addReconfigurables(KafkaBroker).
[jira] [Resolved] (KAFKA-13559) The broker's ProduceResponse may be delayed for 300ms
[ https://issues.apache.org/jira/browse/KAFKA-13559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-13559. Fix Version/s: 3.4.0 Reviewer: Rajini Sivaram Resolution: Fixed
> The broker's ProduceResponse may be delayed for 300ms
> --
>
> Key: KAFKA-13559
> URL: https://issues.apache.org/jira/browse/KAFKA-13559
> Project: Kafka
> Issue Type: Task
> Components: core
> Affects Versions: 2.7.0
> Reporter: frankshi
> Assignee: Badai Aqrandista
> Priority: Major
> Fix For: 3.4.0
>
> Attachments: image-1.png, image-2.png, image-2021-12-21-14-44-56-689.png, image-2021-12-21-14-45-22-716.png, image-3.png, image-5.png, image-6.png, image-7.png, image.png
>
> Hi team:
> We have found that the value in the source code [here|https://github.com/apache/kafka/blob/2.7/core/src/main/scala/kafka/network/SocketServer.scala#L922] may cause the broker's ProduceResponse to be delayed for 300ms.
> * Server version: 2.13-2.7.0.
> * Client version: confluent-kafka-python-1.5.0.
> We have set the client's configuration as follows:
> {code:java}
> linger.ms = 0
> acks = 1
> delivery.timeout.ms = 100
> request.timeout.ms = 80
> sasl.mechanism = "PLAIN"
> security.protocol = "SASL_SSL"
> ..
> {code}
> Because we set acks = 1, the client sends ProduceRequests and receives ProduceResponses from brokers. The leader broker doesn't need to wait for the ISR replicas to write data to disk successfully; it can reply to the client with ProduceResponses directly. In our situation, the ping time between the client and the Kafka brokers is about ~10ms, and most of the time the responses are received about 10ms after the ProduceRequests are sent. But sometimes the responses are received ~300ms later.
> The following shows the log from the client.
> {code:java}
> 2021-11-26 02:31:30,567 Sent partial ProduceRequest (v7, 0+16527/37366 bytes, CorrId 2753)
> 2021-11-26 02:31:30,568 Sent partial ProduceRequest (v7, 16527+16384/37366 bytes, CorrId 2753)
> 2021-11-26 02:31:30,568 Sent ProduceRequest (v7, 37366 bytes @ 32911, CorrId 2753)
> 2021-11-26 02:31:30,570 Sent ProduceRequest (v7, 4714 bytes @ 0, CorrId 2754)
> 2021-11-26 02:31:30,571 Sent ProduceRequest (v7, 1161 bytes @ 0, CorrId 2755)
> 2021-11-26 02:31:30,572 Sent ProduceRequest (v7, 1240 bytes @ 0, CorrId 2756)
> 2021-11-26 02:31:30,572 Received ProduceResponse (v7, 69 bytes, CorrId 2751, rtt 9.79ms)
> 2021-11-26 02:31:30,572 Received ProduceResponse (v7, 69 bytes, CorrId 2752, rtt 10.34ms)
> 2021-11-26 02:31:30,573 Received ProduceResponse (v7, 69 bytes, CorrId 2753, rtt 10.11ms)
> 2021-11-26 02:31:30,872 Received ProduceResponse (v7, 69 bytes, CorrId 2754, rtt 309.69ms)
> 2021-11-26 02:31:30,883 Sent ProduceRequest (v7, 1818 bytes @ 0, CorrId 2757)
> 2021-11-26 02:31:30,887 Sent ProduceRequest (v7, 1655 bytes @ 0, CorrId 2758)
> 2021-11-26 02:31:30,888 Received ProduceResponse (v7, 69 bytes, CorrId 2755, rtt 318.85ms)
> 2021-11-26 02:31:30,893 Sent partial ProduceRequest (v7, 0+16527/37562 bytes, CorrId 2759)
> 2021-11-26 02:31:30,894 Sent partial ProduceRequest (v7, 16527+16384/37562 bytes, CorrId 2759)
> 2021-11-26 02:31:30,895 Sent ProduceRequest (v7, 37562 bytes @ 32911, CorrId 2759)
> 2021-11-26 02:31:30,896 Sent ProduceRequest (v7, 4700 bytes @ 0, CorrId 2760)
> 2021-11-26 02:31:30,897 Received ProduceResponse (v7, 69 bytes, CorrId 2756, rtt 317.74ms)
> 2021-11-26 02:31:30,897 Received ProduceResponse (v7, 69 bytes, CorrId 2757, rtt 4.22ms)
> 2021-11-26 02:31:30,899 Received ProduceResponse (v7, 69 bytes, CorrId 2758, rtt 2.61ms){code}
>
> The requests with CorrId 2753 and 2754 are sent at almost the same time, but
> the response for 2754 is delayed for ~300ms.
> We checked the logs on the broker.
>
> {code:java}
> [2021-11-26 02:31:30,873] DEBUG Completed request:RequestHeader(apiKey=PRODUCE, apiVersion=7, clientId=rdkafka, correlationId=2754)
> -- {acks=1,timeout=80,numPartitions=1},
> response: {responses=[\{topic=***,partition_responses=[{partition=32,error_code=0,base_offset=58625,log_append_time=-1,log_start_offset=49773}]}],throttle_time_ms=0}
> from connection 10.10.44.59:9093-10.10.0.68:31183-66;
> totalTime:0.852,requestQueueTime:0.128,localTime:0.427,remoteTime:0.09,throttleTime:0,responseQueueTime:0.073,sendTime:0.131,
> securityProtocol:SASL_SSL,principal:User:***,listener:SASL_SSL,
> clientInformation:ClientInformation(softwareName=confluent-kafka-python, softwareVersion=1.5.0-rdkafka-1.5.2) (kafka.request.logger)
> {code}
>
> It seems that the time cost on the server side is very small. What's the
> reason for the latency spikes?
> We also did tcpdump at the server side and
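The broker-side log above shows sub-millisecond processing, so the ~300ms must be spent waiting, not working. A toy model (not Kafka's actual SocketServer code) of the failure shape the ticket points at: an I/O loop that blocks in a fixed 300ms "poll" and is not woken when a response is queued mid-poll, so a response that becomes ready just after the poll starts waits out the full timeout.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Toy model of a selector loop with a fixed poll timeout and no wakeup.
// A response enqueued right after the loop enters its 300 ms poll is not
// delivered until the timeout expires, matching the ~300 ms latency spikes
// in the client log above (normal responses took ~10 ms).
public class PollTimeoutDemo {
    static final ConcurrentLinkedQueue<Long> responses = new ConcurrentLinkedQueue<>();

    public static void main(String[] args) throws InterruptedException {
        responses.add(System.nanoTime()); // response becomes ready immediately...

        Thread.sleep(300);                // ...but the loop sits in poll(300)
                                          // with nothing to wake it up

        long readyAt = responses.poll();
        long latencyMs = (System.nanoTime() - readyAt) / 1_000_000;
        System.out.println("latency ~" + latencyMs + " ms"); // ~300 ms
    }
}
```

The fix direction implied by the merged PR is to ensure the poll is woken (or the timeout shortened) when a response is queued while the selector is blocked, so latency tracks the network RTT rather than the poll timeout.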
[GitHub] [kafka] rajinisivaram merged pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.
rajinisivaram merged PR #12416: URL: https://github.com/apache/kafka/pull/12416 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.
rajinisivaram commented on PR #12416: URL: https://github.com/apache/kafka/pull/12416#issuecomment-1214907290 Test failures not related.
[jira] [Commented] (KAFKA-13887) Running multiple instance of same stateful KafkaStreams application on single host raise Exception
[ https://issues.apache.org/jira/browse/KAFKA-13887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17579602#comment-17579602 ]

Bruno Cadonna commented on KAFKA-13887:
---------------------------------------

bq. Meaning: no local store, no stateful processing that is backed on disk. Is this true?

No, you can have an attached storage volume. In k8s you need to use stateful sets to assign the same storage volume to the same pod across restarts.

bq. The problem is state.dir gets associated more or less randomly to the application instance. But what has to be ensured is the partition assignment for the given application instance fits to the state. This mean we cannot randomly link an application instance with its current partition assignment to a random state store...

The state store directory needs to be there before the assignment. Streams is not designed to allow changing the state store directory on the fly.

bq. if I got it correct, the state gets transferred from instance to instance in case of partition reassignment. But in this setup I can't imagine this can work. Any input on that?

The state is transferred through the changelog topics on the Kafka brokers. The setup of the nodes does not matter, except that there needs to be some sort of persistent volume to write to per Streams instance if you use a persistent state store. If you use in-memory state stores you do not need a persistent volume.

> Running multiple instance of same stateful KafkaStreams application on single
> host raise Exception
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-13887
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13887
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Sina Askarnejad
>            Priority: Minor
>
> KAFKA-10716 locks the state store directory on the running host, as it stores
> the processId in a *kafka-streams-process-metadata* file in this path.
> As a result, to run multiple instances of the same application on a single host,
> each instance must run with a different *state.dir* config; otherwise the
> following exception will be raised for the second instance:
>
> Exception in thread "main" org.apache.kafka.streams.errors.StreamsException:
> Unable to initialize state, this can happen if multiple instances of Kafka
> Streams are running in the same state directory
> at org.apache.kafka.streams.processor.internals.StateDirectory.initializeProcessId(StateDirectory.java:191)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:868)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:851)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:821)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:733)
>
> The easiest solution is multi-threading: running a single instance with
> multiple threads. But multi-threaded programming is not suitable for all
> scenarios, e.g., when the tasks are CPU intensive, in large-scale scenarios,
> or for fully utilizing multi-core CPUs.
>
> The second solution is multi-processing. On a single host this solution needs
> extra work and supervision, as each instance needs to be run with a different
> *state.dir*. It would be a good enhancement if KafkaStreams could handle this
> config for multiple instances.
>
> The proposed solution is that KafkaStreams uses the
> */\{state.dir}/\{application.id}/\{ordinal.number}* path instead of
> */\{state.dir}/\{application.id}* to store the meta file and states. The
> *ordinal.number* starts at 0 and is incremented.
> When an instance starts, it checks the ordinal.number directories starting
> at 0 and finds the first subdirectory that is not locked, and uses that as its
> state directory. This way all the tasks are assigned correctly on rebalance
> and multiple instances can be run on a single host.
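The probing scheme proposed in the ticket can be sketched with standard file locks. This is an illustration only, not the actual Streams implementation: walk /{state.dir}/{application.id}/0, 1, 2, ... and claim the first ordinal directory whose lock file can be acquired (class and method names here are hypothetical).

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class OrdinalStateDir {

    // Probe ordinal subdirectories 0, 1, 2, ... under the application's state
    // dir and return the first one whose lock file can be claimed.
    public static Path acquireFirstUnlocked(Path appDir) throws IOException {
        for (int ordinal = 0; ; ordinal++) {
            Path dir = appDir.resolve(Integer.toString(ordinal));
            Files.createDirectories(dir);
            FileChannel channel = FileChannel.open(dir.resolve(".lock"),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            FileLock lock = null;
            try {
                // null (other process) or exception (same JVM) => already claimed
                lock = channel.tryLock();
            } catch (OverlappingFileLockException heldByThisJvm) {
                // fall through and try the next ordinal
            }
            if (lock != null) {
                return dir; // keep the lock for the lifetime of the instance
            }
            channel.close();
        }
    }

    public static void main(String[] args) throws IOException {
        Path appDir = Files.createTempDirectory("app-id");
        System.out.println(acquireFirstUnlocked(appDir).getFileName()); // prints 0
        System.out.println(acquireFirstUnlocked(appDir).getFileName()); // prints 1
    }
}
```

Because each instance holds its lock until shutdown, a restarted instance naturally reclaims the lowest free ordinal, which is what lets task assignment line up with on-disk state across rebalances in the proposal.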
[GitHub] [kafka] cadonna commented on a diff in pull request #12459: KAFKA-13036: Replace EasyMock and PowerMock with Mockito for RocksDBMetricsRecorderTest
cadonna commented on code in PR #12459:
URL: https://github.com/apache/kafka/pull/12459#discussion_r945525699

## streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java:

@@ -279,449 +310,335 @@ public void shouldThrowIfDbToAddWasAlreadyAddedForOtherSegment() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
         final Throwable exception = assertThrows(
-            IllegalStateException.class,
-            () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd1, cacheToAdd2, statisticsToAdd2)
+            IllegalStateException.class,
+            () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd1, cacheToAdd2, statisticsToAdd2)
         );
         assertThat(
-            exception.getMessage(),
-            is("DB instance for store " + SEGMENT_STORE_NAME_2 + " of task " + TASK_ID1 +
-                " was already added for another segment as a value provider. This is a bug in Kafka Streams. " +
-                "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+            exception.getMessage(),
+            is("DB instance for store " + SEGMENT_STORE_NAME_2 + " of task " + TASK_ID1 +
+                " was already added for another segment as a value provider. This is a bug in Kafka Streams. " +
+                "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
         );
     }

     @Test
     public void shouldAddItselfToRecordingTriggerWhenFirstValueProvidersAreAddedToNewlyCreatedRecorder() {
-        recordingTrigger.addMetricsRecorder(recorder);
-        replay(recordingTrigger);
-
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);

-        verify(recordingTrigger);
+        verify(recordingTrigger).addMetricsRecorder(recorder);
     }

     @Test
     public void shouldAddItselfToRecordingTriggerWhenFirstValueProvidersAreAddedAfterLastValueProvidersWereRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-        reset(recordingTrigger);
-        recordingTrigger.addMetricsRecorder(recorder);
-        replay(recordingTrigger);
+
+        Mockito.reset(recordingTrigger);

         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);

-        verify(recordingTrigger);
+        verify(recordingTrigger).addMetricsRecorder(recorder);
     }

     @Test
     public void shouldNotAddItselfToRecordingTriggerWhenNotEmpty2() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
-        reset(recordingTrigger);
-        replay(recordingTrigger);
+
+        verify(recordingTrigger).addMetricsRecorder(recorder);

         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);

-        verify(recordingTrigger);
+        verifyNoMoreInteractions(recordingTrigger);
     }

     @Test
     public void shouldCloseStatisticsWhenValueProvidersAreRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
-        reset(statisticsToAdd1);
-        statisticsToAdd1.close();
-        replay(statisticsToAdd1);
-
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-
-        verify(statisticsToAdd1);
+        verify(statisticsToAdd1).close();
     }

     @Test
     public void shouldNotCloseStatisticsWhenValueProvidersWithoutStatisticsAreRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null);
-        reset(statisticsToAdd1);
-        replay(statisticsToAdd1);
-
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-
-        verify(statisticsToAdd1);
+        verify(statisticsToAdd1, never()).close();
     }

     @Test
     public void shouldRemoveItselfFromRecordingTriggerWhenLastValueProvidersAreRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
-        reset(recordingTrigger);
-        replay(recordingTrigger);
+
+        Mockito.reset(recordingTrigger);

         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);

-        verify(recordingTrigger);
+        verify(recordingTrigger, never()).removeMetricsRecorder(recorder);

-        reset(recordingTrigger);
-        recordingTrigger.removeMetricsRecorder(recorder);
-        replay(recordingTrigger);
+        Mockito.reset(recordingTrigger);

         recorder.removeValueProviders(SEGMENT_STORE_NAME_2);

-        verify(recordingTrigger);
+        verify(recordingTrigger).removeMetricsRecorder(recorder);
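The diff above repeatedly replaces EasyMock's expect/replay/verify choreography with Mockito's exercise-then-verify style. Since neither library is on the classpath here, a self-contained illustration using a java.lang.reflect.Proxy that records invocations can show the shape of that style: run the code under test first, then assert on the interactions that were (or were not) recorded. All names in this sketch are hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hand-rolled stand-in for the "verify after exercising" mocking style:
// a dynamic proxy records every call, and the test asserts on the recorded
// interactions afterwards, instead of declaring expectations up front and
// calling replay() as EasyMock requires.
public class VerifyAfterDemo {

    interface RecordingTrigger {
        void addMetricsRecorder(Object recorder);
        void removeMetricsRecorder(Object recorder);
    }

    static final List<String> calls = new ArrayList<>();

    static RecordingTrigger mockTrigger() {
        InvocationHandler handler = (proxy, method, args) -> {
            calls.add(method.getName()); // record the interaction
            return null;
        };
        return (RecordingTrigger) Proxy.newProxyInstance(
                RecordingTrigger.class.getClassLoader(),
                new Class<?>[]{RecordingTrigger.class}, handler);
    }

    public static void main(String[] args) {
        RecordingTrigger trigger = mockTrigger();

        trigger.addMetricsRecorder(new Object()); // exercise first...

        // ...then verify: addMetricsRecorder happened exactly once and
        // removeMetricsRecorder was never invoked, analogous to
        // verify(mock).addMetricsRecorder(...) / verify(mock, never())...
        System.out.println(calls.equals(List.of("addMetricsRecorder"))); // prints true
    }
}
```

The practical benefit visible in the diff is that tests no longer need reset/replay boilerplate before the action under test; interactions are checked only where the test actually cares about them.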