[jira] [Created] (KAFKA-16034) AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on fenced/unknown member Id

2023-12-19 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16034:
--

 Summary: AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on fenced/unknown member Id
 Key: KAFKA-16034
 URL: https://issues.apache.org/jira/browse/KAFKA-16034
 Project: Kafka
  Issue Type: Bug
Reporter: Philip Nee


The consumer logs an invalid request error when it rejoins after a fenced/unknown 
member ID error, because we don't reset the HeartbeatState and therefore don't send 
the fields required for joining (rebalanceTimeoutMs, for example).
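A minimal sketch of the failure mode described above, assuming a heartbeat builder that omits fields such as rebalanceTimeoutMs once they have been sent. All class and method names are hypothetical stand-ins, not the real AsyncKafkaConsumer internals; the only behavior taken from the issue is that a join (member epoch 0) without rebalanceTimeoutMs is rejected.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the bug; not the real HeartbeatRequestManager.HeartbeatState.
public class HeartbeatStateSketch {

    static final class HeartbeatState {
        // Last rebalanceTimeoutMs sent to the broker; null means "never sent".
        private Integer sentRebalanceTimeoutMs = null;

        // Builds the next heartbeat, omitting rebalanceTimeoutMs if it was already sent.
        Map<String, Object> build(String memberId, int memberEpoch, int rebalanceTimeoutMs) {
            Map<String, Object> fields = new LinkedHashMap<>();
            fields.put("memberId", memberId);
            fields.put("memberEpoch", memberEpoch);
            if (sentRebalanceTimeoutMs == null || !sentRebalanceTimeoutMs.equals(rebalanceTimeoutMs)) {
                fields.put("rebalanceTimeoutMs", rebalanceTimeoutMs);
                sentRebalanceTimeoutMs = rebalanceTimeoutMs;
            }
            return fields;
        }

        // Forget what was sent, so the next heartbeat carries every field again.
        void reset() {
            sentRebalanceTimeoutMs = null;
        }
    }

    // A joining heartbeat (epoch 0) must include rebalanceTimeoutMs.
    static boolean isValid(Map<String, Object> fields) {
        int epoch = (Integer) fields.get("memberEpoch");
        return epoch != 0 || fields.containsKey("rebalanceTimeoutMs");
    }

    // Join, heartbeat normally, get fenced, then rejoin with epoch 0.
    static boolean rejoinIsValid(boolean resetOnFence) {
        HeartbeatState state = new HeartbeatState();
        state.build("", 0, 300_000);         // initial join
        state.build("member-1", 5, 300_000); // steady-state heartbeat
        if (resetOnFence) {
            state.reset();
        }
        return isValid(state.build("member-1", 0, 300_000));
    }

    public static void main(String[] args) {
        System.out.println("without reset: " + rejoinIsValid(false)); // false
        System.out.println("with reset:    " + rejoinIsValid(true));  // true
    }
}
```

Resetting the cached state on fencing makes the rejoin heartbeat a full request again, which is the fix the issue points at.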



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16034) AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on fenced/unknown member Id

2023-12-19 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16034:
---
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Bug)

> AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on 
> fenced/unknown member Id
> ---
>
> Key: KAFKA-16034
> URL: https://issues.apache.org/jira/browse/KAFKA-16034
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Philip Nee
>Priority: Major
>
> The consumer will log invalid request error when joining from fenced/unknown 
> member id because we didn't reset the HeartbeatState and we won't send the 
> needed fields (rebalanceTimeoutMs for example) when joining.





[jira] [Commented] (KAFKA-16026) AsyncConsumer does not send a poll event to the background thread

2023-12-19 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798843#comment-17798843
 ] 

Philip Nee commented on KAFKA-16026:


In PR [15035|https://github.com/apache/kafka/pull/15035], I only clear 
the assignment and subscriptions. We still need to add the callback invocation 
logic in order to enable more integration tests.

> AsyncConsumer does not send a poll event to the background thread
> -
>
> Key: KAFKA-16026
> URL: https://issues.apache.org/jira/browse/KAFKA-16026
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> consumer poll does not send a poll event to the background thread to:
>  # trigger autocommit
>  # reset max poll interval timer
>  
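A minimal sketch of what the two bullet points ask for, assuming the application thread enqueues one event per poll() carrying the poll timestamp, and a background loop drains the queue to drive auto-commit and the max.poll.interval.ms deadline. PollEvent, Background and drain are hypothetical names; the interval values are illustrative only.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PollEventSketch {

    // Hypothetical event sent from the application thread on every poll().
    static final class PollEvent {
        final long pollTimeMs;

        PollEvent(long pollTimeMs) {
            this.pollTimeMs = pollTimeMs;
        }
    }

    // Hypothetical background-thread state driven by poll events.
    static final class Background {
        final long autoCommitIntervalMs;
        final long maxPollIntervalMs;
        long nextAutoCommitMs;
        long pollDeadlineMs;
        int autoCommits = 0;

        Background(long nowMs, long autoCommitIntervalMs, long maxPollIntervalMs) {
            this.autoCommitIntervalMs = autoCommitIntervalMs;
            this.maxPollIntervalMs = maxPollIntervalMs;
            this.nextAutoCommitMs = nowMs + autoCommitIntervalMs;
            this.pollDeadlineMs = nowMs + maxPollIntervalMs;
        }

        void process(PollEvent event) {
            // 1. Trigger auto-commit if the interval has elapsed.
            if (event.pollTimeMs >= nextAutoCommitMs) {
                autoCommits++;
                nextAutoCommitMs = event.pollTimeMs + autoCommitIntervalMs;
            }
            // 2. Reset the max poll interval deadline.
            pollDeadlineMs = event.pollTimeMs + maxPollIntervalMs;
        }
    }

    // Drains all queued events, as the background thread would on each loop iteration.
    static void drain(BlockingQueue<PollEvent> queue, Background bg) {
        PollEvent e;
        while ((e = queue.poll()) != null) {
            bg.process(e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<PollEvent> queue = new LinkedBlockingQueue<>();
        Background bg = new Background(0L, 5_000L, 300_000L);
        queue.add(new PollEvent(1_000L));
        queue.add(new PollEvent(6_000L));
        drain(queue, bg);
        System.out.println(bg.autoCommits + " " + bg.pollDeadlineMs); // 1 306000
    }
}
```

Without such an event, neither branch of process() ever runs, which matches the two symptoms listed in the issue.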





Re: [PR] KAFKA-15870: Move new group coordinator metrics from Yammer to Metrics [kafka]

2023-12-19 Thread via GitHub


dajac commented on PR #14848:
URL: https://github.com/apache/kafka/pull/14848#issuecomment-1863996596

   Merged to trunk and 3.7.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15870: Move new group coordinator metrics from Yammer to Metrics [kafka]

2023-12-19 Thread via GitHub


dajac merged PR #14848:
URL: https://github.com/apache/kafka/pull/14848





Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on PR #15035:
URL: https://github.com/apache/kafka/pull/15035#issuecomment-1863986474

   hi @cadonna - Thanks for putting time into the review.  I've addressed your 
comments.





Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-19 Thread via GitHub


dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1432348819


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##
@@ -968,4 +971,34 @@ public void testCleanupGroupMetadata() {
 verify(groupMetadataManager, times(1)).maybeDeleteGroup(eq("group-id"), any());
 verify(groupMetadataManager, times(0)).maybeDeleteGroup(eq("other-group-id"), any());
 }
+
+@ParameterizedTest
+@EnumSource(value = TransactionResult.class)
+public void testReplayTransactionEndMarker(TransactionResult result) {
+GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+new LogContext(),
+groupMetadataManager,
+offsetMetadataManager,
+Time.SYSTEM,
+new MockCoordinatorTimer<>(Time.SYSTEM),
+mock(GroupCoordinatorConfig.class),
+coordinatorMetrics,
+metricsShard
+);
+
+coordinator.replayTransactionEndMarker(
+100L,
+(short) 5,

Review Comment:
   Nope. However, I think it should be part of the coordinator shard 
interface, for consistency.
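To illustrate the suggestion, here is a hypothetical shard interface in which end-marker replay is declared next to record replay with a no-op default, so every shard exposes it consistently and only shards with transactional state override it. The interface name, method signatures and the local TransactionResult enum are assumptions for illustration, not the actual coordinator shard API; the producer id 100 and epoch 5 mirror the values in the test above.

```java
import java.util.ArrayList;
import java.util.List;

public class ShardInterfaceSketch {

    // Local stand-in for a commit/abort result enum.
    enum TransactionResult { COMMIT, ABORT }

    // Hypothetical shard contract: end-marker replay sits next to record replay.
    interface CoordinatorShardSketch {
        void replay(long producerId, short producerEpoch, String record);

        // No-op by default, so shards without transactional state need not override it.
        default void replayEndTransactionMarker(long producerId, short producerEpoch, TransactionResult result) {
        }
    }

    // A shard that ignores markers inherits the default.
    static final class StatelessShard implements CoordinatorShardSketch {
        @Override
        public void replay(long producerId, short producerEpoch, String record) {
        }
    }

    // A shard that tracks transactions overrides the default.
    static final class GroupShard implements CoordinatorShardSketch {
        final List<String> replayed = new ArrayList<>();

        @Override
        public void replay(long producerId, short producerEpoch, String record) {
            replayed.add("record:" + record);
        }

        @Override
        public void replayEndTransactionMarker(long producerId, short producerEpoch, TransactionResult result) {
            replayed.add("marker:" + producerId + ":" + result);
        }
    }

    public static void main(String[] args) {
        GroupShard shard = new GroupShard();
        shard.replay(100L, (short) 5, "offset-commit");
        shard.replayEndTransactionMarker(100L, (short) 5, TransactionResult.COMMIT);
        new StatelessShard().replayEndTransactionMarker(100L, (short) 5, TransactionResult.ABORT);
        System.out.println(shard.replayed); // [record:offset-commit, marker:100:COMMIT]
    }
}
```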






Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-19 Thread via GitHub


dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1432346828


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -2414,6 +2424,67 @@ public void testReplayWithTombstone() {
 assertNull(context.offsetMetadataManager.offset("foo", "bar", 0));
 }
 
+@Test
+public void testReplayTransactionEndMarkerWithCommit() {
+OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+// Add pending transactional commit for producer id 5.
+verifyTransactionalReplay(context, 5L, "foo", "bar", 0, new OffsetAndMetadata(
+100L,
+OptionalInt.empty(),
+"small",
+context.time.milliseconds(),
+OptionalLong.empty()
+));
+
+// Add pending transactional commit for producer id 6.
+verifyTransactionalReplay(context, 6L, "foo", "bar", 1, new OffsetAndMetadata(
+200L,
+OptionalInt.empty(),
+"small",
+context.time.milliseconds(),
+OptionalLong.empty()
+));
+
+// Replaying an end marker with an unknown producer id should not fail.

Review Comment:
   I think that it is possible to actually have a transaction without any 
records. In this case, the pending offset commits would be empty.
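A minimal sketch of why replaying an end marker for an unknown producer id must be a no-op, assuming pending transactional offsets are kept in a map keyed by producer id. A transaction that wrote no offset commits leaves no entry, so the lookup simply returns null. The class, method names and "group/topic/partition" keys are hypothetical; producer ids 5 and 6 and offsets 100 and 200 mirror the test above.

```java
import java.util.HashMap;
import java.util.Map;

public class PendingOffsetsSketch {

    // Committed offsets keyed by "group/topic/partition".
    final Map<String, Long> committed = new HashMap<>();
    // Pending transactional offsets keyed by producer id.
    final Map<Long, Map<String, Long>> pending = new HashMap<>();

    void replayTransactionalCommit(long producerId, String key, long offset) {
        pending.computeIfAbsent(producerId, id -> new HashMap<>()).put(key, offset);
    }

    void replayEndMarker(long producerId, boolean commit) {
        // null for an unknown producer id, e.g. a transaction without offset commits.
        Map<String, Long> offsets = pending.remove(producerId);
        if (offsets == null) {
            return; // nothing to apply; this must not fail
        }
        if (commit) {
            committed.putAll(offsets);
        }
        // On abort, the pending offsets are simply dropped.
    }

    public static void main(String[] args) {
        PendingOffsetsSketch m = new PendingOffsetsSketch();
        m.replayTransactionalCommit(5L, "foo/bar/0", 100L);
        m.replayTransactionalCommit(6L, "foo/bar/1", 200L);
        m.replayEndMarker(99L, true); // unknown producer id: no-op
        m.replayEndMarker(5L, true);  // commit
        m.replayEndMarker(6L, false); // abort
        System.out.println(m.committed); // {foo/bar/0=100}
    }
}
```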






Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-19 Thread via GitHub


dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1432340012


##
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##
@@ -511,4 +544,35 @@ class CoordinatorLoaderImplTest {
 
 new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)
   }
+
+  private def logReadResult(
+startOffset: Long,
+producerId: Long,
+producerEpoch: Short,
+controlRecordType: ControlRecordType
+  ): FetchDataInfo = {
+val fileRecords = mock(classOf[FileRecords])
+val memoryRecords = MemoryRecords.withEndTransactionMarker(
+  startOffset,
+  0L,
+  RecordBatch.NO_PARTITION_LEADER_EPOCH,
+  producerId,
+  producerEpoch,
+  new EndTransactionMarker(controlRecordType, 0)
+)
+
+when(fileRecords.sizeInBytes).thenReturn(memoryRecords.sizeInBytes)
+
+val bufferCapture: ArgumentCaptor[ByteBuffer] = ArgumentCaptor.forClass(classOf[ByteBuffer])
+when(fileRecords.readInto(
+  bufferCapture.capture(),
+  ArgumentMatchers.anyInt())
+).thenAnswer { _ =>
+  val buffer = bufferCapture.getValue
+  buffer.put(memoryRecords.buffer.duplicate)
+  buffer.flip()
+}
+
+new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)

Review Comment:
   When you read from the log, you get back FileRecords.






Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-19 Thread via GitHub


dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1432339221


##
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##
@@ -511,4 +544,35 @@ class CoordinatorLoaderImplTest {
 
 new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)
   }
+
+  private def logReadResult(
+startOffset: Long,
+producerId: Long,
+producerEpoch: Short,
+controlRecordType: ControlRecordType
+  ): FetchDataInfo = {
+val fileRecords = mock(classOf[FileRecords])
+val memoryRecords = MemoryRecords.withEndTransactionMarker(
+  startOffset,

Review Comment:
   The marker created in 
https://github.com/apache/kafka/pull/14985/files#diff-252d8f54c521cc32e09406f127d568391346d3f92183fdb70f04108f4360f27fR197-R201
 is then passed to the Partition and the Log. When the control record is 
finally written, it will get an offset, etc.






Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432322309


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -234,7 +235,8 @@ public long maximumTimeToWait(long currentTimeMs) {
  * When consumer polls, we need to reset the pollTimer.  If the poll timer has expired, we rejoin only when the
  * member is in the {@link MemberState#UNSUBSCRIBED} state.

Review Comment:
   Is the state here correct? 



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -169,7 +169,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   startingTimestamp = startingTimestamp)
   }
 
-  // TODO: enable this test for the consumer group protocol when KAFKA-16008 has been fixed.

Review Comment:
   Why did you remove this comment but not enable the test?



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -196,7 +195,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(1, listener.callsToRevoked)
   }
 
-  // TODO: enable this test for the consumer group protocol when KAFKA-16009 has been fixed.

Review Comment:
   Why did you remove this comment but not enable the test?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -660,14 +672,14 @@ private HeartbeatRequestManager createHeartbeatRequestManager() {
 subscriptions = mock(SubscriptionState.class);
 membershipManager = mock(MembershipManager.class);
 backgroundEventHandler = mock(BackgroundEventHandler.class);
-heartbeatState = new HeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, maxPollIntervalMs);
-heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState(
+heartbeatState = spy(new HeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, maxPollIntervalMs));

Review Comment:
   No, I don't. Why not pass mocks to this method?






Re: [PR] Implement RemoteLogSizeBytes [kafka]

2023-12-19 Thread via GitHub


showuon commented on code in PR #15050:
URL: https://github.com/apache/kafka/pull/15050#discussion_r1432303423


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1585,6 +1585,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   def onlyLocalLogSegmentsSize: Long =
 UnifiedLog.sizeInBytes(logSegments.stream.filter(_.baseOffset >= highestOffsetInRemoteStorage).collect(Collectors.toList[LogSegment]))
 
+  /**
+   * The log size in bytes for all segments that are only in remote log.
+   */
+  def onlyRemoteLogSegmentsSize: Long =
+UnifiedLog.sizeInBytes(logSegments.stream.filter(_.baseOffset < highestOffsetInRemoteStorage).collect(Collectors.toList[LogSegment]))

Review Comment:
   We can calculate the size of remote-only-log-segments from remote log 
metadata (i.e. `RemoteLogSegmentMetadata#segmentSizeInBytes`). So I think we 
can record the size at the same place I did for remoteLogMetadataCount 
[here](https://github.com/apache/kafka/pull/15026/files#diff-380e4d8859ea9148f21794c09039425c82d9012a392c2dbbe1ce2ec8677a1970R1005).
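A minimal sketch of the suggestion, assuming a simplified stand-in for RemoteLogSegmentMetadata that carries only a segment size: one pass over the remote metadata can produce both the segment count and the total size, so the size can be recorded wherever the count is. The class and method names are hypothetical; the sizes are illustrative.

```java
import java.util.Arrays;
import java.util.List;

public class RemoteLogSizeSketch {

    // Simplified stand-in for RemoteLogSegmentMetadata; only the size is modeled.
    static final class SegmentMetadata {
        final int segmentSizeInBytes;

        SegmentMetadata(int segmentSizeInBytes) {
            this.segmentSizeInBytes = segmentSizeInBytes;
        }
    }

    // One pass yields both the segment count and the total size.
    // The total is accumulated as a long so many large segments cannot overflow an int.
    static long[] countAndSize(List<SegmentMetadata> metadata) {
        long count = 0;
        long sizeInBytes = 0;
        for (SegmentMetadata m : metadata) {
            count++;
            sizeInBytes += m.segmentSizeInBytes;
        }
        return new long[] {count, sizeInBytes};
    }

    public static void main(String[] args) {
        List<SegmentMetadata> remote = Arrays.asList(
            new SegmentMetadata(1_000), new SegmentMetadata(2_000), new SegmentMetadata(3_000));
        long[] result = countAndSize(remote);
        System.out.println(result[0] + " segments, " + result[1] + " bytes"); // 3 segments, 6000 bytes
    }
}
```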
 






Re: [PR] KAFKA-16014: add RemoteLogMetadataCount metric [kafka]

2023-12-19 Thread via GitHub


kamalcph commented on code in PR #15026:
URL: https://github.com/apache/kafka/pull/15026#discussion_r1432264764


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -289,14 +289,14 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf
 
 def gauge(): Gauge[Long] = gaugeLock synchronized {
   if (gaugeObject == null) {
-gaugeObject = metricsGroup.newGauge(metricType, () => brokerTopicAggregatedMetric.value())
+gaugeObject = metricsGroup.newGauge(metricType, () => brokerTopicAggregatedMetric.value(), tags)

Review Comment:
   Nice catch, `tags` was missing for other metrics.






Re: [PR] Implement RemoteLogSizeBytes [kafka]

2023-12-19 Thread via GitHub


kamalcph commented on code in PR #15050:
URL: https://github.com/apache/kafka/pull/15050#discussion_r1432252882


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1585,6 +1585,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   def onlyLocalLogSegmentsSize: Long =
 UnifiedLog.sizeInBytes(logSegments.stream.filter(_.baseOffset >= highestOffsetInRemoteStorage).collect(Collectors.toList[LogSegment]))
 
+  /**
+   * The log size in bytes for all segments that are only in remote log.
+   */
+  def onlyRemoteLogSegmentsSize: Long =
+UnifiedLog.sizeInBytes(logSegments.stream.filter(_.baseOffset < highestOffsetInRemoteStorage).collect(Collectors.toList[LogSegment]))

Review Comment:
   `logSegments` refers to the local log segments, so we cannot calculate the 
size of remote-only log segments from here.
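A small worked example of the point above, using made-up offsets and sizes and assuming segment boundaries line up between local and remote storage: once local retention has deleted the oldest segments, they exist only remotely and never appear in the local segment list, so filtering local segments by baseOffset picks up a segment that lives in both places and misses the remote-only ones. All names here are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class LocalVsRemoteSketch {

    static final class Segment {
        final long baseOffset;
        final long sizeBytes;

        Segment(long baseOffset, long sizeBytes) {
            this.baseOffset = baseOffset;
            this.sizeBytes = sizeBytes;
        }
    }

    // Mirrors the proposed onlyRemoteLogSegmentsSize: filters *local* segments.
    static long sizeFromLocalFilter(List<Segment> localSegments, long highestOffsetInRemoteStorage) {
        return localSegments.stream()
            .filter(s -> s.baseOffset < highestOffsetInRemoteStorage)
            .mapToLong(s -> s.sizeBytes)
            .sum();
    }

    // Remote-only size: remote segments that start before the local log start offset.
    static long remoteOnlySize(List<Segment> remoteSegments, long localLogStartOffset) {
        return remoteSegments.stream()
            .filter(s -> s.baseOffset < localLogStartOffset)
            .mapToLong(s -> s.sizeBytes)
            .sum();
    }

    public static void main(String[] args) {
        // Remote storage holds segments starting at 0, 100 and 200.
        List<Segment> remote = Arrays.asList(new Segment(0, 1_000), new Segment(100, 1_000), new Segment(200, 1_000));
        // Local retention already deleted 0 and 100; 300 is the active segment, not yet copied.
        List<Segment> local = Arrays.asList(new Segment(200, 1_000), new Segment(300, 1_000));
        System.out.println(sizeFromLocalFilter(local, 299L)); // 1000
        System.out.println(remoteOnlySize(remote, 200L));     // 2000
    }
}
```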






Re: [PR] KAFKA-16002: Implement RemoteCopyLagSegments, RemoteDeleteLagBytes and RemoteDeleteLagSegments [kafka]

2023-12-19 Thread via GitHub


kamalcph commented on PR #15005:
URL: https://github.com/apache/kafka/pull/15005#issuecomment-1863858511

   https://github.com/apache/kafka/pull/15005#discussion_r1431092107
   
   We can take this up in the next PR; otherwise, the replica might report stale 
metrics. 





Re: [PR] KAFKA-16014: add RemoteLogMetadataCount metric [kafka]

2023-12-19 Thread via GitHub


showuon commented on code in PR #15026:
URL: https://github.com/apache/kafka/pull/15026#discussion_r1432207715


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -701,16 +702,22 @@ void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exc
 CountDownLatch copyLogSegmentLatch = new CountDownLatch(1);
 doAnswer(ans -> {
 // waiting for verification
-copyLogSegmentLatch.await();
+copyLogSegmentLatch.await(5000, TimeUnit.MILLISECONDS);

Review Comment:
   Put a timeout on await() to avoid waiting forever. 
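For reference, `CountDownLatch.await(long, TimeUnit)` returns `false` when the timeout elapses instead of blocking indefinitely. The diff above ignores that return value; a sketch of checking it (the helper name is hypothetical) makes a timeout visible rather than letting the test continue silently:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchTimeoutSketch {

    // Returns true if the latch was released, false if we gave up after timeoutMs.
    static boolean waitForRelease(CountDownLatch latch, long timeoutMs) throws InterruptedException {
        // Unlike await(), await(timeout, unit) cannot block forever.
        return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch neverReleased = new CountDownLatch(1);
        System.out.println(waitForRelease(neverReleased, 50)); // false, after about 50 ms

        CountDownLatch released = new CountDownLatch(1);
        released.countDown();
        System.out.println(waitForRelease(released, 50)); // true, immediately
    }
}
```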






Re: [PR] KAFKA-16014: add RemoteLogMetadataCount metric [kafka]

2023-12-19 Thread via GitHub


showuon commented on code in PR #15026:
URL: https://github.com/apache/kafka/pull/15026#discussion_r1432205258


##
core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala:
##
@@ -321,4 +323,24 @@ class KafkaRequestHandlerTest {
 assertEquals(0, brokerTopicMetrics.remoteCopyBytesLag)
   }
 
+  @Test
+  def testRemoteLogMetadataCount(): Unit = {
+val brokerTopicMetrics = setupBrokerTopicMetrics()
+
+assertEquals(0, brokerTopicMetrics.remoteLogMetadataCount)
+brokerTopicMetrics.recordRemoteLogMetadataCount(0, 1)
+assertEquals(1, brokerTopicMetrics.remoteLogMetadataCount)
+
+brokerTopicMetrics.recordRemoteLogMetadataCount(1, 2)
+brokerTopicMetrics.recordRemoteLogMetadataCount(2, 3)
+assertEquals(3, brokerTopicMetrics.remoteLogMetadataCount)
+
+
+// verify there will be no minus value for a partition

Review Comment:
   Oh, I forgot to remove it.



##
core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala:
##
@@ -321,4 +323,24 @@ class KafkaRequestHandlerTest {
 assertEquals(0, brokerTopicMetrics.remoteCopyBytesLag)
   }
 
+  @Test
+  def testRemoteLogMetadataCount(): Unit = {
+val brokerTopicMetrics = setupBrokerTopicMetrics()
+
+assertEquals(0, brokerTopicMetrics.remoteLogMetadataCount)
+brokerTopicMetrics.recordRemoteLogMetadataCount(0, 1)
+assertEquals(1, brokerTopicMetrics.remoteLogMetadataCount)
+
+brokerTopicMetrics.recordRemoteLogMetadataCount(1, 2)
+brokerTopicMetrics.recordRemoteLogMetadataCount(2, 3)
+assertEquals(3, brokerTopicMetrics.remoteLogMetadataCount)

Review Comment:
   Oops! 






Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432202539


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -234,7 +237,8 @@ public long maximumTimeToWait(long currentTimeMs) {
  * When consumer polls, we need to reset the pollTimer.  If the poll timer has expired, we rejoin only when the
  * member is in the {@link MemberState#UNSUBSCRIBED} state.
  */
-public void resetPollTimer() {
+public void resetPollTimer(final long pollMs) {
+pollTimer.update(pollMs);

Review Comment:
   Note: pollMs is the time at which the user invoked consumer#poll. This is better than 
calling time.milliseconds() because there could be some discrepancy between the two.
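A sketch of the discrepancy, assuming (as the comment implies) that the poll event is handled on the background thread some time after the application called poll(). DeadlineTimer is a hypothetical, simplified stand-in with update/reset semantics, not Kafka's Timer class; the timestamps are illustrative.

```java
public class PollTimestampSketch {

    // Simplified deadline timer; hypothetical stand-in, not org.apache.kafka.common.utils.Timer.
    static final class DeadlineTimer {
        private long currentTimeMs;
        private long deadlineMs;

        DeadlineTimer(long nowMs, long timeoutMs) {
            this.currentTimeMs = nowMs;
            this.deadlineMs = nowMs + timeoutMs;
        }

        void update(long nowMs) { currentTimeMs = Math.max(currentTimeMs, nowMs); }
        void reset(long timeoutMs) { deadlineMs = currentTimeMs + timeoutMs; }
        long deadlineMs() { return deadlineMs; }
    }

    // resetPollTimer(anchorMs): update to the given time, then restart the interval from it.
    static long deadlineAfterReset(long anchorMs, long maxPollIntervalMs) {
        DeadlineTimer timer = new DeadlineTimer(0L, maxPollIntervalMs);
        timer.update(anchorMs);
        timer.reset(maxPollIntervalMs);
        return timer.deadlineMs();
    }

    public static void main(String[] args) {
        long pollMs = 10_000L;       // when the application thread called poll()
        long processedMs = 10_250L;  // when the background thread handled the event
        long maxPollIntervalMs = 300_000L;
        System.out.println(deadlineAfterReset(pollMs, maxPollIntervalMs));      // 310000
        System.out.println(deadlineAfterReset(processedMs, maxPollIntervalMs)); // 310250
    }
}
```

Anchoring at pollMs keeps the deadline tied to the user's actual poll; reading the clock on the background thread would push it later by however long the event waited in the queue.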






Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432201481


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -195,6 +195,7 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
 NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, true);
 // We can ignore the leave response because we can join before or after receiving the response.
 heartbeatRequestState.reset();
+heartbeatState.reset();

Review Comment:
   One example would be a fenced exception: we need to reset before sending a 
joining heartbeat.






Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432201197


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -288,7 +288,7 @@ public CompletableFuture> addOffsetFetchR
 }
 
 public void updateAutoCommitTimer(final long currentTimeMs) {
-this.autoCommitState.ifPresent(t -> t.ack(currentTimeMs));
+this.autoCommitState.ifPresent(t -> t.updateTimer(currentTimeMs));

Review Comment:
   Switching to updateTimer for a more descriptive name.






Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432199885


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -660,14 +672,14 @@ private HeartbeatRequestManager createHeartbeatRequestManager() {
 subscriptions = mock(SubscriptionState.class);
 membershipManager = mock(MembershipManager.class);
 backgroundEventHandler = mock(BackgroundEventHandler.class);
-heartbeatState = new HeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, maxPollIntervalMs);
-heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState(
+heartbeatState = spy(new HeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, maxPollIntervalMs));

Review Comment:
   The purpose of spying was to make sure a certain method was invoked. I think 
this is an acceptable use case; do you agree?
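For readers weighing the two options in this thread: a spy wraps a real object and records calls while keeping its real behavior, which is what allows a test to check that something like reset() was invoked (in Mockito, via `spy(...)` and `verify(...)`). Below is a hand-rolled equivalent in plain Java, with hypothetical class and method names, to show what the spy is doing; a plain mock, by contrast, would discard the real state changes.

```java
public class HandRolledSpySketch {

    // Hypothetical collaborator with real behavior.
    static class HeartbeatState {
        private int heartbeatsSinceReset = 0;

        void onHeartbeatSent() { heartbeatsSinceReset++; }
        void reset() { heartbeatsSinceReset = 0; }
        int heartbeatsSinceReset() { return heartbeatsSinceReset; }
    }

    // "Spy": keeps the real behavior but counts reset() calls.
    static final class SpyHeartbeatState extends HeartbeatState {
        int resetCalls = 0;

        @Override
        void reset() {
            resetCalls++;
            super.reset();
        }
    }

    // Hypothetical code under test: leaving the group must reset the heartbeat state.
    static void leaveGroup(HeartbeatState state) {
        state.reset();
    }

    public static void main(String[] args) {
        SpyHeartbeatState spy = new SpyHeartbeatState();
        spy.onHeartbeatSent();
        leaveGroup(spy);
        System.out.println(spy.resetCalls + " " + spy.heartbeatsSinceReset()); // 1 0
    }
}
```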






Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432199373


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -385,6 +386,7 @@ public void testWakeupAfterEmptyFetch() {
 assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
 }
 
+@Test

Review Comment:
   I have no idea why this was removed.  It could be me.






Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432199230


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -136,7 +135,7 @@ private void process(final PollApplicationEvent event) {
 }
 
 requestManagers.commitRequestManager.ifPresent(m -> m.updateAutoCommitTimer(event.pollTimeMs()));
-requestManagers.heartbeatRequestManager.ifPresent(HeartbeatRequestManager::resetPollTimer);

Review Comment:
   This turned out to be a bug: the timer's current time was not updated.
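A sketch of this class of bug, using a hypothetical timer that caches the current time and measures reset() from that cached value (loosely modeled on how Kafka's Timer separates update() from reset(); the class below is a stand-in, not the real one). If update() is not called before reset(), the new deadline is computed from a stale time, so the member gets less than the full max poll interval after a poll.

```java
public class StaleTimerSketch {

    // Hypothetical poll timer that caches "now"; not Kafka's Timer class.
    static final class CachedClockTimer {
        private long currentTimeMs;
        private long deadlineMs;

        CachedClockTimer(long nowMs, long timeoutMs) {
            currentTimeMs = nowMs;
            deadlineMs = nowMs + timeoutMs;
        }

        void update(long nowMs) { currentTimeMs = Math.max(currentTimeMs, nowMs); }
        // reset() measures from the cached time, not from the wall clock.
        void reset(long timeoutMs) { deadlineMs = currentTimeMs + timeoutMs; }
        long remainingMs(long nowMs) { return Math.max(0L, deadlineMs - nowMs); }
    }

    // Time remaining right after a poll at t=200s, with or without updating first.
    static long remainingAfterPoll(boolean updateFirst) {
        long maxPollIntervalMs = 300_000L;
        CachedClockTimer timer = new CachedClockTimer(0L, maxPollIntervalMs); // last updated at t=0
        long pollMs = 200_000L;
        if (updateFirst) {
            timer.update(pollMs);
        }
        timer.reset(maxPollIntervalMs);
        return timer.remainingMs(pollMs);
    }

    public static void main(String[] args) {
        System.out.println(remainingAfterPoll(false)); // 100000: stale time shortens the interval
        System.out.println(remainingAfterPoll(true));  // 300000: full interval after the poll
    }
}
```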






Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432199044


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -728,7 +732,6 @@ public boolean shouldSkipHeartbeat() {
 @Override
 public void transitionToStaled() {
 memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
-currentAssignment.clear();

Review Comment:
   See the TODO on line 676. For now, I think we can directly clear the 
currentAssignment and remove the owned partitions. In the future, we will need to 
wait for the callback to complete before proceeding with joining.
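A sketch contrasting the two approaches in this comment, with hypothetical names throughout. The leave epoch value of -1 is an assumption standing in for ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, and the "future" variant is only one possible shape (chaining on a CompletableFuture for the revocation callback), not the planned implementation.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class StaleTransitionSketch {

    // Assumed value; stands in for ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH.
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;

    int memberEpoch = 7;
    final Set<String> currentAssignment = new HashSet<>();
    final Set<String> ownedPartitions = new HashSet<>();

    // For now: clear the assignment and owned partitions directly.
    void transitionToStale() {
        memberEpoch = LEAVE_GROUP_MEMBER_EPOCH;
        currentAssignment.clear();
        ownedPartitions.clear();
    }

    // Possible future shape: only clear state and rejoin once the callback completes.
    CompletableFuture<Void> transitionToStaleThenRejoin(CompletableFuture<Void> callbackDone) {
        memberEpoch = LEAVE_GROUP_MEMBER_EPOCH;
        return callbackDone.thenRun(() -> {
            currentAssignment.clear();
            ownedPartitions.clear();
            memberEpoch = 0; // epoch used when (re)joining
        });
    }

    public static void main(String[] args) {
        StaleTransitionSketch m = new StaleTransitionSketch();
        m.currentAssignment.add("foo-0");
        m.ownedPartitions.add("foo-0");
        CompletableFuture<Void> callback = new CompletableFuture<>();
        CompletableFuture<Void> rejoin = m.transitionToStaleThenRejoin(callback);
        System.out.println(m.currentAssignment.isEmpty()); // false: waiting for the callback
        callback.complete(null);
        System.out.println(m.currentAssignment.isEmpty() + " " + rejoin.isDone()); // true true
    }
}
```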






Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432198577


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -184,7 +184,9 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
 return NetworkClientDelegate.PollResult.EMPTY;
 }
 pollTimer.update(currentTimeMs);
-if (pollTimer.isExpired()) {
+// If the poll timer expires during reconciliation, we need to wait till the reconciliation completes before
+// sending another leave group.

Review Comment:
   The change was removed; I think we don't need to make a special case for 
reconciliation.






Re: [PR] KAFKA-15874: Add metric and request log attribute for deprecated request api versions (KIP-896) [kafka]

2023-12-19 Thread via GitHub


ijuma commented on code in PR #15032:
URL: https://github.com/apache/kafka/pull/15032#discussion_r1432179355


##
generator/src/main/java/org/apache/kafka/message/StructRegistry.java:
##
@@ -110,6 +110,7 @@ private void addStructSpecs(Versions parentVersions, List fields) {
 // Synthesize a StructSpec object out of the fields.
 StructSpec spec = new StructSpec(typeName,
 field.versions().toString(),
+null, // version deprecations not supported at field level //FIXME Perhaps we should pass `Versions.NONE.toString` instead

Review Comment:
   Yes! My bad for not fixing this before I asked for the review. I attempted a 
fix, let's see if any tests fail.






Re: [PR] KAFKA-15874: Add metric and request log attribute for deprecated request api versions (KIP-896) [kafka]

2023-12-19 Thread via GitHub


ijuma commented on code in PR #15032:
URL: https://github.com/apache/kafka/pull/15032#discussion_r1432178897


##
core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala:
##
@@ -118,6 +119,21 @@ class RequestConvertToJsonTest {
 assertEquals(expectedNode, actualNode)
   }
 
+  @Test
+  def testRequestHeaderNodeWithDeprecatedApiVersion(): Unit = {
+val alterIsrRequest = FetchRequest.Builder.forConsumer(0, 0, 0, Collections.emptyMap()).build(0);

Review Comment:
   Fixed, thanks.






Re: [PR] KAFKA-15874: Add metric and request log attribute for deprecated request api versions (KIP-896) [kafka]

2023-12-19 Thread via GitHub


hachikuji commented on code in PR #15032:
URL: https://github.com/apache/kafka/pull/15032#discussion_r1432175567


##
generator/src/main/java/org/apache/kafka/message/StructRegistry.java:
##
@@ -110,6 +110,7 @@ private void addStructSpecs(Versions parentVersions, List fields) {
 // Synthesize a StructSpec object out of the fields.
 StructSpec spec = new StructSpec(typeName,
 field.versions().toString(),
+null, // version deprecations not supported at field level //FIXME Perhaps we should pass `Versions.NONE.toString` instead

Review Comment:
   Do you intend to address the FIXME in this patch?
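For context on the FIXME, here is a minimal sketch of how a deprecated-versions string might be interpreted. It assumes (this thread does not confirm it) that the value uses the same range syntax as other version fields (`0-3`, `4+`, `7`, `none`) and that a missing value means nothing is deprecated; under that assumption, passing `null` or the string form of an empty range behaves the same. `DeprecatedVersionsSketch`, `parse` and `contains` are hypothetical names, not the generator's `Versions` API.

```java
/**
 * Hypothetical parser for version-range strings such as "0-3", "4+", "7" or "none".
 * Not the generator's Versions class; the syntax is assumed for illustration only.
 */
public final class DeprecatedVersionsSketch {
    // Empty range: lowest > highest, so contains() is always false.
    static final DeprecatedVersionsSketch NONE =
            new DeprecatedVersionsSketch((short) 1, (short) 0);

    final short lowest;  // inclusive
    final short highest; // inclusive

    private DeprecatedVersionsSketch(short lowest, short highest) {
        this.lowest = lowest;
        this.highest = highest;
    }

    /** A missing (null) spec is treated exactly like "none": nothing is deprecated. */
    static DeprecatedVersionsSketch parse(String spec) {
        if (spec == null || spec.trim().equals("none")) {
            return NONE;
        }
        String s = spec.trim();
        if (s.endsWith("+")) { // open-ended range, e.g. "4+"
            return new DeprecatedVersionsSketch(
                    Short.parseShort(s.substring(0, s.length() - 1)), Short.MAX_VALUE);
        }
        int dash = s.indexOf('-');
        if (dash < 0) { // single version, e.g. "7"
            short v = Short.parseShort(s);
            return new DeprecatedVersionsSketch(v, v);
        }
        return new DeprecatedVersionsSketch( // closed range, e.g. "0-3"
                Short.parseShort(s.substring(0, dash)), Short.parseShort(s.substring(dash + 1)));
    }

    boolean contains(short version) {
        return version >= lowest && version <= highest;
    }

    public static void main(String[] args) {
        short version0 = 0;
        System.out.println(parse("0-3").contains(version0));  // hypothetical deprecated range
        System.out.println(parse(null).contains(version0));   // null: nothing deprecated
        System.out.println(parse("none").contains(version0)); // same result as null
    }
}
```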



##
core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala:
##
@@ -118,6 +119,21 @@ class RequestConvertToJsonTest {
 assertEquals(expectedNode, actualNode)
   }
 
+  @Test
+  def testRequestHeaderNodeWithDeprecatedApiVersion(): Unit = {
+val alterIsrRequest = FetchRequest.Builder.forConsumer(0, 0, 0, Collections.emptyMap()).build(0);

Review Comment:
   nit: `fetchRequest`?






Re: [PR] KAFKA-15366: Modify LogDirFailureTest for KRaft [kafka]

2023-12-19 Thread via GitHub


rondagostino merged PR #14977:
URL: https://github.com/apache/kafka/pull/14977





Re: [PR] KAFKA-15366: Modify LogDirFailureTest for KRaft [kafka]

2023-12-19 Thread via GitHub


rondagostino commented on PR #14977:
URL: https://github.com/apache/kafka/pull/14977#issuecomment-1863734568

   Test failures are unrelated/flakes.  JDK 8 tests were clean.  All tests in 
this PR passed.  Merging to trunk and 3.7.





Re: [PR] MINOR: Client state machine fix for transition to stable on initial empty assignment [kafka]

2023-12-19 Thread via GitHub


lianetm commented on PR #15033:
URL: https://github.com/apache/kafka/pull/15033#issuecomment-1863701453

   Test failures seem unrelated. I checked `testCoordinatorFailover`: even 
though it passes locally, it failed on CI for the new consumer, but it has also 
been failing for other recent PRs and is reported as flaky in 
[KAFKA-16024](https://issues.apache.org/jira/browse/KAFKA-16024).





Re: [PR] Test refactor consumer close [kafka]

2023-12-19 Thread via GitHub


philipnee closed pull request #15036: Test refactor consumer close
URL: https://github.com/apache/kafka/pull/15036





Re: [PR] KAFKA-15592: Allow member to not sending a heartbeat when a group ID is configured [kafka]

2023-12-19 Thread via GitHub


philipnee closed pull request #14390: KAFKA-15592: Allow member to not sending 
a heartbeat when a group ID is configured
URL: https://github.com/apache/kafka/pull/14390





[jira] [Commented] (KAFKA-16010) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling

2023-12-19 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798772#comment-17798772
 ] 

Philip Nee commented on KAFKA-16010:


I thought KAFKA-16026 (fixing the max poll) would fix this; however, I'm 
getting quite a strange error, shown below. It seems that in some cases we need 
to reset the HeartbeatState, because of the error "RebalanceTimeoutMs must be 
provided in first request".
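A simplified illustration of why the missing reset matters: if the heartbeat builder only includes a field such as `rebalanceTimeoutMs` when it has not been sent before, a member that rejoins at epoch 0 after being fenced will omit it unless that memory is cleared. `HeartbeatStateSketch`, its fields and its methods are hypothetical and only model the behaviour described in this comment; they are not the actual `HeartbeatRequestManager` code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical model of an incremental heartbeat builder: a field is sent
 * once and then omitted until the state is reset. This is NOT the real
 * HeartbeatRequestManager code, only an illustration of the failure mode.
 */
public class HeartbeatStateSketch {
    private final int rebalanceTimeoutMs;
    // Whether rebalanceTimeoutMs has already been sent since the last reset.
    private boolean rebalanceTimeoutSent = false;

    public HeartbeatStateSketch(int rebalanceTimeoutMs) {
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
    }

    /** Builds the next request; rebalanceTimeoutMs is only included if not yet sent. */
    public Map<String, Object> buildRequest(String memberId, int memberEpoch) {
        Map<String, Object> request = new LinkedHashMap<>();
        request.put("memberId", memberId);
        request.put("memberEpoch", memberEpoch);
        if (!rebalanceTimeoutSent) {
            request.put("rebalanceTimeoutMs", rebalanceTimeoutMs);
            rebalanceTimeoutSent = true;
        }
        return request;
    }

    /** Forgets what was sent, so the next (joining) request carries every field again. */
    public void reset() {
        rebalanceTimeoutSent = false;
    }

    public static void main(String[] args) {
        HeartbeatStateSketch state = new HeartbeatStateSketch(300_000);
        // Initial join at epoch 0: the field is included.
        System.out.println(state.buildRequest("", 0));
        // Regular heartbeat: the field is omitted.
        System.out.println(state.buildRequest("member-1", 1));
        // Rejoin at epoch 0 after being fenced, without a reset: the field is missing,
        // which is the situation the quoted INVALID_REQUEST error points to.
        System.out.println(state.buildRequest("member-1", 0));
        // Rejoin after reset(): the field is included again.
        state.reset();
        System.out.println(state.buildRequest("member-1", 0));
    }
}
```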

 
{code:java}
[2023-12-19 15:07:37,732] WARN [QuorumController id=1000] Performing controller activation. The metadata log appears to be empty. Appending 1 bootstrap record(s) in metadata transaction at metadata.version 3.8-IV0 from bootstrap source 'test harness'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster. (org.apache.kafka.controller.QuorumController:108)
[2023-12-19 15:07:39,835] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=my-test] GroupHeartbeatRequest failed due to error: INVALID_REQUEST (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:380)
[2023-12-19 15:07:39,835] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Member T9t83LZLS9-GxOGCX1sI9A with epoch 0 transitioned to FATAL state (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl:487)
[2023-12-19 15:07:39,835] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=my-test] GroupHeartbeatRequest failed due to error: INVALID_REQUEST (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:380)
[2023-12-19 15:07:39,835] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Member T9t83LZLS9-GxOGCX1sI9A with epoch 0 transitioned to FATAL state (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl:487)
[2023-12-19 15:07:39,835] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=my-test] GroupHeartbeatRequest failed due to error: INVALID_REQUEST (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:380)
[2023-12-19 15:07:39,835] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Member T9t83LZLS9-GxOGCX1sI9A with epoch 0 transitioned to FATAL state (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl:487)
[2023-12-19 15:07:39,836] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=my-test] GroupHeartbeatRequest failed due to error: INVALID_REQUEST (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:380)
[2023-12-19 15:07:39,836] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Member T9t83LZLS9-GxOGCX1sI9A with epoch 0 transitioned to FATAL state (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl:487)
[2023-12-19 15:07:39,836] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=my-test] GroupHeartbeatRequest failed due to error: INVALID_REQUEST (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:380)
[2023-12-19 15:07:39,836] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Member T9t83LZLS9-GxOGCX1sI9A with epoch 0 transitioned to FATAL state (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl:487)
[2023-12-19 15:07:39,836] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=my-test] GroupHeartbeatRequest failed due to error: INVALID_REQUEST (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:380)
[2023-12-19 15:07:39,836] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Member T9t83LZLS9-GxOGCX1sI9A with epoch 0 transitioned to FATAL state (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl:487)
[2023-12-19 15:07:39,836] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=my-test] GroupHeartbeatRequest failed due to error: INVALID_REQUEST (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:380)
[2023-12-19 15:07:39,836] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Member T9t83LZLS9-GxOGCX1sI9A with epoch 0 transitioned to FATAL state (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl:487)
[2023-12-19 15:07:39,836] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=my-test] GroupHeartbeatRequest failed due to error: INVALID_REQUEST (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:380)
[2023-12-19 15:07:39,836] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Member T9t83LZLS9-GxOGCX1sI9A with epoch 0 transitioned to FATAL state (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl:487)
[2023-12-19 15:07:39,836] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=my-test] GroupHeartbeatRequest failed due to error: INVALID_REQUEST (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:380)
[2023-12-19 15:07:39,836] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Member T9t83LZLS9-GxOGCX1sI9A with epoch 0 transitioned to FATAL state

[jira] [Comment Edited] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing

2023-12-19 Thread Sabit (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798761#comment-17798761
 ] 

Sabit edited comment on KAFKA-16025 at 12/19/23 10:23 PM:
--

I think this bug can occur when a rebalance is triggered while the cleanup 
thread is running, because of multiple calls to 
[tryToLockAllNonEmptyTaskDirectories|https://github.com/apache/kafka/blob/3.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L1154-L1180].
`tryToLockAllNonEmptyTaskDirectories` does not check whether a directory 
becomes empty between the calls to `listNonEmptyTaskDirectories` and `lock`, so 
it can lock an empty directory. `releaseLockedUnassignedTaskDirectories` is not 
guaranteed to be called before the next invocation of 
`tryToLockAllNonEmptyTaskDirectories`. On that next call, the previously locked 
tasks in `lockedTaskDirectories` are cleared and the directory is now seen as 
empty, so any task whose directory has become empty is not re-inserted into 
`lockedTaskDirectories`. That task is left with an orphaned lock held by this 
thread in the `StateDirectory`.
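The interleaving described above can be replayed deterministically in a small single-threaded model. This is a sketch under stated assumptions: `ToyStateDirectory`, `ToyTaskManager`, `cleanIfUnlocked`, `orphanedLocks` and `run` are hypothetical stand-ins that only mimic the roles of `StateDirectory`, `listNonEmptyTaskDirectories`, `lock` and `lockedTaskDirectories` as this comment describes them, not the Kafka Streams 3.4 implementation. It also assumes the cleaner only empties a directory whose lock it can take. The `recheckAfterLock` flag is a hypothetical mitigation (re-checking emptiness once the lock is held), not something proposed in this ticket.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

/** Hypothetical single-threaded model of the race; not Kafka Streams code. */
public class OrphanedLockSketch {

    /** Stand-in for StateDirectory: a "has content" flag per task plus an owner-based lock table. */
    static final class ToyStateDirectory {
        final Map<String, Boolean> hasContent = new HashMap<>();
        final Map<String, String> lockOwner = new HashMap<>();

        List<String> listNonEmptyTaskDirectories() {
            return hasContent.entrySet().stream()
                    .filter(Map.Entry::getValue)
                    .map(Map.Entry::getKey)
                    .sorted()
                    .collect(Collectors.toList());
        }

        boolean lock(String task, String owner) {
            String current = lockOwner.putIfAbsent(task, owner);
            return current == null || current.equals(owner); // re-entrant for the same owner
        }

        void unlock(String task, String owner) { lockOwner.remove(task, owner); }

        /** Cleaner: empties a task directory, but only if it can take the lock first. */
        void cleanIfUnlocked(String task) {
            if (lock(task, "cleaner")) {
                hasContent.put(task, false);
                unlock(task, "cleaner");
            }
        }

        Set<String> locksHeldBy(String owner) {
            return lockOwner.entrySet().stream()
                    .filter(e -> e.getValue().equals(owner))
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toCollection(TreeSet::new));
        }
    }

    /** Stand-in for the stream thread's lockedTaskDirectories bookkeeping. */
    static final class ToyTaskManager {
        final String owner;
        final ToyStateDirectory dir;
        final boolean recheckAfterLock; // hypothetical mitigation; false = behaviour described above
        final Set<String> lockedTaskDirectories = new HashSet<>();

        ToyTaskManager(String owner, ToyStateDirectory dir, boolean recheckAfterLock) {
            this.owner = owner;
            this.dir = dir;
            this.recheckAfterLock = recheckAfterLock;
        }

        /** 'raceWindow' runs between listing and locking, where the cleaner can interleave. */
        void tryToLockAllNonEmptyTaskDirectories(Runnable raceWindow) {
            lockedTaskDirectories.clear(); // previous entries are forgotten, not unlocked
            List<String> candidates = dir.listNonEmptyTaskDirectories();
            raceWindow.run();
            for (String task : candidates) {
                if (!dir.lock(task, owner)) continue;
                if (recheckAfterLock && !dir.hasContent.get(task)) {
                    dir.unlock(task, owner); // emptied before we got the lock: give it back
                } else {
                    lockedTaskDirectories.add(task);
                }
            }
        }

        /** Locks this thread still holds in the directory but no longer tracks. */
        Set<String> orphanedLocks() {
            Set<String> orphans = dir.locksHeldBy(owner);
            orphans.removeAll(lockedTaskDirectories);
            return orphans;
        }
    }

    static Set<String> run(boolean recheckAfterLock) {
        ToyStateDirectory dir = new ToyStateDirectory();
        dir.hasContent.put("0_37", true);
        dir.hasContent.put("0_51", true);
        ToyTaskManager thread21 = new ToyTaskManager("StreamThread-21", dir, recheckAfterLock);
        // Rejoin: the cleaner empties 0_51 after it was listed but before it was locked.
        thread21.tryToLockAllNonEmptyTaskDirectories(() -> dir.cleanIfUnlocked("0_51"));
        // Another rebalance starts before unassigned directories are released.
        thread21.tryToLockAllNonEmptyTaskDirectories(() -> { });
        return thread21.orphanedLocks();
    }

    public static void main(String[] args) {
        System.out.println("as described: " + run(false)); // [0_51]
        System.out.println("re-check:     " + run(true));  // []
    }
}
```

With `recheckAfterLock` off, the second call forgets `0_51` while the stream thread still holds its lock, matching the orphaned-lock state described above. Re-checking under the lock closes the window only because, in this model, the cleaner cannot empty a directory that another owner has locked.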

Logs that correspond to this:
 - CleanupThread starts on `i-0f1a5e7a42158e04b`

{{2023-12-13 22:57:03.033000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_143 for task 0_143 as 692300ms has elapsed 
(cleanup delay is 60ms).}}
 - New member joins the consumer group

{{2023-12-13 22:57:17.142000Z [GroupCoordinator 2]: Static Member with unknown 
member id joins group 0.agg in Stable state. Created a new member id 
i-07cc34cd33f2886e4-20-892934e1-760f-415e-948c-538c11cbb2c9 for this member and 
add to the group.}}
 - Thread 21 on `i-0f1a5e7a42158e04b` re-joins the group while the CleanupThread 
is deleting directories
    - This is where Thread-21 and the CleanupThread race to lock the same 
directories, reaching a state where Thread-21 believes it has locked non-empty 
directories that the cleanup thread has in fact emptied. The only thing that 
doesn't line up is that 0_37 was deleted before the stream thread began 
re-joining; the timeline of the other deletions aligns well.

{{2023-12-13 22:57:17.98Z    stream-thread 
[i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_37 for 
task 0_37 as 653970ms has elapsed (cleanup delay is 60ms).}}
{{2023-12-13 22:57:18.659000Z    [Consumer instanceId=i-0f1a5e7a42158e04b-21, 
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Request 
joining group due to: group is already rebalancing}}
{{2023-12-13 22:57:18.659000Z    [Consumer instanceId=i-0f1a5e7a42158e04b-21, 
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] 
(Re-)joining group}}
{{2023-12-13 22:57:18.696000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_51 for task 0_51 as 680455ms has elapsed 
(cleanup delay is 60ms).}}
{{2023-12-13 22:57:19.391000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_23 for task 0_23 as 664025ms has elapsed 
(cleanup delay is 60ms).}}
{{2023-12-13 22:57:20.136000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_93 for task 0_93 as 673331ms has elapsed 
(cleanup delay is 60ms).}}
{{2023-12-13 22:57:20.796000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_107 for task 0_107 as 659430ms has elapsed 
(cleanup delay is 60ms).}}
{{2023-12-13 22:57:21.382000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_219 for task 0_219 as 600952ms has elapsed 
(cleanup delay is 60ms).}}
{{2023-12-13 22:57:21.969000Z [GroupCoordinator 2]: Stabilized group 0.agg 
generation 122096 (__consumer_offsets-7)}}
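 - Now, if 
[TaskManager#handleRebalanceComplete|https://github.com/apache/kafka/blob/3.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L180-L199]
 is called once this state is reached, the tasks whose directories are now 
empty get unlocked as expected. We would see this as the log line ["Adding newly 
assigned 
partitions"|https://github.com/apache/kafka/blob/3.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L316-L333],
 emitted when the sync group request succeeds after a successful join request. 
However, the sync group request fails for this thread because another rebalance 
is triggered: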

{{2023-12-13 22:57:22.029000Z [GroupCoordinator 2]: Preparing to rebalance 
group 0.agg in state PreparingRebalance with old generation 122096 
(__consumer_offsets-7) (reason: Updating metadata for member 
i-0b2771f8e7a021034-28-d6bab48b-d1e9-4b36-8597-9818fc655d3d during Stable)}}
{{{}2023-12-13 22:57:22.091000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, 
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] 
Successfully joined group with generation Generation{generationId=122096, 
memberId='i-0f1a5e7a42158e04b-21-8a648b21-3210-44f8-92cc-9896c5b07e0f', 
protocol='stream'{
{{{}2023-12-13 22:57:22.191000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, 

[jira] [Comment Edited] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing

2023-12-19 Thread Sabit (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798761#comment-17798761
 ] 

Sabit edited comment on KAFKA-16025 at 12/19/23 10:23 PM:
--


[jira] [Comment Edited] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing

2023-12-19 Thread Sabit (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798761#comment-17798761
 ] 

Sabit edited comment on KAFKA-16025 at 12/19/23 10:22 PM:
--


[jira] [Comment Edited] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing

2023-12-19 Thread Sabit (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798761#comment-17798761
 ] 

Sabit edited comment on KAFKA-16025 at 12/19/23 10:22 PM:
--


[jira] [Comment Edited] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing

2023-12-19 Thread Sabit (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798761#comment-17798761
 ] 

Sabit edited comment on KAFKA-16025 at 12/19/23 10:21 PM:
--

I think this bug can occur when a rebalance is triggered while the cleanup 
thread is running, because of multiple calls to 
[tryToLockAllNonEmptyTaskDirectories|https://github.com/apache/kafka/blob/3.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L1154-L1180].
`tryToLockAllNonEmptyTaskDirectories` does not check whether a directory 
becomes empty between the calls to `listNonEmptyTaskDirectories` and `lock`, so 
it can lock an empty directory. `releaseLockedUnassignedTaskDirectories` is not 
guaranteed to be called before the next invocation of 
`tryToLockAllNonEmptyTaskDirectories`. On that next call, the previously locked 
tasks in `lockedTaskDirectories` are cleared and the directory is now seen as 
empty, so any task whose directory has become empty is not re-inserted into 
`lockedTaskDirectories`. That task is left with an orphaned lock held by this 
thread in the `StateDirectory`.

Logs that correspond to this:
 - CleanupThread starts on `i-0f1a5e7a42158e04b`


{{2023-12-13 22:57:03.033000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_143 for task 0_143 as 692300ms has elapsed 
(cleanup delay is 60ms).}}
 - New member joins the consumer group


{{2023-12-13 22:57:17.142000Z [GroupCoordinator 2]: Static Member with unknown 
member id joins group 0.agg in Stable state. Created a new member id 
i-07cc34cd33f2886e4-20-892934e1-760f-415e-948c-538c11cbb2c9 for this member and 
add to the group.}}
 - Thread 21 on `i-0f1a5e7a42158e04b` re-joins the group while the CleanupThread 
is deleting directories
    - This is where Thread-21 and the CleanupThread race to lock the same 
directories, reaching a state where Thread-21 believes it has locked non-empty 
directories that the cleanup thread has in fact emptied. The only thing that 
doesn't line up is that 0_37 was deleted before the stream thread began 
re-joining; the timeline of the other deletions aligns well.


{{2023-12-13 22:57:17.98Z    stream-thread 
[i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_37 for 
task 0_37 as 653970ms has elapsed (cleanup delay is 60ms).}}
{{2023-12-13 22:57:18.659000Z    [Consumer instanceId=i-0f1a5e7a42158e04b-21, 
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Request 
joining group due to: group is already rebalancing}}
{{2023-12-13 22:57:18.659000Z    [Consumer instanceId=i-0f1a5e7a42158e04b-21, 
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] 
(Re-)joining group}}
{{2023-12-13 22:57:18.696000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_51 for task 0_51 as 680455ms has elapsed 
(cleanup delay is 60ms).}}
{{2023-12-13 22:57:19.391000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_23 for task 0_23 as 664025ms has elapsed 
(cleanup delay is 60ms).}}
{{2023-12-13 22:57:20.136000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_93 for task 0_93 as 673331ms has elapsed 
(cleanup delay is 60ms).}}
{{2023-12-13 22:57:20.796000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_107 for task 0_107 as 659430ms has elapsed 
(cleanup delay is 60ms).}}
{{2023-12-13 22:57:21.382000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_219 for task 0_219 as 600952ms has elapsed 
(cleanup delay is 60ms).}}
{{2023-12-13 22:57:21.969000Z [GroupCoordinator 2]: Stabilized group 0.agg 
generation 122096 (__consumer_offsets-7)}}
 - Once this state is reached, if 
[TaskManager#handleRebalanceComplete|https://github.com/apache/kafka/blob/3.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L180-L199]
 is called, the tasks with now-empty directories get unlocked as expected. 
We would see this with the log ["Adding newly assigned partitions"|https://github.com/apache/kafka/blob/3.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L316-L333]
 as a result of the sync group request succeeding after a successful join 
request. However, the sync group request fails for this thread because another 
rebalance is triggered:


{{2023-12-13 22:57:22.029000Z [GroupCoordinator 2]: Preparing to rebalance 
group 0.agg in state PreparingRebalance with old generation 122096 
(__consumer_offsets-7) (reason: Updating metadata for member 
i-0b2771f8e7a021034-28-d6bab48b-d1e9-4b36-8597-9818fc655d3d during 

[jira] [Commented] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing

2023-12-19 Thread Sabit (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798761#comment-17798761
 ] 

Sabit commented on KAFKA-16025:
---

I think this bug can occur when a rebalance is triggered while the cleanup 
thread is running, because of multiple calls to 
[tryToLockAllNonEmptyTaskDirectories](https://github.com/apache/kafka/blob/3.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L1154-L1180).
 `tryToLockAllNonEmptyTaskDirectories` does not check whether a directory 
becomes empty between the calls to `listNonEmptyTaskDirectories` and `lock`, 
so it can lock an empty directory. `releaseLockedUnassignedTaskDirectories` is 
not guaranteed to be called before the next invocation of 
`tryToLockAllNonEmptyTaskDirectories`. On that next call, the previously locked 
tasks in `lockedTaskDirectories` are cleared, and because their directories 
now appear empty, any of those tasks whose directories were emptied in the 
meantime are not re-inserted into `lockedTaskDirectories`. Their locks remain 
held by this thread in the `StateDirectory` and are orphaned. 
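The race above is a check-then-act problem: a directory is observed as non-empty and only later locked. Here is a minimal sketch of one possible mitigation. It is a deterministic, single-threaded simulation, not thread-safe, and not Kafka's `StateDirectory`/`TaskManager` code. `FakeStateDirectory`, `SafeTaskLocker`, and the `betweenListAndLock` hook (which simulates the CleanupThread running between listing and locking) are hypothetical. The sketch re-checks emptiness after acquiring each lock, and it explicitly releases previously held locks that are not re-acquired instead of forgetting them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the shared state directory: which task directories
// contain data, and which thread owns each directory lock.
class FakeStateDirectory {
    final Map<String, Boolean> hasData = new HashMap<>();
    final Map<String, String> owners = new HashMap<>();

    List<String> listNonEmptyTaskDirectories() {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Boolean> e : hasData.entrySet()) {
            if (e.getValue()) result.add(e.getKey());
        }
        return result;
    }

    // Re-entrant: succeeds if unowned or already owned by the same thread.
    boolean lock(String taskId, String thread) {
        String owner = owners.putIfAbsent(taskId, thread);
        return owner == null || owner.equals(thread);
    }

    void unlock(String taskId, String thread) {
        owners.remove(taskId, thread); // only removes if this thread owns it
    }
}

// Hypothetical sketch of a safer tryToLockAllNonEmptyTaskDirectories.
class SafeTaskLocker {
    final FakeStateDirectory dir;
    final String thread;
    final Set<String> lockedTaskDirectories = new HashSet<>();

    SafeTaskLocker(FakeStateDirectory dir, String thread) {
        this.dir = dir;
        this.thread = thread;
    }

    void tryToLockAllNonEmptyTaskDirectories(Runnable betweenListAndLock) {
        Set<String> previouslyLocked = new HashSet<>(lockedTaskDirectories);
        lockedTaskDirectories.clear();
        List<String> candidates = dir.listNonEmptyTaskDirectories();
        betweenListAndLock.run(); // simulated concurrent cleanup
        for (String taskId : candidates) {
            if (dir.lock(taskId, thread)) {
                // Re-check after locking: the directory may have been emptied meanwhile.
                if (dir.hasData.getOrDefault(taskId, false)) {
                    lockedTaskDirectories.add(taskId);
                } else {
                    dir.unlock(taskId, thread);
                }
            }
        }
        // Release locks from the previous round that were not re-acquired,
        // rather than dropping them from tracking and orphaning them.
        for (String taskId : previouslyLocked) {
            if (!lockedTaskDirectories.contains(taskId)) {
                dir.unlock(taskId, thread);
            }
        }
    }
}
```

Usage: lock `0_37` and `0_51` in one round. Then let the simulated cleanup empty `0_51` during the next round. The thread ends up holding only `0_37`, and no lock on `0_51` is left behind.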

Logs that correspond to this:

- CleanupThread starts on `i-0f1a5e7a42158e04b`

```
2023-12-13 22:57:03.033000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_143 for task 0_143 as 692300ms has elapsed 
(cleanup delay is 60ms).
```

- New member joins the consumer group

```
2023-12-13 22:57:17.142000Z [GroupCoordinator 2]: Static Member with unknown 
member id joins group 0.agg in Stable state. Created a new member id 
i-07cc34cd33f2886e4-20-892934e1-760f-415e-948c-538c11cbb2c9 for this member and 
add to the group.
```

- Thread 21 on `i-0f1a5e7a42158e04b` re-joins the group while the CleanupThread 
is deleting directories
    - This is where Thread-21 and the CleanupThread race to lock the same 
directories, reaching a state where Thread-21 believes it locked non-empty 
directories that were actually emptied by the cleaner thread. The only 
thing that doesn't line up is that 0_37 was deleted before the stream thread 
began re-joining, but the timeline of the other deletions aligns well.

```
2023-12-13 22:57:17.98Z    stream-thread 
[i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_37 for 
task 0_37 as 653970ms has elapsed (cleanup delay is 60ms).
2023-12-13 22:57:18.659000Z    [Consumer instanceId=i-0f1a5e7a42158e04b-21, 
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Request 
joining group due to: group is already rebalancing
2023-12-13 22:57:18.659000Z    [Consumer instanceId=i-0f1a5e7a42158e04b-21, 
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] 
(Re-)joining group
2023-12-13 22:57:18.696000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_51 for task 0_51 as 680455ms has elapsed 
(cleanup delay is 60ms).
2023-12-13 22:57:19.391000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_23 for task 0_23 as 664025ms has elapsed 
(cleanup delay is 60ms).
2023-12-13 22:57:20.136000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_93 for task 0_93 as 673331ms has elapsed 
(cleanup delay is 60ms).
2023-12-13 22:57:20.796000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_107 for task 0_107 as 659430ms has elapsed 
(cleanup delay is 60ms).
2023-12-13 22:57:21.382000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_219 for task 0_219 as 600952ms has elapsed 
(cleanup delay is 60ms).
2023-12-13 22:57:21.969000Z [GroupCoordinator 2]: Stabilized group 0.agg 
generation 122096 (__consumer_offsets-7)
```

- Once this state is reached, if 
[TaskManager#handleRebalanceComplete](https://github.com/apache/kafka/blob/3.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L180-L199)
 is called, the tasks with now-empty directories get unlocked as expected. 
We would see this with the log ["Adding newly assigned partitions"](https://github.com/apache/kafka/blob/3.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L316-L333)
 as a result of the sync group request succeeding after a successful join 
request. However, the sync group request fails for this thread because another 
rebalance is triggered:

```
2023-12-13 22:57:22.029000Z [GroupCoordinator 2]: Preparing to rebalance group 
0.agg in state PreparingRebalance with old generation 122096 
(__consumer_offsets-7) (reason: Updating metadata for member 
i-0b2771f8e7a021034-28-d6bab48b-d1e9-4b36-8597-9818fc655d3d during Stable)
2023-12-13 22:57:22.091000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, 

[jira] [Commented] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems

2023-12-19 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798760#comment-17798760
 ] 

Phuc Hong Tran commented on KAFKA-15341:


Got it. Thanks, [~showuon] 

> Enabling TS for a topic during rolling restart causes problems
> --
>
> Key: KAFKA-15341
> URL: https://issues.apache.org/jira/browse/KAFKA-15341
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.7.0
>
>
> During a rolling restart to enable TS (tiered storage) at the system level, 
> some brokers have TS enabled and some don't. Suppose we send an alterConfig 
> call to enable TS for a topic and it hits a broker that has TS enabled. That 
> broker forwards it to the controller, and the controller sends the config 
> update to all brokers. When a broker that doesn't have TS enabled yet 
> (because it hasn't been restarted) receives this config change, it "should" 
> fail to apply it. But failing at that point is too late, because alterConfig 
> has already succeeded: controller-to-broker config propagation is async.
> With this JIRA, we want the controller to check that TS is enabled on all 
> brokers before applying an alterConfig that turns on TS for a topic.
> Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129
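Here is a minimal sketch of the proposed controller-side check. It assumes the controller has a per-broker view of whether tiered storage is enabled. `BrokerInfo` and `TieredStorageConfigValidator` are hypothetical names, not the controller's real API. `remote.storage.enable` is the KIP-405 topic-level config. Because the change is rejected synchronously, the alterConfig caller sees the failure, rather than a lagging broker failing only after async propagation.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical per-broker view reported to the controller.
final class BrokerInfo {
    final int id;
    final boolean tieredStorageEnabled;

    BrokerInfo(int id, boolean tieredStorageEnabled) {
        this.id = id;
        this.tieredStorageEnabled = tieredStorageEnabled;
    }
}

final class TieredStorageConfigValidator {
    // Returns an error message if the change must be rejected, empty otherwise.
    static Optional<String> validate(Map<String, String> topicConfigChanges, List<BrokerInfo> brokers) {
        boolean enablingRemoteStorage =
            Boolean.parseBoolean(topicConfigChanges.getOrDefault("remote.storage.enable", "false"));
        if (!enablingRemoteStorage) {
            return Optional.empty(); // unrelated change: nothing to check
        }
        for (BrokerInfo broker : brokers) {
            if (!broker.tieredStorageEnabled) {
                return Optional.of("Cannot enable remote storage for the topic: broker "
                    + broker.id + " does not have tiered storage enabled yet");
            }
        }
        return Optional.empty();
    }
}
```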



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-19 Thread via GitHub


jeffkbkim commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1431849588


##
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##
@@ -511,4 +544,35 @@ class CoordinatorLoaderImplTest {
 
 new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)
   }
+
+  private def logReadResult(
+startOffset: Long,
+producerId: Long,
+producerEpoch: Short,
+controlRecordType: ControlRecordType
+  ): FetchDataInfo = {
+val fileRecords = mock(classOf[FileRecords])
+val memoryRecords = MemoryRecords.withEndTransactionMarker(
+  startOffset,

Review Comment:
   It doesn't seem like we use the start/initial offset when we generate the 
control records (see 
https://github.com/apache/kafka/pull/14985/files#diff-252d8f54c521cc32e09406f127d568391346d3f92183fdb70f04108f4360f27fR197-R201).
   
   Is this because we only use the start offset during a read? I guess that 
makes sense, since we don't know which offset a record will be appended at 
when creating the records.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -2414,6 +2424,67 @@ public void testReplayWithTombstone() {
 assertNull(context.offsetMetadataManager.offset("foo", "bar", 0));
 }
 
+@Test
+public void testReplayTransactionEndMarkerWithCommit() {
+OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+// Add pending transactional commit for producer id 5.
+verifyTransactionalReplay(context, 5L, "foo", "bar", 0, new 
OffsetAndMetadata(
+100L,
+OptionalInt.empty(),
+"small",
+context.time.milliseconds(),
+OptionalLong.empty()
+));
+
+// Add pending transactional commit for producer id 6.
+verifyTransactionalReplay(context, 6L, "foo", "bar", 1, new 
OffsetAndMetadata(
+200L,
+OptionalInt.empty(),
+"small",
+context.time.milliseconds(),
+OptionalLong.empty()
+));
+
+// Replaying an end marker with an unknown producer id should not fail.

Review Comment:
   Is this to provide idempotency? I'm guessing we can have duplicate requests 
coming in.
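To make the behavior under discussion concrete, here is a minimal sketch of a replay that tolerates end markers for producer ids with no pending offsets. `PendingOffsets`, `TxnResult`, and the string keys are hypothetical and far simpler than `OffsetMetadataManager`. Treating an unknown producer id as a no-op also makes replaying the same marker twice harmless, which is one way to get the idempotency asked about.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the transaction result carried by an end marker.
enum TxnResult { COMMIT, ABORT }

// Hypothetical store: transactional offsets stay pending per producer id
// until a COMMIT end marker is replayed.
class PendingOffsets {
    final Map<Long, Map<String, Long>> pendingByProducer = new HashMap<>();
    final Map<String, Long> committed = new HashMap<>();

    void replayTransactionalOffset(long producerId, String key, long offset) {
        pendingByProducer.computeIfAbsent(producerId, id -> new HashMap<>()).put(key, offset);
    }

    void replayEndTransactionMarker(long producerId, TxnResult result) {
        Map<String, Long> pending = pendingByProducer.remove(producerId);
        if (pending == null) {
            return; // unknown (or already completed) producer id: no-op
        }
        if (result == TxnResult.COMMIT) {
            committed.putAll(pending);
        } // ABORT: pending offsets are simply dropped
    }
}
```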



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -1900,4 +1908,59 @@ public void testCommitTransactionalOffsets() throws 
ExecutionException, Interrup
 
 assertEquals(response, future.get());
 }
+
+@Test
+public void testCompleteTransaction() throws ExecutionException, 
InterruptedException {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime,
+new GroupCoordinatorMetrics()
+);
+service.startup(() -> 1);
+
+when(runtime.scheduleTransactionCompletion(
+ArgumentMatchers.eq("write-txn-marker"),
+ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ArgumentMatchers.eq(100L),
+ArgumentMatchers.eq((short) 5),
+ArgumentMatchers.eq(10),
+ArgumentMatchers.eq(TransactionResult.COMMIT),
+ArgumentMatchers.eq(Duration.ofMillis(100))
+)).thenReturn(CompletableFuture.completedFuture(null));
+
+CompletableFuture future = service.completeTransaction(
+new TopicPartition("__consumer_offsets", 0),
+100L,
+(short) 5,
+10,
+TransactionResult.COMMIT,
+Duration.ofMillis(100)
+);
+
+assertNull(future.get());
+}
+
+@Test
+public void testCompleteTransactionWhenNotStarted() throws 
ExecutionException, InterruptedException {

Review Comment:
   seems we can remove the exceptions



##
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala:
##
@@ -234,6 +237,79 @@ class CoordinatorPartitionWriterTest {
 assertEquals(records, receivedRecords)
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[ControlRecordType], names = Array("COMMIT", 
"ABORT"))
+  def testWriteTransactionEndMarker(controlRecordType: ControlRecordType): 
Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val time = new MockTime()
+val partitionRecordWriter = new CoordinatorPartitionWriter(
+  replicaManager,
+  new StringKeyValueSerializer(),
+  CompressionType.NONE,
+  time
+)
+
+when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps(
+  Collections.emptyMap(),
+  new Properties()
+)))
+
+val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =

Re: [PR] KAFKA-14412: Generalise over RocksDB WriteBatch [kafka]

2023-12-19 Thread via GitHub


ableegoldman merged PR #14853:
URL: https://github.com/apache/kafka/pull/14853


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Generalise over RocksDB WriteBatch [kafka]

2023-12-19 Thread via GitHub


ableegoldman commented on PR #14853:
URL: https://github.com/apache/kafka/pull/14853#issuecomment-1863466888

   I honestly don't know...maybe try doing a rebase from trunk and force 
pushing? That can often clear up this sort of thing. Ideally we don't have to 
abandon this PR for a fresh one (just to avoid losing history/review/comments), 
but that can be a last resort I suppose





[jira] [Updated] (KAFKA-15250) DefaultBackgroundThread is running tight loop

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15250:
--
Issue Type: Bug  (was: Improvement)

> DefaultBackgroundThread is running tight loop
> -
>
> Key: KAFKA-15250
> URL: https://issues.apache.org/jira/browse/KAFKA-15250
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I 
> think we need to re-examine the timeout passed to networkClientDelegate.poll.
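Here is a minimal sketch of one way to avoid a tight loop. Instead of polling with a zero or near-zero timeout, the background thread blocks until the earliest time any request manager next needs to send, capped at a maximum. The `TimeUntilNextSend` interface and the `MAX_POLL_TIMEOUT_MS` value are hypothetical assumptions, not the consumer's actual API.

```java
import java.util.List;

// Hypothetical: each request manager reports how long until it next needs to send.
interface TimeUntilNextSend {
    long timeUntilNextSendMs(long nowMs);
}

class PollTimeoutCalculator {
    static final long MAX_POLL_TIMEOUT_MS = 5000L; // assumed upper bound

    // Blocks no longer than the earliest pending deadline; never negative.
    static long pollTimeoutMs(List<TimeUntilNextSend> managers, long nowMs) {
        long timeout = MAX_POLL_TIMEOUT_MS;
        for (TimeUntilNextSend manager : managers) {
            timeout = Math.min(timeout, Math.max(0L, manager.timeUntilNextSendMs(nowMs)));
        }
        return timeout;
    }
}
```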





[jira] [Updated] (KAFKA-15946) AsyncKafkaConsumer should retry commits on the application thread instead of auto-retry

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15946:
--
Issue Type: Bug  (was: Improvement)

> AsyncKafkaConsumer should retry commits on the application thread instead of 
> auto-retry
> ---
>
> Key: KAFKA-15946
> URL: https://issues.apache.org/jira/browse/KAFKA-15946
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The original design was that the network thread always completes the future, 
> whether the request succeeds or fails. However, in the current patch, I 
> mistakenly added auto-retry functionality because commitSync wasn't retrying. 
> What we should do instead is have the commitSync API catch RetriableExceptions 
> and resend the commit until it times out.
>  
> {code:java}
> if (error.exception() instanceof RetriableException) {
>     log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
>     handleRetriableError(error, response);
>     retry(responseTime);  // <--- We probably shouldn't do this.
>     return;
> }
> {code}
>  
> {code:java}
> @Override
> public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
>     acquireAndEnsureOpen();
>     long commitStart = time.nanoseconds();
>     try {
>         CompletableFuture<Void> commitFuture = commit(offsets, true); // <-- we probably should retry here
>         ConsumerUtils.getResult(commitFuture, time.timer(timeout));
>     } finally {
>         wakeupTrigger.clearTask();
>         kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
>         release();
>     }
> }
> {code}
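Here is a minimal sketch of the retry loop the ticket proposes, written against plain `java.util.concurrent` types so it is self-contained. `sendCommit` stands in for the consumer's internal commit call, and `RetriableCommitException` is a hypothetical stand-in for Kafka's `RetriableException`. The network thread only completes the future. The application thread resends while the failure is retriable and the overall deadline has not passed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical stand-in for org.apache.kafka.common.errors.RetriableException.
class RetriableCommitException extends RuntimeException {
    RetriableCommitException(String message) { super(message); }
}

class CommitSyncRetry {
    // Resends the commit until it succeeds, fails with a non-retriable error,
    // or the overall timeout elapses (TimeoutException).
    static void commitSync(Supplier<CompletableFuture<Void>> sendCommit, long timeoutMs)
            throws InterruptedException, TimeoutException {
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            long remainingNs = deadlineNs - System.nanoTime();
            if (remainingNs <= 0) {
                throw new TimeoutException("commitSync timed out");
            }
            try {
                sendCommit.get().get(remainingNs, TimeUnit.NANOSECONDS);
                return; // committed
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof RetriableCommitException)) {
                    throw new RuntimeException(e.getCause()); // surface non-retriable errors
                }
                // Retriable: loop and resend (a real client would also back off here).
            }
        }
    }
}
```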





[jira] [Updated] (KAFKA-15872) Investigate autocommit retry logic

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15872:
--
Issue Type: Task  (was: Improvement)

> Investigate autocommit retry logic
> --
>
> Key: KAFKA-15872
> URL: https://issues.apache.org/jira/browse/KAFKA-15872
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> This is purely an investigation ticket.
> Currently, we send an autocommit only if there isn't one in flight; 
> however, this logic might not be correct, because I think we should:
>  # expire the request if it is not completed in time
>  # always send an autocommit on the clock
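Here is a minimal sketch of the two behaviors listed above, under stated assumptions. `AutoCommitScheduler` and its fields are hypothetical, times are plain milliseconds, and responses are assumed to arrive in send order. On each poll, it first expires in-flight autocommits that have outlived the request timeout. It then sends a new autocommit whenever the interval has elapsed, regardless of whether one is still in flight.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical scheduler; times are in milliseconds.
class AutoCommitScheduler {
    private final long intervalMs;
    private final long requestTimeoutMs;
    private long nextCommitMs;
    // Expiry times of in-flight autocommits, oldest first (sorted because the timeout is fixed).
    final Deque<Long> inflightExpiries = new ArrayDeque<>();
    int sent;
    int expired;

    AutoCommitScheduler(long intervalMs, long requestTimeoutMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.requestTimeoutMs = requestTimeoutMs;
        this.nextCommitMs = nowMs + intervalMs;
    }

    void poll(long nowMs) {
        // 1. Expire requests that were not completed in time.
        while (!inflightExpiries.isEmpty() && inflightExpiries.peekFirst() <= nowMs) {
            inflightExpiries.pollFirst();
            expired++;
        }
        // 2. Send on the clock, even if an earlier autocommit is still in flight.
        if (nowMs >= nextCommitMs) {
            inflightExpiries.addLast(nowMs + requestTimeoutMs);
            sent++;
            nextCommitMs = nowMs + intervalMs;
        }
    }

    // Simplification: the oldest in-flight autocommit received its response.
    void onOldestCompleted() {
        inflightExpiries.pollFirst();
    }
}
```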





[jira] [Updated] (KAFKA-15991) Flaky new consumer test testGroupIdNotNullAndValid

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15991:
--
Issue Type: Test  (was: Task)

> Flaky new consumer test testGroupIdNotNullAndValid
> --
>
> Key: KAFKA-15991
> URL: https://issues.apache.org/jira/browse/KAFKA-15991
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test, kip-848, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> It fails locally when run in a loop with its latest changes from 
> [https://github.com/apache/kafka/commit/6df192b6cb1397a6e6173835bbbd8a3acb7e3988].
>  It failed the build, so it has been temporarily disabled.





[jira] [Updated] (KAFKA-15775) Implement listTopics() and partitionFor() for the AsyncKafkaConsumer

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15775:
--
Fix Version/s: 3.7.0

> Implement listTopics() and partitionFor() for the AsyncKafkaConsumer
> 
>
> Key: KAFKA-15775
> URL: https://issues.apache.org/jira/browse/KAFKA-15775
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support, kip-848-e2e, kip-848-preview
> Fix For: 3.7.0
>
>
> {code:java}
> @Override
> public List<PartitionInfo> partitionsFor(String topic) {
>     return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));
> }
>
> @Override
> public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
>     throw new KafkaException("method not implemented");
> }
>
> @Override
> public Map<String, List<PartitionInfo>> listTopics() {
>     return listTopics(Duration.ofMillis(defaultApiTimeoutMs));
> }
>
> @Override
> public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
>     throw new KafkaException("method not implemented");
> }
> {code}





[jira] [Updated] (KAFKA-15548) Send GroupConsumerHeartbeatRequest on Consumer.close()

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15548:
--
Fix Version/s: 3.7.0

> Send GroupConsumerHeartbeatRequest on Consumer.close()
> --
>
> Key: KAFKA-15548
> URL: https://issues.apache.org/jira/browse/KAFKA-15548
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-e2e, kip-848-preview
> Fix For: 3.7.0
>
>
> When the {{Consumer}} is closed, we need to send a final 
> GroupConsumerHeartbeatRequest with epoch = -1 to leave the group (or -2 for a 
> static member). There is a mechanism introduced in PR 
> [14406|https://github.com/apache/kafka/pull/14406] that allows for performing 
> network I/O on shutdown. The new method 
> {{ConsumerNetworkThread.runAtClose()}} will be executed when 
> {{Consumer.close()}} is invoked.
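Here is a tiny sketch of the epoch choice described above. The values -1 and -2 come from the ticket. The class and constant names are hypothetical, and treating a member configured with `group.instance.id` as static is the standard static-membership rule.

```java
import java.util.Optional;

// Hypothetical helper for the final heartbeat sent from close().
class LeaveGroupEpochs {
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    // A static member is one configured with group.instance.id.
    static int leaveEpoch(Optional<String> groupInstanceId) {
        return groupInstanceId.isPresent() ? LEAVE_GROUP_STATIC_MEMBER_EPOCH : LEAVE_GROUP_MEMBER_EPOCH;
    }
}
```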





[jira] [Updated] (KAFKA-15696) Revoke partitions on Consumer.close()

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15696:
--
Fix Version/s: 3.7.0

> Revoke partitions on Consumer.close()
> -
>
> Key: KAFKA-15696
> URL: https://issues.apache.org/jira/browse/KAFKA-15696
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-e2e, kip-848-preview
> Fix For: 3.7.0
>
>
> When the {{Consumer}} is closed, we need to revoke its assignment. This 
> involves stopping fetching, committing offsets if auto-commit is enabled, and 
> invoking the onPartitionsRevoked callback. There is a mechanism introduced in PR 
> [14406|https://github.com/apache/kafka/pull/14406] that allows for performing 
> network I/O on shutdown. The new method 
> {{ConsumerNetworkThread.runAtClose()}} will be executed when 
> {{Consumer.close()}} is invoked.





[jira] [Updated] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15475:
--
Parent: (was: KAFKA-14246)
Issue Type: Bug  (was: Sub-task)

> Timeout request might retry forever even if the user API times out in 
> PrototypeAsyncConsumer
> 
>
> Key: KAFKA-15475
> URL: https://issues.apache.org/jira/browse/KAFKA-15475
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.8.0
>
>
> If a request times out in the background thread, it is completed with a 
> TimeoutException, which is retriable. In the TopicMetadataRequestManager, and 
> possibly other managers, the request might then be retried forever.
>  
> There are two ways to fix this:
>  # Pass a timer to the manager so that it removes in-flight requests once they 
> have expired.
>  # Pass the future to the application layer and continue to retry.
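Here is a minimal sketch of option 1. It assumes a manager keeps each pending request together with the deadline of the user API call that created it. `TimedRequest` and `TopicMetadataRequests` are hypothetical and much simpler than the real `TopicMetadataRequestManager`. On each background-thread iteration, requests past their deadline are failed and dropped instead of being retried.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical pending request tied to the caller's deadline.
class TimedRequest {
    final String topic;
    final long deadlineMs;
    final CompletableFuture<String> future = new CompletableFuture<>();

    TimedRequest(String topic, long deadlineMs) {
        this.topic = topic;
        this.deadlineMs = deadlineMs;
    }
}

class TopicMetadataRequests {
    final List<TimedRequest> pending = new ArrayList<>();

    TimedRequest request(String topic, long nowMs, long timeoutMs) {
        TimedRequest r = new TimedRequest(topic, nowMs + timeoutMs);
        pending.add(r);
        return r;
    }

    // Expired requests are completed exceptionally and removed, so they are not retried forever.
    void expire(long nowMs) {
        Iterator<TimedRequest> it = pending.iterator();
        while (it.hasNext()) {
            TimedRequest r = it.next();
            if (nowMs >= r.deadlineMs) {
                r.future.completeExceptionally(
                    new TimeoutException("Timed out fetching metadata for " + r.topic));
                it.remove();
            }
        }
    }
}
```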





[jira] [Updated] (KAFKA-15866) Refactor OffsetFetchRequestState Error handling to be more consistent with OffsetCommitRequestState

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15866:
--
Parent: (was: KAFKA-14246)
Issue Type: Improvement  (was: Sub-task)

> Refactor OffsetFetchRequestState Error handling to be more consistent with 
> OffsetCommitRequestState
> ---
>
> Key: KAFKA-15866
> URL: https://issues.apache.org/jira/browse/KAFKA-15866
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lan Ding
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The current OffsetFetchRequestState error handling uses nested if-else 
> statements, which is stylistically quite different from OffsetCommitRequestState, 
> which uses a switch statement. The latter is a bit more readable, so we should 
> refactor the error handling to use the same style.
>  
> A minor point: some of the error handling seems inconsistent with the commit 
> handling. The logic was carried over from the current implementation, so we 
> should also review all of the error handling. For example, the current logic 
> doesn't mark the coordinator unavailable when receiving 
> COORDINATOR_NOT_AVAILABLE.
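For illustration, here is a minimal sketch of the switch-based style the ticket prefers. It uses a hypothetical `FetchError` enum and `Action` result rather than Kafka's `Errors` class. The error-to-action mapping is an assumption for illustration, including marking the coordinator unknown on COORDINATOR_NOT_AVAILABLE, which the ticket notes the current fetch logic does not do.

```java
// Hypothetical error codes, named after Kafka's, and hypothetical actions.
enum FetchError { NONE, COORDINATOR_LOAD_IN_PROGRESS, COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR,
    GROUP_AUTHORIZATION_FAILED, UNKNOWN }

enum Action { COMPLETE, RETRY, MARK_COORDINATOR_UNKNOWN_AND_RETRY, FAIL }

class OffsetFetchErrorHandler {
    // One flat switch instead of nested if-else; every error maps to exactly one action.
    static Action onError(FetchError error) {
        switch (error) {
            case NONE:
                return Action.COMPLETE;
            case COORDINATOR_LOAD_IN_PROGRESS:
                return Action.RETRY;
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                return Action.MARK_COORDINATOR_UNKNOWN_AND_RETRY;
            default:
                return Action.FAIL; // e.g. authorization failures are fatal
        }
    }
}
```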





[jira] [Updated] (KAFKA-15652) Add tests to verify OffsetFetcherUtils.getOffsetResetTimestamp()

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15652:
--
Parent: (was: KAFKA-14246)
Issue Type: Test  (was: Sub-task)

> Add tests to verify OffsetFetcherUtils.getOffsetResetTimestamp()
> 
>
> Key: KAFKA-15652
> URL: https://issues.apache.org/jira/browse/KAFKA-15652
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> In the {{updateFetchPositions()}} method implementation, both 
> {{KafkaConsumer}} and {{PrototypeAsyncConsumer}} reset positions 
> asynchronously. [~junrao] stated the following in a [recent PR 
> review|https://github.com/apache/kafka/pull/14406#discussion_r1349173413]:
> {quote}There is a subtle difference between transitioning to reset from 
> initializing and transitioning to reset from {{OffsetOutOfRangeException}} 
> during fetch. In the latter, the application thread will call 
> {{FetchCollector.handleInitializeErrors()}}. If there is no default 
> offset reset policy, an {{OffsetOutOfRangeException}} will be thrown to the 
> application thread during {{poll}}, which is what we want.
> However, for the former, if there is no default offset reset policy, we 
> simply ignore that partition through 
> {{OffsetFetcherUtils.getOffsetResetTimestamp}}. It seems in that case, 
> the partition will be forever in the reset state and the application thread 
> won't get the {{OffsetOutOfRangeException}}.
> {quote}
> I intentionally changed the code so that no exceptions were thrown in 
> {{OffsetFetcherUtils.getOffsetResetTimestamp()}} and would simply return an 
> empty map. When I ran the unit tests and integration tests, there were no 
> failures, strongly suggesting that there is no coverage of this particular 
> edge case.





[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThreadTest's testResetPositionsProcessFailureIsIgnored

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15639:
--
Parent: (was: KAFKA-14246)
Issue Type: Test  (was: Sub-task)

> Investigate ConsumerNetworkThreadTest's 
> testResetPositionsProcessFailureIsIgnored
> -
>
> Key: KAFKA-15639
> URL: https://issues.apache.org/jira/browse/KAFKA-15639
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The {{testResetPositionsProcessFailureIsIgnored}} test looks like this:
>  
> {code:java}
> @Test
> public void testResetPositionsProcessFailureIsIgnored() {
>     doThrow(new NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
>     ResetPositionsApplicationEvent event = new ResetPositionsApplicationEvent();
>     applicationEventsQueue.add(event);
>     assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
>     verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
> }
> {code}
>  
> [~junrao] asks:
>  
> {quote}Not sure if this is a useful test since 
> {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly 
> throw an exception?
> {quote}
>  
> I commented out the {{doThrow}} line and it did not impact the test. 





[jira] [Updated] (KAFKA-15634) Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15634:
--
Parent: (was: KAFKA-14246)
Issue Type: Test  (was: Sub-task)

> Investigate FetcherTest's/FetchRequestManager's testQuotaMetrics
> 
>
> Key: KAFKA-15634
> URL: https://issues.apache.org/jira/browse/KAFKA-15634
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> What is the point of the code in the initial {{while}} loop since the receive 
> is delayed and thus there is no {{throttleDelayMs}} received in the client?





[jira] [Updated] (KAFKA-15284) Implement ConsumerGroupProtocolVersionResolver to determine consumer group protocol

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15284:
--
Parent: (was: KAFKA-14246)
Issue Type: New Feature  (was: Sub-task)

> Implement ConsumerGroupProtocolVersionResolver to determine consumer group 
> protocol
> ---
>
> Key: KAFKA-15284
> URL: https://issues.apache.org/jira/browse/KAFKA-15284
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> At client initialization, we need to determine which of the 
> {{ConsumerDelegate}} implementations to use:
>  # {{LegacyKafkaConsumerDelegate}}
>  # {{AsyncKafkaConsumerDelegate}}
> There are conditions defined by KIP-848 that determine client eligibility to 
> use the new protocol. This will be modeled by the (deep breath) 
> {{ConsumerGroupProtocolVersionResolver}}.
> Known tasks:
>  * Determine at what point in the {{Consumer}} initialization the network 
> communication should happen
>  * Determine what RPCs to invoke in order to determine eligibility (API 
> versions, IBP version, etc.)
>  * Implement the network client lifecycle (startup, communication, shutdown, 
> etc.)
>  * Determine the fallback path in case the client is not eligible to use the 
> protocol





[jira] [Updated] (KAFKA-15615) Improve handling of fetching during metadata updates

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15615:
--
Parent: (was: KAFKA-14246)
Issue Type: Improvement  (was: Sub-task)

> Improve handling of fetching during metadata updates
> 
>
> Key: KAFKA-15615
> URL: https://issues.apache.org/jira/browse/KAFKA-15615
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 4.0.0
>
>
> [During a review of the new 
> fetcher|https://github.com/apache/kafka/pull/14406#discussion_r193941], 
> [~junrao] found what appears to be an opportunity for optimization.
> When a fetch response contains an error about partition leadership, fencing, 
> etc., a metadata refresh is triggered. However, that refresh takes time to 
> complete, and in the interim the consumer appears to blindly attempt to fetch 
> data for the partition again, in a "definition of insanity" kind of way. 
> Ideally, the consumer would have a way to temporarily ignore those partitions, 
> somewhat like the "pausing" approach, so that they are skipped until the 
> metadata refresh response is fully processed.
> This affects both the existing KafkaConsumer and the new 
> PrototypeAsyncConsumer.
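One way to express the "skip until refreshed" idea is a set of partitions that are excluded from the next fetch until metadata is updated, similar in spirit to pausing. This is a hypothetical sketch: partitions are plain strings and the class and method names are invented. Clearing the whole set on any metadata update is also a simplification; a real version would likely check the metadata version or the new leader per partition.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: partition names are plain strings and the class is illustrative only.
final class FetchablePartitionsSketch {
    // Partitions whose last fetch hit a leadership/fencing error and that await a metadata refresh.
    private final Set<String> awaitingMetadata = new HashSet<>();

    void onFetchError(String partition) {
        awaitingMetadata.add(partition);
    }

    // Called once the metadata refresh response has been fully processed (simplified: clears all).
    void onMetadataUpdate() {
        awaitingMetadata.clear();
    }

    // Like pausing: partitions awaiting metadata are left out of the next fetch request.
    List<String> fetchable(List<String> assigned) {
        List<String> result = new ArrayList<>();
        for (String tp : assigned) {
            if (!awaitingMetadata.contains(tp)) {
                result.add(tp);
            }
        }
        return result;
    }
}
```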





[jira] [Updated] (KAFKA-15558) Determine if Timer should be used elsewhere in PrototypeAsyncConsumer.updateFetchPositions()

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15558:
--
Parent: (was: KAFKA-14246)
Issue Type: Task  (was: Sub-task)

> Determine if Timer should be used elsewhere in 
> PrototypeAsyncConsumer.updateFetchPositions()
> 
>
> Key: KAFKA-15558
> URL: https://issues.apache.org/jira/browse/KAFKA-15558
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.8.0
>
>
> This is a followup ticket based on a question from [~junrao] when reviewing 
> the [fetch request manager pull 
> request|https://github.com/apache/kafka/pull/14406]:
> {quote}It still seems weird that we only use the timer for 
> {{{}refreshCommittedOffsetsIfNeeded{}}}, but not for other cases where we 
> don't have valid fetch positions. For example, if all partitions are in 
> {{AWAIT_VALIDATION}} state, it seems that {{PrototypeAsyncConsumer.poll()}} 
> will just go in a busy loop, which is not efficient.
> {quote}
> The goal here is to determine if we should also be propagating the Timer to 
> the validate positions and reset positions operations.
> Note: we should also investigate if the existing {{KafkaConsumer}} 
> implementation should be fixed, too.
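If the timer were propagated to the validate and reset paths, the waiting loop could look roughly like the sketch below. It re-checks a condition, backs off between checks instead of spinning, and gives up when the deadline passes. `BoundedWaitSketch` and `awaitValidPositions` are hypothetical names, and the `BooleanSupplier` stands in for "all partitions have valid positions". The real code would presumably use Kafka's own `Timer` utility rather than raw `System.nanoTime()`.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch: a deadline-bounded wait that backs off between checks instead of busy-looping.
final class BoundedWaitSketch {
    /** Returns true if positionsValid became true before timeoutMs elapsed, false otherwise. */
    static boolean awaitValidPositions(BooleanSupplier positionsValid, long timeoutMs, long backoffMs) {
        long deadlineNs = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!positionsValid.getAsBoolean()) {
            long remainingMs = (deadlineNs - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0) {
                return false; // timer expired
            }
            try {
                // Sleep for the backoff or the remaining time, whichever is shorter.
                Thread.sleep(Math.min(backoffMs, remainingMs));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```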





[jira] [Updated] (KAFKA-15553) Review committed offset refresh logic

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15553:
--
Parent: (was: KAFKA-14246)
Issue Type: Task  (was: Sub-task)

> Review committed offset refresh logic
> -
>
> Key: KAFKA-15553
> URL: https://issues.apache.org/jira/browse/KAFKA-15553
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> From the existing comment: If there are any partitions which do not have a 
> valid position and are not awaiting reset, then we need to fetch committed 
> offsets.
>  
> In the async consumer: I wonder if it would make sense to refresh the 
> position on the event loop continuously.





[jira] [Updated] (KAFKA-15515) Remove duplicated integration tests for new consumer

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15515:
--
Component/s: unit tests

> Remove duplicated integration tests for new consumer
> 
>
> Key: KAFKA-15515
> URL: https://issues.apache.org/jira/browse/KAFKA-15515
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> This task involves removing the temporary `PlaintextAsyncConsumer` file 
> containing duplicated integration tests for the new consumer. The copy was 
> generated to catch regressions and validate functionality in the new consumer 
> while in development. It should be deleted when the new consumer is fully 
> implemented and the existing integration tests (`PlaintextConsumerTest`) can 
> be executed for both implementations.
>  
> Context:
>  
> For the current KafkaConsumer, a set of integration tests exists in the file 
> PlaintextConsumerTest. Those tests cannot be executed as-is for the new 
> consumer implementation, for two main reasons:
> - the new consumer is being developed as a new PrototypeAsyncConsumer class, 
> in parallel to the existing KafkaConsumer. 
> - the new consumer is under development, so it does not support all the 
> consumer functionality yet. 
>  
> In order to run the subset of tests that the new consumer supports while 
> the implementation is completed, it was decided to:
> - make a copy of the `PlaintextConsumerTest` class, named 
> `PlaintextAsyncConsumer`.
> - leave all the existing integration tests that cover the simple consumer 
> case unchanged, and disable the tests that are not yet supported by the new 
> consumer. Disabled tests will be enabled as the async consumer evolves.
>  
>  





[jira] [Updated] (KAFKA-15515) Remove duplicated integration tests for new consumer

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15515:
--
Parent: (was: KAFKA-14246)
Issue Type: Test  (was: Sub-task)

> Remove duplicated integration tests for new consumer
> 
>
> Key: KAFKA-15515
> URL: https://issues.apache.org/jira/browse/KAFKA-15515
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0





[jira] [Updated] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15551:
--
Parent: (was: KAFKA-14246)
Issue Type: Improvement  (was: Sub-task)

> Evaluate conditions for short circuiting consumer API calls
> ---
>
> Key: KAFKA-15551
> URL: https://issues.apache.org/jira/browse/KAFKA-15551
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> For conditions like:
>  * Committing empty offset
>  * Fetching offsets for empty partitions
>  * Getting empty topic partition position
> These calls could possibly be short-circuited at the API level.
> As a bonus, we should double-check whether the existing {{KafkaConsumer}} 
> implementation suffers from this.
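A minimal sketch of short-circuiting the empty-offsets commit case at the API level. It assumes a hypothetical `commitAsync` entry point and simplified offset types (topic-partition string to offset). The call completes immediately, without enqueuing an event or sending an RPC.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: commitAsync and requestsSent are illustrative; offsets use simplified types.
final class ShortCircuitCommitSketch {
    int requestsSent = 0; // counts commits that would actually reach the background thread

    CompletableFuture<Void> commitAsync(Map<String, Long> offsets) {
        if (offsets.isEmpty()) {
            // Nothing to commit: complete right away without enqueuing an event or sending an RPC.
            return CompletableFuture.completedFuture(null);
        }
        requestsSent++;
        // Placeholder for enqueuing a commit event and returning its future.
        return CompletableFuture.completedFuture(null);
    }
}
```

The same guard would apply to the other two cases in the list: an empty partition set can return an empty result without a round trip to the background thread.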





[jira] [Updated] (KAFKA-15250) DefaultBackgroundThread is running tight loop

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15250:
--
Parent: (was: KAFKA-14246)
Issue Type: Improvement  (was: Sub-task)

> DefaultBackgroundThread is running tight loop
> -
>
> Key: KAFKA-15250
> URL: https://issues.apache.org/jira/browse/KAFKA-15250
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I 
> think we need to re-examine the timeout passed to NetworkClientDelegate.poll.
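For illustration, one common way to size that timeout is to block until the earliest time any request manager needs to run again, capped by an upper bound. Always passing 0 would make each poll return immediately, so the loop would spin. The sketch assumes each manager can report its "time until next poll" in milliseconds. `PollTimeoutSketch` and `pollTimeoutMs` are hypothetical names.

```java
import java.util.List;

// Hypothetical sketch: each request manager reports how long until it next needs to run.
final class PollTimeoutSketch {
    /**
     * Returns how long the network poll may block: the earliest time any manager needs
     * attention (negative values treated as 0), capped by maxPollTimeoutMs.
     */
    static long pollTimeoutMs(List<Long> timeUntilNextPollMs, long maxPollTimeoutMs) {
        long timeout = maxPollTimeoutMs;
        for (long t : timeUntilNextPollMs) {
            timeout = Math.min(timeout, Math.max(0L, t));
        }
        return timeout;
    }
}
```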





[jira] [Updated] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15305:
--
Parent: (was: KAFKA-14246)
Issue Type: Improvement  (was: Sub-task)

> The background thread should try to process the remaining task until the 
> shutdown timer is expired
> --
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> This came up while working on https://issues.apache.org/jira/browse/KAFKA-15304.
> The close() API supplies a timeout parameter so that the consumer has a 
> grace period to process things before shutting down. The background thread 
> currently doesn't honor that: when close() is initiated, it immediately 
> closes all of its dependencies.
>  
> This might not be desirable because there could be remaining tasks to be 
> processed before closing. Maybe the correct thing to do is to first stop 
> accepting API requests, then let runOnce() continue to run until the 
> shutdown timer expires, and finally force-close all of its dependencies.
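The proposed close() sequence (stop accepting requests, keep running the loop until the work is drained or the timer expires, then force-close) could be sketched as follows. `GracefulCloseSketch`, `submit`, and the `dependenciesClosed` flag are hypothetical stand-ins, and each `Runnable` represents one unit of pending background work.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of a three-step close(); not the real background thread API.
final class GracefulCloseSketch {
    private final Queue<Runnable> pendingTasks = new ConcurrentLinkedQueue<>();
    private volatile boolean acceptingRequests = true;
    volatile boolean dependenciesClosed = false;

    boolean submit(Runnable task) {
        if (!acceptingRequests) {
            return false; // step 1: reject new API requests once close() has started
        }
        pendingTasks.add(task);
        return true;
    }

    // Stand-in for one event-loop iteration: runs at most one pending task.
    void runOnce() {
        Runnable task = pendingTasks.poll();
        if (task != null) {
            task.run();
        }
    }

    void close(long timeoutMs) {
        acceptingRequests = false;
        long deadlineNs = System.nanoTime() + timeoutMs * 1_000_000L;
        // Step 2: keep running the loop until the work is drained or the shutdown timer expires.
        while (!pendingTasks.isEmpty() && System.nanoTime() - deadlineNs < 0) {
            runOnce();
        }
        // Step 3: force-close dependencies, whatever is left.
        dependenciesClosed = true;
    }
}
```

A real implementation would also need to close the race between a `submit()` that has already passed the flag check and a concurrent `close()`.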





[jira] [Updated] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15556:
--
Parent: (was: KAFKA-14246)
Issue Type: Improvement  (was: Sub-task)

> Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, 
> and tryConnect
> -
>
> Key: KAFKA-15556
> URL: https://issues.apache.org/jira/browse/KAFKA-15556
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The "new consumer" (i.e. {{PrototypeAsyncConsumer}}) was designed to 
> handle networking details in a more centralized way. However, in order to 
> reuse code between the existing {{KafkaConsumer}} and the new 
> {{PrototypeAsyncConsumer}}, that design goal was "relaxed" when the 
> {{NetworkClientDelegate}} capitulated and -stole- copied three methods from 
> {{ConsumerNetworkClient}} related to detecting node status:
>  # {{isUnavailable}}
>  # {{maybeThrowAuthFailure}}
>  # {{tryConnect}}
> Unfortunately, these have found their way into the {{FetchRequestManager}} 
> and {{OffsetsRequestManager}} implementations. We should review if we can 
> clean up—or even remove—this leaky abstraction.





[jira] [Updated] (KAFKA-15173) Consumer event queues should be bounded

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15173:
--
Parent: (was: KAFKA-14246)
Issue Type: Improvement  (was: Sub-task)

> Consumer event queues should be bounded
> ---
>
> Key: KAFKA-15173
> URL: https://issues.apache.org/jira/browse/KAFKA-15173
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> The async consumer uses ApplicationEventQueue and BackgroundEventQueue to 
> facilitate message passing between the application thread and the background 
> thread. The current implementation is unbounded, which can potentially cause 
> OOM and other performance-related issues.
> I think the queues need a finite bound, and we need to decide how to handle 
> the situation when the bound is reached.  In particular, I would like to 
> answer these questions:
>  
>  # What should the upper limit be for both queues? Can this be a 
> configurable, memory-based bound, or just an arbitrary number of events?
>  # What should happen when the application event queue is filled up?  It 
> seems like we should introduce a new exception type and notify the user that 
> the consumer is full.
>  # What should happen when the background event queue is filled up? This 
> seems less likely to happen, but I imagine it could happen when the user 
> stops polling the consumer, causing the queue to be filled.
>  # Is it necessary to introduce a public configuration for the queue? I think 
> initially we would select an arbitrary constant and gather community 
> feedback to plan accordingly.
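For the count-based bound and the new-exception option raised in the first two questions, a bounded queue could look like the sketch below. It uses `ArrayBlockingQueue.offer()`, which returns false instead of blocking when the queue is full. `BoundedEventQueueSketch` and `QueueFullException` are hypothetical names, not existing Kafka types. A memory-based bound would need per-event size estimates instead.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: a count-bounded event queue that fails fast when full.
final class BoundedEventQueueSketch<E> {
    static final class QueueFullException extends RuntimeException {
        QueueFullException(String msg) { super(msg); }
    }

    private final int capacity;
    private final BlockingQueue<E> queue;

    BoundedEventQueueSketch(int capacity) {
        this.capacity = capacity;
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    void add(E event) {
        // offer() returns false instead of blocking when the queue is at capacity.
        if (!queue.offer(event)) {
            throw new QueueFullException("Event queue is full (capacity " + capacity + ")");
        }
    }

    E poll() {
        return queue.poll(); // null when empty
    }
}
```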





[jira] [Updated] (KAFKA-15320) Document event queueing patterns

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15320:
--
Parent: (was: KAFKA-14246)
Issue Type: Task  (was: Sub-task)

> Document event queueing patterns
> 
>
> Key: KAFKA-15320
> URL: https://issues.apache.org/jira/browse/KAFKA-15320
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, documentation
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> We need to first document the event enqueuing patterns in the 
> PrototypeAsyncConsumer. As part of this task, determine if it’s 
> necessary/beneficial to _conditionally_ add events and/or coalesce any 
> duplicate events in the queue.
> _Don’t forget to include diagrams for clarity!_
> This should be documented on the AK wiki.
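If coalescing turns out to be worthwhile, one simple approach is an insertion-ordered set. A duplicate of an event that is already queued is dropped, while arrival order is preserved. This is a hypothetical sketch that assumes events define value-based `equals()`/`hashCode()`. Events that carry per-caller futures generally should not be merged this way.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;

// Hypothetical sketch: deduplicates queued events by equals()/hashCode(), keeping first-arrival order.
final class CoalescingQueueSketch<E> {
    private final LinkedHashSet<E> events = new LinkedHashSet<>();

    // Returns false when an equal event is already queued (the new one is coalesced away).
    synchronized boolean add(E event) {
        return events.add(event);
    }

    // Removes and returns the oldest queued event, or null when empty.
    synchronized E poll() {
        Iterator<E> it = events.iterator();
        if (!it.hasNext()) {
            return null;
        }
        E head = it.next();
        it.remove();
        return head;
    }
}
```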





[jira] [Updated] (KAFKA-15867) Should ConsumerNetworkThread wrap the exception and notify the polling thread?

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15867:
--
Parent: (was: KAFKA-14246)
Issue Type: Task  (was: Sub-task)

> Should ConsumerNetworkThread wrap the exception and notify the polling thread?
> --
>
> Key: KAFKA-15867
> URL: https://issues.apache.org/jira/browse/KAFKA-15867
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The ConsumerNetworkThread runs a tight loop infinitely.  However, when 
> encountering an unexpected exception, it logs an error and continues.
>  
> I think this might not be ideal because the user can run blind for a long 
> time before discovering that something is wrong, so I believe we should 
> propagate the throwable back to the polling thread. 
>  
> cc [~lucasbru] 
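One way to propagate the failure is for the network thread to record the first unexpected exception and for the application thread to rethrow it on its next call. The class and method names are hypothetical, and wrapping the exception in `IllegalStateException` is an arbitrary choice for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: the network thread records the first unexpected error and the
// application thread rethrows it on its next poll(), instead of the error only being logged.
final class ErrorPropagationSketch {
    private final AtomicReference<RuntimeException> firstError = new AtomicReference<>();

    // Called on the network thread for each loop iteration.
    void runOnceSafely(Runnable runOnce) {
        try {
            runOnce.run();
        } catch (RuntimeException e) {
            firstError.compareAndSet(null, e); // keep only the first error
        }
    }

    // Called on the application (polling) thread, e.g. at the start of poll().
    void maybeThrowError() {
        RuntimeException e = firstError.getAndSet(null); // report each error once
        if (e != null) {
            throw new IllegalStateException("Consumer network thread failed", e);
        }
    }
}
```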





[jira] [Updated] (KAFKA-15942) Implement ConsumerInterceptor

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15942:
--
Parent: (was: KAFKA-14246)
Issue Type: New Feature  (was: Sub-task)

> Implement ConsumerInterceptor
> -
>
> Key: KAFKA-15942
> URL: https://issues.apache.org/jira/browse/KAFKA-15942
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> As the title says, we need to implement ConsumerInterceptor in the AsyncKafkaConsumer.
>  
> This is the current code. The implementation would be very similar:
> {code:java}
> if (interceptors != null)
>     interceptors.onCommit(offsets);
> {code}
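A minimal sketch of where `onCommit` could be invoked in the async consumer: only after the commit future completes successfully. `CommitInterceptor` is a hypothetical stand-in that mirrors the shape of `ConsumerInterceptor.onCommit` with simplified types, and `attachInterceptors` is an invented name. Note that `thenRun` runs the callback on whichever thread completes the future, so a real implementation would have to decide whether to hand the call back to the application thread.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for ConsumerInterceptor.onCommit, using simplified offset types.
interface CommitInterceptor {
    void onCommit(Map<String, Long> offsets);
}

final class InterceptorOnCommitSketch {
    private final List<CommitInterceptor> interceptors;

    InterceptorOnCommitSketch(List<CommitInterceptor> interceptors) {
        this.interceptors = interceptors;
    }

    // Interceptors fire only if commitResult completes normally; a failed commit skips them.
    CompletableFuture<Void> attachInterceptors(Map<String, Long> offsets, CompletableFuture<Void> commitResult) {
        return commitResult.thenRun(() -> {
            if (interceptors != null) {
                for (CommitInterceptor interceptor : interceptors) {
                    interceptor.onCommit(offsets);
                }
            }
        });
    }
}
```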





[jira] [Updated] (KAFKA-15636) Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15636:
--
Parent: (was: KAFKA-14246)
Issue Type: Test  (was: Sub-task)

> Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics
> 
>
> Key: KAFKA-15636
> URL: https://issues.apache.org/jira/browse/KAFKA-15636
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> {{expectedBytes}} is calculated as total, instead of avg. Is this correct?





[jira] [Updated] (KAFKA-15637) Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15637:
--
Parent: (was: KAFKA-14246)
Issue Type: Test  (was: Sub-task)

> Investigate FetcherTest's/FetchRequestManager's 
> testFetchCompletedBeforeHandlerAdded
> 
>
> Key: KAFKA-15637
> URL: https://issues.apache.org/jira/browse/KAFKA-15637
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> Thanks for the reply. I still don't quite understand the test. Why do we 
> duplicate the following code both inside and outside of {{setWakeupHook}}?
>  
> {code:java}
> networkClientDelegate.disconnectAsync(readReplica);
> networkClientDelegate.poll(time.timer(0));
> {code}
>  
> MockClient is only woken up through {{networkClientDelegate.disconnectAsync}}.





[jira] [Updated] (KAFKA-15635) Investigate FetcherTest's/FetchRequestManager's testFetcherLeadMetric

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15635:
--
Parent: (was: KAFKA-14246)
Issue Type: Test  (was: Sub-task)

> Investigate FetcherTest's/FetchRequestManager's testFetcherLeadMetric
> -
>
> Key: KAFKA-15635
> URL: https://issues.apache.org/jira/browse/KAFKA-15635
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> Why is {{recordsFetchLeadMin}} different from {{partitionLead}} given there 
> is only 1 assigned partition?





[jira] [Updated] (KAFKA-15638) Investigate ConsumerNetworkThreadTest's testPollResultTimer

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15638:
--
Parent: (was: KAFKA-14246)
Issue Type: Test  (was: Sub-task)

> Investigate ConsumerNetworkThreadTest's testPollResultTimer
> ---
>
> Key: KAFKA-15638
> URL: https://issues.apache.org/jira/browse/KAFKA-15638
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> Regarding this comment in {{testPollResultTimer}}...
> {code:java}
> // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE
> // upon success
> {code}
> [~junrao] asked:
> {quote}Which call is returning Long.MAX_VALUE?
> {quote}





[jira] [Updated] (KAFKA-15557) Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in assignFromUserNoId

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15557:
--
Parent: (was: KAFKA-14246)
Issue Type: Test  (was: Sub-task)

> Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in 
> assignFromUserNoId
> ---
>
> Key: KAFKA-15557
> URL: https://issues.apache.org/jira/browse/KAFKA-15557
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The unit tests {{FetcherTest}} and {{FetchRequestManagerTest}} have methods 
> named {{assignFromUser()}} and {{assignFromUserNoId()}} that appear to 
> perform duplicate metadata updates:
> {code:java}
> private void assignFromUser(Set<TopicPartition> partitions) {
>     subscriptions.assignFromUser(partitions);
>     client.updateMetadata(initialUpdateResponse);
>     // A dummy metadata update to ensure valid leader epoch.
>     metadata.updateWithCurrentRequestVersion(
>         RequestTestUtils.metadataUpdateWithIds(
>             "dummy",
>             1,
>             Collections.emptyMap(),
>             singletonMap(topicName, 4),
>             tp -> validLeaderEpoch,
>             topicIds
>         ),
>         false,
>         0L
>     );
> }
> {code}
> {{client.updateMetadata()}} eventually calls 
> {{metadata.updateWithCurrentRequestVersion()}}. Determine why the test is 
> updating the cluster metadata twice with different values.





[jira] [Updated] (KAFKA-15617) Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions and testInflightFetchOnPendingPartitions overlap

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15617:
--
Parent: (was: KAFKA-14246)
Issue Type: Test  (was: Sub-task)

> Investigate FetcherTest's/FetchRequestManager's testFetchingPendingPartitions 
> and testInflightFetchOnPendingPartitions overlap
> --
>
> Key: KAFKA-15617
> URL: https://issues.apache.org/jira/browse/KAFKA-15617
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> In FetcherTest, the two tests testFetchingPendingPartitions and 
> testInflightFetchOnPendingPartitions have significant overlap. Perhaps the 
> former subsumes the latter?





[jira] [Updated] (KAFKA-15606) Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15606:
--
Parent: (was: KAFKA-14246)
Issue Type: Test  (was: Sub-task)

> Investigate FetcherTest's/FetchRequestManager's testCompletedFetchRemoval
> -
>
> Key: KAFKA-15606
> URL: https://issues.apache.org/jira/browse/KAFKA-15606
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> As part of the review for [FetchRequestManager pull 
> request|https://github.com/apache/kafka/pull/14406], [~junrao] had some 
> questions related to the correctness and clarity of the 
> {{FetcherTest.testCompletedFetchRemoval()}} test:
> Questions:
> * https://github.com/apache/kafka/pull/14406#discussion_r1347908197
> * https://github.com/apache/kafka/pull/14406#discussion_r1347910980
> * https://github.com/apache/kafka/pull/14406#discussion_r1347913781





[jira] [Updated] (KAFKA-15946) AsyncKafkaConsumer should retry commits on the application thread instead of auto-retry

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15946:
--
Parent: (was: KAFKA-14246)
Issue Type: Improvement  (was: Sub-task)

> AsyncKafkaConsumer should retry commits on the application thread instead of 
> auto-retry
> ---
>
> Key: KAFKA-15946
> URL: https://issues.apache.org/jira/browse/KAFKA-15946
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The original design was that the network thread always completes the future, 
> whether the commit succeeds or fails. However, in the current patch, I 
> mistakenly added auto-retry functionality because commitSync wasn't retrying. 
> Instead, the commitSync API should catch RetriableExceptions and resend the 
> commit until it times out.
>  
> {code:java}
> if (error.exception() instanceof RetriableException) {
>     log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset,
>             error.message());
>     handleRetriableError(error, response);
>     retry(responseTime);  // <--- We probably shouldn't do this.
>     return;
> }
> {code}
>  
> {code:java}
> @Override
> public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets,
>                        Duration timeout) {
>     acquireAndEnsureOpen();
>     long commitStart = time.nanoseconds();
>     try {
>         // <-- we probably should retry here
>         CompletableFuture<Void> commitFuture = commit(offsets, true);
>         ConsumerUtils.getResult(commitFuture, time.timer(timeout));
>     } finally {
>         wakeupTrigger.clearTask();
>         kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
>         release();
>     }
> }
> {code}
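A sketch of the retry living on the application thread instead: the network thread completes each future once, and `commitSync` resends while failures are retriable and time remains. `RetriableCommitException` stands in for Kafka's `RetriableException`, `CommitTimeoutException` for the consumer's timeout error, and `sendCommit` for `commit(offsets, true)`. All of these names are hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical sketch: all retrying happens here, on the application thread, until the timeout.
final class CommitRetrySketch {
    static final class RetriableCommitException extends RuntimeException {
        RetriableCommitException(String msg) { super(msg); }
    }

    static final class CommitTimeoutException extends RuntimeException {
        CommitTimeoutException(String msg) { super(msg); }
    }

    static void commitSync(Supplier<CompletableFuture<Void>> sendCommit, Duration timeout, long backoffMs) {
        long deadlineNs = System.nanoTime() + timeout.toNanos();
        while (true) {
            long remainingNs = deadlineNs - System.nanoTime();
            if (remainingNs <= 0) {
                throw new CommitTimeoutException("commitSync timed out");
            }
            try {
                sendCommit.get().get(remainingNs, TimeUnit.NANOSECONDS);
                return; // commit succeeded
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof RetriableCommitException)) {
                    throw new RuntimeException(e.getCause()); // non-retriable: surface to the caller
                }
                // Retriable: fall through, back off, then resend a fresh commit.
            } catch (TimeoutException e) {
                throw new CommitTimeoutException("commitSync timed out");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            long remainingMs = Math.max(0L, (deadlineNs - System.nanoTime()) / 1_000_000L);
            sleep(Math.min(backoffMs, remainingMs));
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
```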





[jira] [Updated] (KAFKA-15993) Enable integration tests that relies on rebalance listener

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15993:
--
Parent: (was: KAFKA-14246)
Issue Type: Test  (was: Sub-task)

> Enable integration tests that relies on rebalance listener
> --
>
> Key: KAFKA-15993
> URL: https://issues.apache.org/jira/browse/KAFKA-15993
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.8.0
>
>
> We will enable integration tests using the async consumer in KAFKA-15971.  
> However, we should also enable tests that rely on rebalance listeners after 
> KAFKA-15628 is closed. One example would be testMaxPollIntervalMs, which 
> relies on the listener to verify correctness.





[jira] [Updated] (KAFKA-15948) Refactor AsyncKafkaConsumer shutdown

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15948:
--
Parent: (was: KAFKA-14246)
Issue Type: Improvement  (was: Sub-task)

> Refactor AsyncKafkaConsumer shutdown
> 
>
> Key: KAFKA-15948
> URL: https://issues.apache.org/jira/browse/KAFKA-15948
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> Upon closing we need a round trip from the network thread to the application 
> thread and then back to the network thread to complete the callback 
> invocation.  Currently, we don't have any of that.  I think we need to 
> refactor our closing mechanism.  There are a few points to the refactor:
>  # The network thread should know whether there's a custom user callback to 
> trigger. If there is, it should wait for the callback to complete before 
> sending a leave group request. If not, it should proceed with the shutdown.
>  # The application thread sends a closing signal to the network thread and 
> continuously polls the background event handler until time runs out.
>  





[jira] [Updated] (KAFKA-16000) Migrate MembershipManagerImplTest away from ConsumerTestBuilder

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16000:
--
Parent: (was: KAFKA-14246)
Issue Type: Test  (was: Sub-task)

> Migrate MembershipManagerImplTest away from ConsumerTestBuilder
> ---
>
> Key: KAFKA-16000
> URL: https://issues.apache.org/jira/browse/KAFKA-16000
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lucas Brutschy
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>






[jira] [Updated] (KAFKA-15872) Investigate autocommit retry logic

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15872:
--
Parent: (was: KAFKA-14246)
Issue Type: Improvement  (was: Sub-task)

> Investigate autocommit retry logic
> --
>
> Key: KAFKA-15872
> URL: https://issues.apache.org/jira/browse/KAFKA-15872
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> This is purely an investigation ticket.
> Currently, we send an autocommit only if there isn't an inflight one; 
> however, this logic might not be correct. I think we should:
>  # expire the request if it does not complete in time
>  # always send an autocommit on the clock
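A rough sketch of the two proposed behaviours, assuming a manager that is polled with the current time: expire in-flight autocommits that pass their deadline, and send a new one on every interval tick regardless of whether an earlier one is still pending. `AutoCommitTimerSketch` and its fields are hypothetical; the real autocommit logic may differ.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the two behaviours proposed above; the class and
// method names are illustrative and do not exist in the Kafka client.
class AutoCommitTimerSketch {
    private final long intervalMs;
    private final long requestTimeoutMs;
    private long nextCommitMs;
    // Deadlines (in ms) of autocommit requests that are still in flight.
    private final List<Long> inflightDeadlines = new ArrayList<>();
    int sent = 0;
    int expired = 0;

    AutoCommitTimerSketch(long intervalMs, long requestTimeoutMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.requestTimeoutMs = requestTimeoutMs;
        this.nextCommitMs = nowMs + intervalMs;
    }

    // Called on every background-thread poll with the current time.
    void poll(long nowMs) {
        // 1. Expire in-flight requests that did not complete in time.
        Iterator<Long> it = inflightDeadlines.iterator();
        while (it.hasNext()) {
            if (it.next() <= nowMs) {
                it.remove();
                expired++;
            }
        }
        // 2. Send on the clock, even if an earlier request is still in flight.
        if (nowMs >= nextCommitMs) {
            inflightDeadlines.add(nowMs + requestTimeoutMs);
            sent++;
            nextCommitMs = nowMs + intervalMs;
        }
    }

    int inflight() {
        return inflightDeadlines.size();
    }
}
```

Whether sending on the clock while a request is still pending is safe (for example, with respect to the ordering of commits) is exactly what the ticket proposes to investigate; the sketch only shows the mechanics.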





[jira] [Updated] (KAFKA-15999) Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15999:
--
Parent: (was: KAFKA-14246)
Issue Type: Test  (was: Sub-task)

> Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder
> -
>
> Key: KAFKA-15999
> URL: https://issues.apache.org/jira/browse/KAFKA-15999
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lucas Brutschy
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>






[jira] [Updated] (KAFKA-16019) Some of the tests in PlaintextConsumer can't seem to deterministically invokes and verify the consumer callback

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16019:
--
Parent: (was: KAFKA-14246)
Issue Type: Test  (was: Sub-task)

> Some of the tests in PlaintextConsumer can't seem to deterministically 
> invokes and verify the consumer callback
> ---
>
> Key: KAFKA-16019
> URL: https://issues.apache.org/jira/browse/KAFKA-16019
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> I was running the PlaintextConsumer tests against the async consumer; however, 
> a few tests were failing because they could not verify that the listener was 
> invoked correctly.
> For example, in `testPerPartitionLeadMetricsCleanUpWithSubscribe`, around 50% 
> of the time the listener's callsToAssigned was never incremented correctly. 
> Even after changing the check to awaitUntilTrue, the result was the same.
> {code:java}
> consumer.subscribe(List(topic, topic2).asJava, listener)
> val records = awaitNonEmptyRecords(consumer, tp)
> assertEquals(1, listener.callsToAssigned, "should be assigned once") {code}





[jira] [Updated] (KAFKA-16022) AsyncKafkaConsumer sometimes complains "No current assignment for partition {}"

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16022:
--
Parent: (was: KAFKA-14246)
Issue Type: Bug  (was: Sub-task)

> AsyncKafkaConsumer sometimes complains "No current assignment for partition 
> {}"
> ---
>
> Key: KAFKA-16022
> URL: https://issues.apache.org/jira/browse/KAFKA-16022
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> This seems to be a timing issue: before the member receives any assignment 
> from the coordinator, the fetcher tries to find the current position, which 
> logs "No current assignment for partition {}".  This adds a small amount of 
> noise to the log.
>  
>  
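One possible mitigation, sketched under the assumption that the position lookup can simply be deferred until an assignment arrives: filter out partitions the member has not been assigned yet instead of logging for them. Partitions are modelled as plain strings such as `"t-0"`, and `PositionGuardSketch` is a hypothetical name, not part of the fetcher.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical guard; partitions are plain strings standing in for TopicPartition.
class PositionGuardSketch {
    private final Set<String> assigned = new HashSet<>();

    // Called once the coordinator's assignment has been reconciled.
    void onAssignment(Set<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
    }

    // Returns only the requested partitions that are currently assigned; the
    // others are skipped quietly instead of producing a log line.
    List<String> partitionsNeedingPosition(List<String> requested) {
        List<String> ready = new ArrayList<>();
        for (String tp : requested) {
            if (assigned.contains(tp)) ready.add(tp);
        }
        return ready;
    }
}
```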





[jira] [Updated] (KAFKA-16001) Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16001:
--
Parent: (was: KAFKA-14246)
Issue Type: Test  (was: Sub-task)

> Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder
> ---
>
> Key: KAFKA-16001
> URL: https://issues.apache.org/jira/browse/KAFKA-16001
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lucas Brutschy
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>






[jira] [Updated] (KAFKA-16023) PlaintextConsumerTest needs to wait for reconciliation to complete before proceeding

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16023:
--
Parent: (was: KAFKA-14246)
Issue Type: Test  (was: Sub-task)

> PlaintextConsumerTest needs to wait for reconciliation to complete before 
> proceeding
> 
>
> Key: KAFKA-16023
> URL: https://issues.apache.org/jira/browse/KAFKA-16023
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> Several tests in PlaintextConsumerTest.scala (such as 
> testPerPartitionLagMetricsCleanUpWithSubscribe) use:
> assertEquals(1, listener.callsToAssigned, "should be assigned once")
> However, because reconciliation completes asynchronously, its timing is not 
> deterministic, so we need to wait until the condition holds.
> Another issue is the timeout: some of these tasks might not complete within 
> the 600ms timeout, which makes the tests flaky.
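Waiting for the condition, rather than asserting it once, can be done with a small polling helper. The sketch below is a standalone illustration in the spirit of the waitUntilTrue-style test helpers; `WaitUtil` and its signature are assumptions, not the Kafka test utilities. Making the timeout a parameter also addresses the second point, since each test can choose a budget longer than 600ms.

```java
import java.util.function.BooleanSupplier;

// Standalone polling helper for illustration only; not Kafka's TestUtils.
final class WaitUtil {
    private WaitUtil() {}

    // Re-evaluates the condition every pauseMs until it holds, failing with
    // `message` once timeoutMs has elapsed.
    static void waitUntilTrue(BooleanSupplier condition, String message, long timeoutMs, long pauseMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) throw new AssertionError(message);
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting: " + message);
            }
        }
    }
}
```

In PlaintextConsumerTest the condition would presumably need to call `consumer.poll(...)` before checking `listener.callsToAssigned == 1`, since rebalance callbacks are delivered through `poll()`.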





[jira] [Updated] (KAFKA-16024) SaslPlaintextConsumerTest#testCoordinatorFailover is flaky

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16024:
--
Parent: (was: KAFKA-14246)
Issue Type: Test  (was: Sub-task)

> SaslPlaintextConsumerTest#testCoordinatorFailover is flaky
> --
>
> Key: KAFKA-16024
> URL: https://issues.apache.org/jira/browse/KAFKA-16024
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test
> Fix For: 3.8.0
>
>
> The test is flaky with the async consumer; we are observing:
>  
> {code:java}
> org.opentest4j.AssertionFailedError: Failed to observe commit callback before 
> timeout{code}
> I was not able to replicate this on my local machine easily.





[jira] [Updated] (KAFKA-15942) Implement ConsumerInterceptor

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15942:
--
Fix Version/s: 3.8.0

> Implement ConsumerInterceptor
> -
>
> Key: KAFKA-15942
> URL: https://issues.apache.org/jira/browse/KAFKA-15942
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> As the title says, we need to implement ConsumerInterceptor support in the 
> AsyncKafkaConsumer.
>  
> This is the current code; the implementation would be very similar:
> {code:java}
> if (interceptors != null)
> interceptors.onCommit(offsets); {code}
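A minimal sketch of how the snippet above might carry over to an asynchronous commit path: `onCommit` runs only after the commit future completes successfully. `CommitInterceptors` is a hypothetical stand-in for the client's interceptor wrapper, and offsets are simplified to a map from partition name to offset.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the client's interceptor wrapper.
interface CommitInterceptors {
    void onCommit(Map<String, Long> offsets);
}

class InterceptorCommitSketch {
    private final CommitInterceptors interceptors; // may be null, as in the snippet above

    InterceptorCommitSketch(CommitInterceptors interceptors) {
        this.interceptors = interceptors;
    }

    // Invokes onCommit only after the commit result completes successfully;
    // a failed commit skips the interceptor and fails the returned future.
    CompletableFuture<Void> commit(Map<String, Long> offsets, CompletableFuture<Void> commitResult) {
        return commitResult.thenRun(() -> {
            if (interceptors != null)
                interceptors.onCommit(offsets);
        });
    }
}
```

`thenRun` executes on whichever thread completes the future, which in the async consumer would likely be the background thread; whether interceptors should instead be invoked on the application thread is a design question this sketch does not settle.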





[jira] [Updated] (KAFKA-16009) Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16009:
--
Fix Version/s: 3.8.0

> Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
> 
>
> Key: KAFKA-16009
> URL: https://issues.apache.org/jira/browse/KAFKA-16009
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848
> Fix For: 3.8.0
>
>
> The integration test 
> {{PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation}} is failing 
> when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
>   at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:235)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:20:27,824] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  





[jira] [Updated] (KAFKA-15282) Implement client support for KIP-848 client-side assignors

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15282:
--
Fix Version/s: 3.8.0

> Implement client support for KIP-848 client-side assignors
> --
>
> Key: KAFKA-15282
> URL: https://issues.apache.org/jira/browse/KAFKA-15282
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> The client-side assignor computes the partition assignments on the client 
> instead of on the server. Client-side assignment is the main approach used by 
> the “old protocol” for divvying up partitions. While the “new protocol” 
> favors server-side assignors, the client-side assignor will continue to be 
> used for backward compatibility, including KSQL, Connect, etc.
> Note: I _*think*_ that the client-side assignor logic and the reconciliation 
> logic can remain separate from each other. We should strive to keep the two 
> pieces unencumbered, unless it’s unavoidable.
> This task includes:
>  * Validate the client’s configuration for assignor selection
>  * Integrate with the new {{PartitionAssignor}} interface to invoke the logic 
> from the user-provided assignor implementation
>  * Implement the necessary logic around the request/response from the 
> {{ConsumerGroupPrepareAssignment}} RPC call using the information from the 
> {{PartitionAssignor}} above
>  * Implement the necessary logic around the request/response from the 
> {{ConsumerGroupInstallAssignment}} RPC call, again using the information 
> calculated by the {{PartitionAssignor}}
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.





[jira] [Updated] (KAFKA-15993) Enable integration tests that relies on rebalance listener

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15993:
--
Fix Version/s: 3.8.0

> Enable integration tests that relies on rebalance listener
> --
>
> Key: KAFKA-15993
> URL: https://issues.apache.org/jira/browse/KAFKA-15993
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.8.0
>
>
> We will enable integration tests using the async consumer in KAFKA-15971.  
> However, we should also enable tests that rely on rebalance listeners after 
> KAFKA-15628 is closed.  One example is testMaxPollIntervalMs, which relies 
> on the listener to verify correctness.





[jira] [Updated] (KAFKA-15283) Add support for topic ID-related Consumer changes

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15283:
--
Fix Version/s: 3.8.0

> Add support for topic ID-related Consumer changes
> -
>
> Key: KAFKA-15283
> URL: https://issues.apache.org/jira/browse/KAFKA-15283
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> Currently, {{KafkaConsumer}} keeps track of topic IDs in the in-memory 
> {{ConsumerMetadata}} object, and they are provided to the {{FETCH}} and 
> {{METADATA}} RPC calls.
> With KIP-848 the OffsetFetch and OffsetCommit will start using topic IDs in 
> the same way, so the new client implementation will provide it when issuing 
> those requests. Topic names should continue to be supported as needed by the 
> {{{}AdminClient{}}}.
> We should also review/clean-up the support for topic names in requests such 
> as the {{METADATA}} request (currently supporting topic names as well as 
> topic IDs on the client side).
> Tasks include:
>  * Introduce Topic ID in existing OffsetFetch and OffsetCommit API that will 
> be upgraded on the server to support topic ID
>  * Check topic ID propagation internally in the client based on RPCs 
> including it.
>  * Review existing support for topic names for potential cleanup if not needed.





[jira] [Updated] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15475:
--
Fix Version/s: 3.8.0

> Timeout request might retry forever even if the user API times out in 
> PrototypeAsyncConsumer
> 
>
> Key: KAFKA-15475
> URL: https://issues.apache.org/jira/browse/KAFKA-15475
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.8.0
>
>
> If the request times out in the background thread, it is completed with a 
> TimeoutException, which is retriable.  In the TopicMetadataRequestManager and 
> possibly other managers, the request might continue to be retried forever.
>  
> There are two ways to fix this:
>  # Pass a timer to the manager to remove the inflight requests when it is 
> expired.
>  # Pass the future to the application layer and continue to retry.
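Option 1 can be sketched as follows, assuming each request records the deadline of the user API call that created it. On every poll, the manager fails and drops expired requests instead of retrying them. `ExpiringRequestManagerSketch` is hypothetical, and it uses `java.util.concurrent.TimeoutException` rather than Kafka's own `TimeoutException` to stay self-contained.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of option 1: requests carry the user API deadline and
// are dropped once it passes instead of being retried forever.
class ExpiringRequestManagerSketch {
    static final class Inflight {
        final long deadlineMs;
        final CompletableFuture<String> future = new CompletableFuture<>();

        Inflight(long deadlineMs) {
            this.deadlineMs = deadlineMs;
        }
    }

    private final List<Inflight> inflight = new ArrayList<>();

    CompletableFuture<String> submit(long nowMs, long apiTimeoutMs) {
        Inflight req = new Inflight(nowMs + apiTimeoutMs);
        inflight.add(req);
        return req.future;
    }

    // Called on every background poll. Expired requests are failed and removed;
    // the count of remaining requests is what would still be (re)sent.
    int poll(long nowMs) {
        int toSend = 0;
        Iterator<Inflight> it = inflight.iterator();
        while (it.hasNext()) {
            Inflight req = it.next();
            if (nowMs >= req.deadlineMs) {
                req.future.completeExceptionally(new TimeoutException("API timeout expired"));
                it.remove();
            } else {
                toSend++;
            }
        }
        return toSend;
    }
}
```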





[jira] [Updated] (KAFKA-15538) Client support for java regex based subscription

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15538:
--
Fix Version/s: 3.8.0

> Client support for java regex based subscription 
> -
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Minor
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> When using subscribe with a java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscriptionPattern`. 
> Subscribing with a Java `Pattern` will still be supported for a while but 
> will eventually be removed.
>  * When subscribe with a SubscriptionPattern is used, the client should just 
> send the regex to the broker, which resolves it on the server side.
>  * When subscribe with a Pattern is used, the regex should be resolved on 
> the client side.
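Client-side resolution of a `java.util.regex.Pattern` amounts to filtering the topic names known from metadata; the matched names are what the client would send to the broker. A minimal sketch, assuming the topic list comes from the client's metadata; `PatternResolver` is a hypothetical name. It uses `matcher(topic).matches()`, so the whole topic name must match.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical helper: resolve a java.util.regex.Pattern subscription against
// the topic names currently known from metadata.
final class PatternResolver {
    private PatternResolver() {}

    static Set<String> resolve(Pattern pattern, Set<String> topicsFromMetadata) {
        Set<String> matched = new TreeSet<>(); // sorted only for stable output
        for (String topic : topicsFromMetadata) {
            // matches() requires the entire topic name to match the pattern
            if (pattern.matcher(topic).matches()) matched.add(topic);
        }
        return matched;
    }
}
```

The resolved list would presumably need to be recomputed whenever metadata changes, so that newly created topics matching the pattern are picked up.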





[jira] [Updated] (KAFKA-15279) Implement client support for KIP-848 client-side assigner RPCs

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15279:
--
Fix Version/s: 3.8.0

> Implement client support for KIP-848 client-side assigner RPCs
> --
>
> Key: KAFKA-15279
> URL: https://issues.apache.org/jira/browse/KAFKA-15279
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> The protocol introduces three new RPCs that the client uses to communicate 
> with the broker:
>  # 
> [ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI]
>  # 
> [ConsumerGroupPrepareAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupPrepareAssignmentAPI]
>  # 
> [ConsumerGroupInstallAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupInstallAssignmentAPI]
> Support for ConsumerGroupHeartbeat is handled by KAFKA-15278. This task is to 
> implement the ConsumerGroupAssignmentRequestManager to handle the second and 
> third RPCs on the above list.
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.





[jira] [Updated] (KAFKA-15692) New integration tests to ensure full coverage

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15692:
--
Fix Version/s: 3.8.0

> New integration tests to ensure full coverage
> -
>
> Key: KAFKA-15692
> URL: https://issues.apache.org/jira/browse/KAFKA-15692
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Priority: Minor
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> These tests are meant to cover bugs that were discovered during PR reviews 
> rather than by tests.





[jira] [Updated] (KAFKA-16023) PlaintextConsumerTest needs to wait for reconciliation to complete before proceeding

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16023:
--
Fix Version/s: 3.8.0

> PlaintextConsumerTest needs to wait for reconciliation to complete before 
> proceeding
> 
>
> Key: KAFKA-16023
> URL: https://issues.apache.org/jira/browse/KAFKA-16023
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> Several tests in PlaintextConsumerTest.scala (such as 
> testPerPartitionLagMetricsCleanUpWithSubscribe) use:
> assertEquals(1, listener.callsToAssigned, "should be assigned once")
> However, because reconciliation completes asynchronously, its timing is not 
> deterministic, so we need to wait until the condition holds.
> Another issue is the timeout: some of these tasks might not complete within 
> the 600ms timeout, which makes the tests flaky.





[jira] [Updated] (KAFKA-15561) Client support for new SubscriptionPattern based subscription

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15561:
--
Fix Version/s: 3.8.0

> Client support for new SubscriptionPattern based subscription 
> --
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Minor
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> The new consumer should support subscribing with the new SubscriptionPattern 
> introduced in the new consumer group protocol. When subscribing with this 
> regex, the client should provide the regex in the SubscribedTopicRegex field 
> of the HB request, delegating the resolution to the server.





[jira] [Updated] (KAFKA-15542) Release member assignments on errors

2023-12-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15542:
--
Fix Version/s: 3.8.0

> Release member assignments on errors
> 
>
> Key: KAFKA-15542
> URL: https://issues.apache.org/jira/browse/KAFKA-15542
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.8.0
>
>
> The member should release its assignment by triggering the onPartitionsLost 
> flow from the HB manager when errors occur (both fencing and unrecoverable 
> errors).
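A small sketch of the intended behaviour, assuming the heartbeat manager can classify errors as fencing, fatal, or retriable: on the first two, the current assignment is handed to an onPartitionsLost-style callback and cleared. `AssignmentReleaseSketch` and its `ErrorKind` enum are hypothetical simplifications, not Kafka's `Errors` codes.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of releasing the assignment on heartbeat errors.
class AssignmentReleaseSketch {
    // Simplified error categories for this sketch; not Kafka's Errors enum.
    enum ErrorKind { FENCED, FATAL, RETRIABLE }

    private final Set<String> assignment = new HashSet<>();
    private final Consumer<Set<String>> onPartitionsLost;

    AssignmentReleaseSketch(Set<String> initialAssignment, Consumer<Set<String>> onPartitionsLost) {
        this.assignment.addAll(initialAssignment);
        this.onPartitionsLost = onPartitionsLost;
    }

    void onHeartbeatError(ErrorKind kind) {
        // Retriable errors keep the assignment; the heartbeat is simply retried.
        if (kind == ErrorKind.RETRIABLE) return;
        // Fencing and unrecoverable errors both release the whole assignment.
        Set<String> lost = new HashSet<>(assignment);
        assignment.clear();
        // Skipping the callback for an empty assignment is a choice made for this sketch.
        if (!lost.isEmpty()) onPartitionsLost.accept(lost);
    }

    Set<String> assignment() {
        return assignment;
    }
}
```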




