[GitHub] [kafka] ex172000 commented on a diff in pull request #14110: MINOR: Add test for describe topic with ID

2023-08-08 Thread via GitHub


ex172000 commented on code in PR #14110:
URL: https://github.com/apache/kafka/pull/14110#discussion_r1288017392


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -2354,6 +2354,75 @@ public void testDeleteRecords() throws Exception {
 }
 }
 
+    @Test
+    public void testDescribeTopicsByIds() {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Valid ID
+            Uuid topicId = Uuid.randomUuid();
+            String topicName = "test-topic";
+            Node leader = env.cluster().nodes().get(0);
+            MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
+                Errors.NONE,
+                new TopicPartition(topicName, 0),
+                Optional.of(leader.id()),
+                Optional.of(10),
+                singletonList(leader.id()),
+                singletonList(leader.id()),
+                singletonList(leader.id()));
+            env.kafkaClient().prepareResponse(RequestTestUtils
+                .metadataResponse(
+                    env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(),
+                    env.cluster().controller().id(),
+                    singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topicName, topicId, false,
+                        singletonList(partitionMetadata), MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED))));
+            TopicCollection.TopicIdCollection topicIds = TopicCollection.ofTopicIds(
+                singletonList(topicId));
+            try {
+                DescribeTopicsResult result = env.adminClient().describeTopics(topicIds);
+                Map<Uuid, TopicDescription> allTopicIds = result.allTopicIds().get();
+                assertEquals(topicName, allTopicIds.get(topicId).name());
+            } catch (Exception e) {
+                fail("describe with valid topicId should not fail", e);
+            }
+
+            // ID not exist in brokers
+            Uuid nonExistID = Uuid.randomUuid();
+            env.kafkaClient().prepareResponse(RequestTestUtils
+                .metadataResponse(
+                    env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(),
+                    env.cluster().controller().id(),
+                    asList()));
+            try {
+                DescribeTopicsResult result = env.adminClient().describeTopics(
+                    TopicCollection.ofTopicIds(singletonList(nonExistID)));
+                TestUtils.assertFutureError(result.allTopicIds(), InvalidTopicException.class);
+                result.allTopicIds().get();
+                fail("describe with non-exist topic ID should throw exception");
+            } catch (Exception e) {
+                assertEquals(
+                    String.format("org.apache.kafka.common.errors.InvalidTopicException: TopicId %s not found.", nonExistID),

Review Comment:
   Thanks for pointing this out; I can make a follow-up change looking into this.
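
For context, a minimal usage sketch of the ID-based describe API this test exercises (the bootstrap address and caller-supplied topic ID are illustrative):

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicCollection;
import org.apache.kafka.common.Uuid;

public class DescribeTopicByIdExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address
        Uuid topicId = Uuid.fromString(args[0]);          // topic ID supplied by the caller
        try (Admin admin = Admin.create(props)) {
            // Describe by ID rather than by name; allTopicIds() keys the result by Uuid.
            DescribeTopicsResult result = admin.describeTopics(
                TopicCollection.ofTopicIds(Collections.singletonList(topicId)));
            Map<Uuid, TopicDescription> byId = result.allTopicIds().get();
            System.out.println(byId.get(topicId).name());
        }
    }
}
```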



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

2023-08-08 Thread via GitHub


fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1287993847


##
core/src/main/scala/kafka/tools/GetOffsetShell.scala:
##


Review Comment:
   > We should still keep this file so it can forward all args to the new 
GetOffsetShell; see `FeatureCommand.scala`.
   
   Hi @dengziming, this tool was reported in the "missing wrapper script" 
category in 
[KIP-906](https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines), 
but that was only because some system tests were depending on the FQCN. In this 
PR we are also changing that, and they now use the wrapper script instead, so I 
think there is no need for the redirection.
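
For reference, the redirection pattern being discussed (as in `FeatureCommand.scala`) is a thin deprecated shim under the old FQCN that forwards to the migrated tool. A hedged sketch of that pattern in Java terms (the real shim would be Scala, and the warning text is an assumption):

```java
// Hypothetical shim kept only for FQCN compatibility; forwards everything
// to the migrated tools-module implementation.
package kafka.tools;

public final class GetOffsetShell {
    public static void main(String[] args) throws Exception {
        System.err.println("WARNING: kafka.tools.GetOffsetShell is deprecated; " +
            "use org.apache.kafka.tools.GetOffsetShell instead.");
        org.apache.kafka.tools.GetOffsetShell.main(args);
    }
}
```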
   









[jira] [Updated] (KAFKA-15324) Do not bump the leader epoch when changing the replica set

2023-08-08 Thread zeyuliu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zeyuliu updated KAFKA-15324:

Description: 
The KRaft controller increases the leader epoch when a partition replica set 
shrinks. This is not strictly required and should be removed.
Related changes: https://github.com/apache/kafka/pull/13765/files

  was:The KRaft controller increases the leader epoch when a partition replica 
set shrinks. This is not strictly required and should be removed.


> Do not bump the leader epoch when changing the replica set
> --
>
> Key: KAFKA-15324
> URL: https://issues.apache.org/jira/browse/KAFKA-15324
> Project: Kafka
>  Issue Type: Task
>Reporter: zeyuliu
>Priority: Major
>
> The KRaft controller increases the leader epoch when a partition replica set 
> shrinks. This is not strictly required and should be removed.
> Related changes: https://github.com/apache/kafka/pull/13765/files



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15324) Do not bump the leader epoch when changing the replica set

2023-08-08 Thread zeyuliu (Jira)
zeyuliu created KAFKA-15324:
---

 Summary: Do not bump the leader epoch when changing the replica set
 Key: KAFKA-15324
 URL: https://issues.apache.org/jira/browse/KAFKA-15324
 Project: Kafka
  Issue Type: Task
Reporter: zeyuliu


The KRaft controller increases the leader epoch when a partition replica set 
shrinks. This is not strictly required and should be removed.





[GitHub] [kafka] cmccabe closed pull request #12662: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2023-08-08 Thread via GitHub


cmccabe closed pull request #12662: KAFKA-14214: Convert StandardAuthorizer to 
copy-on-write
URL: https://github.com/apache/kafka/pull/12662





[GitHub] [kafka] cmccabe commented on pull request #12662: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2023-08-08 Thread via GitHub


cmccabe commented on PR #12662:
URL: https://github.com/apache/kafka/pull/12662#issuecomment-1670700174

   > @cmccabe I guess this can be closed now since it is superseded by 
https://github.com/apache/kafka/pull/13437 ?
   
   yes. thanks.





[GitHub] [kafka] cmccabe closed pull request #12531: Add new metadata loader and publisher [WIP]

2023-08-08 Thread via GitHub


cmccabe closed pull request #12531: Add new metadata loader and publisher [WIP]
URL: https://github.com/apache/kafka/pull/12531





[GitHub] [kafka] cmccabe commented on pull request #12531: Add new metadata loader and publisher [WIP]

2023-08-08 Thread via GitHub


cmccabe commented on PR #12531:
URL: https://github.com/apache/kafka/pull/12531#issuecomment-1670699324

   This is very stale at this point. Basically it was replaced by #12983, 
#13337, and some other changes in that vein.





[GitHub] [kafka] github-actions[bot] commented on pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-08-08 Thread via GitHub


github-actions[bot] commented on PR #13656:
URL: https://github.com/apache/kafka/pull/13656#issuecomment-1670601383

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





[GitHub] [kafka] dengziming commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

2023-08-08 Thread via GitHub


dengziming commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1287886679


##
core/src/main/scala/kafka/tools/GetOffsetShell.scala:
##


Review Comment:
   We should still keep this file so it can forward all args to the new 
GetOffsetShell; see `FeatureCommand.scala`.






[GitHub] [kafka] dengziming commented on a diff in pull request #14110: MINOR: Add test for describe topic with ID

2023-08-08 Thread via GitHub


dengziming commented on code in PR #14110:
URL: https://github.com/apache/kafka/pull/14110#discussion_r1287875933


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -2354,6 +2354,75 @@ public void testDeleteRecords() throws Exception {
 }
 }
 
+    @Test
+    public void testDescribeTopicsByIds() {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Valid ID
+            Uuid topicId = Uuid.randomUuid();
+            String topicName = "test-topic";
+            Node leader = env.cluster().nodes().get(0);
+            MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
+                Errors.NONE,
+                new TopicPartition(topicName, 0),
+                Optional.of(leader.id()),
+                Optional.of(10),
+                singletonList(leader.id()),
+                singletonList(leader.id()),
+                singletonList(leader.id()));
+            env.kafkaClient().prepareResponse(RequestTestUtils
+                .metadataResponse(
+                    env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(),
+                    env.cluster().controller().id(),
+                    singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topicName, topicId, false,
+                        singletonList(partitionMetadata), MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED))));
+            TopicCollection.TopicIdCollection topicIds = TopicCollection.ofTopicIds(
+                singletonList(topicId));
+            try {
+                DescribeTopicsResult result = env.adminClient().describeTopics(topicIds);
+                Map<Uuid, TopicDescription> allTopicIds = result.allTopicIds().get();
+                assertEquals(topicName, allTopicIds.get(topicId).name());
+            } catch (Exception e) {
+                fail("describe with valid topicId should not fail", e);
+            }
+
+            // ID not exist in brokers
+            Uuid nonExistID = Uuid.randomUuid();
+            env.kafkaClient().prepareResponse(RequestTestUtils
+                .metadataResponse(
+                    env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(),
+                    env.cluster().controller().id(),
+                    asList()));

Review Comment:
   It's better to use emptyList()
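
A sketch of the suggested substitution, reusing the test's names (assumes `java.util.Collections` is available alongside the existing static imports):

```java
// Collections.emptyList() returns a shared immutable instance and avoids the
// varargs array allocation of asList(); it also reads as clearly intentional:
env.kafkaClient().prepareResponse(RequestTestUtils
    .metadataResponse(
        env.cluster().nodes(),
        env.cluster().clusterResource().clusterId(),
        env.cluster().controller().id(),
        Collections.emptyList()));
```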



##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -2354,6 +2354,75 @@ public void testDeleteRecords() throws Exception {
 }
 }
 
+    @Test
+    public void testDescribeTopicsByIds() {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Valid ID
+            Uuid topicId = Uuid.randomUuid();
+            String topicName = "test-topic";
+            Node leader = env.cluster().nodes().get(0);
+            MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
+                Errors.NONE,
+                new TopicPartition(topicName, 0),
+                Optional.of(leader.id()),
+                Optional.of(10),
+                singletonList(leader.id()),
+                singletonList(leader.id()),
+                singletonList(leader.id()));
+            env.kafkaClient().prepareResponse(RequestTestUtils
+                .metadataResponse(
+                    env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(),
+                    env.cluster().controller().id(),
+                    singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topicName, topicId, false,
+                        singletonList(partitionMetadata), MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED))));
+            TopicCollection.TopicIdCollection topicIds = TopicCollection.ofTopicIds(
+                singletonList(topicId));
+            try {
+                DescribeTopicsResult result = env.adminClient().describeTopics(topicIds);
+                Map<Uuid, TopicDescription> allTopicIds = result.allTopicIds().get();
+                assertEquals(topicName, allTopicIds.get(topicId).name());
+            } catch (Exception e) {
+                fail("describe with valid topicId should not fail", e);
+            }
+
+            // ID not exist in brokers
+            Uuid nonExistID = Uuid.randomUuid();
+            env.kafkaClient().prepareResponse(RequestTestUtils
+                .metadataResponse(
+                    env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId

[jira] [Assigned] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.

2023-08-08 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi reassigned KAFKA-14912:
--

Assignee: hudeqi

> Introduce a configuration for remote index cache size, preferably a dynamic 
> config.
> ---
>
> Key: KAFKA-14912
> URL: https://issues.apache.org/jira/browse/KAFKA-14912
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Assignee: hudeqi
>Priority: Major
>
> Context: We need to make the 1024 value here [1] dynamically configurable.
> [1] 
> https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119
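
A hedged sketch of what exposing the constant through a config definition could look like (the property name, type, and wiring here are assumptions, not the final design):

{code:java}
import org.apache.kafka.common.config.ConfigDef;

public class RemoteIndexCacheConfigSketch {
    // Hypothetical property name; the real one chosen for this ticket may differ.
    public static final String REMOTE_INDEX_CACHE_SIZE_PROP = "remote.log.index.cache.size";

    public static final ConfigDef CONFIG = new ConfigDef()
        .define(REMOTE_INDEX_CACHE_SIZE_PROP,
            ConfigDef.Type.INT,
            1024, // the currently hard-coded value from RemoteIndexCache
            ConfigDef.Importance.MEDIUM,
            "Maximum number of entries kept in the remote index cache.");
}
{code}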





[jira] [Commented] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.

2023-08-08 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752229#comment-17752229
 ] 

hudeqi commented on KAFKA-14912:


OK, I'll take a look in the next few days, thanks.

> Introduce a configuration for remote index cache size, preferably a dynamic 
> config.
> ---
>
> Key: KAFKA-14912
> URL: https://issues.apache.org/jira/browse/KAFKA-14912
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Priority: Major
>
> Context: We need to make the 1024 value here [1] dynamically configurable.
> [1] 
> https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119





[jira] [Updated] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434

2023-08-08 Thread Maruthi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maruthi updated KAFKA-15319:

Description: 
Rocksdbjni < 7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12.

Upgrade to zlib 1.2.13 to fix:

https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c

  was:
Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12

Upgrade to 1.2.13 to fix 


> Upgrade rocksdb to fix CVE-2022-37434
> -
>
> Key: KAFKA-15319
> URL: https://issues.apache.org/jira/browse/KAFKA-15319
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.1
>Reporter: Maruthi
>Priority: Major
>
> Rocksdbjni < 7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12.
> Upgrade to zlib 1.2.13 to fix:
> https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c





[GitHub] [kafka] jsancio commented on a diff in pull request #14141: KAFKA-15100; KRaft data race with the expiration service

2023-08-08 Thread via GitHub


jsancio commented on code in PR #14141:
URL: https://github.com/apache/kafka/pull/14141#discussion_r1287811134


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1039,6 +1054,14 @@ private FetchResponseData tryCompleteFetchRequest(
         }
     }
 
+    private static boolean isPartitionTruncated(FetchResponseData.PartitionData partitionResponseData) {
+        FetchResponseData.EpochEndOffset divergingEpoch = partitionResponseData.divergingEpoch();
+        FetchResponseData.SnapshotId snapshotId = partitionResponseData.snapshotId();
+
+        return divergingEpoch.epoch() != -1 || divergingEpoch.endOffset() != -1 ||

Review Comment:
   Done.






[jira] [Updated] (KAFKA-15322) Possible thread leaks in AbstractCoordinatorTest

2023-08-08 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15322:
--
Description: 
Relevant stack trace for {{AbstractCoordinatorTest}}:
{noformat}
Relevant Logs for AbstractCoordinatorTest - 
Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group
java.base@17.0.7/java.lang.Object.wait(Native Method)
java.base@17.0.7/java.lang.Object.wait(Object.java:338)

app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448)
{noformat}

  was:
Relevant Logs for AbstractCoordinatorTest - 

{noformat}
Relevant Logs for AbstractCoordinatorTest - 
Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group
java.base@17.0.7/java.lang.Object.wait(Native Method)
java.base@17.0.7/java.lang.Object.wait(Object.java:338)

app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448)
{noformat}


> Possible thread leaks in AbstractCoordinatorTest
> 
>
> Key: KAFKA-15322
> URL: https://issues.apache.org/jira/browse/KAFKA-15322
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> Relevant stack trace for {{AbstractCoordinatorTest}}:
> {noformat}
> Relevant Logs for AbstractCoordinatorTest - 
> Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group
>   java.base@17.0.7/java.lang.Object.wait(Native Method)
>   java.base@17.0.7/java.lang.Object.wait(Object.java:338)
>   
> app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448)
> {noformat}





[jira] [Updated] (KAFKA-15323) Possible thread leaks in ListOffsetsHandlerTest

2023-08-08 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15323:
--
Description: 
Relevant stack trace for {{ListOffsetsHandlerTest}}:
{noformat}
Thread: 30 - kafka-admin-client-thread | adminclient-2
app//org.apache.log4j.Category.getEffectiveLevel(Category.java:430)
app//org.apache.log4j.Logger.isTraceEnabled(Logger.java:203)

app//org.slf4j.impl.Reload4jLoggerAdapter.isTraceEnabled(Reload4jLoggerAdapter.java:83)

app//org.apache.kafka.common.utils.LogContext$LocationAwareKafkaLogger.trace(LogContext.java:185)

app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.maybeDrainPendingCalls(KafkaAdminClient.java:1233)

app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1543)

app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1497)
java.base@17.0.7/java.lang.Thread.run(Thread.java:833)
app//org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:64)
{noformat}

  was:
Relevant Logs for AbstractCoordinatorTest - 

{noformat}
Thread: 30 - kafka-admin-client-thread | adminclient-2
app//org.apache.log4j.Category.getEffectiveLevel(Category.java:430)
app//org.apache.log4j.Logger.isTraceEnabled(Logger.java:203)

app//org.slf4j.impl.Reload4jLoggerAdapter.isTraceEnabled(Reload4jLoggerAdapter.java:83)

app//org.apache.kafka.common.utils.LogContext$LocationAwareKafkaLogger.trace(LogContext.java:185)

app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.maybeDrainPendingCalls(KafkaAdminClient.java:1233)

app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1543)

app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1497)
java.base@17.0.7/java.lang.Thread.run(Thread.java:833)
app//org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:64)
{noformat}


> Possible thread leaks in ListOffsetsHandlerTest
> ---
>
> Key: KAFKA-15323
> URL: https://issues.apache.org/jira/browse/KAFKA-15323
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> Relevant stack trace for {{ListOffsetsHandlerTest}}:
> {noformat}
> Thread: 30 - kafka-admin-client-thread | adminclient-2
>   app//org.apache.log4j.Category.getEffectiveLevel(Category.java:430)
>   app//org.apache.log4j.Logger.isTraceEnabled(Logger.java:203)
>   
> app//org.slf4j.impl.Reload4jLoggerAdapter.isTraceEnabled(Reload4jLoggerAdapter.java:83)
>   
> app//org.apache.kafka.common.utils.LogContext$LocationAwareKafkaLogger.trace(LogContext.java:185)
>   
> app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.maybeDrainPendingCalls(KafkaAdminClient.java:1233)
>   
> app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1543)
>   
> app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1497)
>   java.base@17.0.7/java.lang.Thread.run(Thread.java:833)
>   app//org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:64)
> {noformat}
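
The usual remedy for this kind of leak is deterministic teardown of the client; a hedged sketch (the field name is hypothetical):

{code:java}
import java.time.Duration;
import org.apache.kafka.clients.admin.Admin;
import org.junit.jupiter.api.AfterEach;

class ListOffsetsHandlerTestTeardownSketch {
    private Admin admin; // hypothetical field holding the client under test

    @AfterEach
    void tearDown() {
        // Closing the client bounds the kafka-admin-client-thread's lifetime to the test.
        if (admin != null) {
            admin.close(Duration.ofSeconds(5));
        }
    }
}
{code}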





[jira] [Updated] (KAFKA-15323) Possible thread leaks in ListOffsetsHandlerTest

2023-08-08 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15323:
--
Description: 
Relevant Logs for AbstractCoordinatorTest - 

{noformat}
Thread: 30 - kafka-admin-client-thread | adminclient-2
app//org.apache.log4j.Category.getEffectiveLevel(Category.java:430)
app//org.apache.log4j.Logger.isTraceEnabled(Logger.java:203)

app//org.slf4j.impl.Reload4jLoggerAdapter.isTraceEnabled(Reload4jLoggerAdapter.java:83)

app//org.apache.kafka.common.utils.LogContext$LocationAwareKafkaLogger.trace(LogContext.java:185)

app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.maybeDrainPendingCalls(KafkaAdminClient.java:1233)

app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1543)

app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1497)
java.base@17.0.7/java.lang.Thread.run(Thread.java:833)
app//org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:64)
{noformat}

  was:
Relevant Logs for AbstractCoordinatorTest - 

{noformat}
Relevant Logs for AbstractCoordinatorTest - 
Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group
java.base@17.0.7/java.lang.Object.wait(Native Method)
java.base@17.0.7/java.lang.Object.wait(Object.java:338)

app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448)
{noformat}


> Possible thread leaks in ListOffsetsHandlerTest
> ---
>
> Key: KAFKA-15323
> URL: https://issues.apache.org/jira/browse/KAFKA-15323
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> Relevant Logs for AbstractCoordinatorTest - 
> {noformat}
> Thread: 30 - kafka-admin-client-thread | adminclient-2
>   app//org.apache.log4j.Category.getEffectiveLevel(Category.java:430)
>   app//org.apache.log4j.Logger.isTraceEnabled(Logger.java:203)
>   
> app//org.slf4j.impl.Reload4jLoggerAdapter.isTraceEnabled(Reload4jLoggerAdapter.java:83)
>   
> app//org.apache.kafka.common.utils.LogContext$LocationAwareKafkaLogger.trace(LogContext.java:185)
>   
> app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.maybeDrainPendingCalls(KafkaAdminClient.java:1233)
>   
> app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1543)
>   
> app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1497)
>   java.base@17.0.7/java.lang.Thread.run(Thread.java:833)
>   app//org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:64)
> {noformat}





[jira] [Updated] (KAFKA-15322) Possible thread leaks in AbstractCoordinatorTest

2023-08-08 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15322:
--
Description: 
Relevant Logs for AbstractCoordinatorTest - 

{noformat}
Relevant Logs for AbstractCoordinatorTest - 
Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group
java.base@17.0.7/java.lang.Object.wait(Native Method)
java.base@17.0.7/java.lang.Object.wait(Object.java:338)

app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448)
{noformat}

  was:
Relevant Logs for AbstractCoordinatorTest - 

{code}
Relevant Logs for AbstractCoordinatorTest - 
Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group
java.base@17.0.7/java.lang.Object.wait(Native Method)
java.base@17.0.7/java.lang.Object.wait(Object.java:338)

app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448)
{code}


> Possible thread leaks in AbstractCoordinatorTest
> 
>
> Key: KAFKA-15322
> URL: https://issues.apache.org/jira/browse/KAFKA-15322
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> Relevant Logs for AbstractCoordinatorTest - 
> {noformat}
> Relevant Logs for AbstractCoordinatorTest - 
> Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group
>   java.base@17.0.7/java.lang.Object.wait(Native Method)
>   java.base@17.0.7/java.lang.Object.wait(Object.java:338)
>   
> app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448)
> {noformat}





[jira] [Created] (KAFKA-15323) Possible thread leaks in ListOffsetsHandlerTest

2023-08-08 Thread Kirk True (Jira)
Kirk True created KAFKA-15323:
-

 Summary: Possible thread leaks in ListOffsetsHandlerTest
 Key: KAFKA-15323
 URL: https://issues.apache.org/jira/browse/KAFKA-15323
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, unit tests
Reporter: Kirk True
Assignee: Kirk True


Relevant Logs for AbstractCoordinatorTest - 

{noformat}
Relevant Logs for AbstractCoordinatorTest - 
Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group
java.base@17.0.7/java.lang.Object.wait(Native Method)
java.base@17.0.7/java.lang.Object.wait(Object.java:338)

app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448)
{noformat}





[jira] [Updated] (KAFKA-15322) Possible thread leaks in AbstractCoordinatorTest

2023-08-08 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15322:
--
Description: 
Relevant Logs for AbstractCoordinatorTest - 

{code}
Relevant Logs for AbstractCoordinatorTest - 
Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group
java.base@17.0.7/java.lang.Object.wait(Native Method)
java.base@17.0.7/java.lang.Object.wait(Object.java:338)

app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448)
{code}

  was:
Relevant Logs for AbstractCoordinatorTest - 

 
{code}
Relevant Logs for AbstractCoordinatorTest - 
Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group
java.base@17.0.7/java.lang.Object.wait(Native Method)
java.base@17.0.7/java.lang.Object.wait(Object.java:338)

app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448)
{code}
 

 


> Possible thread leaks in AbstractCoordinatorTest
> 
>
> Key: KAFKA-15322
> URL: https://issues.apache.org/jira/browse/KAFKA-15322
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> Relevant Logs for AbstractCoordinatorTest - 
> {code}
> Relevant Logs for AbstractCoordinatorTest - 
> Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group
>   java.base@17.0.7/java.lang.Object.wait(Native Method)
>   java.base@17.0.7/java.lang.Object.wait(Object.java:338)
>   
> app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448)
> {code}





[jira] [Updated] (KAFKA-15322) Possible thread leaks in AbstractCoordinatorTest

2023-08-08 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15322:
--
Description: 
Relevant Logs for AbstractCoordinatorTest - 

 
{code}
Relevant Logs for AbstractCoordinatorTest - 
Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group
java.base@17.0.7/java.lang.Object.wait(Native Method)
java.base@17.0.7/java.lang.Object.wait(Object.java:338)

app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448)
{code}
 

 

  was:
Relevant Logs for AbstractCoordinatorTest - 

 
{code:java}
Relevant Logs for AbstractCoordinatorTest - 
Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group
java.base@17.0.7/java.lang.Object.wait(Native Method)
java.base@17.0.7/java.lang.Object.wait(Object.java:338) 
app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448)
 {code}
 

 


> Possible thread leaks in AbstractCoordinatorTest
> 
>
> Key: KAFKA-15322
> URL: https://issues.apache.org/jira/browse/KAFKA-15322
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> Relevant Logs for AbstractCoordinatorTest - 
>  
> {code}
> Relevant Logs for AbstractCoordinatorTest - 
> Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group
>   java.base@17.0.7/java.lang.Object.wait(Native Method)
>   java.base@17.0.7/java.lang.Object.wait(Object.java:338)
>   
> app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448)
> {code}
>  
>  





[jira] [Created] (KAFKA-15322) Possible thread leaks in AbstractCoordinatorTest

2023-08-08 Thread Kirk True (Jira)
Kirk True created KAFKA-15322:
-

 Summary: Possible thread leaks in AbstractCoordinatorTest
 Key: KAFKA-15322
 URL: https://issues.apache.org/jira/browse/KAFKA-15322
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, unit tests
Reporter: Kirk True
Assignee: Kirk True


Relevant Logs for AbstractCoordinatorTest - 

 
{code:java}
Relevant Logs for AbstractCoordinatorTest - 
Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group
java.base@17.0.7/java.lang.Object.wait(Native Method)
java.base@17.0.7/java.lang.Object.wait(Object.java:338) 
app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448)
 {code}
 

 





[jira] [Commented] (KAFKA-14937) Refactoring for client code to reduce boilerplate

2023-08-08 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752212#comment-17752212
 ] 

Kirk True commented on KAFKA-14937:
---

Reviewing the following PRs for flaky tests:
 * [https://github.com/apache/kafka/pull/13990]
 * [https://github.com/apache/kafka/pull/14118]
 * [https://github.com/apache/kafka/pull/14123]

> Refactoring for client code to reduce boilerplate
> -
>
> Key: KAFKA-14937
> URL: https://issues.apache.org/jira/browse/KAFKA-14937
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, consumer, producer 
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.6.0
>
>
> There are a number of places in the client code where the same basic calls 
> are made by more than one client implementation. Minor refactoring will 
> reduce the amount of boilerplate code necessary for the client to construct 
> its internal state.





[jira] [Resolved] (KAFKA-15312) FileRawSnapshotWriter must flush before atomic move

2023-08-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-15312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio resolved KAFKA-15312.

Resolution: Fixed

> FileRawSnapshotWriter must flush before atomic move
> ---
>
> Key: KAFKA-15312
> URL: https://issues.apache.org/jira/browse/KAFKA-15312
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.3.3, 3.6.0, 3.4.2, 3.5.2
>
>
> On ext4 file systems it is possible for KRaft to create zero-length snapshot 
> files. Not all file systems fsync to disk on close. For KRaft to guarantee 
> that the data has made it to disk before calling rename, it needs to make 
> sure that the file has been fsynced.
> We have seen cases where the snapshot file has zero-length data on ext4 file 
> systems.
> {quote} "Delayed allocation" means that the filesystem tries to delay the 
> allocation of physical disk blocks for written data for as long as possible. 
> This policy brings some important performance benefits. Many files are 
> short-lived; delayed allocation can keep the system from writing fleeting 
> temporary files to disk at all. And, for longer-lived files, delayed 
> allocation allows the kernel to accumulate more data and to allocate the 
> blocks for data contiguously, speeding up both the write and any subsequent 
> reads of that data. It's an important optimization which is found in most 
> contemporary filesystems.
> But, if blocks have not been allocated for a file, there is no need to write 
> them quickly as a security measure. Since the blocks do not yet exist, it is 
> not possible to read somebody else's data from them. So ext4 will not 
> (cannot) write out unallocated blocks as part of the next journal commit 
> cycle. Those blocks will, instead, wait until the kernel decides to flush 
> them out; at that point, physical blocks will be allocated on disk and the 
> data will be made persistent. The kernel doesn't like to let file data sit 
> unwritten for too long, but it can still take a minute or so (with the 
> default settings) for that data to be flushed - far longer than the five 
> seconds normally seen with ext3. And that is why a crash can cause the loss 
> of quite a bit more data when ext4 is being used. 
> {quote}
> from: [https://lwn.net/Articles/322823/]
> {quote}auto_da_alloc ( * ), noauto_da_alloc
> Many broken applications don't use fsync() when replacing existing files via 
> patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ 
> rename("foo.new", "foo"), or worse yet, fd = open("foo", 
> O_TRUNC)/write(fd,..)/close(fd). If auto_da_alloc is enabled, ext4 will 
> detect the replace-via-rename and replace-via-truncate patterns and force 
> that any delayed allocation blocks are allocated such that at the next 
> journal commit, in the default data=ordered mode, the data blocks of the new 
> file are forced to disk before the rename() operation is committed. This 
> provides roughly the same level of guarantees as ext3, and avoids the 
> "zero-length" problem that can happen when a system crashes before the 
> delayed allocation blocks are forced to disk.
> {quote}
> from: [https://www.kernel.org/doc/html/latest/admin-guide/ext4.html]
>  
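
The fix pattern the ticket describes, as a minimal sketch (paths and payload are illustrative):

{code:java}
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class FlushThenRename {
    // Write to a temporary file, fsync it, then atomically move it into place.
    // Closing a channel alone does not guarantee the data reached disk.
    static void writeSnapshot(Path tmp, Path dest, byte[] data) throws Exception {
        try (FileChannel channel = FileChannel.open(tmp,
                StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
            channel.write(ByteBuffer.wrap(data));
            channel.force(true); // flush data and metadata before the rename
        }
        Files.move(tmp, dest, StandardCopyOption.ATOMIC_MOVE);
    }
}
{code}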





[jira] [Commented] (KAFKA-14937) Refactoring for client code to reduce boilerplate

2023-08-08 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752211#comment-17752211
 ] 

Kirk True commented on KAFKA-14937:
---

Having lots of issues getting a clean build. I filed INFRA-24874 to see if they 
could help.

> Refactoring for client code to reduce boilerplate
> -
>
> Key: KAFKA-14937
> URL: https://issues.apache.org/jira/browse/KAFKA-14937
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, consumer, producer 
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.6.0
>
>
> There are a number of places in the client code where the same basic calls 
> are made by more than one client implementation. Minor refactoring will 
> reduce the amount of boilerplate code necessary for the client to construct 
> its internal state.





[GitHub] [kafka] jsancio commented on a diff in pull request #14141: KAFKA-15100; KRaft data race with the expiration service

2023-08-08 Thread via GitHub


jsancio commented on code in PR #14141:
URL: https://github.com/apache/kafka/pull/14141#discussion_r1287805598


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -984,11 +991,16 @@ private CompletableFuture<FetchResponseData> handleFetchRequest(
                 Throwable cause = exception instanceof ExecutionException ?
                     exception.getCause() : exception;
 
-                // If the fetch timed out in purgatory, it means no new data is available,
-                // and we will complete the fetch successfully. Otherwise, if there was
-                // any other error, we need to return it.
                 Errors error = Errors.forException(cause);
-                if (error != Errors.REQUEST_TIMED_OUT) {
+                if (error == Errors.REQUEST_TIMED_OUT) {
+                    // Note that for this case the calling thread is the expiration service thread and not the
+                    // polling thread.
+                    //
+                    // If the fetch request timed out in purgatory, it means no new data is available,
+                    // just return the original fetch response.
+                    return response;

Review Comment:
   Yes. That is what I would like to do in the long term: a more generic event 
executor that would allow us to keep the current concurrency model, remove the 
expiration service thread, and keep all our deterministic unit tests and 
simulation tests.
   
   It is a bigger change and I didn't want to block this fix on that.






[jira] [Created] (KAFKA-15321) Document consumer group member state machine

2023-08-08 Thread Kirk True (Jira)
Kirk True created KAFKA-15321:
-

 Summary: Document consumer group member state machine
 Key: KAFKA-15321
 URL: https://issues.apache.org/jira/browse/KAFKA-15321
 Project: Kafka
  Issue Type: Sub-task
Reporter: Kirk True
Assignee: Kirk True


We need to first document the new consumer group member state machine. What are 
the different states and what are the transitions?

See [~pnee]'s notes: 
[https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design]

*_Don’t forget to include diagrams for clarity!_*

This should be documented on the AK wiki.





[jira] [Updated] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434

2023-08-08 Thread Maruthi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maruthi updated KAFKA-15319:

Component/s: (was: streams)

> Upgrade rocksdb to fix CVE-2022-37434
> -
>
> Key: KAFKA-15319
> URL: https://issues.apache.org/jira/browse/KAFKA-15319
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.1
>Reporter: Maruthi
>Priority: Major
>
> Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12
> Upgrade to 1.2.13 to fix 





[jira] [Created] (KAFKA-15320) Document event queueing patterns

2023-08-08 Thread Kirk True (Jira)
Kirk True created KAFKA-15320:
-

 Summary: Document event queueing patterns
 Key: KAFKA-15320
 URL: https://issues.apache.org/jira/browse/KAFKA-15320
 Project: Kafka
  Issue Type: Sub-task
Reporter: Kirk True
Assignee: Kirk True


We need to first document the event enqueuing patterns in the 
PrototypeAsyncConsumer. As part of this task, determine if it’s 
necessary/beneficial to _conditionally_ add events and/or coalesce any 
duplicate events in the queue.

_Don’t forget to include diagrams for clarity!_

This should be documented on the AK wiki.
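
As a hypothetical illustration of the "conditionally add / coalesce" question (names and structure are assumptions, not the PrototypeAsyncConsumer's design):

{code:java}
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// At most one pending event of a given kind sits in the queue at a time;
// a second enqueue while one is pending becomes a no-op.
class CoalescingEnqueuer {
    private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean commitPending = new AtomicBoolean(false);

    void enqueueCommit(Runnable commitEvent) {
        if (commitPending.compareAndSet(false, true)) {
            queue.add(() -> {
                commitPending.set(false); // allow the next commit to be queued
                commitEvent.run();
            });
        }
    }
}
{code}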





[jira] [Updated] (KAFKA-15312) FileRawSnapshotWriter must flush before atomic move

2023-08-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-15312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-15312:
---
Fix Version/s: 3.4.2

> FileRawSnapshotWriter must flush before atomic move
> ---
>
> Key: KAFKA-15312
> URL: https://issues.apache.org/jira/browse/KAFKA-15312
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.6.0, 3.4.2, 3.5.2
>
>
> On ext4 file systems it is possible for KRaft to create zero-length snapshot 
> files. Not all file systems fsync to disk on close. For KRaft to guarantee 
> that the data has made it to disk before calling rename, it needs to make 
> sure that the file has been fsynced.
> We have seen cases where the snapshot file has zero-length data on ext4 file 
> systems.
> {quote} "Delayed allocation" means that the filesystem tries to delay the 
> allocation of physical disk blocks for written data for as long as possible. 
> This policy brings some important performance benefits. Many files are 
> short-lived; delayed allocation can keep the system from writing fleeting 
> temporary files to disk at all. And, for longer-lived files, delayed 
> allocation allows the kernel to accumulate more data and to allocate the 
> blocks for data contiguously, speeding up both the write and any subsequent 
> reads of that data. It's an important optimization which is found in most 
> contemporary filesystems.
> But, if blocks have not been allocated for a file, there is no need to write 
> them quickly as a security measure. Since the blocks do not yet exist, it is 
> not possible to read somebody else's data from them. So ext4 will not 
> (cannot) write out unallocated blocks as part of the next journal commit 
> cycle. Those blocks will, instead, wait until the kernel decides to flush 
> them out; at that point, physical blocks will be allocated on disk and the 
> data will be made persistent. The kernel doesn't like to let file data sit 
> unwritten for too long, but it can still take a minute or so (with the 
> default settings) for that data to be flushed - far longer than the five 
> seconds normally seen with ext3. And that is why a crash can cause the loss 
> of quite a bit more data when ext4 is being used. 
> {quote}
> from: [https://lwn.net/Articles/322823/]
> {quote}auto_da_alloc ( * ), noauto_da_alloc
> Many broken applications don't use fsync() when replacing existing files via 
> patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ 
> rename("foo.new", "foo"), or worse yet, fd = open("foo", 
> O_TRUNC)/write(fd,..)/close(fd). If auto_da_alloc is enabled, ext4 will 
> detect the replace-via-rename and replace-via-truncate patterns and force 
> that any delayed allocation blocks are allocated such that at the next 
> journal commit, in the default data=ordered mode, the data blocks of the new 
> file are forced to disk before the rename() operation is committed. This 
> provides roughly the same level of guarantees as ext3, and avoids the 
> "zero-length" problem that can happen when a system crashes before the 
> delayed allocation blocks are forced to disk.
> {quote}
> from: [https://www.kernel.org/doc/html/latest/admin-guide/ext4.html]
>  





[jira] [Updated] (KAFKA-15312) FileRawSnapshotWriter must flush before atomic move

2023-08-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-15312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-15312:
---
Fix Version/s: 3.3.3

> FileRawSnapshotWriter must flush before atomic move
> ---
>
> Key: KAFKA-15312
> URL: https://issues.apache.org/jira/browse/KAFKA-15312
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.3.3, 3.6.0, 3.4.2, 3.5.2
>
>
> On ext4 file systems it is possible for KRaft to create zero-length snapshot 
> files. Not all file systems fsync to disk on close. For KRaft to guarantee 
> that the data has made it to disk before calling rename, it needs to make 
> sure that the file has been fsynced.
> We have seen cases where the snapshot file has zero-length data on ext4 file 
> systems.
> {quote} "Delayed allocation" means that the filesystem tries to delay the 
> allocation of physical disk blocks for written data for as long as possible. 
> This policy brings some important performance benefits. Many files are 
> short-lived; delayed allocation can keep the system from writing fleeting 
> temporary files to disk at all. And, for longer-lived files, delayed 
> allocation allows the kernel to accumulate more data and to allocate the 
> blocks for data contiguously, speeding up both the write and any subsequent 
> reads of that data. It's an important optimization which is found in most 
> contemporary filesystems.
> But, if blocks have not been allocated for a file, there is no need to write 
> them quickly as a security measure. Since the blocks do not yet exist, it is 
> not possible to read somebody else's data from them. So ext4 will not 
> (cannot) write out unallocated blocks as part of the next journal commit 
> cycle. Those blocks will, instead, wait until the kernel decides to flush 
> them out; at that point, physical blocks will be allocated on disk and the 
> data will be made persistent. The kernel doesn't like to let file data sit 
> unwritten for too long, but it can still take a minute or so (with the 
> default settings) for that data to be flushed - far longer than the five 
> seconds normally seen with ext3. And that is why a crash can cause the loss 
> of quite a bit more data when ext4 is being used. 
> {quote}
> from: [https://lwn.net/Articles/322823/]
> {quote}auto_da_alloc ( * ), noauto_da_alloc
> Many broken applications don't use fsync() when replacing existing files via 
> patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ 
> rename("foo.new", "foo"), or worse yet, fd = open("foo", 
> O_TRUNC)/write(fd,..)/close(fd). If auto_da_alloc is enabled, ext4 will 
> detect the replace-via-rename and replace-via-truncate patterns and force 
> that any delayed allocation blocks are allocated such that at the next 
> journal commit, in the default data=ordered mode, the data blocks of the new 
> file are forced to disk before the rename() operation is committed. This 
> provides roughly the same level of guarantees as ext3, and avoids the 
> "zero-length" problem that can happen when a system crashes before the 
> delayed allocation blocks are forced to disk.
> {quote}
> from: [https://www.kernel.org/doc/html/latest/admin-guide/ext4.html]
>  





[jira] [Updated] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434

2023-08-08 Thread Maruthi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maruthi updated KAFKA-15319:

Component/s: streams

> Upgrade rocksdb to fix CVE-2022-37434
> -
>
> Key: KAFKA-15319
> URL: https://issues.apache.org/jira/browse/KAFKA-15319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.1
>Reporter: Maruthi
>Priority: Major
>
> Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12
> Upgrade to 1.2.13 to fix 





[jira] [Created] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434

2023-08-08 Thread Maruthi (Jira)
Maruthi created KAFKA-15319:
---

 Summary: Upgrade rocksdb to fix CVE-2022-37434
 Key: KAFKA-15319
 URL: https://issues.apache.org/jira/browse/KAFKA-15319
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.4.1
Reporter: Maruthi


Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12

Upgrade to 1.2.13 to fix 





[GitHub] [kafka] hachikuji commented on a diff in pull request #14141: KAFKA-15100; KRaft data race with the expiration service

2023-08-08 Thread via GitHub


hachikuji commented on code in PR #14141:
URL: https://github.com/apache/kafka/pull/14141#discussion_r1287800658


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -984,11 +991,16 @@ private CompletableFuture<FetchResponseData> handleFetchRequest(
                 Throwable cause = exception instanceof ExecutionException ?
                     exception.getCause() : exception;
 
-                // If the fetch timed out in purgatory, it means no new data is available,
-                // and we will complete the fetch successfully. Otherwise, if there was
-                // any other error, we need to return it.
                 Errors error = Errors.forException(cause);
-                if (error != Errors.REQUEST_TIMED_OUT) {
+                if (error == Errors.REQUEST_TIMED_OUT) {
+                    // Note that for this case the calling thread is the expiration service thread and not the
+                    // polling thread.
+                    //
+                    // If the fetch request timed out in purgatory, it means no new data is available,
+                    // just return the original fetch response.
+                    return response;

Review Comment:
   This is probably the simplest fix for now. An alternative might be to use 
the callback to put an event on the queue that could be handled in the event 
loop. 



##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1039,6 +1054,14 @@ private FetchResponseData tryCompleteFetchRequest(
         }
     }
 
+    private static boolean isPartitionTruncated(FetchResponseData.PartitionData partitionResponseData) {
+        FetchResponseData.EpochEndOffset divergingEpoch = partitionResponseData.divergingEpoch();
+        FetchResponseData.SnapshotId snapshotId = partitionResponseData.snapshotId();
+
+        return divergingEpoch.epoch() != -1 || divergingEpoch.endOffset() != -1 ||

Review Comment:
   nit: maybe it's just me, but it seems more intuitive to separate the 
diverging epoch and snapshot id checks.
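
A sketch of the separation being suggested, written against the message classes shown in the diff above (the helper names are assumptions):

```java
// Hypothetical split of isPartitionTruncated into two intention-revealing checks.
private static boolean hasDivergingEpoch(FetchResponseData.PartitionData data) {
    FetchResponseData.EpochEndOffset divergingEpoch = data.divergingEpoch();
    return divergingEpoch.epoch() != -1 || divergingEpoch.endOffset() != -1;
}

private static boolean hasSnapshotId(FetchResponseData.PartitionData data) {
    FetchResponseData.SnapshotId snapshotId = data.snapshotId();
    return snapshotId.epoch() != -1 || snapshotId.endOffset() != -1;
}
```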






[GitHub] [kafka] junrao commented on a diff in pull request #13503: MINOR: Refactor TierStateMachine related tests into a separate test file

2023-08-08 Thread via GitHub


junrao commented on code in PR #13503:
URL: https://github.com/apache/kafka/pull/13503#discussion_r1287779224


##
core/src/test/scala/unit/kafka/server/FetcherThreadTestUtils.scala:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.server

Review Comment:
   unit.kafka.server => kafka.server ?






[jira] [Commented] (KAFKA-15224) Automate version change to snapshot

2023-08-08 Thread Tanay Karmarkar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752203#comment-17752203
 ] 

Tanay Karmarkar commented on KAFKA-15224:
-

[~divijvaidya] can we include new Python packages? Do you know where the 
Python package requirements are currently managed? I can't seem to find the 
requirements.txt.

> Automate version change to snapshot 
> 
>
> Key: KAFKA-15224
> URL: https://issues.apache.org/jira/browse/KAFKA-15224
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Tanay Karmarkar
>Priority: Minor
>
> We require changing to SNAPSHOT version as part of the release process [1]. 
> The specific manual steps are:
> Update version on the branch to 0.10.0.1-SNAPSHOT in the following places:
>  * 
>  ** docs/js/templateData.js
>  ** gradle.properties
>  ** kafka-merge-pr.py
>  ** streams/quickstart/java/pom.xml
>  ** streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
>  ** streams/quickstart/pom.xml
>  ** tests/kafkatest/__init__.py (note: this version name can't follow the 
> -SNAPSHOT convention due to Python version naming restrictions; instead
> update it to 0.10.0.1.dev0)
>  ** tests/kafkatest/version.py
> The diff of the changes looks like 
> [https://github.com/apache/kafka/commit/484a86feb562f645bdbec74b18f8a28395a686f7#diff-21a0ab11b8bbdab9930ad18d4bca2d943bbdf40d29d68ab8a96f765bd1f9]
>  
>  
> It would be nice if we could run a script to do this automatically. Note that 
> release.py (line 550) already does something similar, where it replaces 
> SNAPSHOT with the actual version. We need to do the opposite here. We can 
> repurpose that code from release.py and extract it into a new script to perform 
> this operation.
> [1] [https://cwiki.apache.org/confluence/display/KAFKA/Release+Process]
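
A minimal sketch of such a script (the version strings are hypothetical examples; the file list comes from the description above):

{code:java}
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Sketch: switch release version strings back to the -SNAPSHOT/dev forms.
public class BumpToSnapshot {
    static final String RELEASE = "3.6.0";            // hypothetical just-released version
    static final String SNAPSHOT = "3.6.0-SNAPSHOT";  // gradle/maven convention
    static final String DEV = "3.6.0.dev0";           // python convention, per the note above

    public static void main(String[] args) throws Exception {
        List<String> files = List.of(
            "docs/js/templateData.js",
            "gradle.properties",
            "kafka-merge-pr.py",
            "streams/quickstart/java/pom.xml",
            "streams/quickstart/java/src/main/resources/archetype-resources/pom.xml",
            "streams/quickstart/pom.xml",
            "tests/kafkatest/version.py");
        for (String name : files) {
            Path path = Path.of(name);
            Files.writeString(path, Files.readString(path).replace(RELEASE, SNAPSHOT));
        }
        // tests/kafkatest/__init__.py cannot carry the -SNAPSHOT suffix.
        Path init = Path.of("tests/kafkatest/__init__.py");
        Files.writeString(init, Files.readString(init).replace(RELEASE, DEV));
    }
}
{code}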



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kirktrue commented on pull request #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-08-08 Thread via GitHub


kirktrue commented on PR #13990:
URL: https://github.com/apache/kafka/pull/13990#issuecomment-1670405790

   @junrao I've rebased against `trunk` over the last week to trigger Jenkins 
runs. Each build fails with a different set of tests. Most of the failing tests 
already have Jiras filed.
   
   I do want to point out another pull request I made (#14095) in which I 
literally only changed `README.md`. The first change was to add a newline, and 
that build included several test failures. I then removed that newline, and the 
resulting build included a different set of test failures.
   
   According to the graphs on [this Gradle Enterprise 
page](https://ge.apache.org/scans/tests?search.names=Git%20branch&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=America/Los_Angeles&search.values=trunk&tests.help=advancedQuery),
 over the last 28 days 28% of Jenkins builds on `trunk` have failed outright, and 
90% of the builds have included flaky tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lihaosky commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor

2023-08-08 Thread via GitHub


lihaosky commented on code in PR #14164:
URL: https://github.com/apache/kafka/pull/14164#discussion_r1287753972


##
streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java:
##
@@ -91,6 +91,7 @@ private Properties effectiveConfigFrom(final Properties 
initialConfig) {
 effectiveConfig.put(KafkaConfig.MessageMaxBytesProp(), 100);
 effectiveConfig.put(KafkaConfig.ControlledShutdownEnableProp(), true);
 effectiveConfig.put(KafkaConfig.ZkSessionTimeoutMsProp(), 1);
+effectiveConfig.put(KafkaConfig.RackProp(), "rack0");

Review Comment:
   Good catch. Might be some leftover change. Let me delete it.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java:
##
@@ -272,27 +300,50 @@ private void createMockTaskManager(final Set 
activeTasks,
 // If mockCreateInternalTopics is true the internal topic manager will 
report that it had to create all internal
 // topics and we will skip the listOffsets request for these changelogs
 private MockInternalTopicManager 
overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics) {
-final MockInternalTopicManager mockInternalTopicManager = new 
MockInternalTopicManager(
+return overwriteInternalTopicManagerWithMock(mockCreateInternalTopics, 
null);
+}
+
+private MockInternalTopicManager 
overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics, 
final List>> topicPartitionInfo) {
+final MockInternalTopicManager mockInternalTopicManager = spy(new 
MockInternalTopicManager(
 time,
 new StreamsConfig(configProps()),
 mockClientSupplier.restoreConsumer,
 mockCreateInternalTopics
-);
+));
+
+if (topicPartitionInfo != null) {
+
lenient().when(mockInternalTopicManager.getTopicPartitionInfo(anySet())).thenAnswer(
+i -> {
+final Set topics = i.getArgument(0);
+for (final Map> tp : 
topicPartitionInfo) {
+if (topics.equals(tp.keySet())) {
+return tp;
+}
+}
+return null;

Review Comment:
   We can return an empty `Map`.
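   A sketch of that variant (the generic parameters are assumptions, since the archive stripped them from the diff):
   ```java
   lenient().when(mockInternalTopicManager.getTopicPartitionInfo(anySet())).thenAnswer(
       invocation -> {
           final Set<String> topics = invocation.getArgument(0);
           for (final Map<String, List<TopicPartitionInfo>> tp : topicPartitionInfo) {
               if (topics.equals(tp.keySet())) {
                   return tp;
               }
           }
           // An empty map avoids NullPointerExceptions in callers that iterate the result.
           return Collections.emptyMap();
       });
   ```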



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15224) Automate version change to snapshot

2023-08-08 Thread Tanay Karmarkar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanay Karmarkar reassigned KAFKA-15224:
---

Assignee: Tanay Karmarkar

> Automate version change to snapshot 
> 
>
> Key: KAFKA-15224
> URL: https://issues.apache.org/jira/browse/KAFKA-15224
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Tanay Karmarkar
>Priority: Minor
>
> We require changing to SNAPSHOT version as part of the release process [1]. 
> The specific manual steps are:
> Update version on the branch to 0.10.0.1-SNAPSHOT in the following places:
>  * 
>  ** docs/js/templateData.js
>  ** gradle.properties
>  ** kafka-merge-pr.py
>  ** streams/quickstart/java/pom.xml
>  ** streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
>  ** streams/quickstart/pom.xml
>  ** tests/kafkatest/__init__.py (note: this version name can't follow the 
> -SNAPSHOT convention due to Python version naming restrictions; instead
> update it to 0.10.0.1.dev0)
>  ** tests/kafkatest/version.py
> The diff of the changes looks like 
> [https://github.com/apache/kafka/commit/484a86feb562f645bdbec74b18f8a28395a686f7#diff-21a0ab11b8bbdab9930ad18d4bca2d943bbdf40d29d68ab8a96f765bd1f9]
>  
>  
> It would be nice if we could run a script to do this automatically. Note that 
> release.py (line 550) already does something similar, where it replaces 
> SNAPSHOT with the actual version. We need to do the opposite here. We can 
> repurpose that code from release.py and extract it into a new script to perform 
> this operation.
> [1] [https://cwiki.apache.org/confluence/display/KAFKA/Release+Process]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kirktrue commented on pull request #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-08-08 Thread via GitHub


kirktrue commented on PR #13990:
URL: https://github.com/apache/kafka/pull/13990#issuecomment-1670400792

   > @kirktrue : For consistent infra issues, we could file an infra jira for 
the ASF infra team to help. For transient test failures, if we could identify 
when the test failure first occurred and correlate it with the PR that might 
have introduced it, we could file a jira and ping the PR author to take a look.
   
   @jun done: https://issues.apache.org/jira/browse/INFRA-24874


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on pull request #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-08-08 Thread via GitHub


kirktrue commented on PR #13990:
URL: https://github.com/apache/kafka/pull/13990#issuecomment-1670369317

   @junrao I created this INFRA Jira just now to see if they can help resolve 
some of the build issues: https://issues.apache.org/jira/browse/INFRA-24874
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15312) FileRawSnapshotWriter must flush before atomic move

2023-08-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-15312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-15312:
---
Description: 
On ext4 file systems it is possible for KRaft to create zero-length snapshot 
files. Not all file systems fsync to disk on close. For KRaft to guarantee that 
the data has made it to disk before calling rename, it needs to make sure that 
the file has been fsynced.

We have seen cases where the snapshot file has zero-length data on ext4 file 
systems.
{quote} "Delayed allocation" means that the filesystem tries to delay the 
allocation of physical disk blocks for written data for as long as possible. 
This policy brings some important performance benefits. Many files are 
short-lived; delayed allocation can keep the system from writing fleeting 
temporary files to disk at all. And, for longer-lived files, delayed allocation 
allows the kernel to accumulate more data and to allocate the blocks for data 
contiguously, speeding up both the write and any subsequent reads of that data. 
It's an important optimization which is found in most contemporary filesystems.

But, if blocks have not been allocated for a file, there is no need to write 
them quickly as a security measure. Since the blocks do not yet exist, it is 
not possible to read somebody else's data from them. So ext4 will not (cannot) 
write out unallocated blocks as part of the next journal commit cycle. Those 
blocks will, instead, wait until the kernel decides to flush them out; at that 
point, physical blocks will be allocated on disk and the data will be made 
persistent. The kernel doesn't like to let file data sit unwritten for too 
long, but it can still take a minute or so (with the default settings) for that 
data to be flushed - far longer than the five seconds normally seen with ext3. 
And that is why a crash can cause the loss of quite a bit more data when ext4 
is being used. 
{quote}
from: [https://lwn.net/Articles/322823/]
{quote}auto_da_alloc ( * ), noauto_da_alloc

Many broken applications don't use fsync() when replacing existing files via 
patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ rename("foo.new", 
"foo"), or worse yet, fd = open("foo", O_TRUNC)/write(fd,..)/close(fd). If 
auto_da_alloc is enabled, ext4 will detect the replace-via-rename and 
replace-via-truncate patterns and force that any delayed allocation blocks are 
allocated such that at the next journal commit, in the default data=ordered 
mode, the data blocks of the new file are forced to disk before the rename() 
operation is committed. This provides roughly the same level of guarantees as 
ext3, and avoids the "zero-length" problem that can happen when a system 
crashes before the delayed allocation blocks are forced to disk.
{quote}
from: [https://www.kernel.org/doc/html/latest/admin-guide/ext4.html]

 

  was:
Not all file systems fsync to disk on close. For KRaft to guarantee that the 
data has made it to disk before calling rename, it needs to make sure that the 
file has been fsynced.

We have seen cases where the snapshot file has zero-length data on ext4 file 
systems.
{quote} "Delayed allocation" means that the filesystem tries to delay the 
allocation of physical disk blocks for written data for as long as possible. 
This policy brings some important performance benefits. Many files are 
short-lived; delayed allocation can keep the system from writing fleeting 
temporary files to disk at all. And, for longer-lived files, delayed allocation 
allows the kernel to accumulate more data and to allocate the blocks for data 
contiguously, speeding up both the write and any subsequent reads of that data. 
It's an important optimization which is found in most contemporary filesystems.

But, if blocks have not been allocated for a file, there is no need to write 
them quickly as a security measure. Since the blocks do not yet exist, it is 
not possible to read somebody else's data from them. So ext4 will not (cannot) 
write out unallocated blocks as part of the next journal commit cycle. Those 
blocks will, instead, wait until the kernel decides to flush them out; at that 
point, physical blocks will be allocated on disk and the data will be made 
persistent. The kernel doesn't like to let file data sit unwritten for too 
long, but it can still take a minute or so (with the default settings) for that 
data to be flushed - far longer than the five seconds normally seen with ext3. 
And that is why a crash can cause the loss of quite a bit more data when ext4 
is being used. 
{quote}
from: [https://lwn.net/Articles/322823/]
{quote}auto_da_alloc ( * ), noauto_da_alloc

Many broken applications don't use fsync() when replacing existing files via 
patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ rename("foo.new", 
"foo"), or worse yet, fd = open("foo", O_TRUNC)/write(fd,..)/close(fd). If 
auto_da_alloc is enabled, ext4 will detect the replace-via-rename and 
replace-via-truncate patterns and force that any delayed allocation blocks are 
allocated such that at the next journal commit, in the default data=ordered 
mode, the data blocks of the new file are forced to disk before the rename() 
operation is committed. This provides roughly the same level of guarantees as 
ext3, and avoids the "zero-length" problem that can happen when a system 
crashes before the delayed allocation blocks are forced to disk.
{quote}
from: [https://www.kernel.org/doc/html/latest/admin-guide/ext4.html])

[jira] [Updated] (KAFKA-15312) FileRawSnapshotWriter must flush before atomic move

2023-08-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-15312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-15312:
---
Fix Version/s: 3.5.2

> FileRawSnapshotWriter must flush before atomic move
> ---
>
> Key: KAFKA-15312
> URL: https://issues.apache.org/jira/browse/KAFKA-15312
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.6.0, 3.5.2
>
>
> Not all file systems fsync to disk on close. For KRaft to guarantee that the 
> data has made it to disk before calling rename, it needs to make sure that the 
> file has been fsynced.
> We have seen cases where the snapshot file has zero-length data on ext4 file 
> systems.
> {quote} "Delayed allocation" means that the filesystem tries to delay the 
> allocation of physical disk blocks for written data for as long as possible. 
> This policy brings some important performance benefits. Many files are 
> short-lived; delayed allocation can keep the system from writing fleeting 
> temporary files to disk at all. And, for longer-lived files, delayed 
> allocation allows the kernel to accumulate more data and to allocate the 
> blocks for data contiguously, speeding up both the write and any subsequent 
> reads of that data. It's an important optimization which is found in most 
> contemporary filesystems.
> But, if blocks have not been allocated for a file, there is no need to write 
> them quickly as a security measure. Since the blocks do not yet exist, it is 
> not possible to read somebody else's data from them. So ext4 will not 
> (cannot) write out unallocated blocks as part of the next journal commit 
> cycle. Those blocks will, instead, wait until the kernel decides to flush 
> them out; at that point, physical blocks will be allocated on disk and the 
> data will be made persistent. The kernel doesn't like to let file data sit 
> unwritten for too long, but it can still take a minute or so (with the 
> default settings) for that data to be flushed - far longer than the five 
> seconds normally seen with ext3. And that is why a crash can cause the loss 
> of quite a bit more data when ext4 is being used. 
> {quote}
> from: [https://lwn.net/Articles/322823/]
> {quote}auto_da_alloc ( * ), noauto_da_alloc
> Many broken applications don't use fsync() when replacing existing files via 
> patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ 
> rename("foo.new", "foo"), or worse yet, fd = open("foo", 
> O_TRUNC)/write(fd,..)/close(fd). If auto_da_alloc is enabled, ext4 will 
> detect the replace-via-rename and replace-via-truncate patterns and force 
> that any delayed allocation blocks are allocated such that at the next 
> journal commit, in the default data=ordered mode, the data blocks of the new 
> file are forced to disk before the rename() operation is committed. This 
> provides roughly the same level of guarantees as ext3, and avoids the 
> "zero-length" problem that can happen when a system crashes before the 
> delayed allocation blocks are forced to disk.
> {quote}
> from: [https://www.kernel.org/doc/html/latest/admin-guide/ext4.html]
>  
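
A minimal sketch of the flush-then-rename pattern described above (file names are hypothetical; in Java, FileChannel.force plays the role of fsync):

{code:java}
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class FlushThenRename {
    public static void main(String[] args) throws Exception {
        Path tmp = Paths.get("snapshot.part");        // hypothetical temporary file
        Path dst = Paths.get("snapshot.checkpoint");  // hypothetical final name
        try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
            // Force data and metadata to disk so a crash after the rename
            // cannot leave a zero-length file behind on ext4.
            channel.force(true);
        }
        Files.move(tmp, dst, StandardCopyOption.ATOMIC_MOVE);
    }
}
{code}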



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jsancio merged pull request #14162: KAFKA-15312; Force channel before atomic file move

2023-08-08 Thread via GitHub


jsancio merged PR #14162:
URL: https://github.com/apache/kafka/pull/14162


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor

2023-08-08 Thread via GitHub


mjsax commented on code in PR #14164:
URL: https://github.com/apache/kafka/pull/14164#discussion_r1287701495


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java:
##
@@ -227,15 +350,38 @@ private void formatClientStates(final boolean 
printUnassigned) {
 }
 }
 
+@Parameter
+public boolean enableRackAwareTaskAssignor;
+
+private String rackAwareStrategy = 
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE;
+
+@Before
+public void setUp() {
+if (enableRackAwareTaskAssignor) {
+rackAwareStrategy = 
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC;
+}
+}
+
+@Parameterized.Parameters(name = "enableRackAwareTaskAssignor={0}")
+public static Collection getParamStoreType() {
+return asList(new Object[][] {
+{true},
+{false}
+});
+}
+
 @Test
 public void staticAssignmentShouldConvergeWithTheFirstAssignment() {
 final AssignmentConfigs configs = new AssignmentConfigs(100L,
 2,
 0,
 60_000L,
-
EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
+
EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
+null,
+null,

Review Comment:
   fix indention



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java:
##
@@ -248,21 +394,29 @@ public void assignmentShouldConvergeAfterAddingNode() {
 final int numStatefulTasks = 11;
 final int maxWarmupReplicas = 2;
 final int numStandbyReplicas = 0;
+final int numNodes = 10;
 
 final AssignmentConfigs configs = new AssignmentConfigs(100L,
 
maxWarmupReplicas,
 
numStandbyReplicas,
 60_000L,
-
EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
+
EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
+null,
+null,

Review Comment:
   fix indention



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on a diff in pull request #14168: MINOR; Fix nanosecond elapsed time

2023-08-08 Thread via GitHub


jsancio commented on code in PR #14168:
URL: https://github.com/apache/kafka/pull/14168#discussion_r1287699245


##
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##
@@ -445,28 +444,24 @@ public void testBalancePartitionLeaders() throws 
Throwable {
 active.alterPartition(ANONYMOUS_CONTEXT, new AlterPartitionRequest
 .Builder(alterPartitionRequest, false).build((short) 
0).data()).get();
 
-AtomicLong lastHeartbeatMs = new 
AtomicLong(getMonotonicMs(active.time()));
+AtomicLong lastHeartbeatNs = new 
AtomicLong(active.time().nanoseconds());
 sendBrokerHeartbeat(active, allBrokers, brokerEpochs);
 // Check that partitions are balanced
 TestUtils.waitForCondition(
 () -> {
-long currentMonotonicMs = getMonotonicMs(active.time());
-if (currentMonotonicMs > lastHeartbeatMs.get() + 
(sessionTimeoutMillis / 2)) {
-lastHeartbeatMs.set(currentMonotonicMs);

Review Comment:
   If `active.time().nanoseconds()` overflows (wraps), `currentMonotonicMs > 
lastHeartbeatMs.get() + (sessionTimeoutMillis / 2)` will stay false for roughly 
292 years (2^63 nanoseconds). The Javadoc recommends not comparing 
`nanoTime()` values directly and instead comparing elapsed time, i.e. 
`nanoTime() - nanoTime()`. See the Javadoc for details: 
https://docs.oracle.com/javase/8/docs/api/java/lang/System.html#nanoTime--
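   A short sketch of the recommended pattern (`sendBrokerHeartbeat` is a placeholder, not the actual test helper):
   ```java
   // Sketch: compare elapsed nanos; the subtraction stays correct even if nanoTime() wraps.
   long lastHeartbeatNs = System.nanoTime();
   // ... later, inside the wait loop:
   final long elapsedNs = System.nanoTime() - lastHeartbeatNs;
   if (elapsedNs > TimeUnit.MILLISECONDS.toNanos(sessionTimeoutMillis / 2)) {
       lastHeartbeatNs = System.nanoTime();
       sendBrokerHeartbeat();  // placeholder for the actual heartbeat call
   }
   ```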



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor

2023-08-08 Thread via GitHub


mjsax commented on code in PR #14164:
URL: https://github.com/apache/kafka/pull/14164#discussion_r1287697676


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java:
##
@@ -45,75 +81,162 @@ private static final class Harness {
 private final Map droppedClientStates;
 private final StringBuilder history = new StringBuilder();
 
+public final Map> partitionsForTask;
+public final Map> 
changelogPartitionsForTask;
+public final Map> tasksForTopicGroup;
+public final Cluster fullMetadata;
+public final Map>> 
racksForProcessConsumer;
+public final InternalTopicManager internalTopicManager;
+
 private static Harness initializeCluster(final int numStatelessTasks,
  final int numStatefulTasks,
- final int numNodes,
- final Supplier 
partitionCountSupplier) {
+ final int numClients,
+ final Supplier 
partitionCountSupplier,
+ final int numNodes) {
 int subtopology = 0;
 final Set statelessTasks = new TreeSet<>();
 int remainingStatelessTasks = numStatelessTasks;
+final List nodes = getRandomNodes(numNodes);
+int nodeIndex = 0;
+final Set partitionInfoSet = new HashSet<>();
+final Map> partitionsForTask = new 
HashMap<>();
+final Map> changelogPartitionsForTask 
= new HashMap<>();
+final Map> tasksForTopicGroup = new 
HashMap<>();
+
 while (remainingStatelessTasks > 0) {
 final int partitions = Math.min(remainingStatelessTasks, 
partitionCountSupplier.get());
 for (int i = 0; i < partitions; i++) {
-statelessTasks.add(new TaskId(subtopology, i));
+final TaskId taskId = new TaskId(subtopology, i);
+statelessTasks.add(taskId);
 remainingStatelessTasks--;
+
+final Node[] replica = getRandomReplica(nodes, nodeIndex);
+partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + "_" 
+ subtopology, i, replica[0], replica, replica));
+nodeIndex++;
+
+partitionsForTask.put(taskId, mkSet(new 
TopicPartition(TOPIC_PREFIX + "_" + subtopology, i)));
+tasksForTopicGroup.computeIfAbsent(new 
Subtopology(subtopology, null), k -> new HashSet<>()).add(taskId);
 }
 subtopology++;
 }
 
 final Map statefulTaskEndOffsetSums = new 
TreeMap<>();
+final Map> topicPartitionInfo = 
new HashMap<>();
+final Set changelogNames = new HashSet<>();
 int remainingStatefulTasks = numStatefulTasks;
 while (remainingStatefulTasks > 0) {
+final String changelogTopicName = CHANGELOG_TOPIC_PREFIX + "_" 
+ subtopology;
+changelogNames.add(changelogTopicName);
 final int partitions = Math.min(remainingStatefulTasks, 
partitionCountSupplier.get());
 for (int i = 0; i < partitions; i++) {
-statefulTaskEndOffsetSums.put(new TaskId(subtopology, i), 
15L);
+final TaskId taskId = new TaskId(subtopology, i);
+statefulTaskEndOffsetSums.put(taskId, 15L);
 remainingStatefulTasks--;
+
+Node[] replica = getRandomReplica(nodes, nodeIndex);
+partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + "_" 
+ subtopology, i, replica[0], replica, replica));
+nodeIndex++;
+
+partitionsForTask.put(taskId, mkSet(new 
TopicPartition(TOPIC_PREFIX + "_" + subtopology, i)));
+changelogPartitionsForTask.put(taskId, mkSet(new 
TopicPartition(changelogTopicName, i)));
+tasksForTopicGroup.computeIfAbsent(new 
Subtopology(subtopology, null), k -> new HashSet<>()).add(taskId);
+
+final Random random = new Random();
+final int changelogNodeIndex = 
random.nextInt(nodes.size());
+replica = getRandomReplica(nodes, changelogNodeIndex);
+final TopicPartitionInfo info = new TopicPartitionInfo(i, 
replica[0], Arrays.asList(replica[0], replica[1]), Collections.emptyList());
+topicPartitionInfo.computeIfAbsent(changelogTopicName, tp 
-> new ArrayList<>()).add(info);
 }
 subtopology++;
 }
 
+final MockTime time = new MockTime();
+final StreamsConfig streamsConfig = new 
StreamsConfig(configProps(true));
+f


[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor

2023-08-08 Thread via GitHub


mjsax commented on code in PR #14164:
URL: https://github.com/apache/kafka/pull/14164#discussion_r1287695148


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java:
##
@@ -45,75 +81,162 @@ private static final class Harness {
 private final Map droppedClientStates;
 private final StringBuilder history = new StringBuilder();
 
+public final Map> partitionsForTask;
+public final Map> 
changelogPartitionsForTask;
+public final Map> tasksForTopicGroup;
+public final Cluster fullMetadata;
+public final Map>> 
racksForProcessConsumer;
+public final InternalTopicManager internalTopicManager;
+
 private static Harness initializeCluster(final int numStatelessTasks,
  final int numStatefulTasks,
- final int numNodes,
- final Supplier 
partitionCountSupplier) {
+ final int numClients,
+ final Supplier 
partitionCountSupplier,
+ final int numNodes) {
 int subtopology = 0;
 final Set statelessTasks = new TreeSet<>();
 int remainingStatelessTasks = numStatelessTasks;
+final List nodes = getRandomNodes(numNodes);
+int nodeIndex = 0;
+final Set partitionInfoSet = new HashSet<>();
+final Map> partitionsForTask = new 
HashMap<>();
+final Map> changelogPartitionsForTask 
= new HashMap<>();
+final Map> tasksForTopicGroup = new 
HashMap<>();
+
 while (remainingStatelessTasks > 0) {
 final int partitions = Math.min(remainingStatelessTasks, 
partitionCountSupplier.get());
 for (int i = 0; i < partitions; i++) {
-statelessTasks.add(new TaskId(subtopology, i));
+final TaskId taskId = new TaskId(subtopology, i);
+statelessTasks.add(taskId);
 remainingStatelessTasks--;
+
+final Node[] replica = getRandomReplica(nodes, nodeIndex);
+partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + "_" 
+ subtopology, i, replica[0], replica, replica));
+nodeIndex++;
+
+partitionsForTask.put(taskId, mkSet(new 
TopicPartition(TOPIC_PREFIX + "_" + subtopology, i)));
+tasksForTopicGroup.computeIfAbsent(new 
Subtopology(subtopology, null), k -> new HashSet<>()).add(taskId);
 }
 subtopology++;
 }
 
 final Map statefulTaskEndOffsetSums = new 
TreeMap<>();
+final Map> topicPartitionInfo = 
new HashMap<>();
+final Set changelogNames = new HashSet<>();
 int remainingStatefulTasks = numStatefulTasks;
 while (remainingStatefulTasks > 0) {
+final String changelogTopicName = CHANGELOG_TOPIC_PREFIX + "_" 
+ subtopology;
+changelogNames.add(changelogTopicName);
 final int partitions = Math.min(remainingStatefulTasks, 
partitionCountSupplier.get());
 for (int i = 0; i < partitions; i++) {
-statefulTaskEndOffsetSums.put(new TaskId(subtopology, i), 
15L);
+final TaskId taskId = new TaskId(subtopology, i);
+statefulTaskEndOffsetSums.put(taskId, 15L);
 remainingStatefulTasks--;
+
+Node[] replica = getRandomReplica(nodes, nodeIndex);
+partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + "_" 
+ subtopology, i, replica[0], replica, replica));
+nodeIndex++;
+
+partitionsForTask.put(taskId, mkSet(new 
TopicPartition(TOPIC_PREFIX + "_" + subtopology, i)));
+changelogPartitionsForTask.put(taskId, mkSet(new 
TopicPartition(changelogTopicName, i)));
+tasksForTopicGroup.computeIfAbsent(new 
Subtopology(subtopology, null), k -> new HashSet<>()).add(taskId);
+
+final Random random = new Random();

Review Comment:
   We should init `Random` with a value that we log (to allow us to reproduce 
the same test)
   ```
   final long seed = System.currentTimeMillis():
   log.info(seed); // or just use println
   final Random random = new Random(seed);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org






[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor

2023-08-08 Thread via GitHub


mjsax commented on code in PR #14164:
URL: https://github.com/apache/kafka/pull/14164#discussion_r1287692186


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java:
##
@@ -587,14 +587,18 @@ static List getRandomNodes(final int nodeSize) {
 return nodeList;
 }
 
+static Node[] getRandomReplica(final List nodeList, final int index) 
{

Review Comment:
   Does not seem to be random actually?






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor

2023-08-08 Thread via GitHub


mjsax commented on code in PR #14164:
URL: https://github.com/apache/kafka/pull/14164#discussion_r1287685324


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java:
##
@@ -1768,16 +1993,22 @@ public void 
shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenN
 final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
 
 subscriptions.put(CONSUMER_1,

Review Comment:
   nit: move `CONSUMER_1` to next line, too (same below)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor

2023-08-08 Thread via GitHub


mjsax commented on code in PR #14164:
URL: https://github.com/apache/kafka/pull/14164#discussion_r1287685324


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java:
##
@@ -1768,16 +1993,22 @@ public void 
shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenN
 final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
 
 subscriptions.put(CONSUMER_1,

Review Comment:
   nit: move `CONSUMER_1` to next line, too



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor

2023-08-08 Thread via GitHub


mjsax commented on code in PR #14164:
URL: https://github.com/apache/kafka/pull/14164#discussion_r1287683870


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java:
##
@@ -1241,12 +1410,18 @@ public void 
testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() {
 final List topics = asList("topic1", APPLICATION_ID + 
"-topicX", APPLICATION_ID + "-topicZ");
 final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
 
-final MockInternalTopicManager internalTopicManager = 
configureDefault();
+createDefaultMockTaskManager();
+final List>> 
changelogTopicPartitionInfo = getTopicPartitionInfo(4,

Review Comment:
   as above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor

2023-08-08 Thread via GitHub


mjsax commented on code in PR #14164:
URL: https://github.com/apache/kafka/pull/14164#discussion_r1287683621


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java:
##
@@ -1213,12 +1376,18 @@ public void testAssignWithInternalTopics() {
 final List topics = asList("topic1", APPLICATION_ID + 
"-topicX");
 final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
 
-final MockInternalTopicManager internalTopicManager = 
configureDefault();
+createDefaultMockTaskManager();
+final List>> 
changelogTopicPartitionInfo = getTopicPartitionInfo(4,

Review Comment:
   nit: move first parameter to next line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor

2023-08-08 Thread via GitHub


mjsax commented on code in PR #14164:
URL: https://github.com/apache/kafka/pull/14164#discussion_r1287682340


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java:
##
@@ -809,14 +888,37 @@ public void testAssignWithStates() {
APPLICATION_ID + "-store3-changelog"),
 asList(3, 3, 3))
 );
-configureDefault();
+
+createDefaultMockTaskManager();
+final List>> 
changelogTopicPartitionInfo = getTopicPartitionInfo(3,

Review Comment:
   nit: move the first parameter `3,` onto its own line below (hard to read the 
code otherwise)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor

2023-08-08 Thread via GitHub


mjsax commented on code in PR #14164:
URL: https://github.com/apache/kafka/pull/14164#discussion_r1287681335


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java:
##
@@ -272,27 +300,50 @@ private void createMockTaskManager(final Set 
activeTasks,
 // If mockCreateInternalTopics is true the internal topic manager will 
report that it had to create all internal
 // topics and we will skip the listOffsets request for these changelogs
 private MockInternalTopicManager 
overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics) {
-final MockInternalTopicManager mockInternalTopicManager = new 
MockInternalTopicManager(
+return overwriteInternalTopicManagerWithMock(mockCreateInternalTopics, 
null);
+}
+
+private MockInternalTopicManager 
overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics, 
final List>> topicPartitionInfo) {
+final MockInternalTopicManager mockInternalTopicManager = spy(new 
MockInternalTopicManager(
 time,
 new StreamsConfig(configProps()),
 mockClientSupplier.restoreConsumer,
 mockCreateInternalTopics
-);
+));
+
+if (topicPartitionInfo != null) {
+
lenient().when(mockInternalTopicManager.getTopicPartitionInfo(anySet())).thenAnswer(
+i -> {
+final Set topics = i.getArgument(0);
+for (final Map> tp : 
topicPartitionInfo) {
+if (topics.equals(tp.keySet())) {
+return tp;
+}
+}
+return null;

Review Comment:
   Is `null` right here? Or should it be an empty `Map` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor

2023-08-08 Thread via GitHub


mjsax commented on code in PR #14164:
URL: https://github.com/apache/kafka/pull/14164#discussion_r1287680080


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java:
##
@@ -272,27 +300,50 @@ private void createMockTaskManager(final Set 
activeTasks,
 // If mockCreateInternalTopics is true the internal topic manager will 
report that it had to create all internal
 // topics and we will skip the listOffsets request for these changelogs
 private MockInternalTopicManager 
overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics) {
-final MockInternalTopicManager mockInternalTopicManager = new 
MockInternalTopicManager(
+return overwriteInternalTopicManagerWithMock(mockCreateInternalTopics, 
null);
+}
+
+private MockInternalTopicManager 
overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics, 
final List>> topicPartitionInfo) {
+final MockInternalTopicManager mockInternalTopicManager = spy(new 
MockInternalTopicManager(
 time,
 new StreamsConfig(configProps()),
 mockClientSupplier.restoreConsumer,
 mockCreateInternalTopics
-);
+));
+
+if (topicPartitionInfo != null) {
+
lenient().when(mockInternalTopicManager.getTopicPartitionInfo(anySet())).thenAnswer(
+i -> {

Review Comment:
   So `i` is the input parameter passed into `getTopicPartitionInfo` ? -- Can 
we find a better name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor

2023-08-08 Thread via GitHub


mjsax commented on code in PR #14164:
URL: https://github.com/apache/kafka/pull/14164#discussion_r1287676811


##
streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java:
##
@@ -91,6 +91,7 @@ private Properties effectiveConfigFrom(final Properties 
initialConfig) {
 effectiveConfig.put(KafkaConfig.MessageMaxBytesProp(), 100);
 effectiveConfig.put(KafkaConfig.ControlledShutdownEnableProp(), true);
 effectiveConfig.put(KafkaConfig.ZkSessionTimeoutMsProp(), 1);
+effectiveConfig.put(KafkaConfig.RackProp(), "rack0");

Review Comment:
   why do we need this overwrite?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor

2023-08-08 Thread via GitHub


mjsax commented on code in PR #14164:
URL: https://github.com/apache/kafka/pull/14164#discussion_r1287675546


##
streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java:
##
@@ -300,7 +329,9 @@ private static Properties streamsProperties(final String 
appId,
 // Increasing the number of threads to ensure that a rebalance 
happens each time a consumer sends a rejoin (KAFKA-10455)
 mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 40),
 mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class.getName()),
-mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class.getName())
+mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class.getName()),
+mkEntry(CommonClientConfigs.CLIENT_RACK_CONFIG, 
AssignmentTestUtils.RACK_0),

Review Comment:
   Why is this hard-coded? Should we use different racks for different KS 
instances?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15318) Update the Authorizer using AclPublisher

2023-08-08 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-15318:


Assignee: Colin McCabe

> Update the Authorizer using AclPublisher
> 
>
> Key: KAFKA-15318
> URL: https://issues.apache.org/jira/browse/KAFKA-15318
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>
> On the controller, move publishing acls to the authorizer into a dedicated 
> MetadataPublisher, AclPublisher. This publisher listens for notifications 
> from MetadataLoader, and receives only committed data. This brings the 
> controller side in line with how the broker has always worked. It also avoids 
> some ugly code related to publishing directly from the QuorumController. Most 
> important of all, it clears the way to implement metadata transactions 
> without worrying about Authorizer state (since it will be handled by the 
> MetadataLoader, along with other metadata image state).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe opened a new pull request, #14169: KAFKA-15318: Update the Authorizer using AclPublisher

2023-08-08 Thread via GitHub


cmccabe opened a new pull request, #14169:
URL: https://github.com/apache/kafka/pull/14169

   On the controller, move publishing acls to the Authorizer into a dedicated 
MetadataPublisher, AclPublisher. This publisher listens for notifications from 
MetadataLoader, and receives only committed data.  This brings the controller 
side in line with how the broker has always worked. It also avoids some ugly 
code related to publishing directly from the QuorumController. Most important 
of all, it clears the way to implement metadata transactions without worrying 
about Authorizer state (since it will be handled by the MetadataLoader, along 
with other metadata image state).
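
   A rough sketch of the publisher's shape (the interface and method names follow Kafka's `MetadataPublisher`; treat the body as illustrative, not the actual patch):
   ```java
   import org.apache.kafka.image.MetadataDelta;
   import org.apache.kafka.image.MetadataImage;
   import org.apache.kafka.image.loader.LoaderManifest;
   import org.apache.kafka.image.publisher.MetadataPublisher;

   // Sketch: a publisher that applies only committed ACL changes.
   public class AclPublisherSketch implements MetadataPublisher {
       @Override
       public String name() {
           return "AclPublisher";
       }

       @Override
       public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
           if (delta.aclsDelta() != null) {
               // Hand the committed ACL delta to the Authorizer here.
           }
       }
   }
   ```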


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15318) Update the Authorizer using AclPublisher

2023-08-08 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-15318:
-
Description: On the controller, move publishing acls to the authorizer into 
a dedicated MetadataPublisher, AclPublisher. This publisher listens for 
notifications from MetadataLoader, and receives only committed data. This 
brings the controller side in line with how the broker has always worked. It 
also avoids some ugly code related to publishing directly from the 
QuorumController. Most important of all, it clears the way to implement 
metadata transactions without worrying about Authorizer state (since it will be 
handled by the MetadataLoader, along with other metadata image state).  (was: 
On the controller, move Acl publishing into a dedicated MetadataPublisher, 
AclPublisher. This publisher listens for notifications from MetadataLoader, and 
receives only committed data. This brings the controller side in line with how 
the broker has always worked. It also avoids some ugly code related to 
publishing directly from the QuorumController. Most important of all, it clears 
the way to implement metadata transactions without worrying about Authorizer 
state (since it will be handled by the MetadataLoader, along with other 
metadata image state).)

> Update the Authorizer using AclPublisher
> 
>
> Key: KAFKA-15318
> URL: https://issues.apache.org/jira/browse/KAFKA-15318
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Priority: Major
>
> On the controller, move publishing acls to the authorizer into a dedicated 
> MetadataPublisher, AclPublisher. This publisher listens for notifications 
> from MetadataLoader, and receives only committed data. This brings the 
> controller side in line with how the broker has always worked. It also avoids 
> some ugly code related to publishing directly from the QuorumController. Most 
> important of all, it clears the way to implement metadata transactions 
> without worrying about Authorizer state (since it will be handled by the 
> MetadataLoader, along with other metadata image state).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15318) Update the Authorizer using AclPublisher

2023-08-08 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-15318:
-
Summary: Update the Authorizer using AclPublisher  (was: Move Acl 
publishing outside the QuorumController)

> Update the Authorizer using AclPublisher
> 
>
> Key: KAFKA-15318
> URL: https://issues.apache.org/jira/browse/KAFKA-15318
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Priority: Major
>
> On the controller, move Acl publishing into a dedicated MetadataPublisher, 
> AclPublisher. This publisher listens for notifications from MetadataLoader, 
> and receives only committed data. This brings the controller side in line 
> with how the broker has always worked. It also avoids some ugly code related 
> to publishing directly from the QuorumController. Most important of all, it 
> clears the way to implement metadata transactions without worrying about 
> Authorizer state (since it will be handled by the MetadataLoader, along with 
> other metadata image state).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15318) Move Acl publishing outside the QuorumController

2023-08-08 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-15318:


 Summary: Move Acl publishing outside the QuorumController
 Key: KAFKA-15318
 URL: https://issues.apache.org/jira/browse/KAFKA-15318
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe


On the controller, move Acl publishing into a dedicated MetadataPublisher, 
AclPublisher. This publisher listens for notifications from MetadataLoader, and 
receives only committed data. This brings the controller side in line with how 
the broker has always worked. It also avoids some ugly code related to 
publishing directly from the QuorumController. Most important of all, it clears 
the way to implement metadata transactions without worrying about Authorizer 
state (since it will be handled by the MetadataLoader, along with other 
metadata image state).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15317) Fix for async consumer access to committed offsets with multiple consumers

2023-08-08 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-15317:
--

 Summary: Fix for async consumer access to committed offsets with 
multiple consumers
 Key: KAFKA-15317
 URL: https://issues.apache.org/jira/browse/KAFKA-15317
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans


Access to the committed offsets via a call to the _committed_ API works as 
expected for a single async consumer, but it sometimes fails with a timeout when 
trying to retrieve the committed offsets with another consumer in the same 
group.
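
For reference, the access pattern in question is roughly the following (a sketch of the API call, not the failing test itself):

{code:java}
// Sketch: fetching committed offsets for a partition with a second consumer
// in the same group; per this report, the call sometimes times out.
Map<TopicPartition, OffsetAndMetadata> committed =
    consumer.committed(Collections.singleton(new TopicPartition("input-topic", 0)),
                       Duration.ofSeconds(5));
{code}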



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15126) Change range queries to accept null lower and upper bounds

2023-08-08 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck resolved KAFKA-15126.
-
Resolution: Fixed

[Merged to trunk|https://github.com/apache/kafka/pull/14137]

> Change range queries to accept null lower and upper bounds
> --
>
> Key: KAFKA-15126
> URL: https://issues.apache.org/jira/browse/KAFKA-15126
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Minor
> Fix For: 3.6.0
>
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> {color:#1d1c1d}When web client requests come in with query params, it's 
> common for those params to be null. We want developers to just be able to 
> pass in the upper/lower bounds if they want instead of implementing their own 
> logic to avoid getting the whole range (which will happen if they leave the 
> params null). {color}
> {color:#1d1c1d}An example of the logic they can avoid using after this KIP is 
> implemented is below:{color}
> {code:java}
> private RangeQuery<String, ValueAndTimestamp<String>> createRangeQuery(String lower, String upper) {
> if (isBlank(lower) && isBlank(upper)) {
> return RangeQuery.withNoBounds();
> } else if (!isBlank(lower) && isBlank(upper)) {
> return RangeQuery.withLowerBound(lower);
> } else if (isBlank(lower) && !isBlank(upper)) {
> return RangeQuery.withUpperBound(upper);
> } else {
> return RangeQuery.withRange(lower, upper);
> }
> } {code}
>  
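
For illustration, the intended usage after this change (a sketch; the exact semantics are defined by the KIP, not this snippet):

{code:java}
// With null bounds accepted, the dispatch logic above collapses to:
RangeQuery<String, ValueAndTimestamp<String>> query = RangeQuery.withRange(lower, upper);
// withRange(null, null)  behaves like withNoBounds()
// withRange(lower, null) behaves like withLowerBound(lower)
// withRange(null, upper) behaves like withUpperBound(upper)
{code}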



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15126) Change range queries to accept null lower and upper bounds

2023-08-08 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-15126:

Fix Version/s: 3.6.0

> Change range queries to accept null lower and upper bounds
> --
>
> Key: KAFKA-15126
> URL: https://issues.apache.org/jira/browse/KAFKA-15126
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Minor
> Fix For: 3.6.0
>
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> {color:#1d1c1d}When web client requests come in with query params, it's 
> common for those params to be null. We want developers to just be able to 
> pass in the upper/lower bounds if they want instead of implementing their own 
> logic to avoid getting the whole range (which will happen if they leave the 
> params null). {color}
> {color:#1d1c1d}An example of the logic they can avoid using after this KIP is 
> implemented is below:{color}
> {code:java}
> private RangeQuery<String, ValueAndTimestamp<String>> createRangeQuery(String lower, String upper) {
> if (isBlank(lower) && isBlank(upper)) {
> return RangeQuery.withNoBounds();
> } else if (!isBlank(lower) && isBlank(upper)) {
> return RangeQuery.withLowerBound(lower);
> } else if (isBlank(lower) && !isBlank(upper)) {
> return RangeQuery.withUpperBound(upper);
> } else {
> return RangeQuery.withRange(lower, upper);
> }
> } {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15316) CommitRequestManager not calling RequestState callbacks

2023-08-08 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-15316:
--

 Summary: CommitRequestManager not calling RequestState callbacks 
 Key: KAFKA-15316
 URL: https://issues.apache.org/jira/browse/KAFKA-15316
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Reporter: Lianet Magrans


CommitRequestManager is not triggering the RequestState callbacks that update 
_lastReceivedMs_, affecting the _canSendRequest_ verification of the 
RequestState.
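
A minimal sketch of the bookkeeping involved (class shape and method names are assumed from the ticket, not copied from the consumer internals):

{code:java}
// Illustrative RequestState: lastReceivedMs must be refreshed by a callback
// on each response; per this ticket, CommitRequestManager never invokes it.
class RequestState {
    private final long retryBackoffMs;
    private long lastSentMs = -1;
    private long lastReceivedMs = -1;

    RequestState(long retryBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
    }

    void onSendAttempt(long nowMs) {
        lastSentMs = nowMs;
    }

    void onResponseReceived(long nowMs) {
        lastReceivedMs = nowMs; // the update this ticket reports as missing
    }

    boolean canSendRequest(long nowMs) {
        // Gates on the last response time; a stale lastReceivedMs distorts this check.
        return lastReceivedMs < 0 || nowMs - lastReceivedMs >= retryBackoffMs;
    }
}
{code}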



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15297) Cache flush order might not be topological order

2023-08-08 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752150#comment-17752150
 ] 

Matthias J. Sax commented on KAFKA-15297:
-

Seems we are on the same page :) 

> Cache flush order might not be topological order 
> -
>
> Key: KAFKA-15297
> URL: https://issues.apache.org/jira/browse/KAFKA-15297
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Bruno Cadonna
>Priority: Major
> Attachments: minimal_example.png
>
>
> The flush order of the state store caches in Kafka Streams might not 
> correspond to the topological order of the state stores in the topology. The 
> order depends on how the processors and state stores are added to the 
> topology. 
> In some cases downstream state stores might be flushed before upstream state 
> stores. That means, that during a commit records in upstream caches might end 
> up in downstream caches that have already been flushed during the same 
> commit. If a crash happens at that point, those records in the downstream 
> caches are lost. Those records are lost for two reasons:
> 1. Records in caches are only changelogged after they are flushed from the 
> cache. However, the downstream caches have already been flushed and they will 
> not be flushed again during the same commit.
> 2. The offsets of the input records that caused the records that now are 
> blocked in the downstream caches are committed during the same commit and so 
> they will not be re-processed after the crash.
> An example for a topology where the flush order of the caches is wrong is the 
> following:
> {code:java}
> final String inputTopic1 = "inputTopic1";
> final String inputTopic2 = "inputTopic2";
> final String outputTopic1 = "outputTopic1";
> final String processorName = "processor1";
> final String stateStoreA = "stateStoreA";
> final String stateStoreB = "stateStoreB";
> final String stateStoreC = "stateStoreC";
> streamsBuilder.stream(inputTopic2, Consumed.with(Serdes.String(), 
> Serdes.String()))
> .process(
> () -> new Processor<String, String, String, String>() {
> private ProcessorContext<String, String> context;
> @Override
> public void init(ProcessorContext<String, String> context) {
> this.context = context;
> }
> @Override
> public void process(Record<String, String> record) {
> context.forward(record);
> }
> @Override
> public void close() {}
> },
> Named.as("processor1")
> )
> .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> streamsBuilder.stream(inputTopic1, Consumed.with(Serdes.String(), 
> Serdes.String()))
> .toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreA).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
> .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreB).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
> .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreC).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
> .toStream()
> .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> final Topology topology = streamsBuilder.build(streamsConfiguration);
> topology.connectProcessorAndStateStores(processorName, stateStoreC);
> {code}
> This code results in the attached topology.
> In the topology {{processor1}} is connected to {{stateStoreC}}. If 
> {{processor1}} is added to the topology before the other processors, i.e., if 
> the right branch of the topology is added before the left branch as in the 
> code above, the cache of {{stateStoreC}} is flushed before the caches of 
> {{stateStoreA}} and {{stateStoreB}}.
> You can observe the flush order by feeding some records into the input topics 
> of the topology, waiting for a commit,  and looking for the following log 
> message:
> https://github.com/apache/kafka/blob/2e1947d240607d53f071f61c875cfffc3fec47fe/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L513
>  
> I changed the log message from trace to debug to avoid too much noise. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once

2023-08-08 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752149#comment-17752149
 ] 

Matthias J. Sax commented on KAFKA-15259:
-

The only thing I am wondering is, if we should try to fix it right away in KS 
by adding an internal producer config for now until KAFKA-15309 gets done? In 
the end, it seems that a KS app could get stuck without the ability to skip over 
a "poison pill" write, so it might be worth fixing right away (even if the fix 
is more of a hack for now)?

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using exactly_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using exactly_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> exactly_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using exactly_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with exactly_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shuts down as the default 
> behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even when using exactly_once and ProductionExceptionHandlerResponse.CONTINUE 
> is used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of exactly_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> Exception handler choose to CONTINUE processing in spite of this error but 
> written offsets would not be recorded.
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 INFO  TransactionManager:393 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Transiting to abortable error state 
> due to org.apache.kafka
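
For reference, a minimal handler of the kind the reproducer configures (a sketch, not the attached Reproducer.java):

{code:java}
public class ContinueProductionExceptionHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        // Ask Streams to skip the failed record and keep processing.
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {}
}
// Registered via:
// props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
//           ContinueProductionExceptionHandler.class);
{code}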

[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-08 Thread via GitHub


AndrewJSchofield commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1287441674


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -613,7 +623,7 @@ private long batchReady(long nowMs, boolean exhausted, TopicPartition part, Node
 long waitedTimeMs, boolean backingOff, boolean full,
 long nextReadyCheckDelayMs, Set<Node> readyNodes) {
 if (!readyNodes.contains(leader) && !isMuted(part)) {
-long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
+long timeToWaitMs = backingOff ? retryBackoff.backoff(0) : lingerMs;

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic

2023-08-08 Thread via GitHub


satishd commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1287011673


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -343,21 +345,78 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
 /**
  * Deletes the internal topic partition info if delete flag is set as true.
  *
- * @param topicPartition topic partition to be stopped.
+ * @param topicPartitions topic partitions that need to be stopped.
  * @param delete flag to indicate whether the given topic partitions are to be deleted or not.
  */
-public void stopPartitions(TopicPartition topicPartition, boolean delete) {
+public void stopPartitions(Set<TopicPartition> topicPartitions,
+   boolean delete,
+   BiConsumer<TopicPartition, Throwable> errorHandler) {
+LOGGER.debug("Stopping {} partitions, delete: {}", topicPartitions.size(), delete);
+Set<TopicIdPartition> topicIdPartitions = topicPartitions.stream()
+.filter(topicIdByPartitionMap::containsKey)
+.map(tp -> new TopicIdPartition(topicIdByPartitionMap.get(tp), tp))
+.collect(Collectors.toSet());
+
+topicIdPartitions.forEach(tpId -> {
+try {
+RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId);
+if (task != null) {
+LOGGER.info("Cancelling the RLM task for tpId: {}", tpId);
+task.cancel();
+}
+if (delete) {
+LOGGER.info("Deleting the remote log segments task for 
partition: {}", tpId);
+deleteRemoteLogPartition(tpId);

Review Comment:
   
[KIP-405](https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-topic-deletionTopicdeletionlifecycle)
 already talks about the approach of deleting remote log segments in an 
asynchronous manner using the controller and RLMM. This is not a blocking change 
for 3.6.0; filed a [JIRA](https://issues.apache.org/jira/browse/KAFKA-15313) for 
the same. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic

2023-08-08 Thread via GitHub


satishd commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1287425728


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -343,21 +345,78 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
 /**
  * Deletes the internal topic partition info if delete flag is set as true.
  *
- * @param topicPartition topic partition to be stopped.
+ * @param topicPartitions topic partitions that need to be stopped.
  * @param delete flag to indicate whether the given topic partitions are to be deleted or not.
  */
-public void stopPartitions(TopicPartition topicPartition, boolean delete) {
+public void stopPartitions(Set<TopicPartition> topicPartitions,
+   boolean delete,
+   BiConsumer<TopicPartition, Throwable> errorHandler) {
+LOGGER.debug("Stopping {} partitions, delete: {}", topicPartitions.size(), delete);
+Set<TopicIdPartition> topicIdPartitions = topicPartitions.stream()
+.filter(topicIdByPartitionMap::containsKey)
+.map(tp -> new TopicIdPartition(topicIdByPartitionMap.get(tp), tp))
+.collect(Collectors.toSet());
+
+topicIdPartitions.forEach(tpId -> {
+try {
+RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId);
+if (task != null) {
+LOGGER.info("Cancelling the RLM task for tpId: {}", tpId);
+task.cancel();
+}
+if (delete) {
+LOGGER.info("Deleting the remote log segments task for 
partition: {}", tpId);
+deleteRemoteLogPartition(tpId);

Review Comment:
   
[KIP-405](https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-topic-deletionTopicdeletionlifecycle)
 already talks about the approach of deleting remote log segments in an 
asynchronous manner using the controller and RLMM. This is not a blocking change 
for 3.6.0; filed a [JIRA](https://issues.apache.org/jira/browse/KAFKA-15313) for 
the same.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 merged pull request #14055: KAFKA-15031: Add plugin.discovery to Connect worker configuration (KIP-898)

2023-08-08 Thread via GitHub


gharris1727 merged PR #14055:
URL: https://github.com/apache/kafka/pull/14055


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #14055: KAFKA-15031: Add plugin.discovery to Connect worker configuration (KIP-898)

2023-08-08 Thread via GitHub


gharris1727 commented on PR #14055:
URL: https://github.com/apache/kafka/pull/14055#issuecomment-1669994084

   Flaky test failures appear unrelated, and the tests pass locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio opened a new pull request, #14168: MINOR; Fix nanosecond elapsed time

2023-08-08 Thread via GitHub


jsancio opened a new pull request, #14168:
URL: https://github.com/apache/kafka/pull/14168

   Time.nanoseconds() (System.nanoTime()) is not guaranteed to be monotonically 
increasing, and the value is allowed to overflow (wrap), so raw readings cannot 
be compared directly. It can only be used to measure elapsed time, which is 
computed by subtracting two nanoseconds() calls.
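   
   For illustration, the safe usage pattern looks like this (a generic sketch; doWork() is a placeholder, not code from this PR):
   
   ```java
   // Measure elapsed time by subtracting two nanoTime() readings; the
   // difference is valid even if the absolute value is negative or wraps.
   long startNs = System.nanoTime();
   doWork();
   long elapsedNs = System.nanoTime() - startNs;

   // Comparing raw values, e.g. System.nanoTime() > deadlineNs, is unsafe;
   // compare differences instead: System.nanoTime() - deadlineNs >= 0.
   ```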
   
   I ran `testBalancePartitionLeaders` 1000 times using jqwik without any 
failures.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-08 Thread via GitHub


junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1286480033


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -613,7 +623,7 @@ private long batchReady(long nowMs, boolean exhausted, TopicPartition part, Node
 long waitedTimeMs, boolean backingOff, boolean full,
 long nextReadyCheckDelayMs, Set<Node> readyNodes) {
 if (!readyNodes.contains(leader) && !isMuted(part)) {
-long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
+long timeToWaitMs = backingOff ? retryBackoff.backoff(0) : lingerMs;

Review Comment:
   Hmm, we should use the number of retries for the batch instead of 0 for 
calculating timeToWaitMs, right?
   
   Also, this is an existing issue. It seems that nowMs is never used.
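   
   For context, a sketch of how the backoff would scale with the attempt count (constructor arguments are illustrative; assumes org.apache.kafka.common.utils.ExponentialBackoff):
   
   ```java
   ExponentialBackoff retryBackoff = new ExponentialBackoff(
           100L,   // initial interval (ms)
           2,      // multiplier
           1000L,  // max interval (ms)
           0.2);   // jitter
   long first = retryBackoff.backoff(0);  // ~100 ms, what the current code always uses
   long later = retryBackoff.backoff(3);  // grows toward the 1000 ms cap per batch attempt
   ```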



##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -144,13 +159,25 @@ public long metadataExpireMs() {
  */
 public synchronized int requestUpdate() {
 this.needFullUpdate = true;
+this.backoffUpdateRequests = 0L;

Review Comment:
   Hmm, this probably needs some more thought. For example, if a produce 
request fails, we request a metadata update and set backoffUpdateRequests to 0. 
However, if we exponentially back off the produce request because of stale 
metadata, it seems that we should do the same for refreshing the metadata. 
Otherwise, we likely will still have the stale metadata when retrying the 
produce request.
   
   Ditto for fetch requests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru commented on a diff in pull request #14001: Kafka Streams Threading: Punctuation (5/N)

2023-08-08 Thread via GitHub


lucasbru commented on code in PR #14001:
URL: https://github.com/apache/kafka/pull/14001#discussion_r1287335415


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java:
##
@@ -86,12 +87,29 @@ private void runOnce(final long nowMs) {
 
 if (currentTask == null) {
 currentTask = 
taskManager.assignNextTask(DefaultTaskExecutor.this);
-} else {
-// if a task is no longer processable, ask task-manager to 
give it another
-// task in the next iteration
-if (currentTask.isProcessable(nowMs)) {
+}
+
+if (currentTask != null) {

Review Comment:
   Yes, I think this is good for a separate PR. Also not really related to 
punctuation anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #14001: Kafka Streams Threading: Punctuation (5/N)

2023-08-08 Thread via GitHub


cadonna commented on code in PR #14001:
URL: https://github.com/apache/kafka/pull/14001#discussion_r1287332777


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java:
##
@@ -86,12 +87,29 @@ private void runOnce(final long nowMs) {
 
 if (currentTask == null) {
 currentTask = 
taskManager.assignNextTask(DefaultTaskExecutor.this);
-} else {
-// if a task is no longer processable, ask task-manager to 
give it another
-// task in the next iteration
-if (currentTask.isProcessable(nowMs)) {
+}
+
+if (currentTask != null) {

Review Comment:
   Let's discuss that and put it into a separate PR. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #14092: KAFKA-15239: Fix system tests using producer performance service

2023-08-08 Thread via GitHub


fvaleri commented on code in PR #14092:
URL: https://github.com/apache/kafka/pull/14092#discussion_r1287331776


##
tests/kafkatest/services/performance/producer_performance.py:
##
@@ -24,7 +24,6 @@
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.version import DEV_BRANCH
 
-

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property

2023-08-08 Thread via GitHub


kamalcph commented on code in PR #14161:
URL: https://github.com/apache/kafka/pull/14161#discussion_r1287319821


##
core/src/main/scala/kafka/server/ConfigHandler.scala:
##
@@ -62,6 +62,12 @@ class TopicConfigHandler(private val logManager: LogManager, 
kafkaConfig: KafkaC
 topicConfig.asScala.forKeyValue { (key, value) =>
   if (!configNamesToExclude.contains(key)) props.put(key, value)
 }
+
+if (!kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()

Review Comment:
   For my understanding, will this approach work if the user wants to downgrade 
from 3.6 to 2.8?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[GitHub] [kafka] gharris1727 commented on a diff in pull request #14092: KAFKA-15239: Fix system tests using producer performance service

2023-08-08 Thread via GitHub


gharris1727 commented on code in PR #14092:
URL: https://github.com/apache/kafka/pull/14092#discussion_r1287310500


##
tests/kafkatest/services/performance/producer_performance.py:
##
@@ -24,7 +24,6 @@
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.version import DEV_BRANCH
 
-

Review Comment:
   Could you revert this whitespace change too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15297) Cache flush order might not be topological order

2023-08-08 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752108#comment-17752108
 ] 

Guozhang Wang commented on KAFKA-15297:
---

{{We are not sure if we should not instead decouple caching from forwarding.}} 
I'd assume the double negation here means "We think we should just try to decouple 
caching from forwarding as the right solution" :) And yes, I'd love to see that 
happening, as I've advocated for it for many years, and I was thinking about 
just "suppressing" the records in the last sink processor of the sub-topology 
to achieve the same effect of sending less over the network. It may be similar 
to what you meant by "caching on the last state store", or it may have some corner 
differences. Either way, like you said, it will lose some benefit of processing 
fewer records at the later stages of a sub-topology, but I think in most cases, 
given a sub-topology's size, this seems a good trade for simplicity.

It also has many other benefits, just to name a few: 1) we get much simpler 
timestamp tracking within a task (today it's fine-grained, per-processor), as 
every record will always go through the whole sub-topology; 2) we get simpler 
version tracking within sub-topologies for IQ, since now all state stores have 
the same version.

> Cache flush order might not be topological order 
> -
>
> Key: KAFKA-15297
> URL: https://issues.apache.org/jira/browse/KAFKA-15297
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Bruno Cadonna
>Priority: Major
> Attachments: minimal_example.png
>
>
> The flush order of the state store caches in Kafka Streams might not 
> correspond to the topological order of the state stores in the topology. The 
> order depends on how the processors and state stores are added to the 
> topology. 
> In some cases downstream state stores might be flushed before upstream state 
> stores. That means, that during a commit records in upstream caches might end 
> up in downstream caches that have already been flushed during the same 
> commit. If a crash happens at that point, those records in the downstream 
> caches are lost. Those records are lost for two reasons:
> 1. Records in caches are only changelogged after they are flushed from the 
> cache. However, the downstream caches have already been flushed and they will 
> not be flushed again during the same commit.
> 2. The offsets of the input records that caused the records that now are 
> blocked in the downstream caches are committed during the same commit and so 
> they will not be re-processed after the crash.
> An example for a topology where the flush order of the caches is wrong is the 
> following:
> {code:java}
> final String inputTopic1 = "inputTopic1";
> final String inputTopic2 = "inputTopic2";
> final String outputTopic1 = "outputTopic1";
> final String processorName = "processor1";
> final String stateStoreA = "stateStoreA";
> final String stateStoreB = "stateStoreB";
> final String stateStoreC = "stateStoreC";
> streamsBuilder.stream(inputTopic2, Consumed.with(Serdes.String(), 
> Serdes.String()))
> .process(
> () -> new Processor<String, String, String, String>() {
> private ProcessorContext<String, String> context;
> @Override
> public void init(ProcessorContext<String, String> context) {
> this.context = context;
> }
> @Override
> public void process(Record<String, String> record) {
> context.forward(record);
> }
> @Override
> public void close() {}
> },
> Named.as("processor1")
> )
> .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> streamsBuilder.stream(inputTopic1, Consumed.with(Serdes.String(), 
> Serdes.String()))
> .toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreA).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
> .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreB).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
> .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreC).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
> .toStream()
> .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> final Topology topology = streamsBuilder.build(streamsConfiguration);
> topology.connectProcessorAndStateStores(processorName, stateStoreC);
> {code}
> This code results in the attached topology.
> In the topology {{processor1}} is connected to {{stateStoreC}}. If 
> {{processor1}} is added to the topology before the other processors, i.e., if 
> the right branch of the topology is added before the left branch as in the 
> code above, the cache 

[GitHub] [kafka] bachmanity1 commented on pull request #14153: KAFKA-14132: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest

2023-08-08 Thread via GitHub


bachmanity1 commented on PR #14153:
URL: https://github.com/apache/kafka/pull/14153#issuecomment-1669806857

   @yashmayya I've applied your suggestions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #14139: KAFKA-15022: [7/N] use RackAwareTaskAssignor in HAAssignor

2023-08-08 Thread via GitHub


mjsax merged PR #14139:
URL: https://github.com/apache/kafka/pull/14139


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #14163: MINOR: Improve JavaDocs of KafkaStreams `context.commit()`

2023-08-08 Thread via GitHub


mjsax merged PR #14163:
URL: https://github.com/apache/kafka/pull/14163


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bachmanity1 commented on a diff in pull request #14153: KAFKA-14132: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest

2023-08-08 Thread via GitHub


bachmanity1 commented on code in PR #14153:
URL: https://github.com/apache/kafka/pull/14153#discussion_r1287236551


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##
@@ -114,29 +113,49 @@ public class KafkaBasedLogTest {
 private static final String TP0_VALUE_NEW = "VAL0_NEW";
 private static final String TP1_VALUE_NEW = "VAL1_NEW";
 
-private Time time = new MockTime();
-private KafkaBasedLog<String, String> store;
+private final Time time = new MockTime();
+private MockedKafkaBasedLog store;
 
 @Mock
-private Runnable initializer;
+private Consumer<TopicAdmin> initializer;
 @Mock
 private KafkaProducer<String, String> producer;
-private MockConsumer<String, String> consumer;
-@Mock

Review Comment:
   `admin` field needs to be mocked only in methods where previously 
`setupWithAdmin` was used; if we mock it everywhere, some tests don't pass 
because they expect `admin` to be `null`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13860: KAFKA-15093: Add 3.4 and 3.5 Streams upgrade system tests

2023-08-08 Thread via GitHub


mimaison commented on PR #13860:
URL: https://github.com/apache/kafka/pull/13860#issuecomment-1669737299

   This depends on https://github.com/apache/kafka/pull/14103 so we need to get 
that merged first


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15100) Unsafe to call tryCompleteFetchResponse on request timeout

2023-08-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-15100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-15100:
---
Fix Version/s: 3.6.0

> Unsafe to call tryCompleteFetchResponse on request timeout
> --
>
> Key: KAFKA-15100
> URL: https://issues.apache.org/jira/browse/KAFKA-15100
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.6.0
>
>
> When the fetch request times out the future is completed from the 
> "raft-expiration-executor" SystemTimer thread. KafkaRaftClient assumes that 
> tryCompleteFetchResponse is always called from the same thread. This 
> invariant is violated in this case.
> {code:java}
>            return future.handle((completionTimeMs, exception) -> {
>               if (exception != null) {
>                   Throwable cause = exception instanceof ExecutionException ?
>                       exception.getCause() : exception;
>                   // If the fetch timed out in purgatory, it means no new data is available,
>                   // and we will complete the fetch successfully. Otherwise, if there was
>                   // any other error, we need to return it.
>                   Errors error = Errors.forException(cause);
>                   if (error != Errors.REQUEST_TIMED_OUT) {
>                       logger.info("Failed to handle fetch from {} at {} due to {}",
>                           replicaId, fetchPartition.fetchOffset(), error);
>                       return buildEmptyFetchResponse(error, Optional.empty());
>                   }
>               }
>               // FIXME: `completionTimeMs`, which can be null
>               logger.trace("Completing delayed fetch from {} starting at offset {} at {}",
>                   replicaId, fetchPartition.fetchOffset(), completionTimeMs);
>               return tryCompleteFetchRequest(replicaId, fetchPartition, time.milliseconds());
>           });
> {code}
> One solution is to always build an empty response if the future was completed 
> exceptionally. This works because the ExpirationService completes the future 
> with a `TimeoutException`.
> A longer-term solution is to use a more flexible event executor service. This 
> would be a service that allows more kinds of event to get scheduled/submitted 
> to the KRaft thread.
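
For illustration, the short-term fix could look roughly like this (a sketch derived from the snippet above, not the merged patch):

{code:java}
return future.handle((completionTimeMs, exception) -> {
    if (exception != null) {
        Throwable cause = exception instanceof ExecutionException ?
            exception.getCause() : exception;
        Errors error = Errors.forException(cause);
        // A timeout in purgatory just means no new data arrived, so answer
        // with a successful empty response; any other error is returned as-is.
        // Either way, tryCompleteFetchRequest is never called from the
        // expiration thread.
        if (error == Errors.REQUEST_TIMED_OUT) {
            return buildEmptyFetchResponse(Errors.NONE, Optional.empty());
        }
        return buildEmptyFetchResponse(error, Optional.empty());
    }
    return tryCompleteFetchRequest(replicaId, fetchPartition, time.milliseconds());
});
{code}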



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] clolov commented on pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property

2023-08-08 Thread via GitHub


clolov commented on PR #14161:
URL: https://github.com/apache/kafka/pull/14161#issuecomment-1669704586

   Politely requesting a review @showuon @divijvaidya @satishd!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property

2023-08-08 Thread via GitHub


clolov commented on code in PR #14161:
URL: https://github.com/apache/kafka/pull/14161#discussion_r1287188953


##
core/src/test/scala/unit/kafka/server/KafkaServerTest.scala:
##
@@ -154,6 +155,92 @@ class KafkaServerTest extends QuorumTestHarness {
 server.shutdown()
   }
 
+  @Test
+  def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(): Unit 
= {
+val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
 true.toString)
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+  
"org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager")
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
+  "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")
+
+val server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps))
+server.remoteLogManagerOpt match {
+  case Some(_) =>
+  case None => fail("RemoteLogManager should be initialized")
+}
+
+val topicProps = new Properties()
+topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
true.toString)

Review Comment:
   I do not know why there isn't a TestUtils.createTopicConfigs ¯\\\_(ツ)\_/¯
   
   Happy to add it if others think it might be useful!






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15315) Use getOrDefault rather than get

2023-08-08 Thread roon (Jira)
roon created KAFKA-15315:


 Summary: Use getOrDefault rather than get
 Key: KAFKA-15315
 URL: https://issues.apache.org/jira/browse/KAFKA-15315
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: roon
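
The description is empty so far; for illustration, the pattern the title refers to is (a generic example, not a specific Kafka call site):

{code:java}
Map<String, Integer> counts = new HashMap<>();

// Before: get() plus an explicit null check
Integer raw = counts.get("key");
int before = (raw == null) ? 0 : raw;

// After: the same result in a single call
int after = counts.getOrDefault("key", 0);
{code}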






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] clolov commented on pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-08 Thread via GitHub


clolov commented on PR #14127:
URL: https://github.com/apache/kafka/pull/14127#issuecomment-1669607117

   Apologies for joining the review late (I had to spend a few hours getting 
acquainted with the ConsumerTask code). I carried out a first pass on the 
changes to the main files and they make sense to me. I will circle back 
tomorrow morning and have another go through the tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-08-08 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1287096186


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,52 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List<ByteBuffer> partitionsWithTombstoneOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture<Void> offsetWriteFuture = CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenApply(v -> {
+Future<Void> secondaryWriteFuture = secondaryStore.set(tombstoneOffsets, new FutureCallback<>());
+try {
+if (exactlyOnce) {
+secondaryWriteFuture.get();
+} else {
+secondaryWriteFuture.get(offsetFlushTimeoutMs, 
TimeUnit.MILLISECONDS);
+}
+} catch (ExecutionException e) {
+log.error("{} Flush of tombstone(s) offsets to secondary 
store threw an unexpected exception: ", this, e.getCause());
+} catch (Exception e) {
+log.error("{} Got Exception when trying to flush 
tombstone(s) offsets to secondary store", this, e);
+}

Review Comment:
   That's a great catch. I need to catch it to keep the compiler happy (get() 
throws a couple of exceptions). Actually, I realised my tests were doing the 
asserts incorrectly. I have made further changes of the following form:
   
   1. Replace `thenApply` with `thenAccept` when chaining CompletableFutures. 
This makes sense since we don't really need to do any transformations but just 
perform an action (setting to primary/secondary store).
   2. I now return the CompletableFuture object directly and invoke the passed 
callback's onComplete at appropriate places. Also note that for error cases, it 
doesn't seem to be possible to throw checked exceptions from any of these 
chaining methods like `thenAccept` etc. We need to wrap them in a 
RuntimeException and handle them via `exceptionally` or `whenComplete`. I 
haven't chosen either of the two approaches since eventually the errors need to 
be reflected on the callback.
   3. Made changes to the tests where now I assert errors and results directly 
within the callback. I believe this is closer to the actual changes.
   
   Let me know what you think.
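   
   For illustration, the chaining shape described above looks roughly like this (writeTombstonesToSecondaryStore, writeToPrimaryStore, and callback are hypothetical names, not the code in this PR):
   
   ```java
   CompletableFuture<Void> offsetWriteFuture = CompletableFuture.completedFuture(null)
           .thenAccept(v -> writeTombstonesToSecondaryStore())  // action only, no transformation
           .thenAccept(v -> writeToPrimaryStore());

   // Checked exceptions cannot escape thenAccept, so wrap them:
   //     catch (TimeoutException e) { throw new RuntimeException(e); }
   // and surface the outcome on the callback at the end of the chain:
   offsetWriteFuture.whenComplete((v, error) -> callback.onCompletion(error, v));
   ```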



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #14153: KAFKA-14132: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest

2023-08-08 Thread via GitHub


yashmayya commented on code in PR #14153:
URL: https://github.com/apache/kafka/pull/14153#discussion_r1287038386


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java:
##
@@ -103,7 +103,7 @@ public class KafkaBasedLog<K, V> {
 private Optional<KafkaProducer<K, V>> producer;
 private TopicAdmin admin;
 
-private Thread thread;
+Thread thread;

Review Comment:
   ```suggestion
   // Visible for testing
   Thread thread;
   ```
   nit



##
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##
@@ -114,29 +113,49 @@ public class KafkaBasedLogTest {
 private static final String TP0_VALUE_NEW = "VAL0_NEW";
 private static final String TP1_VALUE_NEW = "VAL1_NEW";
 
-private Time time = new MockTime();
-private KafkaBasedLog<String, String> store;
+private final Time time = new MockTime();
+private MockedKafkaBasedLog store;
 
 @Mock
-private Runnable initializer;
+private Consumer<TopicAdmin> initializer;
 @Mock
 private KafkaProducer<String, String> producer;
-private MockConsumer<String, String> consumer;
-@Mock

Review Comment:
   I'm curious, why was this change required - 
https://github.com/apache/kafka/pull/14153/commits/2c7d2e380d02d0f7dc77944ca5775ddaa3540457?



##
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##
@@ -114,29 +113,49 @@ public class KafkaBasedLogTest {
 private static final String TP0_VALUE_NEW = "VAL0_NEW";
 private static final String TP1_VALUE_NEW = "VAL1_NEW";
 
-private Time time = new MockTime();
-private KafkaBasedLog<String, String> store;
+private final Time time = new MockTime();
+private MockedKafkaBasedLog store;
 
 @Mock
-private Runnable initializer;
+private Consumer<TopicAdmin> initializer;
 @Mock
 private KafkaProducer<String, String> producer;
-private MockConsumer<String, String> consumer;
-@Mock
 private TopicAdmin admin;
+private final Supplier<TopicAdmin> topicAdminSupplier = () -> admin;
+private MockConsumer<String, String> consumer;
 
-private Map<TopicPartition, List<ConsumerRecord<String, String>>> consumedRecords = new HashMap<>();
-private Callback<ConsumerRecord<String, String>> consumedCallback = (error, record) -> {
+private final Map<TopicPartition, List<ConsumerRecord<String, String>>> consumedRecords = new HashMap<>();
+private final Callback<ConsumerRecord<String, String>> consumedCallback = (error, record) -> {
 TopicPartition partition = new TopicPartition(record.topic(), record.partition());
 List<ConsumerRecord<String, String>> records = consumedRecords.computeIfAbsent(partition, k -> new ArrayList<>());
 records.add(record);
 };
 
-@SuppressWarnings("unchecked")
+private class MockedKafkaBasedLog extends KafkaBasedLog<String, String> {
+public MockedKafkaBasedLog(String topic,
+   Map<String, Object> producerConfigs,
+   Map<String, Object> consumerConfigs,
+   Supplier<TopicAdmin> topicAdminSupplier,
+   Callback<ConsumerRecord<String, String>> consumedCallback,
+   Time time,
+   Consumer<TopicAdmin> initializer) {
+super(topic, producerConfigs, consumerConfigs, topicAdminSupplier, consumedCallback, time, initializer);
+}
+
+@Override
+protected KafkaProducer<String, String> createProducer() {
+return producer;
+}
+
+@Override
+protected MockConsumer<String, String> createConsumer() {
+return consumer;
+}
+}

Review Comment:
   Can we use an anonymous inner class in the `setUp` method instead? I think 
that'll look a lot cleaner.
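   
   Something like this, for instance (a sketch of the suggestion; the constant names TOPIC, PRODUCER_PROPS, and CONSUMER_PROPS are assumed):
   
   ```java
   store = new KafkaBasedLog<String, String>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS,
           topicAdminSupplier, consumedCallback, time, initializer) {
       @Override
       protected KafkaProducer<String, String> createProducer() {
           return producer;
       }

       @Override
       protected MockConsumer<String, String> createConsumer() {
           return consumer;
       }
   };
   ```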



##
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##
@@ -376,22 +358,11 @@ public void testPollConsumerError() throws Exception {
 
 store.stop();
 
-assertFalse(Whitebox.getInternalState(store, 
"thread").isAlive());
-assertTrue(consumer.closed());
-PowerMock.verifyAll();
+verifyStartAndStop();
 }
 
 @Test
 public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception {
-expectStart();
-
-// Producer flushes when read to log end is called

Review Comment:
   Same comment as above 
(https://github.com/apache/kafka/pull/14153/files#r1286903819)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15314) No Quota applied if client-id is null or empty

2023-08-08 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-15314:


 Summary: No Quota applied if client-id is null or empty
 Key: KAFKA-15314
 URL: https://issues.apache.org/jira/browse/KAFKA-15314
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Jorge Esteban Quilcate Otoya


When quotas were proposed, KIP-13[1] stated:

>  In addition, there will be a quota reserved for clients not presenting a 
>client id (for e.g. simple consumers not setting the id). This will default to 
>an empty client id ("") and all such clients will share the quota for that 
>empty id (which should be the default quota).

However, it seems that when client-id is null or empty and a default quota for 
client-id is present, no quota is applied.

Even though Java clients set a default value [2][3], the protocol accepts a null 
client-id[4], and other client implementations could send a null value to 
bypass a quota.

The related code[5][6] shows that the metric pair for quotas is prepared with the 
client-id (potentially null), and that the quota is set to null when both the 
client-id and the (sanitized) user are null.

Adding some tests to showcase this: 
[https://github.com/apache/kafka/pull/14165] 

 

Is it expected for client-id=null to bypass quotas? If it is, then the KIP or 
documentation should clarify this; otherwise, we should fix this behavior as a 
bug, e.g. we could "sanitize" client-id similarly to the user name, treating it 
as the empty string when the input is null or empty.
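
For illustration, the sanitization suggested above could be as simple as (a sketch, not a committed fix):

{code:java}
// Treat a missing client-id as the empty client-id ("") so that the
// default quota applies, mirroring how user names are sanitized.
String sanitizedClientId = (clientId == null) ? "" : clientId;
{code}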


 

As a side-note, similar behavior could happen with the user, I guess. Even though 
the value defaults to ANONYMOUS, if a client implementation sends an empty value, 
it may also bypass the default quota; though I need to test this further once 
this is confirmed as a bug.

 

[1]: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas] 


[2]: 
[https://github.com/apache/kafka/blob/e98508747acc8972ac5ceb921e0fd3a7d7bd5e9c/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java#L498-L508]
 


[3]: 
[https://github.com/apache/kafka/blob/ab71c56973518bac8e1868eccdc40b17d7da35c1/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L616-L628]

[4]: 
[https://github.com/apache/kafka/blob/9f26906fcc2fd095b7d27c504e342b9a8d619b4b/clients/src/main/resources/common/message/RequestHeader.json#L34-L40]
 


[5]: 
[https://github.com/apache/kafka/blob/322ac86ba282f35373382854cc9e790e4b7fb5fc/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L588-L628]
 


[6]: 
[https://github.com/apache/kafka/blob/322ac86ba282f35373382854cc9e790e4b7fb5fc/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L651-L652]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] yashmayya commented on a diff in pull request #14142: KAFKA-7438: Replace EasyMock with Mockito & remove misleading tests in SessionSto…

2023-08-08 Thread via GitHub


yashmayya commented on code in PR #14142:
URL: https://github.com/apache/kafka/pull/14142#discussion_r1287021377


##
streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java:
##
@@ -118,48 +114,28 @@ public void 
shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
 }
 
 @Test
-public void shouldThrowNullPointerIfInnerIsNull() {
+public void shouldThrowNullPointerIfSupplierIsNull() {
 final Exception e = assertThrows(NullPointerException.class, () -> new 
SessionStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime()));
 assertThat(e.getMessage(), equalTo("storeSupplier cannot be null"));
 }
 
 @Test
-public void shouldThrowNullPointerIfKeySerdeIsNull() {
-final Exception e = assertThrows(NullPointerException.class, () -> new 
SessionStoreBuilder<>(supplier, null, Serdes.String(), new MockTime()));
-assertThat(e.getMessage(), equalTo("name cannot be null"));
-}
-
-@Test
-public void shouldThrowNullPointerIfValueSerdeIsNull() {
-final Exception e = assertThrows(NullPointerException.class, () -> new 
SessionStoreBuilder<>(supplier, Serdes.String(), null, new MockTime()));

Review Comment:
   Ah, this looks like the same issue as 
https://github.com/apache/kafka/pull/14152#discussion_r1284245654



##
streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java:
##
@@ -118,48 +114,28 @@ public void 
shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
 }
 
 @Test
-public void shouldThrowNullPointerIfInnerIsNull() {
+public void shouldThrowNullPointerIfSupplierIsNull() {

Review Comment:
   ```suggestion
   public void shouldThrowNullPointerIfStoreSupplierIsNull() {
   ```
   nit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


