Re: [PR] KAFKA-16013: Throw an exception in DelayedRemoteFetch for follower fetch replicas. [kafka]

2023-12-20 Thread via GitHub


showuon merged PR #15015:
URL: https://github.com/apache/kafka/pull/15015


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16013: Throw an exception in DelayedRemoteFetch for follower fetch replicas. [kafka]

2023-12-20 Thread via GitHub


showuon commented on PR #15015:
URL: https://github.com/apache/kafka/pull/15015#issuecomment-1865792143

   It's been running for 7 hours, but the JDK 21 result is still not out. 
However, the other 3 tasks have completed and the failed tests are unrelated.





[jira] [Assigned] (KAFKA-16014) Implement RemoteLogSizeComputationTime, RemoteLogSizeBytes, RemoteLogMetadataCount

2023-12-20 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-16014:
-

Assignee: Luke Chen

> Implement RemoteLogSizeComputationTime, RemoteLogSizeBytes, 
> RemoteLogMetadataCount
> --
>
> Key: KAFKA-16014
> URL: https://issues.apache.org/jira/browse/KAFKA-16014
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16002: Implement RemoteCopyLagSegments, RemoteDeleteLagBytes and RemoteDeleteLagSegments [kafka]

2023-12-20 Thread via GitHub


showuon merged PR #15005:
URL: https://github.com/apache/kafka/pull/15005





Re: [PR] KAFKA-16002: Implement RemoteCopyLagSegments, RemoteDeleteLagBytes and RemoteDeleteLagSegments [kafka]

2023-12-20 Thread via GitHub


showuon commented on PR #15005:
URL: https://github.com/apache/kafka/pull/15005#issuecomment-1865598500

   Failed tests are unrelated.





Re: [PR] [KAFKA-16015] Fix custom timeouts overwritten by defaults [kafka]

2023-12-20 Thread via GitHub


sciclon2 commented on PR #15030:
URL: https://github.com/apache/kafka/pull/15030#issuecomment-1865512396

   @pprovenzano, I don't have permissions to merge the PR, so I guess somebody 
else will eventually do it; is that correct? Thanks!





[jira] (KAFKA-15739) KRaft support in ResetConsumerGroupOffsetTest

2023-12-20 Thread Zihao Lin (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-15739 ]


Zihao Lin deleted comment on KAFKA-15739:
---

was (Author: JIRAUSER300254):
Hi [~sameert],

I want to test my PR, but how do I re-trigger the CI builds? Any suggestions? 
Thanks

> KRaft support in ResetConsumerGroupOffsetTest
> -
>
> Key: KAFKA-15739
> URL: https://issues.apache.org/jira/browse/KAFKA-15739
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Assignee: Zihao Lin
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in ResetConsumerGroupOffsetTest in 
> core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala need 
> to be updated to support KRaft
> 75 : def testResetOffsetsNotExistingGroup(): Unit = {
> 89 : def testResetOffsetsExistingTopic(): Unit = {
> 99 : def testResetOffsetsExistingTopicSelectedGroups(): Unit = {
> 116 : def testResetOffsetsExistingTopicAllGroups(): Unit = {
> 130 : def testResetOffsetsAllTopicsAllGroups(): Unit = {
> 149 : def testResetOffsetsToLocalDateTime(): Unit = {
> 165 : def testResetOffsetsToZonedDateTime(): Unit = {
> 181 : def testResetOffsetsByDuration(): Unit = {
> 188 : def testResetOffsetsByDurationToEarliest(): Unit = {
> 195 : def testResetOffsetsByDurationFallbackToLatestWhenNoRecords(): Unit = {
> 205 : def testResetOffsetsToEarliest(): Unit = {
> 212 : def testResetOffsetsToLatest(): Unit = {
> 220 : def testResetOffsetsToCurrentOffset(): Unit = {
> 228 : def testResetOffsetsToSpecificOffset(): Unit = {
> 235 : def testResetOffsetsShiftPlus(): Unit = {
> 243 : def testResetOffsetsShiftMinus(): Unit = {
> 251 : def testResetOffsetsShiftByLowerThanEarliest(): Unit = {
> 259 : def testResetOffsetsShiftByHigherThanLatest(): Unit = {
> 267 : def testResetOffsetsToEarliestOnOneTopic(): Unit = {
> 274 : def testResetOffsetsToEarliestOnOneTopicAndPartition(): Unit = {
> 293 : def testResetOffsetsToEarliestOnTopics(): Unit = {
> 318 : def testResetOffsetsToEarliestOnTopicsAndPartitions(): Unit = {
> 348 : def testResetOffsetsExportImportPlanSingleGroupArg(): Unit = {
> 378 : def testResetOffsetsExportImportPlan(): Unit = {
> 425 : def testResetWithUnrecognizedNewConsumerOption(): Unit = {
> Scanned 515 lines. Found 0 KRaft tests out of 25 tests
>  





[jira] [Updated] (KAFKA-16039) RecordHeaders supports the addAll method

2023-12-20 Thread Jianbin Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jianbin Chen updated KAFKA-16039:
-
External issue URL: https://github.com/apache/kafka/pull/15034

> RecordHeaders supports the addAll method
> 
>
> Key: KAFKA-16039
> URL: https://issues.apache.org/jira/browse/KAFKA-16039
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Jianbin Chen
>Priority: Minor
>
> Why not provide an addAll method in RecordHeaders? It would help reduce the 
> amount of code required to copy between headers.





[jira] [Created] (KAFKA-16039) RecordHeaders supports the addAll method

2023-12-20 Thread Jianbin Chen (Jira)
Jianbin Chen created KAFKA-16039:


 Summary: RecordHeaders supports the addAll method
 Key: KAFKA-16039
 URL: https://issues.apache.org/jira/browse/KAFKA-16039
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Jianbin Chen


Why not provide an addAll method in RecordHeaders? It would help reduce the 
amount of code required to copy between headers.
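
A minimal sketch of what such a helper could look like, using only the existing 
public add(Header) API (the addAll name and placement below are assumptions for 
illustration, not a proposed implementation):
{code:java}
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeaders;

public final class HeaderUtils {
    // Hypothetical helper; RecordHeaders does not expose addAll today.
    public static RecordHeaders addAll(RecordHeaders target, Iterable<Header> source) {
        for (Header header : source) {
            target.add(header); // delegates to the existing public add(Header)
        }
        return target;
    }
}
{code}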





Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


philipnee commented on PR #15035:
URL: https://github.com/apache/kafka/pull/15035#issuecomment-1865440196

   hmm @cadonna - it seems like there are some build stability issues. My other 
PR, `PR-15023`, also failed after 8 hours (it is completely unrelated to the 
consumer refactor).





Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-20 Thread via GitHub


philipnee commented on PR #15023:
URL: https://github.com/apache/kafka/pull/15023#issuecomment-1865437639

   hi @jolshan - it seems like `OffsetsApiIntegrationTests` is a Connect-related 
test, in `org.apache.kafka.connect.integration`. In the other PR, some of the 
tests in that integration test module also fail sometimes.





Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-20 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433436213


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java:
##
@@ -266,6 +267,28 @@ CompletableFuture<OffsetDeleteResponseData> deleteOffsets(
 BufferSupplier bufferSupplier
 );
 
+/**
+ * Complete a transaction. This is called when the WriteTxnMarkers API is 
called

Review Comment:
   Is it worth specifying here that we call this for transactional offset 
commits on consumer offsets topics?






Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-20 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433434733


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2432,7 +2477,13 @@ class KafkaApis(val requestChannel: RequestChannel,
   origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = controlRecords,
   requestLocal = requestLocal,
-  responseCallback = maybeSendResponseCallback(producerId, 
marker.transactionResult))
+  responseCallback = errors => {

Review Comment:
   Does an existing test cover this change to the error handling for the 
existing code?






Re: [PR] KAFKA-16013: Throw an exception in DelayedRemoteFetch for follower fetch replicas. [kafka]

2023-12-20 Thread via GitHub


showuon commented on PR #15015:
URL: https://github.com/apache/kafka/pull/15015#issuecomment-1865421593

   I'd like to merge https://github.com/apache/kafka/pull/15005 first because 
that one is more important than this PR. It should take just 1 more hour to get 
the results.





Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-20 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433375910


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -3018,6 +3020,223 @@ class KafkaApisTest {
   any())
   }
 
+  @Test

Review Comment:
   Are we relying on the existing tests for the old group coordinator and the 
KafkaApis changes there?






Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-20 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433369682


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java:
##
@@ -102,6 +241,31 @@ public long append(
 }
 }
 
+@Override
+public long appendEndTransactionMarker(
+TopicPartition tp,
+long producerId,
+short producerEpoch,
+int coordinatorEpoch,
+TransactionResult result
+) throws KafkaException {
+PartitionState state = partitionState(tp);
+state.lock.lock();
+try {
+state.entries.add(new LogControl(
+producerId,
+producerEpoch,
+coordinatorEpoch,
+result
+));
+state.endOffset += 1;
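+// With autoCommit enabled, the committed offset advances immediately on
+// append, so the caller never needs an explicit commit(tp, offset) call.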
+if (autoCommit) commit(tp, state.endOffset);

Review Comment:
   what does autoCommit mean here?






Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-20 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433368003


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java:
##
@@ -33,10 +37,141 @@
  */
 public class InMemoryPartitionWriter implements PartitionWriter {
 
+public static class LogEntry {
+public static <T> LogEntry value(T value) {
+return new LogValue<>(value);
+}
+
+public static <T> LogEntry value(
+long producerId,
+short producerEpoch,
+T value
+) {
+return new LogValue<>(
+producerId,
+producerEpoch,
+value
+);
+}
+
+public static LogEntry control(
+long producerId,
+short producerEpoch,
+int coordinatorEpoch,
+TransactionResult result
+) {
+return new LogControl(
+producerId,
+producerEpoch,
+coordinatorEpoch,
+result
+);
+}
+}
+
+public static class LogValue<T> extends LogEntry {
+public final long producerId;
+public final short producerEpoch;
+public final T value;
+
+private LogValue(
+long producerId,
+short producerEpoch,
+T value
+) {
+this.producerId = producerId;
+this.producerEpoch = producerEpoch;
+this.value = value;
+}
+
+private LogValue(T value) {
+this(
+RecordBatch.NO_PRODUCER_ID,
+RecordBatch.NO_PRODUCER_EPOCH,
+value
+);
+}
+
+@Override
+public boolean equals(Object o) {
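+// Standard equals pattern: reference check first, then an exact runtime
+// class check, so only other LogValue instances can ever compare equal.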
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;

Review Comment:
   Does this simply check that this object is of LogValue type?






Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-20 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433343992


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1073,6 +1151,314 @@ public CoordinatorShardBuilder get() {
 );
 }
 
+@ParameterizedTest
+@EnumSource(value = TransactionResult.class)
+public void testScheduleTransactionCompletion(TransactionResult result) 
throws ExecutionException, InterruptedException, TimeoutException {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = new MockPartitionWriter();
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Verify the initial state.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+
+// Transactional write #1.
+CompletableFuture<String> write1 = 
runtime.scheduleTransactionalWriteOperation(
+"write#1",
+TP,
+"transactional-id",
+100L,
+(short) 5,
+DEFAULT_WRITE_TIMEOUT,
+state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1")
+);
+
+// Verify that the write is not committed yet.
+assertFalse(write1.isDone());
+
+// The last written offset is updated.
+assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+// The last committed offset does not change.
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+// A new snapshot is created.
+assertEquals(Arrays.asList(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+// Records have been replayed to the coordinator. They are stored in
+// the pending set for now.
+assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().pendingRecords(
+100L
+));
+// Records have been written to the log.
+assertEquals(Arrays.asList(
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2")
+), writer.entries(TP));
+
+// Complete transaction #1.
+CompletableFuture<Void> complete1 = 
runtime.scheduleTransactionCompletion(
+"complete#1",
+TP,
+100L,
+(short) 5,
+10,
+result,
+DEFAULT_WRITE_TIMEOUT
+);
+
+// Verify that the completion is not committed yet.
+assertFalse(complete1.isDone());
+
+// The last written offset is updated.
+assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+// The last committed offset does not change.
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+// A new snapshot is created.
+assertEquals(Arrays.asList(0L, 2L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+// Records have been replayed to the coordinator.
+if (result == TransactionResult.COMMIT) {
+// They are now in the records set if committed.
+assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().records());
+} else {
+// Or they are gone if aborted.
+assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().records());
+}
+
+// Records have been written to the log.
+assertEquals(Arrays.asList(
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2"),
+InMemoryPartitionWriter.LogEntry.control(100L, (short) 5, 10, 
result)
+), writer.entries(TP));
+
+// Commit write #1.
+writer.commit(TP, 2);
+
+// The write is completed.
+assertTrue(write1.isDone());
+assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+
+// Commit completion #1.
+

Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-20 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433342495



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-20 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r148933


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1073,6 +1151,314 @@ public CoordinatorShardBuilder get() {
 );
 }
 
+@ParameterizedTest
+@EnumSource(value = TransactionResult.class)
+public void testScheduleTransactionCompletion(TransactionResult result) 
throws ExecutionException, InterruptedException, TimeoutException {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = new MockPartitionWriter();
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Verify the initial state.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+
+// Transactional write #1.
+CompletableFuture<String> write1 = 
runtime.scheduleTransactionalWriteOperation(
+"write#1",
+TP,
+"transactional-id",
+100L,
+(short) 5,
+DEFAULT_WRITE_TIMEOUT,
+state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1")
+);
+
+// Verify that the write is not committed yet.
+assertFalse(write1.isDone());
+
+// The last written offset is updated.
+assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+// The last committed offset does not change.
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+// A new snapshot is created.
+assertEquals(Arrays.asList(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+// Records have been replayed to the coordinator. They are stored in
+// the pending set for now.
+assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().pendingRecords(
+100L
+));
+// Records have been written to the log.
+assertEquals(Arrays.asList(
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2")
+), writer.entries(TP));
+
+// Complete transaction #1.
+CompletableFuture<Void> complete1 = 
runtime.scheduleTransactionCompletion(
+"complete#1",
+TP,
+100L,
+(short) 5,
+10,
+result,
+DEFAULT_WRITE_TIMEOUT
+);
+
+// Verify that the completion is not committed yet.
+assertFalse(complete1.isDone());
+
+// The last written offset is updated.
+assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+// The last committed offset does not change.
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+// A new snapshot is created.
+assertEquals(Arrays.asList(0L, 2L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+// Records have been replayed to the coordinator.
+if (result == TransactionResult.COMMIT) {
+// They are now in the records set if committed.
+assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().records());
+} else {
+// Or they are gone if aborted.
+assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().records());
+}
+
+// Records have been written to the log.
+assertEquals(Arrays.asList(
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2"),
+InMemoryPartitionWriter.LogEntry.control(100L, (short) 5, 10, 
result)
+), writer.entries(TP));
+
+// Commit write #1.
+writer.commit(TP, 2);
+
+// The write is completed.
+assertTrue(write1.isDone());
+assertEquals("response1", write1.get(5, TimeUnit.SECONDS));

Review Comment:
   If write1 is done, does that mean 

Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-20 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433329835


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -2414,6 +2424,67 @@ public void testReplayWithTombstone() {
 assertNull(context.offsetMetadataManager.offset("foo", "bar", 0));
 }
 
+@Test
+public void testReplayTransactionEndMarkerWithCommit() {
+OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+// Add pending transactional commit for producer id 5.
+verifyTransactionalReplay(context, 5L, "foo", "bar", 0, new 
OffsetAndMetadata(
+100L,
+OptionalInt.empty(),
+"small",
+context.time.milliseconds(),
+OptionalLong.empty()
+));
+
+// Add pending transactional commit for producer id 6.
+verifyTransactionalReplay(context, 6L, "foo", "bar", 1, new 
OffsetAndMetadata(
+200L,
+OptionalInt.empty(),
+"small",
+context.time.milliseconds(),
+OptionalLong.empty()
+));
+
+// Replaying an end marker with an unknown producer id should not fail.
+context.replayEndTransactionMarker(1L, TransactionResult.COMMIT);
+
+// Replaying an end marker to commit transaction of producer id 5.
+context.replayEndTransactionMarker(5L, TransactionResult.COMMIT);
+
+// The pending offset is removed...
+assertNull(context.offsetMetadataManager.pendingTransactionalOffset(
+5L,
+"foo",
+"bar",
+0
+));
+
+// ... and added to the main offset storage.
+assertEquals(new OffsetAndMetadata(
+100L,
+OptionalInt.empty(),
+"small",
+context.time.milliseconds(),
+OptionalLong.empty()
+), context.offsetMetadataManager.offset(
+"foo",
+"bar",
+0
+));
+
+// Replaying an end marker to abort transaction of producer id 6.
+context.replayEndTransactionMarker(6L, TransactionResult.ABORT);
+
+// The pending offset is removed.
+assertNull(context.offsetMetadataManager.pendingTransactionalOffset(

Review Comment:
   should we check that we didn't add to main offset storage?






Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-20 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433324984


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -1900,4 +1908,59 @@ public void testCommitTransactionalOffsets() throws 
ExecutionException, Interrup
 
 assertEquals(response, future.get());
 }
+
+@Test
+public void testCompleteTransaction() throws ExecutionException, 
InterruptedException {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime,
+new GroupCoordinatorMetrics()
+);
+service.startup(() -> 1);
+
+when(runtime.scheduleTransactionCompletion(
+ArgumentMatchers.eq("write-txn-marker"),
+ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ArgumentMatchers.eq(100L),
+ArgumentMatchers.eq((short) 5),
+ArgumentMatchers.eq(10),
+ArgumentMatchers.eq(TransactionResult.COMMIT),
+ArgumentMatchers.eq(Duration.ofMillis(100))
+)).thenReturn(CompletableFuture.completedFuture(null));
+
+CompletableFuture<Void> future = service.completeTransaction(
+new TopicPartition("__consumer_offsets", 0),
+100L,
+(short) 5,
+10,
+TransactionResult.COMMIT,
+Duration.ofMillis(100)
+);
+
+assertNull(future.get());
+}
+
+@Test
+public void testCompleteTransactionWhenNotStarted() {

Review Comment:
   nit: should we clarify that this test is for when the coordinator is not 
started (and not the transaction)?






Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-20 Thread via GitHub


jolshan commented on PR #14985:
URL: https://github.com/apache/kafka/pull/14985#issuecomment-1865318460

   I might start a run of the transactions system test off of this branch
   





[jira] [Commented] (KAFKA-16027) Refactor MetadataTest#testUpdatePartitionLeadership

2023-12-20 Thread Alexander Aghili (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799209#comment-17799209
 ] 

Alexander Aghili commented on KAFKA-16027:
--

[https://github.com/apache/kafka/pull/15055]
I made a pull request for the refactoring. I didn't move it to 
ConsumerMetadata.java due to some additional complications; discussion can take 
place on the PR.

> Refactor MetadataTest#testUpdatePartitionLeadership
> ---
>
> Key: KAFKA-16027
> URL: https://issues.apache.org/jira/browse/KAFKA-16027
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Philip Nee
>Priority: Minor
>  Labels: newbie
>
> MetadataTest#testUpdatePartitionLeadership is extremely long. I think it is 
> pretty close to the 160-line method limit; I tried to modify it, but it 
> would hit the limit when I tried to break things into separate lines.
> The test also contains two tests, so it is best to split it into two separate 
> tests.
> We should also move this to ConsumerMetadata.java.





[PR] MINOR: Kafka-16027 Refactor MetadataTest#testUpdatePartitionLeadership [kafka]

2023-12-20 Thread via GitHub


Alexander-Aghili opened a new pull request, #15055:
URL: https://github.com/apache/kafka/pull/15055

   Split MetadataTest#testUpdatePartitionLeadership into two tests:
   MetadataTest#testIgnoreUpdatePartitionLeadership
   MetadataTest#testApplyAndIgnoreUpdatesPartitionLeadership
   
   Each test is < 100 lines, with helper functions combined to about 50 lines. 
   Not sure if this is sufficient or if more refactoring is necessary.
   There are additional complications with putting this into 
ConsumerMetadataTest, so if that is preferred, some issues have to be overcome.





[jira] [Updated] (KAFKA-16038) Periodic Logging of Current Assignment

2023-12-20 Thread Almog Gavra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Almog Gavra updated KAFKA-16038:

Summary: Periodic Logging of Current Assignment  (was: Periodic Logging fo 
Current Assignment)

> Periodic Logging of Current Assignment
> --
>
> Key: KAFKA-16038
> URL: https://issues.apache.org/jira/browse/KAFKA-16038
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Almog Gavra
>Assignee: Almog Gavra
>Priority: Major
>
> Currently, assignment is only logged when a rebalance happens:
> {code:java}
> 15:14:56.263 
> [metrics-backend-8-a90ee750-e4bb-46d5-bb7d-32411a75dd26-StreamThread-1] INFO  
> org.apache.kafka.streams.processor.internals.TaskManager {} - stream-thread 
> [metrics-backend-8-a90ee750-e4bb-46d5-bb7d-32411a75dd26-StreamThread-1] 
> Handle new assignment with:
>     New active tasks: [0_7, 0_6, 0_5, 0_4, 0_3, 0_2, 0_1, 0_0]
>     New standby tasks: []
>     Existing active tasks: []
>     Existing standby tasks: [] {code}
> It would be helpful to periodically log the current assignment at a 
> configurable interval in a manner that's machine-friendly. This can help in 
> situations when a rebalance hasn't happened in a long while.





Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-20 Thread via GitHub


dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433079259


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -135,7 +136,22 @@ class CoordinatorLoaderImpl[T](
 
 memoryRecords.batches.forEach { batch =>
   if (batch.isControlBatch) {
-throw new IllegalStateException("Control batches are not 
supported yet.")
+batch.asScala.foreach { record =>
+  val controlRecord = ControlRecordType.parse(record.key)
+  if (controlRecord == ControlRecordType.COMMIT) {
+coordinator.replayTransactionEndMarker(

Review Comment:
   Sure.






[jira] [Created] (KAFKA-16038) Periodic Logging fo Current Assignment

2023-12-20 Thread Almog Gavra (Jira)
Almog Gavra created KAFKA-16038:
---

 Summary: Periodic Logging fo Current Assignment
 Key: KAFKA-16038
 URL: https://issues.apache.org/jira/browse/KAFKA-16038
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Almog Gavra
Assignee: Almog Gavra


Currently, assignment is only logged when a rebalance happens:
{code:java}
15:14:56.263 
[metrics-backend-8-a90ee750-e4bb-46d5-bb7d-32411a75dd26-StreamThread-1] INFO  
org.apache.kafka.streams.processor.internals.TaskManager {} - stream-thread 
[metrics-backend-8-a90ee750-e4bb-46d5-bb7d-32411a75dd26-StreamThread-1] Handle 
new assignment with:
    New active tasks: [0_7, 0_6, 0_5, 0_4, 0_3, 0_2, 0_1, 0_0]
    New standby tasks: []
    Existing active tasks: []
    Existing standby tasks: [] {code}
It would be helpful to periodically log the current assignment at a 
configurable interval in a manner that's machine-friendly. This can help in 
situations when a rebalance hasn't happened in a long while.
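
A rough sketch of the idea (the interval wiring, class name, and log format 
below are assumptions for illustration, not a proposed design):
{code:java}
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical periodic assignment logger, for illustration only.
class AssignmentLogger {
    private static final Logger log = LoggerFactory.getLogger(AssignmentLogger.class);
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor();

    // Log the current active tasks every intervalMs milliseconds.
    void start(long intervalMs, Supplier<Set<String>> activeTasks) {
        executor.scheduleAtFixedRate(
            () -> log.info("Current assignment: activeTasks={}", activeTasks.get()),
            intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    void stop() {
        executor.shutdownNow();
    }
}
{code}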





[jira] [Updated] (KAFKA-16034) AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on fenced/unknown member Id

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16034:
--
Fix Version/s: 3.7.0

> AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on 
> fenced/unknown member Id
> ---
>
> Key: KAFKA-16034
> URL: https://issues.apache.org/jira/browse/KAFKA-16034
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Philip Nee
>Priority: Major
> Fix For: 3.7.0
>
>
> The consumer will log an invalid request error when rejoining with a 
> fenced/unknown member id because we didn't reset the HeartbeatState and won't 
> send the needed fields (rebalanceTimeoutMs, for example) when joining.





[jira] [Updated] (KAFKA-15692) New integration tests to ensure full coverage

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15692:
--
Parent: (was: KAFKA-14048)
Issue Type: Test  (was: Sub-task)

> New integration tests to ensure full coverage
> -
>
> Key: KAFKA-15692
> URL: https://issues.apache.org/jira/browse/KAFKA-15692
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Priority: Minor
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> These are to fix bugs discovered during PR reviews but not caught by tests.





[jira] [Updated] (KAFKA-15325) Integrate topicId in OffsetFetch and OffsetCommit async consumer calls

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15325:
--
Parent: (was: KAFKA-14048)
Issue Type: Improvement  (was: Sub-task)

> Integrate topicId in OffsetFetch and OffsetCommit async consumer calls
> --
>
> Key: KAFKA-15325
> URL: https://issues.apache.org/jira/browse/KAFKA-15325
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> KIP-848 introduces support for topicIds in the OffsetFetch and OffsetCommit 
> APIs. The consumer calls to those APIs should be updated to include topicIds 
> when available.





[jira] [Updated] (KAFKA-15538) Client support for java regex based subscription

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15538:
--
Priority: Major  (was: Minor)

> Client support for java regex based subscription 
> -
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> When using subscribe with a java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscriptionPattern`. 
> Subscribing with a java `Pattern` will still be supported for a while but 
> eventually removed.
>  * When subscribe with SubscriptionPattern is used, the client should just 
> send the regex to the broker, and it will be resolved on the server side.
>  * In the case of subscribe with Pattern, the regex should be resolved on 
> the client side, as in the sketch below.
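
To make the two paths concrete, a sketch follows; the SubscriptionPattern call 
is commented out because its exact signature is an assumption (only the java 
Pattern overload exists today):
{code:java}
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RegexSubscribeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "demo-group");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Existing java regex path: the client resolves matching topic names
            // from its metadata and sends the broker an explicit topic list.
            consumer.subscribe(Pattern.compile("orders-.*"));

            // New KIP-848 path (assumed API): the regex itself would be sent to
            // the broker and resolved server side.
            // consumer.subscribe(new SubscriptionPattern("orders-.*"));
        }
    }
}
{code}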





[jira] [Updated] (KAFKA-15538) Client support for java regex based subscription

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15538:
--
Parent: (was: KAFKA-14048)
Issue Type: New Feature  (was: Sub-task)

> Client support for java regex based subscription 
> -
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Minor
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> When using subscribe with a java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscriptionPattern`. 
> Subscribing with a java `Pattern` will still be supported for a while but 
> eventually removed.
>  * When subscribe with SubscriptionPattern is used, the client should just 
> send the regex to the broker, and it will be resolved on the server side.
>  * In the case of subscribe with Pattern, the regex should be resolved on 
> the client side.





[jira] [Updated] (KAFKA-15282) Implement client support for KIP-848 client-side assignors

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15282:
--
Parent: (was: KAFKA-14048)
Issue Type: New Feature  (was: Sub-task)

> Implement client support for KIP-848 client-side assignors
> --
>
> Key: KAFKA-15282
> URL: https://issues.apache.org/jira/browse/KAFKA-15282
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> The client-side assignor provides the logic for the partition assignments 
> instead of the server.
> the “old protocol” for divvying up partitions. While the “new protocol” 
> favors server-side assignors, the client-side assignor will continue to be 
> used for backward compatibility, including KSQL, Connect, etc.
> Note: I _*think*_ that the client-side assignor logic and the reconciliation 
> logic can remain separate from each other. We should strive to keep the two 
> pieces unencumbered, unless it’s unavoidable.
> This task includes:
>  * Validate the client’s configuration for assignor selection
>  * Integrate with the new {{PartitionAssignor}} interface to invoke the logic 
> from the user-provided assignor implementation
>  * Implement the necessary logic around the request/response from the 
> {{ConsumerGroupPrepareAssignment}} RPC call using the information from the 
> {{PartitionAssignor}} above
>  * Implement the necessary logic around the request/response from the 
> {{ConsumerGroupInstallAssignment}} RPC call, again using the information 
> calculated by the {{PartitionAssignor}}
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.





[jira] [Updated] (KAFKA-15835) Group commit/callbacks triggering logic

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15835:
--
Parent: (was: KAFKA-14048)
Issue Type: Improvement  (was: Sub-task)

> Group commit/callbacks triggering logic
> ---
>
> Key: KAFKA-15835
> URL: https://issues.apache.org/jira/browse/KAFKA-15835
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.8.0
>
>
> The new consumer reconciliation logic triggers a commit request, revocation 
> callback and assignment callbacks sequentially to ensure that they are 
> executed in that order. This means that we could require multiple iterations 
> of the poll loop to complete reconciling an assignment. 
> We could consider triggering them all together, to be executed in the same 
> poll iteration, while still making sure that they are executed in the right 
> order. Note that the sequence sometimes should not block on failures (ex. if 
> commit fails, revocation proceeds anyway), and other times it does block (if 
> revocation callbacks fail, onPartitionsAssigned is not called).
> As part of this task, review the time boundaries for the commit request 
> issued when the assignment changes. It will be effectively time bounded by 
> the rebalance timeout enforced by the broker, so the initial approach is to 
> use the same rebalance timeout as the boundary on the client. 





[jira] [Updated] (KAFKA-15697) Add local assignor and ensure it cannot be used with server side assignor

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15697:
--
Parent: (was: KAFKA-14048)
Issue Type: New Feature  (was: Sub-task)

> Add local assignor and ensure it cannot be used with server side assignor
> -
>
> Key: KAFKA-15697
> URL: https://issues.apache.org/jira/browse/KAFKA-15697
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.8.0
>
>
> When we start supporting a local/client-side assignor, we should:
>  # Add the config to ConsumerConfig
>  # Examine where we should implement the logic to ensure it is not used 
> alongside the server-side assignor, i.e. you can only specify a local or 
> remote assignor, or none.
>  ## If both assignors are specified: throw IllegalArgumentException, as in 
> the sketch below
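
A tiny validation sketch for that last rule (the class and argument names are 
placeholders, not the final ConsumerConfig keys):
{code:java}
final class AssignorConfigValidator {
    // Hypothetical check, for illustration only.
    static void validateAssignors(String localAssignor, String remoteAssignor) {
        if (localAssignor != null && remoteAssignor != null) {
            throw new IllegalArgumentException(
                "Only one of the local (client-side) and remote (server-side) assignors may be configured");
        }
    }
}
{code}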





[jira] [Updated] (KAFKA-16033) Review retry logic of OffsetFetch and OffsetCommit responses

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16033:
--
Parent: (was: KAFKA-14048)
Issue Type: Improvement  (was: Sub-task)

> Review retry logic of OffsetFetch and OffsetCommit responses
> 
>
> Key: KAFKA-16033
> URL: https://issues.apache.org/jira/browse/KAFKA-16033
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> The retry logic for OffsetFetch and OffsetCommit requests lives in the 
> CommitRequestManager, and applies to requests issued from multiple components 
> (AsyncKafkaConsumer for commitSync and commitAsync, CommitRequestManager for 
> the regular auto-commits, MembershipManager for auto-commits before 
> rebalance, auto-commit before closing consumer). While this approach helps to 
> avoid having the retry logic in each caller, currently the CommitManager has 
> it in different places and it ends up being rather hard to follow.
> This task aims at reviewing the retry logic from a high-level perspective 
> (multiple callers, with retry needs that have similarities and differences at 
> the same time). So the review should assess the similarities vs differences, 
> and then consider two options:
> 1. Keep retry logic centralized in the CommitManager, but fixed in a more 
> consistent way, applied the same way for all requests, depending on the 
> intention expressed by the caller. The advantage of this approach (current 
> approach + improvement) is that callers that require the same retry logic 
> could reuse it, keeping it in a single place (ex. commitSync from the 
> consumer retries in the same way as the auto-commit before rebalance). 
> 2. Move retry logic to the caller. This aligns with the way it was done on 
> the legacy coordinator, but the main challenge seems to be not duplicating 
> the retry logic in callers that require the same logic.





[jira] [Updated] (KAFKA-15542) Release member assignments on errors

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15542:
--
Parent: (was: KAFKA-14048)
Issue Type: Improvement  (was: Sub-task)

> Release member assignments on errors
> 
>
> Key: KAFKA-15542
> URL: https://issues.apache.org/jira/browse/KAFKA-15542
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.8.0
>
>
> The member should release its assignment by triggering the onPartitionsLost 
> flow from the HB manager when errors occur (both fencing and unrecoverable 
> errors).





[jira] [Updated] (KAFKA-15561) Client support for new SubscriptionPattern based subscription

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15561:
--
Parent: (was: KAFKA-14048)
Issue Type: New Feature  (was: Sub-task)

> Client support for new SubscriptionPattern based subscription 
> --
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Minor
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> The new consumer should support subscribe with the new SubscriptionPattern 
> introduced in the new consumer group protocol. When subscribing with this 
> regex, the client should provide the regex in the HB request in the 
> SubscribedTopicRegex field, delegating the resolution to the server.





[jira] [Updated] (KAFKA-15561) Client support for new SubscriptionPattern based subscription

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15561:
--
Labels: consumer-threading-refactor kip-848 kip-848-client-support  (was: 
kip-848 kip-848-client-support)

> Client support for new SubscriptionPattern based subscription 
> --
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> The new consumer should support subscribe with the new SubscriptionPattern 
> introduced in the new consumer group protocol. When subscribing with this 
> regex, the client should provide the regex in the HB request in the 
> SubscribedTopicRegex field, delegating the resolution to the server.





[jira] [Updated] (KAFKA-15561) Client support for new SubscriptionPattern based subscription

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15561:
--
Priority: Major  (was: Minor)

> Client support for new SubscriptionPattern based subscription 
> --
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> The new consumer should support subscribe with the new SubscriptionPattern 
> introduced in the new consumer group protocol. When subscribing with this 
> regex, the client should provide the regex in the HB request in the 
> SubscribedTopicRegex field, delegating the resolution to the server.





[jira] [Updated] (KAFKA-15279) Implement client support for KIP-848 client-side assigner RPCs

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15279:
--
Parent: (was: KAFKA-14048)
Issue Type: New Feature  (was: Sub-task)

> Implement client support for KIP-848 client-side assigner RPCs
> --
>
> Key: KAFKA-15279
> URL: https://issues.apache.org/jira/browse/KAFKA-15279
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> The protocol introduces three new RPCs that the client uses to communicate 
> with the broker:
>  # 
> [ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI]
>  # 
> [ConsumerGroupPrepareAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupPrepareAssignmentAPI]
>  # 
> [ConsumerGroupInstallAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupInstallAssignmentAPI]
> Support for ConsumerGroupHeartbeat is handled by KAFKA-15278. This task is to 
> implement the ConsumerGroupAssignmentRequestManager to handle the second and 
> third RPCs on the above list.
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15283) Add support for topic ID-related Consumer changes

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15283:
--
Parent: (was: KAFKA-14048)
Issue Type: New Feature  (was: Sub-task)

> Add support for topic ID-related Consumer changes
> -
>
> Key: KAFKA-15283
> URL: https://issues.apache.org/jira/browse/KAFKA-15283
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> Currently, {{KafkaConsumer}} keeps track of topic IDs in the in-memory 
> {{ConsumerMetadata}} object, and they are provided to the {{FETCH}} and 
> {{METADATA}} RPC calls.
> With KIP-848 the OffsetFetch and OffsetCommit will start using topic IDs in 
> the same way, so the new client implementation will provide them when issuing 
> those requests. Topic names should continue to be supported as needed by the 
> {{{}AdminClient{}}}.
> We should also review/clean-up the support for topic names in requests such 
> as the {{METADATA}} request (currently supporting topic names as well as 
> topic IDs on the client side).
> Tasks include:
>  * Introduce Topic ID in existing OffsetFetch and OffsetCommit API that will 
> be upgraded on the server to support topic ID
>  * Check topic ID propagation internally in the client based on RPCs 
> including it.
>  * Review existing support for topic names for potential clean-up if no longer 
> needed (see the sketch below).
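>
> As a small illustration of the ID-first addressing involved (the 
> {{TopicIdPartition}} class already exists in the common package; the topic 
> name and the random ID are illustrative):
> {code:java}
> import org.apache.kafka.common.TopicIdPartition;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.Uuid;
>
> public class TopicIdExample {
>     public static void main(String[] args) {
>         // In the client the ID comes from a METADATA response; random here.
>         Uuid topicId = Uuid.randomUuid();
>         TopicIdPartition tidp =
>             new TopicIdPartition(topicId, new TopicPartition("orders", 0));
>
>         // Unlike the name, the ID changes when a topic is deleted and
>         // re-created, which is what lets ID-based OffsetCommit/OffsetFetch
>         // detect topic re-creation.
>         System.out.println(tidp.topicId() + " / " + tidp.topicPartition());
>     }
> }
> {code}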



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15588) Purge the unsent offset commits/fetches when the member is fenced/failed

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15588:
--
Parent: (was: KAFKA-14048)
Issue Type: Improvement  (was: Sub-task)

> Purge the unsent offset commits/fetches when the member is fenced/failed
> 
>
> Key: KAFKA-15588
> URL: https://issues.apache.org/jira/browse/KAFKA-15588
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> When the member is fenced/failed, we should purge the in-flight offset commits 
> and fetches. The HeartbeatRequestManager should be able to handle this.
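>
> A hedged sketch of the intended purge behaviour; the class and field names 
> are assumptions for illustration, not the actual request-manager internals:
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> import java.util.concurrent.CompletableFuture;
>
> class UnsentRequestPurge {
>     final List<CompletableFuture<Void>> unsentOffsetCommits = new ArrayList<>();
>     final List<CompletableFuture<Void>> unsentOffsetFetches = new ArrayList<>();
>
>     void onFencedOrFailed() {
>         // Complete queued-but-unsent requests exceptionally so callers stop
>         // waiting on requests that can no longer succeed.
>         RuntimeException cause =
>             new IllegalStateException("member fenced or failed");
>         unsentOffsetCommits.forEach(f -> f.completeExceptionally(cause));
>         unsentOffsetCommits.clear();
>         unsentOffsetFetches.forEach(f -> f.completeExceptionally(cause));
>         unsentOffsetFetches.clear();
>     }
> }
> {code}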



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15839) Improve TopicIdPartition integration in consumer membershipManager

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15839:
--
Parent: (was: KAFKA-14048)
Issue Type: Improvement  (was: Sub-task)

> Improve TopicIdPartition integration in consumer membershipManager
> --
>
> Key: KAFKA-15839
> URL: https://issues.apache.org/jira/browse/KAFKA-15839
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.8.0
>
>
> TopicIdPartition is currently used in the reconciliation path. It could be 
> used more broadly, leaving plain TopicPartitions only where necessary for the 
> callbacks and for interaction with the subscription state, which does not 
> fully support topic IDs yet.
> Ensure that we properly handle topic re-creation (same name, diff topic IDs) 
> in the reconciliation process (assignment cache, same assignment comparison, 
> etc.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15773) Group protocol configuration should be validated

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15773:
--
Parent: (was: KAFKA-14048)
Issue Type: Improvement  (was: Sub-task)

> Group protocol configuration should be validated
> 
>
> Key: KAFKA-15773
> URL: https://issues.apache.org/jira/browse/KAFKA-15773
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Minor
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.8.0
>
>
> If the user specifies the generic group protocol, or does not specify the 
> group.protocol config at all, we should reject any group.remote.assignor 
> setting.
>  
> If group.local.assignor and group.remote.assignor are both configured, we 
> should also reject the configuration.
>  
> This is an optimization/user experience improvement.
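>
> A sketch of the validation described above; the config names come from 
> KIP-848, while the checks themselves are illustrative rather than actual 
> client code:
> {code:java}
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigException;
>
> final class GroupProtocolValidator {
>     static void validate(Map<String, String> configs) {
>         String protocol = configs.getOrDefault("group.protocol", "generic");
>         String remote = configs.get("group.remote.assignor");
>         String local = configs.get("group.local.assignor");
>
>         // group.remote.assignor only makes sense with the new protocol.
>         if (remote != null && !"consumer".equalsIgnoreCase(protocol)) {
>             throw new ConfigException(
>                 "group.remote.assignor requires group.protocol=consumer");
>         }
>         // A member cannot use a local and a remote assignor at once.
>         if (remote != null && local != null) {
>             throw new ConfigException(
>                 "group.local.assignor and group.remote.assignor are mutually exclusive");
>         }
>     }
> }
> {code}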



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15846) Review consumer leave group request best effort

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15846:
--
Parent: (was: KAFKA-14048)
Issue Type: Improvement  (was: Sub-task)

> Review consumer leave group request best effort
> ---
>
> Key: KAFKA-15846
> URL: https://issues.apache.org/jira/browse/KAFKA-15846
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> The new consumer sends out a leave group request with a best-effort approach. 
> It transitions to LEAVING to indicate to the HB manager that the request must 
> be sent, but it does not do any response handling or retrying (note that the 
> response is still handled like any other HB response). After the first HB 
> manager poll iteration while on LEAVING, the consumer transitions into 
> UNSUBSCRIBE (regardless of whether the request was actually sent out or not, 
> e.g., due to the coordinator not being known). Review whether this is a good 
> enough effort to send the request.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15967) Fix revocation in reconciliation logic

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15967:
--
Parent: (was: KAFKA-14048)
Issue Type: Bug  (was: Sub-task)

> Fix revocation in reconciliation logic
> -
>
> Key: KAFKA-15967
> URL: https://issues.apache.org/jira/browse/KAFKA-15967
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> Looks like there is a problem in the reconciliation logic.
> We are getting 6 partitions from an HB, we add them to 
> {{{}assignmentReadyToReconcile{}}}. Next HB we get only 4 partitions (2 are 
> revoked), we also add them to {{{}assignmentReadyToReconcile{}}}, but the 2 
> partitions that were supposed to be removed from the assignment are never 
> removed because they are still in {{{}assignmentReadyToReconcile{}}}.
> This was discovered during integration testing of 
> [https://github.com/apache/kafka/pull/14878] - part of the test 
> testRemoteAssignorRange was disabled and should be re-enabled once this is 
> fixed.
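>
> A toy model of the bug (integers stand in for the topic ID/partition pairs 
> the real set holds):
> {code:java}
> import java.util.Arrays;
> import java.util.HashSet;
> import java.util.Set;
>
> public class StaleSetBug {
>     public static void main(String[] args) {
>         Set<Integer> assignmentReadyToReconcile = new HashSet<>();
>         // First HB: 6 partitions.
>         assignmentReadyToReconcile.addAll(Arrays.asList(0, 1, 2, 3, 4, 5));
>         // Next HB: only 4 partitions (2 revoked), but add() never removes.
>         assignmentReadyToReconcile.addAll(Arrays.asList(0, 1, 2, 3));
>         System.out.println(assignmentReadyToReconcile); // still has 4 and 5
>
>         // A fix along these lines replaces the pending set instead:
>         assignmentReadyToReconcile.clear();
>         assignmentReadyToReconcile.addAll(Arrays.asList(0, 1, 2, 3));
>     }
> }
> {code}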



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15954) Review minimal effort approach on consumer last heartbeat on unsubscribe

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15954:
--
Issue Type: Improvement  (was: Task)

> Review minimal effort approach on consumer last heartbeat on unsubscribe
> 
>
> Key: KAFKA-15954
> URL: https://issues.apache.org/jira/browse/KAFKA-15954
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> Currently the legacy and new consumer follow a minimal effort approach when 
> sending a leave group (legacy) or last heartbeat request (new consumer). The 
> request is sent without waiting for or handling any response. This behaviour 
> applies when the consumer is being closed or when it unsubscribes.
> For the case when the consumer is being closed (which is a "terminal" state), 
> it makes sense to just follow a minimal effort approach for "properly" leaving 
> the group. But for the case of unsubscribe, it would maybe make sense to put a 
> little more effort into making sure that the last heartbeat is sent and 
> received by the broker. Note that unsubscribe could be a temporary state, 
> where the consumer might want to re-join the group at any time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15843:
--
Parent: (was: KAFKA-14048)
Issue Type: Improvement  (was: Sub-task)

> Review consumer onPartitionsAssigned called with empty partitions
> -
>
> Key: KAFKA-15843
> URL: https://issues.apache.org/jira/browse/KAFKA-15843
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.8.0
>
>
> Legacy coordinator triggers onPartitionsAssigned with empty assignment (which 
> is not the case when triggering onPartitionsRevoked or Lost). This is the 
> behaviour of the legacy coordinator, and the new consumer implementation 
> maintains the same principle. We should review this to fully understand if it 
> is really needed to call onPartitionsAssigned with empty assignment (or if it 
> should behave consistently with the onRevoke/Lost).
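>
> For reference, a listener that guards against the empty-assignment callback; 
> this uses the public ConsumerRebalanceListener API, and the guard is just one 
> way applications can cope today:
> {code:java}
> import java.util.Collection;
> import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> import org.apache.kafka.common.TopicPartition;
>
> class EmptyAwareListener implements ConsumerRebalanceListener {
>     @Override
>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>         if (partitions.isEmpty()) {
>             return; // the coordinator may invoke this with no partitions
>         }
>         // set up per-partition state here...
>     }
>
>     @Override
>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>         // commit offsets / tear down state for revoked partitions...
>     }
> }
> {code}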



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15832) Trigger client reconciliation based on manager poll

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15832:
--
Parent: (was: KAFKA-14048)
Issue Type: Improvement  (was: Sub-task)

> Trigger client reconciliation based on manager poll
> ---
>
> Key: KAFKA-15832
> URL: https://issues.apache.org/jira/browse/KAFKA-15832
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.8.0
>
>
> Currently the reconciliation logic on the client is triggered when a new 
> target assignment is received and resolved, or when new unresolved target 
> assignments are discovered in metadata.
> This could be improved by triggering the reconciliation logic on each poll 
> iteration, to reconcile whatever is ready to be reconciled. This would 
> require changes to support poll on the MembershipManager, and to integrate it 
> with the current polling logic in the background thread.
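>
> A sketch of what poll-driven reconciliation might look like; the class, 
> field, and method names are assumptions, not the actual MembershipManager 
> internals:
> {code:java}
> import java.util.HashSet;
> import java.util.Set;
> import org.apache.kafka.common.TopicIdPartition;
>
> class PollDrivenReconciliation {
>     private final Set<TopicIdPartition> assignmentReadyToReconcile = new HashSet<>();
>     private boolean reconciliationInProgress = false;
>
>     // Invoked by the background thread on every poll iteration, so whatever
>     // became reconcilable since the last pass is picked up here.
>     void poll(long currentTimeMs) {
>         if (!reconciliationInProgress && !assignmentReadyToReconcile.isEmpty()) {
>             reconciliationInProgress = true;
>             reconcile(new HashSet<>(assignmentReadyToReconcile));
>         }
>     }
>
>     private void reconcile(Set<TopicIdPartition> target) {
>         // revoke removed partitions, assign added ones, fire callbacks...
>         reconciliationInProgress = false;
>     }
> }
> {code}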



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16032) Review inconsistent error handling of OffsetFetch and OffsetCommit responses

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16032:
--
Parent: (was: KAFKA-14048)
Issue Type: Bug  (was: Sub-task)

> Review inconsistent error handling of OffsetFetch and OffsetCommit responses
> 
>
> Key: KAFKA-16032
> URL: https://issues.apache.org/jira/browse/KAFKA-16032
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> OffsetFetch and OffsetCommit handle errors separately. There are 2 issues to 
> review around this:
>  - The logic is duplicated for some errors that are treated similarly (ex. 
> NOT_COORDINATOR)
>  - Some errors are not handled similarly in both requests (ex. 
> COORDINATOR_NOT_AVAILABLE handled and retried for OffsetCommit but not 
> OffsetFetch). Note that the specific errors handled by each request were kept 
> the same as in the legacy ConsumerCoordinator but this should be reviewed, in 
> an attempt to handle the same errors, in the same way, whenever possible.
> This should also be reviewed with the goal of unifying the retry logic around 
> those errors, which is now applied in multiple different places in the manager 
> depending on the request path.
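>
> One shape such a unified classification could take (illustrative only, not 
> the current client code; the Errors constants are real):
> {code:java}
> import org.apache.kafka.common.protocol.Errors;
>
> final class GroupRequestErrors {
>     static boolean isRetriableCoordinatorError(Errors error) {
>         switch (error) {
>             case COORDINATOR_NOT_AVAILABLE:
>             case NOT_COORDINATOR: // should also trigger coordinator re-discovery
>             case COORDINATOR_LOAD_IN_PROGRESS:
>                 return true;
>             default:
>                 return false;
>         }
>     }
> }
> {code}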



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15691) Add new system tests to use new consumer

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15691:
--
Priority: Major  (was: Critical)

> Add new system tests to use new consumer
> 
>
> Key: KAFKA-15691
> URL: https://issues.apache.org/jira/browse/KAFKA-15691
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16037) Upgrade existing system tests to use new consumer

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16037:
--
Priority: Blocker  (was: Critical)

> Upgrade existing system tests to use new consumer
> -
>
> Key: KAFKA-16037
> URL: https://issues.apache.org/jira/browse/KAFKA-16037
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Reporter: Kirk True
>Priority: Blocker
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15691) Add new system tests to use new consumer

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15691:
--
Fix Version/s: 3.8.0
   (was: 4.0.0)

> Add new system tests to use new consumer
> 
>
> Key: KAFKA-15691
> URL: https://issues.apache.org/jira/browse/KAFKA-15691
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15691) Add new system tests to use new consumer

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15691:
--
Summary: Add new system tests to use new consumer  (was: Upgrade existing 
and add new system tests to use new consumer)

> Add new system tests to use new consumer
> 
>
> Key: KAFKA-15691
> URL: https://issues.apache.org/jira/browse/KAFKA-15691
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Reporter: Kirk True
>Priority: Critical
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16037) Upgrade existing system tests to use new consumer

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16037:
--
Fix Version/s: 3.8.0
   (was: 4.0.0)

> Upgrade existing system tests to use new consumer
> -
>
> Key: KAFKA-16037
> URL: https://issues.apache.org/jira/browse/KAFKA-16037
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Reporter: Kirk True
>Priority: Critical
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16037) Upgrade existing system tests to use new consumer

2023-12-20 Thread Kirk True (Jira)
Kirk True created KAFKA-16037:
-

 Summary: Upgrade existing system tests to use new consumer
 Key: KAFKA-16037
 URL: https://issues.apache.org/jira/browse/KAFKA-16037
 Project: Kafka
  Issue Type: Test
  Components: clients, consumer, system tests
Reporter: Kirk True
 Fix For: 4.0.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15321) Document consumer group member state machine

2023-12-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15321:
--
Parent: (was: KAFKA-14048)
Issue Type: Task  (was: Sub-task)

> Document consumer group member state machine
> 
>
> Key: KAFKA-15321
> URL: https://issues.apache.org/jira/browse/KAFKA-15321
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, documentation
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Minor
>  Labels: kip-848, kip-848-client-support
> Fix For: 4.0.0
>
>
> We need to first document the new consumer group member state machine. What 
> are the different states and what are the transitions?
> See [~pnee]'s notes: 
> [https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design]
> *_Don’t forget to include diagrams for clarity!_*
> This should be documented on the AK wiki.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-20 Thread via GitHub


jeffkbkim commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433038616


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java:
##
@@ -111,6 +112,22 @@ public synchronized void replay(
 coordinator.replay(producerId, producerEpoch, record);
 }
 
+/**
+ * Applies the transaction end marker.

Review Comment:
   nit: end transaction marker



##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -135,7 +136,22 @@ class CoordinatorLoaderImpl[T](
 
 memoryRecords.batches.forEach { batch =>
   if (batch.isControlBatch) {
-throw new IllegalStateException("Control batches are not 
supported yet.")
+batch.asScala.foreach { record =>
+  val controlRecord = ControlRecordType.parse(record.key)
+  if (controlRecord == ControlRecordType.COMMIT) {
+coordinator.replayTransactionEndMarker(

Review Comment:
   can we rename all `replayTransactionEndMarker` to 
`replayEndTransactionMarker`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-20 Thread via GitHub


dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433031924


##
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##
@@ -511,4 +544,35 @@ class CoordinatorLoaderImplTest {
 
 new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)
   }
+
+  private def logReadResult(
+startOffset: Long,
+producerId: Long,
+producerEpoch: Short,
+controlRecordType: ControlRecordType
+  ): FetchDataInfo = {
+val fileRecords = mock(classOf[FileRecords])
+val memoryRecords = MemoryRecords.withEndTransactionMarker(
+  startOffset,
+  0L,
+  RecordBatch.NO_PARTITION_LEADER_EPOCH,
+  producerId,
+  producerEpoch,
+  new EndTransactionMarker(controlRecordType, 0)
+)
+
+when(fileRecords.sizeInBytes).thenReturn(memoryRecords.sizeInBytes)
+
+val bufferCapture: ArgumentCaptor[ByteBuffer] = 
ArgumentCaptor.forClass(classOf[ByteBuffer])
+when(fileRecords.readInto(
+  bufferCapture.capture(),
+  ArgumentMatchers.anyInt())
+).thenAnswer { _ =>
+  val buffer = bufferCapture.getValue
+  buffer.put(memoryRecords.buffer.duplicate)
+  buffer.flip()
+}
+
+new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)

Review Comment:
   From an API perspective, we could get both FileRecords or MemoryRecords so 
we have to handle both. In practice, it is mainly FileRecords from your read 
from a real log. Testing with MemoryRecords is not so interesting as you can 
see.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-20 Thread via GitHub


jeffkbkim commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433029392


##
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##
@@ -511,4 +544,35 @@ class CoordinatorLoaderImplTest {
 
 new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)
   }
+
+  private def logReadResult(
+startOffset: Long,
+producerId: Long,
+producerEpoch: Short,
+controlRecordType: ControlRecordType
+  ): FetchDataInfo = {
+val fileRecords = mock(classOf[FileRecords])
+val memoryRecords = MemoryRecords.withEndTransactionMarker(
+  startOffset,
+  0L,
+  RecordBatch.NO_PARTITION_LEADER_EPOCH,
+  producerId,
+  producerEpoch,
+  new EndTransactionMarker(controlRecordType, 0)
+)
+
+when(fileRecords.sizeInBytes).thenReturn(memoryRecords.sizeInBytes)
+
+val bufferCapture: ArgumentCaptor[ByteBuffer] = 
ArgumentCaptor.forClass(classOf[ByteBuffer])
+when(fileRecords.readInto(
+  bufferCapture.capture(),
+  ArgumentMatchers.anyInt())
+).thenAnswer { _ =>
+  val buffer = bufferCapture.getValue
+  buffer.put(memoryRecords.buffer.duplicate)
+  buffer.flip()
+}
+
+new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)

Review Comment:
   aren't we reading from the log in 
https://github.com/apache/kafka/pull/14985/files#diff-64992589f1b20d6db189de7b1e613d222be46fb2999cdc9790f8c27c380f3092R114-R115
 as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16036; Add `group.coordinator.rebalance.protocols` and publish all new configs [kafka]

2023-12-20 Thread via GitHub


dajac commented on PR #15053:
URL: https://github.com/apache/kafka/pull/15053#issuecomment-1864868557

   @jeffkbkim The goal is to have something better than 
`group.coordinator.new.enable`. Yes, I will add it to the KIP.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16036; Add `group.coordinator.rebalance.protocols` and publish all new configs [kafka]

2023-12-20 Thread via GitHub


jeffkbkim commented on PR #15053:
URL: https://github.com/apache/kafka/pull/15053#issuecomment-1864862018

   Is the purpose of this config to prevent EA users from accidentally using 
the consumer protocol while using the new coordinator?
   
   Also, should we update the KIP with this new config?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432925646


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -525,16 +526,37 @@ public void testHeartbeatState() {
 
 @Test
 public void testPollTimerExpiration() {
-heartbeatRequestManager = createHeartbeatRequestManager();
+coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+membershipManager = mock(MembershipManager.class);
+heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
+heartbeatRequestState = spy(new 
HeartbeatRequestManager.HeartbeatRequestState(

Review Comment:
   It is there to verify the reset call, and we also need a real object to 
ensure the manager can send a request. If we mock it, then we have to mock the 
canSendRequest call, and then it is pointless to verify the request returned 
from poll because it will always return one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-13922) Unable to generate coverage reports for the whole project

2023-12-20 Thread Andras Katona (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799084#comment-17799084
 ] 

Andras Katona edited comment on KAFKA-13922 at 12/20/23 4:15 PM:
-

Okay, I was trying to configure the jacoco-report-aggregation plugin for the 
project, because we're removing the rootreport part from build.gradle, which is 
not working any more. But I failed to get that aggregation plugin included in 
the timeframe I allocated myself. It's not adding much anyway: sonar and 
jenkins are able to fetch the coverage reports from the build dirs (they would 
not use the aggregated one), so I convinced myself that it's fine to stop 
trying :).

I'm doing a last round of tests with my adjustments (see the above mentioned 
[PR|https://github.com/apache/kafka/pull/11982/files])


was (Author: akatona):
Okay, I was trying to configure the jacoco-report-aggregation for the project, 
because we're removing the rootreport part from the build.gradle, that's not 
working any more. But failed to include that aggregation plugin in the 
timeframe I allocated myself to try. It's not adding much, sonar and jenkins 
are able to fetch the coverage reports from the build dirs (they would not use 
the aggregated one anyway), so I convinced myself that it's fine to stop trying 
:).

I'm doing a last round of tests with my adjustments (see the above mentioned 
[PR}https://github.com/apache/kafka/pull/11982/files])

> Unable to generate coverage reports for the whole project
> -
>
> Key: KAFKA-13922
> URL: https://issues.apache.org/jira/browse/KAFKA-13922
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Affects Versions: 2.5.0
>Reporter: Patrik Nagy
>Assignee: Andras Katona
>Priority: Minor
>  Labels: cloudera
>
> It is documented in the project that if we need code coverage reports for the 
> whole project, we need to run something like this where we enabled the test 
> coverage flag and run the reportCoverage task:
> {code:java}
> ./gradlew reportCoverage -PenableTestCoverage=true -Dorg.gradle.parallel=false
> {code}
> If I run it, the build will fail in the end because of jacocoRootReport:
> {code:java}
> 14:34:41 > Task :jacocoRootReport FAILED
> 14:34:41 
> 14:34:41 FAILURE: Build failed with an exception.
> 14:34:41 
> 14:34:41 * What went wrong:
> 14:34:41 Some problems were found with the configuration of task 
> ':jacocoRootReport' (type 'JacocoReport').
> 14:34:41   - Type 'org.gradle.testing.jacoco.tasks.JacocoReport' property 
> 'jacocoClasspath' doesn't have a configured value.
> 14:34:41 
> 14:34:41 Reason: This property isn't marked as optional and no value has 
> been configured.
> 14:34:41 
> 14:34:41 Possible solutions:
> 14:34:41   1. Assign a value to 'jacocoClasspath'.
> 14:34:41   2. Mark property 'jacocoClasspath' as optional.
> 14:34:41 
> 14:34:41 Please refer to 
> https://docs.gradle.org/7.3.3/userguide/validation_problems.html#value_not_set
>  for more details about this problem.
> 14:34:41   - Type 'org.gradle.testing.jacoco.tasks.JacocoReport' property 
> 'reports.enabledReports.html.outputLocation' doesn't have a configured value.
> 14:34:41 
> 14:34:41 Reason: This property isn't marked as optional and no value has 
> been configured.
> 14:34:41 
> 14:34:41 Possible solutions:
> 14:34:41   1. Assign a value to 
> 'reports.enabledReports.html.outputLocation'.
> 14:34:41   2. Mark property 'reports.enabledReports.html.outputLocation' 
> as optional.
> 14:34:41 
> 14:34:41 Please refer to 
> https://docs.gradle.org/7.3.3/userguide/validation_problems.html#value_not_set
>  for more details about this problem.
> 14:34:41   - Type 'org.gradle.testing.jacoco.tasks.JacocoReport' property 
> 'reports.enabledReports.xml.outputLocation' doesn't have a configured value.
> 14:34:41 
> 14:34:41 Reason: This property isn't marked as optional and no value has 
> been configured.
> 14:34:41 
> 14:34:41 Possible solutions:
> 14:34:41   1. Assign a value to 
> 'reports.enabledReports.xml.outputLocation'.
> 14:34:41   2. Mark property 'reports.enabledReports.xml.outputLocation' 
> as optional. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-13922) Unable to generate coverage reports for the whole project

2023-12-20 Thread Andras Katona (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799084#comment-17799084
 ] 

Andras Katona edited comment on KAFKA-13922 at 12/20/23 4:15 PM:
-

Okay, I was trying to configure the jacoco-report-aggregation for the project, 
because we're removing the rootreport part from the build.gradle, that's not 
working any more. But failed to include that aggregation plugin in the 
timeframe I allocated myself to try. It's not adding much, sonar and jenkins 
are able to fetch the coverage reports from the build dirs (they would not use 
the aggregated one anyway), so I convinced myself that it's fine to stop trying 
:).

I'm doing a last round of tests with my adjustments (see the above mentioned 
[PR}https://github.com/apache/kafka/pull/11982/files])


was (Author: akatona):
Okay, I was trying to configure the jacoco-report-aggregation for the project, 
because we're removing the rootreport part from the build.gradle, that's not 
working any more. But failed to include that aggregation plugin in the 
timeframe I allocated myself to try. It's not adding much, sonar and jenkins 
are able to fetch the coverage reports from the build dirs (they would not use 
the aggregated one anyway), so I convinced myself that it's fine to stop trying 
:).


> Unable to generate coverage reports for the whole project
> -
>
> Key: KAFKA-13922
> URL: https://issues.apache.org/jira/browse/KAFKA-13922
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Affects Versions: 2.5.0
>Reporter: Patrik Nagy
>Assignee: Andras Katona
>Priority: Minor
>  Labels: cloudera
>
> It is documented in the project that if we need code coverage reports for the 
> whole project, we need to run something like this where we enabled the test 
> coverage flag and run the reportCoverage task:
> {code:java}
> ./gradlew reportCoverage -PenableTestCoverage=true -Dorg.gradle.parallel=false
> {code}
> If I run it, the build will fail in the end because of jacocoRootReport:
> {code:java}
> 14:34:41 > Task :jacocoRootReport FAILED
> 14:34:41 
> 14:34:41 FAILURE: Build failed with an exception.
> 14:34:41 
> 14:34:41 * What went wrong:
> 14:34:41 Some problems were found with the configuration of task 
> ':jacocoRootReport' (type 'JacocoReport').
> 14:34:41   - Type 'org.gradle.testing.jacoco.tasks.JacocoReport' property 
> 'jacocoClasspath' doesn't have a configured value.
> 14:34:41 
> 14:34:41 Reason: This property isn't marked as optional and no value has 
> been configured.
> 14:34:41 
> 14:34:41 Possible solutions:
> 14:34:41   1. Assign a value to 'jacocoClasspath'.
> 14:34:41   2. Mark property 'jacocoClasspath' as optional.
> 14:34:41 
> 14:34:41 Please refer to 
> https://docs.gradle.org/7.3.3/userguide/validation_problems.html#value_not_set
>  for more details about this problem.
> 14:34:41   - Type 'org.gradle.testing.jacoco.tasks.JacocoReport' property 
> 'reports.enabledReports.html.outputLocation' doesn't have a configured value.
> 14:34:41 
> 14:34:41 Reason: This property isn't marked as optional and no value has 
> been configured.
> 14:34:41 
> 14:34:41 Possible solutions:
> 14:34:41   1. Assign a value to 
> 'reports.enabledReports.html.outputLocation'.
> 14:34:41   2. Mark property 'reports.enabledReports.html.outputLocation' 
> as optional.
> 14:34:41 
> 14:34:41 Please refer to 
> https://docs.gradle.org/7.3.3/userguide/validation_problems.html#value_not_set
>  for more details about this problem.
> 14:34:41   - Type 'org.gradle.testing.jacoco.tasks.JacocoReport' property 
> 'reports.enabledReports.xml.outputLocation' doesn't have a configured value.
> 14:34:41 
> 14:34:41 Reason: This property isn't marked as optional and no value has 
> been configured.
> 14:34:41 
> 14:34:41 Possible solutions:
> 14:34:41   1. Assign a value to 
> 'reports.enabledReports.xml.outputLocation'.
> 14:34:41   2. Mark property 'reports.enabledReports.xml.outputLocation' 
> as optional. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432918143


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -525,16 +526,37 @@ public void testHeartbeatState() {
 
 @Test
 public void testPollTimerExpiration() {
-heartbeatRequestManager = createHeartbeatRequestManager();
+coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+membershipManager = mock(MembershipManager.class);
+heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
+heartbeatRequestState = spy(new 
HeartbeatRequestManager.HeartbeatRequestState(

Review Comment:
   If it is too much work to change, let's change it later and merge it as it 
is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13922) Unable to generate coverage reports for the whole project

2023-12-20 Thread Andras Katona (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799084#comment-17799084
 ] 

Andras Katona commented on KAFKA-13922:
---

Okay, I was trying to configure the jacoco-report-aggregation for the project, 
because we're removing the rootreport part from the build.gradle, that's not 
working any more. But failed to include that aggregation plugin in the 
timeframe I allocated myself to try. It's not adding much, sonar and jenkins 
are able to fetch the coverage reports from the build dirs (they would not use 
the aggregated one anyway), so I convinced myself that it's fine to stop trying 
:).


> Unable to generate coverage reports for the whole project
> -
>
> Key: KAFKA-13922
> URL: https://issues.apache.org/jira/browse/KAFKA-13922
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Affects Versions: 2.5.0
>Reporter: Patrik Nagy
>Assignee: Andras Katona
>Priority: Minor
>  Labels: cloudera
>
> It is documented in the project that if we need code coverage reports for the 
> whole project, we need to run something like this where we enabled the test 
> coverage flag and run the reportCoverage task:
> {code:java}
> ./gradlew reportCoverage -PenableTestCoverage=true -Dorg.gradle.parallel=false
> {code}
> If I run it, the build will fail in the end because of jacocoRootReport:
> {code:java}
> 14:34:41 > Task :jacocoRootReport FAILED
> 14:34:41 
> 14:34:41 FAILURE: Build failed with an exception.
> 14:34:41 
> 14:34:41 * What went wrong:
> 14:34:41 Some problems were found with the configuration of task 
> ':jacocoRootReport' (type 'JacocoReport').
> 14:34:41   - Type 'org.gradle.testing.jacoco.tasks.JacocoReport' property 
> 'jacocoClasspath' doesn't have a configured value.
> 14:34:41 
> 14:34:41 Reason: This property isn't marked as optional and no value has 
> been configured.
> 14:34:41 
> 14:34:41 Possible solutions:
> 14:34:41   1. Assign a value to 'jacocoClasspath'.
> 14:34:41   2. Mark property 'jacocoClasspath' as optional.
> 14:34:41 
> 14:34:41 Please refer to 
> https://docs.gradle.org/7.3.3/userguide/validation_problems.html#value_not_set
>  for more details about this problem.
> 14:34:41   - Type 'org.gradle.testing.jacoco.tasks.JacocoReport' property 
> 'reports.enabledReports.html.outputLocation' doesn't have a configured value.
> 14:34:41 
> 14:34:41 Reason: This property isn't marked as optional and no value has 
> been configured.
> 14:34:41 
> 14:34:41 Possible solutions:
> 14:34:41   1. Assign a value to 
> 'reports.enabledReports.html.outputLocation'.
> 14:34:41   2. Mark property 'reports.enabledReports.html.outputLocation' 
> as optional.
> 14:34:41 
> 14:34:41 Please refer to 
> https://docs.gradle.org/7.3.3/userguide/validation_problems.html#value_not_set
>  for more details about this problem.
> 14:34:41   - Type 'org.gradle.testing.jacoco.tasks.JacocoReport' property 
> 'reports.enabledReports.xml.outputLocation' doesn't have a configured value.
> 14:34:41 
> 14:34:41 Reason: This property isn't marked as optional and no value has 
> been configured.
> 14:34:41 
> 14:34:41 Possible solutions:
> 14:34:41   1. Assign a value to 
> 'reports.enabledReports.xml.outputLocation'.
> 14:34:41   2. Mark property 'reports.enabledReports.xml.outputLocation' 
> as optional. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432918143


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -525,16 +526,37 @@ public void testHeartbeatState() {
 
 @Test
 public void testPollTimerExpiration() {
-heartbeatRequestManager = createHeartbeatRequestManager();
+coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+membershipManager = mock(MembershipManager.class);
+heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
+heartbeatRequestState = spy(new 
HeartbeatRequestManager.HeartbeatRequestState(

Review Comment:
   If it is too much work to change, let's change it later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-20 Thread via GitHub


jolshan commented on code in PR #15023:
URL: https://github.com/apache/kafka/pull/15023#discussion_r1432905225


##
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##
@@ -1185,9 +1263,14 @@ else if (partition.equals(internalPart))
 /**
  * For testUpdatePartially, validates that updatedMetadata is matching 
expected part1Metadata, part2Metadata, interalPartMetadata, nodes & more.
  */
-void validateForUpdatePartitionLeadership(Metadata updatedMetadata, 
PartitionMetadata part1Metadata, PartitionMetadata part2Metadata, 
PartitionMetadata internalPartMetadata,
-List<Node> expectedNodes, String expectedClusterId, Set<String> 
expectedUnauthorisedTopics, Set<String> expectedInvalidTopics, Set<String> 
expectedInternalTopics,
-Node expectedController, Map<String, Uuid> expectedTopicIds) {
+void validateForUpdatePartitionLeadership(Metadata updatedMetadata,
+  PartitionMetadata part1Metadata, 
PartitionMetadata part2Metadata, PartitionMetadata part12Metadata,

Review Comment:
   yup that's fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432897655


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -525,16 +526,37 @@ public void testHeartbeatState() {
 
 @Test
 public void testPollTimerExpiration() {
-heartbeatRequestManager = createHeartbeatRequestManager();
+coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+membershipManager = mock(MembershipManager.class);
+heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
+heartbeatRequestState = spy(new 
HeartbeatRequestManager.HeartbeatRequestState(

Review Comment:
   Why is there still this spy?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -673,6 +673,9 @@ public void onHeartbeatRequestSent() {
 MemberState state = state();
 if (isStaled()) {
 log.debug("Member {} is staled and is therefore leaving the group. 
 It will rejoin upon the next poll.", memberEpoch);
+// TODO: Integrate partition revocation/loss callback
+// Clear the current assignment and subscribed partitions because 
the member has left the group
+updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), 
true);

Review Comment:
   Is it correct that we have this call now in two places?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13421 - fix flaky ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup [kafka]

2023-12-20 Thread via GitHub


philipnee closed pull request #12038: KAFKA-13421 - fix flaky 
ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
URL: https://github.com/apache/kafka/pull/12038


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Prototyping Rebalance Protocol [kafka]

2023-12-20 Thread via GitHub


philipnee closed pull request #13330: Prototyping Rebalance Protocol
URL: https://github.com/apache/kafka/pull/13330


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Prototype take 2 [kafka]

2023-12-20 Thread via GitHub


philipnee closed pull request #13217: Prototype take 2
URL: https://github.com/apache/kafka/pull/13217


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] ctr-staging-testing-draft [kafka]

2023-12-20 Thread via GitHub


philipnee closed pull request #13716: ctr-staging-testing-draft
URL: https://github.com/apache/kafka/pull/13716


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KafkaConsumer not respecting max.poll.record when subscribing to more than 1 topics. [kafka]

2023-12-20 Thread via GitHub


philipnee closed pull request #14772: KafkaConsumer not respecting 
max.poll.record when subscribing to more than 1 topics.
URL: https://github.com/apache/kafka/pull/14772


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-20 Thread via GitHub


dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1432884537


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -190,4 +170,62 @@ class CoordinatorPartitionWriter[T](
 throw Errors.NOT_LEADER_OR_FOLLOWER.exception()
 }
   }
+
+  /**
+   * Write the transaction end marker.
+   *
+   * @param tpThe partition to write records to.
+   * @param producerIdThe producer id.
+   * @param producerEpoch The producer epoch.
+   * @param coordinatorEpoch  The epoch of the transaction coordinator.
+   * @param resultThe transaction result.
+   * @return The log end offset right after the written records.
+   * @throws KafkaException Any KafkaException caught during the write 
operation.
+   */
+  override def appendEndTransactionMarker(
+tp: TopicPartition,
+producerId: Long,
+producerEpoch: Short,
+coordinatorEpoch: Int,
+result: TransactionResult
+  ): Long = {
+val controlRecordType = result match {
+  case TransactionResult.COMMIT => ControlRecordType.COMMIT
+  case TransactionResult.ABORT => ControlRecordType.ABORT
+}
+
+internalAppend(tp, MemoryRecords.withEndTransactionMarker(
+  producerId,
+  producerEpoch,
+  new EndTransactionMarker(controlRecordType, coordinatorEpoch)
+))
+  }
+
+  private def internalAppend(
+tp: TopicPartition,
+memoryRecords: MemoryRecords
+  ): Long = {
+var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
+replicaManager.appendRecords(
+  timeout = 0L,
+  requiredAcks = 1,

Review Comment:
   We always write with acks=1 and then rely on the high watermark listener to 
"commit".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-20 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1432877933


##
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala:
##
@@ -234,6 +237,79 @@ class CoordinatorPartitionWriterTest {
 assertEquals(records, receivedRecords)
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[ControlRecordType], names = Array("COMMIT", 
"ABORT"))
+  def testWriteTransactionEndMarker(controlRecordType: ControlRecordType): 
Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val time = new MockTime()
+val partitionRecordWriter = new CoordinatorPartitionWriter(
+  replicaManager,
+  new StringKeyValueSerializer(),
+  CompressionType.NONE,
+  time
+)
+
+when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps(
+  Collections.emptyMap(),
+  new Properties()
+)))
+
+val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
+  ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
+val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] 
=> Unit] =
+  ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] 
=> Unit])
+
+when(replicaManager.appendRecords(
+  ArgumentMatchers.eq(0L),
+  ArgumentMatchers.eq(1.toShort),
+  ArgumentMatchers.eq(true),
+  ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
+  recordsCapture.capture(),
+  callbackCapture.capture(),
+  ArgumentMatchers.any(),
+  ArgumentMatchers.any(),
+  ArgumentMatchers.any(),
+  ArgumentMatchers.any(),
+  ArgumentMatchers.any()
+)).thenAnswer(_ => {
+  callbackCapture.getValue.apply(Map(
+tp -> new PartitionResponse(
+  Errors.NONE,
+  5,
+  10,
+  RecordBatch.NO_TIMESTAMP,
+  -1,
+  Collections.emptyList(),
+  ""
+)
+  ))
+})
+
+assertEquals(11, partitionRecordWriter.appendEndTransactionMarker(
+  tp,
+  100L,
+  50.toShort,
+  10,
+  if (controlRecordType == ControlRecordType.COMMIT) 
TransactionResult.COMMIT else TransactionResult.ABORT
+))
+
+val batch = recordsCapture.getValue.getOrElse(tp,
+  throw new AssertionError(s"No records for $tp"))
+assertEquals(1, batch.batches.asScala.toList.size)
+
+val firstBatch = batch.batches.asScala.head
+assertEquals(100L, firstBatch.producerId)
+assertEquals(50.toShort, firstBatch.producerEpoch)
+assertTrue(firstBatch.isTransactional)

Review Comment:
   should we also confirm the firstBatch is a control record?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15942: Implement ConsumerInterceptor [kafka]

2023-12-20 Thread via GitHub


lucasbru commented on PR #14963:
URL: https://github.com/apache/kafka/pull/14963#issuecomment-1864703292

   Hey @Joker-5, I took the ticket since your original PR seemed to only change 
the legacy consumer, so I thought it was just linked to the wrong ticket.
   
   I think there are some things missing here:
- enable unit / integration tests
- the way you implemented it, I think the interceptors will run as part of 
the background thread, but they should not; they should run as part of the 
application thread instead.
   
   How about we merge my PR which has the two changes and I add you in a 
`Co-authored-by` tag? Sorry again for the confusion.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-20 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1432873518


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -190,4 +170,62 @@ class CoordinatorPartitionWriter[T](
 throw Errors.NOT_LEADER_OR_FOLLOWER.exception()
 }
   }
+
+  /**
+   * Write the transaction end marker.
+   *
+   * @param tpThe partition to write records to.
+   * @param producerIdThe producer id.
+   * @param producerEpoch The producer epoch.
+   * @param coordinatorEpoch  The epoch of the transaction coordinator.
+   * @param resultThe transaction result.
+   * @return The log end offset right after the written records.
+   * @throws KafkaException Any KafkaException caught during the write 
operation.
+   */
+  override def appendEndTransactionMarker(
+tp: TopicPartition,
+producerId: Long,
+producerEpoch: Short,
+coordinatorEpoch: Int,
+result: TransactionResult
+  ): Long = {
+val controlRecordType = result match {
+  case TransactionResult.COMMIT => ControlRecordType.COMMIT
+  case TransactionResult.ABORT => ControlRecordType.ABORT
+}
+
+internalAppend(tp, MemoryRecords.withEndTransactionMarker(
+  producerId,
+  producerEpoch,
+  new EndTransactionMarker(controlRecordType, coordinatorEpoch)
+))
+  }
+
+  private def internalAppend(
+tp: TopicPartition,
+memoryRecords: MemoryRecords
+  ): Long = {
+var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
+replicaManager.appendRecords(
+  timeout = 0L,
+  requiredAcks = 1,

Review Comment:
   were these always acks=1?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-20 Thread via GitHub


philipnee commented on code in PR #15023:
URL: https://github.com/apache/kafka/pull/15023#discussion_r1432836418


##
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##
@@ -1067,25 +1067,102 @@ protected boolean retainTopic(String topic, boolean 
isInternal, long nowMs) {
 assertEquals(Uuid.ZERO_UUID, cluster.topicId("validTopic1"));
 }
 
+@Test
+public void testTopicMetadataOnUpdatePartitionLeadership() {
+String topic = "input-topic";
+Uuid topicId = Uuid.randomUuid();
+
+Time time = new MockTime();
+
+metadata = new Metadata(
+refreshBackoffMs,
+refreshBackoffMaxMs,
+metadataExpireMs,
+new LogContext(),
+new ClusterResourceListeners());
+Node node1 = new Node(1, "localhost", 9091);
+Node node2 = new Node(2, "localhost", 9091);
+
+TopicPartition tp0 = new TopicPartition(topic, 0);
+MetadataResponse.PartitionMetadata partition0 = new 
MetadataResponse.PartitionMetadata(
+Errors.NONE,
+tp0,
+Optional.of(1),
+Optional.of(1),
+Arrays.asList(1, 2),
+Arrays.asList(1, 2),
+Collections.emptyList()
+);
+TopicPartition tp1 = new TopicPartition(topic, 1);
+MetadataResponse.PartitionMetadata partition1 =
+new MetadataResponse.PartitionMetadata(
+Errors.NONE,
+tp1,
+Optional.of(1),
+Optional.of(1),
+Arrays.asList(1, 2),
+Arrays.asList(1, 2),
+Collections.emptyList()
+);
+MetadataResponse.TopicMetadata topicMetadata = new 
MetadataResponse.TopicMetadata(
+Errors.NONE,
+topic,
+topicId,
+false,
+Arrays.asList(partition0, partition1),
+MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED
+);
+
+// Initialize metadata with two partitions
+MetadataResponse response = RequestTestUtils.metadataResponse(
+Arrays.asList(node1, node2),
+"clusterId",
+node1.id(),
+Collections.singletonList(topicMetadata));
+metadata.updateWithCurrentRequestVersion(
+response,
+false,
+time.milliseconds());
+assertEquals(2, metadata.fetch().partitionsForTopic(topic).size());
+assertEquals(1, metadata.fetch().partition(tp0).leader().id());
+assertEquals(1, metadata.fetch().partition(tp1).leader().id());
+
+// "input-topic" partition 1 leader changes from node 1 to node 2
+metadata.updatePartitionLeadership(
+Collections.singletonMap(
+tp1,
+new Metadata.LeaderIdAndEpoch(
+Optional.of(2),
+Optional.of(3)
+)),
+Arrays.asList(node1)
+);
+assertEquals(2, metadata.fetch().partitionsForTopic(topic).size());
+assertEquals(1, metadata.fetch().partition(tp0).leader().id());
+assertEquals(2, metadata.fetch().partition(tp1).leader().id());
+}
+
 @Test
 public void testUpdatePartitionLeadership() {
 Time time = new MockTime();
 
-// Setup metadata with initial set of 2 partitions, 1 each across 
topics, with 5 nodes.
-// Also setup, 1 invalid topic, 1 unauthorized topic, 1 internal topic.
+// Initialize metadata
 int numNodes = 5;
 metadata = new Metadata(refreshBackoffMs, refreshBackoffMaxMs, 
metadataExpireMs, new LogContext(), new ClusterResourceListeners());
 ClusterResourceListener mockListener = 
Mockito.mock(ClusterResourceListener.class);
 metadata.addClusterUpdateListener(mockListener);
-
+// topic1 has 2 partitions: tp11, tp12
+// topic2 has 1 partition: tp21
 String topic1 = "topic1";
-TopicPartition partition1 = new TopicPartition(topic1, 0);
-PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, 
partition1, Optional.of(1), Optional.of(100), Arrays.asList(1, 2), 
Arrays.asList(1, 2), Arrays.asList(3));
+TopicPartition tp11 = new TopicPartition(topic1, 0);
+PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, 
tp11, Optional.of(1), Optional.of(100), Arrays.asList(1, 2), Arrays.asList(1, 
2), Arrays.asList(3));
 Uuid topic1Id = Uuid.randomUuid();
+TopicPartition tp12 = new TopicPartition(topic1, 1);
+PartitionMetadata part12Metadata = new PartitionMetadata(Errors.NONE, 
tp12, Optional.of(2), Optional.of(200), Arrays.asList(2, 3), Arrays.asList(2, 
3), Arrays.asList(1));

Review Comment:
   yep - I was trying to be consistent with the previous variable names such as 
`PartitionMetadata part1Metadata`. I'll follow up with the changes in a 
separate refactor PR.



-- 
This is an automated message from the Apache Git 

Re: [PR] Implement RemoteLogSizeBytes [kafka]

2023-12-20 Thread via GitHub


kamalcph commented on code in PR #15050:
URL: https://github.com/apache/kafka/pull/15050#discussion_r1432837641


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -426,6 +427,18 @@ class BrokerTopicMetrics(name: Option[String], configOpt: 
java.util.Optional[Kaf
 
   def remoteLogMetadataCount: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).brokerTopicAggregatedMetric.value()
 
+  def recordRemoteLogSizeBytes(partition: Int, bytesLag: Long): Unit = {

Review Comment:
   rename `bytesLag` to `size` (or) something similar
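   As an aside, the gauge plumbing quoted above (per-partition values 
aggregated via `brokerTopicAggregatedMetric`) can be sketched roughly like 
this; all names here are hypothetical, not Kafka's actual classes:

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical sketch of a topic-level gauge fed by per-partition values,
   // mirroring the recordRemoteLogSizeBytes(partition, size) shape suggested above.
   public class RemoteLogSizeGaugeExample {
       private final Map<Integer, Long> partitionToSize = new ConcurrentHashMap<>();

       // "size" rather than "bytesLag", per the naming suggestion.
       void recordRemoteLogSizeBytes(int partition, long size) {
           partitionToSize.put(partition, size);
       }

       // Topic-level gauge value: the sum across partitions.
       long value() {
           return partitionToSize.values().stream().mapToLong(Long::longValue).sum();
       }

       public static void main(String[] args) {
           RemoteLogSizeGaugeExample gauge = new RemoteLogSizeGaugeExample();
           gauge.recordRemoteLogSizeBytes(0, 1024L);
           gauge.recordRemoteLogSizeBytes(1, 2048L);
           System.out.println(gauge.value()); // 3072
       }
   }
   ```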



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-20 Thread via GitHub


philipnee commented on code in PR #15023:
URL: https://github.com/apache/kafka/pull/15023#discussion_r1432838336


##
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##
@@ -1185,9 +1263,14 @@ else if (partition.equals(internalPart))
 /**
  * For testUpdatePartially, validates that updatedMetadata is matching 
expected part1Metadata, part2Metadata, internalPartMetadata, nodes & more.
  */
-void validateForUpdatePartitionLeadership(Metadata updatedMetadata, 
PartitionMetadata part1Metadata, PartitionMetadata part2Metadata, 
PartitionMetadata internalPartMetadata,
-List<Node> expectedNodes, String expectedClusterId, Set<String> 
expectedUnauthorisedTopics, Set<String> expectedInvalidTopics, Set<String> 
expectedInternalTopics,
-Node expectedController, Map<String, Uuid> expectedTopicIds) {
+void validateForUpdatePartitionLeadership(Metadata updatedMetadata,
+  PartitionMetadata part1Metadata, 
PartitionMetadata part2Metadata, PartitionMetadata part12Metadata,

Review Comment:
   thanks - would you mind leaving this for the refactor PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-20 Thread via GitHub


jolshan commented on PR #15023:
URL: https://github.com/apache/kafka/pull/15023#issuecomment-1864633877

   I was looking at some of the test failures and wasn't seeing the 
OffsetsApiIntegrationTests failing elsewhere. Can we check those? Running 
locally may not be enough so I will also restart the build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: skip 'zinc' phase from gradle dependency-check plugin [kafka]

2023-12-20 Thread via GitHub


raboof opened a new pull request, #15054:
URL: https://github.com/apache/kafka/pull/15054

   This prevents `gradle dependencyCheckAggregate` from reporting on advisories 
in build-time dependencies (e.g. CVE-2023-46122), which typically should not 
affect us.
   
   I checked that this does not prevent advisories in 'regular' dependencies 
from being reported (but there currently are none).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-20 Thread via GitHub


jolshan commented on code in PR #15023:
URL: https://github.com/apache/kafka/pull/15023#discussion_r1432819447


##
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##
@@ -1185,9 +1263,14 @@ else if (partition.equals(internalPart))
 /**
  * For testUpdatePartially, validates that updatedMetadata is matching 
expected part1Metadata, part2Metadata, internalPartMetadata, nodes & more.
  */
-void validateForUpdatePartitionLeadership(Metadata updatedMetadata, 
PartitionMetadata part1Metadata, PartitionMetadata part2Metadata, 
PartitionMetadata internalPartMetadata,
-List<Node> expectedNodes, String expectedClusterId, Set<String> 
expectedUnauthorisedTopics, Set<String> expectedInvalidTopics, Set<String> 
expectedInternalTopics,
-Node expectedController, Map<String, Uuid> expectedTopicIds) {
+void validateForUpdatePartitionLeadership(Metadata updatedMetadata,
+  PartitionMetadata part1Metadata, 
PartitionMetadata part2Metadata, PartitionMetadata part12Metadata,

Review Comment:
   nit: (for refactor) -- if we decide to put all the arguments on new lines, 
let's do it for all the arguments. The ones with three are a bit harder to 
read.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-20 Thread via GitHub


jolshan commented on code in PR #15023:
URL: https://github.com/apache/kafka/pull/15023#discussion_r1432814990


##
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##
@@ -1067,25 +1067,102 @@ protected boolean retainTopic(String topic, boolean 
isInternal, long nowMs) {
 assertEquals(Uuid.ZERO_UUID, cluster.topicId("validTopic1"));
 }
 
+@Test
+public void testTopicMetadataOnUpdatePartitionLeadership() {
+String topic = "input-topic";
+Uuid topicId = Uuid.randomUuid();
+
+Time time = new MockTime();
+
+metadata = new Metadata(
+refreshBackoffMs,
+refreshBackoffMaxMs,
+metadataExpireMs,
+new LogContext(),
+new ClusterResourceListeners());
+Node node1 = new Node(1, "localhost", 9091);
+Node node2 = new Node(2, "localhost", 9091);
+
+TopicPartition tp0 = new TopicPartition(topic, 0);
+MetadataResponse.PartitionMetadata partition0 = new 
MetadataResponse.PartitionMetadata(
+Errors.NONE,
+tp0,
+Optional.of(1),
+Optional.of(1),
+Arrays.asList(1, 2),
+Arrays.asList(1, 2),
+Collections.emptyList()
+);
+TopicPartition tp1 = new TopicPartition(topic, 1);
+MetadataResponse.PartitionMetadata partition1 =
+new MetadataResponse.PartitionMetadata(
+Errors.NONE,
+tp1,
+Optional.of(1),
+Optional.of(1),
+Arrays.asList(1, 2),
+Arrays.asList(1, 2),
+Collections.emptyList()
+);
+MetadataResponse.TopicMetadata topicMetadata = new 
MetadataResponse.TopicMetadata(
+Errors.NONE,
+topic,
+topicId,
+false,
+Arrays.asList(partition0, partition1),
+MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED
+);
+
+// Initialize metadata with two partitions
+MetadataResponse response = RequestTestUtils.metadataResponse(
+Arrays.asList(node1, node2),
+"clusterId",
+node1.id(),
+Collections.singletonList(topicMetadata));
+metadata.updateWithCurrentRequestVersion(
+response,
+false,
+time.milliseconds());
+assertEquals(2, metadata.fetch().partitionsForTopic(topic).size());
+assertEquals(1, metadata.fetch().partition(tp0).leader().id());
+assertEquals(1, metadata.fetch().partition(tp1).leader().id());
+
+// "input-topic" partition 1 leader changes from node 1 to node 2
+metadata.updatePartitionLeadership(
+Collections.singletonMap(
+tp1,
+new Metadata.LeaderIdAndEpoch(
+Optional.of(2),
+Optional.of(3)
+)),
+Arrays.asList(node1)
+);
+assertEquals(2, metadata.fetch().partitionsForTopic(topic).size());
+assertEquals(1, metadata.fetch().partition(tp0).leader().id());
+assertEquals(2, metadata.fetch().partition(tp1).leader().id());
+}
+
 @Test
 public void testUpdatePartitionLeadership() {
 Time time = new MockTime();
 
-// Setup metadata with initial set of 2 partitions, 1 each across 
topics, with 5 nodes.
-// Also setup, 1 invalid topic, 1 unauthorized topic, 1 internal topic.
+// Initialize metadata
 int numNodes = 5;
 metadata = new Metadata(refreshBackoffMs, refreshBackoffMaxMs, 
metadataExpireMs, new LogContext(), new ClusterResourceListeners());
 ClusterResourceListener mockListener = 
Mockito.mock(ClusterResourceListener.class);
 metadata.addClusterUpdateListener(mockListener);
-
+// topic1 has 2 partitions: tp11, tp12
+// topic2 has 1 partition: tp21
 String topic1 = "topic1";
-TopicPartition partition1 = new TopicPartition(topic1, 0);
-PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, 
partition1, Optional.of(1), Optional.of(100), Arrays.asList(1, 2), 
Arrays.asList(1, 2), Arrays.asList(3));
+TopicPartition tp11 = new TopicPartition(topic1, 0);
+PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, 
tp11, Optional.of(1), Optional.of(100), Arrays.asList(1, 2), Arrays.asList(1, 
2), Arrays.asList(3));
 Uuid topic1Id = Uuid.randomUuid();
+TopicPartition tp12 = new TopicPartition(topic1, 1);
+PartitionMetadata part12Metadata = new PartitionMetadata(Errors.NONE, 
tp12, Optional.of(2), Optional.of(200), Arrays.asList(2, 3), Arrays.asList(2, 
3), Arrays.asList(1));

Review Comment:
   nit: (we can address in a followup refactor) this naming is a bit confusing. 
Maybe all should be part11Metadata, part12Metadata, part21Metadata or even just 
tp11Metadata, tp12Metadata etc.



-- 
This is an automated message from 

Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-20 Thread via GitHub


jolshan commented on code in PR #15023:
URL: https://github.com/apache/kafka/pull/15023#discussion_r1432810817


##
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##
@@ -1068,143 +1067,328 @@ protected boolean retainTopic(String topic, boolean 
isInternal, long nowMs) {
 }
 
 @Test
-public void testUpdatePartitionLeadership() {
+public void testTopicMetadataOnUpdatePartitionLeadership() {
+String topic = "input-topic";
+Uuid topicId = Uuid.randomUuid();
+
+Time time = new MockTime();
+
+metadata = new Metadata(
+refreshBackoffMs,
+refreshBackoffMaxMs,
+metadataExpireMs,
+new LogContext(),
+new ClusterResourceListeners());
+Node node1 = new Node(1, "localhost", 9091);
+Node node2 = new Node(2, "localhost", 9091);
+
+TopicPartition tp0 = new TopicPartition(topic, 0);
+MetadataResponse.PartitionMetadata partition0 = new 
MetadataResponse.PartitionMetadata(
+Errors.NONE,
+tp0,
+Optional.of(1),
+Optional.of(1),
+Arrays.asList(1, 2),
+Arrays.asList(1, 2),
+Collections.emptyList()
+);
+TopicPartition tp1 = new TopicPartition(topic, 1);
+MetadataResponse.PartitionMetadata partition1 =
+new MetadataResponse.PartitionMetadata(
+Errors.NONE,
+tp1,
+Optional.of(1),
+Optional.of(1),
+Arrays.asList(1, 2),
+Arrays.asList(1, 2),
+Collections.emptyList()
+);
+MetadataResponse.TopicMetadata topicMetadata = new 
MetadataResponse.TopicMetadata(
+Errors.NONE,
+topic,
+topicId,
+false,
+Arrays.asList(partition0, partition1),
+MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED
+);
+
+// Initialize metadata with two partitions
+MetadataResponse response = RequestTestUtils.metadataResponse(
+Arrays.asList(node1, node2),
+"clusterId",
+node1.id(),
+Collections.singletonList(topicMetadata));
+metadata.updateWithCurrentRequestVersion(
+response,
+false,
+time.milliseconds());
+assertEquals(2, metadata.fetch().partitionsForTopic(topic).size());
+assertEquals(1, metadata.fetch().partition(tp0).leader().id());
+assertEquals(1, metadata.fetch().partition(tp1).leader().id());
+
+// "input-topic" partition 1 leader changes from node 1 to node 2
+metadata.updatePartitionLeadership(

Review Comment:
   Maybe we could just have a comment there (in the 1130 comment), 
   something like: "In KIP-951 we can update leadership info for a single 
partition. input-topic partition 1 ... (everything else you said)"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15991) Flaky new consumer test testGroupIdNotNullAndValid

2023-12-20 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-15991.
---
Resolution: Fixed

> Flaky new consumer test testGroupIdNotNullAndValid
> --
>
> Key: KAFKA-15991
> URL: https://issues.apache.org/jira/browse/KAFKA-15991
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test, kip-848, 
> kip-848-client-support
> Fix For: 3.7.0
>
>
> Fails locally when running it in a loop with its latest changes from 
> [https://github.com/apache/kafka/commit/6df192b6cb1397a6e6173835bbbd8a3acb7e3988.]
>  Failed the build so temporarily disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15991) Flaky new consumer test testGroupIdNotNullAndValid

2023-12-20 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-15991:
--
Fix Version/s: 3.7.0
   (was: 3.8.0)

> Flaky new consumer test testGroupIdNotNullAndValid
> --
>
> Key: KAFKA-15991
> URL: https://issues.apache.org/jira/browse/KAFKA-15991
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test, kip-848, 
> kip-848-client-support
> Fix For: 3.7.0
>
>
> Fails locally when running it in a loop with its latest changes from 
> [https://github.com/apache/kafka/commit/6df192b6cb1397a6e6173835bbbd8a3acb7e3988.]
>  Failed the build so temporarily disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15818) Implement max poll interval

2023-12-20 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799043#comment-17799043
 ] 

Philip Nee commented on KAFKA-15818:


hi stan - [KAFKA-16026|https://github.com/apache/kafka/pull/15035] is also part 
of it.  will close the issue after 16026 is merged.

> Implement max poll interval
> ---
>
> Key: KAFKA-15818
> URL: https://issues.apache.org/jira/browse/KAFKA-15818
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-e2e, kip-848-preview
> Fix For: 3.7.0
>
>
> The consumer needs to be polled at an interval shorter than 
> max.poll.interval.ms; otherwise the consumer should try to leave the group.  
> Currently, we send an acknowledgment event to the network thread per poll.  
> The event only triggers an update of the autocommit state; we need to implement 
> updating the poll timer so that the consumer can leave the group when the 
> timer expires. 
>  
> The current logic looks like this:
> {code:java}
>  if (heartbeat.pollTimeoutExpired(now)) {
> // the poll timeout has expired, which means that the foreground thread 
> has stalled
> // in between calls to poll().
> log.warn("consumer poll timeout has expired. This means the time between 
> subsequent calls to poll() " +
> "was longer than the configured max.poll.interval.ms, which typically 
> implies that " +
> "the poll loop is spending too much time processing messages. You can 
> address this " +
> "either by increasing max.poll.interval.ms or by reducing the maximum 
> size of batches " +
> "returned in poll() with max.poll.records.");
> maybeLeaveGroup("consumer poll timeout has expired.");
> } {code}
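
For illustration, a minimal self-contained sketch of the poll-interval timer 
described above (all names are hypothetical, not Kafka's implementation):

{code:java}
// Hypothetical poll-interval watchdog: if poll() is not called within
// maxPollIntervalMs, the member should proactively leave the group.
public class PollTimerExample {
    private final long maxPollIntervalMs;
    private long lastPollMs;

    PollTimerExample(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = nowMs;
    }

    void recordPoll(long nowMs) {
        lastPollMs = nowMs;
    }

    boolean pollTimeoutExpired(long nowMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        PollTimerExample timer = new PollTimerExample(300000L, 0L);
        timer.recordPoll(100L);
        System.out.println(timer.pollTimeoutExpired(200L));    // false
        System.out.println(timer.pollTimeoutExpired(400000L)); // true -> leave group
    }
}
{code}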



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


philipnee commented on PR #15035:
URL: https://github.com/apache/kafka/pull/15035#issuecomment-1864595438

   @cadonna @dajac @AndrewJSchofield  - Thanks a lot for the reviews.  I've 
made changes accordingly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432803468


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -679,4 +683,23 @@ private HeartbeatRequestManager 
createHeartbeatRequestManager() {
 heartbeatRequestState,
 backgroundEventHandler);
 }
+
+private HeartbeatRequestManager createHeartbeatRequestManager(

Review Comment:
   It turned out we are using testBuilder in this test again, so we might want to 
refactor the test and switch everything to mocks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432800050


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -191,10 +191,11 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 "either by increasing max.poll.interval.ms or by reducing the 
maximum size of batches " +
 "returned in poll() with max.poll.records.");
 // This should trigger a heartbeat with leave group epoch
-membershipManager.transitionToStaled();
+membershipManager.transitionToStale();

Review Comment:
   transitionTo**Stale**



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432799267


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -673,6 +673,9 @@ public void onHeartbeatRequestSent() {
 MemberState state = state();
 if (isStaled()) {
 log.debug("Member {} is staled and is therefore leaving the group. 
 It will rejoin upon the next poll.", memberEpoch);
+// TODO: Integrate partition revocation/loss callback
+// Clear the current assignment and subscribed partitions because 
the member has left the group
+updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), 
true);

Review Comment:
   Thanks, I originally thought we wanted to abandon the partitions after 
sending the leave group.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


dajac commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432786211


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -673,6 +673,9 @@ public void onHeartbeatRequestSent() {
 MemberState state = state();
 if (isStaled()) {
 log.debug("Member {} is staled and is therefore leaving the group. 
 It will rejoin upon the next poll.", memberEpoch);
+// TODO: Integrate partition revocation/loss callback
+// Clear the current assignment and subscribed partitions because 
the member has left the group
+updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), 
true);

Review Comment:
   > Do you mean in `transitionToStaled()`? That makes sense to me.
   
   Yes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16013: Throw an exception in DelayedRemoteFetch for follower fetch replicas. [kafka]

2023-12-20 Thread via GitHub


satishd commented on PR #15015:
URL: https://github.com/apache/kafka/pull/15015#issuecomment-1864568359

   Thanks @showuon for the update, LGTM. 
   I will merge the changes if the Jenkins jobs do not show any related test 
failures.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15456: Commit/Fetch error handling improvements and V9 support [kafka]

2023-12-20 Thread via GitHub


lianetm commented on code in PR #14557:
URL: https://github.com/apache/kafka/pull/14557#discussion_r1432774544


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -272,6 +280,10 @@ private void process(final LeaveOnCloseApplicationEvent 
event) {
 event.chain(future);
 }
 
+private long getExpirationTimeForTimeout(final long timeoutMs) {
+return (timeoutMs == Long.MAX_VALUE) ? Long.MAX_VALUE : 
System.currentTimeMillis() + timeoutMs;

Review Comment:
   I fixed `getExpirationTimeForTimeout` (simply returning the max if the sum 
overflows to negative, as this timeout is always > 0, coming from a Duration 
object in the API calls). We need the same calculation when processing 
topicMetadata events 
[here](https://github.com/apache/kafka/blob/1b95d01e49d39927633720e31ecb0f9a6ce659c9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java#L240)
 (I was intentionally keeping the logic similar for the two events that support 
timeouts for now), so I would fix that one here too to use the same 
`getExpirationTimeForTimeout`, if we're ok with it @AndrewJSchofield 
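
   For reference, a minimal sketch of the overflow-safe calculation described 
above (the wrapper class is hypothetical; only the clamping idea comes from 
this thread):

   ```java
   public class ExpirationTimeExample {
       // timeoutMs is always > 0 here, so a negative sum can only mean the
       // long addition overflowed; clamp to "never expires" in that case.
       static long getExpirationTimeForTimeout(final long timeoutMs) {
           long expiration = System.currentTimeMillis() + timeoutMs;
           return expiration < 0 ? Long.MAX_VALUE : expiration;
       }

       public static void main(String[] args) {
           System.out.println(getExpirationTimeForTimeout(1000L));          // now + 1s
           System.out.println(getExpirationTimeForTimeout(Long.MAX_VALUE)); // clamped
       }
   }
   ```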



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


