[GitHub] [kafka] ashmeet13 commented on pull request #12414: KAFKA-14073 Logging the reason for Snapshot

2022-08-15 Thread GitBox


ashmeet13 commented on PR #12414:
URL: https://github.com/apache/kafka/pull/12414#issuecomment-1216159486

   Hi @dengziming @jsancio, could you help with a review of this PR? If it looks 
okay to merge, I can clean it up and remove the temporary comments I added.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #12519: [WIP] KAFKA-10199: Handle exceptions from state updater

2022-08-15 Thread GitBox


guozhangwang commented on PR #12519:
URL: https://github.com/apache/kafka/pull/12519#issuecomment-1216080732

   cc @cadonna, I would like to hear your comments before I continue adding unit 
tests.





[GitHub] [kafka] guozhangwang commented on pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater

2022-08-15 Thread GitBox


guozhangwang commented on PR #12466:
URL: https://github.com/apache/kafka/pull/12466#issuecomment-1216080804

   Merged to trunk. @cadonna, please feel free to leave more comments, and I 
will address them in https://github.com/apache/kafka/pull/12519





[GitHub] [kafka] guozhangwang opened a new pull request, #12519: [WIP] KAFKA-10199: Handle exceptions from state updater

2022-08-15 Thread GitBox


guozhangwang opened a new pull request, #12519:
URL: https://github.com/apache/kafka/pull/12519

   1. In the state updater, when handling a task-corrupted exception caused by an 
invalid restoring offset, first delete the affected partitions from the checkpoint 
before reporting it back to the stream thread. This mimics the behavior of the 
stream thread's StateManager#handleCorruption#closeDirtyAndRevive. It is cleaner 
to do this inside the restore thread, and it lets us optimize by deleting only 
the corrupted partitions rather than all of them.
   2. In the state manager, handle the drained exceptions as follows (the same as 
handling all exceptions from `handleAssignment`): 1) task-migrated: throw all the 
way to the stream thread as `handleTaskMigrated`; 2) any fatal Streams exception: 
throw all the way to the stream thread to trigger the exception handler; 
3) task-corrupted: throw to the stream thread as `handleCorruption`. Note that 
for 3), we specially distinguish whether the corrupted tasks are already closed 
(when thrown from `handleAssignment`) or not (when thrown from the state updater).
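A minimal sketch of the classification described in point 2, using hypothetical stand-in exception classes and an enum for the three handler paths (the real `TaskMigratedException`/`TaskCorruptedException` live in `org.apache.kafka.streams.errors`; this is not the actual implementation):

```java
public class DrainedExceptionClassifier {
    // Hypothetical stand-ins for the Streams exception types named above.
    static class TaskMigratedException extends RuntimeException { }
    static class TaskCorruptedException extends RuntimeException { }

    enum Action { HANDLE_TASK_MIGRATED, HANDLE_CORRUPTION, TRIGGER_EXCEPTION_HANDLER }

    // Mirror the three cases in the description: task-migrated and task-corrupted
    // get dedicated handlers; anything else is treated as a fatal Streams exception.
    static Action classify(final RuntimeException exception) {
        if (exception instanceof TaskMigratedException) {
            return Action.HANDLE_TASK_MIGRATED;
        } else if (exception instanceof TaskCorruptedException) {
            return Action.HANDLE_CORRUPTION;
        } else {
            return Action.TRIGGER_EXCEPTION_HANDLER;
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(new TaskCorruptedException())); // HANDLE_CORRUPTION
    }
}
```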
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] guozhangwang merged pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater

2022-08-15 Thread GitBox


guozhangwang merged PR #12466:
URL: https://github.com/apache/kafka/pull/12466





[GitHub] [kafka] hachikuji opened a new pull request, #12518: KAFKA-14167; Completion exceptions should not be translated directly to error codes

2022-08-15 Thread GitBox


hachikuji opened a new pull request, #12518:
URL: https://github.com/apache/kafka/pull/12518

   There are a few cases in `ControllerApis` where we may see an `ApiException` 
wrapped as a `CompletionException`. This can happen in 
`QuorumController.allocateProducerIds` where the returned future is the result 
of calling `thenApply` on the future passed to the controller. The danger when 
this happens is that the `CompletionException` gets passed to 
`Errors.forException`, which translates it to an `UNKNOWN_SERVER_ERROR`. At a 
minimum, I found that the `AllocateProducerIds` and `UpdateFeatures` APIs were 
affected by this bug, but it is difficult to root out all cases. 
   
   Interestingly, `DeleteTopics` was not affected as I originally suspected. 
This is because we have logic in `ApiError.fromThrowable` to check for both 
`CompletionException` and `ExecutionException` and to pull out the underlying 
cause. This patch moves this logic out of `ApiError.fromThrowable` and into 
`Errors.forException` to be sure that we handle all cases where exceptions are 
converted to error codes.
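The unwrapping the patch centralizes can be sketched as follows. This is a simplified stand-in for what `ApiError.fromThrowable` did and what `Errors.forException` gains, not the actual Kafka code; recursing through nested wrappers is my own assumption:

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class ExceptionUnwrapSketch {
    // Strip CompletionException/ExecutionException wrappers so the error-code
    // lookup sees the underlying cause rather than the wrapper (which would
    // otherwise map to UNKNOWN_SERVER_ERROR).
    static Throwable maybeUnwrap(final Throwable t) {
        if ((t instanceof CompletionException || t instanceof ExecutionException)
                && t.getCause() != null) {
            return maybeUnwrap(t.getCause()); // assumption: unwrap nested wrappers too
        }
        return t;
    }

    public static void main(String[] args) {
        Throwable wrapped = new CompletionException(new IllegalStateException("not controller"));
        System.out.println(maybeUnwrap(wrapped).getMessage()); // not controller
    }
}
```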
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-14167) Unexpected UNKNOWN_SERVER_ERROR raised from kraft controller

2022-08-15 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-14167:

Priority: Blocker  (was: Major)

> Unexpected UNKNOWN_SERVER_ERROR raised from kraft controller
> 
>
> Key: KAFKA-14167
> URL: https://issues.apache.org/jira/browse/KAFKA-14167
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 3.3.0
>
>
> In `ControllerApis`, we have callbacks such as the following after completion:
> {code:java}
>     controller.allocateProducerIds(context, allocatedProducerIdsRequest.data)
>       .handle[Unit] { (results, exception) =>
>         if (exception != null) {
>           requestHelper.handleError(request, exception)
>         } else {
>           requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs 
> => {
>             results.setThrottleTimeMs(requestThrottleMs)
>             new AllocateProducerIdsResponse(results)
>           })
>         }
>       } {code}
> What I see locally is that the underlying exception that gets passed to 
> `handle` always gets wrapped in a `CompletionException`. When passed to 
> `getErrorResponse`, this error will get converted to `UNKNOWN_SERVER_ERROR`. 
> For example, in this case, a `NOT_CONTROLLER` error returned from the 
> controller would be returned as `UNKNOWN_SERVER_ERROR`. It looks like there 
> are a few APIs that are potentially affected by this bug, such as 
> `DeleteTopics` and `UpdateFeatures`.
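The wrapping behavior described here is reproducible with the JDK alone: `handle` on a future completed via `completeExceptionally` sees the raw exception, while `handle` on a dependent stage (e.g. one created by `thenApply`, as in `allocateProducerIds`) sees a `CompletionException` wrapper. A minimal demonstration (editor's illustration, not Kafka code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class HandleWrappingDemo {
    // Returns the Throwable that handle() observes, either on the source future
    // itself or on a thenApply-derived stage of it.
    static Throwable seenByHandle(final boolean derivedStage) {
        final CompletableFuture<Integer> source = new CompletableFuture<>();
        final CompletableFuture<Integer> stage =
            derivedStage ? source.thenApply(x -> x + 1) : source;
        source.completeExceptionally(new IllegalStateException("NOT_CONTROLLER"));
        return stage.handle((result, exception) -> exception).join();
    }

    public static void main(String[] args) {
        System.out.println(seenByHandle(false).getClass().getSimpleName()); // IllegalStateException
        System.out.println(seenByHandle(true).getClass().getSimpleName());  // CompletionException
    }
}
```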



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14167) Unexpected UNKNOWN_SERVER_ERROR raised from kraft controller

2022-08-15 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-14167:
---

 Summary: Unexpected UNKNOWN_SERVER_ERROR raised from kraft 
controller
 Key: KAFKA-14167
 URL: https://issues.apache.org/jira/browse/KAFKA-14167
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


In `ControllerApis`, we have callbacks such as the following after completion:
{code:java}
    controller.allocateProducerIds(context, allocatedProducerIdsRequest.data)
      .handle[Unit] { (results, exception) =>
        if (exception != null) {
          requestHelper.handleError(request, exception)
        } else {
          requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => 
{
            results.setThrottleTimeMs(requestThrottleMs)
            new AllocateProducerIdsResponse(results)
          })
        }
      } {code}
What I see locally is that the underlying exception that gets passed to 
`handle` always gets wrapped in a `CompletionException`. When passed to 
`getErrorResponse`, this error will get converted to `UNKNOWN_SERVER_ERROR`. 
For example, in this case, a `NOT_CONTROLLER` error returned from the 
controller would be returned as `UNKNOWN_SERVER_ERROR`. It looks like there are 
a few APIs that are potentially affected by this bug, such as `DeleteTopics` 
and `UpdateFeatures`.





[jira] [Updated] (KAFKA-14167) Unexpected UNKNOWN_SERVER_ERROR raised from kraft controller

2022-08-15 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-14167:

Fix Version/s: 3.3.0

> Unexpected UNKNOWN_SERVER_ERROR raised from kraft controller
> 
>
> Key: KAFKA-14167
> URL: https://issues.apache.org/jira/browse/KAFKA-14167
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Major
> Fix For: 3.3.0
>
>
> In `ControllerApis`, we have callbacks such as the following after completion:
> {code:java}
>     controller.allocateProducerIds(context, allocatedProducerIdsRequest.data)
>       .handle[Unit] { (results, exception) =>
>         if (exception != null) {
>           requestHelper.handleError(request, exception)
>         } else {
>           requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs 
> => {
>             results.setThrottleTimeMs(requestThrottleMs)
>             new AllocateProducerIdsResponse(results)
>           })
>         }
>       } {code}
> What I see locally is that the underlying exception that gets passed to 
> `handle` always gets wrapped in a `CompletionException`. When passed to 
> `getErrorResponse`, this error will get converted to `UNKNOWN_SERVER_ERROR`. 
> For example, in this case, a `NOT_CONTROLLER` error returned from the 
> controller would be returned as `UNKNOWN_SERVER_ERROR`. It looks like there 
> are a few APIs that are potentially affected by this bug, such as 
> `DeleteTopics` and `UpdateFeatures`.





[jira] [Resolved] (KAFKA-14039) Fix KRaft AlterConfigPolicy usage

2022-08-15 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-14039.
--
Resolution: Fixed

> Fix KRaft AlterConfigPolicy usage
> -
>
> Key: KAFKA-14039
> URL: https://issues.apache.org/jira/browse/KAFKA-14039
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.3.0, 3.4.0
>
>
> In ConfigurationControlManager, we are currently passing all the 
> configuration values known to the controller down into the AlterConfigPolicy. 
> This does not match the behavior in ZK mode where we only pass configs which 
> were included in the alter configs request.
> This can lead to unexpected behavior in custom AlterConfigPolicy 
> implementations.
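A sketch of the intended filtering (hypothetical method and parameter names; the real change lives in `ConfigurationControlManager`): only the keys named in the alter request should reach the policy, matching ZK mode.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class PolicyInputSketch {
    // Restrict the configs handed to AlterConfigPolicy to those named in the
    // request, rather than every config the controller knows about.
    static Map<String, String> configsForPolicy(final Map<String, String> allKnownConfigs,
                                                final Set<String> requestedKeys) {
        final Map<String, String> forPolicy = new HashMap<>();
        for (final String key : requestedKeys) {
            forPolicy.put(key, allKnownConfigs.get(key));
        }
        return forPolicy;
    }

    public static void main(String[] args) {
        final Map<String, String> known = new HashMap<>();
        known.put("retention.ms", "86400000");
        known.put("cleanup.policy", "compact");
        // Only the requested key reaches the policy.
        System.out.println(configsForPolicy(known, Set.of("retention.ms"))); // {retention.ms=86400000}
    }
}
```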





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater

2022-08-15 Thread GitBox


guozhangwang commented on code in PR #12466:
URL: https://github.com/apache/kafka/pull/12466#discussion_r946203610


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -296,6 +296,75 @@ public void shouldAddTasksToStateUpdater() {
 Mockito.verify(stateUpdater).add(task01);
 }
 
+@Test
+public void shouldHandleRemovedTasksFromStateUpdater() {

Review Comment:
   When adding tests I realized we do not need to handle exceptions by closing 
tasks dirty and then re-throwing, but can simply re-throw directly and delegate 
to the StreamThread to handle the thrown exception. Will update accordingly.






[GitHub] [kafka] ijuma merged pull request #12515: MINOR: Improve readability of `tryProcessAlterPartition`

2022-08-15 Thread GitBox


ijuma merged PR #12515:
URL: https://github.com/apache/kafka/pull/12515





[jira] [Resolved] (KAFKA-13809) FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks

2022-08-15 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13809.
---
Fix Version/s: 3.4.0
   Resolution: Fixed

> FileStreamSinkConnector and FileStreamSourceConnector should propagate full 
> configuration to tasks
> --
>
> Key: KAFKA-13809
> URL: https://issues.apache.org/jira/browse/KAFKA-13809
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Daniel Urban
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.4.0
>
>
> The 2 example connectors do not propagate the full connector configuration to 
> the tasks. This makes it impossible to override built-in configs, such as 
> producer/consumer overrides.
> This causes an issue even when used for testing purposes.
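The fix, conceptually: the connector's `taskConfigs` should hand each task a copy of the complete connector configuration rather than only the keys it recognizes. A simplified sketch under that assumption (not the actual FileStream connector code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskConfigSketch {
    // Propagate the full connector configuration so that keys the connector does
    // not itself recognize, e.g. producer/consumer override configs, still reach
    // the tasks instead of being silently dropped.
    static List<Map<String, String>> taskConfigs(final Map<String, String> connectorConfig,
                                                 final int maxTasks) {
        final List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            configs.add(new HashMap<>(connectorConfig)); // full copy, nothing dropped
        }
        return configs;
    }

    public static void main(String[] args) {
        final Map<String, String> config = new HashMap<>();
        config.put("file", "/tmp/sink.txt");
        config.put("consumer.override.fetch.max.wait.ms", "100");
        // Each of the two tasks receives both keys.
        System.out.println(taskConfigs(config, 2).get(1).size()); // 2
    }
}
```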





[jira] [Assigned] (KAFKA-13809) FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks

2022-08-15 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-13809:
-

Assignee: Yash Mayya

> FileStreamSinkConnector and FileStreamSourceConnector should propagate full 
> configuration to tasks
> --
>
> Key: KAFKA-13809
> URL: https://issues.apache.org/jira/browse/KAFKA-13809
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Daniel Urban
>Assignee: Yash Mayya
>Priority: Major
>
> The 2 example connectors do not propagate the full connector configuration to 
> the tasks. This makes it impossible to override built-in configs, such as 
> producer/consumer overrides.
> This causes an issue even when used for testing purposes.





[jira] [Reopened] (KAFKA-13809) FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks

2022-08-15 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reopened KAFKA-13809:
---

> FileStreamSinkConnector and FileStreamSourceConnector should propagate full 
> configuration to tasks
> --
>
> Key: KAFKA-13809
> URL: https://issues.apache.org/jira/browse/KAFKA-13809
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Daniel Urban
>Assignee: Yash Mayya
>Priority: Major
>
> The 2 example connectors do not propagate the full connector configuration to 
> the tasks. This makes it impossible to override built-in configs, such as 
> producer/consumer overrides.
> This causes an issue even when used for testing purposes.





[GitHub] [kafka] hachikuji opened a new pull request, #12517: KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to non-leader

2022-08-15 Thread GitBox


hachikuji opened a new pull request, #12517:
URL: https://github.com/apache/kafka/pull/12517

   Currently the server returns `INVALID_REQUEST` if a `DescribeQuorum` request 
is sent to a node that is not the current leader. Besides being inconsistent with 
all of the other leader APIs, this error is treated as fatal by both the 
forwarding manager and the admin client. Instead, we should return 
`NOT_LEADER_OR_FOLLOWER` as we do for the other APIs. This error is retriable, 
and we rely on the admin client to retry the request after seeing it.
   
   Note that we could also check for this error in `ForwardingManager` and 
retry the request from that context, but the current forwarding logic is only 
set up to handle retries for requests sent to the controller, which is logically 
distinct from the raft leader. `DescribeQuorum` is a bit of an oddball from 
an API perspective: it is a multi-partition request, but it is always 
forwarded to the controller since there is only one raft partition. To 
handle this case specially, we would have to unwrap the envelope and check the 
error for the __cluster_metadata partition. It seemed simpler to rely on the 
admin client to retry.
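The behavioral difference can be sketched with a hypothetical enum (Kafka's real `Errors` enum encodes retriability similarly, where `NOT_LEADER_OR_FOLLOWER` is retriable and `INVALID_REQUEST` is not): a retriable error lets the admin client retry against the new leader instead of failing fatally.

```java
public class QuorumErrorSketch {
    // Hypothetical mirror of the relevant error codes and their retriability.
    enum ErrorCode {
        NONE(false),
        INVALID_REQUEST(false),          // old behavior: treated as fatal by the client
        NOT_LEADER_OR_FOLLOWER(true);    // new behavior: the client retries

        final boolean retriable;
        ErrorCode(final boolean retriable) { this.retriable = retriable; }
    }

    // Error a node answers DescribeQuorum with, depending on whether it leads.
    static ErrorCode describeQuorumError(final boolean isRaftLeader) {
        return isRaftLeader ? ErrorCode.NONE : ErrorCode.NOT_LEADER_OR_FOLLOWER;
    }

    public static void main(String[] args) {
        System.out.println(describeQuorumError(false).retriable); // true
    }
}
```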
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater

2022-08-15 Thread GitBox


guozhangwang commented on code in PR #12466:
URL: https://github.com/apache/kafka/pull/12466#discussion_r946168656


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -296,6 +296,75 @@ public void shouldAddTasksToStateUpdater() {
 Mockito.verify(stateUpdater).add(task01);
 }
 
+@Test
+public void shouldHandleRemovedTasksFromStateUpdater() {

Review Comment:
   This test did verify that `initializeIfNeeded` is called, and the tasks are 
added back to the state updater.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater

2022-08-15 Thread GitBox


guozhangwang commented on code in PR #12466:
URL: https://github.com/apache/kafka/pull/12466#discussion_r946159951


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -296,6 +296,75 @@ public void shouldAddTasksToStateUpdater() {
 Mockito.verify(stateUpdater).add(task01);
 }
 
+@Test
+public void shouldHandleRemovedTasksFromStateUpdater() {
+// tasks to recycle
+final StreamTask task00 = mock(StreamTask.class);
+final StandbyTask task01 = mock(StandbyTask.class);
+final StandbyTask task00Converted = mock(StandbyTask.class);
+final StreamTask task01Converted = mock(StreamTask.class);
+// task to close
+final StreamTask task02 = mock(StreamTask.class);
+// task to update inputs
+final StreamTask task03 = mock(StreamTask.class);
+when(task00.id()).thenReturn(taskId00);
+when(task01.id()).thenReturn(taskId01);
+when(task02.id()).thenReturn(taskId02);
+when(task03.id()).thenReturn(taskId03);
+when(task00.inputPartitions()).thenReturn(taskId00Partitions);
+when(task01.inputPartitions()).thenReturn(taskId01Partitions);
+when(task02.inputPartitions()).thenReturn(taskId02Partitions);
+when(task03.inputPartitions()).thenReturn(taskId03Partitions);
+when(task00.isActive()).thenReturn(true);
+when(task01.isActive()).thenReturn(false);
+when(task02.isActive()).thenReturn(true);
+when(task03.isActive()).thenReturn(true);
+when(task00.state()).thenReturn(State.RESTORING);
+when(task01.state()).thenReturn(State.RUNNING);
+when(task02.state()).thenReturn(State.RESTORING);
+when(task03.state()).thenReturn(State.RESTORING);

Review Comment:
   Ack. Rebased on your utils and broke it down into multiple tests.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater

2022-08-15 Thread GitBox


guozhangwang commented on code in PR #12466:
URL: https://github.com/apache/kafka/pull/12466#discussion_r946157896


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -43,35 +43,48 @@
 class Tasks {
     private final Logger log;
 
+    // TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks
     private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
     private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();
 
     // Tasks may have been assigned for a NamedTopology that is not yet known by this host. When that occurs we stash
     // these unknown tasks until either the corresponding NamedTopology is added and we can create them at last, or
     // we receive a new assignment and they are revoked from the thread.
-
-    // Tasks may have been assigned but not yet created because:
-    // 1. They are for a NamedTopology that is yet known by this host.
-    // 2. They are to be recycled from an existing restoring task yet to be returned from the state updater.
-    //
-    // When that occurs we stash these pending tasks until either they are finally clear to be created,
-    // or they are revoked from a new assignment.
     private final Map<TaskId, Set<TopicPartition>> pendingActiveTasksToCreate = new HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> pendingStandbyTasksToCreate = new HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> pendingTasksToRecycle = new HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> pendingTasksToUpdateInputPartitions = new HashMap<>();
     private final Set<Task> pendingTasksToInit = new HashSet<>();
     private final Set<TaskId> pendingTasksToClose = new HashSet<>();
 
+    // TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks
     private final Map<TopicPartition, Task> activeTasksPerPartition = new HashMap<>();
 
     Tasks(final LogContext logContext) {
         this.log = logContext.logger(getClass());
     }
 
-    void purgePendingTasksToCreate(final Set<TaskId> assignedActiveTasks, final Set<TaskId> assignedStandbyTasks) {
-        pendingActiveTasksToCreate.keySet().retainAll(assignedActiveTasks);
-        pendingStandbyTasksToCreate.keySet().retainAll(assignedStandbyTasks);
+    void purgePendingTasksToCreate() {
+        pendingActiveTasksToCreate.clear();
+        pendingStandbyTasksToCreate.clear();
+    }
+
+    Map<TaskId, Set<TopicPartition>> drainPendingActiveTasksForTopologies(final Set<String> currentTopologies) {

Review Comment:
   Ack.



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater

2022-08-15 Thread GitBox


guozhangwang commented on code in PR #12466:
URL: https://github.com/apache/kafka/pull/12466#discussion_r946148995


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -296,6 +296,75 @@ public void shouldAddTasksToStateUpdater() {
 Mockito.verify(stateUpdater).add(task01);
 }
 
+@Test
+public void shouldHandleRemovedTasksFromStateUpdater() {
+// tasks to recycle
+final StreamTask task00 = mock(StreamTask.class);
+final StandbyTask task01 = mock(StandbyTask.class);
+final StandbyTask task00Converted = mock(StandbyTask.class);
+final StreamTask task01Converted = mock(StreamTask.class);
+// task to close
+final StreamTask task02 = mock(StreamTask.class);
+// task to update inputs
+final StreamTask task03 = mock(StreamTask.class);
+when(task00.id()).thenReturn(taskId00);
+when(task01.id()).thenReturn(taskId01);
+when(task02.id()).thenReturn(taskId02);
+when(task03.id()).thenReturn(taskId03);
+when(task00.inputPartitions()).thenReturn(taskId00Partitions);
+when(task01.inputPartitions()).thenReturn(taskId01Partitions);
+when(task02.inputPartitions()).thenReturn(taskId02Partitions);
+when(task03.inputPartitions()).thenReturn(taskId03Partitions);
+when(task00.isActive()).thenReturn(true);
+when(task01.isActive()).thenReturn(false);
+when(task02.isActive()).thenReturn(true);
+when(task03.isActive()).thenReturn(true);
+when(task00.state()).thenReturn(State.RESTORING);
+when(task01.state()).thenReturn(State.RUNNING);
+when(task02.state()).thenReturn(State.RESTORING);
+when(task03.state()).thenReturn(State.RESTORING);
+        when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01, task02, task03));
+
+        expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer)))
+            .andStubReturn(task01Converted);
+        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
+        expectLastCall().times(2);
+        expect(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), eq(taskId00Partitions)))
+            .andStubReturn(task00Converted);
+        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
+        consumer.resume(anyObject());
+        expectLastCall().anyTimes();
+        replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer);

Review Comment:
   Yes, I tried to use Mockito throughout but soon realized it would incur a much 
larger scope.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater

2022-08-15 Thread GitBox


guozhangwang commented on code in PR #12466:
URL: https://github.com/apache/kafka/pull/12466#discussion_r946147457


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -316,20 +313,22 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             classifyTasksWithStateUpdater(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
         }
 
-        tasks.addPendingActiveTasks(pendingTasksToCreate(activeTasksToCreate));
-        tasks.addPendingStandbyTasks(pendingTasksToCreate(standbyTasksToCreate));
+        tasks.purgePendingTasksToCreate();
+        tasks.addPendingActiveTasksToCreate(pendingTasksToCreate(activeTasksToCreate));
+        tasks.addPendingStandbyTasksToCreate(pendingTasksToCreate(standbyTasksToCreate));

Review Comment:
   ack.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12466: KAFKA-10199: Handle task closure and recycling from state updater

2022-08-15 Thread GitBox


guozhangwang commented on code in PR #12466:
URL: https://github.com/apache/kafka/pull/12466#discussion_r946143073


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -656,6 +657,71 @@ boolean tryToCompleteRestoration(final long now, final java.util.function.Consum
         return allRunning;
     }
 
+    private void addTaskstoStateUpdater() {
+        for (final Task task : tasks.drainPendingTaskToInit()) {
+            task.initializeIfNeeded();
+            stateUpdater.add(task);
+        }
+    }
+
+    private void handleRemovedTasksFromStateUpdater() {
+        final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
+        final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
+
+        for (final Task task : stateUpdater.drainRemovedTasks()) {
+            final TaskId taskId = task.id();
+            Set<TopicPartition> inputPartitions;
+            if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
+                try {
+                    final Task newTask = task.isActive() ?
+                        convertActiveToStandby((StreamTask) task, inputPartitions) :
+                        convertStandbyToActive((StandbyTask) task, inputPartitions);
+                    newTask.initializeIfNeeded();
+                    stateUpdater.add(newTask);
+                } catch (final RuntimeException e) {
+                    final String uncleanMessage = String.format("Failed to recycle task %s cleanly. " +
+                        "Attempting to close remaining tasks before re-throwing:", taskId);
+                    log.error(uncleanMessage, e);
+
+                    if (task.state() != State.CLOSED) {
+                        tasksToCloseDirty.add(task);
+                    }
+
+                    taskExceptions.putIfAbsent(taskId, e);
+                }
+            } else if (tasks.removePendingTaskToClose(task.id())) {
+                try {
+                    task.closeClean();

Review Comment:
   You're right! Added and also updated unit tests.






[jira] [Updated] (KAFKA-12699) Streams no longer overrides the java default uncaught exception handler

2022-08-15 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12699:
---
Fix Version/s: 3.4.0
   (was: 3.3.0)

> Streams no longer overrides the java default uncaught exception handler  
> -
>
> Key: KAFKA-12699
> URL: https://issues.apache.org/jira/browse/KAFKA-12699
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Minor
> Fix For: 3.4.0
>
>
> If a user used `Thread.setUncaughtExceptionHandler()` to set the handler for 
> all threads in the runtime, Streams would override that with its own handler. 
> However, since Streams no longer uses the `Thread` handler, it will no 
> longer do so. This can cause problems if the user does something like 
> `System.exit(1)` in the handler. 
>  
> If using the old handler in streams it will still work as it used to
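The override behavior described above can be illustrated with a small standalone sketch (not Streams code): a JVM-wide default handler installed via `Thread.setDefaultUncaughtExceptionHandler` receives uncaught errors from other threads unless something replaces it, which is exactly what older Streams versions did.

```java
import java.util.concurrent.atomic.AtomicReference;

public class HandlerDemo {
    // Captures the message seen by the JVM-wide default uncaught exception handler.
    static String capture() throws InterruptedException {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread.setDefaultUncaughtExceptionHandler((t, e) -> seen.set(e.getMessage()));
        Thread worker = new Thread(() -> {
            throw new RuntimeException("boom");
        });
        worker.start();
        worker.join(); // the handler runs on the dying thread before join() returns
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("default handler saw: " + capture());
    }
}
```

If a library silently replaces the default handler, the user's `seen` callback above would never fire, which is the surprise the issue describes.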



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji commented on a diff in pull request #12508: KAFKA-13888: Addition of Information in DescribeQuorumResponse

2022-08-15 Thread GitBox


hachikuji commented on code in PR #12508:
URL: https://github.com/apache/kafka/pull/12508#discussion_r946081969


##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -323,19 +372,30 @@ private static class ReplicaState implements Comparable<ReplicaState> {
 final int nodeId;
 Optional<LogOffsetMetadata> endOffset;
 OptionalLong lastFetchTimestamp;
+OptionalLong lastfetchLeaderLogEndOffset;

Review Comment:
   nit: should be `lastFetchLeaderLogEndOffset`



##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -778,4 +779,52 @@ class KRaftClusterTest {
   cluster.close()
 }
   }
+  def createAdminClient(cluster: KafkaClusterTestKit, useController: Boolean): Admin = {
+var props: Properties = null
+props = cluster.clientProperties()
+props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+Admin.create(props)
+  }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+val cluster = new KafkaClusterTestKit.Builder(
+  new TestKitNodes.Builder().
+setNumBrokerNodes(4).
+setNumControllerNodes(3).build()).build()
+try {
+  cluster.format
+  cluster.startup
+  for (i <- 0 to 3) {
+TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+  "Broker Never started up")
+  }
+  val admin = createAdminClient(cluster, false)
+  try {
+val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+val quorumInfo = quorumState.quorumInfo.get()
+
+assertEquals(0, quorumInfo.observers.size)

Review Comment:
   This probably fails now that brokers send their replicaId.



##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -778,4 +779,52 @@ class KRaftClusterTest {
   cluster.close()
 }
   }
+  def createAdminClient(cluster: KafkaClusterTestKit, useController: Boolean): Admin = {
+var props: Properties = null
+props = cluster.clientProperties()
+props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+Admin.create(props)
+  }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+val cluster = new KafkaClusterTestKit.Builder(
+  new TestKitNodes.Builder().
+setNumBrokerNodes(4).
+setNumControllerNodes(3).build()).build()
+try {
+  cluster.format
+  cluster.startup
+  for (i <- 0 to 3) {
+TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+  "Broker Never started up")
+  }
+  val admin = createAdminClient(cluster, false)
+  try {
+val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+val quorumInfo = quorumState.quorumInfo.get()
+
+assertEquals(0, quorumInfo.observers.size)
+assertEquals(3, quorumInfo.voters.size)

Review Comment:
   A more straightforward way to assert the voter/observer sets is in 
`DescribeQuorumRequestTest`:
   ```scala
 val voterData = partitionData.currentVoters.asScala
 assertEquals(cluster.controllerIds().asScala, voterData.map(_.replicaId).toSet);
   
 val observerData = partitionData.observers.asScala
 assertEquals(cluster.brokerIds().asScala, observerData.map(_.replicaId).toSet);
   ```



##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1179,8 +1179,12 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest(
 leaderState.localId(),
 leaderState.epoch(),
 leaderState.highWatermark().isPresent() ? leaderState.highWatermark().get().offset : -1,
-convertToReplicaStates(leaderState.getVoterEndOffsets()),
-convertToReplicaStates(leaderState.getObserverStates(currentTimeMs))
+convertToReplicaStates(leaderState.getVoterEndOffsets(),
+leaderState.getVoterLastFetchTimes(),

Review Comment:
   I think it would be a bit simpler to push creation of the `ReplicaState` 
object into `LeaderState`:
   
   ```java
   class LeaderState {
 ...
   
 List<ReplicaState> voterStates(long currentTimeMs);
 List<ReplicaState> observerStates(long currentTimeMs);
   }
   ```
   Then we wouldn't need all the boilerplate methods to create all the 
different Map variations. Also, then the sanity checks in 
`convertToReplicaStates` become unnecessary.



##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -217,19 +218,30 @@ public boolean updateLocalState(long fetchTimestamp, LogOffsetMetadata logOffset
  * @param replicaId replica id
  * @param fetchTimestamp fetch timestamp
  * @param logOffsetMetadata new log offset and metadata
+ * @param leaderLogEndOffset current log end offset of the leader
  * @return true if the high watermark is updated too
  */
-public boolean 

[jira] [Assigned] (KAFKA-13940) DescribeQuorum returns INVALID_REQUEST if not handled by leader

2022-08-15 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-13940:
---

Assignee: Jason Gustafson

> DescribeQuorum returns INVALID_REQUEST if not handled by leader
> ---
>
> Key: KAFKA-13940
> URL: https://issues.apache.org/jira/browse/KAFKA-13940
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> In `KafkaRaftClient.handleDescribeQuorum`, we currently return 
> INVALID_REQUEST if the node is not the current raft leader. This is 
> surprising and doesn't work with our general approach for retrying forwarded 
> APIs. In `BrokerToControllerChannelManager`, we only retry after 
> `NOT_CONTROLLER` errors. It would be more consistent with the other Raft APIs 
> if we returned NOT_LEADER_OR_FOLLOWER, but that also means we need additional 
> logic in `BrokerToControllerChannelManager` to handle that error and retry 
> correctly. 





[GitHub] [kafka] hachikuji commented on a diff in pull request #12469: KAFKA-13914: Add command line tool kafka-metadata-quorum.sh

2022-08-15 Thread GitBox


hachikuji commented on code in PR #12469:
URL: https://github.com/apache/kafka/pull/12469#discussion_r946063842


##
core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala:
##
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.tools.TerseFailure
+import kafka.utils.Exit
+import net.sourceforge.argparse4j.ArgumentParsers
+import net.sourceforge.argparse4j.impl.Arguments.{fileType, storeTrue}
+import net.sourceforge.argparse4j.inf.Subparsers
+import org.apache.kafka.clients._
+import org.apache.kafka.clients.admin.{Admin, QuorumInfo}
+import org.apache.kafka.common.utils.Utils
+
+import java.io.File
+import java.util.Properties
+import scala.jdk.CollectionConverters._
+
+/**
+ * A tool for describing quorum status
+ */
+object MetadataQuorumCommand {
+
+  def main(args: Array[String]): Unit = {
+val res = mainNoExit(args)
+Exit.exit(res)
+  }
+
+  def mainNoExit(args: Array[String]): Int = {
+val parser = ArgumentParsers
+  .newArgumentParser("kafka-metadata-quorum")
+  .defaultHelp(true)
+  .description("This tool describes kraft metadata quorum status.")
+parser
+  .addArgument("--bootstrap-server")
+  .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.")
+  .required(true)
+
+parser
+  .addArgument("--command-config")
+  .`type`(fileType())
+  .help("Property file containing configs to be passed to Admin Client.")
+val subparsers = parser.addSubparsers().dest("command")
+addDescribeParser(subparsers)
+
+var admin: Admin = null
+try {
+  val namespace = parser.parseArgsOrFail(args)
+  val command = namespace.getString("command")
+
+  val commandConfig = namespace.get[File]("command_config")
+  val props = if (commandConfig != null) {
+if (!commandConfig.exists()) {
+  throw new TerseFailure(s"Properties file ${commandConfig.getPath} does not exists!")
+}
+Utils.loadProps(commandConfig.getPath)
+  } else {
+new Properties()
+  }
+  props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server"))
+  admin = Admin.create(props)
+
+  if (command == "describe") {
+if (namespace.getBoolean("status") && namespace.getBoolean("replication")) {
+  throw new TerseFailure(s"Only one of --status or --replication should be specified with describe sub-command")
+} else if (namespace.getBoolean("replication")) {
+  handleDescribeReplication(admin)
+} else if (namespace.getBoolean("status")) {
+  handleDescribeStatus(admin)
+} else {
+  throw new TerseFailure(s"One of --status or --replication must be specified with describe sub-command")
+}
+  } else {
+// currently we only support describe
+  }
+  0
+} catch {
+  case e: TerseFailure =>
+Console.err.println(e.getMessage)
+1
+} finally {
+  if (admin != null) {
+admin.close()
+  }
+}
+  }
+
+  def addDescribeParser(subparsers: Subparsers): Unit = {
+val describeParser = subparsers
+  .addParser("describe")
+  .help("Describe the metadata quorum info")
+
+val statusArgs = describeParser.addArgumentGroup("Status")
+statusArgs
+  .addArgument("--status")
+  .help("A short summary of the quorum status and the other provides detailed information about the status of replication.")
+  .action(storeTrue())
+val replicationArgs = describeParser.addArgumentGroup("Replication")
+replicationArgs
+  .addArgument("--replication")
+  .help("Detailed information about the status of replication")
+  .action(storeTrue())
+  }
+
+  private def handleDescribeReplication(admin: Admin): Unit = {
+val quorumInfo = admin.describeMetadataQuorum().quorumInfo().get()
+val leaderId = quorumInfo.leaderId()
+val leader = quorumInfo.voters().asScala.filter(_.replicaId() == leaderId).head
+// Find proper columns width
+var (maxReplicaIdLen, maxLogEndOffsetLen, maxLagLen, maxLastFetchTimeMsLen, 

[GitHub] [kafka] C0urante commented on pull request #12410: MINOR: Remove unused ShutdownableThread class and ineffective ThreadedTest classes

2022-08-15 Thread GitBox


C0urante commented on PR #12410:
URL: https://github.com/apache/kafka/pull/12410#issuecomment-1215679995

   The `ThreadedTest` class was intended to detect uncaught errors on threads other than the main thread used for the test. In many cases (including the `WorkerSinkTaskThreadedTest` suite), this was unnecessary since the tests that inherited from it were all single-threaded. The only exceptions are:
   - With the `ExactlyOnceWorkerSourceTaskTest` and `WorkerSourceTaskTest` 
suites, we were spinning up at most one additional thread per test via 
`ExecutorService::submit`, and every time we did that, we were already making 
sure to invoke `get` on the resulting `Future` on the main testing thread, 
which caused any uncaught errors to fail the test
   - With the `DistributedHerderTest` suite, we were also spinning up at most 
one additional thread per test via `ExecutorService::submit`, but not checking 
the resulting `Future`. I've added a check in this PR
   
   I toyed with the idea of keeping the `ThreadedTest` class as a reusable 
utility, but I don't think it'd save us a lot of trouble without also 
potentially tying our hands and preventing us from doing things like ensuring 
that a certain thread or task dispatched on a separate thread has completed at 
a certain point in the test.
   
   The `ShutdownableThread` class and its test are not effectively used anywhere in the code base and are completely safe to remove.
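   The `Future`-checking pattern described above (submitting work via `ExecutorService::submit` and surfacing background failures by calling `get` on the main thread) can be sketched as a standalone demo; names here are illustrative, not Connect test code:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FuturePropagationDemo {
    // Runs a failing task on a worker thread and returns the error surfaced via Future.get().
    static Throwable runAndCapture() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.submit((Callable<Void>) () -> {
                throw new IllegalStateException("failure on worker thread");
            }).get(); // rethrows the background failure as an ExecutionException
            return null; // not reached in this demo: the task always fails
        } catch (ExecutionException e) {
            return e.getCause();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("caught: " + runAndCapture().getMessage());
    }
}
```

   Without the `get` call, the exception would die silently with the worker thread, which is exactly the gap the added check in `DistributedHerderTest` closes.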





[jira] [Updated] (KAFKA-14165) ConfigTransformer.DEFAULT_PATTERN fails on Windows platform

2022-08-15 Thread Gabor Andras (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Andras updated KAFKA-14165:
-
Description: 
When a configuration Map which has an entry with a key "someKey" and a value 
"${file:/F:/properties.txt:fileKey}" is passed to the 
ConfigTransformer.transform(Map) method, an exception is thrown:

2022-08-12 16:44:21,553 ERROR [org.apa.kaf.com.con.pro.FileConfigProvider] 
(main) Could not read properties from file /F: 
java.nio.file.NoSuchFileException: \F
    at 
java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:85)
    at 
java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
    at 
java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108)
    at 
java.base/sun.nio.fs.WindowsFileSystemProvider.newByteChannel(WindowsFileSystemProvider.java:236)
    at java.base/java.nio.file.Files.newByteChannel(Files.java:380)
    at java.base/java.nio.file.Files.newByteChannel(Files.java:432)
    at 
java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:422)
    at java.base/java.nio.file.Files.newInputStream(Files.java:160)
    at java.base/java.nio.file.Files.newBufferedReader(Files.java:2922)
    at java.base/java.nio.file.Files.newBufferedReader(Files.java:2955)
    at 
org.apache.kafka.common.config.provider.FileConfigProvider.reader(FileConfigProvider.java:104)
    at 
org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:86)
    at 
org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)

This happens because of a colon ":" in the file name, which is wrongfully 
matched by the ConfigTransformer.DEFAULT_PATTERN in these three groups "file", 
"/F" and "/properties.txt:fileKey" instead of "file", "/F:/properties.txt" and 
"fileKey".

The ConfigTransformer.DEFAULT_PATTERN should be changed to match the first and 
the last colon ":" in the expression.

  was:
When a configuration Map which has an entry with a key "someKey" and a value 
"${file:/F:/properties.txt:fileKey}" is passed to the 
ConfigTransformer.transform(Map) method, an exception is thrown:

2022-08-12 16:44:21,553 ERROR [org.apa.kaf.com.con.pro.FileConfigProvider] 
(main) Could not read properties from file /F: 
java.nio.file.NoSuchFileException: \F
    at 
java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:85)
    at 
java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
    at 
java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108)
    at 
java.base/sun.nio.fs.WindowsFileSystemProvider.newByteChannel(WindowsFileSystemProvider.java:236)
    at java.base/java.nio.file.Files.newByteChannel(Files.java:380)
    at java.base/java.nio.file.Files.newByteChannel(Files.java:432)
    at 
java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:422)
    at java.base/java.nio.file.Files.newInputStream(Files.java:160)
    at java.base/java.nio.file.Files.newBufferedReader(Files.java:2922)
    at java.base/java.nio.file.Files.newBufferedReader(Files.java:2955)
    at 
org.apache.kafka.common.config.provider.FileConfigProvider.reader(FileConfigProvider.java:104)
    at 
org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:86)
    at 
org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)

This happens because of an colon ":" in the file name, which is wrongfully 
matched by the ConfigTransformer.DEFAULT_PATTERN in these three groups "file", 
"/F" and "/properties.txt:fileKey" instead of "file", "/F:/properties.txt" and 
"fileKey".

The ConfigTransformer.DEFAULT_PATTERN should be changed to match the first and 
the last colon ":" in the expression.
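A minimal sketch of the matching problem: the pattern below is illustrative (it is not the actual `ConfigTransformer.DEFAULT_PATTERN`), but it shows how anchoring the provider before the first colon and the key after the last colon parses a Windows-style path that itself contains a colon:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ColonPatternDemo {
    // Illustrative pattern, NOT Kafka's DEFAULT_PATTERN: the provider stops at the
    // first colon, the key starts after the last colon, and the greedy middle group
    // keeps any colons that appear inside the path itself.
    static final Pattern PATTERN = Pattern.compile("\\$\\{([^:}]+):(.+):([^:}]+)\\}");

    // Returns {provider, path, key}, or null if the value does not match.
    static String[] parse(String value) {
        Matcher m = PATTERN.matcher(value);
        return m.matches() ? new String[] {m.group(1), m.group(2), m.group(3)} : null;
    }

    public static void main(String[] args) {
        String[] parts = parse("${file:/F:/properties.txt:fileKey}");
        System.out.println(parts[0]); // file
        System.out.println(parts[1]); // /F:/properties.txt
        System.out.println(parts[2]); // fileKey
    }
}
```

The greedy `(.+)` backtracks only far enough to leave one trailing `:key` segment, which yields the `"file"` / `"/F:/properties.txt"` / `"fileKey"` split the issue asks for.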


> ConfigTransformer.DEFAULT_PATTERN fails on Windows platform
> ---
>
> Key: KAFKA-14165
> URL: https://issues.apache.org/jira/browse/KAFKA-14165
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 3.2.1
> Environment: Windows 10 OS
>Reporter: Gabor Andras
>Assignee: Prem Kamal
>Priority: Major
>
> When a configuration Map which has an entry with a key "someKey" and a value 
> "${file:/F:/properties.txt:fileKey}" is passed to the 
> ConfigTransformer.transform(Map) method, an exception is thrown:
> 2022-08-12 16:44:21,553 ERROR [org.apa.kaf.com.con.pro.FileConfigProvider] 
> (main) Could not read properties from file /F: 
> java.nio.file.NoSuchFileException: \F
>     at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:85)
>     at 
> 

[GitHub] [kafka] C0urante merged pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

2022-08-15 Thread GitBox


C0urante merged PR #12450:
URL: https://github.com/apache/kafka/pull/12450





[GitHub] [kafka] vincent81jiang opened a new pull request, #12516: Validate offsets of input records before writing them file

2022-08-15 Thread GitBox


vincent81jiang opened a new pull request, #12516:
URL: https://github.com/apache/kafka/pull/12516

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Resolved] (KAFKA-14154) Persistent URP after controller soft failure

2022-08-15 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-14154.
-
Resolution: Fixed

> Persistent URP after controller soft failure
> 
>
> Key: KAFKA-14154
> URL: https://issues.apache.org/jira/browse/KAFKA-14154
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 3.3.0
>
>
> We ran into a scenario where a partition leader was unable to expand the ISR 
> after a soft controller failover. Here is what happened:
> Initial state: leader=1, isr=[1,2], leader epoch=10. Broker 1 is acting as 
> the current controller.
> 1. Broker 1 loses its session in Zookeeper.  
> 2. Broker 2 becomes the new controller.
> 3. During initialization, controller 2 removes 1 from the ISR. So state is 
> updated: leader=2, isr=[2], leader epoch=11.
> 4. Broker 2 receives `LeaderAndIsr` from the new controller with leader 
> epoch=11.
> 5. Broker 2 immediately tries to add replica 1 back to the ISR since it is 
> still fetching and is caught up. However, the 
> `BrokerToControllerChannelManager` is still pointed at controller 1, so that 
> is where the `AlterPartition` is sent.
> 6. Controller 1 does not yet realize that it is not the controller, so it 
> processes the `AlterPartition` request. It sees the leader epoch of 11, which 
> is higher than what it has in its own context. Following changes to the 
> `AlterPartition` validation in 
> [https://github.com/apache/kafka/pull/12032/files,] the controller returns 
> FENCED_LEADER_EPOCH.
> 7. After receiving the FENCED_LEADER_EPOCH from the old controller, the 
> leader is stuck because it assumes that the error implies that another 
> LeaderAndIsr request should be sent.
> Prior to 
> [https://github.com/apache/kafka/pull/12032/files|https://github.com/apache/kafka/pull/12032/files,],
>  the way we handled this case was a little different. We only verified that 
> the leader epoch in the request was at least as large as the current epoch in 
> the controller context. Anything higher was accepted. The controller would 
> have attempted to write the updated state to Zookeeper. This update would 
> have failed because of the controller epoch check, however, we would have 
> returned NOT_CONTROLLER in this case, which is handled in 
> `AlterPartitionManager`.
> It is tempting to revert the logic, but the risk is in the idempotency check: 
> [https://github.com/apache/kafka/pull/12032/files#diff-3e042c962e80577a4cc9bbcccf0950651c6b312097a86164af50003c00c50d37L2369.]
>  If the AlterPartition request happened to match the state inside the old 
> controller, the controller would consider the update successful and return no 
> error. But if its state was already stale at that point, then that might 
> cause the leader to incorrectly assume that the state had been updated.
> One way to fix this problem without weakening the validation is to rely on 
> the controller epoch in `AlterPartitionManager`. When we discover a new 
> controller, we also discover its epoch, so we can pass that through. The 
> `LeaderAndIsr` request already includes the controller epoch of the 
> controller that sent it and we already propagate this through to 
> `AlterPartition.submit`. Hence all we need to do is verify that the epoch of 
> the current controller target is at least as large as the one discovered 
> through the `LeaderAndIsr`.
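A toy sketch of the guard proposed in the last paragraph (the names are hypothetical, not the actual `AlterPartitionManager` API): the sender only trusts a controller whose discovered epoch is at least the controller epoch carried by the `LeaderAndIsr` request.

```java
public class EpochGuardDemo {
    // Hypothetical holder for a discovered controller and its epoch.
    record ControllerInfo(int nodeId, int epoch) {}

    // Only send AlterPartition when the target controller's epoch is not older than
    // the controller epoch learned from LeaderAndIsr; otherwise wait for rediscovery.
    static boolean canSendAlterPartition(ControllerInfo target, int epochFromLeaderAndIsr) {
        return target.epoch() >= epochFromLeaderAndIsr;
    }

    public static void main(String[] args) {
        ControllerInfo staleController = new ControllerInfo(1, 10); // zombie from the scenario
        ControllerInfo newController = new ControllerInfo(2, 11);
        System.out.println(canSendAlterPartition(staleController, 11)); // false
        System.out.println(canSendAlterPartition(newController, 11));   // true
    }
}
```

In the failure scenario above, this check would have kept the `AlterPartition` from reaching controller 1 (epoch 10), avoiding the spurious FENCED_LEADER_EPOCH.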





[GitHub] [kafka] clolov commented on pull request #12459: KAFKA-13036: Replace EasyMock and PowerMock with Mockito for RocksDBMetricsRecorderTest

2022-08-15 Thread GitBox


clolov commented on PR #12459:
URL: https://github.com/apache/kafka/pull/12459#issuecomment-1215593952

   Heya @cadonna and @dplavcic, Divij is taking some time off, so I will aim to 
provide an update to the comments and the pull request in the next couple of 
days.





[GitHub] [kafka] hachikuji merged pull request #12514: KAFKA-14154; Kraft controller should return NOT_CONTROLLER if request epochs are ahead

2022-08-15 Thread GitBox


hachikuji merged PR #12514:
URL: https://github.com/apache/kafka/pull/12514





[GitHub] [kafka] clolov commented on pull request #12492: KAFKA-14133: Replace EasyMock with Mockito in streams tests

2022-08-15 Thread GitBox


clolov commented on PR #12492:
URL: https://github.com/apache/kafka/pull/12492#issuecomment-1215577425

   Thank you @cadonna and @dplavcic for your comments. I will aim to address 
them as soon as possible.





[GitHub] [kafka] clolov commented on pull request #12505: KAFKA-14133: Replace EasyMock with Mockito in streams tests

2022-08-15 Thread GitBox


clolov commented on PR #12505:
URL: https://github.com/apache/kafka/pull/12505#issuecomment-1215575745

   Hey @dplavcic, thank you for the comments. I will aim to provide an answer 
in the next couple of days!





[GitHub] [kafka] ableegoldman merged pull request #12510: MINOR: Add setting input partitions for task mocks

2022-08-15 Thread GitBox


ableegoldman merged PR #12510:
URL: https://github.com/apache/kafka/pull/12510





[GitHub] [kafka] chia7712 merged pull request #12503: MINOR: Appending value to LIST config should not generate empty string with …

2022-08-15 Thread GitBox


chia7712 merged PR #12503:
URL: https://github.com/apache/kafka/pull/12503





[GitHub] [kafka] ijuma opened a new pull request, #12515: MINOR: Improve readability of `tryProcessAlterPartition`

2022-08-15 Thread GitBox


ijuma opened a new pull request, #12515:
URL: https://github.com/apache/kafka/pull/12515

   After 520f72995d, the subsequent checks are ensuring that
   the leader and partition epochs are not less than. So,
   make that explicit.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-14166) Consistent toString implementations for byte arrays in generated messages

2022-08-15 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-14166:
---

 Summary: Consistent toString implementations for byte arrays in 
generated messages
 Key: KAFKA-14166
 URL: https://issues.apache.org/jira/browse/KAFKA-14166
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


In the generated `toString()` implementations for message objects (such as 
protocol RPCs), we are a little inconsistent in how we display types with raw 
bytes. If the type is `Array[Byte]`, then we use Arrays.toString. If the type 
is `ByteBuffer` (i.e. when `zeroCopy` is set), then we use the corresponding 
`ByteBuffer.toString`, which is not often useful. We should try to be 
consistent. By default, it is probably not useful to print the full array 
contents, but we might print summary information (e.g. size, checksum).
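One possible shape for the summary idea (a hypothetical helper, not generated Kafka code): render raw bytes as a length plus checksum so that `byte[]` and `ByteBuffer` fields print consistently without dumping their contents.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class ByteSummary {
    // Hypothetical helper: summarize raw bytes as "bytes(length, crc32=...)" instead
    // of printing the full contents.
    static String summarize(ByteBuffer buffer) {
        CRC32 crc = new CRC32();
        crc.update(buffer.duplicate()); // duplicate() leaves the caller's position untouched
        return "bytes(" + buffer.remaining() + ", crc32=" + Long.toHexString(crc.getValue()) + ")";
    }

    // Overload so byte[] fields print identically to ByteBuffer (zeroCopy) fields.
    static String summarize(byte[] bytes) {
        return summarize(ByteBuffer.wrap(bytes));
    }

    public static void main(String[] args) {
        System.out.println(summarize(new byte[] {1, 2, 3}));
    }
}
```

Routing both field types through one helper like this is what would make the generated `toString()` output uniform.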





[GitHub] [kafka] hachikuji opened a new pull request, #12514: KAFKA-14154; Kraft controller should return NOT_CONTROLLER if request epochs are ahead

2022-08-15 Thread GitBox


hachikuji opened a new pull request, #12514:
URL: https://github.com/apache/kafka/pull/12514

   Similar to https://github.com/apache/kafka/pull/12506. For the Kraft 
controller, we should return NOT_CONTROLLER if the leader/partition epoch in 
the request is ahead of the controller. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Assigned] (KAFKA-14165) ConfigTransformer.DEFAULT_PATTERN fails on Windows platform

2022-08-15 Thread Prem Kamal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prem Kamal reassigned KAFKA-14165:
--

Assignee: Prem Kamal

> ConfigTransformer.DEFAULT_PATTERN fails on Windows platform
> ---
>
> Key: KAFKA-14165
> URL: https://issues.apache.org/jira/browse/KAFKA-14165
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 3.2.1
> Environment: Windows 10 OS
>Reporter: Gabor Andras
>Assignee: Prem Kamal
>Priority: Major
>
> When a configuration Map which has an entry with a key "someKey" and a value 
> "${file:/F:/properties.txt:fileKey}" is passed to the 
> ConfigTransformer.transform(Map) method, an exception is thrown:
> 2022-08-12 16:44:21,553 ERROR [org.apa.kaf.com.con.pro.FileConfigProvider] 
> (main) Could not read properties from file /F: 
> java.nio.file.NoSuchFileException: \F
>     at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:85)
>     at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>     at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108)
>     at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.newByteChannel(WindowsFileSystemProvider.java:236)
>     at java.base/java.nio.file.Files.newByteChannel(Files.java:380)
>     at java.base/java.nio.file.Files.newByteChannel(Files.java:432)
>     at 
> java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:422)
>     at java.base/java.nio.file.Files.newInputStream(Files.java:160)
>     at java.base/java.nio.file.Files.newBufferedReader(Files.java:2922)
>     at java.base/java.nio.file.Files.newBufferedReader(Files.java:2955)
>     at 
> org.apache.kafka.common.config.provider.FileConfigProvider.reader(FileConfigProvider.java:104)
>     at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:86)
>     at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
> This happens because of a colon ":" in the file name, which is wrongly 
> matched by the ConfigTransformer.DEFAULT_PATTERN into the three groups 
> "file", "/F" and "/properties.txt:fileKey" instead of "file", 
> "/F:/properties.txt" and "fileKey".
> The ConfigTransformer.DEFAULT_PATTERN should be changed to match on the first 
> and the last colon ":" in the expression.
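A minimal sketch of the idea (the pattern below is illustrative, not Kafka's actual `DEFAULT_PATTERN`): making the middle (path) group greedy lets it absorb everything between the first and the last colon, so a Windows drive letter no longer splits the path:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ConfigPatternDemo {
    // Illustrative replacement pattern: the middle (path) group is greedy,
    // so it matches up to the LAST colon before the closing brace.
    static final Pattern GREEDY = Pattern.compile("\\$\\{(.+?):(.+):(.+?)\\}");

    public static void main(String[] args) {
        Matcher m = GREEDY.matcher("${file:/F:/properties.txt:fileKey}");
        if (m.matches()) {
            System.out.println(m.group(1)); // file
            System.out.println(m.group(2)); // /F:/properties.txt
            System.out.println(m.group(3)); // fileKey
        }
    }
}
```

With the reluctant groups of the current pattern, the path group stops at the first colon instead, producing the "/F" fragment seen in the stack trace above.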



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14051) KRaft remote controllers do not create metrics reporters

2022-08-15 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino resolved KAFKA-14051.
---
Resolution: Fixed

> KRaft remote controllers do not create metrics reporters
> 
>
> Key: KAFKA-14051
> URL: https://issues.apache.org/jira/browse/KAFKA-14051
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
>
> KRaft remote controllers (KRaft nodes with the configuration value 
> process.roles=controller) do not create the configured metrics reporters 
> defined by the configuration key metric.reporters.  The reason is because 
> KRaft remote controllers are not wired up for dynamic config changes, and the 
> creation of the configured metric reporters actually happens during the 
> wiring up of the broker for dynamic reconfiguration, in the invocation of 
> DynamicBrokerConfig.addReconfigurables(KafkaBroker).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13559) The broker's ProduceResponse may be delayed for 300ms

2022-08-15 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-13559.

Fix Version/s: 3.4.0
 Reviewer: Rajini Sivaram
   Resolution: Fixed

> The broker's  ProduceResponse may be delayed for 300ms
> --
>
> Key: KAFKA-13559
> URL: https://issues.apache.org/jira/browse/KAFKA-13559
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 2.7.0
>Reporter: frankshi
>Assignee: Badai Aqrandista
>Priority: Major
> Fix For: 3.4.0
>
> Attachments: image-1.png, image-2.png, 
> image-2021-12-21-14-44-56-689.png, image-2021-12-21-14-45-22-716.png, 
> image-3.png, image-5.png, image-6.png, image-7.png, image.png
>
>
> Hi team:
> We have found the value in the source code 
> [here|https://github.com/apache/kafka/blob/2.7/core/src/main/scala/kafka/network/SocketServer.scala#L922]
>  may lead broker’s  ProduceResponse to be delayed for 300ms.
>  * Server-version: 2.13-2.7.0.
>  * Client-version: confluent-kafka-python-1.5.0.
> We have set the client’s configuration as follows:
> {code:java}
> linger.ms = 0
> acks = 1
> delivery.timeout.ms = 100
> request.timeout.ms =  80
> Sasl.mechanism =  “PLAIN”
> Security.protocol  =  “SASL_SSL”
> ..
> {code}
> Because we set acks = 1, the client sends ProduceRequests and receives 
> ProduceResponses from brokers. The leader broker does not need to wait for 
> the ISRs to write the data to disk successfully; it can reply to the client 
> with a ProduceResponse directly. In our setup, the ping time between the 
> client and the Kafka brokers is about 10ms, and most of the time the 
> responses are received about 10ms after the ProduceRequests are sent. But 
> sometimes the responses are received about 300ms later.
> The following shows the log from the client.
> {code:java}
> 2021-11-26 02:31:30,567  Sent partial ProduceRequest (v7, 0+16527/37366 
> bytes, CorrId 2753)
> 2021-11-26 02:31:30,568  Sent partial ProduceRequest (v7, 16527+16384/37366 
> bytes, CorrId 2753)
> 2021-11-26 02:31:30,568  Sent ProduceRequest (v7, 37366 bytes @ 32911, CorrId 
> 2753)
> 2021-11-26 02:31:30,570  Sent ProduceRequest (v7, 4714 bytes @ 0, CorrId 2754)
> 2021-11-26 02:31:30,571  Sent ProduceRequest (v7, 1161 bytes @ 0, CorrId 2755)
> 2021-11-26 02:31:30,572  Sent ProduceRequest (v7, 1240 bytes @ 0, CorrId 2756)
> 2021-11-26 02:31:30,572  Received ProduceResponse (v7, 69 bytes, CorrId 2751, 
> rtt 9.79ms)
> 2021-11-26 02:31:30,572  Received ProduceResponse (v7, 69 bytes, CorrId 2752, 
> rtt 10.34ms)
> 2021-11-26 02:31:30,573  Received ProduceResponse (v7, 69 bytes, CorrId 2753, 
> rtt 10.11ms)
> 2021-11-26 02:31:30,872  Received ProduceResponse (v7, 69 bytes, CorrId 2754, 
> rtt 309.69ms)
> 2021-11-26 02:31:30,883  Sent ProduceRequest (v7, 1818 bytes @ 0, CorrId 2757)
> 2021-11-26 02:31:30,887  Sent ProduceRequest (v7, 1655 bytes @ 0, CorrId 2758)
> 2021-11-26 02:31:30,888  Received ProduceResponse (v7, 69 bytes, CorrId 2755, 
> rtt 318.85ms)
> 2021-11-26 02:31:30,893  Sent partial ProduceRequest (v7, 0+16527/37562 
> bytes, CorrId 2759)
> 2021-11-26 02:31:30,894  Sent partial ProduceRequest (v7, 16527+16384/37562 
> bytes, CorrId 2759)
> 2021-11-26 02:31:30,895  Sent ProduceRequest (v7, 37562 bytes @ 32911, CorrId 
> 2759)
> 2021-11-26 02:31:30,896  Sent ProduceRequest (v7, 4700 bytes @ 0, CorrId 2760)
> 2021-11-26 02:31:30,897  Received ProduceResponse (v7, 69 bytes, CorrId 2756, 
> rtt 317.74ms)
> 2021-11-26 02:31:30,897  Received ProduceResponse (v7, 69 bytes, CorrId 2757, 
> rtt 4.22ms)
> 2021-11-26 02:31:30,899  Received ProduceResponse (v7, 69 bytes, CorrId 2758, 
> rtt 2.61ms){code}
>  
> The requests with CorrId 2753 and 2754 are sent almost at the same time, but 
> the response for 2754 is delayed by ~300ms. 
> We checked the logs on the broker.
>  
> {code:java}
> [2021-11-26 02:31:30,873] DEBUG Completed 
> request:RequestHeader(apiKey=PRODUCE, apiVersion=7, clientId=rdkafka, 
> correlationId=2754) – {acks=1,timeout=80,numPartitions=1},response: 
> {responses=[\{topic=***,partition_responses=[{partition=32,error_code=0,base_offset=58625,log_append_time=-1,log_start_offset=49773}]}
> ],throttle_time_ms=0} from connection 
> 10.10.44.59:9093-10.10.0.68:31183-66;totalTime:0.852,requestQueueTime:0.128,localTime:0.427,remoteTime:0.09,throttleTime:0,responseQueueTime:0.073,sendTime:0.131,securityProtocol:SASL_SSL,principal:User:***,listener:SASL_SSL,clientInformation:ClientInformation(softwareName=confluent-kafka-python,
>  softwareVersion=1.5.0-rdkafka-1.5.2) (kafka.request.logger)
> {code}
>  
>  
> It seems that the time cost on the server side is very small. What’s the 
> reason for the latency spikes?
> We also did tcpdump  at the server side and 

[GitHub] [kafka] rajinisivaram merged pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-08-15 Thread GitBox


rajinisivaram merged PR #12416:
URL: https://github.com/apache/kafka/pull/12416


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-08-15 Thread GitBox


rajinisivaram commented on PR #12416:
URL: https://github.com/apache/kafka/pull/12416#issuecomment-1214907290

   Test failures not related.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13887) Running multiple instance of same stateful KafkaStreams application on single host raise Exception

2022-08-15 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17579602#comment-17579602
 ] 

Bruno Cadonna commented on KAFKA-13887:
---

bq. Meaning: no local store, no stateful processing that is backed on disk. Is 
this true?

No, you can have an attached storage volume. In k8s you need to use stateful 
sets to assign the same storage volume to the same pod across restarts. 

bq. The problem is state.dir gets associated more or less randomly to the 
application instance. But what has to be ensured is the partition assignment 
for the given application instance fits to the state. This mean we cannot 
randomly link an application instance with its current partition assignment to 
a random state store...

The state store directory needs to exist before the assignment. Streams is 
not designed to allow changing the state store directory on the fly.

bq. if I got it correct, the sate gets transferred from instance to instance in 
case of partition reassignment. But in this setup I can't imagine this can 
work Any input on that?

The state is transferred through the changelog topics on the Kafka brokers. The 
setup of the nodes does not matter, except that there needs to be some sort of 
persistent volume to write to per Streams instance if you use persistent state 
stores. If you use in-memory state stores you do not need a persistent 
volume.  

> Running multiple instance of same stateful KafkaStreams application on single 
> host raise Exception
> --
>
> Key: KAFKA-13887
> URL: https://issues.apache.org/jira/browse/KAFKA-13887
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sina Askarnejad
>Priority: Minor
>
> KAFKA-10716 locks the state store directory on the running host, as it stores 
> the processId in a *kafka-streams-process-metadata* file in this path. As a 
> result to run multiple instances of the same application on a single host 
> each instance must run with different *state.dir* config, otherwise the 
> following exception will be raised for the second instance:
>  
> Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: 
> Unable to initialize state, this can happen if multiple instances of Kafka 
> Streams are running in the same state directory
> at 
> org.apache.kafka.streams.processor.internals.StateDirectory.initializeProcessId(StateDirectory.java:191)
> at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:868)
> at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:851)
> at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:821)
> at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:733)
>  
> The easiest solution is multi-threading: running a single instance with 
> multiple threads. But multi-threading is not suitable for all scenarios, 
> e.g., when the tasks are CPU intensive, in large-scale deployments, or when 
> fully utilizing multi-core CPUs.
>  
> The second solution is multi-processing. On a single host this needs extra 
> work and supervision, as each instance must be run with a different 
> {*}state.dir{*}. It would be a good enhancement if KafkaStreams could handle 
> this config for multiple instances.
>  
> The proposed solution is that the KafkaStreams use the 
> */\{state.dir}/\{application.id}/\{ordinal.number}* path instead of 
> */\{state.dir}/\{application.id}* to store the meta file and states. The 
> *ordinal.number* starts with 0 and is incremental.
> When an instance starts, it checks the ordinal.number directories starting 
> from 0, finds the first subdirectory that is not locked, and uses that as 
> its state directory. This way all tasks are assigned correctly on rebalance 
> and multiple instances can run on a single host.
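A hypothetical sketch of the proposed probing logic (names and layout are illustrative, not an actual Kafka Streams API): each instance walks the ordinal subdirectories and claims the first one whose lock file it can acquire.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class StateDirPicker {
    // Probe {state.dir}/{application.id}/0, /1, ... and claim the first
    // directory whose lock file is not held by another running instance.
    public static Path pickStateDir(Path baseDir, String applicationId) throws IOException {
        for (int ordinal = 0; ; ordinal++) {
            Path candidate = baseDir.resolve(applicationId).resolve(Integer.toString(ordinal));
            Files.createDirectories(candidate);
            FileChannel channel = FileChannel.open(
                candidate.resolve(".lock"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            FileLock lock = channel.tryLock();
            if (lock != null) {
                // Keep the lock (and channel) open for the process lifetime,
                // so other instances skip this ordinal.
                return candidate;
            }
            channel.close(); // held by another instance; try the next ordinal
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = pickStateDir(Files.createTempDirectory("state"), "my-app");
        System.out.println(dir.getFileName()); // "0" for a fresh base directory
    }
}
```

Because the OS releases `FileLock`s when a process dies, a crashed instance's ordinal directory becomes claimable again on restart, which is what lets the replacement instance pick up the matching state.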



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cadonna commented on a diff in pull request #12459: KAFKA-13036: Replace EasyMock and PowerMock with Mockito for RocksDBMetricsRecorderTest

2022-08-15 Thread GitBox


cadonna commented on code in PR #12459:
URL: https://github.com/apache/kafka/pull/12459#discussion_r945525699


##
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java:
##
@@ -279,449 +310,335 @@ public void shouldThrowIfDbToAddWasAlreadyAddedForOtherSegment() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
 
         final Throwable exception = assertThrows(
-            IllegalStateException.class,
-            () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd1, cacheToAdd2, statisticsToAdd2)
+                IllegalStateException.class,
+                () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd1, cacheToAdd2, statisticsToAdd2)
         );
         assertThat(
-            exception.getMessage(),
-            is("DB instance for store " + SEGMENT_STORE_NAME_2 + " of task " + TASK_ID1 +
-                " was already added for another segment as a value provider. This is a bug in Kafka Streams. " +
-                "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+                exception.getMessage(),
+                is("DB instance for store " + SEGMENT_STORE_NAME_2 + " of task " + TASK_ID1 +
+                    " was already added for another segment as a value provider. This is a bug in Kafka Streams. " +
+                    "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
         );
     }
 
     @Test
     public void shouldAddItselfToRecordingTriggerWhenFirstValueProvidersAreAddedToNewlyCreatedRecorder() {
-        recordingTrigger.addMetricsRecorder(recorder);
-        replay(recordingTrigger);
-
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
 
-        verify(recordingTrigger);
+        verify(recordingTrigger).addMetricsRecorder(recorder);
     }
 
     @Test
     public void shouldAddItselfToRecordingTriggerWhenFirstValueProvidersAreAddedAfterLastValueProvidersWereRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-        reset(recordingTrigger);
-        recordingTrigger.addMetricsRecorder(recorder);
-        replay(recordingTrigger);
+
+        Mockito.reset(recordingTrigger);
 
         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
 
-        verify(recordingTrigger);
+        verify(recordingTrigger).addMetricsRecorder(recorder);
     }
 
     @Test
     public void shouldNotAddItselfToRecordingTriggerWhenNotEmpty2() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
-        reset(recordingTrigger);
-        replay(recordingTrigger);
+
+        verify(recordingTrigger).addMetricsRecorder(recorder);
 
         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
 
-        verify(recordingTrigger);
+        verifyNoMoreInteractions(recordingTrigger);
     }
 
     @Test
     public void shouldCloseStatisticsWhenValueProvidersAreRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
-        reset(statisticsToAdd1);
-        statisticsToAdd1.close();
-        replay(statisticsToAdd1);
-
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-
-        verify(statisticsToAdd1);
+        verify(statisticsToAdd1).close();
     }
 
     @Test
     public void shouldNotCloseStatisticsWhenValueProvidersWithoutStatisticsAreRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null);
-        reset(statisticsToAdd1);
-        replay(statisticsToAdd1);
-
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-
-        verify(statisticsToAdd1);
+        verify(statisticsToAdd1, never()).close();
    }
 
     @Test
     public void shouldRemoveItselfFromRecordingTriggerWhenLastValueProvidersAreRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
-        reset(recordingTrigger);
-        replay(recordingTrigger);
+
+        Mockito.reset(recordingTrigger);
 
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
 
-        verify(recordingTrigger);
+        verify(recordingTrigger, never()).removeMetricsRecorder(recorder);
 
-        reset(recordingTrigger);
-        recordingTrigger.removeMetricsRecorder(recorder);
-        replay(recordingTrigger);
+        Mockito.reset(recordingTrigger);
 
         recorder.removeValueProviders(SEGMENT_STORE_NAME_2);
 
-        verify(recordingTrigger);
+        verify(recordingTrigger).removeMetricsRecorder(recorder);