[jira] [Assigned] (KAFKA-15038) Use topic id/name mapping from the Metadata cache in the RemoteLogManager

2023-07-31 Thread Owen C.H. Leung (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Owen C.H. Leung reassigned KAFKA-15038:
---

Assignee: Owen C.H. Leung

> Use topic id/name mapping from the Metadata cache in the RemoteLogManager
> -
>
> Key: KAFKA-15038
> URL: https://issues.apache.org/jira/browse/KAFKA-15038
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Alexandre Dupriez
>Assignee: Owen C.H. Leung
>Priority: Minor
>
> Currently, the {{RemoteLogManager}} maintains its own cache of topic name to 
> topic id 
> [[1]|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138]
>  using the information provided during leadership changes, and removing the 
> mapping upon receiving the notification of partition stopped.
> It should be possible to re-use the mapping in a broker's metadata cache, 
> removing the need for the RLM to build and update a local cache thereby 
> duplicating the information in the metadata cache. It also allows to preserve 
> a single source of authority regarding the association between topic names 
> and ids.
> [1] 
> https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15038) Use topic id/name mapping from the Metadata cache in the RemoteLogManager

2023-07-31 Thread Owen C.H. Leung (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749058#comment-17749058
 ] 

Owen C.H. Leung commented on KAFKA-15038:
-

[~divijvaidya] I'd like to pick this up. I've done a bit of digging and would like 
to clarify whether my understanding is correct: 

So essentially, in this ticket we want to remove the use of 
`ConcurrentMap topicPartitionIds`, and leverage the cache 
available in `RemoteLogManagerConfig rlmConfig` (more specifically, use the 
metadata manager in `Map remoteLogMetadataManagerProps()`) 
to serve as the cache, which will be the single source of authority? 

Let me know. Thanks
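
As a rough sketch of that direction (the `BrokerMetadataView`/`topicIdFor` names below are illustrative assumptions, not the actual MetadataCache or RemoteLogManager API), the RLM lookup would simply delegate to the broker's metadata view instead of maintaining its own map:

{code:java}
import java.util.Optional;

// Sketch only: delegating topic-id lookups to the broker's metadata view removes
// the need for RemoteLogManager to maintain (and invalidate) its own topic-id map.
final class TopicIdLookupSketch {

    // Hypothetical, minimal view over the broker's metadata cache.
    interface BrokerMetadataView {
        Optional<String> topicIdFor(String topicName);
    }

    private final BrokerMetadataView metadataView;

    TopicIdLookupSketch(BrokerMetadataView metadataView) {
        this.metadataView = metadataView;
    }

    // No leadership-change callbacks or stop-partition cleanup needed here:
    // the metadata cache remains the single source of authority.
    Optional<String> topicId(String topicName) {
        return metadataView.topicIdFor(topicName);
    }
}
{code}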

> Use topic id/name mapping from the Metadata cache in the RemoteLogManager
> -
>
> Key: KAFKA-15038
> URL: https://issues.apache.org/jira/browse/KAFKA-15038
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Alexandre Dupriez
>Assignee: Owen C.H. Leung
>Priority: Minor
>
> Currently, the {{RemoteLogManager}} maintains its own cache of topic name to 
> topic id 
> [[1]|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138]
>  using the information provided during leadership changes, and removing the 
> mapping upon receiving the notification of partition stopped.
> It should be possible to re-use the mapping in a broker's metadata cache, 
> removing the need for the RLM to build and update a local cache thereby 
> duplicating the information in the metadata cache. It also allows to preserve 
> a single source of authority regarding the association between topic names 
> and ids.
> [1] 
> https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15038) Use topic id/name mapping from the Metadata cache in the RemoteLogManager

2023-07-31 Thread Owen C.H. Leung (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749058#comment-17749058
 ] 

Owen C.H. Leung edited comment on KAFKA-15038 at 7/31/23 7:13 AM:
--

[~divijvaidya] I'd like to pick this up. I've done a bit of digging and would like 
to clarify whether my understanding is correct: 

So essentially, in this ticket we want to remove the use of 
```ConcurrentMap topicPartitionIds```, and leverage the 
cache available in `RemoteLogManagerConfig rlmConfig` (more specifically, use the 
metadata manager in `Map remoteLogMetadataManagerProps()`) 
to serve as the cache, which will be the single source of authority? 

Let me know. Thanks


was (Author: JIRAUSER300460):
[~divijvaidya] I'd like to pick this up. I've done a bit of digging and would like 
to clarify whether my understanding is correct: 

So essentially, in this ticket we want to remove the use of 
`ConcurrentMap topicPartitionIds`, and leverage the cache 
available in `RemoteLogManagerConfig rlmConfig` (more specifically, use the 
metadata manager in `Map remoteLogMetadataManagerProps()`) 
to serve as the cache, which will be the single source of authority? 

Let me know. Thanks

> Use topic id/name mapping from the Metadata cache in the RemoteLogManager
> -
>
> Key: KAFKA-15038
> URL: https://issues.apache.org/jira/browse/KAFKA-15038
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Alexandre Dupriez
>Assignee: Owen C.H. Leung
>Priority: Minor
>
> Currently, the {{RemoteLogManager}} maintains its own cache of topic name to 
> topic id 
> [[1]|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138]
>  using the information provided during leadership changes, and removing the 
> mapping upon receiving the notification of partition stopped.
> It should be possible to re-use the mapping in a broker's metadata cache, 
> removing the need for the RLM to build and update a local cache thereby 
> duplicating the information in the metadata cache. It also allows to preserve 
> a single source of authority regarding the association between topic names 
> and ids.
> [1] 
> https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15038) Use topic id/name mapping from the Metadata cache in the RemoteLogManager

2023-07-31 Thread Owen C.H. Leung (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749058#comment-17749058
 ] 

Owen C.H. Leung edited comment on KAFKA-15038 at 7/31/23 7:14 AM:
--

[~divijvaidya] I'd like to pick this up. I've done a bit of digging and would like 
to clarify whether my understanding is correct: 

So essentially, in this ticket we want to remove the use of 
??ConcurrentMap topicPartitionIds??, and leverage the 
cache available in `RemoteLogManagerConfig rlmConfig` (more specifically, use the 
metadata manager in `Map remoteLogMetadataManagerProps()`) 
to serve as the cache, which will be the single source of authority? 

Let me know. Thanks


was (Author: JIRAUSER300460):
[~divijvaidya] I'd like to pick this up. I've done a bit of digging and would like 
to clarify whether my understanding is correct: 

So essentially, in this ticket we want to remove the use of 
```ConcurrentMap topicPartitionIds```, and leverage the 
cache available in `RemoteLogManagerConfig rlmConfig` (more specifically, use the 
metadata manager in `Map remoteLogMetadataManagerProps()`) 
to serve as the cache, which will be the single source of authority? 

Let me know. Thanks

> Use topic id/name mapping from the Metadata cache in the RemoteLogManager
> -
>
> Key: KAFKA-15038
> URL: https://issues.apache.org/jira/browse/KAFKA-15038
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Alexandre Dupriez
>Assignee: Owen C.H. Leung
>Priority: Minor
>
> Currently, the {{RemoteLogManager}} maintains its own cache of topic name to 
> topic id 
> [[1]|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138]
>  using the information provided during leadership changes, and removing the 
> mapping upon receiving the notification of partition stopped.
> It should be possible to re-use the mapping in a broker's metadata cache, 
> removing the need for the RLM to build and update a local cache thereby 
> duplicating the information in the metadata cache. It also allows to preserve 
> a single source of authority regarding the association between topic names 
> and ids.
> [1] 
> https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15038) Use topic id/name mapping from the Metadata cache in the RemoteLogManager

2023-07-31 Thread Owen C.H. Leung (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749058#comment-17749058
 ] 

Owen C.H. Leung edited comment on KAFKA-15038 at 7/31/23 7:15 AM:
--

[~divijvaidya] I'd like to pick this up. I've done a bit of digging and would like 
to clarify whether my understanding is correct: 

So essentially, in this ticket we want to remove the use of 
{*}ConcurrentMap topicPartitionIds{*}, and leverage the 
cache available in *RemoteLogManagerConfig rlmConfig* (more specifically, use the 
metadata manager in *Map remoteLogMetadataManagerProps()*) 
to serve as the cache, which will be the single source of authority? 

Let me know. Thanks


was (Author: JIRAUSER300460):
[~divijvaidya] I'd like to pick this up. I've done a bit of digging and would like 
to clarify whether my understanding is correct: 

So essentially, in this ticket we want to remove the use of 
??ConcurrentMap topicPartitionIds??, and leverage the 
cache available in `RemoteLogManagerConfig rlmConfig` (more specifically, use the 
metadata manager in `Map remoteLogMetadataManagerProps()`) 
to serve as the cache, which will be the single source of authority? 

Let me know. Thanks

> Use topic id/name mapping from the Metadata cache in the RemoteLogManager
> -
>
> Key: KAFKA-15038
> URL: https://issues.apache.org/jira/browse/KAFKA-15038
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Alexandre Dupriez
>Assignee: Owen C.H. Leung
>Priority: Minor
>
> Currently, the {{RemoteLogManager}} maintains its own cache of topic name to 
> topic id 
> [[1]|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138]
>  using the information provided during leadership changes, and removing the 
> mapping upon receiving the notification of partition stopped.
> It should be possible to re-use the mapping in a broker's metadata cache, 
> removing the need for the RLM to build and update a local cache thereby 
> duplicating the information in the metadata cache. It also allows to preserve 
> a single source of authority regarding the association between topic names 
> and ids.
> [1] 
> https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on a diff in pull request #14114: KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs

2023-07-31 Thread via GitHub


showuon commented on code in PR #14114:
URL: https://github.com/apache/kafka/pull/14114#discussion_r1278885944


##
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##
@@ -713,6 +714,99 @@ class DynamicBrokerConfigTest {
     config.updateCurrentConfig(new KafkaConfig(props))
 
     assertFalse(config.nonInternalValues.containsKey(KafkaConfig.MetadataLogSegmentMinBytesProp))
   }
+
+  @Test
+  def testDynamicLogLocalRetentionMsConfig(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    props.put(KafkaConfig.LogRetentionTimeMillisProp, "259200")
+    val config = KafkaConfig(props)
+    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaServer]))
+    config.dynamicConfig.initialize(None)
+    config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
+
+    val newProps = new Properties()
+    newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "216000")
+    // update default config
+    config.dynamicConfig.validate(newProps, perBrokerConfig = false)
+    config.dynamicConfig.updateDefaultConfig(newProps)
+    assertEquals(216000L, config.logLocalRetentionMs)
+
+    // update per broker config
+    config.dynamicConfig.validate(newProps, perBrokerConfig = true)
+    newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "215000")
+    config.dynamicConfig.updateBrokerConfig(0, newProps)
+    assertEquals(215000L, config.logLocalRetentionMs)
+  }
+
+  @Test
+  def testDynamicLogLocalRetentionSizeConfig(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    props.put(KafkaConfig.LogRetentionBytesProp, "4294967296")
+    val config = KafkaConfig(props)
+    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaServer]))
+    config.dynamicConfig.initialize(None)
+    config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
+
+    val newProps = new Properties()
+    newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "4294967295")
+    // update default config
+    config.dynamicConfig.validate(newProps, perBrokerConfig = false)
+    config.dynamicConfig.updateDefaultConfig(newProps)
+    assertEquals(4294967295L, config.logLocalRetentionBytes)
+
+    // update per broker config
+    config.dynamicConfig.validate(newProps, perBrokerConfig = true)
+    newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "4294967294")
+    config.dynamicConfig.updateBrokerConfig(0, newProps)
+    assertEquals(4294967294L, config.logLocalRetentionBytes)
+  }
+
+  @Test
+  def testDynamicLogLocalRetentionSkipsOnInvalidConfig(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "1000")
+    props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "1024")
+    val config = KafkaConfig(props)
+    config.dynamicConfig.initialize(None)
+
+    // Check for invalid localRetentionMs < -2
+    verifyConfigUpdateWithInvalidConfig(config, props, Map.empty, Map(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP -> "-3"))
+    // Check for invalid localRetentionBytes < -2
+    verifyConfigUpdateWithInvalidConfig(config, props, Map.empty, Map(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP -> "-3"))
+  }
+
+  @Test
+  def testDynamicLogLocalRetentionThrowsOnIncorrectConfig(): Unit = {
+    // Check for incorrect case of logLocalRetentionMs > retentionMs
+    verifyIncorrectLogLocalRetentionProps(2000L, 2, 100, 1000L)
+    // Check for incorrect case of logLocalRetentionBytes > retentionBytes
+    verifyIncorrectLogLocalRetentionProps(500L, 200, 100, 1000L)
+    // Check for incorrect case of logLocalRetentionMs (-1 viz unlimited) > retentionMs
+    verifyIncorrectLogLocalRetentionProps(-1, 200, 100, 1000L)
+    // Check for incorrect case of logLocalRetentionBytes (-1 viz unlimited) > retentionBytes
+    verifyIncorrectLogLocalRetentionProps(2000L, -1, 100, 1000L)
+  }
+
+  def verifyIncorrectLogLocalRetentionProps(logLocalRetentionMs: Long,
+                                            logLocalRetentionBytes: Long,
+                                            retentionBytes: Long,
+                                            retentionMs: Long): Unit = {

Review Comment:
   nit: It's hard to review this validation method with the current parameter 
order. Could you put the same type of config in pairs? e.g.:
   ```
   def verifyIncorrectLogLocalRetentionProps(logLocalRetentionMs: Long, retentionMs: Long, logLocalRetentionBytes: Long, retentionBytes: Long)
   ```



##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -1784,4 +1788,22 @@ class KafkaConfigTest {
 props.put(KafkaConfig.ConsumerGroupHeartbeatIntervalMsProp, "25")
 assertThrows(classOf[IllegalArgumentException], ()

[GitHub] [kafka] divijvaidya commented on pull request #14078: KAFKA-14780: Fix flaky test 'testSecondaryRefreshAfterElapsedDelay'

2023-07-31 Thread via GitHub


divijvaidya commented on PR #14078:
URL: https://github.com/apache/kafka/pull/14078#issuecomment-1657889077

   Hi Fei
   
   CI tests are re-triggered on new commits or when a committer restarts them. 
Those are the only two options right now.
   Regarding other flaky tests: if you run `./gradlew unitTest` and `./gradlew 
integrationTest` locally, they should pass. You can mention in the description 
that these tests pass locally for you, and the person who merges can then 
determine whether the flaky tests in CI are related to your change prior 
to merging. You can also look at JIRA tickets; existing flaky 
tests usually have an open ticket.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #14078: KAFKA-14780: Fix flaky test 'testSecondaryRefreshAfterElapsedDelay'

2023-07-31 Thread via GitHub


divijvaidya commented on PR #14078:
URL: https://github.com/apache/kafka/pull/14078#issuecomment-1657889903

   I have just re-triggered CI tests for this PR at 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14078/2/pipeline


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-07-31 Thread via GitHub


AndrewJSchofield commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1278963024


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -518,7 +529,7 @@ else if (!future.isRetriable())
 return false;
 }
 
-timer.sleep(rebalanceConfig.retryBackoffMs);
+timer.sleep(retryBackoff.backoff(attempts++));

Review Comment:
   Yes, true. Good point. I'll revert the code for this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #14128: KAFKA-15272: Fix the logic which finds candidate log segments to upload it to tiered storage

2023-07-31 Thread via GitHub


kamalcph commented on code in PR #14128:
URL: https://github.com/apache/kafka/pull/14128#discussion_r1278963394


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -525,6 +524,26 @@ private void maybeUpdateReadOffset(UnifiedLog log) throws RemoteStorageException
         }
     }
 
+    List<EnrichedLogSegment> enrichedLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) {
+        List<EnrichedLogSegment> enrichedLogSegments = new ArrayList<>();
+        List<LogSegment> segments = JavaConverters.seqAsJavaList(log.nonActiveLogSegmentsFrom(fromOffset).toSeq());
+        if (!segments.isEmpty()) {
+            int idx = 1;
+            for (; idx < segments.size(); idx++) {
+                LogSegment previous = segments.get(idx - 1);
+                LogSegment current = segments.get(idx);
+                enrichedLogSegments.add(new EnrichedLogSegment(previous, current.baseOffset()));
+            }
+            // LogSegment#readNextOffset() is an expensive call, so we only call it when necessary.
+            int lastIdx = idx - 1;
+            if (segments.get(lastIdx).baseOffset() < lastStableOffset) {
+                LogSegment last = segments.get(lastIdx);
+                enrichedLogSegments.add(new EnrichedLogSegment(last, last.readNextOffset()));
+            }

Review Comment:
   For a given 
[LogSegment](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala),
 we know the start offset (base offset) but not the end offset. 
`readNextOffset` denotes `end-offset-of-that-segment` + 1.
   
   To exclude the active segment, we are using `log.nonActiveLogSegmentsFrom`. 
With this, we cannot use the active segment's base offset, as the operations are 
not done atomically (the active segment might get rotated in the meantime).
   
   If we want to avoid the `LogSegment#readNextOffset` operation altogether, then 
we can list all the segments, including the active one, and discard/filter the final 
entry.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #14128: KAFKA-15272: Fix the logic which finds candidate log segments to upload it to tiered storage

2023-07-31 Thread via GitHub


kamalcph commented on code in PR #14128:
URL: https://github.com/apache/kafka/pull/14128#discussion_r1278963394


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -525,6 +524,26 @@ private void maybeUpdateReadOffset(UnifiedLog log) throws RemoteStorageException
         }
     }
 
+    List<EnrichedLogSegment> enrichedLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) {
+        List<EnrichedLogSegment> enrichedLogSegments = new ArrayList<>();
+        List<LogSegment> segments = JavaConverters.seqAsJavaList(log.nonActiveLogSegmentsFrom(fromOffset).toSeq());
+        if (!segments.isEmpty()) {
+            int idx = 1;
+            for (; idx < segments.size(); idx++) {
+                LogSegment previous = segments.get(idx - 1);
+                LogSegment current = segments.get(idx);
+                enrichedLogSegments.add(new EnrichedLogSegment(previous, current.baseOffset()));
+            }
+            // LogSegment#readNextOffset() is an expensive call, so we only call it when necessary.
+            int lastIdx = idx - 1;
+            if (segments.get(lastIdx).baseOffset() < lastStableOffset) {
+                LogSegment last = segments.get(lastIdx);
+                enrichedLogSegments.add(new EnrichedLogSegment(last, last.readNextOffset()));
+            }

Review Comment:
   For a given 
[LogSegment](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala),
 we know the start offset (base offset) but not the end offset. 
`readNextOffset` denotes `end-offset-of-that-segment` + 1.
   
   To exclude the active segment, we are using `log.nonActiveLogSegmentsFrom`. 
With this, we cannot use the active segment's base offset, as the operations are 
not done atomically (the active segment might get rotated in the meantime).
   
   If we want to avoid the `LogSegment#readNextOffset` operation altogether, then 
we can list all the segments, including the active one, and discard/filter the final 
entry.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #14110: MINOR: Add test for describe topic with ID

2023-07-31 Thread via GitHub


divijvaidya commented on code in PR #14110:
URL: https://github.com/apache/kafka/pull/14110#discussion_r1278956423


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -2354,6 +2354,55 @@ public void testDeleteRecords() throws Exception {
 }
 }
 
+    @Test
+    public void testDescribeTopicsByIds() {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
+                AdminClientConfig.RETRIES_CONFIG, "2")) {

Review Comment:
   Just use `try (AdminClientUnitTestEnv env = mockClientEnv()) {` maybe? 
Setting the cluster to 4 nodes and 2 retries doesn't sound relevant to this test.



##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -2354,6 +2354,55 @@ public void testDeleteRecords() throws Exception {
 }
 }
 
+    @Test
+    public void testDescribeTopicsByIds() {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
+                AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Valid ID

Review Comment:
   We have added a case for a valid id and another for an unrepresentable id; 
perhaps we could have one more for an id which does not exist in the broker? Can 
you please add that third case?



##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -2354,6 +2354,55 @@ public void testDeleteRecords() throws Exception {
 }
 }
 
+    @Test
+    public void testDescribeTopicsByIds() {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
+                AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Valid ID
+            Uuid topicId = Uuid.randomUuid();
+            String topicName = "test-topic";
+            Node leader = env.cluster().nodes().get(0);
+            MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
+                    Errors.NONE,
+                    new TopicPartition(topicName, 0),
+                    Optional.of(leader.id()),
+                    Optional.of(10),
+                    singletonList(leader.id()),
+                    singletonList(leader.id()),
+                    singletonList(leader.id()));
+            env.kafkaClient().prepareResponse(RequestTestUtils
+                    .metadataResponse(
+                            env.cluster().nodes(),
+                            env.cluster().clusterResource().clusterId(),
+                            env.cluster().controller().id(),
+                            singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topicName, topicId, false,
+                                    singletonList(partitionMetadata), MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED))));
+            TopicCollection.TopicIdCollection topicIds = TopicCollection.ofTopicIds(
+                    singletonList(topicId));
+            try {
+                DescribeTopicsResult result = env.adminClient().describeTopics(
+                        TopicCollection.ofTopicIds(singletonList(topicId)));
+                Map<Uuid, TopicDescription> allTopicIds = result.allTopicIds().get();
+                assertEquals(topicName, allTopicIds.get(topicId).name());
+            } catch (Exception e) {
+                fail("describe with valid topicId should not fail", e);
+            }
+
+            // Invalid ID
+            try {
+                DescribeTopicsResult result = env.adminClient().describeTopics(
+                        TopicCollection.ofTopicIds(singletonList(Uuid.ZERO_UUID)));
+                result.allTopicIds().get();

Review Comment:
   you can also choose to use `TestUtils.assertFutureError`



##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -2354,6 +2354,55 @@ public void testDeleteRecords() throws Exception {
 }
 }
 
+    @Test
+    public void testDescribeTopicsByIds() {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
+                AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Valid ID
+            Uuid topicId = Uuid.randomUuid();
+            String topicName = "test-topic";
+            Node leader = env.cluster().nodes().get(0);
+            MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
+                    Errors.NONE,
+                    new TopicPartition(topicName, 0),
+                    Optional.of(leader.id()),
+                    Optional.of(10),
+                    singletonList(leader.id()),
+                    singletonList(leader.id()),
+                    singlet

[GitHub] [kafka] showuon commented on a diff in pull request #14128: KAFKA-15272: Fix the logic which finds candidate log segments to upload it to tiered storage

2023-07-31 Thread via GitHub


showuon commented on code in PR #14128:
URL: https://github.com/apache/kafka/pull/14128#discussion_r1278970321


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -525,6 +524,26 @@ private void maybeUpdateReadOffset(UnifiedLog log) throws RemoteStorageException
         }
     }
 
+    List<EnrichedLogSegment> enrichedLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) {
+        List<EnrichedLogSegment> enrichedLogSegments = new ArrayList<>();
+        List<LogSegment> segments = JavaConverters.seqAsJavaList(log.nonActiveLogSegmentsFrom(fromOffset).toSeq());
+        if (!segments.isEmpty()) {
+            int idx = 1;
+            for (; idx < segments.size(); idx++) {
+                LogSegment previous = segments.get(idx - 1);
+                LogSegment current = segments.get(idx);
+                enrichedLogSegments.add(new EnrichedLogSegment(previous, current.baseOffset()));
+            }
+            // LogSegment#readNextOffset() is an expensive call, so we only call it when necessary.
+            int lastIdx = idx - 1;
+            if (segments.get(lastIdx).baseOffset() < lastStableOffset) {
+                LogSegment last = segments.get(lastIdx);
+                enrichedLogSegments.add(new EnrichedLogSegment(last, last.readNextOffset()));
+            }

Review Comment:
   > If we want to avoid the LogSegment#readNextOffset operation altogether, then 
we can list all the segments, including the active one, and discard/filter the final 
entry.
   
   Yes, that's my thought, given that `LogSegment#readNextOffset` is expensive. 
Are there any drawbacks to this solution?
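
   A minimal sketch of that alternative, under the assumption that the segment list includes the active segment as its last element (the `Segment`/`UploadRange` types below are illustrative stand-ins, not the actual RemoteLogManager classes): pairing each segment with its successor's base offset means the last (active) segment never produces an entry, so `readNextOffset` is never needed.
   ```java
import java.util.ArrayList;
import java.util.List;

final class CandidateSegmentsSketch {
    // Stand-in for kafka.log.LogSegment: only the base offset is cheap to read.
    record Segment(long baseOffset) { }
    // Stand-in for EnrichedLogSegment: a segment plus its exclusive end offset.
    record UploadRange(Segment segment, long endOffsetExclusive) { }

    // 'segments' is expected to include the active segment as the last element.
    static List<UploadRange> candidateUploadRanges(List<Segment> segments) {
        List<UploadRange> ranges = new ArrayList<>();
        for (int i = 0; i + 1 < segments.size(); i++) {
            // The next segment's base offset bounds the current segment, so no
            // readNextOffset call is required; the active segment is dropped.
            ranges.add(new UploadRange(segments.get(i), segments.get(i + 1).baseOffset()));
        }
        return ranges;
    }
}
   ```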



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on pull request #14102: KAFKA-13187: Replace EasyMock / PowerMock with Mockito in DistributedHerderTest

2023-07-31 Thread via GitHub


yashmayya commented on PR #14102:
URL: https://github.com/apache/kafka/pull/14102#issuecomment-1657931548

   Hm I just noticed that the `DistributedHerderTest` is failing on the CI run 
(only for JDK 11) with:
   
   ```
   org.mockito.exceptions.base.MockitoException: Unable to create mock instance 
of type 'DistributedHerder'
at 
app//org.apache.kafka.connect.runtime.distributed.DistributedHerderTest.setUp(DistributedHerderTest.java:300)
at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
 Method)
at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
at 
app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
app//org.junit.internal.runners.statements.RunBefores.invokeMethod(RunBefores.java:33)
at 
app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at 
app//org.mockito.internal.runners.DefaultInternalRunner$1$1.evaluate(DefaultInternalRunner.java:55)
at 
app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at 
app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at 
app//org.mockito.internal.runners.DefaultInternalRunner$1.run(DefaultInternalRunner.java:100)
at 
app//org.mockito.internal.runners.DefaultInternalRunner.run(DefaultInternalRunner.java:107)
at 
app//org.mockito.internal.runners.StrictRunner.run(StrictRunner.java:41)
at 
app//org.mockito.junit.MockitoJUnitRunner.run(MockitoJUnitRunner.java:163)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
at jdk.internal.reflect.GeneratedMethodAccessor35.invoke(Unknown Source)
at 
java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
at 
org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
at 
org.gradle.process.internal.worker.child.SystemApplicationClass

[GitHub] [kafka] kamalcph commented on a diff in pull request #14114: KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs

2023-07-31 Thread via GitHub


kamalcph commented on code in PR #14114:
URL: https://github.com/apache/kafka/pull/14114#discussion_r1278982175


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -424,6 +387,18 @@ public int initFileSize() {
         return 0;
     }
 
+    public boolean remoteStorageEnable() {
+        return remoteLogConfig.remoteStorageEnable;
+    }
+
+    public long localRetentionMs() {
+        return remoteLogConfig.localRetentionMs == LogConfig.DEFAULT_LOCAL_RETENTION_MS ? retentionMs : remoteLogConfig.localRetentionMs;
+    }
+
+    public long localRetentionBytes() {
+        return remoteLogConfig.localRetentionBytes == LogConfig.DEFAULT_LOCAL_RETENTION_BYTES ? retentionSize : remoteLogConfig.localRetentionBytes;
+    }

Review Comment:
   yes, correct. The caller should get the real local log retention values when 
set to -2.
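
   For reference, a small self-contained illustration of that fallback (assuming -2 is the "not set" sentinel, as with `LogConfig.DEFAULT_LOCAL_RETENTION_MS` in this PR):
   ```java
final class LocalRetentionSketch {
    // Assumed sentinel meaning "local retention not set"; falls back to retention.ms.
    static final long DEFAULT_LOCAL_RETENTION_MS = -2L;

    static long effectiveLocalRetentionMs(long localRetentionMs, long retentionMs) {
        return localRetentionMs == DEFAULT_LOCAL_RETENTION_MS ? retentionMs : localRetentionMs;
    }

    public static void main(String[] args) {
        System.out.println(effectiveLocalRetentionMs(-2L, 604800000L));       // 604800000 (falls back)
        System.out.println(effectiveLocalRetentionMs(86400000L, 604800000L)); // 86400000
    }
}
   ```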



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-8128) Dynamic delegation token change possibility for consumer/producer

2023-07-31 Thread Gabor Somogyi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749087#comment-17749087
 ] 

Gabor Somogyi commented on KAFKA-8128:
--

It took some time to remember what the original issue was, since the JIRA 
reporter did not write it down clearly 😅
Now I remember that the JAAS context is definitely not re-loaded, which is the root 
cause. As I recall, there is a way to manually reload the JAAS context in the 
JVM, but it didn't have any effect on the producer/consumer. That last statement may 
or may not still be valid since it was 4 years ago...

All in all, it would be good to have an explicit API to change the token, which 
would solve many issues for sure.

> Dynamic delegation token change possibility for consumer/producer
> -
>
> Key: KAFKA-8128
> URL: https://issues.apache.org/jira/browse/KAFKA-8128
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.2.0
>Reporter: Gabor Somogyi
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> Re-authentication feature on broker side is under implementation which will 
> enforce consumer/producer instances to re-authenticate time to time. It would 
> be good to set the latest delegation token dynamically and not re-creating 
> consumer/producer instances.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15195) Regenerate segment-aligned producer snapshots when upgrading to a Kafka version supporting Tiered Storage

2023-07-31 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15195:
---
Priority: Major  (was: Blocker)

> Regenerate segment-aligned producer snapshots when upgrading to a Kafka 
> version supporting Tiered Storage
> -
>
> Key: KAFKA-15195
> URL: https://issues.apache.org/jira/browse/KAFKA-15195
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 3.6.0
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
> Fix For: 3.6.0
>
>
> As mentioned in KIP-405: Kafka Tiered Storage#Upgrade a customer wishing to 
> upgrade from a Kafka version < 2.8.0 to 3.6 and turn Tiered Storage on will 
> have to wait for retention to clean up segments without an associated 
> producer snapshot.
> However, in our experience, customers of Kafka expect to be able to 
> immediately enable tiering on a topic once their cluster upgrade is complete. 
> Once they do this, however, they start seeing NPEs and no data is uploaded to 
> Tiered Storage 
> (https://github.com/apache/kafka/blob/9e50f7cdd37f923cfef4711cf11c1c5271a0a6c7/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/LogSegmentData.java#L61).
> To achieve this, we propose changing Kafka to retroactively create producer 
> snapshot files on upload whenever a segment is due to be archived and lacks 
> one.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15195) Regenerate segment-aligned producer snapshots when upgrading to a Kafka version supporting Tiered Storage

2023-07-31 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749088#comment-17749088
 ] 

Satish Duggana commented on KAFKA-15195:


It is not a blocker for 3.6.0 as KIP-405 talks about supporting it from 2.8.0 
or later releases.

> Regenerate segment-aligned producer snapshots when upgrading to a Kafka 
> version supporting Tiered Storage
> -
>
> Key: KAFKA-15195
> URL: https://issues.apache.org/jira/browse/KAFKA-15195
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 3.6.0
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
> Fix For: 3.6.0
>
>
> As mentioned in KIP-405: Kafka Tiered Storage#Upgrade a customer wishing to 
> upgrade from a Kafka version < 2.8.0 to 3.6 and turn Tiered Storage on will 
> have to wait for retention to clean up segments without an associated 
> producer snapshot.
> However, in our experience, customers of Kafka expect to be able to 
> immediately enable tiering on a topic once their cluster upgrade is complete. 
> Once they do this, however, they start seeing NPEs and no data is uploaded to 
> Tiered Storage 
> (https://github.com/apache/kafka/blob/9e50f7cdd37f923cfef4711cf11c1c5271a0a6c7/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/LogSegmentData.java#L61).
> To achieve this, we propose changing Kafka to retroactively create producer 
> snapshot files on upload whenever a segment is due to be archived and lacks 
> one.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14972) Make KafkaConsumer usable in async runtimes

2023-07-31 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749090#comment-17749090
 ] 

Satish Duggana commented on KAFKA-14972:


[~erikvanoosten] I am removing this KIP from the 3.6.0 release plan for now as it 
has not been accepted. Please correct me if I am missing something here.

> Make KafkaConsumer usable in async runtimes
> ---
>
> Key: KAFKA-14972
> URL: https://issues.apache.org/jira/browse/KAFKA-14972
> Project: Kafka
>  Issue Type: Wish
>  Components: consumer
>Reporter: Erik van Oosten
>Assignee: Erik van Oosten
>Priority: Major
>  Labels: needs-kip
>
> KafkaConsumer contains a check that rejects nested invocations from different 
> threads (method {{{}acquire{}}}). For users that use an async runtime, this 
> is an almost impossible requirement. Examples of async runtimes that are 
> affected are Kotlin co-routines (see KAFKA-7143) and Zio.
> It should be possible for a thread to pass on its capability to access the 
> consumer to another thread. See 
> [KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and 
> [https://github.com/apache/kafka/pull/13914] for an implementation.
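
A simplified illustration of the kind of check described above (a sketch, not the actual KafkaConsumer acquire/release code):

{code:java}
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of a thread-confinement guard: the first thread to acquire becomes the
// owner; any other thread calling acquire() before release() is rejected.
final class ThreadConfinedGuard {
    private static final long NO_OWNER = -1L;
    private final AtomicLong ownerThreadId = new AtomicLong(NO_OWNER);
    private int refCount = 0;

    void acquire() {
        long me = Thread.currentThread().getId();
        if (ownerThreadId.get() != me && !ownerThreadId.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        }
        refCount++;
    }

    void release() {
        if (--refCount == 0) {
            ownerThreadId.set(NO_OWNER);
        }
    }
}
{code}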



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14972) Make KafkaConsumer usable in async runtimes

2023-07-31 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749092#comment-17749092
 ] 

Satish Duggana commented on KAFKA-14972:


[~erikvanoosten] I am removing this from the 3.6.0 release plan as the respective 
KIP is not yet accepted. 

> Make KafkaConsumer usable in async runtimes
> ---
>
> Key: KAFKA-14972
> URL: https://issues.apache.org/jira/browse/KAFKA-14972
> Project: Kafka
>  Issue Type: Wish
>  Components: consumer
>Reporter: Erik van Oosten
>Assignee: Erik van Oosten
>Priority: Major
>  Labels: needs-kip
>
> KafkaConsumer contains a check that rejects nested invocations from different 
> threads (method {{{}acquire{}}}). For users that use an async runtime, this 
> is an almost impossible requirement. Examples of async runtimes that are 
> affected are Kotlin co-routines (see KAFKA-7143) and Zio.
> It should be possible for a thread to pass on its capability to access the 
> consumer to another thread. See 
> [KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and 
> [https://github.com/apache/kafka/pull/13914] for an implementation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (KAFKA-14972) Make KafkaConsumer usable in async runtimes

2023-07-31 Thread Satish Duggana (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-14972 ]


Satish Duggana deleted comment on KAFKA-14972:


was (Author: satish.duggana):
[~erikvanoosten] I am removing this from the 3.6.0 release plan as the respective 
KIP is not yet accepted. 

> Make KafkaConsumer usable in async runtimes
> ---
>
> Key: KAFKA-14972
> URL: https://issues.apache.org/jira/browse/KAFKA-14972
> Project: Kafka
>  Issue Type: Wish
>  Components: consumer
>Reporter: Erik van Oosten
>Assignee: Erik van Oosten
>Priority: Major
>  Labels: needs-kip
>
> KafkaConsumer contains a check that rejects nested invocations from different 
> threads (method {{{}acquire{}}}). For users that use an async runtime, this 
> is an almost impossible requirement. Examples of async runtimes that are 
> affected are Kotlin co-routines (see KAFKA-7143) and Zio.
> It should be possible for a thread to pass on its capability to access the 
> consumer to another thread. See 
> [KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and 
> [https://github.com/apache/kafka/pull/13914] for an implementation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15248) Add BooleanConverter to Kafka Connect

2023-07-31 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749094#comment-17749094
 ] 

Satish Duggana commented on KAFKA-15248:


[~hgeraldino] Removing from the 3.6.0 release plan as the KIP is not yet approved. 
Please let me know if I am missing something here.

> Add BooleanConverter to Kafka Connect
> -
>
> Key: KAFKA-15248
> URL: https://issues.apache.org/jira/browse/KAFKA-15248
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
>
> KIP-959: Add BooleanConverter to Kafka Connect -> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-959%3A+Add+BooleanConverter+to+Kafka+Connect



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon opened a new pull request, #14133: KAFKA-15189: only init remote topic metrics when enabled

2023-07-31 Thread via GitHub


showuon opened a new pull request, #14133:
URL: https://github.com/apache/kafka/pull/14133

   Only initialize remote topic metrics when remote storage is enabled system-wide, 
to avoid impacting performance for existing brokers. Also add tests.
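
   As a rough sketch of the gating (metric names and the `remoteStorageEnabled` flag below are illustrative assumptions, not the actual `BrokerTopicStats` code):
   ```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: remote-storage metrics are only registered when remote storage is
// enabled system-wide, so existing brokers pay no extra metrics cost.
final class BrokerTopicMetricsSketch {
    private final Map<String, Long> meters = new LinkedHashMap<>();

    BrokerTopicMetricsSketch(boolean remoteStorageEnabled) {
        meters.put("MessagesInPerSec", 0L);
        meters.put("BytesInPerSec", 0L);
        if (remoteStorageEnabled) {
            // Hypothetical remote metrics; only created when the feature is on.
            meters.put("RemoteCopyBytesPerSec", 0L);
            meters.put("RemoteFetchBytesPerSec", 0L);
        }
    }

    Map<String, Long> registeredMeters() {
        return meters;
    }
}
   ```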
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #14133: KAFKA-15189: only init remote topic metrics when enabled

2023-07-31 Thread via GitHub


showuon commented on code in PR #14133:
URL: https://github.com/apache/kafka/pull/14133#discussion_r1279033437


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -127,7 +127,7 @@ public class RemoteLogManagerTest {
     RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
     RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
     RemoteLogManagerConfig remoteLogManagerConfig = null;
-    BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+    BrokerTopicStats brokerTopicStats = new BrokerTopicStats(true);

Review Comment:
   force to true since we're testing `RemoteLogManager`



##
core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java:
##
@@ -44,7 +44,7 @@
 public class RemoteLogReaderTest {
 public static final String TOPIC = "test";
 RemoteLogManager mockRLM = mock(RemoteLogManager.class);
-BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+BrokerTopicStats brokerTopicStats = new BrokerTopicStats(true);

Review Comment:
   force to true since we're testing `RemoteLogReader`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #14133: KAFKA-15189: only init remote topic metrics when enabled

2023-07-31 Thread via GitHub


showuon commented on code in PR #14133:
URL: https://github.com/apache/kafka/pull/14133#discussion_r1279034999


##
core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java:
##
@@ -179,6 +179,7 @@ public ReplicaManager build() {
         if (metadataCache == null) throw new RuntimeException("You must set metadataCache");
         if (logDirFailureChannel == null) throw new RuntimeException("You must set logDirFailureChannel");
         if (alterPartitionManager == null) throw new RuntimeException("You must set alterIsrManager");
+        if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().enableRemoteStorageSystem());

Review Comment:
   Initializing brokerTopicStats if null, with the initialized config in L176 
above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #14133: KAFKA-15189: only init remote topic metrics when enabled

2023-07-31 Thread via GitHub


showuon commented on PR #14133:
URL: https://github.com/apache/kafka/pull/14133#issuecomment-1658003702

   @kamalcph  @divijvaidya , call for review. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-07-31 Thread via GitHub


AndrewJSchofield commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1279051318


##
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java:
##
@@ -203,18 +224,43 @@ public class CommonClientConfigs {
  * @return  The new values which have been set as 
described in postProcessParsedConfig.
  */
 public static Map 
postProcessReconnectBackoffConfigs(AbstractConfig config,
-Map 
parsedValues) {
+ 
Map parsedValues) {
 HashMap rval = new HashMap<>();
 Map originalConfig = config.originals();
 if ((!originalConfig.containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
 originalConfig.containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
-log.debug("Disabling exponential reconnect backoff because {} is 
set, but {} is not.",
+log.warn("Disabling exponential reconnect backoff because {} is 
set, but {} is not.",
 RECONNECT_BACKOFF_MS_CONFIG, 
RECONNECT_BACKOFF_MAX_MS_CONFIG);
 rval.put(RECONNECT_BACKOFF_MAX_MS_CONFIG, 
parsedValues.get(RECONNECT_BACKOFF_MS_CONFIG));
 }
 return rval;
 }
 
+/**
+ * Log warning if the exponential backoff is disabled due to initial 
backoff value is greater than max backoff value.
+ *
+ * @param configThe config object.
+ */
+public static void warnDisablingExponentialBackoff(AbstractConfig config) {
+long retryBackoffMs = config.getLong(RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = config.getLong(RETRY_BACKOFF_MAX_MS_CONFIG);
+if (retryBackoffMs > retryBackoffMaxMs) {
+log.warn("Configuration '{}' with value '{}' is greater than 
configuration '{}' with value '{}'. " +
+"A static backoff with value '{}' will be applied.",
+RETRY_BACKOFF_MS_CONFIG, retryBackoffMs,
+RETRY_BACKOFF_MAX_MS_CONFIG, retryBackoffMaxMs, 
retryBackoffMs);

Review Comment:
   Digging into this, I discovered that KIP-601 was not quite implemented 
correctly and the backoff value could exceed the maximum either because of 
jitter, or because of a lower value for the max config. I'd just followed the 
same path for KIP-580, which is not correct.
   
   I have fixed the exponential backoff code for both of these cases so that 
the maximum is strictly enforced.
   
   Personally, I think that it's preferable if it is possible for the maximum 
to be exceeded because of jitter, but that is not what the KIP says.
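
   For illustration, a sketch of exponential backoff where the configured maximum is strictly enforced even after jitter (the constants and structure below are assumptions, not the actual `ExponentialBackoff` class):
   ```java
import java.util.concurrent.ThreadLocalRandom;

final class BackoffSketch {
    private final long initialMs;
    private final long maxMs;
    private final double multiplier;
    private final double jitter; // e.g. 0.2 => +/- 20%

    BackoffSketch(long initialMs, long maxMs, double multiplier, double jitter) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.multiplier = multiplier;
        this.jitter = jitter;
    }

    long backoff(int attempts) {
        // Degenerate case from the warning above: initial > max means a static backoff.
        if (initialMs > maxMs) {
            return initialMs;
        }
        double exp = initialMs * Math.pow(multiplier, attempts);
        double randomFactor = 1.0 + jitter * (2 * ThreadLocalRandom.current().nextDouble() - 1);
        // The maximum is enforced after applying jitter, so it can never be exceeded.
        return (long) Math.min(maxMs, exp * randomFactor);
    }
}
   ```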



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15264) Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter

2023-07-31 Thread jianbin.chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jianbin.chen updated KAFKA-15264:
-
Attachment: image-2023-07-31-18-04-54-112.png

> Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter
> ---
>
> Key: KAFKA-15264
> URL: https://issues.apache.org/jira/browse/KAFKA-15264
> Project: Kafka
>  Issue Type: Bug
>Reporter: jianbin.chen
>Priority: Major
> Attachments: image-2023-07-28-09-51-01-662.png, 
> image-2023-07-28-09-52-38-941.png, image-2023-07-31-18-04-54-112.png, 
> image-2023-07-31-18-05-21-772.png
>
>
> I was preparing to upgrade from 1.1.0 to 3.5.1 kraft mode (new cluster 
> deployment), and when I recently compared and tested, I found that when using 
> the following stress test command, the throughput gap is obvious
>  
> {code:java}
> ./kafka-producer-perf-test.sh --topic test321 --num-records 3000 
> --record-size 1024 --throughput -1 --producer-props 
> bootstrap.servers=xxx: acks=1
> 419813 records sent, 83962.6 records/sec (81.99 MB/sec), 241.1 ms avg 
> latency, 588.0 ms max latency.
> 555300 records sent, 111015.6 records/sec (108.41 MB/sec), 275.1 ms avg 
> latency, 460.0 ms max latency.
> 552795 records sent, 110536.9 records/sec (107.95 MB/sec), 265.9 ms avg 
> latency, 1120.0 ms max latency.
> 552600 records sent, 110520.0 records/sec (107.93 MB/sec), 284.5 ms avg 
> latency, 1097.0 ms max latency.
> 538500 records sent, 107656.9 records/sec (105.13 MB/sec), 277.5 ms avg 
> latency, 610.0 ms max latency.
> 511545 records sent, 102309.0 records/sec (99.91 MB/sec), 304.1 ms avg 
> latency, 1892.0 ms max latency.
> 511890 records sent, 102337.1 records/sec (99.94 MB/sec), 288.4 ms avg 
> latency, 3000.0 ms max latency.
> 519165 records sent, 103812.2 records/sec (101.38 MB/sec), 262.1 ms avg 
> latency, 1781.0 ms max latency.
> 513555 records sent, 102669.9 records/sec (100.26 MB/sec), 338.2 ms avg 
> latency, 2590.0 ms max latency.
> 463329 records sent, 92665.8 records/sec (90.49 MB/sec), 276.8 ms avg 
> latency, 1463.0 ms max latency.
> 494248 records sent, 98849.6 records/sec (96.53 MB/sec), 327.2 ms avg 
> latency, 2362.0 ms max latency.
> 506272 records sent, 101254.4 records/sec (98.88 MB/sec), 322.1 ms avg 
> latency, 2986.0 ms max latency.
> 393758 records sent, 78735.9 records/sec (76.89 MB/sec), 387.0 ms avg 
> latency, 2958.0 ms max latency.
> 426435 records sent, 85252.9 records/sec (83.25 MB/sec), 363.3 ms avg 
> latency, 1959.0 ms max latency.
> 412560 records sent, 82298.0 records/sec (80.37 MB/sec), 374.1 ms avg 
> latency, 1995.0 ms max latency.
> 370137 records sent, 73997.8 records/sec (72.26 MB/sec), 396.8 ms avg 
> latency, 1496.0 ms max latency.
> 391781 records sent, 78340.5 records/sec (76.50 MB/sec), 410.7 ms avg 
> latency, 2446.0 ms max latency.
> 355901 records sent, 71166.0 records/sec (69.50 MB/sec), 397.5 ms avg 
> latency, 2715.0 ms max latency.
> 385410 records sent, 77082.0 records/sec (75.28 MB/sec), 417.5 ms avg 
> latency, 2702.0 ms max latency.
> 381160 records sent, 76232.0 records/sec (74.45 MB/sec), 407.7 ms avg 
> latency, 1846.0 ms max latency.
> 67 records sent, 0.1 records/sec (65.10 MB/sec), 456.2 ms avg 
> latency, 1414.0 ms max latency.
> 376251 records sent, 75175.0 records/sec (73.41 MB/sec), 401.9 ms avg 
> latency, 1897.0 ms max latency.
> 354434 records sent, 70886.8 records/sec (69.23 MB/sec), 425.8 ms avg 
> latency, 1601.0 ms max latency.
> 353795 records sent, 70744.9 records/sec (69.09 MB/sec), 411.7 ms avg 
> latency, 1563.0 ms max latency.
> 321993 records sent, 64360.0 records/sec (62.85 MB/sec), 447.3 ms avg 
> latency, 1975.0 ms max latency.
> 404075 records sent, 80750.4 records/sec (78.86 MB/sec), 408.4 ms avg 
> latency, 1753.0 ms max latency.
> 384526 records sent, 76905.2 records/sec (75.10 MB/sec), 406.0 ms avg 
> latency, 1833.0 ms max latency.
> 387652 records sent, 77483.9 records/sec (75.67 MB/sec), 397.3 ms avg 
> latency, 1927.0 ms max latency.
> 343286 records sent, 68629.7 records/sec (67.02 MB/sec), 455.6 ms avg 
> latency, 1685.0 ms max latency.
> 00 records sent, 66646.7 records/sec (65.08 MB/sec), 456.6 ms avg 
> latency, 2146.0 ms max latency.
> 361191 records sent, 72238.2 records/sec (70.55 MB/sec), 409.4 ms avg 
> latency, 2125.0 ms max latency.
> 357525 records sent, 71490.7 records/sec (69.82 MB/sec), 436.0 ms avg 
> latency, 1502.0 ms max latency.
> 340238 records sent, 68047.6 records/sec (66.45 MB/sec), 427.9 ms avg 
> latency, 1932.0 ms max latency.
> 390016 records sent, 77956.4 records/sec (76.13 MB/sec), 418.5 ms avg 
> latency, 1807.0 ms max latency.
> 352830 records sent, 70523.7 records/sec (68.87 MB/sec), 439.4 ms avg 
> latency, 1892.0 ms max latency.
> 354526 records sent, 70905.2 records/

[jira] [Updated] (KAFKA-15264) Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter

2023-07-31 Thread jianbin.chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jianbin.chen updated KAFKA-15264:
-
Attachment: image-2023-07-31-18-05-21-772.png

> Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter
> ---
>
> Key: KAFKA-15264
> URL: https://issues.apache.org/jira/browse/KAFKA-15264
> Project: Kafka
>  Issue Type: Bug
>Reporter: jianbin.chen
>Priority: Major
> Attachments: image-2023-07-28-09-51-01-662.png, 
> image-2023-07-28-09-52-38-941.png, image-2023-07-31-18-04-54-112.png, 
> image-2023-07-31-18-05-21-772.png
>
>
> I was preparing to upgrade from 1.1.0 to 3.5.1 in KRaft mode (new cluster 
> deployment), and when I recently compared and tested them, I found that the 
> throughput gap is obvious when using the following stress-test command:
>  
> {code:java}
> ./kafka-producer-perf-test.sh --topic test321 --num-records 3000 
> --record-size 1024 --throughput -1 --producer-props 
> bootstrap.servers=xxx: acks=1
> 419813 records sent, 83962.6 records/sec (81.99 MB/sec), 241.1 ms avg 
> latency, 588.0 ms max latency.
> 555300 records sent, 111015.6 records/sec (108.41 MB/sec), 275.1 ms avg 
> latency, 460.0 ms max latency.
> 552795 records sent, 110536.9 records/sec (107.95 MB/sec), 265.9 ms avg 
> latency, 1120.0 ms max latency.
> 552600 records sent, 110520.0 records/sec (107.93 MB/sec), 284.5 ms avg 
> latency, 1097.0 ms max latency.
> 538500 records sent, 107656.9 records/sec (105.13 MB/sec), 277.5 ms avg 
> latency, 610.0 ms max latency.
> 511545 records sent, 102309.0 records/sec (99.91 MB/sec), 304.1 ms avg 
> latency, 1892.0 ms max latency.
> 511890 records sent, 102337.1 records/sec (99.94 MB/sec), 288.4 ms avg 
> latency, 3000.0 ms max latency.
> 519165 records sent, 103812.2 records/sec (101.38 MB/sec), 262.1 ms avg 
> latency, 1781.0 ms max latency.
> 513555 records sent, 102669.9 records/sec (100.26 MB/sec), 338.2 ms avg 
> latency, 2590.0 ms max latency.
> 463329 records sent, 92665.8 records/sec (90.49 MB/sec), 276.8 ms avg 
> latency, 1463.0 ms max latency.
> 494248 records sent, 98849.6 records/sec (96.53 MB/sec), 327.2 ms avg 
> latency, 2362.0 ms max latency.
> 506272 records sent, 101254.4 records/sec (98.88 MB/sec), 322.1 ms avg 
> latency, 2986.0 ms max latency.
> 393758 records sent, 78735.9 records/sec (76.89 MB/sec), 387.0 ms avg 
> latency, 2958.0 ms max latency.
> 426435 records sent, 85252.9 records/sec (83.25 MB/sec), 363.3 ms avg 
> latency, 1959.0 ms max latency.
> 412560 records sent, 82298.0 records/sec (80.37 MB/sec), 374.1 ms avg 
> latency, 1995.0 ms max latency.
> 370137 records sent, 73997.8 records/sec (72.26 MB/sec), 396.8 ms avg 
> latency, 1496.0 ms max latency.
> 391781 records sent, 78340.5 records/sec (76.50 MB/sec), 410.7 ms avg 
> latency, 2446.0 ms max latency.
> 355901 records sent, 71166.0 records/sec (69.50 MB/sec), 397.5 ms avg 
> latency, 2715.0 ms max latency.
> 385410 records sent, 77082.0 records/sec (75.28 MB/sec), 417.5 ms avg 
> latency, 2702.0 ms max latency.
> 381160 records sent, 76232.0 records/sec (74.45 MB/sec), 407.7 ms avg 
> latency, 1846.0 ms max latency.
> 67 records sent, 0.1 records/sec (65.10 MB/sec), 456.2 ms avg 
> latency, 1414.0 ms max latency.
> 376251 records sent, 75175.0 records/sec (73.41 MB/sec), 401.9 ms avg 
> latency, 1897.0 ms max latency.
> 354434 records sent, 70886.8 records/sec (69.23 MB/sec), 425.8 ms avg 
> latency, 1601.0 ms max latency.
> 353795 records sent, 70744.9 records/sec (69.09 MB/sec), 411.7 ms avg 
> latency, 1563.0 ms max latency.
> 321993 records sent, 64360.0 records/sec (62.85 MB/sec), 447.3 ms avg 
> latency, 1975.0 ms max latency.
> 404075 records sent, 80750.4 records/sec (78.86 MB/sec), 408.4 ms avg 
> latency, 1753.0 ms max latency.
> 384526 records sent, 76905.2 records/sec (75.10 MB/sec), 406.0 ms avg 
> latency, 1833.0 ms max latency.
> 387652 records sent, 77483.9 records/sec (75.67 MB/sec), 397.3 ms avg 
> latency, 1927.0 ms max latency.
> 343286 records sent, 68629.7 records/sec (67.02 MB/sec), 455.6 ms avg 
> latency, 1685.0 ms max latency.
> 00 records sent, 66646.7 records/sec (65.08 MB/sec), 456.6 ms avg 
> latency, 2146.0 ms max latency.
> 361191 records sent, 72238.2 records/sec (70.55 MB/sec), 409.4 ms avg 
> latency, 2125.0 ms max latency.
> 357525 records sent, 71490.7 records/sec (69.82 MB/sec), 436.0 ms avg 
> latency, 1502.0 ms max latency.
> 340238 records sent, 68047.6 records/sec (66.45 MB/sec), 427.9 ms avg 
> latency, 1932.0 ms max latency.
> 390016 records sent, 77956.4 records/sec (76.13 MB/sec), 418.5 ms avg 
> latency, 1807.0 ms max latency.
> 352830 records sent, 70523.7 records/sec (68.87 MB/sec), 439.4 ms avg 
> latency, 1892.0 ms max latency.
> 354526 records sent, 70905.2 records/

[jira] [Commented] (KAFKA-15264) Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter

2023-07-31 Thread jianbin.chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749124#comment-17749124
 ] 

jianbin.chen commented on KAFKA-15264:
--

Latest progress: I upgraded the 1.1.0+zk cluster to 3.5.1+zk.

!image-2023-07-31-18-05-21-772.png|width=745,height=288!
{code:java}
362387 records sent, 72477.4 records/sec (70.78 MB/sec), 292.9 ms avg latency, 
922.0 ms max latency.
545850 records sent, 109148.2 records/sec (106.59 MB/sec), 282.8 ms avg 
latency, 739.0 ms max latency.
533250 records sent, 106650.0 records/sec (104.15 MB/sec), 286.2 ms avg 
latency, 1478.0 ms max latency.
513000 records sent, 102600.0 records/sec (100.20 MB/sec), 267.9 ms avg 
latency, 2386.0 ms max latency.
538290 records sent, 107636.5 records/sec (105.11 MB/sec), 302.5 ms avg 
latency, 5450.0 ms max latency.
536850 records sent, 107370.0 records/sec (104.85 MB/sec), 268.7 ms avg 
latency, 1367.0 ms max latency.
552300 records sent, 110460.0 records/sec (107.87 MB/sec), 294.1 ms avg 
latency, 1474.0 ms max latency.
483464 records sent, 96673.5 records/sec (94.41 MB/sec), 254.0 ms avg latency, 
1207.0 ms max latency.
498666 records sent, 99713.3 records/sec (97.38 MB/sec), 368.1 ms avg latency, 
2662.0 ms max latency.
542400 records sent, 108436.6 records/sec (105.90 MB/sec), 280.9 ms avg 
latency, 1290.0 ms max latency.
462288 records sent, 92439.1 records/sec (90.27 MB/sec), 315.3 ms avg latency, 
2558.0 ms max latency.
541980 records sent, 108396.0 records/sec (105.86 MB/sec), 295.8 ms avg 
latency, 2641.0 ms max latency.
554700 records sent, 110895.6 records/sec (108.30 MB/sec), 268.8 ms avg 
latency, 530.0 ms max latency.
520350 records sent, 104070.0 records/sec (101.63 MB/sec), 293.7 ms avg 
latency, 2977.0 ms max latency.
483600 records sent, 96700.7 records/sec (94.43 MB/sec), 261.1 ms avg latency, 
2704.0 ms max latency.
559500 records sent, 111877.6 records/sec (109.26 MB/sec), 319.4 ms avg 
latency, 2713.0 ms max latency.
549300 records sent, 109838.0 records/sec (107.26 MB/sec), 278.3 ms avg 
latency, 1683.0 ms max latency.
551700 records sent, 110340.0 records/sec (107.75 MB/sec), 277.5 ms avg 
latency, 1731.0 ms max latency.
548550 records sent, 109710.0 records/sec (107.14 MB/sec), 248.7 ms avg 
latency, 1821.0 ms max latency.
552900 records sent, 110557.9 records/sec (107.97 MB/sec), 259.4 ms avg 
latency, 1821.0 ms max latency.
551400 records sent, 110280.0 records/sec (107.70 MB/sec), 318.7 ms avg 
latency, 3584.0 ms max latency.
487350 records sent, 97470.0 records/sec (95.19 MB/sec), 286.6 ms avg latency, 
2205.0 ms max latency.
542850 records sent, 108548.3 records/sec (106.00 MB/sec), 285.9 ms avg 
latency, 2273.0 ms max latency.
545850 records sent, 109148.2 records/sec (106.59 MB/sec), 278.7 ms avg 
latency, 1790.0 ms max latency.
557250 records sent, 111450.0 records/sec (108.84 MB/sec), 293.4 ms avg 
latency, 1572.0 ms max latency.
466231 records sent, 93246.2 records/sec (91.06 MB/sec), 244.0 ms avg latency, 
2566.0 ms max latency.
538157 records sent, 107631.4 records/sec (105.11 MB/sec), 351.2 ms avg 
latency, 2565.0 ms max latency.
556650 records sent, 111330.0 records/sec (108.72 MB/sec), 274.4 ms avg 
latency, 392.0 ms max latency.
542144 records sent, 108428.8 records/sec (105.89 MB/sec), 270.6 ms avg 
latency, 2148.0 ms max latency.
416323 records sent, 83264.6 records/sec (81.31 MB/sec), 270.0 ms avg latency, 
2681.0 ms max latency.
472528 records sent, 94505.6 records/sec (92.29 MB/sec), 389.7 ms avg latency, 
3709.0 ms max latency.
503602 records sent, 100720.4 records/sec (98.36 MB/sec), 320.8 ms avg latency, 
4058.0 ms max latency.
484719 records sent, 96905.0 records/sec (94.63 MB/sec), 293.5 ms avg latency, 
2579.0 ms max latency.
375346 records sent, 75069.2 records/sec (73.31 MB/sec), 449.0 ms avg latency, 
2982.0 ms max latency.
534300 records sent, 106838.6 records/sec (104.33 MB/sec), 282.5 ms avg 
latency, 1581.0 ms max latency.
533550 records sent, 106710.0 records/sec (104.21 MB/sec), 288.5 ms avg 
latency, 2381.0 ms max latency.
319650 records sent, 63917.2 records/sec (62.42 MB/sec), 487.8 ms avg latency, 
2517.0 ms max latency.
538950 records sent, 107790.0 records/sec (105.26 MB/sec), 280.8 ms avg 
latency, 1608.0 ms max latency.
547050 records sent, 109410.0 records/sec (106.85 MB/sec), 272.3 ms avg 
latency, 2777.0 ms max latency.
372958 records sent, 74591.6 records/sec (72.84 MB/sec), 381.7 ms avg latency, 
2447.0 ms max latency.
550438 records sent, 110087.6 records/sec (107.51 MB/sec), 297.7 ms avg 
latency, 2545.0 ms max latency.
529650 records sent, 105930.0 records/sec (103.45 MB/sec), 279.5 ms avg 
latency, 2695.0 ms max latency.
428030 records sent, 85520.5 records/sec (83.52 MB/sec), 289.7 ms avg latency, 
5425.0 ms max latency.
415080 records sent, 83016.0 records/sec (81.07 MB/sec), 329.4 ms avg latency, 
5011.0 ms max latency.
426597 records sent, 85

[GitHub] [kafka] kamalcph commented on a diff in pull request #14128: KAFKA-15272: Fix the logic which finds candidate log segments to upload it to tiered storage

2023-07-31 Thread via GitHub


kamalcph commented on code in PR #14128:
URL: https://github.com/apache/kafka/pull/14128#discussion_r1279093020


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -525,6 +524,26 @@ private void maybeUpdateReadOffset(UnifiedLog log) throws 
RemoteStorageException
 }
 }
 
+List<EnrichedLogSegment> enrichedLogSegments(UnifiedLog log, Long 
fromOffset, Long lastStableOffset) {
+List<EnrichedLogSegment> enrichedLogSegments = new ArrayList<>();
+List<LogSegment> segments = 
JavaConverters.seqAsJavaList(log.nonActiveLogSegmentsFrom(fromOffset).toSeq());
+if (!segments.isEmpty()) {
+int idx = 1;
+for (; idx < segments.size(); idx++) {
+LogSegment previous = segments.get(idx - 1);
+LogSegment current = segments.get(idx);
+enrichedLogSegments.add(new EnrichedLogSegment(previous, 
current.baseOffset()));
+}
+// LogSegment#readNextOffset() is an expensive call, so we 
only call it when necessary.
+int lastIdx = idx - 1;
+if (segments.get(lastIdx).baseOffset() < lastStableOffset) {
+LogSegment last = segments.get(lastIdx);
+enrichedLogSegments.add(new EnrichedLogSegment(last, 
last.readNextOffset()));
+}

Review Comment:
   Updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #14133: KAFKA-15189: only init remote topic metrics when enabled

2023-07-31 Thread via GitHub


divijvaidya commented on code in PR #14133:
URL: https://github.com/apache/kafka/pull/14133#discussion_r1279101884


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -227,7 +227,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
   }
 }
 
-class BrokerTopicMetrics(name: Option[String]) {
+class BrokerTopicMetrics(name: Option[String], systemRemoteStorageEnabled: 
Boolean) {

Review Comment:
   In its current form, this is not very extensible for future cases, because 
if we want to optionally add metrics for a new feature, we will have to add new 
booleans as arguments to determine the usage of that feature. 
   
   Perhaps we could instead pass the entire config object and let this 
`BrokerTopicMetrics` class decide, based on the config, which metrics it 
wants to enable.
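   For example, roughly (`MetricsFeatureConfig` here is just a stand-in for the 
real `KafkaConfig`; metric names are examples only):
   
   ```java
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.Set;
   
   // Stand-in for the broker config; the real class would be KafkaConfig.
   final class MetricsFeatureConfig {
       final boolean remoteStorageEnabled;
       MetricsFeatureConfig(boolean remoteStorageEnabled) {
           this.remoteStorageEnabled = remoteStorageEnabled;
       }
   }
   
   // Sketch of the suggestion: the metrics holder receives the whole config and
   // decides itself which metric groups to register, so adding a new feature
   // does not require another boolean constructor argument.
   final class TopicMetricsSketch {
       private final Set<String> registered = new HashSet<>();
   
       TopicMetricsSketch(MetricsFeatureConfig config) {
           registered.add("MessagesInPerSec");           // always registered
           if (config.remoteStorageEnabled) {            // feature-gated group
               registered.add("RemoteBytesInPerSec");
               registered.add("RemoteBytesOutPerSec");
           }
       }
   
       Set<String> registeredMetricNames() {
           return Collections.unmodifiableSet(registered);
       }
   }
   ```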



##
core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala:
##
@@ -77,4 +79,28 @@ class KafkaRequestHandlerTest {
 assertEquals(Some(startTime + 200), 
request.callbackRequestDequeueTimeNanos)
 assertEquals(Some(startTime + 300), 
request.callbackRequestCompleteTimeNanos)
   }
+
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testTopicStats(systemRemoteStorageEnabled: Boolean): Unit = {

Review Comment:
   I would feel more confident if we had a test that spins up the servers 
without Remote Storage enabled and verifies that no Remote Storage related 
metrics are registered. To do this, we will have to collect all Remote Storage 
metrics in a class called "RemoteStorageMetrics", similar to 
QuorumControllerMetrics, and use that class as the source of truth for all 
metric names related to RemoteStorage. In the test, we can validate that none 
of the metrics related to RemoteStorage are registered in the default registry.
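   Roughly, the assertion could look like the following sketch (the 
`remoteStorageMetricNames()` helper stands in for the proposed 
RemoteStorageMetrics class, and the metric names are examples, not an 
exhaustive list):
   
   ```java
   import com.yammer.metrics.Metrics;
   import com.yammer.metrics.core.MetricName;
   import java.util.Set;
   import java.util.stream.Collectors;
   
   public class NoRemoteStorageMetricsCheck {
   
       // Placeholder for the proposed RemoteStorageMetrics class: a single list
       // of every remote-storage metric name.
       static Set<String> remoteStorageMetricNames() {
           return Set.of("RemoteBytesInPerSec", "RemoteBytesOutPerSec",
                   "RemoteReadRequestsPerSec", "RemoteWriteRequestsPerSec");
       }
   
       // Assert that none of those names show up in the default Yammer registry.
       static void assertNoRemoteStorageMetricsRegistered() {
           Set<String> registered = Metrics.defaultRegistry().allMetrics().keySet()
                   .stream()
                   .map(MetricName::getName)
                   .collect(Collectors.toSet());
           for (String name : remoteStorageMetricNames()) {
               if (registered.contains(name)) {
                   throw new AssertionError("Unexpected remote storage metric registered: " + name);
               }
           }
       }
   }
   ```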



##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -439,12 +441,14 @@ class BrokerTopicStats extends Logging {
   
topicMetrics.closeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec)
   topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesOutPerSec)
   topicMetrics.closeMetric(BrokerTopicStats.ReassignmentBytesOutPerSec)
-  topicMetrics.closeMetric(BrokerTopicStats.RemoteBytesOutPerSec)
-  topicMetrics.closeMetric(BrokerTopicStats.RemoteBytesInPerSec)
-  topicMetrics.closeMetric(BrokerTopicStats.RemoteReadRequestsPerSec)
-  topicMetrics.closeMetric(BrokerTopicStats.RemoteWriteRequestsPerSec)
-  topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteReadRequestsPerSec)
-  
topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteWriteRequestsPerSec)
+  if (systemRemoteStorageEnabled) {

Review Comment:
   closeMetric is a no-op if the metric is missing, hence we don't need to do a 
check here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15038) Use topic id/name mapping from the Metadata cache in the RemoteLogManager

2023-07-31 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749158#comment-17749158
 ] 

Divij Vaidya commented on KAFKA-15038:
--

Hi [~owen-leung] 

Thank you for looking into this. Yes, we want to replace the 
*ConcurrentMap<TopicPartition, Uuid> topicPartitionIds* cache in 
RemoteLogManager. However, instead we want to use the cache available in every 
broker, called the Metadata cache [1], which will be the single source of 
authority on a broker for the topicId <-> topicName mapping. This cache is 
updated asynchronously on every broker. 

[1] 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/MetadataCache.scala]
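For illustration, a rough sketch of the direction (the lookup interface and its 
method names below are placeholders, not the actual MetadataCache API, which is 
a Scala trait):

{code:java}
import java.util.Optional;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

// Hypothetical view over the broker metadata cache; only illustrates the idea.
interface TopicIdLookup {
    Optional<Uuid> topicId(String topicName);
}

final class RemoteLogTopicIdResolver {
    private final TopicIdLookup metadataCacheView;

    RemoteLogTopicIdResolver(TopicIdLookup metadataCacheView) {
        this.metadataCacheView = metadataCacheView;
    }

    // Instead of maintaining a RemoteLogManager-local ConcurrentMap, ask the
    // broker-wide metadata view whenever a topic id is needed.
    Uuid resolveTopicId(TopicPartition tp) {
        return metadataCacheView.topicId(tp.topic())
                .orElseThrow(() -> new IllegalStateException(
                        "No topic id known for " + tp.topic()));
    }
}
{code}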
 

> Use topic id/name mapping from the Metadata cache in the RemoteLogManager
> -
>
> Key: KAFKA-15038
> URL: https://issues.apache.org/jira/browse/KAFKA-15038
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Alexandre Dupriez
>Assignee: Owen C.H. Leung
>Priority: Minor
>
> Currently, the {{RemoteLogManager}} maintains its own cache of topic name to 
> topic id 
> [[1]|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138]
>  using the information provided during leadership changes, and removing the 
> mapping upon receiving the notification of partition stopped.
> It should be possible to re-use the mapping in a broker's metadata cache, 
> removing the need for the RLM to build and update a local cache thereby 
> duplicating the information in the metadata cache. It also allows to preserve 
> a single source of authority regarding the association between topic names 
> and ids.
> [1] 
> https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] sambhav-jain-16 opened a new pull request, #14134: test changes

2023-07-31 Thread via GitHub


sambhav-jain-16 opened a new pull request, #14134:
URL: https://github.com/apache/kafka/pull/14134

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1

2023-07-31 Thread via GitHub


clolov commented on PR #13260:
URL: https://github.com/apache/kafka/pull/13260#issuecomment-1658242751

   It appears that in Zookeeper 3.8.0 the Zookeeper community moved away from 
log4j to logback (source: https://issues.apache.org/jira/browse/ZOOKEEPER-4427)
   
   In the failing test cases we attach an appender to the root logger (Kafka 
uses a log4j logger) and assert that we see certain log messages. Because of 
the mismatch in expectation (log4j) and reality (logback) these tests have 
started failing as the test-appender has no messages.
   
   The difference in Zookeeper dependencies between current trunk (31 July 
2023) and this pull request is:
   ```
   trunk
   
   org.apache.zookeeper:zookeeper:3.6.4
   > io.netty:netty-handler:4.1.94.Final
   > io.netty:netty-transport-native-epoll:4.1.94.Final
   > org.apache.yetus:audience-annotations:0.13.0
   > org.apache.zookeeper:zookeeper-jute:3.6.4
   > org.slf4j:slf4j-api:1.7.36
   
   pull request
   
   org.apache.zookeeper:zookeeper:3.8.2
   > ch.qos.logback:logback-classic:1.2.10 <- NEW
   > ch.qos.logback:logback-core:1.2.10<- NEW
   > commons-io:commons-io:2.11.0  <- NEW
   > io.netty:netty-handler:4.1.94.Final
   > io.netty:netty-transport-native-epoll:4.1.94.Final
   > org.apache.yetus:audience-annotations:0.12.0
   > org.apache.zookeeper:zookeeper-jute:3.8.2
   > org.slf4j:slf4j-api:1.7.36
   ```
   
   I have removed the `ch.qos.logback:*` dependencies in the latest commit and 
tests have started passing locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15274) support moving files to be deleted to other directories

2023-07-31 Thread jianbin.chen (Jira)
jianbin.chen created KAFKA-15274:


 Summary: support moving files to be deleted to other directories
 Key: KAFKA-15274
 URL: https://issues.apache.org/jira/browse/KAFKA-15274
 Project: Kafka
  Issue Type: Improvement
Reporter: jianbin.chen


Hello everyone, I am a Kafka user from China. Our company operates in public 
clouds overseas, such as AWS, Ali Cloud, and Huawei Cloud. We face a large 
amount of data exchange and business message delivery every day. Daily messages 
consume a significant amount of disk space. Purchasing the corresponding 
storage capacity on these cloud providers incurs substantial costs, especially 
for SSDs with ultra-high IOPS. High IOPS is very effective for disaster 
recovery, especially in the event of a sudden broker failure where storage 
space becomes full or memory space is exhausted leading to OOM kills. This high 
IOPS storage greatly improves data recovery efficiency, forcing us to adopt 
smaller storage specifications with high IO to save costs. Particularly, cloud 
providers only allow capacity expansion but not reduction.

Currently, we have come up with a solution and would like to contribute it to 
the community for discussion. When we need to delete logs, I can purchase S3 or 
Minio storage from services like AWS and mount it to my brokers. When a log 
needs to be deleted, we can decide how it leaves the broker. The default is to 
delete it directly, while the move option moves it to S3. Since most of the 
deleted data is cold data that won't be used in the short term, this approach 
improves the retention period of historical data while maintaining good cost 
control.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15274) support moving files to be deleted to other directories

2023-07-31 Thread jianbin.chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jianbin.chen reassigned KAFKA-15274:


Assignee: jianbin.chen

> support moving files to be deleted to other directories
> ---
>
> Key: KAFKA-15274
> URL: https://issues.apache.org/jira/browse/KAFKA-15274
> Project: Kafka
>  Issue Type: Improvement
>Reporter: jianbin.chen
>Assignee: jianbin.chen
>Priority: Major
>
> Hello everyone, I am a Kafka user from China. Our company operates in public 
> clouds overseas, such as AWS, Ali Cloud, and Huawei Cloud. We face a large 
> amount of data exchange and business message delivery every day. Daily 
> messages consume a significant amount of disk space. Purchasing the 
> corresponding storage capacity on these cloud providers incurs substantial 
> costs, especially for SSDs with ultra-high IOPS. High IOPS is very effective 
> for disaster recovery, especially in the event of a sudden broker failure 
> where storage space becomes full or memory space is exhausted leading to OOM 
> kills. This high IOPS storage greatly improves data recovery efficiency, 
> forcing us to adopt smaller storage specifications with high IO to save 
> costs. Particularly, cloud providers only allow capacity expansion but not 
> reduction.
> Currently, we have come up with a solution and would like to contribute it to 
> the community for discussion. When we need to delete logs, I can purchase S3 
> or Minio storage from services like AWS and mount it to my brokers. When a 
> log needs to be deleted, we can decide how it leaves the broker. The default 
> is to delete it directly, while the move option moves it to S3. Since most of 
> the deleted data is cold data that won't be used in the short term, this 
> approach improves the retention period of historical data while maintaining 
> good cost control.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] a364176773 commented on pull request #14129: feature: support moving files to be deleted to other directories

2023-07-31 Thread via GitHub


a364176773 commented on PR #14129:
URL: https://github.com/apache/kafka/pull/14129#issuecomment-1658258913

   https://issues.apache.org/jira/browse/KAFKA-15274


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15252) Task is not stopped until the poll interval passes in case of task restarting.

2023-07-31 Thread Nikita (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749178#comment-17749178
 ] 

Nikita commented on KAFKA-15252:


Hi [~ChrisEgerton]! I think the tickets 
[KAFKA-15090|https://issues.apache.org/jira/browse/KAFKA-15090] and 
[KAFKA-14725|https://issues.apache.org/jira/browse/KAFKA-14725] cover our case. 
Let's close the current one as a duplicate.

> Task is not stopped until the poll interval passes in case of task restarting.
> --
>
> Key: KAFKA-15252
> URL: https://issues.apache.org/jira/browse/KAFKA-15252
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Nikita
>Priority: Major
>
> We face a problem with restarting of tasks; sometimes it leads to a 
> resource leak. 
> We used the JDBC source connector and noticed an increasing count of 
> open sessions on the Vertica side. But this problem is applicable to all 
> databases and possibly to all source connectors.
> Our case is the following: 
> 1) Run the JDBC source connector (io.confluent.connect.jdbc.JdbcSourceConnector) 
> and set poll.interval.ms (8640) > task.shutdown.graceful.timeout.ms (it's 
> the property on the Kafka Connect side, we set 1)
> 2) Send POST /connectors//tasks//restart
> ER: the count of sessions is the same as before the restart
> AR: the count of sessions increases
> The main problem is that when 
> org.apache.kafka.connect.runtime.Worker#stopAndAwaitTasks(java.util.Collection)
>   is called, it doesn't stop the source task itself. 
> The source task stops only if the polling process stops on the source task side. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15252) Task is not stopped until the poll interval passes in case of task restarting.

2023-07-31 Thread Nikita (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikita resolved KAFKA-15252.

Resolution: Duplicate

> Task is not stopped until the poll interval passes in case of task restarting.
> --
>
> Key: KAFKA-15252
> URL: https://issues.apache.org/jira/browse/KAFKA-15252
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Nikita
>Priority: Major
>
> We face a problem with restarting of tasks; sometimes it leads to a 
> resource leak. 
> We used the JDBC source connector and noticed an increasing count of 
> open sessions on the Vertica side. But this problem is applicable to all 
> databases and possibly to all source connectors.
> Our case is the following: 
> 1) Run the JDBC source connector (io.confluent.connect.jdbc.JdbcSourceConnector) 
> and set poll.interval.ms (8640) > task.shutdown.graceful.timeout.ms (it's 
> the property on the Kafka Connect side, we set 1)
> 2) Send POST /connectors//tasks//restart
> ER: the count of sessions is the same as before the restart
> AR: the count of sessions increases
> The main problem is that when 
> org.apache.kafka.connect.runtime.Worker#stopAndAwaitTasks(java.util.Collection)
>   is called, it doesn't stop the source task itself. 
> The source task stops only if the polling process stops on the source task side. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on a diff in pull request #14128: KAFKA-15272: Fix the logic which finds candidate log segments to upload it to tiered storage

2023-07-31 Thread via GitHub


showuon commented on code in PR #14128:
URL: https://github.com/apache/kafka/pull/14128#discussion_r1279212920


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -538,35 +551,33 @@ public void copyLogSegmentsToRemote(UnifiedLog log) 
throws InterruptedException
 if (lso < 0) {
 logger.warn("lastStableOffset for partition {} is {}, 
which should not be negative.", topicIdPartition, lso);
 } else if (lso > 0 && copiedOffset < lso) {
-// Copy segments only till the last-stable-offset as 
remote storage should contain only committed/acked
-// messages
-long toOffset = lso;
-logger.debug("Checking for segments to copy, copiedOffset: 
{} and toOffset: {}", copiedOffset, toOffset);
-long activeSegBaseOffset = 
log.activeSegment().baseOffset();
 // log-start-offset can be ahead of the read-offset, when:
 // 1) log-start-offset gets incremented via delete-records 
API (or)
 // 2) enabling the remote log for the first time
 long fromOffset = Math.max(copiedOffset + 1, 
log.logStartOffset());
-ArrayList<LogSegment> sortedSegments = new 
ArrayList<>(JavaConverters.asJavaCollection(log.logSegments(fromOffset, 
toOffset)));
-
sortedSegments.sort(Comparator.comparingLong(LogSegment::baseOffset));
-List<Long> sortedBaseOffsets = 
sortedSegments.stream().map(LogSegment::baseOffset).collect(Collectors.toList());
-int activeSegIndex = 
Collections.binarySearch(sortedBaseOffsets, activeSegBaseOffset);
-
-// sortedSegments becomes empty list when fromOffset and 
toOffset are same, and activeSegIndex becomes -1
-if (activeSegIndex < 0) {
+long activeSegmentBaseOffset = 
log.activeSegment().baseOffset();
+
+// Segments which match the following criteria are 
eligible for copying to remote storage:
+// 1) Segment is not the active segment and
+// 2) Segment end-offset is less than the 
last-stable-offset as remote storage should contain only
+//committed/acked messages
+List<EnrichedLogSegment> candidateSegments = 
enrichedLogSegments(log, fromOffset)
+.stream()
+.filter(enrichedSegment ->  
enrichedSegment.logSegment.baseOffset() != activeSegmentBaseOffset && 
enrichedSegment.nextSegmentOffset <= lso)

Review Comment:
   If a segment contains 0 records and `log.roll.ms` timeout passed, then what 
will be the `baseOffset` of next active segment? Do you have any idea? I asked 
because I'm not sure if `enrichedSegment.logSegment.baseOffset() != 
activeSegmentBaseOffset` is necessary. If we already filtered out the active 
segment in the `enrichedLogSegments` method, should we also need this check?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15274) support moving files to be deleted to other directories

2023-07-31 Thread jianbin.chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jianbin.chen updated KAFKA-15274:
-
Issue Type: Task  (was: Improvement)

> support moving files to be deleted to other directories
> ---
>
> Key: KAFKA-15274
> URL: https://issues.apache.org/jira/browse/KAFKA-15274
> Project: Kafka
>  Issue Type: Task
>Reporter: jianbin.chen
>Assignee: jianbin.chen
>Priority: Major
>
> Hello everyone, I am a Kafka user from China. Our company operates in public 
> clouds overseas, such as AWS, Ali Cloud, and Huawei Cloud. We face a large 
> amount of data exchange and business message delivery every day. Daily 
> messages consume a significant amount of disk space. Purchasing the 
> corresponding storage capacity on these cloud providers incurs substantial 
> costs, especially for SSDs with ultra-high IOPS. High IOPS is very effective 
> for disaster recovery, especially in the event of a sudden broker failure 
> where storage space becomes full or memory space is exhausted leading to OOM 
> kills. This high IOPS storage greatly improves data recovery efficiency, 
> forcing us to adopt smaller storage specifications with high IO to save 
> costs. Particularly, cloud providers only allow capacity expansion but not 
> reduction.
> Currently, we have come up with a solution and would like to contribute it to 
> the community for discussion. When we need to delete logs, I can purchase S3 
> or Minio storage from services like AWS and mount it to my brokers. When a 
> log needs to be deleted, we can decide how it leaves the broker. The default 
> is to delete it directly, while the move option moves it to S3. Since most of 
> the deleted data is cold data that won't be used in the short term, this 
> approach improves the retention period of historical data while maintaining 
> good cost control.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kamalcph commented on a diff in pull request #14128: KAFKA-15272: Fix the logic which finds candidate log segments to upload it to tiered storage

2023-07-31 Thread via GitHub


kamalcph commented on code in PR #14128:
URL: https://github.com/apache/kafka/pull/14128#discussion_r1279280694


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -538,35 +551,33 @@ public void copyLogSegmentsToRemote(UnifiedLog log) 
throws InterruptedException
 if (lso < 0) {
 logger.warn("lastStableOffset for partition {} is {}, 
which should not be negative.", topicIdPartition, lso);
 } else if (lso > 0 && copiedOffset < lso) {
-// Copy segments only till the last-stable-offset as 
remote storage should contain only committed/acked
-// messages
-long toOffset = lso;
-logger.debug("Checking for segments to copy, copiedOffset: 
{} and toOffset: {}", copiedOffset, toOffset);
-long activeSegBaseOffset = 
log.activeSegment().baseOffset();
 // log-start-offset can be ahead of the read-offset, when:
 // 1) log-start-offset gets incremented via delete-records 
API (or)
 // 2) enabling the remote log for the first time
 long fromOffset = Math.max(copiedOffset + 1, 
log.logStartOffset());
-ArrayList<LogSegment> sortedSegments = new 
ArrayList<>(JavaConverters.asJavaCollection(log.logSegments(fromOffset, 
toOffset)));
-
sortedSegments.sort(Comparator.comparingLong(LogSegment::baseOffset));
-List<Long> sortedBaseOffsets = 
sortedSegments.stream().map(LogSegment::baseOffset).collect(Collectors.toList());
-int activeSegIndex = 
Collections.binarySearch(sortedBaseOffsets, activeSegBaseOffset);
-
-// sortedSegments becomes empty list when fromOffset and 
toOffset are same, and activeSegIndex becomes -1
-if (activeSegIndex < 0) {
+long activeSegmentBaseOffset = 
log.activeSegment().baseOffset();
+
+// Segments which match the following criteria are 
eligible for copying to remote storage:
+// 1) Segment is not the active segment and
+// 2) Segment end-offset is less than the 
last-stable-offset as remote storage should contain only
+//committed/acked messages
+List<EnrichedLogSegment> candidateSegments = 
enrichedLogSegments(log, fromOffset)
+.stream()
+.filter(enrichedSegment ->  
enrichedSegment.logSegment.baseOffset() != activeSegmentBaseOffset && 
enrichedSegment.nextSegmentOffset <= lso)

Review Comment:
   > If a segment contains 0 records and log.roll.ms timeout passed, then what 
will be the baseOffset of next active segment? 
   
   If the active segment is empty, then it [won't be 
rotated](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L71).
 
   
   The in-memory check `enrichedSegment.logSegment.baseOffset() != 
activeSegmentBaseOffset` is kind of redundant, we can remove it later when 
required.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah merged pull request #14126: MINOR Fix a Scala 2.12 compile issue

2023-07-31 Thread via GitHub


mumrah merged PR #14126:
URL: https://github.com/apache/kafka/pull/14126


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #14128: KAFKA-15272: Fix the logic which finds candidate log segments to upload it to tiered storage

2023-07-31 Thread via GitHub


clolov commented on code in PR #14128:
URL: https://github.com/apache/kafka/pull/14128#discussion_r127954


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -538,35 +551,33 @@ public void copyLogSegmentsToRemote(UnifiedLog log) 
throws InterruptedException
 if (lso < 0) {
 logger.warn("lastStableOffset for partition {} is {}, 
which should not be negative.", topicIdPartition, lso);
 } else if (lso > 0 && copiedOffset < lso) {
-// Copy segments only till the last-stable-offset as 
remote storage should contain only committed/acked
-// messages
-long toOffset = lso;
-logger.debug("Checking for segments to copy, copiedOffset: 
{} and toOffset: {}", copiedOffset, toOffset);
-long activeSegBaseOffset = 
log.activeSegment().baseOffset();
 // log-start-offset can be ahead of the read-offset, when:
 // 1) log-start-offset gets incremented via delete-records 
API (or)
 // 2) enabling the remote log for the first time
 long fromOffset = Math.max(copiedOffset + 1, 
log.logStartOffset());
-ArrayList<LogSegment> sortedSegments = new 
ArrayList<>(JavaConverters.asJavaCollection(log.logSegments(fromOffset, 
toOffset)));
-
sortedSegments.sort(Comparator.comparingLong(LogSegment::baseOffset));
-List<Long> sortedBaseOffsets = 
sortedSegments.stream().map(LogSegment::baseOffset).collect(Collectors.toList());
-int activeSegIndex = 
Collections.binarySearch(sortedBaseOffsets, activeSegBaseOffset);
-
-// sortedSegments becomes empty list when fromOffset and 
toOffset are same, and activeSegIndex becomes -1
-if (activeSegIndex < 0) {
+long activeSegmentBaseOffset = 
log.activeSegment().baseOffset();
+
+// Segments which match the following criteria are 
eligible for copying to remote storage:
+// 1) Segment is not the active segment and
+// 2) Segment end-offset is less than the 
last-stable-offset as remote storage should contain only
+//committed/acked messages
+List<EnrichedLogSegment> candidateSegments = 
enrichedLogSegments(log, fromOffset)
+.stream()
+.filter(enrichedSegment ->  
enrichedSegment.logSegment.baseOffset() != activeSegmentBaseOffset && 
enrichedSegment.nextSegmentOffset <= lso)

Review Comment:
   Nit: Instead of doing this filtering after we have created the enriched 
segments list and iterating again can we not push the condition as part of 
creating the enriched segments list?
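   For example, roughly (stand-in types so the sketch compiles on its own; in 
the PR these would be LogSegment and EnrichedLogSegment):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Minimal stand-ins for the classes used in this PR.
   interface Segment { long baseOffset(); }
   
   final class Enriched {
       final Segment segment;
       final long nextSegmentOffset;
       Enriched(Segment segment, long nextSegmentOffset) {
           this.segment = segment;
           this.nextSegmentOffset = nextSegmentOffset;
       }
   }
   
   final class CandidateSelection {
       // Apply both eligibility checks while building the list, instead of
       // materialising every enriched segment first and filtering in a second pass.
       static List<Enriched> candidates(List<Segment> segments,
                                        long activeSegmentBaseOffset,
                                        long lastStableOffset) {
           List<Enriched> result = new ArrayList<>();
           for (int idx = 1; idx < segments.size(); idx++) {
               Segment previous = segments.get(idx - 1);
               long nextOffset = segments.get(idx).baseOffset();
               if (previous.baseOffset() != activeSegmentBaseOffset && nextOffset <= lastStableOffset) {
                   result.add(new Enriched(previous, nextOffset));
               }
           }
           return result;
       }
   }
   ```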



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #14000: IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignment

2023-07-31 Thread via GitHub


vamossagar12 commented on code in PR #14000:
URL: https://github.com/apache/kafka/pull/14000#discussion_r1279411709


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##
@@ -151,7 +151,7 @@ public void 
testAssignmentsWhenWorkersJoinAfterRevocations()  {
 // in this round
 addNewEmptyWorkers("worker3");
 performStandardRebalance();
-assertTrue(assignor.delay > 0);
+assertEquals(40, assignor.delay); // First successive revoking 
rebalance.

Review Comment:
   Added an elaborate check over here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #14000: IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignment

2023-07-31 Thread via GitHub


vamossagar12 commented on code in PR #14000:
URL: https://github.com/apache/kafka/pull/14000#discussion_r1279414273


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##
@@ -161,30 +161,38 @@ public void 
testAssignmentsWhenWorkersJoinAfterRevocations()  {
 time.sleep(assignor.delay);
 addNewEmptyWorkers("worker4");
 performStandardRebalance();
+assertEquals(0, assignor.delay); // No revocations in this round.
 assertWorkers("worker1", "worker2", "worker3", "worker4");
 assertConnectorAllocations(0, 0, 1, 1);
 assertTaskAllocations(0, 3, 3, 3);
 
 // Fifth assignment and a fifth worker joining after a revoking 
rebalance.
-// We shouldn't revoke and set a delay > initial interval
+// We shouldn't revoke and set a delay equal to initial interval
 addNewEmptyWorkers("worker5");
 performStandardRebalance();
-assertTrue(assignor.delay > 40);
+assertEquals(40, assignor.delay); // First successive revoking 
rebalance.

Review Comment:
   IMO this happens because we weren't unsetting the counter when the delay 
reaches 0. This is the first successive revoking rebalance, so the delay 
should be 40.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #14000: IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignment

2023-07-31 Thread via GitHub


vamossagar12 commented on code in PR #14000:
URL: https://github.com/apache/kafka/pull/14000#discussion_r1279415811


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##
@@ -161,30 +161,38 @@ public void 
testAssignmentsWhenWorkersJoinAfterRevocations()  {
 time.sleep(assignor.delay);
 addNewEmptyWorkers("worker4");
 performStandardRebalance();
+assertEquals(0, assignor.delay); // No revocations in this round.
 assertWorkers("worker1", "worker2", "worker3", "worker4");
 assertConnectorAllocations(0, 0, 1, 1);
 assertTaskAllocations(0, 3, 3, 3);
 
 // Fifth assignment and a fifth worker joining after a revoking 
rebalance.
-// We shouldn't revoke and set a delay > initial interval
+// We shouldn't revoke and set a delay equal to initial interval
 addNewEmptyWorkers("worker5");
 performStandardRebalance();
-assertTrue(assignor.delay > 40);
+assertEquals(40, assignor.delay); // First successive revoking 
rebalance.
 assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5");
 assertConnectorAllocations(0, 0, 1, 1, 1);
 assertTaskAllocations(1, 2, 3, 3, 3);
 
-// Sixth assignment with sixth worker joining after the expiry.
-// Should revoke
-time.sleep(assignor.delay);

Review Comment:
   I removed this to prove that the changes still work correctly and the delay 
gets extended further.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #14000: IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignments

2023-07-31 Thread via GitHub


vamossagar12 commented on PR #14000:
URL: https://github.com/apache/kafka/pull/14000#issuecomment-1658526465

   @gharris1727 I updated the PR to cater to the changes I was talking about 
[here](https://github.com/apache/kafka/pull/14000#issuecomment-1642481356). 
Note that the original comment correction is not needed anymore. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1

2023-07-31 Thread via GitHub


clolov commented on PR #13260:
URL: https://github.com/apache/kafka/pull/13260#issuecomment-1658538656

   The failures in JDK 8/Scala 2.12 have to do with formatting on code which 
hasn't been touched as part of this pull request, so I believe it is safe to go 
ahead.
   
   There are 2 failures in JDK 20/Scala 2.13 - one concerned with tiered 
storage (RemoteIndexCacheTest - cleaner did not shutdown) and one with simple 
production (PlaintextProducerSendTest - not enough brokers for specified RF). I 
believe these are transient failures as they do not manifest when running 
locally, but if people are unconvinced we can rerun the tests.
   
   Otherwise, I think we are ready to merge this!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15267) Cluster-wide disablement of Tiered Storage

2023-07-31 Thread Christo Lolov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749225#comment-17749225
 ] 

Christo Lolov commented on KAFKA-15267:
---

Heya [~satishd], [~showuon], [~divijvaidya]! I had a quick sync with Satish 
about this ticket and we believe that since this is an already defined 
configuration the proposed changes do not require a KIP. What are your thoughts 
on the problem and the recommended approach?

> Cluster-wide disablement of Tiered Storage
> --
>
> Key: KAFKA-15267
> URL: https://issues.apache.org/jira/browse/KAFKA-15267
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>  Labels: tiered-storage
>
> h2. Summary
> KIP-405 defines the configuration {{remote.log.storage.system.enable}}, which 
> controls whether all resources needed for Tiered Storage to function are 
> instantiated properly in Kafka. However, the interaction between remote data 
> and Kafka is undefined if that configuration is set to false while there are 
> still topics with {{remote.storage.enable}}. {color:#ff8b00}*We would 
> like to give customers the ability to switch off Tiered Storage on a cluster 
> level and as such would need to define the behaviour.*{color}
> {{remote.log.storage.system.enable}} is a read-only configuration. This means 
> that it can only be changed by *modifying the server.properties* and 
> restarting brokers. As such, the {*}validity of values contained in it is 
> only checked at broker startup{*}.
> This JIRA proposes a few behaviours and a recommendation on a way forward.
> h2. Option 1: Change nothing
> Pros:
>  * No operation.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
> h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster 
> level and do not allow it to be disabled
> Always instantiate all resources for tiered storage. If no special ones are 
> selected use the default ones which come with Kafka.
> Pros:
>  * We solve the problem for moving between versions not allowing TS to be 
> disabled.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
>  * We haven’t quantified how much computer resources (CPU, memory) idle TS 
> components occupy.
>  * TS is a feature not required for running Kafka. As such, while it is still 
> under development we shouldn’t put it on the critical path of starting a 
> broker. In this way, a stray memory leak won’t impact anything on the 
> critical path of a broker.
>  * We are potentially swapping one problem for another. How does TS behave if 
> one decides to swap the TS plugin classes when data has already been written?
> h2. Option 3: Hide topics with tiering enabled
> Customers cannot interact with topics which have tiering enabled. They cannot 
> create new topics with the same names. Retention (and compaction?) do not 
> take effect on files already in local storage.
> Pros:
>  * We do not force data-deletion.
> Cons:
>  * This will be quite involved - the controller will need to know when a 
> broker’s server.properties have been altered; the broker will need to not 
> proceed to delete logs it is not the leader or follower for.
> h2. {color:#00875a}Option 4: Do not start the broker if there are topics with 
> tiering enabled{color} - Recommended
> This option has 2 different sub-options. The first one is that TS cannot be 
> disabled on cluster-level if there are *any* tiering topics - in other words 
> all tiered topics need to be deleted. The second one is that TS cannot be 
> disabled on a cluster-level if there are *any* topics with *tiering enabled* 
> - they can have tiering disabled, but with a retention policy set to delete 
> or retain (as per 
> [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]).
>  A topic can have tiering disabled and remain on the cluster as long as there 
> is no *remote* data when TS is disabled cluster-wide.
> Pros:
>  * We force the customer to be very explicit in disabling tiering of topics 
> prior to disabling TS on the whole cluster.
> Cons:
>  * You have to make certain that all data in remote is deleted (just a 
> disablement of a tiered topic is not enough). How do you determine whether all 
> remote has expired if policy is retain? If retain policy in KIP-950 knows 
> that there is data in remote then this should also be able to figure it out.
> The common denominator is that there needs to be no *remote* data at the 
> point of disabling TS. As such, the most straightforward option is to refuse 
> to start brokers if there are topics with the {{remote.storage.enabled}} 
> present. This in essence re

[GitHub] [kafka] vamossagar12 commented on a diff in pull request #14000: IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignment

2023-07-31 Thread via GitHub


vamossagar12 commented on code in PR #14000:
URL: https://github.com/apache/kafka/pull/14000#discussion_r1279414273


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##
@@ -161,30 +161,38 @@ public void 
testAssignmentsWhenWorkersJoinAfterRevocations()  {
 time.sleep(assignor.delay);
 addNewEmptyWorkers("worker4");
 performStandardRebalance();
+assertEquals(0, assignor.delay); // No revocations in this round.
 assertWorkers("worker1", "worker2", "worker3", "worker4");
 assertConnectorAllocations(0, 0, 1, 1);
 assertTaskAllocations(0, 3, 3, 3);
 
 // Fifth assignment and a fifth worker joining after a revoking 
rebalance.
-// We shouldn't revoke and set a delay > initial interval
+// We shouldn't revoke and set a delay equal to initial interval
 addNewEmptyWorkers("worker5");
 performStandardRebalance();
-assertTrue(assignor.delay > 40);
+assertEquals(40, assignor.delay); // First successive revoking 
rebalance.

Review Comment:
   IMO this happens because we weren't unsetting the counter when the delay 
reaches 0. This is the first successive revoking rebalance, so the delay 
should be 40. I say this because this case is similar to worker2 -> worker3 
joining above, where the delay was 40. It should be similar here as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on pull request #14128: KAFKA-15272: Fix the logic which finds candidate log segments to upload it to tiered storage

2023-07-31 Thread via GitHub


kamalcph commented on PR #14128:
URL: https://github.com/apache/kafka/pull/14128#issuecomment-1658615037

   @showuon @clolov @satishd 
   
   Addressed your review comments. PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15264) Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter

2023-07-31 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-15264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749248#comment-17749248
 ] 

José Armando García Sancio commented on KAFKA-15264:


Thanks [~funkye] for the tests.

Can you attach the broker logs during the above tests?

Can you also run the same tests but with a replication factor of 1? There is a 
theory that this could be related to an issue that was fixed here: 
[https://github.com/apache/kafka/pull/13765] or you can run the KRaft tests 
against `trunk`.

> Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter
> ---
>
> Key: KAFKA-15264
> URL: https://issues.apache.org/jira/browse/KAFKA-15264
> Project: Kafka
>  Issue Type: Bug
>Reporter: jianbin.chen
>Priority: Major
> Attachments: image-2023-07-28-09-51-01-662.png, 
> image-2023-07-28-09-52-38-941.png, image-2023-07-31-18-04-54-112.png, 
> image-2023-07-31-18-05-21-772.png
>
>
> I was preparing to upgrade from 1.1.0 to 3.5.1 in KRaft mode (new cluster 
> deployment), and when I recently compared and tested them, I found that the 
> throughput gap is obvious when using the following stress-test command:
>  
> {code:java}
> ./kafka-producer-perf-test.sh --topic test321 --num-records 3000 
> --record-size 1024 --throughput -1 --producer-props 
> bootstrap.servers=xxx: acks=1
> 419813 records sent, 83962.6 records/sec (81.99 MB/sec), 241.1 ms avg 
> latency, 588.0 ms max latency.
> 555300 records sent, 111015.6 records/sec (108.41 MB/sec), 275.1 ms avg 
> latency, 460.0 ms max latency.
> 552795 records sent, 110536.9 records/sec (107.95 MB/sec), 265.9 ms avg 
> latency, 1120.0 ms max latency.
> 552600 records sent, 110520.0 records/sec (107.93 MB/sec), 284.5 ms avg 
> latency, 1097.0 ms max latency.
> 538500 records sent, 107656.9 records/sec (105.13 MB/sec), 277.5 ms avg 
> latency, 610.0 ms max latency.
> 511545 records sent, 102309.0 records/sec (99.91 MB/sec), 304.1 ms avg 
> latency, 1892.0 ms max latency.
> 511890 records sent, 102337.1 records/sec (99.94 MB/sec), 288.4 ms avg 
> latency, 3000.0 ms max latency.
> 519165 records sent, 103812.2 records/sec (101.38 MB/sec), 262.1 ms avg 
> latency, 1781.0 ms max latency.
> 513555 records sent, 102669.9 records/sec (100.26 MB/sec), 338.2 ms avg 
> latency, 2590.0 ms max latency.
> 463329 records sent, 92665.8 records/sec (90.49 MB/sec), 276.8 ms avg 
> latency, 1463.0 ms max latency.
> 494248 records sent, 98849.6 records/sec (96.53 MB/sec), 327.2 ms avg 
> latency, 2362.0 ms max latency.
> 506272 records sent, 101254.4 records/sec (98.88 MB/sec), 322.1 ms avg 
> latency, 2986.0 ms max latency.
> 393758 records sent, 78735.9 records/sec (76.89 MB/sec), 387.0 ms avg 
> latency, 2958.0 ms max latency.
> 426435 records sent, 85252.9 records/sec (83.25 MB/sec), 363.3 ms avg 
> latency, 1959.0 ms max latency.
> 412560 records sent, 82298.0 records/sec (80.37 MB/sec), 374.1 ms avg 
> latency, 1995.0 ms max latency.
> 370137 records sent, 73997.8 records/sec (72.26 MB/sec), 396.8 ms avg 
> latency, 1496.0 ms max latency.
> 391781 records sent, 78340.5 records/sec (76.50 MB/sec), 410.7 ms avg 
> latency, 2446.0 ms max latency.
> 355901 records sent, 71166.0 records/sec (69.50 MB/sec), 397.5 ms avg 
> latency, 2715.0 ms max latency.
> 385410 records sent, 77082.0 records/sec (75.28 MB/sec), 417.5 ms avg 
> latency, 2702.0 ms max latency.
> 381160 records sent, 76232.0 records/sec (74.45 MB/sec), 407.7 ms avg 
> latency, 1846.0 ms max latency.
> 67 records sent, 0.1 records/sec (65.10 MB/sec), 456.2 ms avg 
> latency, 1414.0 ms max latency.
> 376251 records sent, 75175.0 records/sec (73.41 MB/sec), 401.9 ms avg 
> latency, 1897.0 ms max latency.
> 354434 records sent, 70886.8 records/sec (69.23 MB/sec), 425.8 ms avg 
> latency, 1601.0 ms max latency.
> 353795 records sent, 70744.9 records/sec (69.09 MB/sec), 411.7 ms avg 
> latency, 1563.0 ms max latency.
> 321993 records sent, 64360.0 records/sec (62.85 MB/sec), 447.3 ms avg 
> latency, 1975.0 ms max latency.
> 404075 records sent, 80750.4 records/sec (78.86 MB/sec), 408.4 ms avg 
> latency, 1753.0 ms max latency.
> 384526 records sent, 76905.2 records/sec (75.10 MB/sec), 406.0 ms avg 
> latency, 1833.0 ms max latency.
> 387652 records sent, 77483.9 records/sec (75.67 MB/sec), 397.3 ms avg 
> latency, 1927.0 ms max latency.
> 343286 records sent, 68629.7 records/sec (67.02 MB/sec), 455.6 ms avg 
> latency, 1685.0 ms max latency.
> 00 records sent, 66646.7 records/sec (65.08 MB/sec), 456.6 ms avg 
> latency, 2146.0 ms max latency.
> 361191 records sent, 72238.2 records/sec (70.55 MB/sec), 409.4 ms avg 
> latency, 2125.0 ms max latency.
> 357525 records sent, 71490.7 records/sec (69.82 MB/sec), 436.0 ms avg 
> latency, 1502.0 ms max latency.

[GitHub] [kafka] nizhikov commented on pull request #13247: KAFKA-14595 [WIP] Move ReassignPartitionsCommand to java

2023-07-31 Thread via GitHub


nizhikov commented on PR #13247:
URL: https://github.com/apache/kafka/pull/13247#issuecomment-1658638517

   Hello @mimaison 
   
   > If you can think of ways to split it in at least 2 PRs that would be 
preferable
   
   My plan is the following:
   
   1. PR to rewrite AdminUtils - https://github.com/apache/kafka/pull/14096 . 
It is independent from the others and can be reviewed right now. I checked all 
test failures and it seems like all of them are just flaky tests from trunk. 
Can you please join the review?
   
   2. PR to create a copy of POJOs and custom exceptions of 
`ReassignPartitionsCommand`.
   
   3. PR to rewrite the command itself and its tests.
   
   To finish all three I need to rewrite `ReassignPartitionsIntegrationTest`, 
which I am doing right now.
   I will split the PR in three after that.
   
   What do you think? Can you start review of #14096 ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #14118: KAFKA-14875: Implement wakeup

2023-07-31 Thread via GitHub


kirktrue commented on code in PR #14118:
URL: https://github.com/apache/kafka/pull/14118#discussion_r1279620345


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java:
##
@@ -19,25 +19,33 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
-import java.time.Duration;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 public class OffsetFetchApplicationEvent extends ApplicationEvent {
-    final public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
-    public final Set<TopicPartition> partitions;
+    private final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+    private final Set<TopicPartition> partitions;
 
     public OffsetFetchApplicationEvent(final Set<TopicPartition> partitions) {
         super(Type.FETCH_COMMITTED_OFFSET);
         this.partitions = partitions;
         this.future = new CompletableFuture<>();

Review Comment:
   Shouldn't this just extend from `CompletableApplicationEvent` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13247: KAFKA-14595 [WIP] Move ReassignPartitionsCommand to java

2023-07-31 Thread via GitHub


mimaison commented on PR #13247:
URL: https://github.com/apache/kafka/pull/13247#issuecomment-1658806935

   I'll try to take a look at #14096 this week. Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-07-31 Thread via GitHub


guozhangwang commented on PR #13920:
URL: https://github.com/apache/kafka/pull/13920#issuecomment-1658843159

   > I think the root cause is performReassignments in GeneralAssignmentBuilder 
is an O(partSize * consumerNum) function, which is very inefficient.
   
   Ack, and with your changes, it's not constant and not linear to either 
num.partitions or num.consumers right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14118: KAFKA-14875: Implement wakeup

2023-07-31 Thread via GitHub


lianetm commented on code in PR #14118:
URL: https://github.com/apache/kafka/pull/14118#discussion_r1279692395


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java:
##
@@ -19,25 +19,33 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
-import java.time.Duration;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 public class OffsetFetchApplicationEvent extends ApplicationEvent {
-    final public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
-    public final Set<TopicPartition> partitions;
+    private final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+    private final Set<TopicPartition> partitions;
 
     public OffsetFetchApplicationEvent(final Set<TopicPartition> partitions) {
         super(Type.FETCH_COMMITTED_OFFSET);
         this.partitions = partitions;
         this.future = new CompletableFuture<>();

Review Comment:
   Agree, and it does already. It is just that the changes are not here yet 
(only in the integration branch for now).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14118: KAFKA-14875: Implement wakeup

2023-07-31 Thread via GitHub


philipnee commented on code in PR #14118:
URL: https://github.com/apache/kafka/pull/14118#discussion_r1279701769


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java:
##
@@ -19,25 +19,33 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
-import java.time.Duration;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 public class OffsetFetchApplicationEvent extends ApplicationEvent {
-    final public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
-    public final Set<TopicPartition> partitions;
+    private final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+    private final Set<TopicPartition> partitions;
 
     public OffsetFetchApplicationEvent(final Set<TopicPartition> partitions) {
         super(Type.FETCH_COMMITTED_OFFSET);
         this.partitions = partitions;
         this.future = new CompletableFuture<>();

Review Comment:
   CompletableApplicationEvent isn't available on the current trunk yet. I 
think it comes in a different commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15274) support moving files to be deleted to other directories

2023-07-31 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749312#comment-17749312
 ] 

Greg Harris commented on KAFKA-15274:
-

Hi [~funkye] and thank you for the suggested feature!

Are you familiar with this KIP: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]
 ? I believe it also intends to solve the problem you're experiencing.

> support moving files to be deleted to other directories
> ---
>
> Key: KAFKA-15274
> URL: https://issues.apache.org/jira/browse/KAFKA-15274
> Project: Kafka
>  Issue Type: Task
>Reporter: jianbin.chen
>Assignee: jianbin.chen
>Priority: Major
>
> Hello everyone, I am a Kafka user from China. Our company operates in public 
> clouds overseas, such as AWS, Ali Cloud, and Huawei Cloud. We face a large 
> amount of data exchange and business message delivery every day. Daily 
> messages consume a significant amount of disk space. Purchasing the 
> corresponding storage capacity on these cloud providers incurs substantial 
> costs, especially for SSDs with ultra-high IOPS. High IOPS is very effective 
> for disaster recovery, especially in the event of a sudden broker failure 
> where storage space becomes full or memory space is exhausted leading to OOM 
> kills. This high IOPS storage greatly improves data recovery efficiency, 
> forcing us to adopt smaller storage specifications with high IO to save 
> costs. Particularly, cloud providers only allow capacity expansion but not 
> reduction.
> Currently, we have come up with a solution and would like to contribute it to 
> the community for discussion. When we need to delete logs, I can purchase S3 
> or Minio storage from services like AWS and mount it to my brokers. When a 
> log needs to be deleted, we can decide how it leaves the broker. The default 
> is to delete it directly, while the move option moves it to S3. Since most of 
> the deleted data is cold data that won't be used in the short term, this 
> approach improves the retention period of historical data while maintaining 
> good cost control.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14748) Relax non-null FK left-join requirement

2023-07-31 Thread Florin Akermann (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749325#comment-17749325
 ] 

Florin Akermann commented on KAFKA-14748:
-

Thank you [~guozhang].
Here is the KIP: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams]

> Relax non-null FK left-join requirement
> ---
>
> Key: KAFKA-14748
> URL: https://issues.apache.org/jira/browse/KAFKA-14748
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>
> Kafka Streams enforces a strict non-null-key policy in the DSL across all 
> key-dependent operations (like aggregations and joins).
> This also applies to FK-joins, in particular to the ForeignKeyExtractor. If 
> it returns `null`, it's treated as invalid. For left-joins, it might make 
> sense to still accept a `null`, and add the left-hand record with an empty 
> right-hand-side to the result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins

2023-07-31 Thread Florin Akermann (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749322#comment-17749322
 ] 

Florin Akermann commented on KAFKA-12317:
-

[~guozhang] [~mjsax] 
KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams

> Relax non-null key requirement for left/outer KStream joins
> ---
>
> Key: KAFKA-12317
> URL: https://issues.apache.org/jira/browse/KAFKA-12317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>
> Currently, for a stream-streams and stream-table/globalTable join 
> KafkaStreams drops all stream records with a `null`-key (`null`-join-key for 
> stream-globalTable), because for a `null`-(join)key the join is undefined: 
> i.e., we don't have an attribute to do the table lookup (we consider the 
> stream-record as malformed). Note that we define the semantics of 
> _left/outer_ join as: keep the stream record if no matching join record was 
> found.
> We could relax the definition of _left_ stream-table/globalTable and 
> _left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
> records, and call the ValueJoiner with a `null` "other-side" value instead: 
> if the stream record key (or join-key) is `null`, we could treat it as 
> "failed lookup" instead of treating the stream record as corrupted.
> If we make this change, users that want to keep the current behavior, can add 
> a `filter()` before the join to drop `null`-(join)key records from the stream 
> explicitly.
> Note that this change also requires to change the behavior if we insert a 
> repartition topic before the join: currently, we drop `null`-key record 
> before writing into the repartition topic (as we know they would be dropped 
> later anyway). We need to relax this behavior for a left stream-table and 
> left/outer stream-stream join. Users need to be aware (i.e., we might need to 
> put this into the docs and JavaDocs) that records with `null`-key would be 
> partitioned randomly.
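
A minimal sketch (not from the ticket) of how a user could keep today's behaviour once the requirement is relaxed, assuming hypothetical topic names and a KStream/KTable with String keys and values:

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class NullKeyLeftJoinSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input-topic");   // hypothetical topics
        KTable<String, String> table = builder.table("lookup-topic");

        // Explicitly drop null-key records before the left join to keep the current
        // behaviour; once the requirement is relaxed, a null-key record would otherwise
        // reach the ValueJoiner with a null right-hand side.
        stream.filter((key, value) -> key != null)
              .leftJoin(table, (streamValue, tableValue) -> streamValue + "," + tableValue)
              .to("output-topic");
    }
}
{code}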



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] junrao commented on pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-07-31 Thread via GitHub


junrao commented on PR #14111:
URL: https://github.com/apache/kafka/pull/14111#issuecomment-1659045347

   @AndrewJSchofield : Looking at the code a bit more closely. I am not sure 
that the exponential backoff logic is added properly for the common failure 
cases in this PR.
   
   On the producer side, this is the loop when there is a leader change.
   
   ```
   Sender
   call RecordAccumulator.partitionReady() to check if the batch need to backoff
   call completeBatch() 
 --> on retriable error, reenqueueBatch()
 --> on metadata error, trigger updateMetadata()
   ```
   
   If the metadata propagation is delayed, the Sender will stay in the above 
loop for multiple iterations. This PR only uses retryBackoffMax when the 
metadata update fails. However, in the common case, the metadata request won't 
fail and the issue is that the metadata is stale. So, it seems that we need to 
change RecordAccumulator.partitionReady() to check retryBackoffMax and 
implement the exponential backoff logic when the batch is re-enqueued. 
   
   On the consumer side, there is a similar loop when there is a leader epoch 
bump.
   
   ```
   KafkaConsumer.poll()
   call sendFetches()
   call pollForFetches() 
 --> call AbstractFetch.handleInitializeCompletedFetchErrors --> 
requestMetadataUpdate()
   ```
   
   Again, if the metadata propagation is delayed, the consumer poll() call will 
stay in the above loop for multiple iterations. This PR only uses 
retryBackoffMax when the metadata update fails. However, in the common case, 
the metadata request won't fail and the issue is that the metadata is stale. 
So, it seems that we need to add the exponential backoff logic to the above 
loop.
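   
   For reference, here is a minimal sketch (assumed names, not code from this PR) 
of the kind of capped exponential backoff with jitter that KIP-580 describes and 
that the re-enqueue/poll paths above would need to consult on each iteration:
   
   ```java
   import java.util.concurrent.ThreadLocalRandom;
   
   // Sketch only; the parameter names mirror the retry.backoff.ms and
   // retry.backoff.max.ms configs, but this is not the PR's implementation.
   final class BackoffSketch {
       static long backoffMs(long retryBackoffMs, long retryBackoffMaxMs, int attempts) {
           double exp = retryBackoffMs * Math.pow(2, attempts);                        // grow per retry
           double jittered = exp * ThreadLocalRandom.current().nextDouble(0.8, 1.2);   // +/- 20% jitter
           return (long) Math.min(jittered, retryBackoffMaxMs);                        // cap at the max
       }
   }
   ```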
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #14123: MINOR: Fix committed API in the PrototypeAsyncConsumer timeout

2023-07-31 Thread via GitHub


philipnee commented on PR #14123:
URL: https://github.com/apache/kafka/pull/14123#issuecomment-1659084728

   Current failures:
   ```
   New failing - 12
   Build / JDK 11 and Scala 2.13 / [1] tlsProtocol=TLSv1.2, useInlinePem=false 
– org.apache.kafka.common.network.SslTransportLayerTest
   <1s
   Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest
   1m 45s
   Build / JDK 20 and Scala 2.13 / 
testNonDefaultConnectionCountLimitAndRateLimit() – 
kafka.network.ConnectionQuotasTest
   10s
   Build / JDK 20 and Scala 2.13 / testBalancePartitionLeaders() – 
org.apache.kafka.controller.QuorumControllerTest
   12s
   Build / JDK 17 and Scala 2.13 / testReplicateSourceDefault() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   1m 51s
   Build / JDK 17 and Scala 2.13 / testReplicateSourceDefault() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
   1m 43s
   Build / JDK 8 and Scala 2.12 / testOffsetTranslationBehindReplicationFlow() 
– org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest
   2m 6s
   Build / JDK 8 and Scala 2.12 / testOffsetTranslationBehindReplicationFlow() 
– 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
   2m 7s
   Build / JDK 8 and Scala 2.12 / testReplicateSourceDefault() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest
   2m 28s
   Build / JDK 8 and Scala 2.12 / testRackAwareRangeAssignor() – 
integration.kafka.server.FetchFromFollowerIntegrationTest
   41s
   Build / JDK 8 and Scala 2.12 / testClose() – 
kafka.log.remote.RemoteIndexCacheTest
   <1s
   Build / JDK 8 and Scala 2.12 / testBalancePartitionLeaders() – 
org.apache.kafka.controller.QuorumControllerTest
   12s
   Existing failures - 4
   Build / JDK 11 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() 
– 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
   1m 48s
   Build / JDK 20 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() 
– 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
   1m 48s
   Build / JDK 20 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() 
– 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
   2m 44s
   Build / JDK 8 and Scala 2.12 / testOffsetTranslationBehindReplicationFlow() 
– 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
   2m 18s
   ```
   
   Not related to the code change. No build failures.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14123: MINOR: Fix committed API in the PrototypeAsyncConsumer timeout

2023-07-31 Thread via GitHub


lianetm commented on code in PR #14123:
URL: https://github.com/apache/kafka/pull/14123#discussion_r1279887573


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##
@@ -153,12 +154,32 @@ public void testCommitAsync_UserSuppliedCallback() {
 @SuppressWarnings("unchecked")
 public void testCommitted() {
         Set<TopicPartition> mockTopicPartitions = mockTopicPartitionOffset().keySet();
-        mockConstruction(OffsetFetchApplicationEvent.class, (mock, ctx) -> {
-            when(mock.complete(any())).thenReturn(new HashMap<>());
-        });
-        consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
-        assertDoesNotThrow(() -> consumer.committed(mockTopicPartitions, Duration.ofMillis(1)));
-        verify(eventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
+        try (MockedConstruction<OffsetFetchApplicationEvent> mockConstruction =
+                 mockConstruction(OffsetFetchApplicationEvent.class, (mock, ctx) -> {
+                     when(mock.future()).thenReturn(committedFuture);
+                 })) {
+            committedFuture.complete(mockTopicPartitionOffset());
+            consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
+            assertDoesNotThrow(() -> consumer.committed(mockTopicPartitions, Duration.ofMillis(1000)));
+            verify(eventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")

Review Comment:
   Does not seem needed I would say



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on pull request #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-07-31 Thread via GitHub


kirktrue commented on PR #13990:
URL: https://github.com/apache/kafka/pull/13990#issuecomment-1659177876

   @junrao:
   
   > @kirktrue : It seems that JDK 20 tests didn't complete because of failures 
in ':storage:unitTest'. It would be useful to verify if this is an existing 
issue.
   
   Looking at the logs for that test run, there are no individual test failures 
shown in the logs or in the Jenkins UI for the storage module's unit tests.
   
   According to the Gradle Enterprise tool I recently found out about, there 
were [~200 cases in the last 28 days where test runs failed with that 
error](https://ge.apache.org/scans/failures?failures.expandedFailures=WzEsMl0&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=America/Los_Angeles):
   
   ```
   Execution failed for task ' '.
   > Process 'Gradle Test Executor ' finished with non-zero exit 
value 1
 This problem might be caused by incorrect test process configuration.
 For more on test execution, please refer to 
https://docs.gradle.org//userguide/java_testing.html#sec:test_execution in the Gradle 
documentation.
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14123: MINOR: Fix committed API in the PrototypeAsyncConsumer timeout

2023-07-31 Thread via GitHub


philipnee commented on code in PR #14123:
URL: https://github.com/apache/kafka/pull/14123#discussion_r1279890789


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##
@@ -153,12 +154,32 @@ public void testCommitAsync_UserSuppliedCallback() {
 @SuppressWarnings("unchecked")
 public void testCommitted() {
         Set<TopicPartition> mockTopicPartitions = mockTopicPartitionOffset().keySet();
-        mockConstruction(OffsetFetchApplicationEvent.class, (mock, ctx) -> {
-            when(mock.complete(any())).thenReturn(new HashMap<>());
-        });
-        consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
-        assertDoesNotThrow(() -> consumer.committed(mockTopicPartitions, Duration.ofMillis(1)));
-        verify(eventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
+        try (MockedConstruction<OffsetFetchApplicationEvent> mockConstruction =
+                 mockConstruction(OffsetFetchApplicationEvent.class, (mock, ctx) -> {
+                     when(mock.future()).thenReturn(committedFuture);
+                 })) {
+            committedFuture.complete(mockTopicPartitionOffset());
+            consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
+            assertDoesNotThrow(() -> consumer.committed(mockTopicPartitions, Duration.ofMillis(1000)));
+            verify(eventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")

Review Comment:
   you are right!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on pull request #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-07-31 Thread via GitHub


junrao commented on PR #13990:
URL: https://github.com/apache/kafka/pull/13990#issuecomment-1659202647

   Thanks @kirktrue. @satishd : Are you aware of any transient failures in 
:storage:unitTest? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on pull request #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-07-31 Thread via GitHub


kirktrue commented on PR #13990:
URL: https://github.com/apache/kafka/pull/13990#issuecomment-1659256150

   > @satishd : Are you aware of any transient failures in :storage:unitTest? 
Thanks.
   
   According to [Gradle 
Enterprise](https://ge.apache.org/scans/failures?failures.failureMessage=Execution%20failed%20for%20task%20%27:storage:unitTest%27.%0A%3E%20Process%20%27Gradle%20Test%20Executor%20*%20finished%20with%20non-zero%20exit%20value%201%0A%20%20This%20problem%20might%20be%20caused%20by%20incorrect%20test%20process%20configuration.%0A%20%20For%20more%20on%20test%20execution%2C%20please%20refer%20to%20https://docs.gradle.org/8.2.1/userguide/java_testing.html%23sec:test_execution%20in%20the%20Gradle%20documentation.&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=America/Los_Angeles),
 it's failed 71 times starting on July 18, 2023.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14724) Port tests in FetcherTest to FetchRequestManagerTest

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14724:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Port tests in FetcherTest to FetchRequestManagerTest
> 
>
> Key: KAFKA-14724
> URL: https://issues.apache.org/jira/browse/KAFKA-14724
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{{}Fetcher{}}}.
> This task involves copying the relevant tests from {{FetcherTest}} and 
> modifying them to fit a new unit test named {{{}FetchRequestManagerTest{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14758) Extract inner classes from Fetcher for reuse in refactoring

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14758:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Extract inner classes from Fetcher for reuse in refactoring
> ---
>
> Key: KAFKA-14758
> URL: https://issues.apache.org/jira/browse/KAFKA-14758
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
> Fix For: 3.5.0
>
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{{}Fetcher{}}}.
> This task includes refactoring {{Fetcher}} by extracting out the inner 
> classes into top-level (though still in {{{}internal{}}}) so that those 
> classes can be referenced by forthcoming refactored fetch logic.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14274) Introduce FetchRequestManager to integrate fetch into new consumer threading refactor

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14274:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Introduce FetchRequestManager to integrate fetch into new consumer threading 
> refactor
> -
>
> Key: KAFKA-14274
> URL: https://issues.apache.org/jira/browse/KAFKA-14274
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{{}Fetcher{}}}.
> This task is to introduce a new class named {{FetchRequestManager}} that will 
> be responsible for:
>  # Formatting fetch requests to the background thread
>  # Configuring the callback on fetch responses for the background thread
> The response handler will collect the fetch responses from the broker and 
> create {{{}CompletedFetch{}}} instances, much as is done in {{{}Fetcher{}}}. 
> The newly introduced {{FetchUtils}} will be used for both 
> {{FetchRequestManager}} and {{Fetcher}} to keep the logic as reusable as 
> possible.
> The foreground logic will decompress the data into a {{{}Record{}}}, which 
> will then be deserialized into a {{ConsumerRecord}} for returning to the user.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15174) Ensure the correct thread is executing the callbacks

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15174:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Ensure the correct thread is executing the callbacks
> 
>
> Key: KAFKA-15174
> URL: https://issues.apache.org/jira/browse/KAFKA-15174
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>
> We need to add assertion tests to ensure the correct thread is executing the 
> offset commit callbacks and rebalance callback



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15184) New consumer internals refactoring and clean up

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15184:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> New consumer internals refactoring and clean up
> ---
>
> Key: KAFKA-15184
> URL: https://issues.apache.org/jira/browse/KAFKA-15184
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>
> Minor refactoring of the new consumer internals including introduction of the 
> {{RequestManagers}} class to hold references to the {{RequestManager}} 
> instances.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14965) Introduce ListOffsetsRequestManager to integrate ListOffsetsRequests into new consumer threading refactor

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14965:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Introduce ListOffsetsRequestManager to integrate ListOffsetsRequests into new 
> consumer threading refactor
> -
>
> Key: KAFKA-14965
> URL: https://issues.apache.org/jira/browse/KAFKA-14965
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>
> This task introduces new functionality for handling ListOffsetsRequests for 
> the new consumer implementation, as part for the ongoing work for the 
> consumer threading model refactor.
> This task introduces a new class named {{ListOffsetsRequestManager}}, 
> responsible for handling ListOffsets requests performed by the consumer to 
> expose functionality like beginningOffsets, endOffsets and offsetsForTimes. 
> The {{OffsetFetcher}} class is used internally by the {{KafkaConsumer}} 
> to list offsets, so this task will be based on a refactored 
> {{OffsetFetcher}}, reusing the fetching logic as much as possible.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14252) Create background thread skeleton for new Consumer threading model

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14252:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Create background thread skeleton for new Consumer threading model
> --
>
> Key: KAFKA-14252
> URL: https://issues.apache.org/jira/browse/KAFKA-14252
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>
> The event handler internally instantiates a background thread to consume 
> ApplicationEvents and produce BackgroundEvents.  In this ticket, we will 
> create a skeleton of the background thread.  We will incrementally add 
> implementation in the future.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14365) Extract common logic from Fetcher

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14365:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Extract common logic from Fetcher
> -
>
> Key: KAFKA-14365
> URL: https://issues.apache.org/jira/browse/KAFKA-14365
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
> Fix For: 3.5.0
>
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{{}Fetcher{}}}.
> This task includes refactoring {{Fetcher}} by extracting out some common 
> logic to allow forthcoming implementations to leverage it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14246) Update threading model for Consumer (KIP-945)

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14246:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Update threading model for Consumer (KIP-945)
> -
>
> Key: KAFKA-14246
> URL: https://issues.apache.org/jira/browse/KAFKA-14246
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>
> Hi community,
>  
> We are refactoring the current KafkaConsumer and making it more asynchronous. 
>  This is the master Jira to track the project's progress; subtasks will be 
> linked to this ticket.  Please review the design document and feel free to 
> use this thread for discussion. 
>  
> The design document is here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor]
>  
> The original email thread is here: 
> [https://lists.apache.org/thread/13jvwzkzmb8c6t7drs4oj2kgkjzcn52l]
>  
> I will continue to update the 1pager as reviews and comments come.
>  
> Thanks, 
> P



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14875) Implement Wakeup()

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14875:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Implement Wakeup()
> --
>
> Key: KAFKA-14875
> URL: https://issues.apache.org/jira/browse/KAFKA-14875
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>
> Implement wakeup() and WakeupException. This would be different from the 
> current implementation because I think we just need to interrupt the blocking 
> futures.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15163) Implement validatePositions functionality for new KafkaConsumer

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15163:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Implement validatePositions functionality for new KafkaConsumer
> ---
>
> Key: KAFKA-15163
> URL: https://issues.apache.org/jira/browse/KAFKA-15163
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>
> Introduce support for validating positions in the new OffsetsRequestManager. 
> This task will include a new event for the validatePositions calls performed 
> from the new consumer, and the logic for handling such events in the 
> OffsetRequestManager.
> The validate positions implementation will keep the same behaviour as the one 
> in the old consumer, but adapted to the new threading model. So it is based 
> on a VALIDATE_POSITIONS event that is submitted to the background thread, 
> and then processed by the ApplicationEventProcessor. The processing itself is 
> done by the OffsetRequestManager given that this will require an 
> OFFSET_FOR_LEADER_EPOCH request. This task will introduce support for such 
> requests in the OffsetRequestManager, responsible for offset-related requests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14848) KafkaConsumer incorrectly passes locally-scoped deserializers to FetchConfig

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14848:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> KafkaConsumer incorrectly passes locally-scoped deserializers to FetchConfig
> 
>
> Key: KAFKA-14848
> URL: https://issues.apache.org/jira/browse/KAFKA-14848
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.5.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> [~rayokota] found some {{{}NullPointerException{}}}s that originate because 
> of a recently introduced error in the {{KafkaConsumer}} constructor. The code 
> was changed to pass the deserializer variables into the {{FetchConfig}} 
> constructor. However, this code change incorrectly used the locally-scoped 
> variables, not the instance-scoped variables. Since the locally-scoped 
> variables could be {{{}null{}}}, this results in the {{FetchConfig}} storing 
> {{null}} references, leading to downstream breakage.
> Suggested change:
> {noformat}
> - FetchConfig<K, V> fetchConfig = new FetchConfig<>(config, keyDeserializer, 
> valueDeserializer, isolationLevel);
> + FetchConfig<K, V> fetchConfig = new FetchConfig<>(config, 
> this.keyDeserializer, this.valueDeserializer, isolationLevel);
> {noformat}
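
To illustrate the class of bug in plain Java (this is not the actual KafkaConsumer code): a constructor parameter that shadows an instance field silently hands a possibly-null reference to a collaborator.

{code:java}
// Illustration only: the constructor parameter 'name' shadows the instance field,
// so the collaborator receives the possibly-null local value instead of the
// defaulted instance value.
public class ShadowingSketch {
    private final String name;

    ShadowingSketch(String name) {
        this.name = (name != null) ? name : "default";
        register(name);           // bug: uses the local, possibly-null parameter
        // register(this.name);   // fix: use the instance field
    }

    private static void register(String value) {
        System.out.println("registered: " + value);
    }

    public static void main(String[] args) {
        new ShadowingSketch(null); // prints "registered: null" with the buggy line
    }
}
{code}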



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14950) Implement assign() and assignment()

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14950:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Implement assign() and assignment()
> ---
>
> Key: KAFKA-14950
> URL: https://issues.apache.org/jira/browse/KAFKA-14950
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
> Fix For: 3.6.0
>
>
> Implement assign() and assignment()



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15180) Generalize integration tests to change use of KafkaConsumer to Consumer

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15180:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Generalize integration tests to change use of KafkaConsumer to Consumer
> ---
>
> Key: KAFKA-15180
> URL: https://issues.apache.org/jira/browse/KAFKA-15180
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
> Fix For: 3.6.0
>
>
> For the consumer threading refactor project, we're introducing a new 
> implementation of the {{Consumer}} interface. However, most of the instances 
> in the integration tests specifically use the concrete implementation 
> {{{}KafkaConsumer{}}}. This task is to generalize those uses where possible 
> to use the {{Consumer}} interface.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14675) Extract metadata-related tasks from Fetcher into MetadataFetcher

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14675:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Extract metadata-related tasks from Fetcher into MetadataFetcher
> 
>
> Key: KAFKA-14675
> URL: https://issues.apache.org/jira/browse/KAFKA-14675
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Affects Versions: 3.5.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{{}Fetcher{}}}.
> This task covers the work to extract from {{Fetcher}} the APIs that are 
> related to metadata operations into a new class named 
> {{{}MetadataFetcher{}}}. This will allow the refactoring of {{Fetcher}} and 
> {{MetadataFetcher}} for the new consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14960) Metadata Request Manager and listTopics/partitionsFor API

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14960:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Metadata Request Manager and listTopics/partitionsFor API
> -
>
> Key: KAFKA-14960
> URL: https://issues.apache.org/jira/browse/KAFKA-14960
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>
> Implement listTopics and partitionsFor



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14937) Refactoring for client code to reduce boilerplate

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14937:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Refactoring for client code to reduce boilerplate
> -
>
> Key: KAFKA-14937
> URL: https://issues.apache.org/jira/browse/KAFKA-14937
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, consumer, producer 
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
> Fix For: 3.6.0
>
>
> There are a number of places in the client code where the same basic calls 
> are made by more than one client implementation. Minor refactoring will 
> reduce the amount of boilerplate code necessary for the client to construct 
> its internal state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15188) Implement more of the remaining PrototypeAsyncConsumer APIs

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15188:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Implement more of the remaining PrototypeAsyncConsumer APIs
> ---
>
> Key: KAFKA-15188
> URL: https://issues.apache.org/jira/browse/KAFKA-15188
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>
> There are several {{Consumer}} APIs that only touch the {{ConsumerMetadata}} 
> and/or {{SubscriptionState}} classes; they do not perform network I/O or 
> otherwise block. These can be implemented without needing {{RequestManager}} 
> updates and include the following APIs:
>  - {{committed}}
>  - {{currentLag}}
>  - {{listTopics}}
>  - {{metrics}}
>  - {{partitionsFor}}
>  - {{pause}}
>  - {{paused}}
>  - {{position}}
>  - {{resume}}
>  - {{seek}}
>  - {{seekToBeginning}}
>  - {{seekToEnd}}
>  - {{subscribe}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14468) Refactor Commit Logic

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14468:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Refactor Commit Logic
> -
>
> Key: KAFKA-14468
> URL: https://issues.apache.org/jira/browse/KAFKA-14468
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
> Fix For: 3.5.0
>
>
> Refactor commit logic using the new multi-threaded coordinator construct.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15081) Implement new consumer offsetsForTimes

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15081:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Implement new consumer offsetsForTimes
> --
>
> Key: KAFKA-15081
> URL: https://issues.apache.org/jira/browse/KAFKA-15081
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>
> Implement offsetForTimes for the kafka consumer based on the new threading 
> model, using the ListOffsetsRequestManager. No changes at the consumer API 
> level.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15115) Implement resetPositions functionality in ListOffsetRequestManager

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15115:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Implement resetPositions functionality in ListOffsetRequestManager
> --
>
> Key: KAFKA-15115
> URL: https://issues.apache.org/jira/browse/KAFKA-15115
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>
> Introduce support for resetting positions in the new 
> ListOffsetsRequestManager. This task will include a new event for the 
> resetPositions calls performed from the new consumer, and the logic for 
> handling such events in the ListOffsetRequestManager.
> The reset positions implementation will keep the same behaviour as the one in 
> the old consumer, but adapted to the new threading model. So it is based on a 
> RESET_POSITIONS event that is submitted to the background thread, and then 
> processed by the ApplicationEventProcessor. The processing itself is done by 
> the ListOffsetRequestManager given that this will require a LIST_OFFSETS 
> request for the partitions awaiting reset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15173) ApplicationEventQueue and BackgroundEventQueue should be bounded

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15173:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> ApplicationEventQueue and BackgroundEventQueue should be bounded
> 
>
> Key: KAFKA-15173
> URL: https://issues.apache.org/jira/browse/KAFKA-15173
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>
> The async consumer uses ApplicationEventQueue and BackgroundEventQueue to 
> facilitate message passing between the application thread and the background 
> thread.  The current implementation is boundless, which can potentially cause 
> OOM and other performance-related issues.
> I think the queues need a finite bound, and we need to decide how to handle 
> the situation when the bound is reached.  In particular, I would like to 
> answer these questions:
>  
>  # What should the upper limit be for both queues: Can this be a 
> configurable, memory-based bound? Or just an arbitrary number of events as 
> the bound.
>  # What should happen when the application event queue is filled up?  It 
> seems like we should introduce a new exception type and notify the user that 
> the consumer is full.
>  # What should happen when the background event queue is filled up? This 
> seems less likely to happen, but I imagine it could happen when the user 
> stops polling the consumer, causing the queue to be filled.
>  # Is it necessary to introduce a public configuration for the queue? I think 
> initially we would select an arbitrary constant number and see the community 
> feedback to make a forward plan accordingly.
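
Regarding question 2, a minimal sketch (illustrative only, with hypothetical names) of a bounded queue that rejects enqueues once the bound is reached:

{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueSketch {
    public static void main(String[] args) {
        // A tiny fixed capacity stands in for whatever bound is eventually chosen.
        BlockingQueue<String> applicationEvents = new ArrayBlockingQueue<>(2);

        System.out.println(applicationEvents.offer("commit"));    // true
        System.out.println(applicationEvents.offer("fetch"));     // true
        System.out.println(applicationEvents.offer("metadata"));  // false: bound reached, caller must fail or retry
    }
}
{code}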



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14761) Integration Tests for the New Consumer Implementation

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14761:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Integration Tests for the New Consumer Implementation
> -
>
> Key: KAFKA-14761
> URL: https://issues.apache.org/jira/browse/KAFKA-14761
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>
> This Jira tracks the integration testing efforts for the new consumer we 
> are implementing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14264) Refactor coordinator code

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14264:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Refactor coordinator code
> -
>
> Key: KAFKA-14264
> URL: https://issues.apache.org/jira/browse/KAFKA-14264
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
> Fix For: 3.4.0
>
>
> To refactor the consumer, we changed how the coordinator is called.  However, 
> there will be a time period where the old and new implementation need to 
> coexist, so we will need to override some of the methods and create a new 
> implementation of the coordinator.  In particular:
>  # ensureCoordinatorReady needs to be non-blocking or we could just use the 
> sendFindCoordinatorRequest.
>  # joinGroupIfNeeded needs to be broken up into more fine-grained stages for 
> the new implementation to work.
> We also need to create the coordinator state machine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14247) Implement EventHandler interface and DefaultEventHandler for Consumer

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14247:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Implement EventHandler interface and DefaultEventHandler for Consumer
> -
>
> Key: KAFKA-14247
> URL: https://issues.apache.org/jira/browse/KAFKA-14247
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>
> The background thread runs inside of the DefaultEventHandler to consume 
> events from the ApplicationEventQueue and produce events to the 
> BackgroundEventQueue.
> The background thread runnable consists of a loop that polls events 
> from the ApplicationEventQueue, processes them if there are any, and polls the 
> networkClient.
> In this implementation, the DefaultEventHandler spawns a thread that runs the 
> BackgroundThreadRunnable.  The runnable, as of the current PR, does the 
> following things:
>  # Initialize the networkClient
>  # Poll ApplicationEvent from the queue if there's any
>  # Process the event
>  # Poll the networkClient
> PR: https://github.com/apache/kafka/pull/12672
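
A rough sketch of the loop described above, with placeholder names; the real 
runnable in the PR differs in the details (event types, shutdown handling, 
network client wiring), so treat this purely as an illustration of the four steps:

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: the background thread loop that drains the application
// event queue and drives the network client.
public class BackgroundThreadRunnable implements Runnable {

    private final BlockingQueue<Object> applicationEventQueue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    @Override
    public void run() {
        initializeNetworkClient();                        // step 1: one-time setup
        while (running) {
            Object event = applicationEventQueue.poll();  // step 2: non-blocking poll
            if (event != null)
                process(event);                           // step 3: handle the application event
            pollNetworkClient();                          // step 4: drive I/O and response handling
        }
    }

    public void close() {
        running = false;
    }

    private void initializeNetworkClient() { /* placeholder */ }
    private void process(Object event) { /* placeholder */ }
    private void pollNetworkClient() { /* placeholder */ }
}
{code}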



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15164) Extract reusable logic from OffsetsForLeaderEpochClient

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15164:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Extract reusable logic from OffsetsForLeaderEpochClient
> ---
>
> Key: KAFKA-15164
> URL: https://issues.apache.org/jira/browse/KAFKA-15164
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
>
> The OffsetsForLeaderEpochClient class is used for making asynchronous 
> requests to the OffsetsForLeaderEpoch API. It encapsulates the logic for:
>  * preparing the requests
>  * sending them over the network using the network client
>  * handling the response
> The new KafkaConsumer implementation, based on a new threading model, 
> requires the same logic for preparing the requests and handling the 
> responses, with different behaviour for how the request is actually sent.
> This task includes refactoring OffsetsForLeaderEpochClient by extracting the 
> logic for preparing the requests and handling the responses. No changes to the 
> existing logic, just making the functionality available for reuse.
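
One way to picture the split, as a hedged sketch: the reusable part prepares the 
request and interprets the response, while each caller plugs in its own way of 
sending. The names below (EpochQuery, prepareRequest, positionStillValid) are 
placeholders, not the resulting API:

{code:java}
// Illustrative only: request preparation and response handling extracted into a
// stateless helper that both the old client and the new request manager can use.
public final class OffsetsForLeaderEpochUtils {

    private OffsetsForLeaderEpochUtils() { }

    // Reusable: build the per-partition query from locally known state.
    public static EpochQuery prepareRequest(int leaderEpoch, long currentOffset) {
        return new EpochQuery(leaderEpoch, currentOffset);
    }

    // Reusable: decide whether the current position is still valid, given the
    // leader's end offset for the requested epoch. If the leader's end offset is
    // behind our position, the log was truncated and the position must be reset.
    public static boolean positionStillValid(EpochQuery query, long endOffsetForEpoch) {
        return endOffsetForEpoch >= query.currentOffset;
    }

    public static final class EpochQuery {
        public final int leaderEpoch;
        public final long currentOffset;

        public EpochQuery(int leaderEpoch, long currentOffset) {
            this.leaderEpoch = leaderEpoch;
            this.currentOffset = currentOffset;
        }
    }
}
{code}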



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14966) Extract reusable logic from OffsetFetcher

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14966:
--
Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Extract reusable logic from OffsetFetcher
> -
>
> Key: KAFKA-14966
> URL: https://issues.apache.org/jira/browse/KAFKA-14966
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-945
> Fix For: 3.6.0
>
>
> The OffsetFetcher is internally used by the KafkaConsumer to fetch offsets, 
> validate and reset positions. 
> For the new consumer based on a refactored threading model, similar 
> functionality will be needed by the ListOffsetsRequestManager component. 
> This task aims at identifying and extracting the OffsetFetcher functionality 
> that can be reused by the new consumer implementation.
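
As with the OffsetsForLeaderEpoch ticket above, the extraction can be pictured as 
a small shared helper used from both code paths. This is a hedged sketch with 
placeholder names only; the actual extracted class and its methods may look quite 
different:

{code:java}
// Illustrative only: offset-reset logic shared between the legacy OffsetFetcher
// and the new ListOffsetsRequestManager.
final class SharedOffsetFetchLogic {
    // -2 and -1 are the ListOffsets protocol sentinels for earliest/latest.
    long timestampForReset(boolean earliest) {
        return earliest ? -2L : -1L;
    }
}

class LegacyOffsetFetcherCaller {
    private final SharedOffsetFetchLogic shared = new SharedOffsetFetchLogic();
    long resetTimestamp(boolean earliest) { return shared.timestampForReset(earliest); }
}

class ListOffsetsRequestManagerCaller {
    private final SharedOffsetFetchLogic shared = new SharedOffsetFetchLogic();
    long resetTimestamp(boolean earliest) { return shared.timestampForReset(earliest); }
}
{code}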



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15184) New consumer internals refactoring and clean up

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15184:
--
Labels: consumer-threading-refactor  (was: consumer-threading-refactor 
kip-945)

> New consumer internals refactoring and clean up
> ---
>
> Key: KAFKA-15184
> URL: https://issues.apache.org/jira/browse/KAFKA-15184
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
>
> Minor refactoring of the new consumer internals including introduction of the 
> {{RequestManagers}} class to hold references to the {{RequestManager}} 
> instances.
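
A hedged sketch of what such a holder class might look like; the set of managers 
and the poll contract shown here are assumptions for illustration, not the actual 
class:

{code:java}
import java.util.List;
import java.util.Optional;

// Illustrative only: one object owning the per-feature request managers so the
// background thread can iterate over them in its poll loop.
public class RequestManagers {

    // Placeholder contract: each manager reports when it next wants to be polled.
    interface RequestManager {
        long poll(long currentTimeMs);
    }

    private final List<Optional<RequestManager>> entries;

    public RequestManagers(List<Optional<RequestManager>> entries) {
        this.entries = entries;
    }

    // The background thread polls every present manager on each iteration.
    public void pollAll(long currentTimeMs) {
        entries.forEach(entry -> entry.ifPresent(manager -> manager.poll(currentTimeMs)));
    }
}
{code}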



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15163) Implement validatePositions functionality for new KafkaConsumer

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15163:
--
Labels: consumer-threading-refactor  (was: consumer-threading-refactor 
kip-945)

> Implement validatePositions functionality for new KafkaConsumer
> ---
>
> Key: KAFKA-15163
> URL: https://issues.apache.org/jira/browse/KAFKA-15163
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
>
> Introduce support for validating positions in the new OffsetsRequestManager. 
> This task will include a new event for the validatePositions calls performed 
> from the new consumer, and the logic for handling such events in the 
> OffsetRequestManager.
> The validate positions implementation will keep the same behaviour as the one 
> in the old consumer, but adapted to the new threading model. It is based on 
> a VALIDATE_POSITIONS event that is submitted to the background thread and 
> then processed by the ApplicationEventProcessor. The processing itself is 
> done by the OffsetRequestManager given that this will require an 
> OFFSET_FOR_LEADER_EPOCH request. This task will introduce support for such 
> requests in the OffsetRequestManager, responsible for offset-related requests.
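
A hedged sketch of the flow described above, with simplified placeholder types 
(the real ApplicationEvent hierarchy, processor and request manager APIs differ):

{code:java}
import java.util.concurrent.BlockingQueue;

// Illustrative only: the application thread enqueues a VALIDATE_POSITIONS event;
// the background thread's processor routes it to the offsets request manager,
// which will issue the OffsetsForLeaderEpoch request.
public class ValidatePositionsFlow {

    enum EventType { VALIDATE_POSITIONS /* other application events elided */ }

    static final class ApplicationEvent {
        final EventType type;
        ApplicationEvent(EventType type) { this.type = type; }
    }

    interface OffsetsRequestManager {
        void validatePositions(); // prepares an OFFSET_FOR_LEADER_EPOCH request
    }

    // Application thread side: the consumer call simply enqueues an event.
    static void submit(BlockingQueue<ApplicationEvent> applicationEventQueue)
            throws InterruptedException {
        applicationEventQueue.put(new ApplicationEvent(EventType.VALIDATE_POSITIONS));
    }

    // Background thread side: the event processor dispatches to the request manager.
    static void process(ApplicationEvent event, OffsetsRequestManager offsetsRequestManager) {
        if (event.type == EventType.VALIDATE_POSITIONS)
            offsetsRequestManager.validatePositions();
    }
}
{code}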



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14875) Implement Wakeup()

2023-07-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14875:
--
Labels: consumer-threading-refactor  (was: consumer-threading-refactor 
kip-945)

> Implement Wakeup()
> --
>
> Key: KAFKA-14875
> URL: https://issues.apache.org/jira/browse/KAFKA-14875
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
>
> Implement wakeup() and WakeupException.  This would be different from the 
> current implementation because I think we just need to interrupt the blocking 
> futures.
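
A hedged sketch of the "interrupt the blocking futures" idea: the blocking API 
registers the future it is about to wait on, and wakeup() completes it 
exceptionally with a WakeupException so the blocked thread unwinds. The class and 
method names are placeholders; only WakeupException is the real client exception:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.common.errors.WakeupException;

// Illustrative only: tracks the future a blocking call is waiting on so that
// wakeup(), called from another thread, can fail it with a WakeupException.
public class WakeupSupport {

    private final AtomicReference<CompletableFuture<?>> pendingFuture = new AtomicReference<>();

    // Called by poll()/commitSync()-style APIs before blocking on a result.
    public <T> CompletableFuture<T> registerBlockingCall(CompletableFuture<T> future) {
        pendingFuture.set(future);
        return future;
    }

    // Called by wakeup() from any thread; the waiting thread sees the exception.
    public void wakeup() {
        CompletableFuture<?> future = pendingFuture.getAndSet(null);
        if (future != null)
            future.completeExceptionally(new WakeupException());
    }
}
{code}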



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

