[GitHub] [kafka] yashmayya commented on a diff in pull request #13375: KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode

2023-03-21 Thread via GitHub


yashmayya commented on code in PR #13375:
URL: https://github.com/apache/kafka/pull/13375#discussion_r1144234357


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##
@@ -152,11 +154,6 @@ public void stop() {
 connectCluster.forEach(this::stopWorker);
 try {
 kafkaCluster.stop();
-} catch (UngracefulShutdownException e) {
-log.warn("Kafka did not shutdown gracefully");

Review Comment:
   It's being handled here - 
https://github.com/yashmayya/kafka/blob/f0f4bb9a30ed3fcb4692f16185df0dce4a032efd/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L175-L179
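   For context, a minimal sketch of that handling pattern (hedged: the stand-in exception type and `stopBrokers()` helper below are hypothetical placeholders, not the actual code at the linked lines):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EmbeddedClusterStopSketch {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedClusterStopSketch.class);

    // Hypothetical stand-in for the runtime's test-util exception type.
    static class UngracefulShutdownException extends RuntimeException { }

    // Hypothetical stand-in for the actual broker shutdown steps.
    static void stopBrokers() { }

    public void stop() {
        try {
            stopBrokers();
        } catch (UngracefulShutdownException e) {
            // Swallowed and logged inside the embedded cluster itself, so
            // callers such as EmbeddedConnectCluster.stop() no longer need
            // the catch block removed in this diff.
            log.warn("Kafka did not shutdown gracefully");
        }
    }

    public static void main(String[] args) {
        new EmbeddedClusterStopSketch().stop();
    }
}
```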



##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -174,13 +174,17 @@ private KafkaConfig createNodeConfig(TestKitNode node) {
 props.put(KafkaConfig$.MODULE$.LogDirsProp(),
 String.join(",", brokerNode.logDataDirectories()));
 }
-props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
-"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
-props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id()));
-props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
-nodes.interBrokerListenerName().value());
-props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
-"CONTROLLER");
+
+// listeners could be defined via Builder::setConfigProp which shouldn't be overridden
+if (!props.containsKey(KafkaConfig$.MODULE$.ListenersProp())) {

Review Comment:
   Hm, that's a good point and I did ponder over this one as well. The alternative would be to parse the user-defined listener here and make sure that the security protocol map, inter-broker listener, and controller listener are all set appropriately, or else do so ourselves. This would involve making sure that the protocols, SASL mechanisms, etc. all match up properly, which seems potentially error-prone (I tried a setup with broker listeners and controller listeners using different security protocols or mechanisms and wasn't able to get it to work; I'm not sure whether something like that is even currently supported). I'm assuming that users who set custom listeners most likely want non-plaintext listeners, at which point it isn't much extra work to also set up the controller listener accordingly (and there are multiple examples of how to do so in various integration tests). What do you think?
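   To illustrate, a sketch of a test that sets custom listeners and keeps the related configs consistent itself (hedged: the builder and lifecycle method names follow the testkit API referenced in this PR, and the listener values are assumptions):

```java
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;

public class CustomListenersSketch {
    public static void main(String[] args) throws Exception {
        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
                new TestKitNodes.Builder()
                        .setNumBrokerNodes(1)
                        .setNumControllerNodes(1)
                        .build())
                // Overriding the listeners means also setting the security
                // protocol map, inter-broker listener, and controller listener
                // names so that they all match up:
                .setConfigProp("listeners", "EXTERNAL://localhost:9092")
                .setConfigProp("listener.security.protocol.map", "EXTERNAL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT")
                .setConfigProp("inter.broker.listener.name", "EXTERNAL")
                .setConfigProp("controller.listener.names", "CONTROLLER")
                .build()) {
            cluster.format();
            cluster.startup();
        }
    }
}
```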



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -195,7 +197,22 @@ public void testRestartFailedTask() throws Exception {
 public void testBrokerCoordinator() throws Exception {
 ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
 
 workerProps.put(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, String.valueOf(5000));
-connect = connectBuilder.workerProps(workerProps).build();
+Properties brokerProps = new Properties();
+
+// Find a free port and use it in the Kafka broker's listeners config. We can't use port 0 in the listeners
+// config to get a random free port because in this test we want to stop the Kafka broker and then bring it
+// back up and listening on the same port in order to verify that the Connect cluster can re-connect to Kafka
+// and continue functioning normally. If we were to use port 0 here, the Kafka broker would most likely listen
+// on a different random free port the second time it is started.

Review Comment:
   Sorry, my bad for not clarifying at the outset. There are a couple of reasons. One is that the previous implementation seemed fairly messy: we were checking the bound ports after broker startup and then modifying the broker config, using the static port in the listener configuration on broker restarts only if the user hadn't already configured listener configs. So, for instance, `startOnlyKafkaOnSamePorts` would not work as expected if the user defines a SASL or SSL listener with port 0. The other reason is that the `KafkaClusterTestKit` being leveraged here would need some refactoring to allow changing broker configs after it has already been instantiated; currently, it only supports customising broker configs in its builder. All in all, it just seems a lot cleaner to move the responsibility of using a fixed port in the listeners configuration onto the clients of the embedded Kafka cluster, in case they want to test functionality involving offline brokers.
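   For illustration, a minimal sketch of how such a client can reserve a fixed free port up front (best-effort: the port could in principle be taken by another process between the probe and broker startup):

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Properties;

public class FixedListenerPortSketch {
    // Bind to port 0 so the OS assigns a free port, then release it so the
    // broker can bind it.
    static int findFreePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        int port = findFreePort();
        Properties brokerProps = new Properties();
        // A fixed port (unlike "EXTERNAL://localhost:0") lets the broker be
        // stopped and brought back up on the same address.
        brokerProps.put("listeners", "EXTERNAL://localhost:" + port);
        System.out.println(brokerProps);
    }
}
```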



[GitHub] [kafka] hudeqi commented on pull request #13427: MINOR:Incorrect/canonical use of constants in AdminClientConfig and StreamsConfigTest

2023-03-21 Thread via GitHub


hudeqi commented on PR #13427:
URL: https://github.com/apache/kafka/pull/13427#issuecomment-1478846920

   Could you help review this? @dengziming 





[jira] [Updated] (KAFKA-14824) ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-03-21 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi updated KAFKA-14824:
---
Summary: ReplicaAlterLogDirsThread may cause serious disk growing in case 
of potential exception  (was: ReplicaAlterLogDirsThread may cause serious disk 
growing in case of unknown exception)

> ReplicaAlterLogDirsThread may cause serious disk growing in case of potential 
> exception
> ---
>
> Key: KAFKA-14824
> URL: https://issues.apache.org/jira/browse/KAFKA-14824
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.3.2
>Reporter: hudeqi
>Priority: Blocker
> Attachments: 1.png, 2.png, 3.png, 4.png
>
>
> For ReplicaAlterLogDirsThread, if a partition is marked as failed due to an
> unknown exception and its fetch is suspended, the previously paused cleanup
> logic for that partition needs to be cancelled; otherwise this leads to
> serious, unexpected disk usage growth.
>  
> For example, in an actual production environment (running Kafka 2.5.1) we hit
> the following case: a log dir balance was performed on the partition leader's
> broker. Fetching started once the future log was successfully created, and the
> fetcher then reset and truncated to the leader's log start offset for the
> first time due to an out-of-range error. Meanwhile, because the partition
> leader was processing a leaderAndIsrRequest, the leader epoch was updated, so
> the ReplicaAlterLogDirsThread hit FENCED_LEADER_EPOCH and the partition's
> 'partitionStates' were cleaned up. At the same time, the logic that adds the
> partition to the ReplicaAlterLogDirsThread was executing in the thread
> processing the leaderAndIsrRequest, and there the offset set by
> InitialFetchState is the leader's high watermark. When the
> ReplicaAlterLogDirsThread then ran processFetchRequest, it threw
> "java.lang.IllegalStateException: Offset mismatch for the future replica
> anti_fraud.data_collector.anticrawler_live-54: fetched offset = 4979659327,
> log end offset = 4918576434.", with the result that the
> ReplicaAlterLogDirsThread no longer fetches the partition. Because cleanup for
> the partition had been paused earlier, the disk usage of the corresponding
> broker grows without bound, causing serious problems.
>  
> Trunk fixed the bug that can cause the ReplicaAlterLogDirsThread to hit this
> "Offset mismatch" error and stop fetching in KAFKA-9087, but other unknown
> exceptions may still occur, and under the current logic they would cause the
> same disk-cleanup failure problem.





[jira] [Assigned] (KAFKA-14824) ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-03-21 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi reassigned KAFKA-14824:
--

Assignee: hudeqi






[jira] [Updated] (KAFKA-14824) ReplicaAlterLogDirsThread may cause serious disk growing in case of unknown exception

2023-03-21 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi updated KAFKA-14824:
---
Summary: ReplicaAlterLogDirsThread may cause serious disk growing in case 
of unknown exception  (was: ReplicaAlterLogDirsThread may cause serious disk 
usage in case of unknown exception)






[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824:ReplicaAlterLogDirsThread may cause serious disk usage in…

2023-03-21 Thread via GitHub


hudeqi commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1478842301

   Hello, for "potential exceptions", I did an experiment to simulate a disk 
failure, which eventually lead to the unexpected disk growing. For details, 
please refer to the corresponding comment in 
[jira](https://issues.apache.org/jira/browse/KAFKA-14824) .





[jira] [Comment Edited] (KAFKA-14824) ReplicaAlterLogDirsThread may cause serious disk usage in case of unknown exception

2023-03-21 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703449#comment-17703449
 ] 

hudeqi edited comment on KAFKA-14824 at 3/22/23 2:35 AM:
-

For "potential exceptions may be throw", I did such an experiment: IOException 
was artificially injected into "processPartitionData" to simulate that 
ReplicaAlterLogDirsThread encountered a disk failure during the fetch process, 
in which the future log is in /data2 and fetching from /data1 (normal data 
retention time is 1h).
Finally, I found the same serious impact as the previous "offset mismatch 
error" (KAFKA-9087) leads: when the exception thrown by "processPartitionData" 
was caught, because it was marked as failed, the log cleanup of the 
corresponding partition was not resumed, so compared to other disks, The log of 
/data1 will grow without limit. Therefore, I think the defensive measure of 
"resume the log cleanup of the source partition when failure" is necessary.

The screenshot of the experiment can be found in the attachment.


was (Author: hudeqi):
For "potential exceptions may be throw", I did such an experiment: IOException 
was artificially injected into "processPartitionData" to simulate that 
ReplicaAlterLogDirsThread encountered a disk failure during the fetch process, 
in which the future log is in /data2 and fetching from /data1 (normal data 
retention time is 1h).
Finally, I found the same serious impact as the previous "offset mismatch 
error" (KAFKA-9087) leads: when the exception thrown by "processPartitionData" 
was caught, because it was marked as failed, the log cleanup of the 
corresponding partition was not resumed, so compared to other disks, The log of 
/data1 will grow without limit. Therefore, I think the defensive measure of 
"resume the log cleanup of the source partition when failure" is necessary.

!1.png!






[jira] [Updated] (KAFKA-14824) ReplicaAlterLogDirsThread may cause serious disk usage in case of unknown exception

2023-03-21 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi updated KAFKA-14824:
---
Attachment: 1.png
2.png
3.png
4.png






[jira] [Commented] (KAFKA-14824) ReplicaAlterLogDirsThread may cause serious disk usage in case of unknown exception

2023-03-21 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703449#comment-17703449
 ] 

hudeqi commented on KAFKA-14824:


For "potential exceptions may be throw", I did such an experiment: IOException 
was artificially injected into "processPartitionData" to simulate that 
ReplicaAlterLogDirsThread encountered a disk failure during the fetch process, 
in which the future log is in /data2 and fetching from /data1 (normal data 
retention time is 1h).
Finally, I found the same serious impact as the previous "offset mismatch 
error" (KAFKA-9087) leads: when the exception thrown by "processPartitionData" 
was caught, because it was marked as failed, the log cleanup of the 
corresponding partition was not resumed, so compared to other disks, The log of 
/data1 will grow without limit. Therefore, I think the defensive measure of 
"resume the log cleanup of the source partition when failure" is necessary.

!1.png!






[GitHub] [kafka] kirktrue commented on pull request #13425: KAFKA-14365: Extract common logic from Fetcher

2023-03-21 Thread via GitHub


kirktrue commented on PR #13425:
URL: https://github.com/apache/kafka/pull/13425#issuecomment-1478805465

   @guozhangwang @hachikuji @rajinisivaram @philipnee This is ready for a review from whoever has the time.





[jira] [Comment Edited] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow

2023-03-21 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703339#comment-17703339
 ] 

Greg Harris edited comment on KAFKA-14666 at 3/22/23 12:12 AM:
---

I proposed a tactical fix for this in 
[https://github.com/apache/kafka/pull/13429] which provides variable-precision 
translation between the most recent restart of MM2 and the end of the 
replicated topic, similar to the existing behavior pre KAFKA-12468.

This uses strategy (2) from above, but limited to syncs read during a single 
task lifetime to get monotonicity without re-reading the checkpoint topic.

A separate improvement can be considered to allow for translation of offsets 
prior to the latest restart of MM2, or increasing the precision of the 
translation with new configurations.


was (Author: gharris1727):
I proposed a tactical fix for this in 
[https://github.com/apache/kafka/pull/13429] which provides variable-precision 
translation between the most recent restart of MM2 and the end of the 
replicated topic, similar to the existing behavior pre KAFKA-12468.

This uses strategy (2) from above, but limited to syncs read during a single 
task lifetime to get monotonicity without re-reading the checkpoint topic.

A separate improvement can be considered to allow for translation of offsets 
prior to the latest restart of MM2, or increasing the accuracy of the 
translation with new configurations.

> MM2 should translate consumer group offsets behind replication flow
> ---
>
> Key: KAFKA-14666
> URL: https://issues.apache.org/jira/browse/KAFKA-14666
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 3.5.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
>
> MirrorMaker2 includes an offset translation feature which can translate the 
> offsets for an upstream consumer group to a corresponding downstream consumer 
> group. It does this by keeping a topic of offset-syncs to correlate upstream 
> and downstream offsets, and translates any source offsets which are ahead of 
> the replication flow.
> However, if a replication flow is closer to the end of a topic than the 
> consumer group, then the offset translation feature will refuse to translate 
> the offset for correctness reasons. This is because the MirrorCheckpointTask 
> only keeps the latest offset correlation between source and target, it does 
> not have sufficient information to translate older offsets.
> The workarounds for this issue are to:
> 1. Pause the replication flow occasionally to allow the source to get ahead 
> of MM2
> 2. Increase the offset.lag.max to delay offset syncs, increasing the window 
> for translation to happen. With the fix for KAFKA-12468, this will also 
> increase the lag of applications that are ahead of the replication flow, so 
> this is a tradeoff.
> Instead, the MirrorCheckpointTask should provide correct and best-effort 
> translation for consumer groups behind the replication flow by keeping 
> additional state, or re-reading the offset-syncs topic. This should be a 
> substantial improvement for use-cases where applications have a higher 
> latency to commit than the replication flow, or where applications are 
> reading from the earliest offset.
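A hedged sketch of workaround (2), building the equivalent of the MM2 dedicated-mode properties programmatically (the cluster aliases and the value are illustrative; offset.lag.max is the real MirrorMaker config key):

{code:java}
import java.util.HashMap;
import java.util.Map;

public class OffsetLagMaxSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("clusters", "source, target");
        props.put("source->target.enabled", "true");
        // Larger values delay offset syncs, widening the window in which
        // consumer groups behind the replication flow can be translated; with
        // the fix for KAFKA-12468 this also adds lag for groups ahead of the
        // flow, so it is a tradeoff.
        props.put("source->target.offset.lag.max", "10000");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
{code}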





[GitHub] [kafka] mjsax merged pull request #13409: KAFKA-14491: [18/N] Update versioned store to check latest value on timestamped get

2023-03-21 Thread via GitHub


mjsax merged PR #13409:
URL: https://github.com/apache/kafka/pull/13409





[GitHub] [kafka] mjsax merged pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

2023-03-21 Thread via GitHub


mjsax merged PR #13292:
URL: https://github.com/apache/kafka/pull/13292





[GitHub] [kafka] jsancio merged pull request #13430: MINOR; Increase log level of some rare events

2023-03-21 Thread via GitHub


jsancio merged PR #13430:
URL: https://github.com/apache/kafka/pull/13430





[jira] [Commented] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2023-03-21 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703404#comment-17703404
 ] 

Guozhang Wang commented on KAFKA-13295:
---

[~sagarrao] I had a chat with the others driving the 4.0 release and it seems like it will still take some time; given that, I will resume helping you finish fixing this bug rather than wait for that release.

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
>
> In some EOS applications with relatively long restoration times we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs were able to confirm these were due to 
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} occurs often 
> outside a commit during active processing (eg writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.
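As a stopgap, applications with long restorations sometimes raise the producer-side transaction timeout; a minimal sketch using the standard Streams and producer config keys (the value is illustrative, and the broker-side transaction.max.timeout.ms caps it):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class TransactionTimeoutSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Under EOS the embedded producers write in transactions.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // Raise the transaction timeout so a txn left open across a
        // restoration phase is less likely to expire (15 minutes here,
        // purely illustrative).
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 900000);
        System.out.println(props);
    }
}
{code}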





[jira] [Updated] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2023-03-21 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13295:
--
Fix Version/s: (was: 4.0.0)






[GitHub] [kafka] SpacRocket commented on a diff in pull request #13382: KAFKA-14722: Make BooleanSerde public

2023-03-21 Thread via GitHub


SpacRocket commented on code in PR #13382:
URL: https://github.com/apache/kafka/pull/13382#discussion_r1144052014


##
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java:
##
@@ -48,42 +45,4 @@ public NullableValueAndTimestampSerde(final Serde<V> valueSerde) {
 );
 }
 
-static final class BooleanSerde {

Review Comment:
   I think not; I've found the following unit tests that use NullableValueAndTimestampSerializer (currently the only code that depends on BooleanSerde):
   
   - NullableValueAndTimestampSerdeTest - these unit tests exercise the class as a whole; there is not a single unit test that covers only BooleanSerde.
   - MeteredVersionedKeyValueStoreTest - same as above.
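   For reference, a minimal dedicated round-trip test could look like the sketch below (hedged: it assumes the serde ends up exposed as `Serdes.Boolean()` once made public, per this PR's goal):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.junit.jupiter.api.Test;

public class BooleanSerdeRoundTripTest {
    @Test
    public void shouldRoundTripBooleans() {
        try (Serde<Boolean> serde = Serdes.Boolean()) {
            for (Boolean value : new Boolean[] {true, false}) {
                // Serialize and deserialize, expecting the original value back.
                byte[] bytes = serde.serializer().serialize("topic", value);
                assertEquals(value, serde.deserializer().deserialize("topic", bytes));
            }
        }
    }
}
```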
   






[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-21 Thread via GitHub


philipnee commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1144002857


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
 }
 }
 
+private class UnsentOffsetFetchRequest extends RequestState {
+public final Set<TopicPartition> requestedPartitions;
+public final GroupState.Generation requestedGeneration;
+public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+public UnsentOffsetFetchRequest(final Set<TopicPartition> partitions,
+final GroupState.Generation generation,
+final long retryBackoffMs) {
+super(retryBackoffMs);
+this.requestedPartitions = partitions;
+this.requestedGeneration = generation;
+this.future = new CompletableFuture<>();
+}
+
+public boolean sameRequest(final UnsentOffsetFetchRequest request) {
+return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
+}
+
+public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long currentTimeMs) {
+OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder(
+groupState.groupId,
+true,
+new ArrayList<>(this.requestedPartitions),
+throwOnFetchStableOffsetUnsupported);
+NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
+builder,
+coordinatorRequestManager.coordinator());
+unsentRequest.future().whenComplete((r, t) -> {
+onResponse(currentTimeMs, (OffsetFetchResponse) r.responseBody());
+});
+return unsentRequest;
+}
+
+public void onResponse(
+final long currentTimeMs,
+final OffsetFetchResponse response) {
+Errors responseError = response.groupLevelError(groupState.groupId);
+if (responseError != Errors.NONE) {
+onFailure(currentTimeMs, responseError);
+return;
+}
+onSuccess(currentTimeMs, response);
+}
+
+private void onFailure(final long currentTimeMs,
+   final Errors responseError) {
+log.debug("Offset fetch failed: {}", responseError.message());
+
+// TODO: should we retry on COORDINATOR_NOT_AVAILABLE as well ?
+if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+retry(currentTimeMs);
+} else if (responseError == Errors.NOT_COORDINATOR) {
+// re-discover the coordinator and retry
+coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), Time.SYSTEM.milliseconds());
+retry(currentTimeMs);
+} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
+future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId));
+} else {
+future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response: " + responseError.message()));
+}
+}
+
+private void retry(final long currentTimeMs) {
+onFailedAttempt(currentTimeMs);
+onSendAttempt(currentTimeMs);
+pendingRequests.addOffsetFetchRequest(this);
+}
+
+private void onSuccess(final long currentTimeMs,
+   final OffsetFetchResponse response) {
+Set<String> unauthorizedTopics = null;
+Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData =
+response.partitionDataMap(groupState.groupId);
+Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(responseData.size());
+Set<TopicPartition> unstableTxnOffsetTopicPartitions = new HashSet<>();
+for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : responseData.entrySet()) {
+TopicPartition tp = entry.getKey();
+OffsetFetchResponse.PartitionData partitionData = entry.getValue();
+if (partitionData.hasError()) {
+Errors error = partitionData.error;
+log.debug("Failed to fetch offset for partition {}: {}", tp, error.message());
+
+if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does " +
+"not " +
+"exist"));
+return;
+} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+if (unauthorizedTopics == null) {
+unauthorizedTopics

[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-21 Thread via GitHub


philipnee commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1143999124


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -137,31 +166,21 @@ CompletableFuture<Void> sendAutoCommit(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
 t.ack(currentTimeMs));
 }
 
-// Visible for testing
-Queue<StagedCommit> stagedCommits() {
-return this.stagedCommits;
-}
 
-private class StagedCommit {
+private class UnsentOffsetCommitRequest {
 private final Map<TopicPartition, OffsetAndMetadata> offsets;
 private final String groupId;
 private final GroupState.Generation generation;
 private final String groupInstanceId;
 private final NetworkClientDelegate.FutureCompletionHandler future;
 
-public StagedCommit(final Map<TopicPartition, OffsetAndMetadata> offsets,
-final String groupId,
-final String groupInstanceId,
-final GroupState.Generation generation) {
+public UnsentOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets,
+ final String groupId,
+ final String groupInstanceId,
+ final GroupState.Generation generation) {
 this.offsets = offsets;
-// if no callback is provided, DefaultOffsetCommitCallback will be used.

Review Comment:
   I'll double-check whether the defaultOffsetCommitCallback is invoked when the user provides a null callback.






[jira] [Assigned] (KAFKA-12694) Specifying a struct-based defaultValue on a SchemaBuilder causes a DataException

2023-03-21 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-12694:
-

Assignee: Chris Egerton

> Specifying a struct-based defaultValue on a SchemaBuilder causes a 
> DataException
> 
>
> Key: KAFKA-12694
> URL: https://issues.apache.org/jira/browse/KAFKA-12694
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.0
>Reporter: Chris Cranford
>Assignee: Chris Egerton
>Priority: Major
>
> When making a call to {{SchemaBuilder#defaultValue(Object)}}, a
> {{DataException}} will be thrown if the value is of type {{STRUCT}}, due to a
> schema mismatch. This is because the method passes a reference to {{this}} (a
> {{SchemaBuilder}}) into the {{ConnectSchema#validate}} method, which later
> relies on a {{ConnectSchema#equals}} equality check that expects the passed
> schema to be an actual {{ConnectSchema}} rather than a {{SchemaBuilder}}
> object that implements {{Schema}}.
> {code}
> Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
>   at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
>   at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)
>   at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)
>   at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>   at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
>   at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
>   at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>   at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>   at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>   at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>   at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>   at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)
>   at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
>   at io.debezium.connector.oracle.OracleDatabaseSchema.applySchemaChange(OracleDatabaseSchema.java:70)
>   at io.debezium.pipeline.EventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcher.java:460)
>   at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$createSchemaChangeEventsForTables$2(RelationalSnapshotChangeEventSource.java:273)
>   ... 10 more
> Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
>   at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:252)
>   at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
>   at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
>   ... 26 more
> {code}
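A minimal repro sketch of the mismatch described above, using the public Connect data API (the field name and value are illustrative; the exceptions match the stack trace above):

{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class StructDefaultValueRepro {
    public static void main(String[] args) {
        SchemaBuilder builder = SchemaBuilder.struct().field("f", Schema.INT32_SCHEMA);
        // The default value's schema is the built ConnectSchema...
        Struct defaultValue = new Struct(builder.build()).put("f", 1);
        // ...but defaultValue() validates it against the SchemaBuilder itself,
        // so the equality check fails: SchemaBuilderException ("Invalid
        // default value") caused by DataException ("Struct schemas do not match.").
        builder.defaultValue(defaultValue);
    }
}
{code}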





[jira] [Assigned] (KAFKA-3910) Cyclic schema support in ConnectSchema and SchemaBuilder

2023-03-21 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-3910:


Assignee: Chris Egerton  (was: Shikhar Bhushan)

> Cyclic schema support in ConnectSchema and SchemaBuilder
> 
>
> Key: KAFKA-3910
> URL: https://issues.apache.org/jira/browse/KAFKA-3910
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: John Hofman
>Assignee: Chris Egerton
>Priority: Major
>
> Cyclic schemas are not supported by ConnectSchema or SchemaBuilder.
> Consequently, the AvroConverter (confluentinc/schema-registry) hits a stack
> overflow when converting a cyclic Avro schema, e.g.:
> {code}
> {"type":"record", 
> "name":"list","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","list"]}]}
> {code}
> This is a blocking issue for all connectors running on the Connect framework
> with data containing cyclic references. The AvroConverter cannot support
> cyclic schemas until the underlying ConnectSchema and SchemaBuilder do.
> To reproduce the stack overflow (Confluent 3.0.0):
> Produce some cyclic data:
> {code}
> bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test 
> --property value.schema='{"type":"record", 
> "name":"list","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","list"]}]}'
> {"value":1,"next":null} 
> {"value":1,"next":{"list":{"value":2,"next":null}}}
> {code}
> Then try to consume it with connect:
> {code:title=connect-console-sink.properties}
> name=local-console-sink 
> connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector 
> tasks.max=1 
> topics=test
> {code}
> {code}
> ./bin/connect-standalone 
> ./etc/schema-registry/connect-avro-standalone.properties 
> connect-console-sink.properties  
> … start up logging … 
> java.lang.StackOverflowError 
>  at org.apache.avro.JsonProperties.getJsonProp(JsonProperties.java:54) 
>  at org.apache.avro.JsonProperties.getProp(JsonProperties.java:45) 
>  at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1055) 
>  at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103) 
>  at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1137) 
>  at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103) 
>  at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1137)
> {code}





[GitHub] [kafka] C0urante commented on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values

2023-03-21 Thread via GitHub


C0urante commented on PR #10566:
URL: https://github.com/apache/kafka/pull/10566#issuecomment-1478578470

   Opened https://github.com/apache/kafka/pull/13433 to address this (and 
cyclic schema support)





[GitHub] [kafka] C0urante commented on pull request #13433: KAFKA-12694, KAFKA-3910: Add cyclic schema support, fix default struct values

2023-03-21 Thread via GitHub


C0urante commented on PR #13433:
URL: https://github.com/apache/kafka/pull/13433#issuecomment-1478577830

   cc @urbandan; I've tweaked this a bit (stopped tracking equivalent schemas 
while comparing default values since that was actually doing nothing and AFAICT 
is unnecessary, and copied the equivalent schemas map before making any 
modifications). LMK what you think!





[GitHub] [kafka] C0urante opened a new pull request, #13433: KAFKA-12694, KAFKA-3910: Add cyclic schema support, fix default struct values

2023-03-21 Thread via GitHub


C0urante opened a new pull request, #13433:
URL: https://github.com/apache/kafka/pull/13433

   - [Jira (cyclic schema 
support)](https://issues.apache.org/jira/browse/KAFKA-3910)
   - [Jira (default value for struct 
schemas)](https://issues.apache.org/jira/browse/KAFKA-12694)
   
   Implemented in response to discussion on 
https://github.com/apache/kafka/pull/10566, where it became apparent that to 
fully support default values for struct schemas, we would also need to be able 
to support cyclical schemas.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] dimitarndimitrov opened a new pull request, #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver

2023-03-21 Thread via GitHub


dimitarndimitrov opened a new pull request, #13432:
URL: https://github.com/apache/kafka/pull/13432

   Aside from consistency with the implementation of newer APIs, this also provides more resilient retries (e.g. in case of TimeoutExceptions) and simplifies the code to some extent.
   
   One notable detail is the addition of unsupported version handling in the 
failure callback in the `AdminApiDriver` and as a new callback in the 
`AdminApiHandler`. The handler callback is invoked in case an 
`UnsupportedVersionException` is caught by the driver and returns a flag 
indicating whether the exception could be handled (like the similar callback in 
`Call`).
   - It also seemed prudent to signal whether the exception was caught in the fulfillment or the lookup stage (the latter case could then be delegated to the lookup strategy), but I don't know if there are actual use-cases for that; a purely illustrative sketch of such a callback's shape follows below.
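```java
import org.apache.kafka.common.errors.UnsupportedVersionException;

// Illustrative only: the name and signature are assumptions, not this PR's
// actual API. It models a handler-level hook the driver could invoke when it
// catches an UnsupportedVersionException.
interface UnsupportedVersionHandling {
    /**
     * @param exception the caught exception
     * @param isLookupStage true if caught in the lookup stage (which could be
     *                      delegated to the lookup strategy) rather than the
     *                      fulfillment stage
     * @return whether the exception could be handled (like the similar
     *         callback in {@code Call})
     */
    boolean handleUnsupportedVersionException(UnsupportedVersionException exception, boolean isLookupStage);
}
```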
   
   The change has been tested manually and with a new `KafkaAdminClientTest`. The retry aspect also unexpectedly stalled a Connect `AdminClientUnitTestEnv` test, due to retrying after a timeout where the test expected an immediate failure.
   Also, the JMH benchmark for the now-deleted equivalent of the API's fulfillment stage has been removed and not replaced, as I wasn't sure whether I could recreate it after the `AdminApiDriver` refactoring.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] C0urante commented on a diff in pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest

2023-03-21 Thread via GitHub


C0urante commented on code in PR #13383:
URL: https://github.com/apache/kafka/pull/13383#discussion_r1143902556


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##
@@ -315,33 +305,28 @@ public void testPause() throws Exception {
 
 taskFuture.get();
 
-PowerMock.verifyAll();
+verifyCleanStartup();
+verifyTaskGetTopic(10);

Review Comment:
   Should this use `count.get()` instead of `10`, since it's possible that the 
task may have been polled extra times between when line 293 
(`assertTrue(awaitLatch(pollLatch));`) completed and when the task finished 
pausing?



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##
@@ -832,166 +746,117 @@ private CountDownLatch expectPolls(int minimum, final AtomicInteger count) throw
 // Note that we stub these to allow any number of calls because the thread will continue to
 // run. The count passed in + latch returned just makes sure we get *at least* that number of
 // calls
-EasyMock.expect(sourceTask.poll())
-.andStubAnswer(() -> {
-count.incrementAndGet();
-latch.countDown();
-Thread.sleep(10);
-return RECORDS;
-});
+doAnswer((Answer<List<SourceRecord>>) invocation -> {
+count.incrementAndGet();
+latch.countDown();
+Thread.sleep(10);
+return RECORDS;
+}).when(sourceTask).poll();
+
 // Fallout of the poll() call
-expectSendRecordAnyTimes();
+expectSendRecord();
 return latch;
 }
 
 private CountDownLatch expectPolls(int count) throws InterruptedException {
 return expectPolls(count, new AtomicInteger());
 }
 
-@SuppressWarnings("unchecked")
-private void expectSendRecordSyncFailure(Throwable error) {
-expectConvertHeadersAndKeyValue(false);
-expectApplyTransformationChain(false);
-
-EasyMock.expect(
-producer.send(EasyMock.anyObject(ProducerRecord.class),
-EasyMock.anyObject(org.apache.kafka.clients.producer.Callback.class)))
-.andThrow(error);
+private void expectSendRecord() throws InterruptedException {
+expectSendRecordTaskCommitRecordSucceed();
 }
 
-private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() throws InterruptedException {
-return expectSendRecordTaskCommitRecordSucceed(true);
+//
+private void expectSendRecordProducerCallbackFail() throws InterruptedException {
+expectSendRecord(TOPIC, false, false, true, emptyHeaders());
 }
 
-private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordOnce() throws InterruptedException {
-return expectSendRecordTaskCommitRecordSucceed(false);
+private void expectSendRecordTaskCommitRecordSucceed() throws InterruptedException {
+expectSendRecord(TOPIC, true, true, true, emptyHeaders());
 }
 
-private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordProducerCallbackFail() throws InterruptedException {
-return expectSendRecord(TOPIC, false, false, false, true, emptyHeaders());
-}
-
-private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes) throws InterruptedException {
-return expectSendRecord(TOPIC, anyTimes, true, true, true, emptyHeaders());
-}
-
-private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean anyTimes) throws InterruptedException {
-return expectSendRecord(TOPIC, anyTimes, true, false, true, emptyHeaders());
-}
-
-private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
-String topic,
-boolean anyTimes,
-boolean sendSuccess,
-boolean commitSuccess,
-boolean isMockedConverters,
-Headers headers
+private void expectSendRecord(
+String topic,
+boolean sendSuccess,
+boolean commitSuccess,
+boolean isMockedConverters,
+Headers headers
 ) throws InterruptedException {
 if (isMockedConverters) {
-expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
+expectConvertHeadersAndKeyValue(topic, headers);
 }
 
-expectApplyTransformationChain(anyTimes);
-
-Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
-
-// 1. Converted data passed to the producer, which will need callbacks invoked for flush to work
-IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
-producer.send(EasyMock.capture(sent),
-EasyMock.capture(producerCallbacks)));
-IAnswer<Future<RecordMetadata>> expectResponse = () -> {
-synchronized (producerCallbacks) {
-for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) {
-if (sendSuccess) {
-cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0,
- 

[jira] [Created] (KAFKA-14831) Illegal state errors should be fatal in transactional producer

2023-03-21 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-14831:
---

 Summary: Illegal state errors should be fatal in transactional 
producer
 Key: KAFKA-14831
 URL: https://issues.apache.org/jira/browse/KAFKA-14831
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


In KAFKA-14830, the producer hit an illegal state error. The error was 
propagated to the `Sender` thread and logged, but the producer otherwise 
continued on. It would be better to make illegal state errors fatal since 
continuing to write to transactions when the internal state is inconsistent may 
cause incorrect and unpredictable behavior.
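
A rough sketch of the intended handling (the method names mirror the current TransactionManager, but the exact fix is an open question):
{code:java}
// Sketch: instead of only logging, an uncaught IllegalStateException from the
// transaction manager could poison it so all subsequent operations fail fast.
try {
    transactionManager.handleFailedBatch(batch, error, adjustSequenceNumbers);
} catch (IllegalStateException e) {
    transactionManager.transitionToFatalError(
        new KafkaException("Producer transaction state is inconsistent", e));
    throw e;
}
{code}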



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14830) Illegal state error in transactional producer

2023-03-21 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-14830:
---

 Summary: Illegal state error in transactional producer
 Key: KAFKA-14830
 URL: https://issues.apache.org/jira/browse/KAFKA-14830
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.1.2
Reporter: Jason Gustafson


We have seen the following illegal state error in the producer:
{code:java}
[Producer clientId=client-id2, transactionalId=transactional-id] Transiting to abortable error state due to org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for topic-0:120027 ms has passed since batch creation
[Producer clientId=client-id2, transactionalId=transactional-id] Transiting to abortable error state due to org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for topic-1:120026 ms has passed since batch creation
[Producer clientId=client-id2, transactionalId=transactional-id] Aborting incomplete transaction
[Producer clientId=client-id2, transactionalId=transactional-id] Invoking InitProducerId with current producer ID and epoch ProducerIdAndEpoch(producerId=191799, epoch=0) in order to bump the epoch
[Producer clientId=client-id2, transactionalId=transactional-id] ProducerId set to 191799 with epoch 1
[Producer clientId=client-id2, transactionalId=transactional-id] Transiting to abortable error state due to org.apache.kafka.common.errors.NetworkException: Disconnected from node 4
[Producer clientId=client-id2, transactionalId=transactional-id] Transiting to abortable error state due to org.apache.kafka.common.errors.TimeoutException: The request timed out.
[Producer clientId=client-id2, transactionalId=transactional-id] Uncaught error in request completion:
java.lang.IllegalStateException: TransactionalId transactional-id: Invalid transition attempted from state READY to state ABORTABLE_ERROR
        at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1089)
        at org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:508)
        at org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:734)
        at org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:739)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:753)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:743)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:695)
        at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634)
        at org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:575)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
        at org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:562)
        at java.base/java.lang.Iterable.forEach(Iterable.java:75)
        at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:562)
        at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
        at java.base/java.lang.Thread.run(Thread.java:829)
 {code}
The producer hits timeouts which cause it to abort an active transaction. After 
aborting, the producer bumps its epoch, which transitions it back to the 
`READY` state. Following this, there are two errors for inflight requests, 
which cause an illegal state transition to `ABORTABLE_ERROR`. But how could the 
transaction ABORT complete if there were still inflight requests? 
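
For reference, a minimal sketch (not the real TransactionManager, which has more states) of the kind of transition guard that produces this error:
{code:java}
enum State {
    READY, IN_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR;

    // READY deliberately does not list ABORTABLE_ERROR as a valid target, so a
    // late inflight-request failure arriving after the abort completes blows up.
    boolean isTransitionValid(State target) {
        if (target == ABORTABLE_ERROR) {
            return this == IN_TRANSACTION || this == ABORTING_TRANSACTION;
        }
        return true; // other transitions elided in this sketch
    }
}
{code}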



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-21 Thread via GitHub


philipnee commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1143891018


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -137,31 +166,21 @@ CompletableFuture<Void> sendAutoCommit(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
 this.autoCommitState.ifPresent(t -> t.ack(currentTimeMs));
 }
 
-// Visible for testing
-Queue<StagedCommit> stagedCommits() {
-return this.stagedCommits;
-}
 
-private class StagedCommit {
+private class UnsentOffsetCommitRequest {
 private final Map<TopicPartition, OffsetAndMetadata> offsets;
 private final String groupId;
 private final GroupState.Generation generation;
 private final String groupInstanceId;
 private final NetworkClientDelegate.FutureCompletionHandler future;
 
-public StagedCommit(final Map<TopicPartition, OffsetAndMetadata> offsets,
-final String groupId,
-final String groupInstanceId,
-final GroupState.Generation generation) {
+public UnsentOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets,
+ final String groupId,
+ final String groupInstanceId,
+ final GroupState.Generation generation) {
 this.offsets = offsets;
-// if no callback is provided, DefaultOffsetCommitCallback will be used.

Review Comment:
   I'm moving the Callback trigger entirely to the foreground thread. As you 
can see, the `PrototypeAsyncConsumer` invokes: 
   ```
   future.whenComplete((r, t) -> {
       if (t != null) {
           callback.onComplete(offsets, new KafkaException(t));
       } else {
           callback.onComplete(offsets, null);
       }
   });
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-21 Thread via GitHub


philipnee commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1143888783


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
 }
 }
 
+private class UnsentOffsetFetchRequest extends RequestState {
+public final Set<TopicPartition> requestedPartitions;
+public final GroupState.Generation requestedGeneration;
+public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+public UnsentOffsetFetchRequest(final Set<TopicPartition> partitions,
+final GroupState.Generation generation,
+final long retryBackoffMs) {
+super(retryBackoffMs);
+this.requestedPartitions = partitions;
+this.requestedGeneration = generation;
+this.future = new CompletableFuture<>();
+}
+
+public boolean sameRequest(final UnsentOffsetFetchRequest request) {
+return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
+}
+
+public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long currentTimeMs) {
+OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder(
+groupState.groupId,
+true,
+new ArrayList<>(this.requestedPartitions),
+throwOnFetchStableOffsetUnsupported);
+NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
+builder,
+coordinatorRequestManager.coordinator());
+unsentRequest.future().whenComplete((r, t) -> {
+onResponse(currentTimeMs, (OffsetFetchResponse) r.responseBody());
+});
+return unsentRequest;
+}
+
+public void onResponse(
+final long currentTimeMs,
+final OffsetFetchResponse response) {
+Errors responseError = response.groupLevelError(groupState.groupId);
+if (responseError != Errors.NONE) {
+onFailure(currentTimeMs, responseError);
+return;
+}
+onSuccess(currentTimeMs, response);
+}
+
+private void onFailure(final long currentTimeMs,
+   final Errors responseError) {
+log.debug("Offset fetch failed: {}", responseError.message());
+
+// TODO: should we retry on COORDINATOR_NOT_AVAILABLE as well ?
+if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+retry(currentTimeMs);
+} else if (responseError == Errors.NOT_COORDINATOR) {
+// re-discover the coordinator and retry
+coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), Time.SYSTEM.milliseconds());
+retry(currentTimeMs);
+} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
+future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId));
+} else {
+future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response: " + responseError.message()));
+}
+}
+
+private void retry(final long currentTimeMs) {
+onFailedAttempt(currentTimeMs);
+onSendAttempt(currentTimeMs);
+pendingRequests.addOffsetFetchRequest(this);
+}
+
+private void onSuccess(final long currentTimeMs,
+   final OffsetFetchResponse response) {
+Set<String> unauthorizedTopics = null;
+Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData =
+response.partitionDataMap(groupState.groupId);
+Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(responseData.size());
+Set<TopicPartition> unstableTxnOffsetTopicPartitions = new HashSet<>();
+for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : responseData.entrySet()) {
+TopicPartition tp = entry.getKey();
+OffsetFetchResponse.PartitionData partitionData = entry.getValue();
+if (partitionData.hasError()) {
+Errors error = partitionData.error;
+log.debug("Failed to fetch offset for partition {}: {}", tp, error.message());
+
+if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does " +
+"not " +
+"exist"));
+return;
+} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+if (unauthorizedTopics == null) {
+unauthorizedTopics

[jira] [Comment Edited] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow

2023-03-21 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703339#comment-17703339
 ] 

Greg Harris edited comment on KAFKA-14666 at 3/21/23 7:13 PM:
--

I proposed a tactical fix for this in 
[https://github.com/apache/kafka/pull/13429] which provides variable-precision 
translation between the most recent restart of MM2 and the end of the 
replicated topic, similar to the existing behavior pre KAFKA-12468.

This uses strategy (2) from above, but limited to syncs read during a single 
task lifetime to get monotonicity without re-reading the checkpoint topic.

A separate improvement can be considered to allow for translation of offsets 
prior to the latest restart of MM2, or increasing the accuracy of the 
translation with new configurations.
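
For readers unfamiliar with the translation logic, a rough sketch of the rule at issue (names are illustrative, not the PR's code):
{code:java}
// Translate an upstream consumer offset using the most recent offset sync read
// at or below it. When the group is *behind* the replication flow, no such sync
// is retained, so translation is refused (returns empty) for correctness.
Optional<Long> translateDownstream(long upstreamOffset) {
    OffsetSync sync = latestSyncAtOrBelow(upstreamOffset); // null if the group is behind the flow
    if (sync == null) {
        return Optional.empty();
    }
    return Optional.of(sync.downstreamOffset() + (upstreamOffset - sync.upstreamOffset()));
}
{code}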


was (Author: gharris1727):
I proposed a tactical fix for this in 
[https://github.com/apache/kafka/pull/13429] which provides variable-accuracy 
translation between the most recent restart of MM2 and the end of the 
replicated topic, similar to the existing behavior pre KAFKA-12468.

This uses strategy (2) from above, but limited to syncs read during a single 
task lifetime to get monotonicity without re-reading the checkpoint topic.

A separate improvement can be considered to allow for translation of offsets 
prior to the latest restart of MM2, or increasing the accuracy of the 
translation with new configurations.

> MM2 should translate consumer group offsets behind replication flow
> ---
>
> Key: KAFKA-14666
> URL: https://issues.apache.org/jira/browse/KAFKA-14666
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 3.5.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
>
> MirrorMaker2 includes an offset translation feature which can translate the 
> offsets for an upstream consumer group to a corresponding downstream consumer 
> group. It does this by keeping a topic of offset-syncs to correlate upstream 
> and downstream offsets, and translates any source offsets which are ahead of 
> the replication flow.
> However, if a replication flow is closer to the end of a topic than the 
> consumer group, then the offset translation feature will refuse to translate 
> the offset for correctness reasons. This is because the MirrorCheckpointTask 
> only keeps the latest offset correlation between source and target, it does 
> not have sufficient information to translate older offsets.
> The workarounds for this issue are to:
> 1. Pause the replication flow occasionally to allow the source to get ahead 
> of MM2
> 2. Increase the offset.lag.max to delay offset syncs, increasing the window 
> for translation to happen. With the fix for KAFKA-12468, this will also 
> increase the lag of applications that are ahead of the replication flow, so 
> this is a tradeoff.
> Instead, the MirrorCheckpointTask should provide correct and best-effort 
> translation for consumer groups behind the replication flow by keeping 
> additional state, or re-reading the offset-syncs topic. This should be a 
> substantial improvement for use-cases where applications have a higher 
> latency to commit than the replication flow, or where applications are 
> reading from the earliest offset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-21 Thread via GitHub


philipnee commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1143881070


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
 }
 }
 
+private class UnsentOffsetFetchRequest extends RequestState {
+public final Set<TopicPartition> requestedPartitions;
+public final GroupState.Generation requestedGeneration;
+public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+public UnsentOffsetFetchRequest(final Set<TopicPartition> partitions,
+final GroupState.Generation generation,
+final long retryBackoffMs) {
+super(retryBackoffMs);
+this.requestedPartitions = partitions;
+this.requestedGeneration = generation;
+this.future = new CompletableFuture<>();
+}
+
+public boolean sameRequest(final UnsentOffsetFetchRequest request) {
+return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
+}
+
+public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long currentTimeMs) {
+OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder(
+groupState.groupId,
+true,
+new ArrayList<>(this.requestedPartitions),
+throwOnFetchStableOffsetUnsupported);
+NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
+builder,
+coordinatorRequestManager.coordinator());
+unsentRequest.future().whenComplete((r, t) -> {
+onResponse(currentTimeMs, (OffsetFetchResponse) r.responseBody());
+});
+return unsentRequest;
+}
+
+public void onResponse(
+final long currentTimeMs,
+final OffsetFetchResponse response) {
+Errors responseError = response.groupLevelError(groupState.groupId);
+if (responseError != Errors.NONE) {
+onFailure(currentTimeMs, responseError);
+return;
+}
+onSuccess(currentTimeMs, response);
+}
+
+private void onFailure(final long currentTimeMs,
+   final Errors responseError) {
+log.debug("Offset fetch failed: {}", responseError.message());
+
+// TODO: should we retry on COORDINATOR_NOT_AVAILABLE as well ?
+if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+retry(currentTimeMs);
+} else if (responseError == Errors.NOT_COORDINATOR) {
+// re-discover the coordinator and retry
+coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), Time.SYSTEM.milliseconds());
+retry(currentTimeMs);
+} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
+future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId));
+} else {
+future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response: " + responseError.message()));
+}
+}
+
+private void retry(final long currentTimeMs) {
+onFailedAttempt(currentTimeMs);
+onSendAttempt(currentTimeMs);
+pendingRequests.addOffsetFetchRequest(this);
+}
+
+private void onSuccess(final long currentTimeMs,
+   final OffsetFetchResponse response) {
+Set<String> unauthorizedTopics = null;
+Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData =
+response.partitionDataMap(groupState.groupId);
+Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(responseData.size());
+Set<TopicPartition> unstableTxnOffsetTopicPartitions = new HashSet<>();
+for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : responseData.entrySet()) {
+TopicPartition tp = entry.getKey();
+OffsetFetchResponse.PartitionData partitionData = entry.getValue();
+if (partitionData.hasError()) {
+Errors error = partitionData.error;
+log.debug("Failed to fetch offset for partition {}: {}", tp, error.message());
+
+if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does " +
+"not " +
+"exist"));
+return;
+} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+if (unauthorizedTopics == null) {
+unauthorizedTopics

[jira] [Comment Edited] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow

2023-03-21 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703339#comment-17703339
 ] 

Greg Harris edited comment on KAFKA-14666 at 3/21/23 7:08 PM:
--

I proposed a tactical fix for this in 
[https://github.com/apache/kafka/pull/13429] which provides variable-accuracy 
translation between the most recent restart of MM2 and the end of the 
replicated topic, similar to the existing behavior pre KAFKA-12468.

This uses strategy (2) from above, but limited to syncs read during a single 
task lifetime to get monotonicity without re-reading the checkpoint topic.

A separate improvement can be considered to allow for translation of offsets 
prior to the latest restart of MM2, or increasing the accuracy of the 
translation with new configurations.


was (Author: gharris1727):
I proposed a tactical fix for this in 
[https://github.com/apache/kafka/pull/13429] which provides variable-accuracy 
translation between the most recent restart of MM2 and the end of the 
replicated topic, similar to the existing behavior pre KAFKA-12468.

A separate improvement can be considered to allow for translation of offsets 
prior to the latest restart of MM2, or increasing the accuracy of the 
translation with new configurations.

> MM2 should translate consumer group offsets behind replication flow
> ---
>
> Key: KAFKA-14666
> URL: https://issues.apache.org/jira/browse/KAFKA-14666
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 3.5.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
>
> MirrorMaker2 includes an offset translation feature which can translate the 
> offsets for an upstream consumer group to a corresponding downstream consumer 
> group. It does this by keeping a topic of offset-syncs to correlate upstream 
> and downstream offsets, and translates any source offsets which are ahead of 
> the replication flow.
> However, if a replication flow is closer to the end of a topic than the 
> consumer group, then the offset translation feature will refuse to translate 
> the offset for correctness reasons. This is because the MirrorCheckpointTask 
> only keeps the latest offset correlation between source and target, it does 
> not have sufficient information to translate older offsets.
> The workarounds for this issue are to:
> 1. Pause the replication flow occasionally to allow the source to get ahead 
> of MM2
> 2. Increase the offset.lag.max to delay offset syncs, increasing the window 
> for translation to happen. With the fix for KAFKA-12468, this will also 
> increase the lag of applications that are ahead of the replication flow, so 
> this is a tradeoff.
> Instead, the MirrorCheckpointTask should provide correct and best-effort 
> translation for consumer groups behind the replication flow by keeping 
> additional state, or re-reading the offset-syncs topic. This should be a 
> substantial improvement for use-cases where applications have a higher 
> latency to commit than the replication flow, or where applications are 
> reading from the earliest offset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow

2023-03-21 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703339#comment-17703339
 ] 

Greg Harris commented on KAFKA-14666:
-

I proposed a tactical fix for this in 
[https://github.com/apache/kafka/pull/13429] which provides variable-accuracy 
translation between the most recent restart of MM2 and the end of the 
replicated topic, similar to the existing behavior pre KAFKA-12468.

A separate improvement can be considered to allow for translation of offsets 
prior to the latest restart of MM2, or increasing the accuracy of the 
translation with new configurations.

> MM2 should translate consumer group offsets behind replication flow
> ---
>
> Key: KAFKA-14666
> URL: https://issues.apache.org/jira/browse/KAFKA-14666
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 3.5.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
>
> MirrorMaker2 includes an offset translation feature which can translate the 
> offsets for an upstream consumer group to a corresponding downstream consumer 
> group. It does this by keeping a topic of offset-syncs to correlate upstream 
> and downstream offsets, and translates any source offsets which are ahead of 
> the replication flow.
> However, if a replication flow is closer to the end of a topic than the 
> consumer group, then the offset translation feature will refuse to translate 
> the offset for correctness reasons. This is because the MirrorCheckpointTask 
> only keeps the latest offset correlation between source and target, it does 
> not have sufficient information to translate older offsets.
> The workarounds for this issue are to:
> 1. Pause the replication flow occasionally to allow the source to get ahead 
> of MM2
> 2. Increase the offset.lag.max to delay offset syncs, increasing the window 
> for translation to happen. With the fix for KAFKA-12468, this will also 
> increase the lag of applications that are ahead of the replication flow, so 
> this is a tradeoff.
> Instead, the MirrorCheckpointTask should provide correct and best-effort 
> translation for consumer groups behind the replication flow by keeping 
> additional state, or re-reading the offset-syncs topic. This should be a 
> substantial improvement for use-cases where applications have a higher 
> latency to commit than the replication flow, or where applications are 
> reading from the earliest offset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #13375: KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode

2023-03-21 Thread via GitHub


C0urante commented on code in PR #13375:
URL: https://github.com/apache/kafka/pull/13375#discussion_r1143840978


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##
@@ -152,11 +154,6 @@ public void stop() {
 connectCluster.forEach(this::stopWorker);
 try {
 kafkaCluster.stop();
-} catch (UngracefulShutdownException e) {
-log.warn("Kafka did not shutdown gracefully");

Review Comment:
   Is this part no longer necessary?



##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -241,7 +245,7 @@ public KafkaClusterTestKit build() throws Exception {
 bootstrapMetadata);
 } catch (Throwable e) {
 log.error("Error creating controller {}", node.id(), e);
-Utils.swallow(log, Level.WARN, "sharedServer.stopForController error", () -> sharedServer.stopForController());
+Utils.swallow(log, Level.WARN, "sharedServer.stopForController error", sharedServer::stopForController);

Review Comment:
   I <3 method references



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -195,7 +197,22 @@ public void testRestartFailedTask() throws Exception {
 public void testBrokerCoordinator() throws Exception {
 ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
 workerProps.put(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, String.valueOf(5000));
-connect = connectBuilder.workerProps(workerProps).build();
+Properties brokerProps = new Properties();
+
+// Find a free port and use it in the Kafka broker's listeners config. We can't use port 0 in the listeners
+// config to get a random free port because in this test we want to stop the Kafka broker and then bring it
+// back up and listening on the same port in order to verify that the Connect cluster can re-connect to Kafka
+// and continue functioning normally. If we were to use port 0 here, the Kafka broker would most likely listen
+// on a different random free port the second time it is started.

Review Comment:
   Any reason we're not preserving the  `startOnlyKafkaOnSamePorts` method for 
this type of use case?
   
   I see that this change is mentioned in the description, but don't see a 
rationale for it.
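
   For context, a minimal sketch of the free-port approach the new comment describes (the helper name is an assumption, not necessarily the PR's code):
   ```java
   // Bind to port 0 so the OS assigns a free port, record it, then close the
   // socket so the broker can bind the same port in its listeners config.
   static int findFreePort() throws IOException {
       try (ServerSocket socket = new ServerSocket(0)) {
           return socket.getLocalPort();
       }
   }
   ```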



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -106,9 +106,6 @@ public void testSingleNodeCluster() throws Exception {
 EmbeddedKafkaCluster clusterA = startKafkaCluster("A", 1, brokerProps);
 EmbeddedKafkaCluster clusterB = startKafkaCluster("B", 1, brokerProps);
 
-clusterA.start();
-clusterB.start();

Review Comment:
   🤦 



##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -174,13 +174,17 @@ private KafkaConfig createNodeConfig(TestKitNode node) {
 props.put(KafkaConfig$.MODULE$.LogDirsProp(),
 String.join(",", brokerNode.logDataDirectories()));
 }
-props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
-"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
-props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id()));
-props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
-nodes.interBrokerListenerName().value());
-props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
-"CONTROLLER");
+
+// listeners could be defined via Builder::setConfigProp which shouldn't be overridden
+if (!props.containsKey(KafkaConfig$.MODULE$.ListenersProp())) {

Review Comment:
   I wonder if this has the potential to be a footgun. We're gating the default 
values of several properties on the presence of a single one; could it confuse 
people to set just the `listeners` property and then see failures because the 
previously-used defaults for the protocol map, controller listener, etc. 
weren't set?
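
   For illustration, if a test overrides `listeners` via `Builder::setConfigProp`, these are the sibling properties it would likely need to keep in sync (the values here are only an example):
   ```java
   builder.setConfigProp("listeners", "EXTERNAL://localhost:9092,CONTROLLER://localhost:9093");
   builder.setConfigProp("listener.security.protocol.map", "EXTERNAL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT");
   builder.setConfigProp("inter.broker.listener.name", "EXTERNAL");
   builder.setConfigProp("controller.listener.names", "CONTROLLER");
   ```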



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-21 Thread via GitHub


philipnee commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1143865476


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -113,15 +112,45 @@ private void maybeAutoCommit(final long currentTimeMs) {
 }
 
 Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptionState.allConsumed();
-log.debug("Auto-committing offsets {}", allConsumedOffsets);
 sendAutoCommit(allConsumedOffsets);
 autocommit.resetTimer();
+}
+
+/**
+ * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an
+ * {@link UnsentOffsetCommitRequest} and enqueue it to send later.
+ */
+public CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+return pendingRequests.addOffsetCommitRequest(offsets);
+}
+
+/**
+ * Handles {@link org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent}. It creates an
+ * {@link UnsentOffsetFetchRequest} and enqueue it to send later.
+ */
+public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(final Set<TopicPartition> partitions) {
+return pendingRequests.addOffsetFetchRequest(partitions);
+}
+
+public void clientPoll(final long currentTimeMs) {
+this.autoCommitState.ifPresent(t -> t.ack(currentTimeMs));
+}
+
 
+// Visible for testing
+List<UnsentOffsetFetchRequest> unsentOffsetFetchRequests() {
+return pendingRequests.unsentOffsetFetches;
+}
+
+// Visible for testing
+Queue<UnsentOffsetCommitRequest> unsentOffsetCommitRequests() {
+return pendingRequests.unsentOffsetCommits;
 }
 
 // Visible for testing
 CompletableFuture<Void> sendAutoCommit(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
-CompletableFuture<Void> future = this.add(allConsumedOffsets)
+log.debug("Enqueuing autocommit offsets: {}", allConsumedOffsets);
+return this.addOffsetCommitRequest(allConsumedOffsets)

Review Comment:
   I think you are right, we could just use the pendingQueue to handle the 
inflight auto commit. Thanks for spotting this.
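
   For context, a usage sketch of the futures returned by the new API in the diff (variable names are assumptions):
   ```java
   CompletableFuture<Void> commitResult = commitRequestManager.addOffsetCommitRequest(offsets);
   CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult =
           commitRequestManager.addOffsetFetchRequest(partitions);
   fetchResult.whenComplete((fetchedOffsets, error) -> {
       // surface the fetched offsets (or the failure) back to the application thread
   });
   ```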



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow

2023-03-21 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-14666:
---

Assignee: Greg Harris

> MM2 should translate consumer group offsets behind replication flow
> ---
>
> Key: KAFKA-14666
> URL: https://issues.apache.org/jira/browse/KAFKA-14666
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 3.5.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
>
> MirrorMaker2 includes an offset translation feature which can translate the 
> offsets for an upstream consumer group to a corresponding downstream consumer 
> group. It does this by keeping a topic of offset-syncs to correlate upstream 
> and downstream offsets, and translates any source offsets which are ahead of 
> the replication flow.
> However, if a replication flow is closer to the end of a topic than the 
> consumer group, then the offset translation feature will refuse to translate 
> the offset for correctness reasons. This is because the MirrorCheckpointTask 
> only keeps the latest offset correlation between source and target, it does 
> not have sufficient information to translate older offsets.
> The workarounds for this issue are to:
> 1. Pause the replication flow occasionally to allow the source to get ahead 
> of MM2
> 2. Increase the offset.lag.max to delay offset syncs, increasing the window 
> for translation to happen. With the fix for KAFKA-12468, this will also 
> increase the lag of applications that are ahead of the replication flow, so 
> this is a tradeoff.
> Instead, the MirrorCheckpointTask should provide correct and best-effort 
> translation for consumer groups behind the replication flow by keeping 
> additional state, or re-reading the offset-syncs topic. This should be a 
> substantial improvement for use-cases where applications have a higher 
> latency to commit than the replication flow, or where applications are 
> reading from the earliest offset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13431: KAFKA-14491: [19/N] Combine versioned store RocksDB instances into one

2023-03-21 Thread via GitHub


vcrfxia commented on code in PR #13431:
URL: https://github.com/apache/kafka/pull/13431#discussion_r1143859781


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##
@@ -821,4 +823,20 @@ public Options getOptions() {
 public Position getPosition() {
 return position;
 }
+
+/**
+ * Same as {@link Bytes#increment(Bytes)} but {@code null} is returned instead of throwing
+ * {@code IndexOutOfBoundsException} in the event of overflow.
+ *
+ * @param input bytes to increment
+ * @return A new copy of the incremented byte array, or {@code null} if incrementing would
+ * result in overflow.
+ */
+static Bytes incrementWithoutOverflow(final Bytes input) {

Review Comment:
   Now that negative segment ID is allowed (only for reserved segments within 
`LogicalKeyValueSegments`), it's possible that a valid segment ID will overflow 
when incremented, and we want to handle this gracefully instead of throwing an 
exception when it happens.
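
   For reference, a sketch of what such a method can look like (an assumption based on the javadoc in the diff, not necessarily the PR's exact implementation; uses `java.util.Arrays`):
   ```java
   static Bytes incrementWithoutOverflow(final Bytes input) {
       final byte[] ary = Arrays.copyOf(input.get(), input.get().length);
       for (int i = ary.length - 1; i >= 0; i--) {
           if (ary[i] != (byte) 0xFF) {
               ary[i]++;
               return Bytes.wrap(ary);
           }
           ary[i] = 0; // carry into the next-higher byte
       }
       return null; // every byte was 0xFF: incrementing would overflow
   }
   ```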



##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java:
##
@@ -16,15 +16,33 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 
+/**
+ * A {@link Segments} implementation which uses a single underlying RocksDB instance.
+ * Regular segments with {@code segmentId >= 0} expire according to the specified
+ * retention period. "Reserved" segments with {@code segmentId < 0} do not expire
+ * and are completely separate from regular segments in that methods such as
+ * {@link #getSegmentForTimestamp(long)}, {@link #getOrCreateSegment(long, ProcessorContext)},
+ * {@link #getOrCreateSegmentIfLive(long, ProcessorContext, long)},
+ * {@link #segments(long, long, boolean)}, and {@link #allSegments(boolean)}
+ * only return regular segments and not reserved segments. The methods {@link #flush()}
+ * and {@link #close()} flush and close both regular and reserved segments, due to
+ * the fact that both types of segments share the same physical RocksDB instance.
+ * To create a reserved segment, use {@link #createReservedSegment(long, String)} instead.
+ */
 public class LogicalKeyValueSegments extends AbstractSegments<LogicalKeyValueSegment> {
 
 private final RocksDBMetricsRecorder metricsRecorder;
 private final RocksDBStore physicalStore;
 
+// reserved segments do not expire, and are tracked here separately from regular segments
+private final Map<Long, LogicalKeyValueSegment> reservedSegments = new HashMap<>();

Review Comment:
   Only one reserved segment is needed for the versioned store implementation 
(used for the latest value store) but I've chosen to implement support for 
reserved segments more generally in this class. If we think this is 
unnecessarily complex, I can update this to be a single 
`Optional` (or equivalent) instead.
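
   For illustration, a usage sketch based on the javadoc above (the segment ID -1 comes from the PR description; the store name is an assumption):
   ```java
   // the latest-value store lives in a reserved segment that never expires
   LogicalKeyValueSegment latestValueStore = segments.createReservedSegment(-1, "latest-value-store");
   // regular lookups such as segments.getSegmentForTimestamp(ts) never return it
   ```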



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##
@@ -124,7 +124,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingStore {
 protected Position position;
 private OffsetCheckpoint positionCheckpoint;
 
-// VisibleForTesting

Review Comment:
   This cleanup is unrelated to the changes in this PR -- I happened to notice 
that this method is labeled as visible for testing but is actually called in a 
number of places from production code, so I've removed the misleading comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #13430: MINOR; Increase log level of some rare events

2023-03-21 Thread via GitHub


hachikuji commented on PR #13430:
URL: https://github.com/apache/kafka/pull/13430#issuecomment-1478422374

   > Okay. I'll revert the change to the send request backoff but I think we 
should keep the candidate backoff as that is "rare" and would help us debug 
election delays. What do you think?
   
   Yeah, that one is probably fine. One message per election attempt seems ok.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia opened a new pull request, #13431: KAFKA-14491: [19/N] Combine versioned store RocksDB instances into one

2023-03-21 Thread via GitHub


vcrfxia opened a new pull request, #13431:
URL: https://github.com/apache/kafka/pull/13431

   The RocksDB-based versioned store implementation introduced in 
https://github.com/apache/kafka/pull/13188 currently uses two physical RocksDB 
instances per store instance: one for the "latest value store" and another for 
the "segments store." This PR combines those two RocksDB instances into one by 
representing the latest value store as a special "reserved" segment within the 
segments store. This reserved segment has segment ID -1, is never expired, and 
is not included in the regular `Segments` methods for getting or creating 
segments, but is represented in the physical RocksDB instance the same way as 
any other segment.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #13430: MINOR; Increase log level of some rare events

2023-03-21 Thread via GitHub


jsancio commented on PR #13430:
URL: https://github.com/apache/kafka/pull/13430#issuecomment-1478413634

   > Upgrading the error messages is probably fair, but do we really need the 
backoff messages at INFO level? Seems like they have potential to be noisy.
   
   Okay. I'll revert the change to the send request backoff but I think we 
should keep the candidate backoff as that is "rare" and would help us debug 
election delays. What do you think?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #13430: MINOR; Increase log level of some rare events

2023-03-21 Thread via GitHub


hachikuji commented on PR #13430:
URL: https://github.com/apache/kafka/pull/13430#issuecomment-1478403979

   Upgrading the error messages is probably fair, but do we really need the 
backoff messages at INFO level? Seems like they have potential to be noisy.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio opened a new pull request, #13430: MINOR; Increase log level of some rare events

2023-03-21 Thread via GitHub


jsancio opened a new pull request, #13430:
URL: https://github.com/apache/kafka/pull/13430

   To help debug KRaft's behavior this change increases the log level of some 
rare messages to INFO level.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13426: KAFKA-14814: Skip target state updates when the configs store has sam…

2023-03-21 Thread via GitHub


C0urante commented on PR #13426:
URL: https://github.com/apache/kafka/pull/13426#issuecomment-1478385471

   @yashmayya that's my bad, it does appear that you're correct and that the 
`Connector` won't be restarted, but a request for new task configs will take 
place.
   
   IMO this change still has some value since, at the very least, it leads to 
cleaner logs when redundant requests are made. But if we think this isn't worth 
the implementation complexity and testing efforts, we don't have to merge it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14740) Missing source tag on MirrorSource metrics

2023-03-21 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-14740.

Fix Version/s: 3.5.0
   Resolution: Fixed

> Missing source tag on MirrorSource metrics
> --
>
> Key: KAFKA-14740
> URL: https://issues.apache.org/jira/browse/KAFKA-14740
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 3.5.0
>
>
> The metrics defined in MirrorSourceMetrics have the following tags "target", 
> "topic", "partition". It would be good to also have a "source" tag with the 
> source cluster alias.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison merged pull request #13420: KAFKA-14740: Add source tag to MirrorSourceMetrics - KIP-911

2023-03-21 Thread via GitHub


mimaison merged PR #13420:
URL: https://github.com/apache/kafka/pull/13420


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13420: KAFKA-14740: Add source tag to MirrorSourceMetrics - KIP-911

2023-03-21 Thread via GitHub


mimaison commented on PR #13420:
URL: https://github.com/apache/kafka/pull/13420#issuecomment-1478373825

   I've now closed the voting thread. [Test 
failures](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/view/change-requests/job/PR-13420/1/testReport/)
 are not related, merging to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)

2023-03-21 Thread via GitHub


C0urante commented on code in PR #13373:
URL: https://github.com/apache/kafka/pull/13373#discussion_r1143650777


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##
@@ -261,6 +326,78 @@ public void testNewTopicConfigs() throws Exception {
 verify(connector).createNewTopics(any(), any());
 }
 
+@Test
+public void testIncrementalAlterConfigsRequested() throws Exception {
+MockAdminClient admin = spy(new MockAdminClient());
+MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+new DefaultReplicationPolicy(), MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIG, new DefaultConfigPropertyFilter(), admin));
+final String topic = "testtopic";
+List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1"));
+Config config = new Config(entries);
+doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
+doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any());
+doNothing().when(connector).deprecatedAlterConfigs(any());
+connector.syncTopicConfigs();
+Map<String, Config> topicConfigs = Collections.singletonMap("source." + topic, config);
+verify(connector).incrementalAlterConfigs(topicConfigs);
+
+// the next time we sync topic configurations, expect to use the deprecated API
+connector.syncTopicConfigs();
+verify(connector, times(2)).syncTopicConfigs();

Review Comment:
   There's no need to verify these invocations since we make both of them in 
this test case ourselves.
   
   (This applies to other test cases as well, such as 
`testIncrementalAlterConfigsRequired`.)



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -514,6 +541,40 @@ private void updateTopicConfigs(Map 
topicConfigs) {
 }));
 }
 
+// visible for testing
+@SuppressWarnings("deprecation")

Review Comment:
   This part can be removed now (🎉)
   ```suggestion
   ```



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -540,10 +601,13 @@ Map<String, Config> describeTopicConfigs(Set<String> topics)
 
 Config targetConfig(Config sourceConfig) {
 List<ConfigEntry> entries = sourceConfig.entries().stream()
-.filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive())
-.filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)
-.filter(x -> shouldReplicateTopicConfigurationProperty(x.name()))
-.collect(Collectors.toList());
+.filter(x -> (!useIncrementalAlterConfigs.equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG)
+|| (x.isDefault() && shouldReplicateSourceDefault(x.name()))
+|| (!x.isDefault() && useIncrementalAlterConfigs.equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG)))
+.filter(x -> !x.isReadOnly() && !x.isSensitive())
+.filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)
+.filter(x -> shouldReplicateTopicConfigurationProperty(x.name()))

Review Comment:
   This double-checks default values, which I don't believe is correct (we 
should rely solely on `ConfigPropertyFilter::shouldReplicateSourceDefault` for 
those).
   
   Perhaps all of the logic can be moved to the 
`shouldReplicateTopicConfigurationProperty` method, including determining 
whether the property is a default or not (and how to handle that accordingly)?
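
   For illustration, a sketch of that consolidation (method and field names are assumptions, not the PR's code):
   ```java
   private boolean shouldReplicate(ConfigEntry entry) {
       if (entry.isReadOnly() || entry.isSensitive()
               || entry.source() == ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) {
           return false;
       }
       // defer default-value handling entirely to the config property filter
       return entry.isDefault()
               ? shouldReplicateSourceDefault(entry.name())
               : shouldReplicateTopicConfigurationProperty(entry.name());
   }
   ```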



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java:
##
@@ -30,6 +30,9 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
 
 public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = "config.properties.exclude";
 public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = "config.properties.blacklist";
+public static final String USE_DEFAULTS_FROM = "use.defaults.from";
+private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's defaults to use when syncing topic configurations.";
+private static final String USE_DEFAULTS_FROM_DEFAULT = "target";
 
 private static final String CONFIG_PROPERTIES_EXCLUDE_DOC = "List of topic configuration properties and/or regexes "
 + "that should not be replicated.";

Review Comment:
   Can we add a note that this will also apply to properties with default 
values if `use.defaults.from` is set to `source`?



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourc

[GitHub] [kafka] C0urante commented on a diff in pull request #13426: KAFKA-14814: Skip target state updates when the configs store has sam…

2023-03-21 Thread via GitHub


C0urante commented on code in PR #13426:
URL: https://github.com/apache/kafka/pull/13426#discussion_r1143794237


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java:
##
@@ -889,6 +890,51 @@ public void testBackgroundUpdateTargetState() throws Exception {
 PowerMock.verifyAll();
 }
 
+@Test
+public void testSameTargetState() throws Exception {
+// verify that we handle target state changes correctly when they come up through the log
+
+expectConfigure();
+List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
+LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+logOffset = 5;
+
+expectStart(existingRecords, deserialized);
+
+// on resume update listener isn't called

Review Comment:
   We don't have any explicit expectations or checks in this test that the 
config update listener is never called, but the test would still fail if it 
were invoked when we call `PowerMock::verifyAll`. Is that correct?
   
   If so, is there any way we can make that logic explicit? I'm worried someone 
might migrate this test over to Mockito (which has nice mocks by default) and 
accidentally cause this test to start spuriously passing.
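
   For example, after a Mockito migration the expectation could be made explicit with something like (the listener field name is an assumption):
   ```java
   // fail fast if the update listener fires even though the target state is unchanged
   verify(configUpdateListener, never()).onConnectorTargetStateChange(anyString());
   ```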



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13340: KAFKA-14491: [15/N] Add integration tests for versioned stores

2023-03-21 Thread via GitHub


vcrfxia commented on code in PR #13340:
URL: https://github.com/apache/kafka/pull/13340#discussion_r1143766313


##
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##
@@ -302,7 +319,54 @@ public void shouldAllowCustomIQv2ForCustomStoreImplementations() {
 .withPartitions(Collections.singleton(0));
 final StateQueryResult<String> result =
 IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
-assertThat("success", equalTo(result.getOnlyPartitionResult().getResult()));
+assertThat(result.getOnlyPartitionResult().getResult(), equalTo("success"));
+}
+
+@Test
+public void shouldCreateGlobalTable() throws Exception {
+// produce data to global store topic and track in-memory for processor to verify
+final DataTracker data = new DataTracker();
+produceDataToTopic(globalTableTopic, data, baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+produceDataToTopic(globalTableTopic, data, baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+produceDataToTopic(globalTableTopic, data, baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data
+
+// build topology and start app
+final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+streamsBuilder
+.globalTable(
+globalTableTopic,
+Consumed.with(Serdes.Integer(), Serdes.String()),
+Materialized
+.as(new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION))
+.withKeySerde(Serdes.Integer())
+.withValueSerde(Serdes.String())
+);
+streamsBuilder
+.stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+.process(() -> new VersionedStoreContentCheckerProcessor(false, data))
+.to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+final Properties props = props();
+kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+kafkaStreams.start();
+
+// produce source data to trigger store verifications in processor
+int numRecordsProduced = produceDataToTopic(inputStream, baseTimestamp + 8, KeyValue.pair(1, "a8"), KeyValue.pair(2, "b8"), KeyValue.pair(3, "c8"));
+
+// wait for output and verify
+final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+TestUtils.consumerConfig(
+CLUSTER.bootstrapServers(),
+IntegerDeserializer.class,
+IntegerDeserializer.class),
+outputStream,
+numRecordsProduced);
+
+for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) {
+// verify zero failed checks for each record
+assertThat(receivedRecord.value, equalTo(0));

Review Comment:
   > I was referring to this comment: https://github.com/apache/kafka/pull/13340#discussion_r1128550162
   
   I see. My previous comment was about whether we can have a single processor write to the store and also read from it in order to verify its contents after writing. While it is possible to write to a global store from a processor (via `addGlobalStore()`), it is not possible to have that processor also forward verification results downstream: its `process()` method always returns `void`, so there is nothing to forward (see the sketch below). So no matter what, we need to separate the verification logic from the logic that writes to the store. That's what I was trying to say earlier, but in an abbreviated/confusing way.
   
   Sounds like we're on the same page now regarding what the processor which reads from the store (and writes verification results downstream) is doing, so we should be good 👍 
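   For reference, a minimal sketch of the signature constraint (assuming the modern processor API; the generic parameters are the relevant part):
   ```java
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.Record;
   
   // A processor attached to a global store has its output key/value types
   // fixed to Void, so process() can update the store but never forward
   // verification results downstream.
   class GlobalStoreUpdater implements Processor<Integer, String, Void, Void> {
       @Override
       public void process(final Record<Integer, String> record) {
           // write record.key() / record.value() into the global store here;
           // there is no downstream node to forward a result to
       }
   }
   ```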






[jira] [Commented] (KAFKA-14585) Move StorageTool to tools

2023-03-21 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703309#comment-17703309
 ] 

Muralidhar Basani commented on KAFKA-14585:
---

[~mimaison] I think I am done with the changes. I noticed the 'formatCommand' method is invoked from other core classes which are not part of this change, hence that method is also copied to ToolsUtils.

Can you please take a look at the PR?

> Move StorageTool to tools
> -
>
> Key: KAFKA-14585
> URL: https://issues.apache.org/jira/browse/KAFKA-14585
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Muralidhar Basani
>Priority: Major
> Fix For: 3.5.0
>
>






[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-21 Thread via GitHub


philipnee commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1143739168


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
 }
 }
 
+private class UnsentOffsetFetchRequest extends RequestState {

Review Comment:
   yap that sounds good!






[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-21 Thread via GitHub


philipnee commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1143738304


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
 }
 }
 
+private class UnsentOffsetFetchRequest extends RequestState {
+public final Set<TopicPartition> requestedPartitions;
+public final GroupState.Generation requestedGeneration;
+public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+public UnsentOffsetFetchRequest(final Set<TopicPartition> partitions,
+final GroupState.Generation generation,
+final long retryBackoffMs) {
+super(retryBackoffMs);
+this.requestedPartitions = partitions;
+this.requestedGeneration = generation;
+this.future = new CompletableFuture<>();
+}
+
+public boolean sameRequest(final UnsentOffsetFetchRequest request) {
+return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
+}
+
+public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long currentTimeMs) {
+OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder(
+groupState.groupId,
+true,
+new ArrayList<>(this.requestedPartitions),
+throwOnFetchStableOffsetUnsupported);
+NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
+builder,
+coordinatorRequestManager.coordinator());
+unsentRequest.future().whenComplete((r, t) -> {
+onResponse(currentTimeMs, (OffsetFetchResponse) r.responseBody());
+});
+return unsentRequest;
+}
+
+public void onResponse(
+final long currentTimeMs,
+final OffsetFetchResponse response) {
+Errors responseError = response.groupLevelError(groupState.groupId);
+if (responseError != Errors.NONE) {
+onFailure(currentTimeMs, responseError);
+return;
+}
+onSuccess(currentTimeMs, response);
+}
+
+private void onFailure(final long currentTimeMs,
+final Errors responseError) {
+log.debug("Offset fetch failed: {}", responseError.message());
+
+// TODO: should we retry on COORDINATOR_NOT_AVAILABLE as well?
+if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+retry(currentTimeMs);
+} else if (responseError == Errors.NOT_COORDINATOR) {
+// re-discover the coordinator and retry
+coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), Time.SYSTEM.milliseconds());
+retry(currentTimeMs);
+} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
+future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId));
+} else {
+future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response: " + responseError.message()));
+}
+}
+
+private void retry(final long currentTimeMs) {
+onFailedAttempt(currentTimeMs);
+onSendAttempt(currentTimeMs);
+pendingRequests.addOffsetFetchRequest(this);
+}
+
+private void onSuccess(final long currentTimeMs,
+final OffsetFetchResponse response) {
+Set<String> unauthorizedTopics = null;
+Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData =
+response.partitionDataMap(groupState.groupId);
+Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(responseData.size());
+Set<TopicPartition> unstableTxnOffsetTopicPartitions = new HashSet<>();
+for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : responseData.entrySet()) {
+TopicPartition tp = entry.getKey();
+OffsetFetchResponse.PartitionData partitionData = entry.getValue();
+if (partitionData.hasError()) {
+Errors error = partitionData.error;
+log.debug("Failed to fetch offset for partition {}: {}", tp, error.message());
+
+if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does not exist"));
+return;
+} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+if (unauthorizedTopics == null) {
+unauthorizedTopics
[jira] [Resolved] (KAFKA-8713) [Connect] JsonConverter NULL Values are replaced by default values even in NULLABLE fields

2023-03-21 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-8713.
---
Fix Version/s: 3.5.0
 Assignee: Mickael Maison
   Resolution: Fixed

> [Connect] JsonConverter NULL Values are replaced by default values even in 
> NULLABLE fields
> --
>
> Key: KAFKA-8713
> URL: https://issues.apache.org/jira/browse/KAFKA-8713
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.3.0, 2.2.1
>Reporter: Cheng Pan
>Assignee: Mickael Maison
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.5.0
>
>
> Class JsonConverter line: 582
> {code:java}
> private static JsonNode convertToJson(Schema schema, Object logicalValue) {
> if (logicalValue == null) {
> if (schema == null) // Any schema is valid and we don't have a default, so treat this as an optional schema
> return null;
> if (schema.defaultValue() != null)
> return convertToJson(schema, schema.defaultValue());
> if (schema.isOptional())
> return JsonNodeFactory.instance.nullNode();
> throw new DataException("Conversion error: null value for field that is required and has no default value");
> }
> 
> }
> {code}
> h1. Expect:
> A `null` value is valid for an optional field, even when the field has a default value.
> Only when the field is required should the converter fall back to the default value on a `null` input (see the sketch after the quoted example below).
> h1. Actual:
> The default value is always returned when `null` is given.
> h1. Example:
> I'm not sure if the current behavior is exactly what was intended, but at least on MySQL, a table defined as
> {code:sql}
> create table t1 (
>name varchar(40) not null,
>create_time datetime default '1999-01-01 11:11:11' null,
>update_time datetime default '1999-01-01 11:11:11' null
> );
> {code}
> Just insert a record:
> {code:sql}
> INSERT INTO `t1` (`name`,  `update_time`) VALUES ('kafka', null);
> {code}
> The result is:
> {code:json}
> {
> "name": "kafka",
> "create_time": "1999-01-01 11:11:11",
> "update_time": null
> }
> {code}
> But when I use debezium pull binlog and send the record to Kafka with 
> JsonConverter, the result changed to:
> {code:json}
> {
> "name": "kafka",
> "create_time": "1999-01-01 11:11:11",
> "update_time": "1999-01-01 11:11:11"
> }
> {code}
> For more details, see: https://issues.jboss.org/browse/DBZ-1064
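
A minimal sketch of the expected check ordering in convertToJson (only the reordering matters; the rest of the method is assumed unchanged, and the merged KIP-581 change makes this behavior configurable rather than hard-coding it):

{code:java}
private static JsonNode convertToJson(Schema schema, Object logicalValue) {
    if (logicalValue == null) {
        if (schema == null)
            return null;
        // check optionality before the default, so an explicit null survives
        if (schema.isOptional())
            return JsonNodeFactory.instance.nullNode();
        if (schema.defaultValue() != null)
            return convertToJson(schema, schema.defaultValue());
        throw new DataException("Conversion error: null value for field that is required and has no default value");
    }
    // ... rest of the method unchanged ...
}
{code}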





[GitHub] [kafka] mimaison merged pull request #13419: KAFKA-8713: Allow using null for field in JsonConverter (KIP-581)

2023-03-21 Thread via GitHub


mimaison merged PR #13419:
URL: https://github.com/apache/kafka/pull/13419





[GitHub] [kafka] mukkachaitanya commented on pull request #13426: KAFKA-14814: Skip target state updates when the configs store has sam…

2023-03-21 Thread via GitHub


mukkachaitanya commented on PR #13426:
URL: https://github.com/apache/kafka/pull/13426#issuecomment-1478255352

   Hi, @yashmayya / @C0urante let me know if you folks can take a stab at 
reviewing the PR. Thanks!





[GitHub] [kafka] mimaison commented on pull request #13419: KAFKA-8713: Allow using null for field in JsonConverter (KIP-581)

2023-03-21 Thread via GitHub


mimaison commented on PR #13419:
URL: https://github.com/apache/kafka/pull/13419#issuecomment-1478251287

   Thanks for the review @C0urante !
   I've now closed the voting thread. Even though they are not available here, tests have [run](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/view/change-requests/job/PR-13419/1/) in the CI, and none of the failures are related. Merging.





[GitHub] [kafka] gharris1727 opened a new pull request, #13428: MINOR: Refactor Mirror integration tests to reduce duplication

2023-03-21 Thread via GitHub


gharris1727 opened a new pull request, #13428:
URL: https://github.com/apache/kafka/pull/13428

   The IdentityReplicationIntegrationTest nontrivially overrides two tests from the base test. Most of the implementation is copied, but the copy changes assertions that have to do with the backup -> primary mirror flow.
   
   Instead of having Base::testReplication (which is active-active) and Identity::testReplication (which is primary -> backup only), use a flag to control the assertions for the two variants of the test, so that most of the code can be deduplicated (see the sketch below).
   
   Additionally, utilize the remoteTopicName test function to derive target topic names programmatically, instead of hardcoding them in the two variants of the test.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13382: KAFKA-14722: Make BooleanSerde public

2023-03-21 Thread via GitHub


wcarlson5 commented on code in PR #13382:
URL: https://github.com/apache/kafka/pull/13382#discussion_r1143691894


##
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java:
##
@@ -48,42 +45,4 @@ public NullableValueAndTimestampSerde(final Serde valueSerde) {
 );
 }
 
-static final class BooleanSerde {

Review Comment:
   Are there tests for this code that need to cleaned up?






[GitHub] [kafka] mimaison commented on a diff in pull request #13424: KAFKA-14783 (KIP-875): New STOPPED state for connectors

2023-03-21 Thread via GitHub


mimaison commented on code in PR #13424:
URL: https://github.com/apache/kafka/pull/13424#discussion_r1143576130


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java:
##
@@ -103,6 +103,9 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
 public void stop() {
 log.info("Stopped {} connector {}", this.getClass().getSimpleName(), connectorName);
 connectorHandle.recordConnectorStop();
+if (Boolean.parseBoolean(commonConfigs.getOrDefault("connector.stop.inject.error", "false"))) {
+throw new RuntimeException("Injecting errors during connector start");

Review Comment:
   `during connector start` -> `during connector stop`



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##
@@ -245,6 +245,14 @@ default void validateConnectorConfig(Map connectorConfig, Callba
 */
 void restartConnectorAndTasks(RestartRequest request, Callback<ConnectorStateInfo> cb);
 
+/**
+ * Stop the conector. This call will asynchronously suspend processing by the connector and all

Review Comment:
   `connector`



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1090,6 +1090,39 @@ public void putConnectorConfig(final String connName, final Map
 );
 }
 
+@Override
+public void stopConnector(final String connName, final Callback<Void> callback) {
+log.trace("Submitting request to transition connector {} to STOPPED state", connName);
+
+addRequest(
+() -> {
+refreshConfigSnapshot(workerSyncTimeoutMs);
+if (!configState.contains(connName))
+throw new NotFoundException("Unknown connector " + connName);
+
+// We only allow the leader to handle this request since it involves writing task configs to the config topic
+if (!isLeader()) {
+callback.onCompletion(new NotLeaderException("Only the leader can transition connectors to the STOPPED state.", leaderUrl()), null);
+return null;
+}
+
+// TODO: We may want to add a new ConfigBackingStore method for stopping a connector so that

Review Comment:
   Should we create a ticket for this TODO?



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -221,28 +223,44 @@ public boolean isRunning() {
 }
 
 @SuppressWarnings("fallthrough")
-private void pause() {
+private void stop(boolean paused) {
+State newState = paused ? State.PAUSED : State.STOPPED;
 try {
-switch (state) {
-case STOPPED:
-return;
+if ((state == State.STOPPED || state == State.PAUSED) && state == newState) {

Review Comment:
   As `newState` is either `PAUSED` or `STOPPED`, would `state == newState` be 
enough here?
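   i.e., roughly (a sketch, with the enum inlined to make the equivalence concrete):
   ```java
   enum State { STARTED, PAUSED, STOPPED }
   
   class Example {
       // newState is only ever PAUSED or STOPPED when stop() runs, so
       // (state == STOPPED || state == PAUSED) && state == newState
       // collapses to a single comparison:
       static boolean alreadyInTargetState(final State state, final State newState) {
           return state == newState;
       }
   }
   ```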






[GitHub] [kafka] C0urante commented on a diff in pull request #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)

2023-03-21 Thread via GitHub


C0urante commented on code in PR #13373:
URL: https://github.com/apache/kafka/pull/13373#discussion_r1143623680


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -514,6 +540,37 @@ private void updateTopicConfigs(Map<String, Config> topicConfigs) {
 }));
 }
 
+// visible for testing
+void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
+Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>();
+for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) {
+Collection<AlterConfigOp> ops = new ArrayList<>();
+ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey());
+for (ConfigEntry config : topicConfig.getValue().entries()) {
+if (config.isDefault() && !shouldReplicateSourceDefault(config.source())) {
+ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE));
+} else {
+ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET));
+}
+}
+configOps.put(configResource, ops);
+}
+log.trace("Syncing configs for {} topics.", configOps.size());
+targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+if (e != null) {
+if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIG_DEFAULT)

Review Comment:
   We can mock the connector's context and ensure that the correct method has been invoked:
   ```java
   @Test
   public void testIncrementalAlterConfigsRequiredButUnsupported() throws Exception {
       MockAdminClient admin = spy(new MockAdminClient());
       ConnectorContext connectorContext = mock(ConnectorContext.class);
       MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
               new DefaultReplicationPolicy(), MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIG, new DefaultConfigPropertyFilter(), admin));
       connector.initialize(connectorContext);
       final String topic = "testtopic";
       List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1"));
       Config config = new Config(entries);
       doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
       doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any());
       doNothing().when(connector).deprecatedAlterConfigs(any());
   
       connector.syncTopicConfigs();
   
       verify(connectorContext).raiseError(isA(UnsupportedVersionException.class));
   }
   ```






[GitHub] [kafka] C0urante commented on pull request #13420: KAFKA-14740: Add source tag to MirrorSourceMetrics - KIP-911

2023-03-21 Thread via GitHub


C0urante commented on PR #13420:
URL: https://github.com/apache/kafka/pull/13420#issuecomment-1477881989

   (I did not notice that the KIP voting thread has not closed yet. This should 
probably only be merged after that happens 😄)





[jira] [Resolved] (KAFKA-14797) MM2 does not emit offset syncs when conservative translation logic exceeds positive max.offset.lag

2023-03-21 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14797.
---
Resolution: Fixed

> MM2 does not emit offset syncs when conservative translation logic exceeds 
> positive max.offset.lag
> --
>
> Key: KAFKA-14797
> URL: https://issues.apache.org/jira/browse/KAFKA-14797
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Blocker
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> This is a regression in MirrorMaker 2 introduced by KAFKA-12468.
> Reproduction steps:
> 1. Set max.offset.lag to a non-zero value.
> 2. Set up a 1-1 replication flow which does not skip upstream offsets or have 
> a concurrent producer to the target topic.
> 3. Produce more than max.offset.lag records to the source topic and allow 
> replication to proceed.
> 4. Examine end offsets, checkpoints and/or target consumer group lag
> Expected behavior:
> Consumer group lag should be at most max.offset.lag.
> Actual behavior:
> Consumer group lag is significantly larger than max.offset.lag.





[GitHub] [kafka] C0urante commented on pull request #13367: KAFKA-14797: Emit offset sync when offset translation lag would exceed max.offset.lag

2023-03-21 Thread via GitHub


C0urante commented on PR #13367:
URL: https://github.com/apache/kafka/pull/13367#issuecomment-1477875999

   Merged and backported to 3.3 and 3.4





[jira] [Commented] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics

2023-03-21 Thread Atul Jain (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703204#comment-17703204
 ] 

Atul Jain commented on KAFKA-14597:
---

[~talestonini] I have another issue with process-latency [-avg, -max]. Is it possible that the observed discrepancies in latencies are due to the reasons you have mentioned above?

I will describe my topology here (a minimal code sketch is at the end of this comment). I have two sources, and both of these sources write data to their respective state stores. I have added delays (using sleep) of 3 and 1 seconds respectively while writing to these state stores. I also have a PunctuateProcessor which runs punctuate() on a 60 s wall-clock-time schedule. The punctuate logic checks the data in both state stores and adds it to the sink (topology diagram attached).

!image-2023-03-21-19-01-54-713.png!

*expected values of latencies*
process-latency-max = 3sec 
process-latency-avg = avg(3sec, 1sec) = 2sec

*Observed latencies on stream-thread-metrics*

process-latency-max = 2334 ms 
process-latency-avg = 1971.91 ms

!image-2023-03-21-19-03-07-525.png!


*Observed latencies on stream-task-metrics* : (Obtained by setting 
metrics.recording.level=DEBUG)
process-latency-max = 3000 ms 
process-latency-avg = 2034 ms

!image-2023-03-21-19-03-28-625.png!

 

Kindly let me know if it is a different issue and needs to be addressed 
separately. Thanks
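
A minimal code sketch of the topology just described (topic, store, and class names are placeholders, not my actual code, and the punctuator is omitted for brevity):
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class TwoSourceTopologySketch {

    // writes each record to a state store after an artificial delay
    static class DelayedWriter implements Processor<String, String, Void, Void> {
        private final String storeName;
        private final long sleepMs;
        private KeyValueStore<String, String> store;

        DelayedWriter(final String storeName, final long sleepMs) {
            this.storeName = storeName;
            this.sleepMs = sleepMs;
        }

        @Override
        public void init(final ProcessorContext<Void, Void> context) {
            store = context.getStateStore(storeName);
        }

        @Override
        public void process(final Record<String, String> record) {
            try {
                Thread.sleep(sleepMs); // 3000 ms for one source, 1000 ms for the other
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            store.put(record.key(), record.value());
        }
    }

    public static StreamsBuilder build() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("store-a"), Serdes.String(), Serdes.String()));
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("store-b"), Serdes.String(), Serdes.String()));

        builder.stream("input-a", Consumed.with(Serdes.String(), Serdes.String()))
                .process(() -> new DelayedWriter("store-a", 3000L), "store-a");
        builder.stream("input-b", Consumed.with(Serdes.String(), Serdes.String()))
                .process(() -> new DelayedWriter("store-b", 1000L), "store-b");

        // the 60 s wall-clock punctuator that reads both stores and writes
        // to the sink is omitted here
        return builder;
    }
}
{code}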

> [Streams] record-e2e-latency-max is not reporting correct metrics 
> --
>
> Key: KAFKA-14597
> URL: https://issues.apache.org/jira/browse/KAFKA-14597
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Reporter: Atul Jain
>Assignee: Tales Tonini
>Priority: Major
> Attachments: image-2023-03-21-15-07-24-352.png, 
> image-2023-03-21-19-01-54-713.png, image-2023-03-21-19-03-07-525.png, 
> image-2023-03-21-19-03-28-625.png, process-latency-max.jpg, 
> record-e2e-latency-max.jpg
>
>
> I was following this KIP documentation 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams])
>  and kafka streams documentation 
> ([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end])
>  . Based on this documentation, *record-e2e-latency-max* should monitor the 
> full end-to-end latency, which includes both *consumption latencies* and 
> {*}processing delays{*}.
> However, based on my observations, record-e2e-latency-max seems to only 
> measure the consumption latency; processing delays can be measured using 
> *process-latency-max*. I am checking all this using a simple topology 
> consisting of a source, processors and a sink (code added). I have added some 
> sleep time (of 3 seconds) in one of the processors to ensure some delay in 
> the processing logic. These delays are not accounted for in 
> record-e2e-latency-max but are accounted for in process-latency-max. 
> process-latency-max was observed to be 3002 ms, which accounts for the sleep 
> time of 3 seconds. However, record-e2e-latency-max observed in jconsole is 
> 422 ms, which does not account for the 3 seconds of sleep time.
>  
> Code describing my topology:
> {code:java}
>static Topology buildTopology(String inputTopic, String outputTopic) {
> log.info("Input topic: " + inputTopic + " and output topic: " + outputTopic);
> Serde<String> stringSerde = Serdes.String();
> StreamsBuilder builder = new StreamsBuilder();
> builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
> .peek((k,v) -> log.info("Observed event: key" + k + " value: " + v))
> .mapValues(s -> {
> try {
> System.out.println("sleeping for 3 seconds");
> Thread.sleep(3000);
> }
> catch (InterruptedException e) {
> e.printStackTrace();
> }
> return s.toUpperCase();
> })
> .peek((k,v) -> log.info("Transformed event: key" + k + " value: " + v))
> .to(outputTopic, Produced.with(stringSerde, stringSerde));
> return builder.build();
> } {code}





[jira] [Updated] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics

2023-03-21 Thread Atul Jain (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Atul Jain updated KAFKA-14597:
--
Attachment: image-2023-03-21-19-03-28-625.png



[jira] [Updated] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics

2023-03-21 Thread Atul Jain (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Atul Jain updated KAFKA-14597:
--
Attachment: image-2023-03-21-19-03-07-525.png



[jira] [Updated] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics

2023-03-21 Thread Atul Jain (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Atul Jain updated KAFKA-14597:
--
Attachment: image-2023-03-21-19-01-54-713.png



[GitHub] [kafka] C0urante merged pull request #13367: KAFKA-14797: Emit offset sync when offset translation lag would exceed max.offset.lag

2023-03-21 Thread via GitHub


C0urante merged PR #13367:
URL: https://github.com/apache/kafka/pull/13367


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13425: KAFKA-14365: Extract common logic from Fetcher

2023-03-21 Thread via GitHub


C0urante commented on PR #13425:
URL: https://github.com/apache/kafka/pull/13425#issuecomment-1477800891

   
![fetch](https://user-images.githubusercontent.com/8636148/226614066-a13bd150-9bf5-4c1e-b578-8b6f1bf51c55.gif)
   





[GitHub] [kafka] chia7712 commented on pull request #13326: KAFKA-14774 the removed listeners should not be reconfigurable

2023-03-21 Thread via GitHub


chia7712 commented on PR #13326:
URL: https://github.com/apache/kafka/pull/13326#issuecomment-1477661271

   @showuon @mumrah Could you take a look?





[GitHub] [kafka] lucasbru commented on a diff in pull request #13369: KAFKA-14172: Should clear cache when active recycled from standby

2023-03-21 Thread via GitHub


lucasbru commented on code in PR #13369:
URL: https://github.com/apache/kafka/pull/13369#discussion_r1143208858


##
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java:
##
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * An integration test to verify EOS properties when using caching and standby replicas
+ * while tasks are being redistributed after a re-balancing event.
+ * This test is not intended to be merged into the repo; it is provided only as evidence of how to reproduce the issue.
+ * One test fails and two tests pass reliably on an i7-8750H CPU @ 2.20GHz × 12 with 32 GiB of memory.
+ */
+@Category(IntegrationTest.class)
+@SuppressWarnings("deprecation")

Review Comment:
   No need to bite the bullet from my side ;)






[GitHub] [kafka] lucasbru commented on pull request #13097: [Draft] KAFKA-10532: close clean for EOS when it's RUNNING standby or RESTORING active

2023-03-21 Thread via GitHub


lucasbru commented on PR #13097:
URL: https://github.com/apache/kafka/pull/13097#issuecomment-1477637108

   > 2. streams exception: translated from kafka exception from restore 
consumer's other calls, as well as the restore callback)
   
   `InvalidOffsetException` is rethrown as `StreamsException` in 
`prepareChangelogs` and `hasRestoredToEnd`. That seems to violate your 
categorization.





[jira] [Commented] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics

2023-03-21 Thread Atul Jain (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703143#comment-17703143
 ] 

Atul Jain commented on KAFKA-14597:
---

I currently have version 2.6.0.



[GitHub] [kafka] muralibasani commented on pull request #13417: KAFKA-14585: Moving StorageTool from core to tools module

2023-03-21 Thread via GitHub


muralibasani commented on PR #13417:
URL: https://github.com/apache/kafka/pull/13417#issuecomment-1477601010

   @mimaison PR is ready to review. 
   - Had to modify build.gradle to avoid a circular dependency, as formatCommand is used by other classes in the core module.
   - There is an unrelated test failing, I guess. Hope that's OK?





[GitHub] [kafka] lucasbru commented on a diff in pull request #13382: KAFKA-14722: Make BooleanSerde public

2023-03-21 Thread via GitHub


lucasbru commented on code in PR #13382:
URL: https://github.com/apache/kafka/pull/13382#discussion_r1143171856


##
clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java:
##
@@ -274,6 +282,13 @@ static public Serde<UUID> UUID() {
 return new UUIDSerde();
 }
 
+/**
+ * A serde for nullable {@code Boolean} type.

Review Comment:
   I checked again, and I guess you don't have to worry too much about my concern. When I read that comment you wrote, I understood it as "this class turns `true`, `false`, and `null` into a byte array", but it actually maps `null` to `null`, and the caller has to find a way to deal with the `null` result (see the sketch below). This seems to be a general convention in KStreams, so I think the code is fine. I was just worried about giving a user of this function the wrong impression, but, as I said, it's probably safe to ignore.
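   For reference, a minimal sketch of the null pass-through being described (my reading of the behavior, not the exact implementation):
   ```java
   import org.apache.kafka.common.serialization.Serializer;
   
   // Sketch: null is passed through untouched rather than being encoded,
   // so the caller has to handle a null result.
   public class NullableBooleanSerializer implements Serializer<Boolean> {
       @Override
       public byte[] serialize(final String topic, final Boolean data) {
           if (data == null) {
               return null;
           }
           return new byte[] {(byte) (data ? 0x01 : 0x00)};
       }
   }
   ```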







[GitHub] [kafka] tinaselenge commented on a diff in pull request #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)

2023-03-21 Thread via GitHub


tinaselenge commented on code in PR #13373:
URL: https://github.com/apache/kafka/pull/13373#discussion_r1143154814


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -514,6 +540,37 @@ private void updateTopicConfigs(Map<String, Config> topicConfigs) {
 }));
 }
 
+// visible for testing
+void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
+Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>();
+for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) {
+Collection<AlterConfigOp> ops = new ArrayList<>();
+ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey());
+for (ConfigEntry config : topicConfig.getValue().entries()) {
+if (config.isDefault() && !shouldReplicateSourceDefault(config.source())) {
+ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE));
+} else {
+ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET));
+}
+}
+configOps.put(configResource, ops);
+}
+log.trace("Syncing configs for {} topics.", configOps.size());
+targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+if (e != null) {
+if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIG_DEFAULT)

Review Comment:
   I have addressed this by adding `context.raiseError(new UnsupportedVersionException())`, however I didn't test this in the unit tests. I tried mocking the API to return `UnsupportedVersionException`, but I'm not sure how to catch the error from the connect framework in the test. 






[jira] [Commented] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics

2023-03-21 Thread Tales Tonini (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703129#comment-17703129
 ] 

Tales Tonini commented on KAFKA-14597:
--

Hi [~atuljainiitk] , may I ask what Kafka Streams version you have in your app? 
Thanks.



[GitHub] [kafka] tinaselenge commented on a diff in pull request #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)

2023-03-21 Thread via GitHub


tinaselenge commented on code in PR #13373:
URL: https://github.com/apache/kafka/pull/13373#discussion_r1143144850


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -514,6 +540,37 @@ private void updateTopicConfigs(Map<String, Config> topicConfigs) {
 }));
 }
 
+// visible for testing
+void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
+Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>();
+for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) {
+Collection<AlterConfigOp> ops = new ArrayList<>();
+ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey());
+for (ConfigEntry config : topicConfig.getValue().entries()) {
+if (config.isDefault() && !shouldReplicateSourceDefault(config.source())) {
+ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE));
+} else {
+ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET));
+}
+}
+configOps.put(configResource, ops);
+}
+log.trace("Syncing configs for {} topics.", configOps.size());
+targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+if (e != null) {
+if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIG_DEFAULT)
+&& e instanceof UnsupportedVersionException) {
+//Fallback logic
+log.warn("The target cluster {} is not compatible with the IncrementalAlterConfigs API, therefore using the deprecated AlterConfigs API for syncing topic configurations", sourceAndTarget.target(), e);
+alterConfigs(topicConfigs);

Review Comment:
   I agree with this, and I've removed the alterConfigs call from here. Setting 
`use.incremental.alter.configs` to `never` should be enough so that the next 
time we sync configs, the deprecated API is used. 
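
   For context, a standalone sketch of the Admin API used above (illustrative 
only; the broker address and topic name are made up). `SET` overwrites a config 
on the target topic, while `DELETE` removes the override so the value reverts 
to the broker default:
   
   ```java
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AlterConfigOp;
   import org.apache.kafka.clients.admin.ConfigEntry;
   import org.apache.kafka.common.config.ConfigResource;

   import java.util.Collection;
   import java.util.List;
   import java.util.Map;

   public class IncrementalAlterExample {
       public static void main(String[] args) throws Exception {
           try (Admin admin = Admin.create(Map.<String, Object>of("bootstrap.servers", "localhost:9092"))) {
               ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
               Collection<AlterConfigOp> ops = List.of(
                   // SET overwrites the value on the target topic
                   new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET),
                   // DELETE removes the override, reverting to the broker default
                   new AlterConfigOp(new ConfigEntry("retention.ms", ""), AlterConfigOp.OpType.DELETE));
               admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
           }
       }
   }
   ```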



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on pull request #13427: MINOR:Incorrect/canonical use of constants in AdminClientConfig and StreamsConfigTest

2023-03-21 Thread via GitHub


hudeqi commented on PR #13427:
URL: https://github.com/apache/kafka/pull/13427#issuecomment-1477565223

   Hello, could you please help review this? @guozhangwang 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tinaselenge commented on pull request #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)

2023-03-21 Thread via GitHub


tinaselenge commented on PR #13373:
URL: https://github.com/apache/kafka/pull/13373#issuecomment-1477564128

   @mimaison @C0urante Thank you so much for reviewing the PR. I think I have 
addressed all the review comments; please let me know if I have missed anything 
or if you have any further comments. 
   
   The Jenkins build failed mostly due to unrelated tests, the exception being 
one MirrorMaker integration test, `MirrorConnectorsIntegrationSSLTest > 
testReplication() FAILED`. However, this test passes on my local machine.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi closed pull request #13377: MINOR: Fix typos in JaasContext

2023-03-21 Thread via GitHub


hudeqi closed pull request #13377: MINOR: Fix typos in JaasContext
URL: https://github.com/apache/kafka/pull/13377


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics

2023-03-21 Thread Atul Jain (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703118#comment-17703118
 ] 

Atul Jain commented on KAFKA-14597:
---

Hi [~cadonna] , 
{quote}Could you run your Streams application and ensure that {{process-total}} 
is 1 or greater when you look at {{{}record-e2e-latency-max{}}}?

Please, let us know whether the value of the metric makes more sense then.
{quote}
In order to report this issue, I had started the Streams app and captured the 
screenshot immediately after starting it. I am adding a new screenshot with 
process-total greater than 0 to better represent the issue.

!image-2023-03-21-15-07-24-352.png!

 

The metric values still do not make sense, because they do not account for the 
sleep time added in a processor.
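
A minimal sketch for reading the metric programmatically rather than through 
jconsole, assuming a running {{KafkaStreams}} instance named {{streams}}:
{code:java}
// Prints every record-e2e-latency-max data point the instance currently
// reports, tagged with its thread, task, and processor node.
streams.metrics().forEach((name, metric) -> {
    if (name.name().equals("record-e2e-latency-max")) {
        System.out.printf("%s %s = %s%n", name.name(), name.tags(), metric.metricValue());
    }
});
{code}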

> [Streams] record-e2e-latency-max is not reporting correct metrics 
> --
>
> Key: KAFKA-14597
> URL: https://issues.apache.org/jira/browse/KAFKA-14597
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Reporter: Atul Jain
>Assignee: Tales Tonini
>Priority: Major
> Attachments: image-2023-03-21-15-06-47-836.png, 
> image-2023-03-21-15-07-24-352.png, process-latency-max.jpg, 
> record-e2e-latency-max.jpg
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics

2023-03-21 Thread Atul Jain (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Atul Jain updated KAFKA-14597:
--
Attachment: (was: image-2023-03-21-15-06-47-836.png)

> [Streams] record-e2e-latency-max is not reporting correct metrics 
> --
>
> Key: KAFKA-14597
> URL: https://issues.apache.org/jira/browse/KAFKA-14597
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Reporter: Atul Jain
>Assignee: Tales Tonini
>Priority: Major
> Attachments: image-2023-03-21-15-07-24-352.png, 
> process-latency-max.jpg, record-e2e-latency-max.jpg
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics

2023-03-21 Thread Atul Jain (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Atul Jain updated KAFKA-14597:
--
Attachment: image-2023-03-21-15-07-24-352.png

> [Streams] record-e2e-latency-max is not reporting correct metrics 
> --
>
> Key: KAFKA-14597
> URL: https://issues.apache.org/jira/browse/KAFKA-14597
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Reporter: Atul Jain
>Assignee: Tales Tonini
>Priority: Major
> Attachments: image-2023-03-21-15-06-47-836.png, 
> image-2023-03-21-15-07-24-352.png, process-latency-max.jpg, 
> record-e2e-latency-max.jpg
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics

2023-03-21 Thread Atul Jain (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Atul Jain updated KAFKA-14597:
--
Attachment: image-2023-03-21-15-06-47-836.png

> [Streams] record-e2e-latency-max is not reporting correct metrics 
> --
>
> Key: KAFKA-14597
> URL: https://issues.apache.org/jira/browse/KAFKA-14597
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Reporter: Atul Jain
>Assignee: Tales Tonini
>Priority: Major
> Attachments: image-2023-03-21-15-06-47-836.png, 
> process-latency-max.jpg, record-e2e-latency-max.jpg
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hudeqi closed pull request #13412: MINOR: Fix low bound in AdminClientConfig and DistributedConfig

2023-03-21 Thread via GitHub


hudeqi closed pull request #13412: MINOR: Fix low bound in AdminClientConfig 
and DistributedConfig
URL: https://github.com/apache/kafka/pull/13412


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #13422: MINOR: Cleanups in clients common.config

2023-03-21 Thread via GitHub


mimaison merged PR #13422:
URL: https://github.com/apache/kafka/pull/13422


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14812) ProducerPerformance still counting successful sending in console when sending failed

2023-03-21 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-14812:
--

Assignee: hudeqi

> ProducerPerformance still counting successful sending in console when sending 
> failed
> 
>
> Key: KAFKA-14812
> URL: https://issues.apache.org/jira/browse/KAFKA-14812
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 3.3.2
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
>  Labels: kafka-producer-perf-test, tools
> Attachments: WechatIMG27.jpeg
>
>
> When using ProducerPerformance, I found that when sending fails, the records 
> are still counted as successfully sent in the stats, and the metrics are 
> printed to the console. For example, when the client has no write permission 
> and cannot produce to the topic, a successful send rate is still magically 
> displayed.
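
A minimal sketch of the expected accounting, with hypothetical {{AtomicLong}} 
counters (the tool's actual bookkeeping may differ):
{code:java}
// Count a record as sent only when the broker acknowledged it; count
// failures separately instead of folding them into the success stats.
producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        sent.incrementAndGet();
    } else {
        failed.incrementAndGet();
        exception.printStackTrace();
    }
});
{code}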



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14814) Skip restart of connectors when redundant resume request is made

2023-03-21 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703107#comment-17703107
 ] 

Yash Mayya commented on KAFKA-14814:


Hi [~cmukka20], I've re-assigned the ticket to you! Feel free to ping me for 
review whenever it's ready.

> Skip restart of connectors when redundant resume request is made
> 
>
> Key: KAFKA-14814
> URL: https://issues.apache.org/jira/browse/KAFKA-14814
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chaitanya Mukka
>Priority: Minor
>
> Consecutive requests to the {{PUT /connectors/{connector}/resume}} endpoint 
> will cause the Connector to be restarted. This is a little wasteful and 
> conflicts with the idempotent nature of that endpoint. We can tweak the 
> {{MemoryConfigBackingStore}} and {{KafkaConfigBackingStore}} classes to not 
> invoke the {{onConnectorTargetStateChange}} method of their 
> {{ConfigUpdateListener}} instance if they pick up a new target state that 
> matches the current target state of the connector.
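
A rough sketch of the guard described above (field and method names are 
hypothetical; the real backing stores differ in detail):
{code:java}
// Skip notifying the herder when the "new" target state is already in effect,
// so a redundant resume request no longer restarts the connector.
private synchronized void onTargetStateChange(String connector, TargetState newState) {
    TargetState current = targetStates.get(connector);
    if (newState == current) {
        return;
    }
    targetStates.put(connector, newState);
    updateListener.onConnectorTargetStateChange(connector);
}
{code}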



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] chia7712 merged pull request #13404: KAFKA-14812: ProducerPerformance still counting successful sending in …

2023-03-21 Thread via GitHub


chia7712 merged PR #13404:
URL: https://github.com/apache/kafka/pull/13404


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14814) Skip restart of connectors when redundant resume request is made

2023-03-21 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-14814:
--

Assignee: Chaitanya Mukka  (was: Yash Mayya)

> Skip restart of connectors when redundant resume request is made
> 
>
> Key: KAFKA-14814
> URL: https://issues.apache.org/jira/browse/KAFKA-14814
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chaitanya Mukka
>Priority: Minor
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] chia7712 commented on pull request #13404: KAFKA-14812: ProducerPerformance still counting successful sending in …

2023-03-21 Thread via GitHub


chia7712 commented on PR #13404:
URL: https://github.com/apache/kafka/pull/13404#issuecomment-1477457628

   ```
   [2023-03-17T04:47:45.614Z] BUILD SUCCESSFUL in 1h 56m 35s
   [2023-03-17T04:47:45.614Z] 224 actionable tasks: 120 executed, 104 up-to-date
   ```
   tests pass. will merge it later


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-6891) send.buffer.bytes should be allowed to set -1 in KafkaConnect

2023-03-21 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-6891.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> send.buffer.bytes should be allowed to set -1 in KafkaConnect
> -
>
> Key: KAFKA-6891
> URL: https://issues.apache.org/jira/browse/KAFKA-6891
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Oleg Kuznetsov
>Assignee: Zheng-Xian Li
>Priority: Major
> Fix For: 3.5.0
>
>
> *send.buffer.bytes* and *receive.buffer.bytes* are declared with an *atLeast(0)* 
> constraint in *DistributedConfig*, whereas *-1* should also be an allowed value.
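
A minimal sketch of the relaxed constraint using {{ConfigDef}}'s standard 
{{Range}} validator (the defaults shown are illustrative, not the exact patch):
{code:java}
import org.apache.kafka.common.config.ConfigDef;

class BufferConfigDefs {
    // -1 means "use the OS default buffer size", so the lower bound must admit it.
    static final ConfigDef CONFIG = new ConfigDef()
        .define("send.buffer.bytes", ConfigDef.Type.INT, 128 * 1024,
                ConfigDef.Range.atLeast(-1), ConfigDef.Importance.MEDIUM,
                "TCP send buffer size; -1 uses the OS default.")
        .define("receive.buffer.bytes", ConfigDef.Type.INT, 64 * 1024,
                ConfigDef.Range.atLeast(-1), ConfigDef.Importance.MEDIUM,
                "TCP receive buffer size; -1 uses the OS default.");
}
{code}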



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] chia7712 commented on pull request #13398: KAFKA-6891: send.buffer.bytes should be allowed to set -1 in KafkaConnect

2023-03-21 Thread via GitHub


chia7712 commented on PR #13398:
URL: https://github.com/apache/kafka/pull/13398#issuecomment-1477456249

   @garyparrot thanks for this contribution


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 merged pull request #13398: KAFKA-6891: send.buffer.bytes should be allowed to set -1 in KafkaConnect

2023-03-21 Thread via GitHub


chia7712 merged PR #13398:
URL: https://github.com/apache/kafka/pull/13398


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 commented on pull request #13398: KAFKA-6891: send.buffer.bytes should be allowed to set -1 in KafkaConnect

2023-03-21 Thread via GitHub


chia7712 commented on PR #13398:
URL: https://github.com/apache/kafka/pull/13398#issuecomment-1477443749

   ```
   [2023-03-20T05:31:14.475Z] BUILD SUCCESSFUL in 1h 44m 21s
   [2023-03-20T05:31:14.475Z] 224 actionable tasks: 120 executed, 104 up-to-date
   ```
   
   the tests pass. will merge it later


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14814) Skip restart of connectors when redundant resume request is made

2023-03-21 Thread Chaitanya Mukka (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703073#comment-17703073
 ] 

Chaitanya Mukka commented on KAFKA-14814:
-

Hi [~yash.mayya]! If you haven't started on this ticket, would it be OK if I 
pick it up? It seems like a good starter ticket. I have a quick draft up here: 
[https://github.com/apache/kafka/pull/13426]

> Skip restart of connectors when redundant resume request is made
> 
>
> Key: KAFKA-14814
> URL: https://issues.apache.org/jira/browse/KAFKA-14814
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Yash Mayya
>Priority: Minor
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

