[PR] KAFKA-10787: Apply spotless to simple module [kafka]

2024-06-11 Thread via GitHub


gongxuanzhang opened a new pull request, #16296:
URL: https://github.com/apache/kafka/pull/16296

   This PR is a sub-PR of https://github.com/apache/kafka/pull/16097.
   It is part of a series of changes that progressively apply the spotless plugin (import order) across all modules. In this step, the plugin is activated in the following modules:
   * log4j-appender
   * trogdor
   * jmh-benchmarks
   * examples
   * shell
   * generator
   
   
   ## Module and history 
   
   > Please see the table below for the historical changes related to applying the Spotless plugin.
   
   | module| apply |  related PR  |
   | -- | --- | --- |
   | :clients | ❌| future |
   | :connect:api | ❌| future |
   | :connect:basic-auth-extension | ❌| future |
   | :connect:file | ❌| future |
   | :connect:json | ❌| future |
   | :connect:mirror | ❌| future |
   | :connect:mirror-client | ❌| future |
   | :connect:runtime | ❌| future |
   | :connect:test-plugins | ❌| future |
   | :connect:transforms | ❌| future |
   | :core | ❌| future |
   | :examples | ❌| future |
   | :generator | ❌| future |
   | :group-coordinator:group-coordinator-api | ❌| future |
   | :group-coordinator | ❌| future |
   | :jmh-benchmarks | ❌| future |
   | :log4j-appender | ❌| future |
   | :metadata | ❌| future |
   | :server | ❌| future |
   | :shell | ❌| future |
   | :storage | ❌| future |
   | :storage:storage-api | ❌| future |
   | :streams | ❌| future |
   | :streams:examples | ❌| future |
   | :streams:streams-scala | ❌| future |
   | :streams:test-utils | ❌| future |
   | :streams:upgrade-system-tests-0100 | ❌| future |
   | :streams:upgrade-system-tests-0101 | ❌| future |
   | :streams:upgrade-system-tests-0102 | ❌| future |
   | :streams:upgrade-system-tests-0110 | ❌| future |
   | :streams:upgrade-system-tests-10 | ❌| future |
   | :streams:upgrade-system-tests-11 | ❌| future |
   | :streams:upgrade-system-tests-20 | ❌| future |
   | :streams:upgrade-system-tests-21 | ❌| future |
   | :streams:upgrade-system-tests-22 | ❌| future |
   | :streams:upgrade-system-tests-23 | ❌| future |
   | :streams:upgrade-system-tests-24 | ❌| future |
   | :streams:upgrade-system-tests-25 | ❌| future |
   | :streams:upgrade-system-tests-26 | ❌| future |
   | :streams:upgrade-system-tests-27 | ❌| future |
   | :streams:upgrade-system-tests-28 | ❌| future |
   | :streams:upgrade-system-tests-30 | ❌| future |
   | :streams:upgrade-system-tests-31 | ❌| future |
   | :streams:upgrade-system-tests-32 | ❌| future |
   | :streams:upgrade-system-tests-33 | ❌| future |
   | :streams:upgrade-system-tests-34 | ❌| future |
   | :streams:upgrade-system-tests-35 | ❌| future |
   | :streams:upgrade-system-tests-36 | ❌| future |
   | :streams:upgrade-system-tests-37 | ❌| future |
   | :trogdor | ❌| future |
   | :raft |  ✅| https://github.com/apache/kafka/pull/16278 |
   | :server-common | ✅| https://github.com/apache/kafka/pull/16172 |
   | :transaction-coordinator | ✅| https://github.com/apache/kafka/pull/16172 |
   | :tools |  ✅   |  https://github.com/apache/kafka/pull/16262  |
   | :tools:tools-api | ✅   | https://github.com/apache/kafka/pull/16262 |
   
   
   ## How to test
   We can run `./gradlew spotlessCheck` to check for code that does not meet the requirements.
   If errors are reported, we can run `./gradlew spotlessApply` to fix them automatically.
   In this PR, all changes (excluding `build.gradle`) were generated by `spotlessApply`.
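   For reference, a hedged sketch of what activating the plugin in a module's `build.gradle` might look like — the exact import-order groups used by this PR series are an assumption, not copied from the change:

   ```groovy
   // Hypothetical spotless activation for one module; ordering groups are assumed.
   spotless {
       java {
           // Enforce a deterministic import order across the module.
           importOrder('kafka', 'org.apache.kafka', 'com', 'net', 'org', 'java', 'javax')
           removeUnusedImports()
       }
   }
   ```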


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Enable state updater by default [kafka]

2024-06-11 Thread via GitHub


cadonna commented on PR #16107:
URL: https://github.com/apache/kafka/pull/16107#issuecomment-2162162705

   Merged to `trunk` and cherry-picked to `3.8` 





[jira] [Commented] (KAFKA-16936) Upgrade slf4j to 2.0.9 and integrate "-Dslf4j.provider" into the kafka script

2024-06-11 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854254#comment-17854254
 ] 

Muralidhar Basani commented on KAFKA-16936:
---

[~chia7712] can I try looking at this, if you haven't started any work on this?

> Upgrade slf4j to 2.0.9 and integrate "-Dslf4j.provider" into the kafka script
> ---
>
> Key: KAFKA-16936
> URL: https://issues.apache.org/jira/browse/KAFKA-16936
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> origin discussion: 
> [https://github.com/apache/kafka/pull/16260#issuecomment-2159632052]
> The specific provider class can be defined by `slf4j.provider` [0]. Hence, we 
> can add the slf4j backends we care about to the dependencies. With that, our 
> distributions will ship different slf4j backends, and this is safe because we 
> will define slf4j.provider in our scripts. Also, those slf4j backends will be 
> collected into "dependant-libs", so we can run a kafka instance from 
> source code with a specific provider too.
> In short, this jira includes the following tasks:
> 1. upgrade slf4j from 1.7.36 to 2.0.9+
> 2. add a new system variable to the scripts to define -Dslf4j.provider easily. By 
> default we use org.slf4j.reload4j.Reload4jServiceProvider
> 3. add other slf4j backend dependencies (optional)
>  [0] https://www.slf4j.org/manual.html
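A hedged sketch of how the provider selection might be wired at startup — the `KAFKA_OPTS` hook exists today, while the default-provider logic described in the jira would live inside the startup scripts:

```shell
# Assumed usage: pick the slf4j 2.x backend explicitly via the system property.
KAFKA_OPTS="-Dslf4j.provider=org.slf4j.reload4j.Reload4jServiceProvider" \
  bin/kafka-server-start.sh config/server.properties
```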



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-10199: Enable state updater by default [kafka]

2024-06-11 Thread via GitHub


cadonna merged PR #16107:
URL: https://github.com/apache/kafka/pull/16107





[jira] [Commented] (KAFKA-16937) Consider inlining Time#waitObject into ProducerMetadata#awaitUpdate

2024-06-11 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854252#comment-17854252
 ] 

Chia-Ping Tsai commented on KAFKA-16937:


Maybe we should study the history of that method. If the pattern can be applied 
elsewhere in the code base, we should keep it and encourage more usage. 
Otherwise, if it is a rare case now, we can refactor it.

> Consider inlining Time#waitObject into ProducerMetadata#awaitUpdate
> --
>
> Key: KAFKA-16937
> URL: https://issues.apache.org/jira/browse/KAFKA-16937
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
>
> Time#waitObject is implemented with a while-loop and is used by 
> `ProducerMetadata` only. Hence, this jira can include the following changes:
> 1. move `Time#waitObject` into `ProducerMetadata#awaitUpdate`
> 2. ProducerMetadata#awaitUpdate can throw an "exact" TimeoutException [0]
> [0] 
> https://github.com/apache/kafka/blob/23fe71d579f84d59ebfe6d5a29e688315cec1285/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1176
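The while-loop pattern under discussion can be sketched as follows — this is a minimal illustration, not Kafka's actual `Time#waitObject` (which throws Kafka's own unchecked TimeoutException); the class name and the use of `java.util.concurrent.TimeoutException` are assumptions:

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Sketch of the waitObject-style pattern: block on a monitor until a condition
// holds or a deadline passes, re-checking after every wakeup to guard against
// spurious wakeups.
public class WaitObjectSketch {
    public static void waitObject(Object obj, Supplier<Boolean> condition, long deadlineMs)
            throws InterruptedException, TimeoutException {
        synchronized (obj) {
            while (!condition.get()) {
                long remainingMs = deadlineMs - System.currentTimeMillis();
                if (remainingMs <= 0) {
                    // Inlining this into ProducerMetadata#awaitUpdate would let the
                    // caller throw a more precise, caller-specific TimeoutException.
                    throw new TimeoutException("Condition not satisfied before deadline");
                }
                obj.wait(remainingMs);
            }
        }
    }
}
```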





Re: [PR] KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas [kafka]

2024-06-11 Thread via GitHub


abhijeetk88 commented on code in PR #16078:
URL: https://github.com/apache/kafka/pull/16078#discussion_r1635838998


##
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##
@@ -817,6 +817,119 @@ class DynamicBrokerConfigTest {
 Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
   }
 
+  @Test
+  def testRemoteLogManagerCopyQuotaUpdates(): Unit = {
+val copyQuotaProp = 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP
+
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 9092)
+val config = KafkaConfig.fromProps(props)
+val serverMock: KafkaServer = mock(classOf[KafkaServer])
+val remoteLogManagerMockOpt = 
Option(Mockito.mock(classOf[RemoteLogManager]))
+
+Mockito.when(serverMock.config).thenReturn(config)
+
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt)
+
+config.dynamicConfig.initialize(None, None)
+config.dynamicConfig.addBrokerReconfigurable(new 
DynamicRemoteLogConfig(serverMock))
+
+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
 config.getLong(copyQuotaProp))
+
+// Update default config
+props.put(copyQuotaProp, "100")
+config.dynamicConfig.updateDefaultConfig(props)
+assertEquals(100, config.getLong(copyQuotaProp))

Review Comment:
   Should we take this as part of a follow-up PR? I want to avoid unrelated 
changes here.






Re: [PR] KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas [kafka]

2024-06-11 Thread via GitHub


abhijeetk88 commented on code in PR #16078:
URL: https://github.com/apache/kafka/pull/16078#discussion_r1635838230


##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -1165,43 +1165,66 @@ class DynamicRemoteLogConfig(server: KafkaBroker) 
extends BrokerReconfigurable w
 
   override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
 newConfig.values.forEach { (k, v) =>
-  if (reconfigurableConfigs.contains(k)) {
-if 
(k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP))
 {
-  val newValue = v.asInstanceOf[Long]
-  val oldValue = getValue(server.config, k)
-  if (newValue != oldValue && newValue <= 0) {
-val errorMsg = s"Dynamic remote log manager config update 
validation failed for $k=$v"
-throw new ConfigException(s"$errorMsg, value should be at least 1")
-  }
+  if 
(k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
 ||

Review Comment:
   Not completely sure whether `k` can be null, but I changed it since it adds 
safety.






Re: [PR] KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas [kafka]

2024-06-11 Thread via GitHub


abhijeetk88 commented on code in PR #16078:
URL: https://github.com/apache/kafka/pull/16078#discussion_r1635837473


##
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##
@@ -817,6 +817,119 @@ class DynamicBrokerConfigTest {
 Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
   }
 
+  @Test
+  def testRemoteLogManagerCopyQuotaUpdates(): Unit = {
+val copyQuotaProp = 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP
+
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 9092)
+val config = KafkaConfig.fromProps(props)
+val serverMock: KafkaServer = mock(classOf[KafkaServer])
+val remoteLogManagerMockOpt = 
Option(Mockito.mock(classOf[RemoteLogManager]))
+
+Mockito.when(serverMock.config).thenReturn(config)
+
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt)
+
+config.dynamicConfig.initialize(None, None)
+config.dynamicConfig.addBrokerReconfigurable(new 
DynamicRemoteLogConfig(serverMock))
+
+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
 config.getLong(copyQuotaProp))
+
+// Update default config
+props.put(copyQuotaProp, "100")
+config.dynamicConfig.updateDefaultConfig(props)
+assertEquals(100, config.getLong(copyQuotaProp))
+Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(100)
+
+// Update per broker config
+props.put(copyQuotaProp, "200")
+config.dynamicConfig.updateBrokerConfig(0, props)
+assertEquals(200, config.getLong(copyQuotaProp))
+Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(200)
+
+Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
+  }
+
+  @Test
+  def testRemoteLogManagerFetchQuotaUpdates(): Unit = {

Review Comment:
   Fixed it.






[jira] [Commented] (KAFKA-16937) Consider inlining Time#waitObject into ProducerMetadata#awaitUpdate

2024-06-11 Thread dujian0068 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854251#comment-17854251
 ] 

dujian0068 commented on KAFKA-16937:


Hello:

The developer of {{Time#waitObject()}} seems to have wanted to provide a unified 
synchronization method based on {{Time}}. I don't think it's necessary to delete 
it.

> Consider inlining Time#waitObject into ProducerMetadata#awaitUpdate
> --
>
> Key: KAFKA-16937
> URL: https://issues.apache.org/jira/browse/KAFKA-16937
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
>
> Time#waitObject is implemented with a while-loop and is used by 
> `ProducerMetadata` only. Hence, this jira can include the following changes:
> 1. move `Time#waitObject` into `ProducerMetadata#awaitUpdate`
> 2. ProducerMetadata#awaitUpdate can throw an "exact" TimeoutException [0]
> [0] 
> https://github.com/apache/kafka/blob/23fe71d579f84d59ebfe6d5a29e688315cec1285/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1176





Re: [PR] KAFKA-16879 SystemTime should use singleton mode [kafka]

2024-06-11 Thread via GitHub


dujian0068 commented on code in PR #16266:
URL: https://github.com/apache/kafka/pull/16266#discussion_r1635827051


##
clients/src/test/java/org/apache/kafka/common/utils/TimeTest.java:
##
@@ -26,9 +26,11 @@
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public abstract class TimeTest {

Review Comment:
   Earlier I deleted the `SystemTime` and `SystemTimeTest` classes, which left 
this class with no abstract methods, so I changed it to a concrete class.
   Now I have restored it.






Re: [PR] KAFKA-16879 SystemTime should use singleton mode [kafka]

2024-06-11 Thread via GitHub


dujian0068 commented on code in PR #16266:
URL: https://github.com/apache/kafka/pull/16266#discussion_r1635735601


##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -30,7 +30,40 @@
  */
 public interface Time {
 
-Time SYSTEM = new SystemTime();
+//  A time implementation that uses the system clock and sleep call.
+//  Use inline implementation to ensure that only one Time#SYSTEM exists 
in a program
+Time SYSTEM = new Time() {

Review Comment:
   Thank you
   I  have fixed the problem
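The singleton-via-interface-constant pattern shown in the diff above can be sketched like this — `Clock` is an illustrative name, not Kafka's `Time` interface:

```java
// Declaring the singleton as an interface constant initialized with an anonymous
// implementation guarantees a single shared instance per JVM, because interface
// fields are implicitly public static final.
public class SingletonTimeSketch {
    interface Clock {
        Clock SYSTEM = new Clock() {
            @Override
            public long milliseconds() {
                return System.currentTimeMillis();
            }
        };

        long milliseconds();
    }
}
```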






Re: [PR] KAFKA-15265: Remote fetch throttle metrics [kafka]

2024-06-11 Thread via GitHub


satishd commented on code in PR #16087:
URL: https://github.com/apache/kafka/pull/16087#discussion_r1635823261


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##
@@ -77,16 +78,17 @@ public void updateQuota(Quota newQuota) {
 }
 }
 
-public boolean isQuotaExceeded() {
+public QuotaCheckResult checkQuota() {
 Sensor sensorInstance = sensor();
 try {
 sensorInstance.checkQuotas();
 } catch (QuotaViolationException qve) {
 LOGGER.debug("Quota violated for sensor ({}), metric: ({}), 
metric-value: ({}), bound: ({})",
 sensorInstance.name(), qve.metric().metricName(), qve.value(), 
qve.bound());
-return true;
+long throttleTimeMs = QuotaUtils.throttleTime(qve, 
time.milliseconds());
+return new QuotaCheckResult(true, throttleTimeMs);

Review Comment:
   We can return a `long` and stop returning a `QuotaCheckResult` wrapper. This 
will avoid creating a wrapper instance; add the respective util check that 
treats the quota as exceeded if the value is > 0.
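The suggested convention — a plain `long` where "quota exceeded" means value > 0 — can be sketched as follows; the class name and the simplified throttle-time formula are assumptions, not Kafka's `QuotaUtils` implementation:

```java
// Sketch: return the throttle time directly instead of a wrapper object.
public class ThrottleCheckSketch {
    // Returns 0 when the quota is not violated, otherwise a positive throttle
    // time in ms: roughly how long the caller should back off so the observed
    // rate falls back under the bound (simplified formula, an assumption).
    public static long checkQuota(double observedRate, double boundRate, long windowMs) {
        if (observedRate <= boundRate) {
            return 0L;
        }
        return (long) (windowMs * (observedRate - boundRate) / boundRate);
    }

    // The "respective util check": quota is exceeded iff throttle time > 0.
    public static boolean isQuotaExceeded(long throttleTimeMs) {
        return throttleTimeMs > 0;
    }
}
```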






[PR] KAFKA-16803: Update ShadowJavaPlugin [kafka]

2024-06-11 Thread via GitHub


Nancy-ksolves opened a new pull request, #16295:
URL: https://github.com/apache/kafka/pull/16295

Upgrade to a newer ShadowJavaPlugin to remove the deprecated dependency 
'org.gradle.util.ConfigureUtil'.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16661: Added a lower `log.initial.task.delay.ms` value [kafka]

2024-06-11 Thread via GitHub


showuon commented on code in PR #16221:
URL: https://github.com/apache/kafka/pull/16221#discussion_r1635814686


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -1181,7 +1181,7 @@ object TestUtils extends Logging {
transactionVerificationEnabled: Boolean = false,
log: Option[UnifiedLog] = None,
remoteStorageSystemEnable: Boolean = false,
-   initialTaskDelayMs: Long = 
ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT): LogManager = {
+   initialTaskDelayMs: Long = 
ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT_INTEGRATION_TEST): 
LogManager = {

Review Comment:
   Yes, `createBrokerConfigs` is a place to override some configs for testing, 
like we set `props.put(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000")`, 
so that we can quickly delete a log file to speed up the test. Similar things 
like
   ```
   // Reduce number of threads per broker
   props.put(ServerConfigs.NUM_NETWORK_THREADS_CONFIG, "2")
   props.put(ServerConfigs.BACKGROUND_THREADS_CONFIG, "2")
   ```
   As the comment said, we want to reduce the number of threads per broker, to 
speed up the tests.
   That's why we should add a smaller initialTaskDelay here to speed up tests.
   Does that make sense?






Re: [PR] KAFKA-16890: Compute valid log-start-offset when deleting overlapping remote segments [kafka]

2024-06-11 Thread via GitHub


satishd commented on code in PR #16237:
URL: https://github.com/apache/kafka/pull/16237#discussion_r1635802677


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -912,10 +911,8 @@ class RemoteLogRetentionHandler {
 
 private final Optional<RetentionSizeData> retentionSizeData;
 private final Optional<RetentionTimeData> retentionTimeData;
-
 private long remainingBreachedSize;
-
-private OptionalLong logStartOffset = OptionalLong.empty();
+private Optional<Long> logStartOffset = Optional.empty();

Review Comment:
   We can continue using `OptionalLong` as discussed offline as it has the 
required methods.



##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2038,6 +2038,69 @@ public void testDeletionOnRetentionBreachedSegments(long 
retentionSize,
 assertEquals(0, 
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
 }
 
+@ParameterizedTest(name = 
"testDeletionOnOverlappingRetentionBreachedSegments retentionSize={0} 
retentionMs={1}")
+@CsvSource(value = {"0, -1", "-1, 0"})
+public void testDeletionOnOverlappingRetentionBreachedSegments(long 
retentionSize,
+   long 
retentionMs)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+Map<String, Long> logProps = new HashMap<>();
+logProps.put("retention.bytes", retentionSize);
+logProps.put("retention.ms", retentionMs);
+LogConfig mockLogConfig = new LogConfig(logProps);
+when(mockLog.config()).thenReturn(mockLogConfig);
+
+List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+checkpoint.write(epochEntries);
+LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, 
scheduler);
+when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+when(mockLog.logEndOffset()).thenReturn(200L);
+
+RemoteLogSegmentMetadata metadata1 = 
listRemoteLogSegmentMetadata(leaderTopicIdPartition, 1, 100, 1024,
+epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED)
+.get(0);
+// overlapping segment

Review Comment:
   Please describe the test case in detail: explain the overlapping segment and 
how the duplicate entry with the earlier end offset will not move the 
log-start-offset backwards.






Re: [PR] KAFKA-15045: (KIP-924 pt. 22) Add RackAwareOptimizationParams and other minor TaskAssignmentUtils changes [kafka]

2024-06-11 Thread via GitHub


ableegoldman commented on PR #16294:
URL: https://github.com/apache/kafka/pull/16294#issuecomment-2162088243

   Merged to trunk and cherrypicked to 3.8





Re: [PR] KAFKA-15045: (KIP-924 pt. 23) TaskAssignmentUtils minor changes [kafka]

2024-06-11 Thread via GitHub


ableegoldman merged PR #16294:
URL: https://github.com/apache/kafka/pull/16294





Re: [PR] KAFKA-15045: (KIP-924 pt. 23) TaskAssignmentUtils minor changes [kafka]

2024-06-11 Thread via GitHub


ableegoldman commented on code in PR #16294:
URL: https://github.com/apache/kafka/pull/16294#discussion_r1635794717


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -55,7 +56,103 @@ public final class TaskAssignmentUtils {
 private TaskAssignmentUtils() {}
 
 /**
- * Return an {@code AssignmentError} for a task assignment created for an 
application.
+ * A simple config container for necessary parameters and optional 
overrides to apply when
+ * running the active or standby task rack-aware optimizations.
+ */
+public static class RackAwareOptimizationParams {

Review Comment:
   nit: make this final



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -55,7 +56,103 @@ public final class TaskAssignmentUtils {
 private TaskAssignmentUtils() {}
 
 /**
- * Return an {@code AssignmentError} for a task assignment created for an 
application.
+ * A simple config container for necessary parameters and optional 
overrides to apply when
+ * running the active or standby task rack-aware optimizations.
+ */
+public static class RackAwareOptimizationParams {
+private final ApplicationState applicationState;
+private final Optional<Integer> trafficCostOverride;
+private final Optional<Integer> nonOverlapCostOverride;
+private final Optional<SortedSet<TaskId>> tasksToOptimize;
+
+private RackAwareOptimizationParams(final ApplicationState applicationState,
+                                    final Optional<Integer> trafficCostOverride,
+                                    final Optional<Integer> nonOverlapCostOverride,
+                                    final Optional<SortedSet<TaskId>> tasksToOptimize) {

Review Comment:
   nit: indentation is off by one space I think?






Re: [PR] KAFKA-16917: Align the returned Map type of KafkaAdminClient [kafka]

2024-06-11 Thread via GitHub


frankvicky commented on code in PR #16250:
URL: https://github.com/apache/kafka/pull/16250#discussion_r1635795430


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2760,9 +2760,19 @@ void handleFailure(Throwable throwable) {
 }, now);
 }
 
-return new DescribeConfigsResult(new 
HashMap<>(nodeFutures.entrySet().stream()
+return new DescribeConfigsResult(
+nodeFutures.entrySet()
+.stream()
 .flatMap(x -> x.getValue().entrySet().stream())
-.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue;
+.collect(Collectors.toMap(
+Map.Entry::getKey,
+Map.Entry::getValue,
+(oldValue, newValue) -> {
+throw new 
IllegalStateException(String.format("Duplicate key for values: %s and %s", 
oldValue, newValue));

Review Comment:
   Sounds good, I will do that.






Re: [PR] KAFKA-16922 : Adding unit tests for NewTopic [kafka]

2024-06-11 Thread via GitHub


muralibasani commented on PR #16255:
URL: https://github.com/apache/kafka/pull/16255#issuecomment-2162073940

   @chia7712 thank you for the great review.





Re: [PR] KAFKA-16917: Align the returned Map type of KafkaAdminClient [kafka]

2024-06-11 Thread via GitHub


chia7712 commented on code in PR #16250:
URL: https://github.com/apache/kafka/pull/16250#discussion_r1635784034


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2760,9 +2760,19 @@ void handleFailure(Throwable throwable) {
 }, now);
 }
 
-return new DescribeConfigsResult(new 
HashMap<>(nodeFutures.entrySet().stream()
+return new DescribeConfigsResult(
+nodeFutures.entrySet()
+.stream()
.flatMap(x -> x.getValue().entrySet().stream())
-.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
+.collect(Collectors.toMap(
+Map.Entry::getKey,
+Map.Entry::getValue,
+(oldValue, newValue) -> {
+throw new 
IllegalStateException(String.format("Duplicate key for values: %s and %s", 
oldValue, newValue));

Review Comment:
   Could you please add a comment to remind readers that this should not happen?
   
   Maybe we can also add a test which uses duplicate `resources` as input for 
`describeConfigs`.
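The merge-function pattern under discussion can be sketched in isolation — the class and method names here are illustrative, not Kafka's code; the point is that a third argument to `Collectors.toMap` makes accidental duplicate keys fail loudly instead of being silently overwritten:

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch: Collectors.toMap with a merge function that throws on duplicates.
public class DuplicateKeyCheck {
    public static Map<String, Integer> toMapFailOnDuplicates(Stream<Map.Entry<String, Integer>> entries) {
        return entries.collect(Collectors.toMap(
            Map.Entry::getKey,
            Map.Entry::getValue,
            (oldValue, newValue) -> {
                // This should never run for well-formed input; it exists to
                // surface a bug rather than drop one of the values.
                throw new IllegalStateException(
                    String.format("Duplicate key for values: %s and %s", oldValue, newValue));
            }));
    }
}
```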






Re: [PR] KAFKA-16917: Align the returned Map type of KafkaAdminClient [kafka]

2024-06-11 Thread via GitHub


frankvicky commented on PR #16250:
URL: https://github.com/apache/kafka/pull/16250#issuecomment-2162052602

   Hi @chia7712, I have made changes based on the comments, PTAL.





Re: [PR] KAFKA-16643 Fix chaos modifier [kafka]

2024-06-11 Thread via GitHub


chia7712 commented on PR #15890:
URL: https://github.com/apache/kafka/pull/15890#issuecomment-2162051279

   @gongxuanzhang could you fix the conflicts? Also, please revert the 
unrelated changes. thanks!





[jira] [Assigned] (KAFKA-16643) Add ModifierOrder checkstyle rule

2024-06-11 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16643:
--

Assignee: xuanzhang gong

> Add ModifierOrder checkstyle rule
> -
>
> Key: KAFKA-16643
> URL: https://issues.apache.org/jira/browse/KAFKA-16643
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Reporter: Greg Harris
>Assignee: xuanzhang gong
>Priority: Minor
>
> Checkstyle offers the ModifierOrder rule: 
> [https://checkstyle.sourceforge.io/checks/modifier/modifierorder.html] that 
> Kafka violates in a lot of places. We should decide if this is a checkstyle 
> rule we should be following or not, and potentially enable it moving forward.
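Enabling the rule is a one-line change in the checkstyle configuration; a hedged sketch of where it would go (file location in Kafka's checkstyle setup is an assumption):

```xml
<!-- ModifierOrder must be registered under the TreeWalker module. -->
<module name="TreeWalker">
  <module name="ModifierOrder"/>
</module>
```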





[jira] [Commented] (KAFKA-16643) Add ModifierOrder checkstyle rule

2024-06-11 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854235#comment-17854235
 ] 

Chia-Ping Tsai commented on KAFKA-16643:


{quote}
this is for the modifiers on variables/methods etc (static final abstract etc)
{quote}

just noticed that I misunderstood this jira, sorry :(

I will review the PR filed by [~gongxuanzhang].



> Add ModifierOrder checkstyle rule
> -
>
> Key: KAFKA-16643
> URL: https://issues.apache.org/jira/browse/KAFKA-16643
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Reporter: Greg Harris
>Priority: Minor
>
> Checkstyle offers the ModifierOrder rule: 
> [https://checkstyle.sourceforge.io/checks/modifier/modifierorder.html] that 
> Kafka violates in a lot of places. We should decide if this is a checkstyle 
> rule we should be following or not, and potentially enable it moving forward.





Re: [PR] KAFKA-16922 : Adding unit tests for NewTopic [kafka]

2024-06-11 Thread via GitHub


chia7712 commented on PR #16255:
URL: https://github.com/apache/kafka/pull/16255#issuecomment-2162042731

   @muralibasani thanks for this nice test!





Re: [PR] KAFKA-16922 : Adding unit tests for NewTopic [kafka]

2024-06-11 Thread via GitHub


chia7712 merged PR #16255:
URL: https://github.com/apache/kafka/pull/16255





Re: [PR] KAFKA-15716: KRaft support in EpochDrivenReplicationProtocolAcceptanceTest [kafka]

2024-06-11 Thread via GitHub


github-actions[bot] commented on PR #15295:
URL: https://github.com/apache/kafka/pull/15295#issuecomment-2162038395

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] MINOR: log AssignReplicasToDirsRequest dispatch and handling [kafka]

2024-06-11 Thread via GitHub


github-actions[bot] commented on PR #15356:
URL: https://github.com/apache/kafka/pull/15356#issuecomment-2162038337

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





[jira] [Assigned] (KAFKA-16937) Consider inlining Time#waitObject to ProducerMetadata#awaitUpdate

2024-06-11 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16937:
--

Assignee: PoAn Yang  (was: Chia-Ping Tsai)

> Consider inlining Time#waitObject to ProducerMetadata#awaitUpdate
> --
>
> Key: KAFKA-16937
> URL: https://issues.apache.org/jira/browse/KAFKA-16937
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
>
> Time#waitObject is implemented by a while-loop and it is used by 
> `ProducerMetadata` only. Hence, this Jira can include the following changes:
> 1. move `Time#waitObject` to `ProducerMetadata#awaitUpdate`
> 2. ProducerMetadata#awaitUpdate can throw an "exact" TimeoutException [0]
> [0] 
> https://github.com/apache/kafka/blob/23fe71d579f84d59ebfe6d5a29e688315cec1285/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1176





[jira] [Commented] (KAFKA-16937) Consider inlining Time#waitObject to ProducerMetadata#awaitUpdate

2024-06-11 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854226#comment-17854226
 ] 

PoAn Yang commented on KAFKA-16937:
---

Hi [~chia7712], I'm interested in this issue. If you're not working on it, may 
I handle it? Thank you.

> Consider inlining Time#waitObject to ProducerMetadata#awaitUpdate
> --
>
> Key: KAFKA-16937
> URL: https://issues.apache.org/jira/browse/KAFKA-16937
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> Time#waitObject is implemented by a while-loop and it is used by 
> `ProducerMetadata` only. Hence, this Jira can include the following changes:
> 1. move `Time#waitObject` to `ProducerMetadata#awaitUpdate`
> 2. ProducerMetadata#awaitUpdate can throw an "exact" TimeoutException [0]
> [0] 
> https://github.com/apache/kafka/blob/23fe71d579f84d59ebfe6d5a29e688315cec1285/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1176





Re: [PR] KAFKA-16879 SystemTime should use singleton mode [kafka]

2024-06-11 Thread via GitHub


chia7712 commented on code in PR #16266:
URL: https://github.com/apache/kafka/pull/16266#discussion_r1635760706


##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -30,7 +30,40 @@
  */
 public interface Time {
 
-Time SYSTEM = new SystemTime();
+//  A time implementation that uses the system clock and sleep call.
+//  Use inline implementation to ensure that only one Time#SYSTEM exists 
in a program
+Time SYSTEM = new Time() {
+@Override
+public long milliseconds() {
+return System.currentTimeMillis();
+}
+
+@Override
+public long nanoseconds() {
+return System.nanoTime();
+}
+
+@Override
+public void sleep(long ms) {
+Utils.sleep(ms);
+}
+
+@Override
+public void waitObject(Object obj, Supplier condition, long 
deadlineMs) throws InterruptedException {

Review Comment:
   I opened https://issues.apache.org/jira/browse/KAFKA-16937 for it. We can have 
more discussion there.






[jira] [Created] (KAFKA-16937) Consider inlining Time#waitObject to ProducerMetadata#awaitUpdate

2024-06-11 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16937:
--

 Summary: Consider inlining Time#waitObject to 
ProducerMetadata#awaitUpdate
 Key: KAFKA-16937
 URL: https://issues.apache.org/jira/browse/KAFKA-16937
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


Time#waitObject is implemented by a while-loop and it is used by 
`ProducerMetadata` only. Hence, this Jira can include the following changes:

1. move `Time#waitObject` to `ProducerMetadata#awaitUpdate`
2. ProducerMetadata#awaitUpdate can throw an "exact" TimeoutException [0]

[0] 
https://github.com/apache/kafka/blob/23fe71d579f84d59ebfe6d5a29e688315cec1285/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1176
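As a rough illustration of the idea (hypothetical names and structure — not the
actual patch), an inlined await could loop on Object#wait against a wall-clock
deadline and report the timeout back to the caller, which can then raise an
"exact" TimeoutException:

```java
import java.util.function.Supplier;

// Hypothetical sketch of an inlined awaitUpdate-style wait loop; the real
// ProducerMetadata#awaitUpdate differs in naming and error handling.
class AwaitUpdateSketch {

    // Blocks on `lock` until `condition` is true or the wall-clock `deadlineMs`
    // passes. Returns false on timeout so the caller can throw a precise error.
    static boolean awaitCondition(Object lock, Supplier<Boolean> condition, long deadlineMs)
            throws InterruptedException {
        synchronized (lock) {
            long now = System.currentTimeMillis();
            while (!condition.get()) {
                if (now >= deadlineMs) {
                    return false; // caller can now throw an "exact" TimeoutException
                }
                // wait() may wake up spuriously; the while-loop re-checks the condition.
                lock.wait(deadlineMs - now);
                now = System.currentTimeMillis();
            }
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final Object lock = new Object();
        final boolean[] updated = {false};
        Thread updater = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) { }
            synchronized (lock) {
                updated[0] = true;
                lock.notifyAll(); // wake the waiter once the "metadata" is updated
            }
        });
        updater.start();
        boolean ok = awaitCondition(lock, () -> updated[0], System.currentTimeMillis() + 5000);
        updater.join();
        System.out.println(ok); // prints "true"
    }
}
```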





Re: [PR] KAFKA-10787: Apply spotless to `raft` module [kafka]

2024-06-11 Thread via GitHub


chia7712 commented on PR #16278:
URL: https://github.com/apache/kafka/pull/16278#issuecomment-2162011697

   @gongxuanzhang please rebase code to fix spotless check :)





Re: [PR] [MINOR]:Update visibility from public to protected and adjust the order in BuiltInPartitioner [kafka]

2024-06-11 Thread via GitHub


chia7712 commented on code in PR #16277:
URL: https://github.com/apache/kafka/pull/16277#discussion_r1635740247


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -103,13 +103,13 @@ public class RecordAccumulatorTest {
 
 private final Map nodes = Stream.of(node1, 
node2).collect(Collectors.toMap(Node::id, Function.identity()));
 private MetadataSnapshot metadataCache = new MetadataSnapshot(null,
-nodes,
-partMetadatas,
-Collections.emptySet(),
-Collections.emptySet(),
-Collections.emptySet(),
-null,
-Collections.emptyMap());
+nodes,
+partMetadatas,

Review Comment:
   please revert unrelated changes



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -121,12 +121,14 @@ public class RecordAccumulatorTest {
 private final Metrics metrics = new Metrics(time);
 private final long maxBlockTimeMs = 1000;
 private final LogContext logContext = new LogContext();
+private AtomicInteger mockRandom = null;
 
 @BeforeEach void setup() {}
 
 @AfterEach
 public void teardown() {
 this.metrics.close();
+mockRandom = null;

Review Comment:
   this is redundant since the variable is case-level



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -146,17 +148,17 @@ public void testDrainBatches() throws Exception {
 PartitionInfo part3 = MetadataResponse.toPartitionInfo(partMetadata3, 
nodes);
 PartitionInfo part4 = MetadataResponse.toPartitionInfo(partMetadata4, 
nodes);
 Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), 
Arrays.asList(part1, part2, part3, part4),
-Collections.emptySet(), Collections.emptySet());
+Collections.emptySet(), Collections.emptySet());
 
 metadataCache = new MetadataSnapshot(null,
-nodes,
-partMetadatas,
-Collections.emptySet(),
-Collections.emptySet(),
-Collections.emptySet(),
-null,
-Collections.emptyMap(),
-cluster);
+nodes,
+partMetadatas,
+Collections.emptySet(),
+Collections.emptySet(),

Review Comment:
   ditto



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -1737,30 +1735,48 @@ private RecordAccumulator 
createTestRecordAccumulator(int deliveryTimeoutMs, int
  * Return a test RecordAccumulator instance
  */
 private RecordAccumulator createTestRecordAccumulator(
-TransactionManager txnManager,
-int deliveryTimeoutMs,
-int batchSize,
-long totalSize,
-Compression compression,
-int lingerMs
+TransactionManager txnManager,
+int deliveryTimeoutMs,
+int batchSize,
+long totalSize,
+Compression compression,
+int lingerMs
 ) {
 long retryBackoffMs = 100L;
 long retryBackoffMaxMs = 1000L;
 String metricGrpName = "producer-metrics";
 
 return new RecordAccumulator(
-logContext,
-batchSize,
-compression,
-lingerMs,
-retryBackoffMs,
-retryBackoffMaxMs,
-deliveryTimeoutMs,
-metrics,
-metricGrpName,
-time,
-new ApiVersions(),
-txnManager,
-new BufferPool(totalSize, batchSize, metrics, time, 
metricGrpName));
+logContext,
+batchSize,
+compression,
+lingerMs,
+retryBackoffMs,
+retryBackoffMaxMs,
+deliveryTimeoutMs,
+metrics,
+metricGrpName,
+time,
+new ApiVersions(),
+txnManager,
+new BufferPool(totalSize, batchSize, metrics, time, 
metricGrpName)) {
+@Override
+protected BuiltInPartitioner createBuiltInPartitioner(LogContext 
logContext, String topic,
+  int 
stickyBatchSize) {
+return new MockRandomBuiltInPartitioner(logContext, topic, 
stickyBatchSize);
+}
+};
+}
+
+private class MockRandomBuiltInPartitioner extends BuiltInPartitioner {

Review Comment:
   it can be a static class, and it would be great to unify the naming -> 
`SequentialPartitioner`



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -146,17 +148,17 @@ public void testDrainBatches() throws Exception {
 PartitionInfo part3 = 

Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-11 Thread via GitHub


frankvicky commented on PR #16227:
URL: https://github.com/apache/kafka/pull/16227#issuecomment-2161999580

   Hi @chia7712, I have made some small changes based on your feedback, PTAL  





Re: [PR] KAFKA-16879 SystemTime should use singleton mode [kafka]

2024-06-11 Thread via GitHub


dujian0068 commented on code in PR #16266:
URL: https://github.com/apache/kafka/pull/16266#discussion_r1635735601


##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -30,7 +30,40 @@
  */
 public interface Time {
 
-Time SYSTEM = new SystemTime();
+//  A time implementation that uses the system clock and sleep call.
+//  Use inline implementation to ensure that only one Time#SYSTEM exists 
in a program
+Time SYSTEM = new Time() {

Review Comment:
   Thank you
   I will modify `SystemTime` to be package private and provide singleton access
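   A self-contained sketch of that pattern (simplified interface and hypothetical 
names — not the actual Kafka `Time`/`SystemTime` API, which has more methods):

```java
// Hypothetical illustration of the package-private-singleton idea: the
// implementation class cannot be instantiated from outside, so only one
// instance can ever exist program-wide.
class SingletonDemo {
    public static void main(String[] args) {
        // Identity comparison holds because only one instance can ever exist.
        System.out.println(SystemClock.instance() == SystemClock.instance()); // prints "true"
    }
}

// Simplified stand-in for Kafka's Time interface.
interface Clock {
    long milliseconds();
}

// Package-private class + private constructor: callers can only reach the
// shared instance through the static accessor.
final class SystemClock implements Clock {
    private static final SystemClock INSTANCE = new SystemClock();

    private SystemClock() { } // no external instantiation

    static SystemClock instance() {
        return INSTANCE;
    }

    @Override
    public long milliseconds() {
        return System.currentTimeMillis();
    }
}
```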






Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-11 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1635735059


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,153 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
-public void iterator() throws Exception {
+public void testIterator() throws Exception {

Review Comment:
   Oops, I will remove it 






Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-11 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1635734783


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,153 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
-public void iterator() throws Exception {
+public void testIterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 15;
+int emptyPartitionIndex = 3;
+ConsumerRecords records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, 
Collections.singleton(topic));
+Iterator> iterator = 
records.iterator();
 
-Map>> records = 
new LinkedHashMap<>();
+int recordCount = 0;
+int partitionCount = 0;
+int currentPartition = -1;
 
-String topic = "topic";
-records.put(new TopicPartition(topic, 0), new ArrayList<>());
-ConsumerRecord record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
-0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-ConsumerRecord record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
-0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
-records.put(new TopicPartition(topic, 2), new ArrayList<>());
-
-ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
-Iterator> iter = 
consumerRecords.iterator();
-
-int c = 0;
-for (; iter.hasNext(); c++) {
-ConsumerRecord record = iter.next();
-assertEquals(1, record.partition());
-assertEquals(topic, record.topic());
-assertEquals(c, record.offset());
+while (iterator.hasNext()) {
+ConsumerRecord record = iterator.next();
+validateEmptyPartition(record, emptyPartitionIndex);
+
+// Check if we have moved to a new partition
+if (currentPartition != record.partition()) {
+// Increment the partition count as we have encountered a new 
partition
+partitionCount++;
+// Update the current partition to the new partition
+currentPartition = record.partition();
+}
+
+validateRecordPayload(topic, record, currentPartition, 
recordCount, recordSize);
+recordCount++;
+}
+
+// Including empty partition
+assertEquals(partitionSize, partitionCount + 1);
+}
+
+@Test
+public void testRecordsByPartition() {
+List topics = Arrays.asList("topic1", "topic2");
+int recordSize = 3;
+int partitionSize = 5;
+int emptyPartitionIndex = 2;
+
+ConsumerRecords consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+for (String topic : topics) {
+for (int partition = 0; partition < partitionSize; partition++) {
+TopicPartition topicPartition = new TopicPartition(topic, 
partition);
+List> records = 
consumerRecords.records(topicPartition);
+
+if (partition == emptyPartitionIndex) {
+assertTrue(records.isEmpty());
+} else {
+assertEquals(recordSize, records.size());
+for (int i = 0; i < records.size(); i++) {
+ConsumerRecord record = 
records.get(i);
+validateRecordPayload(topic, record, partition, i, 
recordSize);
+}
+}
+}
 }
-assertEquals(2, c);
+}
+
+@Test
+public void testRecordsByNullTopic() {
+String nullTopic = null;
+ConsumerRecords consumerRecords = 
ConsumerRecords.empty();
+IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+assertEquals("Topic must be non-null.", exception.getMessage());
+}
+
+
+@Test
+public void testRecordsByTopic() {
+List topics = Arrays.asList("topic1", "topic2", "topic3", 
"topic4");
+int recordSize = 3;
+int partitionSize = 10;
+int emptyPartitionIndex = 6;
+int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 
1);
+
+ConsumerRecords consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+for (String topic : topics) {
+

Re: [PR] KAFKA-16879 SystemTime should use singleton mode [kafka]

2024-06-11 Thread via GitHub


dujian0068 commented on code in PR #16266:
URL: https://github.com/apache/kafka/pull/16266#discussion_r1635734496


##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -30,7 +30,40 @@
  */
 public interface Time {
 
-Time SYSTEM = new SystemTime();
+//  A time implementation that uses the system clock and sleep call.
+//  Use inline implementation to ensure that only one Time#SYSTEM exists 
in a program
+Time SYSTEM = new Time() {
+@Override
+public long milliseconds() {
+return System.currentTimeMillis();
+}
+
+@Override
+public long nanoseconds() {
+return System.nanoTime();
+}
+
+@Override
+public void sleep(long ms) {
+Utils.sleep(ms);
+}
+
+@Override
+public void waitObject(Object obj, Supplier condition, long 
deadlineMs) throws InterruptedException {

Review Comment:
   Thank you very much for your reply, but I don't fully understand which 
method you want to refactor.
   The developer of `Time#waitObject()` seems to want to provide a unified 
synchronization method based on `Time`, but it doesn't seem to need to be 
refactored.
   However, `ProducerMetadata#awaitUpdate` could drop the `synchronized` keyword.






Re: [PR] KAFKA-16879 SystemTime should use singleton mode [kafka]

2024-06-11 Thread via GitHub


dujian0068 commented on code in PR #16266:
URL: https://github.com/apache/kafka/pull/16266#discussion_r1635729325


##
clients/src/test/java/org/apache/kafka/common/utils/TimeTest.java:
##
@@ -80,4 +82,11 @@ public void testWaitObjectConditionSatisfied() throws 
InterruptedException {
 assertTrue(time.milliseconds() < deadlineMs);
 assertNull(caughtException.get());
 }
+
+@Test
+public void testOnlyOneSystemTime() {

Review Comment:
   Thanks for the reminder, I will delete it






Re: [PR] KAFKA-16879 SystemTime should use singleton mode [kafka]

2024-06-11 Thread via GitHub


dujian0068 commented on code in PR #16266:
URL: https://github.com/apache/kafka/pull/16266#discussion_r1635728504


##
trogdor/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionWorker.java:
##
@@ -60,7 +60,7 @@
 
 public class SustainedConnectionWorker implements TaskWorker {
 private static final Logger log = 
LoggerFactory.getLogger(SustainedConnectionWorker.class);
-private static final SystemTime SYSTEM_TIME = new SystemTime();
+private static final Time SYSTEM_TIME = Time.SYSTEM;

Review Comment:
   You are right.
   After updating `SystemTime` to singleton mode, many `time` class attributes 
could be modified inline, but I am not sure whether to do so, because it would 
be a large-scale change touching many files.






Re: [PR] [MINOR]:Update visibility from public to protected and adjust the order in BuiltInPartitioner [kafka]

2024-06-11 Thread via GitHub


gongxuanzhang commented on code in PR #16277:
URL: https://github.com/apache/kafka/pull/16277#discussion_r1635719559


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -1763,4 +1747,65 @@ private RecordAccumulator createTestRecordAccumulator(
 txnManager,
 new BufferPool(totalSize, batchSize, metrics, time, 
metricGrpName));
 }
+
+private class SequentialRecordAccumulator extends RecordAccumulator {

Review Comment:
   This is an excellent suggestion! It perfectly reproduces the "global 
modification" in the previous code.
   I have updated the PR, please review it @chia7712 






Re: [PR] KAFKA-10787: Apply spotless to `raft` module [kafka]

2024-06-11 Thread via GitHub


gongxuanzhang commented on PR #16278:
URL: https://github.com/apache/kafka/pull/16278#issuecomment-2161904036

   > @gongxuanzhang could you please fix the conflicts? BTW, it would be easy to 
apply the spotless to small/inactive modules first. For example: 
`log4j-appender`, `trogdor`, `jmh-benchmarks`, `examples`, `shell`, `generator`
   
   Right, the next PR will apply it to all of these modules.
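   For context, a per-module Spotless import-order setup might look roughly like 
this in Gradle. This is an illustrative sketch only — the plugin version and 
rule set here are assumptions, not the actual Kafka build script:

   ```groovy
   // Hypothetical build.gradle (Groovy DSL) sketch; the real Kafka build differs.
   plugins {
       id 'com.diffplug.spotless' version '6.25.0'
   }

   spotless {
       java {
           importOrder()          // enforce a consistent import ordering
           removeUnusedImports()  // drop imports the code no longer uses
       }
   }
   ```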
   





Re: [PR] KAFKA-16921: Migrate all junit 4 code to junit 5 for connect module (part 1) [kafka]

2024-06-11 Thread via GitHub


m1a2st commented on PR #16253:
URL: https://github.com/apache/kafka/pull/16253#issuecomment-2161885165

   @chia7712, Thanks for your comment. I removed all 
`@RunWith(MockitoJUnitRunner.StrictStubs.class)`, PTAL





Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-11 Thread via GitHub


satishd merged PR #15820:
URL: https://github.com/apache/kafka/pull/15820





Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-11 Thread via GitHub


satishd commented on PR #15820:
URL: https://github.com/apache/kafka/pull/15820#issuecomment-2161862623

   There are a few unrelated test failures, merging it to trunk and 3.8.





Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-11 Thread via GitHub


satishd commented on PR #15820:
URL: https://github.com/apache/kafka/pull/15820#issuecomment-2161861876

   Thanks @abhijeetk88 for addressing the review comments. LGTM.





Re: [PR] MINOR: Update 3.8 documentation for Kafka Streams [kafka]

2024-06-11 Thread via GitHub


mjsax commented on code in PR #16265:
URL: https://github.com/apache/kafka/pull/16265#discussion_r1635630970


##
docs/streams/upgrade-guide.html:
##
@@ -134,11 +134,44 @@ <
 
 
 Streams API changes in 3.8.0
+
+
+Kafka Streams now supports customizable task assignment strategies via 
the `task.assignor.class` configuration.
+The configuration can be set to the fully qualified class name of a 
custom task assignor implementation
+that has to extend the new 
`org.apache.kafka.streams.processor.assignment.TaskAssignor` interface.
+
+The new configuration also allows users to bring back the behavior of 
the old task assignor
`StickyTaskAssignor` that was used before the introduction of 
`HighAvailabilityTaskAssignor`.
+If no custom task assignor is configured, the default task assignor 
`HighAvailabilityTaskAssignor` is used.
+
+This change also removes the internal config 
`internal.task.assignor.class` that was used for the same
+purpose. If you were using this config, you should switch to using 
`task.assignor.class` instead.
+
+For more details, see the public interface section of
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams;>KIP-924.
+
+
 
 The Processor API now support so-called read-only state stores, added 
via 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-813%3A+Shareable+State+Stores;>KIP-813.
-   These stores don't have a dedicated changelog topic, but use their 
source topic for fault-tolerance,
-   simlar to KTables with source-topic optimization enabled.
+These stores don't have a dedicated changelog topic, but use their 
source topic for fault-tolerance,
+similar to KTables with source-topic optimization enabled.
+
+
+
+To improve detection of leaked state store iterators, we added new 
store-level metrics to track the number and
+age of open iterators. The new metrics are `num-open-iterators`, 
`iterator-duration-avg`, `iterator-duration-max`
+and `oldest-iterator-open-since-ms`. These metrics are available for 
all state stores, including RocksDB,
+in-memory, and custom stores. More details can be found in
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+Improved+StateStore+Iterator+metrics+for+detecting+leaks;>KIP-989.
+
+
+
+To facilitate the implementation of dead-letter queues in Kafka 
Streams,
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception;>KIP-1036

Review Comment:
   > KIP-813 is there.
   
   Ah. I did do my job when merging KIP-813 PR :) #proudOfMyself :)  






Re: [PR] MINOR: Update 3.8 documentation for Kafka Streams [kafka]

2024-06-11 Thread via GitHub


mjsax commented on code in PR #16265:
URL: https://github.com/apache/kafka/pull/16265#discussion_r1635630058


##
docs/streams/developer-guide/config-streams.html:
##
@@ -1017,6 +1022,20 @@ state.dir
   
 
+
+  task.assignor.class
+  
+A task assignor class or class name implementing the
+  
org.apache.kafka.streams.processor.assignment.TaskAssignor 
interface". Defaults to the default
+  high-availability task assignor class. One possible alternative 
implementation provided in Apache Kafka is
+  
org.apache.kafka.streams.processor.assignm\\ent.StickyTaskAssignor,
 which was the default behavior

Review Comment:
   ```suggestion
 
org.apache.kafka.streams.processor.assignment.StickyTaskAssignor, 
which was the default behavior
   ```






Re: [PR] MINOR: Update 3.8 documentation for Kafka Streams [kafka]

2024-06-11 Thread via GitHub


mjsax commented on code in PR #16265:
URL: https://github.com/apache/kafka/pull/16265#discussion_r1635629467


##
docs/streams/developer-guide/config-streams.html:
##
@@ -1017,6 +1022,20 @@ state.dir
   
 
+
+  task.assignor.class
+  
+A task assignor class or class name implementing the
+  
org.apache.kafka.streams.processor.assignment.TaskAssignor 
interface". Defaults to the default

Review Comment:
   ```suggestion
 
org.apache.kafka.streams.processor.assignment.TaskAssignor 
interface. Defaults to the
   ```






Re: [PR] MINOR: Update 3.8 documentation for Kafka Streams [kafka]

2024-06-11 Thread via GitHub


mjsax commented on code in PR #16265:
URL: https://github.com/apache/kafka/pull/16265#discussion_r1635629467


##
docs/streams/developer-guide/config-streams.html:
##
@@ -1017,6 +1022,20 @@ state.dir
   
 
+
+  task.assignor.class
+  
+A task assignor class or class name implementing the
+  
org.apache.kafka.streams.processor.assignment.TaskAssignor 
interface". Defaults to the default

Review Comment:
   ```suggestion
 
org.apache.kafka.streams.processor.assignment.TaskAssignor 
interface. Defaults to the default
   ```






Re: [PR] MINOR: Update 3.8 documentation for Kafka Streams [kafka]

2024-06-11 Thread via GitHub


mjsax commented on code in PR #16265:
URL: https://github.com/apache/kafka/pull/16265#discussion_r1635628879


##
docs/streams/developer-guide/config-streams.html:
##
@@ -333,7 +334,6 @@ 

Re: [PR] MINOR: Update 3.8 documentation for Kafka Streams [kafka]

2024-06-11 Thread via GitHub


mjsax commented on code in PR #16265:
URL: https://github.com/apache/kafka/pull/16265#discussion_r163562


##
docs/streams/upgrade-guide.html:
##
@@ -134,11 +134,44 @@ <
 
 
 Streams API changes in 3.8.0
+
+
+Kafka Streams now supports customizable task assignment strategies via 
the `task.assignor.class` configuration.
+The configuration can be set to the fully qualified class name of a 
custom task assignor implementation
+that must implement the new 
`org.apache.kafka.streams.processor.assignment.TaskAssignor` interface.
+
+The new configuration also allows users to bring back the behavior of 
the old task assignor
`StickyTaskAssignor` that was used before the introduction of the
`HighAvailabilityTaskAssignor`.
+If no custom task assignor is configured, the default task assignor 
`HighAvailabilityTaskAssignor` is used.
+
+This change also removes the internal config 
`internal.task.assignor.class` that was used for the same
+purpose. If you were using this config, you should switch to using 
`task.assignor.class` instead.
+
+For more details, see the public interface section of
+<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams">KIP-924</a>.
+
+
 
 The Processor API now supports so-called read-only state stores, added via 
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-813%3A+Shareable+State+Stores">KIP-813</a>.
-   These stores don't have a dedicated changelog topic, but use their 
source topic for fault-tolerance,
-   simlar to KTables with source-topic optimization enabled.
+These stores don't have a dedicated changelog topic, but use their 
source topic for fault-tolerance,
+similar to KTables with source-topic optimization enabled.
+
+
+
+To improve detection of leaked state store iterators, we added new 
store-level metrics to track the number and
+age of open iterators. The new metrics are `num-open-iterators`, 
`iterator-duration-avg`, `iterator-duration-max`
+and `oldest-iterator-open-since-ms`. These metrics are available for 
all state stores, including RocksDB,
+in-memory, and custom stores. More details can be found in
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+Improved+StateStore+Iterator+metrics+for+detecting+leaks">KIP-989</a>.
+
+
+
+To facilitate the implementation of dead-letter queues in Kafka 
Streams,
+<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception">KIP-1036</a>

Review Comment:
   I don't think this is relevant to KS users -- I cannot see/imagine how. KS 
users don't see the underlying consumer; KS manages it for them.
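
   The `task.assignor.class` setting from the upgrade notes quoted above can be 
exercised roughly like this. This is a minimal sketch using plain 
`java.util.Properties`; the `application.id` and `bootstrap.servers` values are 
placeholders, and only the assignor entry comes from the documented config:

```java
import java.util.Properties;

public class AssignorConfigExample {
    public static Properties streamsConfig() {
        Properties props = new Properties();
        // Placeholder application settings for illustration only.
        props.put("application.id", "example-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Opt back into the pre-HighAvailabilityTaskAssignor behavior by
        // naming the StickyTaskAssignor from the upgrade notes.
        props.put("task.assignor.class",
                "org.apache.kafka.streams.processor.assignment.StickyTaskAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig().getProperty("task.assignor.class"));
    }
}
```

   If `task.assignor.class` is left unset, the default 
`HighAvailabilityTaskAssignor` is used, so this entry is only needed to 
override it.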






Re: [PR] KAFKA-16897: Move OffsetIndexTest and OffsetMapTest to storage module [kafka]

2024-06-11 Thread via GitHub


m1a2st commented on PR #16244:
URL: https://github.com/apache/kafka/pull/16244#issuecomment-2161833144

   @chia7712, thanks for your comment. I pushed a new patch for this PR.





Re: [PR] Add time out in assertFutureThrows [kafka]

2024-06-11 Thread via GitHub


showuon commented on code in PR #16264:
URL: https://github.com/apache/kafka/pull/16264#discussion_r1635625857


##
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##
@@ -556,10 +558,22 @@ public static Set 
generateRandomTopicPartitions(int numTopic, in
  * @return The caught exception cause
  */
 public static <T extends Throwable> T assertFutureThrows(Future<?> future, 
Class<T> exceptionCauseClass) {
-ExecutionException exception = assertThrows(ExecutionException.class, 
future::get);
-assertInstanceOf(exceptionCauseClass, exception.getCause(),
-"Unexpected exception cause " + exception.getCause());
-return exceptionCauseClass.cast(exception.getCause());
+try {
+future.get(5, TimeUnit.SECONDS);
+fail("expected to throw ExecutionException...");
+} catch (TimeoutException e) {
+fail("timeout waiting");
+return null;
+} catch (ExecutionException e) {
+ExecutionException exception = 
assertThrows(ExecutionException.class, future::get);
+assertInstanceOf(exceptionCauseClass, exception.getCause(),
+"Unexpected exception cause " + exception.getCause());
+return exceptionCauseClass.cast(exception.getCause());
+} catch (InterruptedException e) {
+fail("Unexpected exception cause" + e.getCause());
+return null;
+}
+return null;

Review Comment:
   1. Please make the error messages clear. Do you think users will understand 
what these error messages mean when reading them: `expected to throw 
ExecutionException...`, `timeout waiting`? I wrote them like that to give you a 
start. You can check other places to learn how to write a clear error message.
   2. When the `fail()` method is invoked, do you know what will happen? 
   3. The method is `assertFutureThrows`, so what should we do if no exception 
is thrown? 
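
   One way the three points above could be addressed is sketched below: clear 
failure messages, no unreachable code after a throwing assertion, and an 
explicit failure when the future completes without any exception. A plain 
`AssertionError` stands in for JUnit's `fail()`; this is an illustrative 
sketch, not the actual patch:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureAssertions {
    public static <T extends Throwable> T assertFutureThrows(Future<?> future, Class<T> causeClass) {
        try {
            Object value = future.get(5, TimeUnit.SECONDS);
            // The future completed normally, which the assertion does not allow.
            throw new AssertionError("Expected future to fail with "
                    + causeClass.getSimpleName() + " but it completed with: " + value);
        } catch (TimeoutException e) {
            throw new AssertionError("Future did not complete within 5 seconds");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (!causeClass.isInstance(cause)) {
                throw new AssertionError("Unexpected exception cause: " + cause);
            }
            return causeClass.cast(cause);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("boom"));
        IllegalStateException cause = assertFutureThrows(future, IllegalStateException.class);
        System.out.println(cause.getMessage()); // prints "boom"
    }
}
```

   Because every branch either returns or throws, there is no need for the 
trailing `return null;` that the quoted patch adds after `fail()`.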






Re: [PR] KAFKA-16935: Automatically wait for cluster startup in embedded Connect integration tests [kafka]

2024-06-11 Thread via GitHub


gharris1727 commented on code in PR #16288:
URL: https://github.com/apache/kafka/pull/16288#discussion_r1635577975


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java:
##
@@ -169,7 +164,6 @@ public void 
testFailToStartWhenInternalTopicsAreNotCompacted() throws Interrupte
 // Start the brokers but not Connect
 log.info("Starting {} Kafka brokers, but no Connect workers yet", 
numBrokers);
 connect.start();

Review Comment:
   For downstream users intentionally giving the embedded connect bad configs, 
they will need to alter the test anyway, so I'd prefer that they follow the 
pattern used by upstream.
   
   I don't think that anyone should really be relying on this behavior; 
otherwise I would make an argument for start() remaining non-blocking (which I 
definitely don't want).






[jira] [Resolved] (KAFKA-15600) KIP-990: Capability to PAUSE Tasks on DeserializationException

2024-06-11 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15600.
-
Resolution: Won't Fix

> KIP-990: Capability to PAUSE Tasks on DeserializationException
> --
>
> Key: KAFKA-15600
> URL: https://issues.apache.org/jira/browse/KAFKA-15600
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Minor
>  Labels: kip
>
> Presently, Kafka Streams provides users with two options for handling a 
> {{DeserializationException}}  via the {{DeserializationExceptionHandler}}  
> interface:
>  # {{FAIL}} - throw an Exception that causes the stream thread to fail. This 
> will either cause the whole application instance to exit, or the stream 
> thread will be replaced and restarted. Either way, the failed {{Task}} will 
> end up being resumed, either by the current instance or after being 
> rebalanced to another, causing a cascading failure until a user intervenes to 
> address the problem.
>  # {{CONTINUE}} - discard the record and continue processing with the next 
> record. This can cause data loss if the record triggering the 
> {{DeserializationException}} should be considered a valid record. This can 
> happen if an upstream producer changes the record schema in a way that is 
> incompatible with the streams application, or if there is a bug in the 
> {{Deserializer}}  (for example, failing to handle a valid edge-case).
> The user can currently choose between data loss, or a cascading failure that 
> usually causes all processing to slowly grind to a halt.
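
The FAIL/CONTINUE dilemma above is exactly what a dead-letter-queue handler 
would sidestep: continue processing, but divert the bad record first so it is 
not lost. The self-contained sketch below only mirrors the shape of Kafka 
Streams' `DeserializationExceptionHandler`; the class name, the `handle` 
signature, and the in-memory list standing in for a dead-letter topic are all 
illustrative assumptions:

```java
import java.util.ArrayList;
import java.util.List;

public class DeserializationDlqSketch {
    enum Response { FAIL, CONTINUE }

    // Divert the raw record bytes to a stand-in dead-letter sink, then
    // CONTINUE: no data loss and no cascading failure.
    static Response handle(byte[] rawValue, Exception cause, List<byte[]> deadLetterSink) {
        deadLetterSink.add(rawValue);
        return Response.CONTINUE;
    }

    public static void main(String[] args) {
        List<byte[]> deadLetters = new ArrayList<>();
        Response response = handle(new byte[] {1, 2, 3},
                new RuntimeException("bad schema"), deadLetters);
        System.out.println(response + " diverted=" + deadLetters.size());
    }
}
```

In a real application the sink would be a producer writing to a dedicated 
dead-letter topic rather than an in-memory list.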



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-15045: (KIP-924 pt. 23) TaskAssignmentUtils minor changes [kafka]

2024-06-11 Thread via GitHub


apourchet opened a new pull request, #16294:
URL: https://github.com/apache/kafka/pull/16294

   We now provide an easier way to customize the rack-aware optimizations via a 
new configuration class called RackAwareOptimizationParams.
   
   We also simplified the APIs for the optimizeXYZ utility functions since they 
were mutating the inputs anyway.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-8088) Deprecate `WindowStoreIterator` interface

2024-06-11 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8088:
---
Fix Version/s: (was: 4.0.0)

> Deprecate `WindowStoreIterator` interface
> -
>
> Key: KAFKA-8088
> URL: https://issues.apache.org/jira/browse/KAFKA-8088
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: kip
>
> The `WindowStore` interface has multiple methods to fetch() data. However, 
> the return types are mixed up. Two methods return `WindowStoreIterator` while 
> all others return `KeyValueIterator`.
> We should align the return types and replace `WindowStoreIterator` with 
> `KeyValueIterator`. For backward compatibility reasons we can only deprecate 
> the interface for now and remove it only later.
> KIP-439: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-439%3A+Deprecate+Interface+WindowStoreIterator]





[PR] Logging for new topic auto-creation for Producers [kafka]

2024-06-11 Thread via GitHub


jayim243 opened a new pull request, #16293:
URL: https://github.com/apache/kafka/pull/16293

   This pull request adds logging to the Kafka Producer for when new topics are 
automatically created. 
   
   This contribution is my original work and I license the work to the project 
under the project's open source license.





Re: [PR] KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it [kafka]

2024-06-11 Thread via GitHub


chia7712 commented on code in PR #16291:
URL: https://github.com/apache/kafka/pull/16291#discussion_r1635556134


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -122,6 +123,48 @@ public void setup() {
 this.props.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
 }
 
+@Test
+public void testOffsetFetchRequestStateToStringBase() {
+ConsumerConfig config = mock(ConsumerConfig.class);
+CommitRequestManager.MemberInfo memberInfo = new 
CommitRequestManager.MemberInfo();
+
+CommitRequestManager commitRequestManager = new CommitRequestManager(
+time,
+logContext,
+subscriptionState,
+config,
+coordinatorRequestManager,
+offsetCommitCallbackInvoker,
+"groupId",
+Optional.of("groupInstanceId"),
+metrics);
+
+commitRequestManager.onMemberEpochUpdated(Optional.of(1), 
Optional.empty());
+Set<TopicPartition> requestedPartitions = Collections.singleton(new 
TopicPartition("topic-1", 1));
+
+CommitRequestManager.OffsetFetchRequestState offsetFetchRequestState = 
commitRequestManager.new OffsetFetchRequestState(
+requestedPartitions,
+retryBackoffMs,
+retryBackoffMaxMs,
+1000,
+memberInfo);

Review Comment:
   In line #142, the `memberInfo` updated by `onMemberEpochUpdated` is an 
internal variable of `commitRequestManager`. Hence, the `memberInfo` used here 
to create `OffsetFetchRequestState` is NOT updated by `onMemberEpochUpdated`.
   
   If we want to complete the test, maybe we can remove `MemberInfo` from the 
constructor of `OffsetFetchRequestState`. After all, both 
`RetriableRequestState` and `OffsetFetchRequestState` are inner classes of 
`CommitRequestManager`, so it is valid to share the `MemberInfo` between those 
classes.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -122,6 +123,48 @@ public void setup() {
 this.props.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
 }
 
+@Test
+public void testOffsetFetchRequestStateToStringBase() {
+ConsumerConfig config = mock(ConsumerConfig.class);
+CommitRequestManager.MemberInfo memberInfo = new 
CommitRequestManager.MemberInfo();
+
+CommitRequestManager commitRequestManager = new CommitRequestManager(
+time,
+logContext,
+subscriptionState,
+config,
+coordinatorRequestManager,
+offsetCommitCallbackInvoker,
+"groupId",
+Optional.of("groupInstanceId"),
+metrics);
+
+commitRequestManager.onMemberEpochUpdated(Optional.of(1), 
Optional.empty());
+Set<TopicPartition> requestedPartitions = Collections.singleton(new 
TopicPartition("topic-1", 1));
+
+CommitRequestManager.OffsetFetchRequestState offsetFetchRequestState = 
commitRequestManager.new OffsetFetchRequestState(
+requestedPartitions,
+retryBackoffMs,
+retryBackoffMaxMs,
+1000,
+memberInfo);
+
+TimedRequestState timedRequestState = new TimedRequestState(
+logContext,
+"CommitRequestManager",
+retryBackoffMs,
+retryBackoffMaxMs,
+TimedRequestState.deadlineTimer(time, 0)
+);
+
+String target = timedRequestState.toStringBase() +
+", " + memberInfo +
+", requestedPartitions=" + 
offsetFetchRequestState.requestedPartitions;
+
+assertDoesNotThrow(timedRequestState::toString);
+assertEquals(target, offsetFetchRequestState.toStringBase());
+}

Review Comment:
   Maybe we can add `assertFalse(target.contains("Optional"));` to make sure we 
unwrap the optional variables?






Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-06-11 Thread via GitHub


CalvinConfluent commented on code in PR #15673:
URL: https://github.com/apache/kafka/pull/15673#discussion_r1635552897


##
metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java:
##
@@ -371,7 +371,7 @@ public void 
testPartitionRegistrationToRecord_ElrShouldBeNullIfEmpty() {
 setPartitionEpoch(0);
 List exceptions = new ArrayList<>();
 ImageWriterOptions options = new ImageWriterOptions.Builder().
-setMetadataVersion(MetadataVersion.IBP_3_8_IV0).
+setMetadataVersion(MetadataVersion.IBP_3_8_IV1).

Review Comment:
   Yes, it requires IBP_4_0_IV0



##
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##
@@ -394,7 +394,7 @@ public void testUncleanShutdownBroker() throws Throwable {
 new BrokerRegistrationRequestData().
 setBrokerId(brokerId).
 setClusterId(active.clusterId()).
-
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_8_IV0)).
+
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_8_IV1)).

Review Comment:
   Yes, it requires IBP_4_0_IV0






Re: [PR] KAFKA-16570 FenceProducers API returns "unexpected error" when succes… [kafka]

2024-06-11 Thread via GitHub


edoardocomar commented on PR #16229:
URL: https://github.com/apache/kafka/pull/16229#issuecomment-2161709758

   @jolshan ... gentle nudge for approval or objections ...





Re: [PR] KAFKA-16935: Automatically wait for cluster startup in embedded Connect integration tests [kafka]

2024-06-11 Thread via GitHub


C0urante commented on code in PR #16288:
URL: https://github.com/apache/kafka/pull/16288#discussion_r1635549634


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java:
##
@@ -169,7 +164,6 @@ public void 
testFailToStartWhenInternalTopicsAreNotCompacted() throws Interrupte
 // Start the brokers but not Connect
 log.info("Starting {} Kafka brokers, but no Connect workers yet", 
numBrokers);
 connect.start();

Review Comment:
   I'm happy to rework 
`testFailToCreateInternalTopicsWithMoreReplicasThanBrokers`, but just in case 
someone downstream is using `start` in a way that relies on the existing 
behavior, I'd also like to keep the non-blocking method, even if we don't use 
it in any of our tests. I know we're not under any obligation to, but it's such 
a small change that the negligible maintenance burden seems worth the tradeoff. 
Does that work for you?






[jira] [Updated] (KAFKA-16000) Migrate MembershipManagerImplTest away from ConsumerTestBuilder

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16000:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Migrate MembershipManagerImplTest away from ConsumerTestBuilder
> ---
>
> Key: KAFKA-16000
> URL: https://issues.apache.org/jira/browse/KAFKA-16000
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lucas Brutschy
>Assignee: Brenden DeLuna
>Priority: Minor
>  Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.9.0
>
>






[jira] [Updated] (KAFKA-15867) Should ConsumerNetworkThread wrap the exception and notify the polling thread?

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15867:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Should ConsumerNetworkThread wrap the exception and notify the polling thread?
> --
>
> Key: KAFKA-15867
> URL: https://issues.apache.org/jira/browse/KAFKA-15867
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Minor
>  Labels: consumer-threading-refactor, events
> Fix For: 3.9.0
>
>
> The ConsumerNetworkThread runs a tight loop infinitely.  However, when 
> encountering an unexpected exception, it logs an error and continues.
>  
> I think this might not be ideal because the user can run blind for a long 
> time before discovering there's something wrong with the code, so I believe 
> we should propagate the throwable back to the polling thread. 
>  
> cc [~lucasbru] 





[jira] [Updated] (KAFKA-16022) AsyncKafkaConsumer sometimes complains “No current assignment for partition {}”

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16022:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> AsyncKafkaConsumer sometimes complains “No current assignment for partition 
> {}”
> ---
>
> Key: KAFKA-16022
> URL: https://issues.apache.org/jira/browse/KAFKA-16022
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.9.0
>
>
> This seems to be a timing issue: before the member receives any assignment 
> from the coordinator, the fetcher will try to find the current position, 
> causing "No current assignment for partition {}". This adds a small amount of 
> noise to the log.
>  
>  





[jira] [Updated] (KAFKA-15638) Investigate ConsumerNetworkThreadTest's testPollResultTimer

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15638:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Investigate ConsumerNetworkThreadTest's testPollResultTimer
> ---
>
> Key: KAFKA-15638
> URL: https://issues.apache.org/jira/browse/KAFKA-15638
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, timeout, unit-tests
> Fix For: 3.9.0
>
>
> Regarding this comment in {{{}testPollResultTimer{}}}...
> {code:java}
> // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE 
> upon success|
> {code}
> [~junrao] asked:
> {quote}Which call is returning Long.MAX_VALUE?
> {quote}





[jira] [Updated] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15551:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Evaluate conditions for short circuiting consumer API calls
> ---
>
> Key: KAFKA-15551
> URL: https://issues.apache.org/jira/browse/KAFKA-15551
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, events
> Fix For: 3.9.0
>
>
> For conditions like:
>  * Committing empty offset
>  * Fetching offsets for empty partitions
>  * Getting empty topic partition position
> Should be short circuit possibly at the API level.
> As a bonus, we should double-check whether the existing {{KafkaConsumer}} 
> implementation suffers from this.





[jira] [Updated] (KAFKA-15321) Document consumer group member state machine

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15321:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Document consumer group member state machine
> 
>
> Key: KAFKA-15321
> URL: https://issues.apache.org/jira/browse/KAFKA-15321
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, documentation
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Minor
>  Labels: kip-848-client-support, reconciliation
> Fix For: 3.9.0
>
>
> We need to first document the new consumer group member state machine. What 
> are the different states and what are the transitions?
> See [~pnee]'s notes: 
> [https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design]
> *_Don’t forget to include diagrams for clarity!_*
> This should be documented on the AK wiki.





[jira] [Updated] (KAFKA-15767) Redesign TransactionManager to avoid use of ThreadLocal

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15767:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Redesign TransactionManager to avoid use of ThreadLocal
> ---
>
> Key: KAFKA-15767
> URL: https://issues.apache.org/jira/browse/KAFKA-15767
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, producer 
>Affects Versions: 3.6.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
> Fix For: 3.9.0
>
>
> A {{TransactionManager}} instance is created by the {{KafkaProducer}} and 
> shared with the {{Sender}} thread. The {{TransactionManager}} has internal 
> states through which it transitions as part of its initialization, 
> transaction management, shutdown, etc. It contains logic to ensure that those 
> state transitions are valid, such that when an invalid transition is 
> attempted, it is handled appropriately. 
> The issue is, the definition of "handled appropriately" depends on which 
> thread is making the API call that is attempting an invalid transition. The 
> application thread expects that the invalid transition will generate an 
> exception. However, the sender thread expects that the invalid transition 
> will "poison" the entire {{TransactionManager}} instance.
> So as part of the implementation of KAFKA-14831, we needed a way to change 
> logic in the {{TransactionManager}} on a per-thread basis, so a 
> {{ThreadLocal}} instance variable was added to the {{TransactionManager}} to 
> keep track of this. However, the use of ThreadLocal instance variables is 
> generally discouraged because of their tendency for memory leaks, shared 
> state across multiple threads, and not working with virtual threads.
> The initial implementation attempt of KAFKA-14831 used a context object, 
> passed in to each method, to affect the logic. However, the number of methods 
> that needed to be changed to accept this new context object grew until most 
> of the methods in {{TransactionManager}} needed to be updated. Thus all the 
> affected call sites needed to be updated, resulting in a much larger change 
> than anticipated.
> The task here is to remove the use of the {{ThreadLocal}} instance variable, 
> let the application thread and {{Sender}} thread keep their desired behavior, 
> but keep the change to a minimum.
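
The per-thread behavior described above can be pictured with a small sketch of 
the `ThreadLocal`-based approach this ticket wants to remove: the application 
thread fails fast on an invalid transition, while the sender thread poisons 
the manager and carries on. Class and method names here are illustrative, not 
the actual `TransactionManager` code:

```java
public class TransitionGuardSketch {
    // Per-thread flag distinguishing the sender thread from the application
    // thread -- the ThreadLocal pattern the ticket proposes to replace.
    private final ThreadLocal<Boolean> poisonOnInvalidTransition =
            ThreadLocal.withInitial(() -> false);
    private volatile boolean poisoned = false;

    void markAsSenderThread() {
        poisonOnInvalidTransition.set(true);
    }

    void onInvalidTransition(String detail) {
        if (poisonOnInvalidTransition.get()) {
            poisoned = true;                        // sender thread: poison and continue
        } else {
            throw new IllegalStateException(detail); // application thread: fail fast
        }
    }

    boolean isPoisoned() {
        return poisoned;
    }

    public static void main(String[] args) throws InterruptedException {
        TransitionGuardSketch guard = new TransitionGuardSketch();
        Thread sender = new Thread(() -> {
            guard.markAsSenderThread();
            guard.onInvalidTransition("invalid transition on sender thread");
        });
        sender.start();
        sender.join();
        System.out.println("poisoned=" + guard.isPoisoned()); // prints "poisoned=true"
    }
}
```

The drawbacks the ticket lists (leaks, shared state across threads, virtual 
threads) all stem from that `ThreadLocal` field, which is why the task is to 
preserve the two behaviors without it.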





[jira] [Updated] (KAFKA-16818) Move event processing-related tests from ConsumerNetworkThreadTest to ApplicationEventProcessorTest

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16818:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Move event processing-related tests from ConsumerNetworkThreadTest to 
> ApplicationEventProcessorTest
> ---
>
> Key: KAFKA-16818
> URL: https://issues.apache.org/jira/browse/KAFKA-16818
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, unit tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.9.0
>
>
> The {{ConsumerNetworkThreadTest}} currently has a number of tests which do 
> the following:
>  # Add event of type _T_ to the event queue
>  # Call {{ConsumerNetworkThread.runOnce()}} to dequeue the events and call 
> {{ApplicationEventProcessor.process()}}
>  # Verify that the appropriate {{ApplicationEventProcessor}} process method 
> was invoked for the event
> Those types of tests should be moved to {{{}ApplicationEventProcessorTest{}}}.





[jira] [Updated] (KAFKA-15773) Group protocol configuration should be validated

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15773:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Group protocol configuration should be validated
> 
>
> Key: KAFKA-15773
> URL: https://issues.apache.org/jira/browse/KAFKA-15773
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.9.0
>
>
> If the user specifies the generic group protocol, or does not specify the 
> group.protocol config at all, we should invalidate any group.remote.assignor 
> setting.
>  
> If group.local.assignor and group.remote.assignor are both configured, we 
> should also invalidate the configuration
>  
> This is an optimization/user experience improvement.
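The validation rules described above can be sketched as follows. This is a minimal, hypothetical illustration (the class name, method signature, and error messages are assumptions, not Kafka's actual config-validation code): reject `group.remote.assignor` unless the new protocol is selected, and reject setting both a local and a remote assignor.

```java
import java.util.Locale;

// Hypothetical sketch of the validation the ticket asks for; the real
// consumer config classes and property handling differ.
class GroupProtocolValidator {
    static void validate(String groupProtocol, String localAssignor, String remoteAssignor) {
        boolean consumerProtocol = "consumer".equals(groupProtocol.toLowerCase(Locale.ROOT));
        if (!consumerProtocol && remoteAssignor != null) {
            // generic/classic protocol (or unset) does not support remote assignors
            throw new IllegalArgumentException(
                "group.remote.assignor is only supported when group.protocol=consumer");
        }
        if (localAssignor != null && remoteAssignor != null) {
            // configuring both is ambiguous and should be rejected
            throw new IllegalArgumentException(
                "group.local.assignor and group.remote.assignor cannot both be configured");
        }
    }
}
```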



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15173) Consumer event queues should be bounded

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15173:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Consumer event queues should be bounded
> ---
>
> Key: KAFKA-15173
> URL: https://issues.apache.org/jira/browse/KAFKA-15173
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, events
> Fix For: 3.9.0
>
>
> The async consumer uses ApplicationEventQueue and BackgroundEventQueue to 
> facilitate message passing between the application thread and the background 
> thread.  The current implementation is unbounded, which can potentially cause 
> OOM and other performance-related issues.
> I think the queues need a finite bound, and we need to decide how to handle 
> the situation when the bound is reached.  In particular, I would like to 
> answer these questions:
>  
>  # What should the upper limit be for both queues: Can this be a 
> configurable, memory-based bound? Or just an arbitrary number of events as 
> the bound.
>  # What should happen when the application event queue is filled up?  It 
> seems like we should introduce a new exception type and notify the user that 
> the consumer is full.
>  # What should happen when the background event queue is filled up? This 
> seems less likely to happen, but I imagine it could happen when the user 
> stops polling the consumer, causing the queue to be filled.
>  # Is it necessary to introduce a public configuration for the queue? I think 
> initially we would select an arbitrary constant number and see the community 
> feedback to make a forward plan accordingly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16233) Review auto-commit continuously committing when no progress

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16233:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Review auto-commit continuously committing when no progress 
> 
>
> Key: KAFKA-16233
> URL: https://issues.apache.org/jira/browse/KAFKA-16233
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.9.0
>
>
> When auto-commit is enabled, the consumer (legacy and new) will continuously 
> send commit requests with the current positions, even if no progress is made 
> and positions remain unchanged. We could consider if this is really needed 
> for some reason, or if we could improve it and only send the auto-commit on the 
> interval if positions have moved, avoiding repeatedly sending the same commit 
> request.
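The improvement suggested above can be sketched with a small gate that remembers the last committed positions and skips the commit when nothing has moved. This is a hypothetical illustration only (the class, the use of plain topic-partition strings, and the method names are assumptions, not the consumer's actual commit path):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: compare current positions to the last committed
// snapshot and suppress redundant auto-commit requests.
class AutoCommitGate {
    private Map<String, Long> lastCommitted = new HashMap<>();

    boolean shouldCommit(Map<String, Long> currentPositions) {
        if (Objects.equals(lastCommitted, currentPositions)) {
            return false; // no progress since the last commit; skip the request
        }
        lastCommitted = new HashMap<>(currentPositions);
        return true;
    }
}
```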



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15999) Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15999:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder
> -
>
> Key: KAFKA-15999
> URL: https://issues.apache.org/jira/browse/KAFKA-15999
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lucas Brutschy
>Assignee: Brenden DeLuna
>Priority: Minor
>  Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.9.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15320) Document event queueing patterns

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15320:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Document event queueing patterns
> 
>
> Key: KAFKA-15320
> URL: https://issues.apache.org/jira/browse/KAFKA-15320
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, documentation
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, events
> Fix For: 3.9.0
>
>
> We need to first document the event enqueuing patterns in the 
> PrototypeAsyncConsumer. As part of this task, determine if it’s 
> necessary/beneficial to _conditionally_ add events and/or coalesce any 
> duplicate events in the queue.
> _Don’t forget to include diagrams for clarity!_
> This should be documented on the AK wiki.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16272:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Update connect_distributed_test.py to support KIP-848’s group protocol config
> -
>
> Key: KAFKA-16272
> URL: https://issues.apache.org/jira/browse/KAFKA-16272
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Sagar Rao
>Priority: Major
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.9.0
>
>
> This task is to update the test method(s) in {{connect_distributed_test.py}} 
> to support the {{group.protocol}} configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16142) Update metrics documentation for errors and new metrics

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16142:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Update metrics documentation for errors and new metrics
> ---
>
> Key: KAFKA-16142
> URL: https://issues.apache.org/jira/browse/KAFKA-16142
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, documentation, metrics
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, metrics
> Fix For: 3.9.0
>
>
> What is the documentation to update here? AFAIUI, we're not changing or 
> adding metrics for 3.8.0, so there wouldn't be anything to add/change.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16111) Implement tests for tricky rebalance callback scenarios

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16111:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Implement tests for tricky rebalance callback scenarios
> ---
>
> Key: KAFKA-16111
> URL: https://issues.apache.org/jira/browse/KAFKA-16111
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Major
>  Labels: callback, consumer-threading-refactor, integration-tests
> Fix For: 3.9.0
>
>
> There is justified concern that the new threading model may not play well 
> with "tricky" {{ConsumerRebalanceListener}} callbacks. We need to provide 
> some assurance that it will support complicated patterns.
>  # Design and implement test scenarios
>  # Update and document any design changes with the callback sub-system where 
> needed
>  # Provide fix(es) to the {{AsyncKafkaConsumer}} implementation to abide by 
> said design



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16558) Implement HeartbeatRequestState.toStringBase()

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16558:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Implement HeartbeatRequestState.toStringBase()
> --
>
> Key: KAFKA-16558
> URL: https://issues.apache.org/jira/browse/KAFKA-16558
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Brenden DeLuna
>Priority: Minor
>  Labels: consumer-threading-refactor, logging
> Fix For: 3.9.0
>
>
> The inner class {{HeartbeatRequestState}} does not override the 
> {{toStringBase()}} method. This affects debugging and troubleshooting 
> consumer issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15835) Group commit/callbacks triggering logic

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15835:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Group commit/callbacks triggering logic
> ---
>
> Key: KAFKA-15835
> URL: https://issues.apache.org/jira/browse/KAFKA-15835
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support, reconciliation
> Fix For: 3.9.0
>
>
> The new consumer reconciliation logic triggers a commit request, revocation 
> callback and assignment callbacks sequentially to ensure that they are 
> executed in that order. This means that we could require multiple iterations 
> of the poll loop to complete reconciling an assignment. 
> We could consider triggering them all together, to be executed in the same 
> poll iteration, while still making sure that they are executed in the right 
> order. Note that the sequence sometimes should not block on failures (ex. if 
> commit fails revocation proceeds anyways), and other times it does block (if 
> revocation callbacks fail onPartitionsAssigned is not called).
> As part of this task, review the time boundaries for the commit request 
> issued when the assignment changes. It will be effectively time bounded by 
> the rebalance timeout enforced by the broker, so initial approach is to use 
> the same rebalance timeout as boundary on the client. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16024) SaslPlaintextConsumerTest#testCoordinatorFailover is flaky

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16024:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> SaslPlaintextConsumerTest#testCoordinatorFailover is flaky
> --
>
> Key: KAFKA-16024
> URL: https://issues.apache.org/jira/browse/KAFKA-16024
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: flaky-test, integration-tests
> Fix For: 3.9.0
>
>
> The test is flaky with the async consumer as we are observing
>  
> {code:java}
> org.opentest4j.AssertionFailedError: Failed to observe commit callback before 
> timeout{code}
> I was not able to replicate this on my local machine easily.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThreadTest's testResetPositionsProcessFailureIsIgnored

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15639:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Investigate ConsumerNetworkThreadTest's 
> testResetPositionsProcessFailureIsIgnored
> -
>
> Key: KAFKA-15639
> URL: https://issues.apache.org/jira/browse/KAFKA-15639
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.9.0
>
>
> The {{testResetPositionsProcessFailureIsIgnored}} test looks like this:
>  
> {code:java}
> @Test
> public void testResetPositionsProcessFailureIsIgnored() {
> doThrow(new 
> NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
> ResetPositionsApplicationEvent event = new 
> ResetPositionsApplicationEvent();
> applicationEventsQueue.add(event);
> assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
> 
> verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
> }
>  {code}
>  
> [~junrao] asks:
>  
> {quote}Not sure if this is a useful test since 
> {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly 
> throw an exception?
> {quote}
>  
> I commented out the {{doThrow}} line and it did not impact the test. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16301) Review fenced member unsubscribe/subscribe callbacks interaction

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16301:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Review fenced member unsubscribe/subscribe callbacks interaction
> 
>
> Key: KAFKA-16301
> URL: https://issues.apache.org/jira/browse/KAFKA-16301
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Quoc Phong Dang
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.9.0
>
>
> When a member gets fenced, it triggers the onPartitionsLost callback if any, 
> and then rejoins the group. If while the callback completes the member 
> attempts to leave the group (ex. unsubscribe), the leave operation detects 
> that the member is already removed from the group (fenced), and just aligns 
> the client state with the current broker state, and marks the client as 
> UNSUBSCRIBED (client side state for not in group). 
> This means that the member could attempt to rejoin the group if the user 
> calls subscribe, get an assignment, and trigger onPartitionsAssigned, when 
> maybe the onPartitionsLost hasn't completed.
> This approach keeps the client state machine simple given that it does not 
> need to block the new member (it will effectively be a new member because the 
> old one got fenced). The new member could rejoin, get an assignment and make 
> progress. Downside is that it would potentially allow for overlapped callback 
> executions (lost and assign) in the above edge case, which is not the 
> behaviour in the old coordinator. Review and validate. Alternative would 
> definitely require more complex logic on the client to ensure that we do not 
> allow a new member to rejoin until the fenced one completes the callback.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15954) Review minimal effort approach on consumer last heartbeat on unsubscribe

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15954:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Review minimal effort approach on consumer last heartbeat on unsubscribe
> 
>
> Key: KAFKA-15954
> URL: https://issues.apache.org/jira/browse/KAFKA-15954
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support, reconciliation
> Fix For: 3.9.0
>
>
> Currently the legacy and new consumers follow a minimal-effort approach when 
> sending a leave group (legacy) or last heartbeat request (new consumer). The 
> request is sent without waiting/handling any response. This behaviour applies 
> when the consumer is being closed or when it unsubscribes.
> For the case when the consumer is being closed, (which is a "terminal" 
> state), it makes sense to just follow a minimal effort approach for 
> "properly" leaving the group. But for the case of unsubscribe, it would maybe 
> make sense to put a little more effort in making sure that the last heartbeat 
> is sent and received by the broker. Note that unsubscribe could be a temporary 
> state, where the consumer might want to re-join the group at any time. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16623) KafkaAsyncConsumer system tests warn about revoking partitions that weren't previously assigned

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16623:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> KafkaAsyncConsumer system tests warn about revoking partitions that weren't 
> previously assigned
> ---
>
> Key: KAFKA-16623
> URL: https://issues.apache.org/jira/browse/KAFKA-16623
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> system-tests
> Fix For: 3.9.0
>
>
> When running system tests for the KafkaAsyncConsumer, we occasionally see 
> this warning:
> {noformat}
>   File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
> self.run()
>   File "/usr/lib/python3.7/threading.py", line 865, in run
> self._target(*self._args, **self._kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
>  line 38, in _protected_worker
> self._worker(idx, node)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
>  line 304, in _worker
> handler.handle_partitions_revoked(event, node, self.logger)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
>  line 163, in handle_partitions_revoked
> (tp, node.account.hostname)
> AssertionError: Topic partition TopicPartition(topic='test_topic', 
> partition=0) cannot be revoked from worker20 as it was not previously 
> assigned to that consumer
> {noformat}
> In test_fencing_static_consumer, there are two sets of consumers that use 
> group instance IDs: the initial set and the "conflict" set. It appears that 
> one of the "conflicting" consumers hijacks the partition ownership from the 
> coordinator's perspective when the initial consumer leaves the group.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16255) AsyncKafkaConsumer should not use partition.assignment.strategy

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16255:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> AsyncKafkaConsumer should not use partition.assignment.strategy
> ---
>
> Key: KAFKA-16255
> URL: https://issues.apache.org/jira/browse/KAFKA-16255
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.9.0
>
>
> The {{partition.assignment.strategy}} configuration is used to specify a list 
> of zero or more {{ConsumerPartitionAssignor}} instances. However, that 
> interface is not applicable for the KIP-848-based protocol on top of which 
> {{AsyncKafkaConsumer}} is built. Therefore, the use of 
> {{ConsumerPartitionAssignor}} is inappropriate and should be removed from 
> {{{}AsyncKafkaConsumer{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15615) Improve handling of fetching during metadata updates

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15615:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Improve handling of fetching during metadata updates
> 
>
> Key: KAFKA-15615
> URL: https://issues.apache.org/jira/browse/KAFKA-15615
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: appchemist
>Priority: Major
>  Labels: consumer-threading-refactor, fetcher
> Fix For: 3.9.0
>
>
> [During a review of the new 
> fetcher|https://github.com/apache/kafka/pull/14406#discussion_r193941], 
> [~junrao] found what appears to be an opportunity for optimization.
> When a fetch response receives an error about partition leadership, fencing, 
> etc. a metadata refresh is triggered. However, it takes time for that refresh 
> to occur, and in the interim, it appears that the consumer will blindly 
> attempt to fetch data for the partition again, in kind of a "definition of 
> insanity" type of way. Ideally, the consumer would have a way to temporarily 
> ignore those partitions, in a way somewhat like the "pausing" approach so 
> that they are skipped until the metadata refresh response is fully processed.
> This affects both the existing KafkaConsumer and the new 
> PrototypeAsyncConsumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15556:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, 
> and tryConnect
> -
>
> Key: KAFKA-15556
> URL: https://issues.apache.org/jira/browse/KAFKA-15556
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Phuc Hong Tran
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.9.0
>
>
> The "new consumer" (i.e. {{{}PrototypeAsyncConsumer{}}}) was designed to 
> handle networking details in a more centralized way. However, in order to 
> reuse code between the existing {{KafkaConsumer}} and the new 
> {{{}PrototypeAsyncConsumer{}}}, that design goal was "relaxed" when the 
> {{NetworkClientDelegate}} capitulated and -stole- copied three methods from 
> {{ConsumerNetworkClient}} related to detecting node status:
>  # {{isUnavailable}}
>  # {{maybeThrowAuthFailure}}
>  # {{tryConnect}}
> Unfortunately, these have found their way into the {{FetchRequestManager}} 
> and {{OffsetsRequestManager}} implementations. We should review if we can 
> clean up—or even remove—this leaky abstraction.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15553) Review consumer positions update

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15553:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Review consumer positions update
> 
>
> Key: KAFKA-15553
> URL: https://issues.apache.org/jira/browse/KAFKA-15553
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, position
> Fix For: 3.9.0
>
>
> From the existing comment: If there are any partitions which do not have a 
> valid position and are not awaiting reset, then we need to fetch committed 
> offsets.
> In the async consumer: I wonder if it would make sense to refresh the 
> position on the event loop continuously.
> The logic to refresh offsets in the poll loop is quite fragile and works 
> largely by side-effects of the code that it calls. For example, the behaviour 
> of the "cached" value is really not that straightforward and simply reading 
> the cached value is not sufficient to start consuming data in all cases.
> This area needs a bit of a refactor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16001) Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16001:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder
> ---
>
> Key: KAFKA-16001
> URL: https://issues.apache.org/jira/browse/KAFKA-16001
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lucas Brutschy
>Assignee: Brenden DeLuna
>Priority: Minor
>  Labels: consumer-threading-refactor, unit-tests
> Fix For: 3.9.0
>
>
> We should:
>  # Remove spy calls to the dependencies
>  # Remove ConsumerNetworkThreadTest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14945:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Add Serializer#serializeToByteBuffer() to reduce memory copying
> ---
>
> Key: KAFKA-14945
> URL: https://issues.apache.org/jira/browse/KAFKA-14945
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: LinShunkang
>Assignee: LinShunkang
>Priority: Major
>  Labels: kip
> Fix For: 3.9.0
>
>
> JIRA for KIP-872: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16557) Fix OffsetFetchRequestState.toString()

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16557:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Fix OffsetFetchRequestState.toString()
> --
>
> Key: KAFKA-16557
> URL: https://issues.apache.org/jira/browse/KAFKA-16557
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Brenden DeLuna
>Priority: Minor
>  Labels: consumer-threading-refactor, logging
> Fix For: 3.9.0
>
>
> The code incorrectly overrides the {{toString()}} method instead of 
> overriding {{{}toStringBase(){}}}. This affects debugging and troubleshooting 
> consumer issues.
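The bug described above (overriding `toString()` instead of `toStringBase()`) can be illustrated with a minimal sketch of the intended base-class pattern. The class names mirror the ticket, but the fields and method bodies here are hypothetical, not the actual request-manager code: subclasses contribute their fields via `toStringBase()` so the shared `toString()` stays consistent across all request states.

```java
// Hypothetical sketch of the base-class pattern the ticket describes.
class RequestState {
    // Subclasses override this to append their own fields.
    protected String toStringBase() {
        return "deadlineMs=100";
    }

    // The final toString() is defined once, in terms of toStringBase(),
    // so overriding toString() directly in a subclass bypasses it.
    @Override
    public final String toString() {
        return getClass().getSimpleName() + "{" + toStringBase() + "}";
    }
}

class OffsetFetchRequestState extends RequestState {
    @Override
    protected String toStringBase() {
        // Extend, rather than replace, the parent's fields.
        return super.toStringBase() + ", partitions=[t-0]";
    }
}
```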



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16444) Run KIP-848 unit tests under code coverage

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16444:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Run KIP-848 unit tests under code coverage
> --
>
> Key: KAFKA-16444
> URL: https://issues.apache.org/jira/browse/KAFKA-16444
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.9.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15588) Purge the unsent offset commits/fetches when the member is fenced/failed

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15588:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Purge the unsent offset commits/fetches when the member is fenced/failed
> 
>
> Key: KAFKA-15588
> URL: https://issues.apache.org/jira/browse/KAFKA-15588
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support, reconciliation
> Fix For: 3.9.0
>
>
> When the member is fenced/failed, we should purge the inflight offset commits 
> and fetches.  HeartbeatRequestManager should be able to handle this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16642) Update KafkaConsumerTest to show parameters in test lists

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16642:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Update KafkaConsumerTest to show parameters in test lists
> -
>
> Key: KAFKA-16642
> URL: https://issues.apache.org/jira/browse/KAFKA-16642
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.9.0
>
>
> {{KafkaConsumerTest}} was recently updated to make many of its tests 
> parameterized to exercise both the {{CLASSIC}} and {{CONSUMER}} group 
> protocols. However, in some of the tools in which [lists of tests are 
> provided|https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.clients.consumer.KafkaConsumerTest=FLAKY],
>  say, for analysis, the group protocol information is not exposed. For 
> example, one test ({{{}testReturnRecordsDuringRebalance{}}}) is flaky, but 
> it's difficult to know at a glance which group protocol is causing the 
> problem because the list simply shows:
> {quote}{{testReturnRecordsDuringRebalance(GroupProtocol)[1]}}
> {quote}
> Ideally, it would expose more information, such as:
> {quote}{{testReturnRecordsDuringRebalance(GroupProtocol=CONSUMER)}}
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15652) Add tests to verify OffsetFetcherUtils.getOffsetResetTimestamp()

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15652:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Add tests to verify OffsetFetcherUtils.getOffsetResetTimestamp()
> 
>
> Key: KAFKA-15652
> URL: https://issues.apache.org/jira/browse/KAFKA-15652
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, position, unit-tests
> Fix For: 3.9.0
>
>
> In the {{updateFetchPositions()}} method implementation, both 
> {{KafkaConsumer}} and {{PrototypeAsyncConsumer}} reset positions 
> asynchronously. [~junrao] stated the following in a [recent PR 
> review|https://github.com/apache/kafka/pull/14406#discussion_r1349173413]:
> {quote}There is a subtle difference between transitioning to reset from 
> initializing and transitioning to reset from {{OffsetOutOfRangeException}} 
> during fetch. In the latter, the application thread will call 
> {{FetchCollector.handleInitializeErrors()}}. If there is no default 
> offset reset policy, an {{OffsetOutOfRangeException}} will be thrown to the 
> application thread during {{poll}}, which is what we want.
> However, for the former, if there is no default offset reset policy, we 
> simply ignore that partition through 
> {{OffsetFetcherUtils.getOffsetResetTimestamp}}. It seems in that case, 
> the partition will be forever in the reset state and the application thread 
> won't get the {{OffsetOutOfRangeException}}.
> {quote}
> I intentionally changed the code so that 
> {{OffsetFetcherUtils.getOffsetResetTimestamp()}} threw no exceptions and 
> simply returned an empty map. When I ran the unit tests and integration 
> tests, there were no failures, strongly suggesting that this particular 
> edge case has no test coverage.
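A minimal model of the edge case described above. All names and sentinel values here are assumptions for illustration (mirroring, but not copied from, Kafka's {{OffsetFetcherUtils}}): when a partition's reset strategy is {{NONE}}, it is silently dropped from the returned map instead of surfacing an {{OffsetOutOfRangeException}}, so it can remain in the reset state indefinitely.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetResetSketch {

    // Stand-in for org.apache.kafka.clients.consumer.OffsetResetStrategy.
    enum OffsetResetStrategy { EARLIEST, LATEST, NONE }

    // Sentinels mirroring ListOffsetsRequest.EARLIEST_TIMESTAMP / LATEST_TIMESTAMP.
    static final long EARLIEST_TIMESTAMP = -2L;
    static final long LATEST_TIMESTAMP = -1L;

    // Hypothetical stand-in for OffsetFetcherUtils.getOffsetResetTimestamp():
    // partitions whose strategy is NONE are skipped rather than throwing,
    // which is exactly the silent-ignore behavior the quote describes.
    static Map<String, Long> getOffsetResetTimestamp(Map<String, OffsetResetStrategy> partitions) {
        Map<String, Long> timestamps = new HashMap<>();
        for (Map.Entry<String, OffsetResetStrategy> e : partitions.entrySet()) {
            switch (e.getValue()) {
                case EARLIEST -> timestamps.put(e.getKey(), EARLIEST_TIMESTAMP);
                case LATEST   -> timestamps.put(e.getKey(), LATEST_TIMESTAMP);
                case NONE     -> { /* silently ignored: partition stays in reset state */ }
            }
        }
        return timestamps;
    }

    public static void main(String[] args) {
        Map<String, OffsetResetStrategy> parts = new HashMap<>();
        parts.put("topic-0", OffsetResetStrategy.EARLIEST);
        parts.put("topic-1", OffsetResetStrategy.NONE);
        Map<String, Long> result = getOffsetResetTimestamp(parts);
        System.out.println(result.containsKey("topic-0")); // resolved to a reset timestamp
        System.out.println(result.containsKey("topic-1")); // never resolved, no exception either
    }
}
```

A test for this edge case would assert that a {{NONE}}-strategy partition either throws or is otherwise reported back to the application thread, rather than vanishing from the map as above.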





[jira] [Updated] (KAFKA-16576) New consumer fails with assert in consumer_test.py’s test_consumer_failure system test

2024-06-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16576:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> New consumer fails with assert in consumer_test.py’s test_consumer_failure 
> system test
> --
>
> Key: KAFKA-16576
> URL: https://issues.apache.org/jira/browse/KAFKA-16576
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Minor
>  Labels: flaky-test, kip-848-client-support, system-tests
> Fix For: 3.9.0
>
>
> The {{consumer_test.py}} system test intermittently fails with the following 
> error:
> {code}
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   42.582 seconds
> AssertionError()
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
>  line 399, in test_consumer_failure
> assert partition_owner is not None
> AssertionError
> Notify
> {code}
> Affected tests:
>  * {{test_consumer_failure}}




