[jira] [Updated] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when

2024-05-08 Thread Johnson Okorie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johnson Okorie updated KAFKA-16692:
---
Affects Version/s: 3.6.1
   (was: 3.6.2)

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.6.1
>Reporter: Johnson Okorie
>Priority: Major
>
> We have a Kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.2. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.2 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this:
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> While investigating this issue and digging through the changes in 3.6, we 
> came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour.
> First, we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers, generated 
> [here|https://github.com/apache/kafka/blob/cb35ddc5ca233d5cca6f51c1c41b952a7e9fe1a0/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, it seems these requests are still sent to other brokers in our 
> environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId and potentially skip the expected _latestUsableVersion_ 
> check. I am wondering if it is possible that, because 
> _discoverBrokerVersions_ is set to false for the network client of the 
> AddPartitionsToTxnManager, it skips fetching ApiVersions. I can see that 
> we create the network client here:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]
> This _NetworkUtils.buildNetworkClient_ seems to create a network client that 
> has _discoverBrokerVersions_ set to false. 
> I was hoping I could get some assistance debugging this issue. Happy to 
> provide any additional information needed.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when

2024-05-08 Thread Johnson Okorie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johnson Okorie updated KAFKA-16692:
---
Description: 
We have a Kafka cluster running on version 3.5.2 that we are upgrading to 
3.6.2. This cluster has a lot of clients with exactly-once semantics enabled and 
hence creating transactions. As we replaced brokers with the new binaries, we 
observed lots of clients in the cluster experiencing the following error:
{code:java}
2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
transactionalId=] Got error produce response with correlation 
id 6402937 on topic-partition , retrying (2147483512 attempts 
left). Error: NETWORK_EXCEPTION. Error Message: The server disconnected before 
a response was received.{code}
On inspecting the broker, we saw the following errors on brokers still running 
Kafka version 3.5.2:

 
{code:java}
message:     
Closing socket for  because of error
exception_exception_class:    
org.apache.kafka.common.errors.InvalidRequestException
exception_exception_message:    
Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
enabled
exception_stacktrace:    
org.apache.kafka.common.errors.InvalidRequestException: Received request api 
key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
{code}
On the new brokers running 3.6.2 we saw the following errors:

 
{code:java}
[AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
node 1043 with a network exception.{code}
 

I can also see this:
{code:java}
[AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 being 
disconnected (elapsed time since creation: 11ms, elapsed time since send: 4ms, 
request timeout: 3ms){code}
While investigating this issue and digging through the changes in 3.6, we came 
across some changes introduced as part of KAFKA-14402 that we thought might 
lead to this behaviour.

First, we could see that _transaction.partition.verification.enable_ is enabled 
by default and enables a new code path that culminates in sending version 4 
ADD_PARTITIONS_TO_TXN requests to other brokers, generated 
[here|https://github.com/apache/kafka/blob/cb35ddc5ca233d5cca6f51c1c41b952a7e9fe1a0/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].

From a 
[discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
possible as the following code paths should prevent version 4 
ADD_PARTITIONS_TO_TXN requests being sent to other brokers:

[https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
 
[https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]

However, it seems these requests are still sent to other brokers in our 
environment.

On further inspection of the code, I am wondering if the following code path 
could lead to this issue:

[https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]

In this scenario, we don't have any _NodeApiVersions_ available for the 
specified nodeId and potentially skip the expected _latestUsableVersion_ 
check. I am wondering if it is possible that, because 
_discoverBrokerVersions_ is set to false for the network client of the 
AddPartitionsToTxnManager, it skips fetching ApiVersions. I can see that 
we create the network client here:

[https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]

This _NetworkUtils.buildNetworkClient_ seems to create a network client that 
has _discoverBrokerVersions_ set to false. 
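
For illustration, here is a minimal sketch (hypothetical names, not the actual 
NetworkClient code) of the behaviour I suspect: when no ApiVersions data was 
ever fetched for a node, the client falls back to its own latest version 
instead of clamping to the broker's maximum:
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class VersionFallbackSketch {
    // A 3.6 client supports ADD_PARTITIONS_TO_TXN up to version 4,
    // while a 3.5 broker only accepts up to version 3.
    static final short CLIENT_LATEST_VERSION = 4;

    // Models NodeApiVersions state; a missing entry means ApiVersions was
    // never fetched for that node (e.g. discoverBrokerVersions == false).
    static final Map<Integer, Short> brokerMaxVersionByNode = new ConcurrentHashMap<>();

    static short chooseVersion(int nodeId) {
        Short brokerMax = brokerMaxVersionByNode.get(nodeId);
        if (brokerMax == null) {
            // No version info: the latestUsableVersion check is skipped and
            // the client's own latest version is used.
            return CLIENT_LATEST_VERSION;
        }
        return (short) Math.min(CLIENT_LATEST_VERSION, brokerMax);
    }

    public static void main(String[] args) {
        System.out.println(chooseVersion(1043)); // 4 -> rejected by a 3.5 broker
        brokerMaxVersionByNode.put(1043, (short) 3);
        System.out.println(chooseVersion(1043)); // 3 once versions are known
    }
}
{code}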

I was hoping I could get some assistance debugging this issue. Happy to provide 
any additional information needed.

 

 

 

  was:
We have a Kafka cluster running on version 3.5.2 that we are upgrading to 
3.6.2. This cluster has a lot of clients with exactly-once semantics enabled and 
hence creating transactions. As we replaced brokers with the new binaries, we 
observed lots of clients in the cluster experiencing the following error:


{code:java}
2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
transactionalId=] Got error produce response with correlation 
id 6402937 on topic-partition , retrying (2147483512 attempts 
left). Error: NETWORK_EXCEPTION. Error Message: The server disconnected before 
a response was received.{code}
On inspecting the broker, we saw the following errors on brokers still running 
Kafka version 3.5.2:

 
{code:java}
message:     
Closing socket for  because of error
exception_exception_class:    
org.apache.kafka.common.errors.InvalidRequestException
exception_exception_message:    
Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
enabled
{code}

Re: [PR] KAFKA-16688: Use helper method to shutdown ExecutorService [kafka]

2024-05-08 Thread via GitHub


gaurav-narula commented on PR #15886:
URL: https://github.com/apache/kafka/pull/15886#issuecomment-2100220810

   > Before merging this PR, I want to know which `timerTask` hangs during 
shutdown. Do you have any idea?
   
   Unfortunately I don't, but I see you're trying to investigate that in #15891. 
I'll keep this one open in the meantime.
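   
   For reference, the general shape of such a helper (a sketch of the common 
pattern, not necessarily the exact helper used in this PR):
   
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.TimeUnit;
   
   final class ShutdownSketch {
       // Graceful shutdown first; force-cancel whatever (e.g. a hanging
       // timerTask) did not terminate within the timeout.
       static void shutdownQuietly(ExecutorService executor, long timeout, TimeUnit unit) {
           executor.shutdown(); // stop accepting new tasks
           try {
               if (!executor.awaitTermination(timeout, unit)) {
                   executor.shutdownNow(); // interrupt still-running tasks
                   executor.awaitTermination(timeout, unit);
               }
           } catch (InterruptedException e) {
               executor.shutdownNow();
               Thread.currentThread().interrupt(); // preserve interrupt status
           }
       }
   }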
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when

2024-05-08 Thread Johnson Okorie (Jira)
Johnson Okorie created KAFKA-16692:
--

 Summary: InvalidRequestException: ADD_PARTITIONS_TO_TXN with 
version 4 which is not enabled when 
 Key: KAFKA-16692
 URL: https://issues.apache.org/jira/browse/KAFKA-16692
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.6.2
Reporter: Johnson Okorie


We have a Kafka cluster running on version 3.5.2 that we are upgrading to 
3.6.2. This cluster has a lot of clients with exactly-once semantics enabled and 
hence creating transactions. As we replaced brokers with the new binaries, we 
observed lots of clients in the cluster experiencing the following error:


{code:java}
2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
transactionalId=] Got error produce response with correlation 
id 6402937 on topic-partition , retrying (2147483512 attempts 
left). Error: NETWORK_EXCEPTION. Error Message: The server disconnected before 
a response was received.{code}
On inspecting the broker, we saw the following errors on brokers still running 
Kafka version 3.5.2:

 
{code:java}
message:     
Closing socket for  because of error
exception_exception_class:    
org.apache.kafka.common.errors.InvalidRequestException
exception_exception_message:    
Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
enabled
exception_stacktrace:    
org.apache.kafka.common.errors.InvalidRequestException: Received request api 
key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
{code}
On the new brokers running 3.6.2 we saw the following errors:

 
{code:java}
[AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
node 1043 with a network exception.{code}
 

I can also see this:
{code:java}
[AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 being 
disconnected (elapsed time since creation: 11ms, elapsed time since send: 4ms, 
request timeout: 3ms){code}

While investigating this issue and digging through the changes in 3.6, we came 
across some changes introduced as part of 
[KAFKA-14402|https://issues.apache.org/jira/browse/KAFKA-14402] that we thought 
might lead to this behaviour.

First, we could see that _transaction.partition.verification.enable_ is enabled 
by default and enables a new code path that culminates in sending version 4 
ADD_PARTITIONS_TO_TXN requests to other brokers, generated 
[here|https://github.com/apache/kafka/blob/cb35ddc5ca233d5cca6f51c1c41b952a7e9fe1a0/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].

From a 
[discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
possible as the following code paths should prevent version 4 
ADD_PARTITIONS_TO_TXN requests being sent to other brokers:

[https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
 
[https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]

However, it seems these requests are still sent to other brokers in our 
environment.

On further inspection of the code, I am wondering if the following code path 
could lead to this issue:

[https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]

In this scenario, we don't have any _NodeApiVersions_ available for the 
specified nodeId and potentially skip the expected _latestUsableVersion_ 
check. I am wondering if it is possible that, because 
_discoverBrokerVersions_ is set to false for the network client of the 
AddPartitionsToTxnManager, it skips fetching ApiVersions. I can see that 
we create the network client here:

[https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]

This _NetworkUtils.buildNetworkClient_ seems to create a network client that 
has _discoverBrokerVersions_ set to false. 

I was hoping I could get some assistance debugging this issue.










 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16643 Fix chaos modifier [kafka]

2024-05-08 Thread via GitHub


chia7712 commented on PR #15890:
URL: https://github.com/apache/kafka/pull/15890#issuecomment-2100176348

   @gongxuanzhang thanks for this patch. Could you take a look at 
https://issues.apache.org/jira/browse/KAFKA-10787? Personally, I think we 
should introduce the new checkstyle file first and then apply it module by 
module to reduce the patch size.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Handle assignment with new remove operation in state updater [kafka]

2024-05-08 Thread via GitHub


lucasbru commented on code in PR #15882:
URL: https://github.com/apache/kafka/pull/15882#discussion_r1593738288


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -544,69 +585,78 @@ private void handleReassignedActiveTask(final Task task,
 }
 
 private void handleTasksInStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                       final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate) {
+                                       final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                       final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                       final Set<Task> tasksToCloseCleanFromStateUpdater,
+                                       final Set<Task> tasksToCloseDirtyFromStateUpdater,
+                                       final Map<TaskId, RuntimeException> failedTasks) {
+    final Map<TaskId, Set<TopicPartition>> newInputPartitions = new HashMap<>();
+    final Map<TaskId, Set<TopicPartition>> standbyInputPartitions = new HashMap<>();
+    final Map<TaskId, Set<TopicPartition>> activeInputPartitions = new HashMap<>();
+    final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futuresForUpdatingInputPartitions = new LinkedHashMap<>();
+    final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futuresForActiveTasksToRecycle = new LinkedHashMap<>();
+    final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futuresForStandbyTasksToRecycle = new LinkedHashMap<>();
+    final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futuresForTasksToClose = new LinkedHashMap<>();
     for (final Task task : stateUpdater.getTasks()) {
         final TaskId taskId = task.id();
         if (activeTasksToCreate.containsKey(taskId)) {
-            final Set<TopicPartition> inputPartitions = activeTasksToCreate.get(taskId);
-            if (task.isActive() && !task.inputPartitions().equals(inputPartitions)) {
-                if (tasks.removePendingTaskToCloseClean(taskId)) {
-                    tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(taskId, inputPartitions);
-                } else {
-                    tasks.addPendingTaskToUpdateInputPartitions(taskId, inputPartitions);
-                    stateUpdater.remove(taskId);
-                }
-            } else if (task.isActive()) {
-                if (tasks.removePendingActiveTaskToSuspend(taskId)) {
-                    log.info(
-                        "We were planning on suspending a task {} because it was revoked " +
-                        "The task got reassigned to this thread, so we cancel suspending " +
-                        "of the task, but add it back to the state updater, since we do not know " +
-                        "if it is fully restored yet.",
-                        taskId);
-                    tasks.addPendingTaskToAddBack(taskId);
-                }
-                if (tasks.removePendingTaskToCloseClean(taskId)) {
-                    log.info(
-                        "We were planning on closing task {} because we lost one of its partitions." +
-                        "The task got reassigned to this thread, so cancel closing  of the task, but add it back to the " +
-                        "state updater, since we may have to catch up on the changelog.",
-                        taskId);
-                    tasks.addPendingTaskToAddBack(taskId);
+            if (task.isActive()) {
+                if (!task.inputPartitions().equals(activeTasksToCreate.get(taskId))) {
+                    final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(taskId);
+                    futuresForUpdatingInputPartitions.put(taskId, future);
+                    newInputPartitions.put(taskId, activeTasksToCreate.get(taskId));
                 }
             } else {
-                removeTaskToRecycleFromStateUpdater(taskId, inputPartitions);
+                final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(taskId);
+                futuresForStandbyTasksToRecycle.put(taskId, future);
+                activeInputPartitions.put(taskId, activeTasksToCreate.get(taskId));
             }
             activeTasksToCreate.remove(taskId);
         } else if (standbyTasksToCreate.containsKey(taskId)) {
             if (task.isActive()) {
-                removeTaskToRecycleFromStateUpdater(taskId, standbyTasksToCreate.get(taskId));
-            } else {
-                if (tasks.removePendingTaskToRecycle(taskId) != null) {
-                    log.info(
-                        "We were planning on recycling standby task {} to an active task." +
-                        "The task got reassigned to this thread as a standby task, so cancel recycling of the task, " +
-                        "but add it back to the state updater, since we may have to catch up on the changelog.",
-                        taskId);
-                    tasks.addPendingTaskToAddBack(taskId);
-                }
+                final 

Re: [PR] MINOR: enable test for ensureInternalEndpointIsSecured [kafka]

2024-05-08 Thread via GitHub


chia7712 commented on code in PR #15868:
URL: https://github.com/apache/kafka/pull/15868#discussion_r1593737692


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java:
##
@@ -115,11 +112,11 @@ public void ensureInternalEndpointIsSecured() throws 
Throwable {
 // Try again, but with an invalid signature
 log.info(
 "Making a POST request to the {} endpoint with no connector 
started and an invalid signature header; "
-+ "expecting 403 error response",
++ "expecting 503 error response",
 connectorTasksEndpoint
 );
 assertEquals(
-FORBIDDEN.getStatusCode(),
+SERVICE_UNAVAILABLE.getStatusCode(),

Review Comment:
   `503` is a temporary error, because the worker is still in starting. Could 
you please add a wait to check the 503 first ?
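   
   For illustration, one way to add such a wait (a sketch only; 
`postWithInvalidSignature` is a hypothetical helper wrapping the POST request 
made above):
   
   // Poll until the worker has finished starting and the endpoint stops
   // returning the temporary 503, then assert on the settled status code.
   org.apache.kafka.test.TestUtils.waitForCondition(
       () -> postWithInvalidSignature(connectorTasksEndpoint).getStatus()
                 != SERVICE_UNAVAILABLE.getStatusCode(),
       java.util.concurrent.TimeUnit.MINUTES.toMillis(1),
       "Endpoint kept returning 503 while the worker was starting"
   );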



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16643) Add ModifierOrder checkstyle rule

2024-05-08 Thread xuanzhang gong (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844598#comment-17844598
 ] 

xuanzhang gong commented on KAFKA-16643:


I have finished this modification task.

Link: https://github.com/apache/kafka/pull/15890

> Add ModifierOrder checkstyle rule
> -
>
> Key: KAFKA-16643
> URL: https://issues.apache.org/jira/browse/KAFKA-16643
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Reporter: Greg Harris
>Priority: Minor
>
> Checkstyle offers the ModifierOrder rule 
> ([https://checkstyle.sourceforge.io/checks/modifier/modifierorder.html]), 
> which Kafka violates in a lot of places. We should decide whether this is a 
> checkstyle rule we should be following, and potentially enable it moving 
> forward.
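> For example, a sketch of the kind of code this rule flags (the compliant form 
> follows the JLS modifier order):
> {code:java}
> public class ModifierOrderExample {
>     static public final int BAD = 1;    // violation: "static public"
>     public static final int GOOD = 2;   // compliant: JLS order
> 
>     synchronized static void bad() { }  // violation: "synchronized static"
>     static synchronized void good() { } // compliant
> }
> {code}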



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Allowing WriteTxnMarkers API to run with AlterCluster permissions [kafka]

2024-05-08 Thread via GitHub


sidyag commented on code in PR #15837:
URL: https://github.com/apache/kafka/pull/15837#discussion_r1593693523


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -3037,14 +3062,71 @@ class KafkaApisTest extends Logging {
   }
 
   @Test
-  def shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion(): Unit = {
+  def 
shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion_allowedWithAlterCluster():
 Unit = {

Review Comment:
   Made the changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14588 [2/N] ConfigCommandTest rewritten in java [kafka]

2024-05-08 Thread via GitHub


chia7712 commented on code in PR #15873:
URL: https://github.com/apache/kafka/pull/15873#discussion_r1593681340


##
core/src/test/java/kafka/admin/ConfigCommandUnitTest.java:
##
@@ -410,6 +448,432 @@ public void testOptionEntityTypeNames() {
 doTestOptionEntityTypeNames(false);
 }
 
+@Test
+public void shouldFailIfUnrecognisedEntityTypeUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--entity-name", "client", "--entity-type", "not-recognised", 
"--alter", "--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
+}
+
+@Test
+public void shouldFailIfUnrecognisedEntityType() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "client", "--entity-type", "not-recognised", 
"--alter", "--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), 
createOpts));
+}
+
+@Test
+public void shouldFailIfBrokerEntityTypeIsNotAnIntegerUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--entity-name", "A", "--entity-type", "brokers", "--alter", 
"--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
+}
+
+@Test
+public void shouldFailIfBrokerEntityTypeIsNotAnInteger() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "A", "--entity-type", "brokers", "--alter", 
"--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), 
createOpts));
+}
+
+@Test
+public void 
shouldFailIfShortBrokerEntityTypeIsNotAnIntegerUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--broker", "A", "--alter", "--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
+}
+
+@Test
+public void shouldFailIfShortBrokerEntityTypeIsNotAnInteger() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--broker", "A", "--alter", "--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), 
createOpts));
+}
+
+@Test
+public void shouldFailIfMixedEntityTypeFlagsUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--entity-name", "A", "--entity-type", "users", "--client", "B", 
"--describe"});
+assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+}
+
+@Test
+public void shouldFailIfMixedEntityTypeFlags() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "A", "--entity-type", "users", "--client", "B", 
"--describe"});
+assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+}
+
+@Test
+public void shouldFailIfInvalidHost() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "A,B", "--entity-type", "ips", "--describe"});
+assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+}
+
+@Test
+public void shouldFailIfInvalidHostUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--entity-name", "A,B", "--entity-type", "ips", "--describe"});
+assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+}
+
+@Test
+public void shouldFailIfUnresolvableHost() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "RFC2606.invalid", "--entity-type", "ips", 

Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-05-08 Thread via GitHub


clolov commented on code in PR #15673:
URL: https://github.com/apache/kafka/pull/15673#discussion_r1593670905


##
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##
@@ -717,7 +717,7 @@ public void testUnregisterBroker() throws Throwable {
 setBrokerId(0).
 setClusterId(active.clusterId()).
 
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
-setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_8_IV0)).
+setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_8_IV1)).

Review Comment:
   Hopefully this ought to be addressed in the next commit as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-05-08 Thread via GitHub


clolov commented on code in PR #15673:
URL: https://github.com/apache/kafka/pull/15673#discussion_r1593668135


##
metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java:
##
@@ -283,7 +283,7 @@ private static Stream 
metadataVersionsForTestPartitionRegistration()
 return Stream.of(
 MetadataVersion.IBP_3_7_IV1,
 MetadataVersion.IBP_3_7_IV2,
-MetadataVersion.IBP_3_8_IV0
+MetadataVersion.IBP_3_8_IV1

Review Comment:
   My miss, updated in the next commit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16679 merge unit test down to the class of integration test [kafka]

2024-05-08 Thread via GitHub


showuon commented on code in PR #15884:
URL: https://github.com/apache/kafka/pull/15884#discussion_r1593636792


##
tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java:
##
@@ -160,28 +154,26 @@ private String outputWithoutEpoch(String output) {
 int pos = output.indexOf("Epoch: ");
 return (pos > 0) ? output.substring(0, pos) : output;
 }
-}
 
-class FeatureCommandUnitTest {

Review Comment:
   ditto, adding comments to make it clear the following tests are unit tests.



##
tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java:
##
@@ -112,12 +107,7 @@ private static void executeAndAssertOutput(String json, 
String expOut, Admin adm
 });
 assertTrue(output.contains(expOut));
 }
-}
 
-/**
- * Unit test of {@link DeleteRecordsCommand} tool.
- */

Review Comment:
   I would keep the comment to make it clear the following tests are unit tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-10199: Shutdown with new remove operation in state updater [kafka]

2024-05-08 Thread via GitHub


cadonna opened a new pull request, #15894:
URL: https://github.com/apache/kafka/pull/15894

   Uses the new remove operation of the state updater, which returns
   a future, to shut down the task manager.
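   
   A minimal sketch of the pattern (types taken from the related PRs; the exact 
   wiring in this PR may differ):
   
   // Ask the state updater to remove each task, collect the returned futures,
   // and block until all removals complete before proceeding with shutdown.
   final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new LinkedHashMap<>();
   for (final Task task : stateUpdater.getTasks()) {
       futures.put(task.id(), stateUpdater.removeWithFuture(task.id()));
   }
   CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).join();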
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-08 Thread FTR (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844579#comment-17844579
 ] 

FTR edited comment on KAFKA-16687 at 5/8/24 8:09 AM:
-

Update:
Testing longer on 3.6.0, the resident memory of the Java app process never 
exceeded the max heap size. But on 3.7.0, the total reserved and committed 
native memory was more than the OS memory, and again it was caused by [NMT 
Internal] memory. In the end, after free and buffer/cache memory was exhausted, 
the kswapd0 process started swapping.
JDK version: OpenJDK 1.8.0_342


was (Author: JIRAUSER305339):
Update:
Testing longer on 3.6.0, the resident memory of the Java app process never 
exceeded the max heap size. But on 3.7.0, the total reserved and committed 
native memory was more than the OS memory, and again it was caused by [NMT 
Internal] memory. In the end, after free and buffer/cache memory was exhausted, 
the kswapd0 process started swapping.

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java project that uses the Maven dependency kafka-clients, 
> version 3.7.0.
> My Java application uses a Kafka consumer to poll a Kafka broker topic 
> continuously.
> I have configured my Java application with the JVM options -Xms8G -Xmx8G 
> -XX:MaxMetaspaceSize=4G, and then run it.
> Also, there is 16G of physical memory on my virtual machine.
> After my Java application had been running for a long time, I found that the 
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process ate swap space.
> I checked it with jmap -heap pid and found heap memory usage is OK.
> Also, with Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I 
> found that it's caused by [NMT Internal] memory, which is created by 
> Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate 
> memory.
> And I checked the Kafka-clients source code; it has code that uses 
> "sun.misc.Unsafe" to allocate memory, and MaxMetaspaceSize does not apply to 
> it.
> Could you help to check it? How can I stop this growing native memory to 
> avoid my system hanging?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16640) Replace TestUtils#resource by scala.util.Using

2024-05-08 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16640.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Replace TestUtils#resource by scala.util.Using
> --
>
> Key: KAFKA-16640
> URL: https://issues.apache.org/jira/browse/KAFKA-16640
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Minor
> Fix For: 3.8.0
>
>
> `scala.util.Using` is in both scala 2.13 and scala-collection-compat, so we 
> don't need to have a custom try-with-resources function.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16640: Replace TestUtils#resource by scala.util.Using [kafka]

2024-05-08 Thread via GitHub


chia7712 merged PR #15881:
URL: https://github.com/apache/kafka/pull/15881


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Revoke tasks from state updater with new remove [kafka]

2024-05-08 Thread via GitHub


cadonna merged PR #15871:
URL: https://github.com/apache/kafka/pull/15871


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Revoke tasks from state updater with new remove [kafka]

2024-05-08 Thread via GitHub


rishiraj88 commented on code in PR #15871:
URL: https://github.com/apache/kafka/pull/15871#discussion_r1593553420


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -623,6 +623,21 @@ private void addToTasksToClose(final Map> futures,

Review Comment:
   OK then. Let's continue to have it as a method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-08 Thread FTR (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844579#comment-17844579
 ] 

FTR commented on KAFKA-16687:
-

Update:
Testing longer on 3.6.0, the resident memory of the Java app process never 
exceeded the max heap size. But on 3.7.0, the total reserved and committed 
native memory was more than the OS memory, and again it was caused by [NMT 
Internal] memory. In the end, after free and buffer/cache memory was exhausted, 
the kswapd0 process started swapping.

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java project that uses the Maven dependency kafka-clients, 
> version 3.7.0.
> My Java application uses a Kafka consumer to poll a Kafka broker topic 
> continuously.
> I have configured my Java application with the JVM options -Xms8G -Xmx8G 
> -XX:MaxMetaspaceSize=4G, and then run it.
> Also, there is 16G of physical memory on my virtual machine.
> After my Java application had been running for a long time, I found that the 
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process ate swap space.
> I checked it with jmap -heap pid and found heap memory usage is OK.
> Also, with Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I 
> found that it's caused by [NMT Internal] memory, which is created by 
> Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate 
> memory.
> And I checked the Kafka-clients source code; it has code that uses 
> "sun.misc.Unsafe" to allocate memory, and MaxMetaspaceSize does not apply to 
> it.
> Could you help to check it? How can I stop this growing native memory to 
> avoid my system hanging?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Remove dev_version parameter from streams tests [kafka]

2024-05-08 Thread via GitHub


lucasbru merged PR #15874:
URL: https://github.com/apache/kafka/pull/15874


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Remove dev_version parameter from streams tests [kafka]

2024-05-08 Thread via GitHub


lucasbru commented on PR #15874:
URL: https://github.com/apache/kafka/pull/15874#issuecomment-2099949443

   
https://confluent-kafka-branch-builder-system-test-results-v2.s3-us-west-2.amazonaws.com/trunk/2024-05-07--001./report.html


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16511) Leaking tiered segments

2024-05-08 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16511:
--
Fix Version/s: 3.7.1

> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.8.0, 3.7.1
>
>
> I have some topics that have not been written to for a few days (having 12h 
> retention) where some data remains in tiered storage (in our case S3) and 
> is never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments.
> When looking at one of the non-leaking segments, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
> metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> {code}
>  
> This looks right, because we can see logs from both the plugin and the remote 
> log manager indicating that the remote log segment was removed.
> Now if I look at one of the leaked segments, here is what I see:
>  
> {code:java}
> "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 
> 02971163.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}"
> "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}
> , startOffset=2971163, endOffset=2978396, brokerId=10001, 
> maxTimestampMs=1712010648756, eventTimestampMs=1712018599981, 
> segmentLeaderEpochs={7=2971163}, segmentSizeInBytes=459778940, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> 

Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-08 Thread via GitHub


showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593525765


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException

Review Comment:
   > While updating the 
[UnifiedLog#updateLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@5f933ac840343429e911f5706ccb7cd8dc379462/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L535),
 the HWM also gets updated and it doesn't hit the fetchHighWatermarkMetadata 
(or) convertToOffsetMetadataOrThrow so the call will succeed even when 
current-log-start-offset > old-HWM.
   
   For 
[UnifiedLog#updateLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@5f933ac840343429e911f5706ccb7cd8dc379462/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L535),
 I think it's safe that we don't throw an exception when 
`current-log-start-offset > old-HWM`, because it is only called during 
initialization or from `maybeIncrementLogStartOffset`, and in the latter we've 
already checked `newLogStartOffset > logStartOffset`.
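   
   To restate the rule under discussion as a sketch (a simplified model, not the 
actual UnifiedLog code; `lookUpInLocalSegments` and the message-only 
`LogOffsetMetadata` constructor here are illustrative):
   
   // offset < logStartOffset                        -> OffsetOutOfRangeException
   // logStartOffset <= offset < localLogStartOffset -> message-only metadata
   // otherwise                                      -> full metadata from local segments
   LogOffsetMetadata convertToOffsetMetadata(final long offset) {
       if (offset < logStartOffset)
           throw new org.apache.kafka.common.errors.OffsetOutOfRangeException(
               "Offset " + offset + " is below log start offset " + logStartOffset);
       if (offset < localLogStartOffset)
           return new LogOffsetMetadata(offset); // no local segment position known
       return lookUpInLocalSegments(offset);
   }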



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]

2024-05-08 Thread via GitHub


showuon merged PR #15817:
URL: https://github.com/apache/kafka/pull/15817


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]

2024-05-08 Thread via GitHub


showuon commented on PR #15817:
URL: https://github.com/apache/kafka/pull/15817#issuecomment-2099912904

   Failed tests are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-08 Thread via GitHub


showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593511224


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException

Review Comment:
   Ah, OK. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-08 Thread via GitHub


raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1593500243


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -221,41 +199,28 @@ private void emitNonJoinedOuterRecords(
 // reset to MAX_VALUE in case the store is empty
 sharedTimeTracker.minTime = Long.MAX_VALUE;
 
-try (final KeyValueIterator, 
LeftOrRightValue> it = store.all()) {
+try (final KeyValueIterator, 
LeftOrRightValue> it = store.all()) {
 TimestampedKeyAndJoinSide prevKey = null;
 
-boolean outerJoinLeftWindowOpen = false;
-boolean outerJoinRightWindowOpen = false;
 while (it.hasNext()) {
-if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
-// if windows are open for both joinSides we can break 
since there are no more candidates to emit
+final KeyValue, 
LeftOrRightValue> nextKeyValue = it.next();
+final TimestampedKeyAndJoinSide 
timestampedKeyAndJoinSide = nextKeyValue.key;
+sharedTimeTracker.minTime = 
timestampedKeyAndJoinSide.getTimestamp();
+if 
(isOuterJoinWindowOpenForSide(timestampedKeyAndJoinSide, true) && 
isOuterJoinWindowOpenForSide(timestampedKeyAndJoinSide, false)) {

Review Comment:
   Yeah, you are right! That's a good catch. I was depending too much on the 
tests. I couldn't find any test that triggered the condition even after 
reverting the changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-08 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593471183


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException

Review Comment:
   Should we avoid throwing the error and return message-only metadata when the 
offset is less than the log-start-offset?
   
   While updating the 
[UnifiedLog#updateLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@5f933ac840343429e911f5706ccb7cd8dc379462/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L535),
 the HWM also gets updated and it doesn't hit the `fetchHighWatermarkMetadata` 
(or) `convertToOffsetMetadataOrThrow` so the call will succeed even when 
current-log-start-offset > old-HWM.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-08 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593471183


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException

Review Comment:
   Should we avoid throwing the error and return message-only metadata when the 
offset is less than the log-start-offset?
   
   While updating the 
[UnifiedLog#updateLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@5f933ac840343429e911f5706ccb7cd8dc379462/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L535),
 the HWM also gets updated and it doesn't hit the `fetchHighWatermarkMetadata` 
(or) `convertToOffsetMetadataOrThrow` so the call will succeed even when 
log-start-offset > old-HWM.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16656) Using a custom replication.policy.separator with DefaultReplicationPolicy

2024-05-08 Thread Lenin Joseph (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844529#comment-17844529
 ] 

Lenin Joseph edited comment on KAFKA-16656 at 5/8/24 6:47 AM:
--

Hi [~ChrisEgerton], thanks for looking into it.
We see cyclic replication for all the topics, including internal topics such as 
mm2-offset-syncs, checkpoint, mirrormaker2-cluster-status, 
mirrormaker2-cluster-configs, and mirrormaker2-cluster-offsets, but not for the 
consumer-offsets topic.

Below are the configs that we have used.
topicsPattern: ".*"

replication.policy.separator: "-"

replication.policy.class: 
org.apache.kafka.connect.mirror.DefaultReplicationPolicy


was (Author: JIRAUSER304387):
Hi [~ChrisEgerton], thanks for looking into it.
We see cyclic replication for all the topics, including internal topics such as 
mm2-offset-syncs, checkpoint, mirrormaker2-cluster-status, 
mirrormaker2-cluster-configs, and mirrormaker2-cluster-offsets, but not for the 
consumer-offsets topic.

Below are the configs that we have used.
topicsPattern: ".*"

replication.policy.separator: "-"

replication.policy.class: 
org.apache.kafka.connect.mirror.IdentityReplicationPolicy

> Using a custom replication.policy.separator with DefaultReplicationPolicy
> -
>
> Key: KAFKA-16656
> URL: https://issues.apache.org/jira/browse/KAFKA-16656
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.5.1
>Reporter: Lenin Joseph
>Priority: Major
>
> Hi,
> In the case of bidirectional replication using mm2, when we tried using a 
> custom replication.policy.separator (e.g. "-") with DefaultReplicationPolicy, 
> we saw cyclic replication of topics. Could you confirm whether it's mandatory 
> to use a custom ReplicationPolicy whenever we want to use a separator other 
> than "."?
> Regards, 
> Lenin



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-08 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593471183


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException

Review Comment:
   Should we avoid throwing the error and return message-only metadata when the 
offset is less than the log-start-offset?
   
   While updating the 
[UnifiedLog#updateLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@5f933ac840343429e911f5706ccb7cd8dc379462/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L535),
 the HWM also gets updated and it doesn't hit the `fetchHighWatermarkMetadata` 
(or) `convertToOffsetMetadataOrThrow` so the call will succeed even when 
log-start-offset > HWM.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16679 merge unit test down to the class of integration test [kafka]

2024-05-08 Thread via GitHub


KevinZTW commented on code in PR #15884:
URL: https://github.com/apache/kafka/pull/15884#discussion_r1593470507


##
tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java:
##
@@ -160,28 +154,26 @@ private String outputWithoutEpoch(String output) {
 int pos = output.indexOf("Epoch: ");
 return (pos > 0) ? output.substring(0, pos) : output;
 }
-}
 
-class FeatureCommandUnitTest {
 @Test
 public void testLevelToString() {
 assertEquals("5", FeatureCommand.levelToString("foo.bar", (short) 5));
 assertEquals("3.3-IV0",
-FeatureCommand.levelToString(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_3_IV0.featureLevel()));
+FeatureCommand.levelToString(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_3_IV0.featureLevel()));
 }
 
 @Test
 public void testMetadataVersionsToString() {
 assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3",
-
FeatureCommand.metadataVersionsToString(MetadataVersion.IBP_3_3_IV0, 
MetadataVersion.IBP_3_3_IV3));
+
FeatureCommand.metadataVersionsToString(MetadataVersion.IBP_3_3_IV0, 
MetadataVersion.IBP_3_3_IV3));
 }
 
 @Test
 public void testdowngradeType() {

Review Comment:
   Thanks! Just fixed it in a new commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-08 Thread via GitHub


kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2099844734

   > Thanks for the PR. One question: So when we temporarily set the 
high-watermark as `LogOffsetMetadata(0)` for the leader, we're waiting for the 
high-watermark to get updated after followers fetch from the leader, right?
   
   Yes, the call to 
[maybeIncrementLeaderHW](https://sourcegraph.com/github.com/apache/kafka@5f933ac840343429e911f5706ccb7cd8dc379462/-/blob/core/src/main/scala/kafka/cluster/Partition.scala?L1166)
 will succeed when the node becomes leader for the partition. Note that if the 
current node is the only alive replica, then the high-watermark gets updated to 
the leader-log-end-offset. The behavior is the same for both normal and 
remote-storage-enabled topics.
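   
   A minimal sketch of the idea (a simplified model, not the actual Partition 
code):
   
   import java.util.List;
   
   class HighWatermarkSketch {
       // The leader HWM advances to the minimum LEO across the in-sync
       // replicas, so it never moves past data a follower has yet to fetch.
       static long maybeIncrementLeaderHw(long currentHw, long leaderLogEndOffset,
                                          List<Long> inSyncFollowerLogEndOffsets) {
           long newHw = leaderLogEndOffset;
           for (long followerLeo : inSyncFollowerLogEndOffsets) {
               newHw = Math.min(newHw, followerLeo);
           }
           return Math.max(currentHw, newHw); // HWM never moves backwards
       }
   
       public static void main(String[] args) {
           // Newly elected leader with HWM temporarily at 0 and LEO at 42:
           System.out.println(maybeIncrementLeaderHw(0, 42, List.of()));    // 42: only alive replica
           System.out.println(maybeIncrementLeaderHw(0, 42, List.of(17L))); // 17: waits on follower fetch
       }
   }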


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16691: [connect:transform] Support nested field paths on TimestampConverter [kafka]

2024-05-08 Thread via GitHub


jeqo opened a new pull request, #15893:
URL: https://github.com/apache/kafka/pull/15893

   [KAFKA-16691]
   
   ---
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


