Re: [PR] KAFKA-16661: Added a lower `log.initial.task.delay.ms` value [kafka]

2024-06-06 Thread via GitHub


vinay272001 commented on PR #16221:
URL: https://github.com/apache/kafka/pull/16221#issuecomment-2151546958

   Hey @showuon, can you please review the PR and check if anything else needs
to be done on it? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-4094) Fix importance labels for Kafka Server config

2024-06-06 Thread Ksolves (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852661#comment-17852661
 ] 

Ksolves commented on KAFKA-4094:


Hi [~jkreps] 
Can you please confirm where the change needs to be implemented? Also, which
component or file does it belong to? It seems we only need to make changes to
the broker config. Please confirm.


Lastly, from my understanding, broker configurations should be categorized by 
their priority as below:


*HIGH Priority:*
 * broker.id
 * listeners

*MEDIUM Priority:*
 * log.dirs
 * num.partitions
 * log.cleanup.policy

*LOW Priority:*
 * message.max.bytes
 * replica.fetch.max.bytes
 * log.segment.bytes

Please confirm if this is correct or if there are any additional configurations 
or details to consider.

> Fix importance labels for Kafka Server config
> -
>
> Key: KAFKA-4094
> URL: https://issues.apache.org/jira/browse/KAFKA-4094
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jay Kreps
>Priority: Major
>  Labels: newbie
>
> We have > 100 server configs. The importance label is meant to help people 
> navigate this in a sane way. The intention is something like the following:
> HIGH - things you must think about and set
> MEDIUM - things you don't necessarily need to set but that you might want to 
> tune
> LOW - things you probably don't need to set
> Currently we have a gazillion things marked as high including very subtle 
> tuning params and also things marked as deprecated (which probably should be 
> its own importance level). This makes it really hard for people to figure out 
> which configurations to actually learn about and use.
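For context, importance labels are attached where each config is defined via `ConfigDef`. A minimal sketch of how HIGH/MEDIUM/LOW are declared (the entries and defaults here are illustrative, not the broker's actual definitions):

```
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class ImportanceSketch {
    // Illustrative definitions only; the real broker defaults differ.
    static final ConfigDef CONFIG = new ConfigDef()
        .define("broker.id", Type.INT, -1, Importance.HIGH,
                "The broker id for this server.")
        .define("num.partitions", Type.INT, 1, Importance.MEDIUM,
                "The default number of log partitions per topic.")
        .define("log.segment.bytes", Type.INT, 1024 * 1024 * 1024, Importance.LOW,
                "The maximum size of a single log file.");
}
```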



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16661) add a lower `log.initial.task.delay.ms` value to integration test framework

2024-06-06 Thread Vinay Agarwal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852662#comment-17852662
 ] 

Vinay Agarwal commented on KAFKA-16661:
---

Hey [~showuon] , can you review the PR

> add a lower `log.initial.task.delay.ms` value to integration test framework
> ---
>
> Key: KAFKA-16661
> URL: https://issues.apache.org/jira/browse/KAFKA-16661
> Project: Kafka
>  Issue Type: Test
>Reporter: Luke Chen
>Assignee: Vinay Agarwal
>Priority: Major
>  Labels: newbie, newbie++
>
> After KAFKA-16552, we created an internal config `log.initial.task.delay.ms` 
> to control the initial task delay in log manager. This ticket follows it up, 
> to set a default low value (100ms, 500ms maybe?) for it, to speed up the 
> tests.
>  
>  
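A minimal sketch of what the change could look like in an integration test's broker properties, assuming the internal config name introduced by KAFKA-16552 (the helper name and the 100 ms value are illustrative):

```
import java.util.Properties;

public class FastLogTaskDelay {
    // Hypothetical helper: lowers the internal log.initial.task.delay.ms so
    // log-manager background tasks start sooner in integration tests.
    public static Properties withLowInitialTaskDelay(Properties brokerProps) {
        brokerProps.put("log.initial.task.delay.ms", "100");
        return brokerProps;
    }
}
```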





[jira] [Comment Edited] (KAFKA-16661) add a lower `log.initial.task.delay.ms` value to integration test framework

2024-06-06 Thread Vinay Agarwal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852662#comment-17852662
 ] 

Vinay Agarwal edited comment on KAFKA-16661 at 6/6/24 7:04 AM:
---

Hey [~showuon] , can you please review the PR


was (Author: JIRAUSER305662):
Hey [~showuon] , can you review the PR

> add a lower `log.initial.task.delay.ms` value to integration test framework
> ---
>
> Key: KAFKA-16661
> URL: https://issues.apache.org/jira/browse/KAFKA-16661
> Project: Kafka
>  Issue Type: Test
>Reporter: Luke Chen
>Assignee: Vinay Agarwal
>Priority: Major
>  Labels: newbie, newbie++
>
> After KAFKA-16552, we created an internal config `log.initial.task.delay.ms` 
> to control the initial task delay in log manager. This ticket follows it up, 
> to set a default low value (100ms, 500ms maybe?) for it, to speed up the 
> tests.
>  
>  





Re: [PR] KAFKA-16770; [2/2] Coalesce records into bigger batches [kafka]

2024-06-06 Thread via GitHub


dajac commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1628901258


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -583,11 +674,330 @@ private void unload() {
 }
 timer.cancelAll();
 deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+failCurrentBatch(Errors.NOT_COORDINATOR.exception());
 if (coordinator != null) {
 coordinator.onUnloaded();
 }
 coordinator = null;
 }
+
+/**
+ * Frees the current batch.
+ */
+private void freeCurrentBatch() {
+// Cancel the linger timeout.
+currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
+
+// Release the buffer.
+bufferSupplier.release(currentBatch.buffer);
+
+currentBatch = null;
+}
+
+/**
+ * Writes the current (or pending) batch to the log. When the batch is 
written
+ * locally, a new snapshot is created in the snapshot registry and the 
events
+ * associated with the batch are added to the deferred event queue.
+ */
+private void writeCurrentBatch() {
+if (currentBatch != null) {
+try {
+// Write the records to the log and update the last 
written offset.
+long offset = partitionWriter.append(
+tp,
+currentBatch.verificationGuard,
+currentBatch.builder.build()
+);
+coordinator.updateLastWrittenOffset(offset);
+
+// Add all the pending deferred events to the deferred 
event queue.
+for (DeferredEvent event : currentBatch.events) {
+deferredEventQueue.add(offset, event);
+}
+
+// Free up the current batch.
+freeCurrentBatch();
+} catch (Throwable t) {
+failCurrentBatch(t);
+}
+}
+}
+
+/**
+ * Writes the current batch if it is transactional or if it has passed the append linger time.
+ */
+private void maybeWriteCurrentBatch(long currentTimeMs) {
+if (currentBatch != null) {
+if (currentBatch.builder.isTransactional() || 
(currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+writeCurrentBatch();
+}
+}
+}
+
+/**
+ * Fails the current batch, reverts the snapshot to the base/start offset of the
+ * batch, and fails all the associated events.
+ */
+private void failCurrentBatch(Throwable t) {
+if (currentBatch != null) {
+coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
+for (DeferredEvent event : currentBatch.events) {
+event.complete(t);
+}
+freeCurrentBatch();
+}
+}
+
+/**
+ * Allocates a new batch if none already exists.
+ */
+private void maybeAllocateNewBatch(
+long producerId,
+short producerEpoch,
+VerificationGuard verificationGuard,
+long currentTimeMs
+) {
+if (currentBatch == null) {
+LogConfig logConfig = partitionWriter.config(tp);
+byte magic = logConfig.recordVersion().value;
+int maxBatchSize = logConfig.maxMessageSize();
+long prevLastWrittenOffset = coordinator.lastWrittenOffset();
+ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
+
+MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+buffer,
+magic,
+compression,
+TimestampType.CREATE_TIME,
+0L,
+currentTimeMs,
+producerId,
+producerEpoch,
+0,
+producerId != RecordBatch.NO_PRODUCER_ID,
+false,
+RecordBatch.NO_PARTITION_LEADER_EPOCH,
+maxBatchSize
+);
+
+Optional<TimerTask> lingerTimeoutTask = Optional.empty();
+if (appendLingerMs > 0) {
+lingerTimeoutTask = Optional.of(new 
TimerTask(appendLingerMs) {
+@Override
+public void run() {
+scheduleInternalOperation("FlushBatch", tp, () -> {
+if (this.isCancelled()) return;
+withActiveContextOrThrow(tp, 
CoordinatorContext::writeCurrentBatch);
+ 

Re: [PR] KAFKA-15045: (KIP-924 pt. 18) Better assignment testing [kafka]

2024-06-06 Thread via GitHub


ableegoldman commented on code in PR #16201:
URL: https://github.com/apache/kafka/pull/16201#discussion_r1628531098


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -238,24 +310,37 @@ public static Map 
optimizeRackAwareStandbyTas
 final int crossRackTrafficCost = 
applicationState.assignmentConfigs().rackAwareTrafficCost().getAsInt();
 final int nonOverlapCost = 
applicationState.assignmentConfigs().rackAwareNonOverlapCost().getAsInt();
 
+final Set<TaskId> standbyTasksToOptimize = new HashSet<>();
+kafkaStreamsAssignments.values().forEach(assignment -> {
+final Set<TaskId> standbyTasksForAssignment = assignment.tasks().values().stream()
+.filter(task -> task.type() == AssignedTask.Type.STANDBY)
+.map(AssignedTask::id)
+.collect(Collectors.toSet());
+standbyTasksToOptimize.addAll(standbyTasksForAssignment);
+});
+
 final Map<TaskId, Set<TaskTopicPartition>> topicPartitionsByTaskId =
 applicationState.allTasks().values().stream().collect(Collectors.toMap(
 TaskInfo::id,
 t -> t.topicPartitions().stream().filter(TaskTopicPartition::isChangelog).collect(Collectors.toSet()))
 );
-final List<TaskId> taskIds = new ArrayList<>(topicPartitionsByTaskId.keySet());
+final List<TaskId> taskIds = new ArrayList<>(standbyTasksToOptimize);

Review Comment:
   If the only point of the `standbyTasksToOptimize` set is to create this 
list, we can just do that directly with `.flatMap` (though I think we should 
keep the name, ie rename `taskIds` to `standbyTasksToOptimize`):
   
   ```
   final List<TaskId> taskIds = kafkaStreamsAssignments.values().stream()
   .flatMap(r -> r.tasks().values().stream())
   .filter(task -> task.type() == AssignedTask.Type.STANDBY)
   .map(AssignedTask::id)
   .distinct().collect(Collectors.toList());
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -92,7 +164,7 @@ public static Map 
identityAssignment(final Ap
  */
 public static Map<ProcessId, KafkaStreamsAssignment> defaultStandbyTaskAssignment(final ApplicationState applicationState,
   final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
-if 
(applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty()) {

Review Comment:
   LOL great bug 😂 



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -238,24 +306,37 @@ public static Map 
optimizeRackAwareStandbyTas
 final int crossRackTrafficCost = 
applicationState.assignmentConfigs().rackAwareTrafficCost().getAsInt();
 final int nonOverlapCost = 
applicationState.assignmentConfigs().rackAwareNonOverlapCost().getAsInt();
 
+final Set<TaskId> standbyTasksToOptimize = new HashSet<>();
+kafkaStreamsAssignments.values().forEach(assignment -> {
+final Set<TaskId> standbyTasksForAssignment = assignment.tasks().values().stream()
+.filter(task -> task.type() == AssignedTask.Type.STANDBY)
+.map(AssignedTask::id)
+.collect(Collectors.toSet());
+standbyTasksToOptimize.addAll(standbyTasksForAssignment);
+});
+
 final Map<TaskId, Set<TaskTopicPartition>> topicPartitionsByTaskId =
 applicationState.allTasks().values().stream().collect(Collectors.toMap(
 TaskInfo::id,
 t -> t.topicPartitions().stream().filter(TaskTopicPartition::isChangelog).collect(Collectors.toSet()))
 );
-final List<TaskId> taskIds = new ArrayList<>(topicPartitionsByTaskId.keySet());
+final List<TaskId> taskIds = new ArrayList<>(standbyTasksToOptimize);
 
 final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = applicationState.kafkaStreamsStates(false);
 
 final Map<UUID, Optional<String>> clientRacks = new HashMap<>();
 final List<UUID> clientIds = new ArrayList<>();
 final Map<UUID, KafkaStreamsAssignment> assignmentsByUuid = new HashMap<>();
 
-for (final Map.Entry<ProcessId, KafkaStreamsAssignment> entry : kafkaStreamsAssignments.entrySet()) {
-final UUID uuid = entry.getKey().id();
-clientIds.add(uuid);
-clientRacks.put(uuid, kafkaStreamsStates.get(entry.getKey()).rackId());
-assignmentsByUuid.put(uuid, entry.getValue());
+for (final Map.Entry<ProcessId, KafkaStreamsState> entry : kafkaStreamsStates.entrySet()) {
+final ProcessId processId = entry.getKey();
+clientIds.add(processId.id());
+clientRacks.put(processId.id(), entry.getValue().rackId());
+if (!kafkaStreamsAssignments.containsKey(processId)) {

Review Comment:
   We have this exact same loop in the active task optimization method -- I 
assume we should make this same change up there?



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:

[jira] [Created] (KAFKA-16903) Task should consider producer error previously occurred for different task

2024-06-06 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-16903:
-

 Summary: Task should consider producer error previously occurred 
for different task
 Key: KAFKA-16903
 URL: https://issues.apache.org/jira/browse/KAFKA-16903
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.7.0
Reporter: Bruno Cadonna
Assignee: Bruno Cadonna


A task does not consider a producer error that occurred for a different task.

The following log messages show the issue.

Task {{0_2}} of a Streams app (EOSv2 enabled) crashes while sending records 
with an {{InvalidTxnStateException}}:

{code:java}
[2024-05-30 10:20:35,881] ERROR [kafka-producer-network-thread | 
i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread 
[i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_2] Error encountered 
sending record to topic stream-soak-test-node-name-repartition for task 0_2 due 
to:
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted 
a transactional operation in an invalid state.
Exception handler choose to FAIL the processing, no more records would be sent. 
(org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted 
a transactional operation in an invalid state.

[2024-05-30 10:20:35,886] ERROR [i-0af25f5c2bd9bba31-StreamThread-1] 
stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] Failed to process stream 
task 0_2 due to the following error: 
(org.apache.kafka.streams.processor.internals.TaskExecutor)
org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
record to topic stream-soak-test-node-name-repartition for task 0_2 due to:
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted 
a transactional operation in an invalid state.
Exception handler choose to FAIL the processing, no more records would be sent.
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:316)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:285)
at 
org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1565)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:311)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:272)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:236)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:829)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:818)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:770)
at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$null$2(Sender.java:627)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$3(Sender.java:612)
at java.lang.Iterable.forEach(Iterable.java:75)
at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$9(Sender.java:916)
at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:348)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The 
producer attempted a transactional operation in an invalid state.
{code} 

Just before the exception in task 0_2, task 0_0 also encountered an exception
while producing:

{code:java}
[2024-05-30 10:20:35,880] ERROR [kafka-producer-network-thread | 
i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread 
[i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_0] Error encountered 
sending record to topic stream-soak-test-network-id-repartition for task 0_0 
due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since 
the producer is fenced, indicating the task may be migrated out 
(org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.InvalidProducerE

Re: [PR] KAFKA-16770; [2/2] Coalesce records into bigger batches [kafka]

2024-06-06 Thread via GitHub


dajac commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1628904523


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
[This message quotes the same CoordinatorRuntime.java hunk shown in the previous message; the archive truncates it before the review comment.]

Re: [PR] KAFKA-16770; [2/2] Coalesce records into bigger batches [kafka]

2024-06-06 Thread via GitHub


dajac commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1628910661


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
[This message quotes the same CoordinatorRuntime.java hunk shown above; the archive truncates it before the review comment.]

Re: [PR] KAFKA-15045: (KIP-924 pt. 19) Update to new AssignmentConfigs [kafka]

2024-06-06 Thread via GitHub


ableegoldman commented on code in PR #16219:
URL: https://github.com/apache/kafka/pull/16219#discussion_r1628909884


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java:
##
@@ -21,7 +21,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;

Review Comment:
   oops -- glad you picked up this refactor, this would have been embarrassing 
to ship 😅 






Re: [PR] KAFKA-15045: (KIP-924 pt. 19) Update to new AssignmentConfigs [kafka]

2024-06-06 Thread via GitHub


ableegoldman commented on PR #16219:
URL: https://github.com/apache/kafka/pull/16219#issuecomment-2151588696

   @apourchet test failures are related.
   
   Looks like it's mad about some OptionalInt thing, for example:
   
   ```
   java.lang.NullPointerException: Cannot invoke 
"java.util.OptionalInt.orElse(int)" because the return value of 
"org.apache.kafka.streams.processor.assignment.AssignmentConfigs.rackAwareTrafficCost()"
 is null
at 
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor.optimizeActive(StickyTaskAssignor.java:105)
at 
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor.assign(StickyTaskAssignor.java:87)
at 
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest.assign(StickyTaskAssignorTest.java:1088)
at 
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest.assign(StickyTaskAssignorTest.java:1082)
at 
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest.assign(StickyTaskAssignorTest.java:1065)
at 
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest.shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingClients(StickyTaskAssignorTest.java:735)
   ```
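A minimal sketch of the kind of fix implied by that NPE, assuming the cost is backed by a nullable field (the names are hypothetical, not the actual `AssignmentConfigs` code):

```
import java.util.OptionalInt;

public class AssignmentConfigsSketch {
    private final Integer rackAwareTrafficCost; // may be null when unset

    public AssignmentConfigsSketch(Integer rackAwareTrafficCost) {
        this.rackAwareTrafficCost = rackAwareTrafficCost;
    }

    // Returning an empty OptionalInt instead of null lets callers such as
    // StickyTaskAssignor safely do rackAwareTrafficCost().orElse(default).
    public OptionalInt rackAwareTrafficCost() {
        return rackAwareTrafficCost == null
            ? OptionalInt.empty()
            : OptionalInt.of(rackAwareTrafficCost);
    }
}
```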





Re: [PR] KAFKA-16770; [2/2] Coalesce records into bigger batches [kafka]

2024-06-06 Thread via GitHub


dajac commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1628912516


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
[This message quotes the same CoordinatorRuntime.java hunk shown above; the review comment below is attached to the `ByteBuffer buffer = bufferSupplier.get(maxBatchSize)` allocation in `maybeAllocateNewBatch`.]

Review Comment:
   Our goal is to write large batches, so I thought that starting with a large buffer and avoiding having to grow it would make sense here. I don't really see a downside to it, as we reuse the buffer too. I can revert this change if you feel it is better to keep it as it was.
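
A minimal sketch of the allocate-at-max-and-reuse pattern described here, using Kafka's `BufferSupplier` (the size is illustrative; in the PR it comes from the log config's max message size):

```
import java.nio.ByteBuffer;
import org.apache.kafka.common.utils.BufferSupplier;

public class BufferReuseSketch {
    public static void main(String[] args) {
        BufferSupplier supplier = BufferSupplier.create();
        int maxBatchSize = 1024 * 1024; // illustrative

        // Allocate at the max batch size up front so the builder never grows it.
        ByteBuffer buffer = supplier.get(maxBatchSize);
        // ... build and write a batch into the buffer ...

        // Releasing hands the buffer back to the supplier; a later get() of a
        // compatible size reuses it instead of allocating again.
        supplier.release(buffer);
        ByteBuffer reused = supplier.get(maxBatchSize);
    }
}
```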






Re: [PR] KAFKA-16770; [2/2] Coalesce records into bigger batches [kafka]

2024-06-06 Thread via GitHub


dajac commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1628914978


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
[This message quotes the same CoordinatorRuntime.java hunk shown above; the review comment below is attached to the linger `TimerTask` that schedules the "FlushBatch" internal operation.]

Review Comment:
We do cancel it in `freeCurrentBatch`. As Jeff said, the timer schedules an
internal operation, so the flush cannot be executed concurrently by multiple threads.

Re: [PR] KAFKA-16770; [2/2] Coalesce records into bigger batches [kafka]

2024-06-06 Thread via GitHub


dajac commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1628915608


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
[This message quotes the same CoordinatorRuntime.java hunk shown above; the archive truncates it before the review comment.]

Re: [PR] KAFKA-16770; [2/2] Coalesce records into bigger batches [kafka]

2024-06-06 Thread via GitHub


dajac commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1628920984


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -626,89 +833,113 @@ private void append(
 // If the records are empty, it was a read operation after 
all. In this case,
 // the response can be returned directly iff there are no 
pending write operations;
 // otherwise, the read needs to wait on the last write 
operation to be completed.
-OptionalLong pendingOffset = 
deferredEventQueue.highestPendingOffset();
-if (pendingOffset.isPresent()) {
-deferredEventQueue.add(pendingOffset.getAsLong(), event);
+if (currentBatch != null) {
+currentBatch.events.add(event);
 } else {
-event.complete(null);
+OptionalLong pendingOffset = 
deferredEventQueue.highestPendingOffset();
+if (pendingOffset.isPresent()) {
+deferredEventQueue.add(pendingOffset.getAsLong(), 
event);
+} else {
+event.complete(null);
+}
 }
 } else {
 // If the records are not empty, first, they are applied to 
the state machine,
-// second, then are written to the partition/log, and finally, 
the response
-// is put into the deferred event queue.
-long prevLastWrittenOffset = coordinator.lastWrittenOffset();
-LogConfig logConfig = partitionWriter.config(tp);
-byte magic = logConfig.recordVersion().value;
-int maxBatchSize = logConfig.maxMessageSize();
+// second, then are appended to the opened batch.
 long currentTimeMs = time.milliseconds();
-ByteBuffer buffer = 
bufferSupplier.get(Math.min(MIN_BUFFER_SIZE, maxBatchSize));
 
-try {
-MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
-buffer,
-magic,
-compression,
-TimestampType.CREATE_TIME,
-0L,
+// If the current write operation is transactional, the 
current batch
+// is written before proceeding with it.

Review Comment:
   Transactional records must be in a single batch because the transactional metadata, such as the producer id and epoch, are stored at the batch level. In other words, it is not possible to coalesce transactional records with other writes.
   
   The goal of this code is not to flush the batch created by the write operation if it is transactional, but to flush the previous batch, if any. Imagine that you have a regular write: a batch is created with the records, but it is not flushed yet. Then you have a transactional write. In this case, the first batch must be flushed, and then the transactional one must be flushed too.
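
A minimal sketch of the invariant described above (illustrative pseudologic, not the PR's actual code):

```
public class BatchingInvariantSketch {
    private Object currentBatch; // stands in for the runtime's open batch

    // Transactional metadata (producer id/epoch) lives at the batch level,
    // so a transactional write cannot share a batch with earlier records.
    void append(boolean isTransactional) {
        if (isTransactional && currentBatch != null) {
            writeCurrentBatch(); // flush the pending regular batch first
        }
        allocateBatchAndAppend();
        if (isTransactional) {
            writeCurrentBatch(); // the transactional batch is flushed immediately
        }
    }

    private void allocateBatchAndAppend() { currentBatch = new Object(); }
    private void writeCurrentBatch() { currentBatch = null; }
}
```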






Re: [PR] KAFKA-16770; [2/2] Coalesce records into bigger batches [kafka]

2024-06-06 Thread via GitHub


dajac commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1628923696


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
[This message quotes essentially the same CoordinatorRuntime.java hunk shown above; the review comment below is attached to `failCurrentBatch(Throwable t)`.]

Review Comment:
   Yeah, this is also true prior to this patch. We could also log something here directly.






Re: [PR] KAFKA-16786: Remove old assignment strategy usage in new consumer [kafka]

2024-06-06 Thread via GitHub


lucasbru merged PR #16214:
URL: https://github.com/apache/kafka/pull/16214





Re: [PR] KAFKA-16884 : Refactor RemoteLogManagerConfig with AbstractConfig [kafka]

2024-06-06 Thread via GitHub


muralibasani commented on code in PR #16199:
URL: https://github.com/apache/kafka/pull/16199#discussion_r1628950242


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java:
##
@@ -16,184 +16,105 @@
  */
 package org.apache.kafka.server.log.remote.storage;
 
-import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigException;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE;
 import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME;
 import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class RemoteLogManagerConfigTest {
 
-private static class TestConfig extends AbstractConfig {
-public TestConfig(Map originals) {
-super(RemoteLogManagerConfig.CONFIG_DEF, originals, true);
-}
-}
-
-@ParameterizedTest
-@ValueSource(booleans = {true, false})
-public void testValidConfigs(boolean 
useDefaultRemoteLogMetadataManagerClass) {
+@Test
+public void testValidConfigs() {
 String rsmPrefix = "__custom.rsm.";
 String rlmmPrefix = "__custom.rlmm.";
 Map<String, Object> rsmProps = Collections.singletonMap("rsm.prop", "val");
 Map<String, Object> rlmmProps = Collections.singletonMap("rlmm.prop", "val");
-RemoteLogManagerConfig expectedRemoteLogManagerConfig = 
getRemoteLogManagerConfig(useDefaultRemoteLogMetadataManagerClass,
-rsmPrefix,
-rlmmPrefix,
-rsmProps,
-rlmmProps);
 
-Map<String, Object> props = extractProps(expectedRemoteLogManagerConfig);
+Map<String, Object> props = getRLMProps(rsmPrefix, rlmmPrefix);
 rsmProps.forEach((k, v) -> props.put(rsmPrefix + k, v));
 rlmmProps.forEach((k, v) -> props.put(rlmmPrefix + k, v));
+
+RemoteLogManagerConfig expectedRemoteLogManagerConfig = new 
RemoteLogManagerConfig(props);
+
 // Removing remote.log.metadata.manager.class.name so that the default 
value gets picked up.
-if (useDefaultRemoteLogMetadataManagerClass) {
-props.remove(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP);
-}
-TestConfig config = new TestConfig(props);
-RemoteLogManagerConfig remoteLogManagerConfig = new 
RemoteLogManagerConfig(config);
-assertEquals(expectedRemoteLogManagerConfig, remoteLogManagerConfig);
+props.remove(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP);
+
+RemoteLogManagerConfig remoteLogManagerConfig = new 
RemoteLogManagerConfig(props);
+assertEquals(expectedRemoteLogManagerConfig.values(), 
remoteLogManagerConfig.values());
+
+assertEquals(rsmProps, 
remoteLogManagerConfig.remoteStorageManagerProps());
+assertEquals(rlmmProps, 
remoteLogManagerConfig.remoteLogMetadataManagerProps());
 }
 
-private static RemoteLogManagerConfig getRemoteLogManagerConfig(boolean 
useDefaultRemoteLogMetadataManagerClass,
-String 
rsmPrefix,
-String 
rlmmPrefix,
-
Map rsmProps,
-
Map rlmmProps) {
-String remoteLogMetadataManagerClass = 
useDefaultRemoteLogMetadataManagerClass ? 
DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME : 
"dummy.remote.log.metadata.class";
-return new RemoteLogManagerConfig(true,
-"dummy.remote.storage.class",
-"dummy.remote.storage.class.path",
-remoteLogMetadataManagerClass,
-"dummy.remote.log.metadata.class.path",
-"listener.name",
-1024 * 1024L,
-1,
-1,
-1,
-6L,
-  

Re: [PR] KAFKA-16786: Remove old assignment strategy usage in new consumer [kafka]

2024-06-06 Thread via GitHub


lucasbru commented on PR #16214:
URL: https://github.com/apache/kafka/pull/16214#issuecomment-2151637205

   @lianetm do we need this cherry-picked on 3.8?





Re: [PR] KAFKA-14701; Move `PartitionAssignor` to new `group-coordinator-api` module [kafka]

2024-06-06 Thread via GitHub


mimaison commented on code in PR #16198:
URL: https://github.com/apache/kafka/pull/16198#discussion_r1628997309


##
settings.gradle:
##
@@ -61,6 +61,7 @@ include 'clients',
 'examples',
 'generator',
 'group-coordinator',
+'group-coordinator:group-coordinator-api',

Review Comment:
   We named other internal API modules just `api`, for example `connect:api`, 
`storage:api`. Should we stick to the same convention?







Re: [PR] KAFKA-14701; Move `PartitionAssignor` to new `group-coordinator-api` module [kafka]

2024-06-06 Thread via GitHub


mimaison commented on code in PR #16198:
URL: https://github.com/apache/kafka/pull/16198#discussion_r1628999147


##
settings.gradle:
##
@@ -61,6 +61,7 @@ include 'clients',
 'examples',
 'generator',
 'group-coordinator',
+'group-coordinator:group-coordinator-api',

Review Comment:
   I just noticed we have `tools:tools-api` too :(






[PR] KAFKA-16903: Consider produce error of different task [kafka]

2024-06-06 Thread via GitHub


cadonna opened a new pull request, #16222:
URL: https://github.com/apache/kafka/pull/16222

   A task does not know anything about a produce error thrown by a different
task. That might lead to an InvalidTxnStateException when a task attempts to do
a transactional operation on a producer that failed due to a different task.
   
   This commit stores the produce exception in the streams producer on
completion of a send instead of in the record collector, since the record
collector is at task level whereas the streams producer is at stream-thread
level. Since all tasks use the same streams producer, the error is correctly
propagated across tasks of the same stream thread.
   
   For EOS alpha, this commit does not change anything because each task uses
its own producer. The send error is still at task level, but so is the
transaction.
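
   A minimal sketch of the thread-level error sharing described above (the names are illustrative, not the actual `StreamsProducer` code):

```
import java.util.concurrent.atomic.AtomicReference;

public class ThreadLevelProducerSketch {
    // One instance per stream thread, shared by all of its tasks.
    private final AtomicReference<RuntimeException> sendException = new AtomicReference<>();

    // Invoked from the producer send callback, whichever task sent the record.
    void onSendError(RuntimeException fatal) {
        sendException.compareAndSet(null, fatal);
    }

    // Every task checks before attempting a transactional operation.
    void maybeThrowSendException() {
        RuntimeException e = sendException.get();
        if (e != null) {
            throw e;
        }
    }
}
```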
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14701; Move `PartitionAssignor` to new `group-coordinator-api` module [kafka]

2024-06-06 Thread via GitHub


dajac commented on code in PR #16198:
URL: https://github.com/apache/kafka/pull/16198#discussion_r1629002408


##
settings.gradle:
##
@@ -61,6 +61,7 @@ include 'clients',
 'examples',
 'generator',
 'group-coordinator',
+'group-coordinator:group-coordinator-api',

Review Comment:
   Reusing `api` does not work. Gradle gets confused about it and dependencies 
fail. There is a bug in Gradle. This is why we actually use `tools:tools-api` 
and why we have `project(":storage:api").name = "storage-api"` later in this 
file. So effectively we use `module:module-api` as names, except for connect.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14701; Move `PartitionAssignor` to new `group-coordinator-api` module [kafka]

2024-06-06 Thread via GitHub


dajac commented on code in PR #16198:
URL: https://github.com/apache/kafka/pull/16198#discussion_r1629004232


##
settings.gradle:
##
@@ -61,6 +61,7 @@ include 'clients',
 'examples',
 'generator',
 'group-coordinator',
+'group-coordinator:group-coordinator-api',

Review Comment:
   For reference: https://github.com/gradle/gradle/issues/847



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16893: ConnectorUtils can group elements in round-robin fashion [kafka]

2024-06-06 Thread via GitHub


fanyang commented on PR #16208:
URL: https://github.com/apache/kafka/pull/16208#issuecomment-2151697221

   Sure, I will start a KIP for it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16893) ConnectorUtils can group elements in round-robin fashion

2024-06-06 Thread Fan Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852680#comment-17852680
 ] 

Fan Yang commented on KAFKA-16893:
--

Will start a KIP for it.

> ConnectorUtils can group elements in round-robin fashion
> 
>
> Key: KAFKA-16893
> URL: https://issues.apache.org/jira/browse/KAFKA-16893
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Fan Yang
>Assignee: Fan Yang
>Priority: Minor
>  Labels: needs-kip
>
> The current ConnectorUtils can only assign contiguous elements to groups. But 
> in some scenarios, contiguous elements need to be distributed across 
> different groups. So, we can add a round-robin assignment strategy (sketched 
> below).
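
A minimal sketch of what the round-robin grouping could look like next to the 
existing contiguous grouping of ConnectorUtils.groupPartitions (the method name 
below is hypothetical, pending the KIP):

{code:java}
import java.util.ArrayList;
import java.util.List;

public class RoundRobinGroupingSketch {
    // Hedged sketch; not part of ConnectorUtils yet.
    public static <T> List<List<T>> groupPartitionsRoundRobin(List<T> elements, int numGroups) {
        List<List<T>> groups = new ArrayList<>(numGroups);
        for (int i = 0; i < numGroups; i++) {
            groups.add(new ArrayList<>());
        }
        // Deal elements out like cards, so neighbouring elements land in different groups.
        for (int i = 0; i < elements.size(); i++) {
            groups.get(i % numGroups).add(elements.get(i));
        }
        return groups;
    }
    // e.g. [a, b, c, d, e] with 2 groups -> [[a, c, e], [b, d]], whereas the
    // existing contiguous grouping gives [[a, b, c], [d, e]].
}
{code}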



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] improve log description of QuorumController [kafka]

2024-06-06 Thread via GitHub


chickenchickenlove commented on code in PR #15926:
URL: https://github.com/apache/kafka/pull/15926#discussion_r1629025724


##
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##
@@ -165,6 +165,8 @@ static ControllerResult recordsForNonEmptyLog(
 throw new RuntimeException("Should not have ZK 
migrations enabled on a cluster that was " +
 "created in KRaft mode.");
 }
+logMessageBuilder
+.append("since this is a de-novo KRaft cluster.");

Review Comment:
   @mumrah gentle ping.
   When you have free time, could you please take another look? 
   Thanks in advance! 🙇‍♂️ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16903: Consider produce error of different task [kafka]

2024-06-06 Thread via GitHub


cadonna commented on code in PR #16222:
URL: https://github.com/apache/kafka/pull/16222#discussion_r1629028760


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##
@@ -88,6 +88,7 @@ public RecordCollectorImpl(final LogContext logContext,
 this.log = logContext.logger(getClass());
 this.taskId = taskId;
 this.streamsProducer = streamsProducer;
+this.sendException = streamsProducer.sendException();

Review Comment:
   That is basically the fix.
   
   Notice that now an exception caused by one task can be thrown by a different 
task. For example:
   
   ```java
   2024-05-30 10:20:35,916] ERROR [i-0af25f5c2bd9bba31-StreamThread-1] 
stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] Error flushing caches of 
dirty task 0_0 (org.apache.kafka.streams.processor.internals.TaskManager)
   org.apache.kafka.streams.errors.TaskMigratedException: Error encountered 
sending record to topic stream-soak-test-network-id-repartition for task 1_1 
due to:
   org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch.
   Written offsets would not be recorded and no more records would be sent 
since the producer is fenced, indicating the task may be migrated out; it means 
all tasks belonging to this thread should be migrated.
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:305)
   ```
   Task `0_0` throws an error caused by task `1_1`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16903: Consider produce error of different task [kafka]

2024-06-06 Thread via GitHub


cadonna commented on code in PR #16222:
URL: https://github.com/apache/kafka/pull/16222#discussion_r1629032640


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##
@@ -754,7 +755,7 @@ public void shouldForwardFlushToStreamsProducer() {
 final StreamsProducer streamsProducer = mock(StreamsProducer.class);
 when(streamsProducer.eosEnabled()).thenReturn(false);
 doNothing().when(streamsProducer).flush();
-
+when(streamsProducer.sendException()).thenReturn(new 
AtomicReference<>(null));

Review Comment:
   Needed to add this stub to a couple of tests so that they do not throw a 
`NullPointerException` because the mock returns `null` for `sendException()`. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Minor, Update NetworkClient.java [kafka]

2024-06-06 Thread via GitHub


nicolasguyomar opened a new pull request, #16223:
URL: https://github.com/apache/kafka/pull/16223

   Removing the reference to AK 1.0.0 to avoid any potential confusion, as such 
an old version is unlikely to still be in use.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16884 : Refactor RemoteLogManagerConfig with AbstractConfig [kafka]

2024-06-06 Thread via GitHub


muralibasani commented on PR #16199:
URL: https://github.com/apache/kafka/pull/16199#issuecomment-2151786106

   @kamalcph @gharris1727 @chia7712 thanks for all your great reviews.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] (KAFKA-16856) Zookeeper - Add new exception

2024-06-06 Thread Muralidhar Basani (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16856 ]


Muralidhar Basani deleted comment on KAFKA-16856:
---

was (Author: muralibasani):
[~christo_lolov] possible to take a look at this one too ?

> Zookeeper - Add new exception
> -
>
> Key: KAFKA-16856
> URL: https://issues.apache.org/jira/browse/KAFKA-16856
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Muralidhar Basani
>Priority: Major
>
> *Summary*
> Add TIERED_STORAGE_DISABLEMENT_IN_PROGRESS exception



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (KAFKA-16856) Zookeeper - Add new exception

2024-06-06 Thread Muralidhar Basani (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16856 ]


Muralidhar Basani deleted comment on KAFKA-16856:
---

was (Author: muralibasani):
[~christo_lolov] I tried to create a PR for it. Can you pls take a look?

> Zookeeper - Add new exception
> -
>
> Key: KAFKA-16856
> URL: https://issues.apache.org/jira/browse/KAFKA-16856
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Muralidhar Basani
>Priority: Major
>
> *Summary*
> Add TIERED_STORAGE_DISABLEMENT_IN_PROGRESS exception



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16856) Zookeeper - Add new exception

2024-06-06 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852693#comment-17852693
 ] 

Muralidhar Basani commented on KAFKA-16856:
---

[~christo_lolov] would you like to take a look at this PR?

> Zookeeper - Add new exception
> -
>
> Key: KAFKA-16856
> URL: https://issues.apache.org/jira/browse/KAFKA-16856
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Muralidhar Basani
>Priority: Major
>
> *Summary*
> Add TIERED_STORAGE_DISABLEMENT_IN_PROGRESS exception



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-06 Thread via GitHub


abhijeetk88 commented on PR #15820:
URL: https://github.com/apache/kafka/pull/15820#issuecomment-2151797298

   Hi @showuon. I have responded to your comment 
[here](https://github.com/apache/kafka/pull/15820#discussion_r1625465435). 
Please take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add demo template for transactional client [kafka]

2024-06-06 Thread via GitHub


k-raina commented on code in PR #15913:
URL: https://github.com/apache/kafka/pull/15913#discussion_r1629100925


##
examples/src/main/java/kafka/examples/TransactionalClientDemo.java:
##
@@ -0,0 +1,153 @@
+package kafka.examples;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Collections.singleton;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.*;
+
+/**
+ * This class demonstrates a transactional Kafka client application that 
consumes messages from an input topic,
+ * processes them to generate word count statistics, and produces the results 
to an output topic.
+ * It utilizes Kafka's transactional capabilities to ensure exactly-once 
processing semantics.
+ *
+ * The application continuously polls for records from the input topic, 
processes them, and commits the offsets
+ * in a transactional manner. In case of exceptions or errors, it handles them 
appropriately, either aborting the
+ * transaction and resetting to the last committed positions, or restarting 
the application.
+ *
+ */
+public class TransactionalClientDemo {
+
+private static final String CONSUMER_GROUP_ID = "my-group-id";
+private static final String OUTPUT_TOPIC = "output";
+private static final String INPUT_TOPIC = "input";
+private static KafkaConsumer consumer;
+private static KafkaProducer producer;
+
+public static void main(String[] args) {
+initializeApplication();
+
+boolean isRunning = true;
+// Continuously poll for records
+while(isRunning) {

Review Comment:
   Updated in commit: 85eb274658b633f3b5de300fd9b2594fe39a3a70



##
examples/src/main/java/kafka/examples/TransactionalClientDemo.java:
##
@@ -0,0 +1,153 @@
+package kafka.examples;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Collections.singleton;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.*;

Review Comment:
   Updated in commit: 85eb274658b633f3b5de300fd9b2594fe39a3a70
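
   For readers skimming the thread: the consume-process-produce loop described 
in the javadoc above boils down to roughly the sketch below. It assumes the 
demo's imports and its `consumer`/`producer` fields; `process` and 
`resetToLastCommittedPositions` are illustrative helpers, and error handling 
is simplified:
   
   ```java
   // Hedged sketch of the EOS loop, not the demo's exact code.
   producer.initTransactions();
   while (true) {
       ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));
       if (records.isEmpty()) continue;
       try {
           producer.beginTransaction();
           // Transform records, send the results, and collect the consumed
           // offsets per partition.
           Map<TopicPartition, OffsetAndMetadata> consumedOffsets = process(records, producer);
           // Commit the consumed offsets and the produced records atomically.
           producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
           producer.commitTransaction();
       } catch (KafkaException e) {
           producer.abortTransaction();
           resetToLastCommittedPositions(consumer); // re-consume the aborted batch
       }
   }
   ```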



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16904) Metric to measure the latency of remote read requests

2024-06-06 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-16904:


 Summary: Metric to measure the latency of remote read requests
 Key: KAFKA-16904
 URL: https://issues.apache.org/jira/browse/KAFKA-16904
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash


[KIP-1018|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1018%3A+Introduce+max+remote+fetch+timeout+config+for+DelayedRemoteFetch+requests]

Emit a new metric to measure the amount of time taken to read from the remote 
storage:

{{kafka.log.remote:type=RemoteLogManager,name=RemoteLogReaderFetchRateAndTimeMs}}
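
A hedged sketch of how a Yammer rate-and-time metric like this is typically 
registered and updated broker-side (the actual wiring in the remote log reader 
may differ; the timed callable is illustrative):

{code:java}
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.Timer;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;

import java.util.concurrent.TimeUnit;

public class RemoteReadTimingSketch {
    // Group/type/name taken from the metric name above.
    private static final Timer FETCH_TIMER = KafkaYammerMetrics.defaultRegistry().newTimer(
        new MetricName("kafka.log.remote", "RemoteLogManager", "RemoteLogReaderFetchRateAndTimeMs"),
        TimeUnit.MILLISECONDS, TimeUnit.SECONDS);

    public void readWithTiming(Runnable remoteRead) {
        long startNs = System.nanoTime();
        remoteRead.run();                                       // the remote storage read
        FETCH_TIMER.update(System.nanoTime() - startNs, TimeUnit.NANOSECONDS);
    }
}
{code}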



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16904: Metric to measure the latency of remote read requests [kafka]

2024-06-06 Thread via GitHub


kamalcph commented on PR #16209:
URL: https://github.com/apache/kafka/pull/16209#issuecomment-2151827027

   @satishd @showuon 
   
   Addressed your review comments. PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16520 KIP-853 Changes to DescribeQuorum request/response [kafka]

2024-06-06 Thread via GitHub


nizhikov commented on code in PR #16106:
URL: https://github.com/apache/kafka/pull/16106#discussion_r1629131122


##
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##
@@ -174,4 +194,22 @@ public String toString() {
 ')';
 }
 }
+
+public static class Node {
+private int nodeId;
+private List endpoints;

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16520 KIP-853 Changes to DescribeQuorum request/response [kafka]

2024-06-06 Thread via GitHub


nizhikov commented on code in PR #16106:
URL: https://github.com/apache/kafka/pull/16106#discussion_r1629131633


##
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##
@@ -94,21 +104,24 @@ public String toString() {
 
 public static class ReplicaState {
 private final int replicaId;
+private final Optional replicaDirectoryId;
 private final long logEndOffset;
 private final OptionalLong lastFetchTimestamp;
 private final OptionalLong lastCaughtUpTimestamp;
 
 ReplicaState() {
-this(0, 0, OptionalLong.empty(), OptionalLong.empty());
+this(0, Optional.empty(), 0, OptionalLong.empty(), 
OptionalLong.empty());
 }
 
 ReplicaState(
 int replicaId,
+Optional replicaDirectoryId,

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16520 KIP-853 Changes to DescribeQuorum request/response [kafka]

2024-06-06 Thread via GitHub


nizhikov commented on code in PR #16106:
URL: https://github.com/apache/kafka/pull/16106#discussion_r1629135689


##
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##
@@ -64,6 +70,10 @@ public List observers() {
 return observers;
 }
 
+public List nodes() {

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16520 KIP-853 Changes to DescribeQuorum request/response [kafka]

2024-06-06 Thread via GitHub


nizhikov commented on code in PR #16106:
URL: https://github.com/apache/kafka/pull/16106#discussion_r1629135971


##
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##
@@ -29,19 +32,22 @@ public class QuorumInfo {
 private final long highWatermark;
 private final List voters;
 private final List observers;
+private final List nodes;
 
 QuorumInfo(
 int leaderId,
 long leaderEpoch,
 long highWatermark,
 List voters,
-List observers
+List observers,
+List nodes

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15776: Support added to update remote.fetch.max.wait.ms dynamically [kafka]

2024-06-06 Thread via GitHub


kamalcph commented on code in PR #16203:
URL: https://github.com/apache/kafka/pull/16203#discussion_r1629151230


##
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##
@@ -792,6 +792,36 @@ class DynamicBrokerConfigTest {
 verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100)
   }
 
+  @Test
+  def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = {
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+val config = KafkaConfig(props)
+val kafkaBroker = mock(classOf[KafkaBroker])
+when(kafkaBroker.config).thenReturn(config)
+
+val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker)
+config.dynamicConfig.initialize(None, None)
+config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig)
+
+val newProps = new Properties()
+newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "3")
+// update default config
+config.dynamicConfig.validate(newProps, perBrokerConfig = false)

Review Comment:
   the test will fail if the validation fails



##
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##
@@ -792,6 +792,36 @@ class DynamicBrokerConfigTest {
 verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100)
   }
 
+  @Test
+  def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = {
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+val config = KafkaConfig(props)
+val kafkaBroker = mock(classOf[KafkaBroker])
+when(kafkaBroker.config).thenReturn(config)
+
+val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker)
+config.dynamicConfig.initialize(None, None)
+config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig)
+
+val newProps = new Properties()
+newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "3")
+// update default config
+config.dynamicConfig.validate(newProps, perBrokerConfig = false)
+config.dynamicConfig.updateDefaultConfig(newProps)
+assertEquals(3, config.remoteFetchMaxWaitMs)
+
+// update per broker config
+newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "1")
+config.dynamicConfig.validate(newProps, perBrokerConfig = true)

Review Comment:
   the test will fail if the validation fails



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16523) kafka-metadata-quorum add voter and remove voter changes

2024-06-06 Thread Deng Ziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Deng Ziming reassigned KAFKA-16523:
---

Assignee: Deng Ziming

> kafka-metadata-quorum add voter and remove voter changes
> 
>
> Key: KAFKA-16523
> URL: https://issues.apache.org/jira/browse/KAFKA-16523
> Project: Kafka
>  Issue Type: Sub-task
>  Components: tools
>Reporter: José Armando García Sancio
>Assignee: Deng Ziming
>Priority: Major
>
> # 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes#KIP853:KRaftControllerMembershipChanges-add-controller–config%3Cserver.properties%3E
>  # 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes#KIP853:KRaftControllerMembershipChanges-remove-controller--controller-id%3Ccontroller-id%3E--controller-uuid%3Ccontroller-uuid%3E



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14701; Move `PartitionAssignor` to new `group-coordinator-api` module [kafka]

2024-06-06 Thread via GitHub


chia7712 commented on code in PR #16198:
URL: https://github.com/apache/kafka/pull/16198#discussion_r1629176839


##
settings.gradle:
##
@@ -61,6 +61,7 @@ include 'clients',
 'examples',
 'generator',
 'group-coordinator',
+'group-coordinator:group-coordinator-api',

Review Comment:
   @mimaison you raise a good point. I had the same question when creating the 
tools-api module 😄 
   
   As for the new module, maybe storage:api can be renamed to 
storage:storage-api for consistency. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16904: Metric to measure the latency of remote read requests [kafka]

2024-06-06 Thread via GitHub


showuon commented on code in PR #16209:
URL: https://github.com/apache/kafka/pull/16209#discussion_r1629199724


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4167,6 +4170,8 @@ class ReplicaManagerTest {
   assertEquals(3, 
yammerMetricValue("RemoteLogReaderTaskQueueSize").asInstanceOf[Int])
   // unlock all tasks
   doneLatch.countDown()
+  responseLatch.await()

Review Comment:
   nit: Please add a time limit to this await. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Fixed potential thread block by admin API [kafka]

2024-06-06 Thread via GitHub


omkreddy commented on PR #16217:
URL: https://github.com/apache/kafka/pull/16217#issuecomment-2151905944

   there is checkstyle error


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-06 Thread via GitHub


appchemist commented on PR #16043:
URL: https://github.com/apache/kafka/pull/16043#issuecomment-2151916716

   If you have a moment, Please take a look. @lianetm , @philipnee , @kirktrue 
   
   I've applied a simple approach at the ConsumerNetworkThread level.
   I pushed it a few minutes ago, so if you've looked at the code before, 
please take a fresh look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in clients [kafka]

2024-06-06 Thread via GitHub


mimaison commented on code in PR #16193:
URL: https://github.com/apache/kafka/pull/16193#discussion_r1629232827


##
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java:
##
@@ -1335,10 +1335,6 @@ public synchronized void 
updateConsumerGroupOffsets(final Map


Re: [PR] Fixed potential thread block by admin API [kafka]

2024-06-06 Thread via GitHub


apoorvmittal10 commented on PR #16217:
URL: https://github.com/apache/kafka/pull/16217#issuecomment-2151935532

   > there is checkstyle error
   
   My bad, fixed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16257) SchemaProjector should be extensible to logical types

2024-06-06 Thread Fan Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852715#comment-17852715
 ] 

Fan Yang commented on KAFKA-16257:
--

Hi [~gharris1727] , I understand your concerns. I'll create a draft PR and test 
the different class loader scenarios.

> SchemaProjector should be extensible to logical types
> -
>
> Key: KAFKA-16257
> URL: https://issues.apache.org/jira/browse/KAFKA-16257
> Project: Kafka
>  Issue Type: New Feature
>  Components: connect
>Reporter: Greg Harris
>Assignee: Fan Yang
>Priority: Minor
>  Labels: needs-kip
>
> The SchemaProjector currently only supports projecting primitive Number 
> types, and cannot handle common logical types as have proliferated in the 
> Connect ecosystem.
> The SchemaProjector or a replacement should have the ability to extend it's 
> functionality to support these logical types.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] Adding 3.7.0 docker official images static assets (#1) [kafka]

2024-06-06 Thread via GitHub


KrishVora01 opened a new pull request, #16224:
URL: https://github.com/apache/kafka/pull/16224

   This PR aims to add the static Dockerfile and scripts for the AK 3.7.0 
version. As mentioned in 
[KIP-1028's](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1028%3A+Docker+Official+Image+for+Apache+Kafka#KIP1028:DockerOfficialImageforApacheKafka-ProposedWorkflowforReleaseProcess)
 discussion thread 
[here](https://lists.apache.org/thread/pfk2hqssbkt5ffblnh0gs7fw4w8ldcz6), since 
this KIP was voted in, this PR starts the release of the kafka:3.7.0 Docker 
Official image. This will also help us validate the process and allow us to 
address any changes suggested by Dockerhub before the 3.8.0 release.
   
   The static Dockerfile and scripts have been generated via the github actions 
workflows and scripts added as part of 
[PR-16027](https://github.com/apache/kafka/pull/16027). The reports of build 
and testing the 3.7.0 Docker official image are below.
   
[report_jvm.html.zip](https://github.com/user-attachments/files/15613529/report_jvm.html.zip)
   
[scan_report_jvm.txt.zip](https://github.com/user-attachments/files/15613530/scan_report_jvm.txt.zip)
   
   **Post this PR:**
   1. Run the 
[docker/generate_kafka_pr_template.py](https://github.com/apache/kafka/blob/trunk/docker/generate_kafka_pr_template.py)
 script, and raise the corresponding PR in [Docker Official Images 
repo](https://github.com/docker-library/official-images/tree/master/library).
   
   **Committer Checklist (excluded from commit message)**
   - [ ]  Verify design and implementation
   - [ ]  Verify test coverage and CI build status


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: migrate SaslClientsWithInvalidCredentialsTest to use ClusterTestExtensions [kafka]

2024-06-06 Thread via GitHub


FrankYang0529 opened a new pull request, #16225:
URL: https://github.com/apache/kafka/pull/16225

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16373: KIP-1028: Modifying Readme for Docker official images [kafka]

2024-06-06 Thread via GitHub


KrishVora01 opened a new pull request, #16226:
URL: https://github.com/apache/kafka/pull/16226

   This PR aims to modify the 
[README](https://github.com/apache/kafka/blob/trunk/docker/README.md) file 
under `/docker`, to include the steps to release the Docker Official Images.
   
   
   Post this PR:
   1. Update post-release process on the AK website
   
   **Committer Checklist (excluded from commit message)**
   - [ ]  Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14701; Move `PartitionAssignor` to new `group-coordinator-api` module [kafka]

2024-06-06 Thread via GitHub


chia7712 commented on code in PR #16198:
URL: https://github.com/apache/kafka/pull/16198#discussion_r1629310670


##
group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/MemberAssignment.java:
##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.api.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The partition assignment for a consumer group member.
+ */
+@InterfaceStability.Unstable
+public interface MemberAssignment {

Review Comment:
   oh, I missed that this PR aims to complete the migration, but I do love this 
change 😄 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16520 KIP-853 Changes to DescribeQuorum request/response [kafka]

2024-06-06 Thread via GitHub


nizhikov commented on code in PR #16106:
URL: https://github.com/apache/kafka/pull/16106#discussion_r1629355285


##
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##
@@ -174,4 +194,22 @@ public String toString() {
 ')';
 }
 }
+
+public static class Node {
+private int nodeId;

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16520 KIP-853 Changes to DescribeQuorum request/response [kafka]

2024-06-06 Thread via GitHub


nizhikov commented on code in PR #16106:
URL: https://github.com/apache/kafka/pull/16106#discussion_r1629358003


##
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##
@@ -174,4 +194,22 @@ public String toString() {
 ')';
 }
 }
+
+public static class Node {
+private int nodeId;

Review Comment:
   > Do they get an empty list of endpoints?
   
   Yes.
   
   > That seems simple, but it needs to be spelled out.
   
   Do we need to write it down somewhere?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16448: Add ProcessingExceptionHandler interface and implementations [kafka]

2024-06-06 Thread via GitHub


cadonna merged PR #16187:
URL: https://github.com/apache/kafka/pull/16187


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16520 KIP-853 Changes to DescribeQuorum request/response [kafka]

2024-06-06 Thread via GitHub


nizhikov commented on code in PR #16106:
URL: https://github.com/apache/kafka/pull/16106#discussion_r1629371126


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4424,12 +4427,21 @@ private QuorumInfo createQuorumResult(final 
DescribeQuorumResponseData.Partition
 .map(this::translateReplicaState)
 .collect(Collectors.toList());
 
+List nodes = nodeCollection.stream().map(n -> 
{
+List endpoints = n.listeners().stream()
+.map(l -> new Endpoint(l.name(), 
SecurityProtocol.forId(l.securityProtocol()), l.host(), l.port()))

Review Comment:
   Replaced with the `RaftVoterEndpoint`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16520 KIP-853 Changes to DescribeQuorum request/response [kafka]

2024-06-06 Thread via GitHub


showuon commented on code in PR #16106:
URL: https://github.com/apache/kafka/pull/16106#discussion_r1629419251


##
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##
@@ -73,12 +83,13 @@ public boolean equals(Object o) {
 && leaderEpoch == that.leaderEpoch
 && highWatermark == that.highWatermark
 && Objects.equals(voters, that.voters)
-&& Objects.equals(observers, that.observers);
+&& Objects.equals(observers, that.observers)
+&& Objects.equals(nodes, that.nodes);

Review Comment:
   Will `equals` work as expected if we don't override the `equals` method?



##
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##
@@ -69,6 +70,7 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
   val partitionData = topicData.partitions.get(0)
   assertEquals(KafkaRaftServer.MetadataPartition.partition, 
partitionData.partitionIndex)
   assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+  assertEquals("", partitionData.errorMessage())

Review Comment:
   ditto



##
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##
@@ -89,26 +100,30 @@ public String toString() {
 ", highWatermark=" + highWatermark +
 ", voters=" + voters +
 ", observers=" + observers +
+", nodes=" + nodes +

Review Comment:
   The `Node` object cannot be printed correctly, I think. No? 



##
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##
@@ -60,6 +60,7 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
   val response = connectAndReceive[DescribeQuorumResponse](request)
 
   assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+  assertEquals("", response.data.errorMessage)

Review Comment:
   In `DescribeQuorumResponse.json`, we said `null if no error` for the error 
message, but here it is an empty string. Where do we do that conversion?



##
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##
@@ -29,19 +32,22 @@ public class QuorumInfo {
 private final long highWatermark;
 private final List voters;
 private final List observers;
+private final Map nodes;

Review Comment:
   Could you explain why we need a map here, not a Set? It looks like we 
already store the nodeId in the Node object, so we don't need the nodeId as a 
key, right?



##
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##
@@ -44,25 +48,26 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class LeaderStateTest {
-private final int localId = 0;
-private final int epoch = 5;
+private final Entry localId = entry(0);
+private final Entry epoch = entry(5);

Review Comment:
   1. It's not common to create an entry object.
   2. I don't think the directoryId has any effect in most of these test cases, 
so could we use a dummy one to represent it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16803) Upgrade to a version of ShadowJavaPlugin which doesn't use ConfigureUtil

2024-06-06 Thread Ksolves (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852761#comment-17852761
 ] 

Ksolves commented on KAFKA-16803:
-

I've noticed that the ShadowJavaPlugin is not functioning as expected in 
version 8.1.1 due to a bug related to incorrect publication file references 
when using Gradle signing and publication 
[#901|https://github.com/johnrengelman/shadow/issues/901]. Fortunately, there 
is a solution available. 

This issue has been addressed in a forked version of the plugin: 
[io.github.goooler.shadow|https://plugins.gradle.org/plugin/io.github.goooler.shadow].
We can fix the issue by switching to this plugin (the latest version is 8.1.7). 
Let me know your thoughts.

> Upgrade to a version of ShadowJavaPlugin which doesn't use ConfigureUtil
> 
>
> Key: KAFKA-16803
> URL: https://issues.apache.org/jira/browse/KAFKA-16803
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Greg Harris
>Priority: Major
>  Labels: newbie
>
> The org.gradle.util.ConfigureUtil type has been deprecated.    
> This is scheduled to be removed in Gradle 9.0.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#org_gradle_util_reports_deprecations]
> 2 usages    
> Plugin:com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin    
> Plugin:com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16684: Remove cache in responseData [kafka]

2024-06-06 Thread via GitHub


johnnychhsu commented on code in PR #15966:
URL: https://github.com/apache/kafka/pull/15966#discussion_r1629471445


##
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java:
##
@@ -99,28 +99,24 @@ public Errors error() {
 }
 
 public LinkedHashMap 
responseData(Map topicNames, short version) {
-if (responseData == null) {
-synchronized (this) {
-if (responseData == null) {
-// Assigning the lazy-initialized `responseData` in the 
last step
-// to avoid other threads accessing a half-initialized 
object.
-final LinkedHashMap responseDataTmp =
-new LinkedHashMap<>();
-data.responses().forEach(topicResponse -> {
-String name;
-if (version < 13) {
-name = topicResponse.topic();
-} else {
-name = topicNames.get(topicResponse.topicId());
-}
-if (name != null) {
-topicResponse.partitions().forEach(partition ->
-responseDataTmp.put(new TopicPartition(name, 
partition.partitionIndex()), partition));
-}
-});
-responseData = responseDataTmp;
+synchronized (this) {
+// Assigning the lazy-initialized `responseData` in the last step
+// to avoid other threads accessing a half-initialized object.
+final LinkedHashMap responseDataTmp =
+new LinkedHashMap<>();
+data.responses().forEach(topicResponse -> {
+String name;
+if (version < 13) {
+name = topicResponse.topic();
+} else {
+name = topicNames.get(topicResponse.topicId());
 }
-}
+if (name != null) {
+topicResponse.partitions().forEach(partition ->
+responseDataTmp.put(new TopicPartition(name, 
partition.partitionIndex()), partition));
+}
+});
+responseData = responseDataTmp;

Review Comment:
   Thanks for the review!
   Yes, the returned data should be calculated on the fly based on the input 
topic names.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-06 Thread via GitHub


frankvicky opened a new pull request, #16227:
URL: https://github.com/apache/kafka/pull/16227

   Add unit tests for `ConsumerRecords#records(String)`.
   
   This commit adds new unit tests to the ConsumerRecordsTest class. One of the 
tests validates that an IllegalArgumentException occurs when a null topic is 
passed, and another validates that the `records(String)` method behaves as 
expected when valid topics are passed. 
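
   A hedged sketch of the null-topic case (the PR's actual test code may 
differ):
   
   ```java
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   import org.junit.jupiter.api.Test;
   
   import java.util.Collections;
   
   import static org.junit.jupiter.api.Assertions.assertThrows;
   
   public class ConsumerRecordsNullTopicSketch {
       @Test
       public void throwsOnNullTopic() {
           ConsumerRecords<String, String> records = new ConsumerRecords<>(Collections.emptyMap());
           // records(String) rejects a null topic; the cast disambiguates the overload.
           assertThrows(IllegalArgumentException.class, () -> records.records((String) null));
       }
   }
   ```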
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-06 Thread via GitHub


appchemist commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2152367583

   @philipnee Thanks for responding.
   
   1. No problem, I misunderstood.
   2. Implemented in 
`ConsumerNetworkThreadTest` (https://github.com/apache/kafka/pull/16043)
   
   In my view, propagating metadata errors after `KafkaClient.poll()` would be 
in line with the legacy consumer.
   And it seems like we can choose one of the options below:
   - Simple approach (a rough sketch follows this list)
  - https://github.com/apache/kafka/pull/15961#issuecomment-2135478481
  - https://github.com/apache/kafka/pull/15961#discussion_r1628247971
   - Wrapping metadata and a background queue in an object
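
   A rough sketch of the simple approach, with illustrative names (the real 
types live in the new consumer's internals and may differ):
   
   ```java
   // Hedged sketch: after the network poll, surface any fatal metadata error
   // (e.g. InvalidTopicException) and hand it to the application thread.
   // All names here are illustrative, not the actual internals.
   void runOnce() {
       // ... drain application events and poll the request managers ...
       networkClientDelegate.poll(pollTimeout, currentTimeMs);
       try {
           metadata.maybeThrowAnyException();           // throws a stored fatal error, if any
       } catch (KafkaException e) {
           backgroundEventQueue.add(new ErrorEvent(e)); // later rethrown from Consumer.poll()
       }
   }
   ```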


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16786: Remove old assignment strategy usage in new consumer [kafka]

2024-06-06 Thread via GitHub


lianetm commented on PR #16214:
URL: https://github.com/apache/kafka/pull/16214#issuecomment-2152402100

   Yes please, let's cherry-pick it into 3.8. Thanks @lucasbru !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [MINOR]: Code Cleanup - Metadata module [kafka]

2024-06-06 Thread via GitHub


mimaison merged PR #16065:
URL: https://github.com/apache/kafka/pull/16065


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (KAFKA-16786) New consumer should not require the deprecated partition.assignment.strategy

2024-06-06 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans closed KAFKA-16786.
--

> New consumer should not require the deprecated partition.assignment.strategy
> 
>
> Key: KAFKA-16786
> URL: https://issues.apache.org/jira/browse/KAFKA-16786
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> The partition.assignment.strategy config is deprecated with the new consumer 
> group protocol KIP-848. With the new protocol, server side assignors are 
> supported for now, defined in the property
> group.remote.assignor, and with default values selected by the broker, so 
> it's not even a required property. 
> The new AsyncKafkaConsumer supports the new protocol only, but it currently 
> throws an IllegalStateException if a call to subscribe is made and the 
> deprecated config partition.assignment.strategy is empty (see 
> [throwIfNoAssignorsConfigured|https://github.com/apache/kafka/blob/056d232f4e28bf8e67e00f8ed2c103fdb0f3b78e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1715]).
>  
> We should remove the reference to ConsumerPartitionAssignor in the 
> AsyncKafkaConsumer, along with its validation for non-empty on subscribe 
> (the only use it has).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16823: Extract LegacyConsumer-specific unit tests from generic KafkaConsumerTest [kafka]

2024-06-06 Thread via GitHub


FrankYang0529 opened a new pull request, #16228:
URL: https://github.com/apache/kafka/pull/16228

   - KafkaConsumerTest -> unit tests that apply to both consumers. 
   - LegacyKafkaConsumerTest -> unit tests that apply only to the 
LegacyKafkaConsumer, either because of the logic they test, or the way they are 
written (file to be created with this task)
   - AsyncKafkaConsumerTest -> unit tests that apply only to the 
AsyncKafkaConsumer (this file already exist)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Fixed potential thread block by admin API [kafka]

2024-06-06 Thread via GitHub


mimaison commented on PR #16217:
URL: https://github.com/apache/kafka/pull/16217#issuecomment-2152502675

   Can you open a Jira so it's easier for users to find? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16863) Consider removing `default.` prefix for exception handler config

2024-06-06 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852793#comment-17852793
 ] 

Muralidhar Basani commented on KAFKA-16863:
---

Do we need a KIP for this config cleanup?

From the description, I understand we deprecate the below and create two new 
configs without the 'default.' prefix:
- default.deserialization.exception.handler
- default.production.exception.handler

Is it OK if I work on this? If it needs a KIP, I can create one.

> Consider removing `default.` prefix for exception handler config
> 
>
> Key: KAFKA-16863
> URL: https://issues.apache.org/jira/browse/KAFKA-16863
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Trivial
>  Labels: need-kip
>
> Kafka Streams has a set of configs with `default.` prefix. The intent for the 
> default-prefix is to make a distinction between, well the default, and 
> in-place overwrites in the code. Eg, users can specify ts-extractors on a 
> per-topic basis.
> However, for the deserialization- and production-exception handlers, no such 
> overwrites are possible, and thus, `default.` does not really make sense, 
> because there is just one handler overall. Via KIP-1033 we added a new 
> processing-exception handler w/o a default-prefix, too.
> Thus, we should consider deprecating the two existing config names and adding 
> them back w/o the `default.` prefix.
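
A hedged sketch of what the rename could look like in StreamsConfig (the 
deprecated constant exists today; the prefix-less replacement is hypothetical, 
pending the KIP):

{code:java}
public class StreamsConfigSketch {
    // Existing name, to be deprecated per this ticket.
    @Deprecated
    public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG =
        "default.deserialization.exception.handler";

    // Hypothetical replacement without the "default." prefix.
    public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG =
        "deserialization.exception.handler";
}
{code}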



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-06 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1629578263


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+public void testRequestManagersArePolledOnce() {

Review Comment:
   Yes, the part we're testing gets the requests from the managers and adds 
them to the network client (that's the transfer I was referring to). But 
whatever name we prefer, the point is that it should reflect what we're 
testing (which is not "managersPolledOnce").



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16905) Thread block in describe topics API in Admin Client

2024-06-06 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-16905:
-

 Summary: Thread block in describe topics API in Admin Client
 Key: KAFKA-16905
 URL: https://issues.apache.org/jira/browse/KAFKA-16905
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.9.0
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal
 Fix For: 3.9.0


The thread blocks while running the admin client's describe topics API.

 
{code:java}
"kafka-admin-client-thread | adminclient-3" #114 daemon prio=5 os_prio=31 
cpu=6.57ms elapsed=417.17s tid=0x0001364fc200 nid=0x13403 waiting on 
condition  [0x0002bb419000]
   java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@17.0.7/Native Method)
- parking to wait for  <0x000773804828> (a 
java.util.concurrent.CompletableFuture$Signaller)
at 
java.util.concurrent.locks.LockSupport.park(java.base@17.0.7/LockSupport.java:211)
at 
java.util.concurrent.CompletableFuture$Signaller.block(java.base@17.0.7/CompletableFuture.java:1864)
at 
java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17.0.7/ForkJoinPool.java:3463)
at 
java.util.concurrent.ForkJoinPool.managedBlock(java.base@17.0.7/ForkJoinPool.java:3434)
at 
java.util.concurrent.CompletableFuture.waitingGet(java.base@17.0.7/CompletableFuture.java:1898)
at 
java.util.concurrent.CompletableFuture.get(java.base@17.0.7/CompletableFuture.java:2072)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at 
org.apache.kafka.clients.admin.KafkaAdminClient.handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(KafkaAdminClient.java:2324)
at 
org.apache.kafka.clients.admin.KafkaAdminClient.describeTopics(KafkaAdminClient.java:2122)
at org.apache.kafka.clients.admin.Admin.describeTopics(Admin.java:311)
at 
io.confluent.kafkarest.controllers.TopicManagerImpl.describeTopics(TopicManagerImpl.java:155)
at 
io.confluent.kafkarest.controllers.TopicManagerImpl.lambda$listTopics$2(TopicManagerImpl.java:76)
at 
io.confluent.kafkarest.controllers.TopicManagerImpl$$Lambda$1925/0x000800891448.apply(Unknown
 Source)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(java.base@17.0.7/CompletableFuture.java:1150)
at 
java.util.concurrent.CompletableFuture.postComplete(java.base@17.0.7/CompletableFuture.java:510)
at 
java.util.concurrent.CompletableFuture.complete(java.base@17.0.7/CompletableFuture.java:2147)
at 
io.confluent.kafkarest.common.KafkaFutures.lambda$toCompletableFuture$0(KafkaFutures.java:45)
at 
io.confluent.kafkarest.common.KafkaFutures$$Lambda$1909/0x000800897528.accept(Unknown
 Source)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$$Lambda$1910/0x000800897750.accept(Unknown
 Source)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(java.base@17.0.7/CompletableFuture.java:863)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(java.base@17.0.7/CompletableFuture.java:841)
at 
java.util.concurrent.CompletableFuture.postComplete(java.base@17.0.7/CompletableFuture.java:510)
at 
java.util.concurrent.CompletableFuture.complete(java.base@17.0.7/CompletableFuture.java:2147)
at 
org.apache.kafka.common.internals.KafkaCompletableFuture.kafkaComplete(KafkaCompletableFuture.java:39)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.complete(KafkaFutureImpl.java:122)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$4.handleResponse(KafkaAdminClient.java:2106)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1370)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1523)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1446)
at java.lang.Thread.run(java.base@17.0.7/Thread.java:833) {code}
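
For context, a minimal sketch of the general anti-pattern behind traces like 
this: blocking on a KafkaFuture from code that runs on the adminclient network 
thread itself. The names below are illustrative only; in the trace above the 
blocking call sits inside 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi:

{code:java}
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;

public class BlockingCallbackSketch {
    // Illustrative only: the whenComplete callback runs on the adminclient
    // network thread, and the blocking get() inside it waits for a response
    // that this same (now parked) thread would have to process.
    public static void demo(Admin admin) {
        admin.listTopics().names().whenComplete((names, error) -> {
            if (error != null) {
                return;
            }
            try {
                admin.describeTopics(names).allTopicNames().get(); // blocks the network thread
            } catch (InterruptedException | ExecutionException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}
{code}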



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16897) Move OffsetIndexTest and OffsetMapTest to storage module

2024-06-06 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16897:
--

Assignee: 黃竣陽  (was: Chia-Ping Tsai)

> Move OffsetIndexTest and OffsetMapTest to storage module
> 
>
> Key: KAFKA-16897
> URL: https://issues.apache.org/jira/browse/KAFKA-16897
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Minor
>
> as title



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16901) add unit tests for ConsumerRecords#records(String)

2024-06-06 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16901:
--

Assignee: TengYao Chi  (was: Chia-Ping Tsai)

> add unit tests for ConsumerRecords#records(String)
> --
>
> Key: KAFKA-16901
> URL: https://issues.apache.org/jira/browse/KAFKA-16901
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Minor
>
> ConsumerRecords#records(String) is a public API, so it is worth having a unit 
> test :) 
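
A minimal sketch of such a test, assuming JUnit 5 and the single-argument 
ConsumerRecords constructor:

{code:java}
import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

class ConsumerRecordsByTopicSketchTest {

    @Test
    void recordsByTopicReturnsOnlyMatchingRecords() {
        Map<TopicPartition, List<ConsumerRecord<String, String>>> map = new HashMap<>();
        map.put(new TopicPartition("topic-a", 0),
                singletonList(new ConsumerRecord<>("topic-a", 0, 0L, "k", "v")));
        map.put(new TopicPartition("topic-b", 0),
                singletonList(new ConsumerRecord<>("topic-b", 0, 0L, "k", "v")));
        ConsumerRecords<String, String> records = new ConsumerRecords<>(map);

        // records(String) should filter down to the requested topic only.
        int count = 0;
        for (ConsumerRecord<String, String> record : records.records("topic-a")) {
            assertEquals("topic-a", record.topic());
            count++;
        }
        assertEquals(1, count);
    }
}
{code}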



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16898) move TimeIndexTest and TransactionIndexTest to storage module

2024-06-06 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16898:
--

Assignee: PoAn Yang  (was: Chia-Ping Tsai)

> move TimeIndexTest and TransactionIndexTest to storage module
> -
>
> Key: KAFKA-16898
> URL: https://issues.apache.org/jira/browse/KAFKA-16898
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-06 Thread via GitHub


mimaison commented on code in PR #15834:
URL: https://github.com/apache/kafka/pull/15834#discussion_r1629590599


##
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##
@@ -656,5 +657,47 @@ Found problem:
   assertEquals(1, exitStatus)
 }
   }
+
+  @Test
+  def testFormatValidatesConfigForMetadataVersion(): Unit = {
+val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(10, 
null)))
+val args = Array("format",
+  "-c", "dummy.properties",
+  "-t", "XcZZOzUqS4yHOjhMQB6JLQ",
+  "--release-version", MetadataVersion.LATEST_PRODUCTION.toString)
+val exitCode = 
StorageTool.runFormatCommand(StorageTool.parseArguments(args), config)
+Mockito.verify(config, 
Mockito.times(1)).validateWithMetadataVersion(MetadataVersion.LATEST_PRODUCTION)
+assertEquals(0, exitCode)
+  }
+
+  private def createPropsFile(properties: Properties): String = {
+val propsFile = TestUtils.tempFile()
+val propsStream = Files.newOutputStream(propsFile.toPath)
+try {
+  properties.store(propsStream, "config.props")
+} finally {
+  propsStream.close()
+}
+propsFile.toPath.toString
+  }
+
+  @Test
+  def testJbodSupportValidation(): Unit = {
+def formatWith(logDirCount: Int, metadataVersion: MetadataVersion): 
Integer = {
+  val properties = TestUtils.createBrokerConfig(10, null, logDirCount = 
logDirCount)
+  
properties.remove(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)

Review Comment:
   Ah right, thanks for checking.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16905: Fixed potential thread block by admin API [kafka]

2024-06-06 Thread via GitHub


apoorvmittal10 commented on PR #16217:
URL: https://github.com/apache/kafka/pull/16217#issuecomment-2152588796

   > Can you open a Jira so it's easier for users to find? Thanks
   
   Done, https://issues.apache.org/jira/browse/KAFKA-16905


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16520 KIP-853 Changes to DescribeQuorum request/response [kafka]

2024-06-06 Thread via GitHub


nizhikov commented on code in PR #16106:
URL: https://github.com/apache/kafka/pull/16106#discussion_r1629598556


##
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##
@@ -29,19 +32,22 @@ public class QuorumInfo {
 private final long highWatermark;
 private final List<ReplicaState> voters;
 private final List<ReplicaState> observers;
+private final Map<Integer, Node> nodes;

Review Comment:
   > It looks like we already store nodeId in Node object, and we don't need 
the nodeId as key, right?
   
   Correct. My first implementation was Set-based, but @cmccabe asked to 
refactor it to a Map.
   See - https://github.com/apache/kafka/pull/16106#discussion_r1628472921



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14701; Move `PartitionAssignor` to new `group-coordinator-api` module [kafka]

2024-06-06 Thread via GitHub


chia7712 commented on code in PR #16198:
URL: https://github.com/apache/kafka/pull/16198#discussion_r1629602062


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -377,19 +375,19 @@ private Assignment newMemberAssignment(
 ) {
 MemberAssignment newMemberAssignment = 
newGroupAssignment.members().get(memberId);
 if (newMemberAssignment != null) {
-return new Assignment(newMemberAssignment.targetPartitions());
+return new Assignment(newMemberAssignment.partitions());
 } else {
 return Assignment.EMPTY;
 }
 }
 
 // private for testing
-static MemberSubscriptionSpecImpl createMemberSubscriptionSpecImpl(
+static MemberSubscriptionAndAssignmentImpl 
createMemberSubscriptionSpecImpl(

Review Comment:
   Should it be renamed to match `MemberSubscriptionAndAssignmentImpl`? Or 
could it become a constructor of `MemberSubscriptionAndAssignmentImpl`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16520 KIP-853 Changes to DescribeQuorum request/response [kafka]

2024-06-06 Thread via GitHub


nizhikov commented on code in PR #16106:
URL: https://github.com/apache/kafka/pull/16106#discussion_r1629608588


##
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##
@@ -89,26 +100,30 @@ public String toString() {
 ", highWatermark=" + highWatermark +
 ", voters=" + voters +
 ", observers=" + observers +
+", nodes=" + nodes +

Review Comment:
   Node#toString added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16520 KIP-853 Changes to DescribeQuorum request/response [kafka]

2024-06-06 Thread via GitHub


nizhikov commented on code in PR #16106:
URL: https://github.com/apache/kafka/pull/16106#discussion_r1629610226


##
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##
@@ -73,12 +83,13 @@ public boolean equals(Object o) {
 && leaderEpoch == that.leaderEpoch
 && highWatermark == that.highWatermark
 && Objects.equals(voters, that.voters)
-&& Objects.equals(observers, that.observers);
+&& Objects.equals(observers, that.observers)
+&& Objects.equals(nodes, that.nodes);

Review Comment:
   Node#equals, Node#hashCode added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16770; [2/2] Coalesce records into bigger batches [kafka]

2024-06-06 Thread via GitHub


dajac commented on PR #16215:
URL: https://github.com/apache/kafka/pull/16215#issuecomment-2152628752

   @jeffkbkim @jolshan Thanks for your comments. I addressed them. I pushed the 
tests and a few fixes too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16706) Refactor ReplicationQuotaManager/RLMQuotaManager to eliminate code duplication

2024-06-06 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852802#comment-17852802
 ] 

Muralidhar Basani commented on KAFKA-16706:
---

Yes, I see similarities. Maybe we can come up with a structure like the one below?
 * a new QuotaManager interface,
 * a new abstract BaseQuotaManager class that implements QuotaManager,
 * ReplicationQuotaManager extends BaseQuotaManager,
 * RLMQuotaManager extends BaseQuotaManager

wdyt?
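
A minimal, self-contained sketch of that shape. All names and the method set 
here are hypothetical stand-ins for the shared quota/metrics plumbing, not the 
actual Kafka classes:

{code:java}
// Hypothetical sketch of the proposed hierarchy; not the real Kafka API.
interface QuotaManager {
    void record(double value);
    boolean isQuotaExceeded();
}

abstract class BaseQuotaManager implements QuotaManager {
    private volatile double bound; // stand-in for the Quota/Sensor plumbing
    private double observed;

    BaseQuotaManager(double bound) {
        this.bound = bound;
    }

    // Logic that ReplicationQuotaManager and RLMQuotaManager currently
    // duplicate would live here.
    @Override
    public synchronized void record(double value) {
        observed += value;
    }

    @Override
    public synchronized boolean isQuotaExceeded() {
        return observed > bound;
    }

    void updateQuota(double newBound) {
        this.bound = newBound;
    }
}

// The concrete managers would then carry only their specific behavior.
class ReplicationQuotaManagerSketch extends BaseQuotaManager {
    ReplicationQuotaManagerSketch(double bound) {
        super(bound);
    }
}

class RLMQuotaManagerSketch extends BaseQuotaManager {
    RLMQuotaManagerSketch(double bound) {
        super(bound);
    }
}
{code}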

> Refactor ReplicationQuotaManager/RLMQuotaManager to eliminate code duplication
> --
>
> Key: KAFKA-16706
> URL: https://issues.apache.org/jira/browse/KAFKA-16706
> Project: Kafka
>  Issue Type: Task
>Reporter: Abhijeet Kumar
>Priority: Minor
>
> ReplicationQuotaManager and RLMQuotaManager implementations are similar. We 
> should explore ways to refactor them to remove code duplication.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-06 Thread via GitHub


philipnee commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1629622755


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataErrorManager.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.Metadata;
+import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+
+public class MetadataErrorManager implements RequestManager {
+private final Metadata metadata;
+private final BackgroundEventHandler backgroundEventHandler;
+
+public MetadataErrorManager(Metadata metadata, BackgroundEventHandler 
backgroundEventHandler) {
+assert metadata != null;
+assert backgroundEventHandler != null;
+
+this.metadata = metadata;
+this.backgroundEventHandler = backgroundEventHandler;
+}
+
+@Override
+public PollResult poll(long currentTimeMs) {
+maybePropagateMetadataError();
+
+return PollResult.EMPTY;
+}
+
+private void maybePropagateMetadataError() {
+try {
+metadata.maybeThrowAnyException();
+} catch (Exception e) {
+backgroundEventHandler.add(new ErrorEvent(e));
+}

Review Comment:
   hey @appchemist - it seems you closed this PR. Do you still mean to continue 
working on it?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16706) Refactor ReplicationQuotaManager/RLMQuotaManager to eliminate code duplication

2024-06-06 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852802#comment-17852802
 ] 

Muralidhar Basani edited comment on KAFKA-16706 at 6/6/24 2:06 PM:
---

[~abhijeetkumar] Yes, I see similarities. Maybe we can come up with a structure 
like the one below?
 * a new QuotaManager interface,
 * a new abstract BaseQuotaManager class that implements QuotaManager,
 * ReplicationQuotaManager extends BaseQuotaManager,
 * RLMQuotaManager extends BaseQuotaManager

wdyt?


was (Author: muralibasani):
Yes, I see similarities. Maybe we can come up with a structure like the one below?
 * a new QuotaManager interface,
 * a new abstract BaseQuotaManager class that implements QuotaManager,
 * ReplicationQuotaManager extends BaseQuotaManager,
 * RLMQuotaManager extends BaseQuotaManager

wdyt?

> Refactor ReplicationQuotaManager/RLMQuotaManager to eliminate code duplication
> --
>
> Key: KAFKA-16706
> URL: https://issues.apache.org/jira/browse/KAFKA-16706
> Project: Kafka
>  Issue Type: Task
>Reporter: Abhijeet Kumar
>Priority: Minor
>
> ReplicationQuotaManager and RLMQuotaManager implementations are similar. We 
> should explore ways to refactor them to remove code duplication.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14701; Move `PartitionAssignor` to new `group-coordinator-api` module [kafka]

2024-06-06 Thread via GitHub


dajac commented on code in PR #16198:
URL: https://github.com/apache/kafka/pull/16198#discussion_r1629625845


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -377,19 +375,19 @@ private Assignment newMemberAssignment(
 ) {
 MemberAssignment newMemberAssignment = 
newGroupAssignment.members().get(memberId);
 if (newMemberAssignment != null) {
-return new Assignment(newMemberAssignment.targetPartitions());
+return new Assignment(newMemberAssignment.partitions());
 } else {
 return Assignment.EMPTY;
 }
 }
 
 // private for testing
-static MemberSubscriptionSpecImpl createMemberSubscriptionSpecImpl(
+static MemberSubscriptionAndAssignmentImpl 
createMemberSubscriptionSpecImpl(

Review Comment:
   Fixed it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10787: Apply spotless to transaction-coordinator and server-common [kafka]

2024-06-06 Thread via GitHub


chia7712 commented on code in PR #16172:
URL: https://github.com/apache/kafka/pull/16172#discussion_r1629625646


##
build.gradle:
##
@@ -853,8 +851,8 @@ subprojects {
 skipProjects = [ ":jmh-benchmarks", ":trogdor" ]
 skipConfigurations = [ "zinc" ]
   }
-  
-  if(JavaVersion.current().isJava11Compatible() && project.path !in 
excludedSpotlessModules) {
+  //  current spotless work error in JDK21,upgrade spotless version when kafka 
drop support JDK8

Review Comment:
   ```
   the task `removeUnusedImports` is implemented by google-java-format, and 
unfortunately the google-java-format version used by spotless 6.14.0 can't work 
with JDK 21. Hence, we apply spotless tasks only if the env is either JDK11 or 
JDK17
   ```
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16520 KIP-853 Changes to DescribeQuorum request/response [kafka]

2024-06-06 Thread via GitHub


nizhikov commented on code in PR #16106:
URL: https://github.com/apache/kafka/pull/16106#discussion_r1629645429


##
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##
@@ -60,6 +60,7 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
   val response = connectAndReceive[DescribeQuorumResponse](request)
 
   assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+  assertEquals("", response.data.errorMessage)

Review Comment:
   `Readable#readString` returns an empty string when the string length is 0.
   `DescribeQuorumResponseData#read` writes an empty string to `errorMessage` 
for older RPC versions that do not support the new field.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16520 KIP-853 Changes to DescribeQuorum request/response [kafka]

2024-06-06 Thread via GitHub


nizhikov commented on code in PR #16106:
URL: https://github.com/apache/kafka/pull/16106#discussion_r1629646026


##
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##
@@ -69,6 +70,7 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
   val partitionData = topicData.partitions.get(0)
   assertEquals(KafkaRaftServer.MetadataPartition.partition, 
partitionData.partitionIndex)
   assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+  assertEquals("", partitionData.errorMessage())

Review Comment:
   See https://github.com/apache/kafka/pull/16106#discussion_r1629645429



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-06 Thread via GitHub


mimaison commented on PR #15834:
URL: https://github.com/apache/kafka/pull/15834#issuecomment-2152667234

   You can backport it to 3.7 too if you want.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-06-06 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1629677489


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
 assertTrue(callbackExecuted.get());
 }
 
+@Test
+public void testCheckForNewTopicOnlyWhenMetadataChange() {
+SubscriptionState subscriptions = mock(SubscriptionState.class);
+Cluster cluster = mock(Cluster.class);
+
+consumer = newConsumer(
+mock(FetchBuffer.class),
+mock(ConsumerInterceptors.class),
+mock(ConsumerRebalanceListenerInvoker.class),
+subscriptions,
+singletonList(new RoundRobinAssignor()),

Review Comment:
   This PR got merged so let's get the latest changes to get rid of the 
deprecated assignor here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-06-06 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1629681404


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
 assertTrue(callbackExecuted.get());
 }
 
+@Test
+public void testCheckForNewTopicOnlyWhenMetadataChange() {
+SubscriptionState subscriptions = mock(SubscriptionState.class);
+Cluster cluster = mock(Cluster.class);
+
+consumer = newConsumer(
+mock(FetchBuffer.class),
+mock(ConsumerInterceptors.class),
+mock(ConsumerRebalanceListenerInvoker.class),
+subscriptions,
+singletonList(new RoundRobinAssignor()),

Review Comment:
   I understand, will include the change in the next commit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-06 Thread via GitHub


appchemist opened a new pull request, #15961:
URL: https://github.com/apache/kafka/pull/15961

   - Add ErrorPropagateMetadataUpdater
  - A simple proxy that propagates errors through the BackgroundEventHandler
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16906) Add consistent error handling for Transactions

2024-06-06 Thread Kaushik Raina (Jira)
Kaushik Raina created KAFKA-16906:
-

 Summary: Add consistent error handling for Transactions
 Key: KAFKA-16906
 URL: https://issues.apache.org/jira/browse/KAFKA-16906
 Project: Kafka
  Issue Type: Improvement
  Components: consumer, producer , streams
Reporter: Kaushik Raina


Apache Kafka supports a variety of client SDKs for different programming 
languages.
We want to group error handling into 5 types, which will help keep error 
handling consistent across all client SDKs and producer APIs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Various cleanups in clients [kafka]

2024-06-06 Thread via GitHub


mimaison commented on code in PR #16193:
URL: https://github.com/apache/kafka/pull/16193#discussion_r1629709460


##
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java:
##
@@ -728,20 +725,14 @@ synchronized public ListConsumerGroupOffsetsResult 
listConsumerGroupOffsets(Map<
 String group = groupSpecs.keySet().iterator().next();
 Collection<TopicPartition> topicPartitions = 
groupSpecs.get(group).topicPartitions();
 final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> future = 
new KafkaFutureImpl<>();
-
-if (listConsumerGroupOffsetsException != null) {
-future.completeExceptionally(listConsumerGroupOffsetsException);
+if (topicPartitions.isEmpty()) {

Review Comment:
   Done, thanks for the suggestion!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16536) Use BeginQuorumEpoch as the leader's heartbeat

2024-06-06 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-16536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio reassigned KAFKA-16536:
--

Assignee: Alyssa Huang  (was: José Armando García Sancio)

> Use BeginQuorumEpoch as the leader's heartbeat
> --
>
> Key: KAFKA-16536
> URL: https://issues.apache.org/jira/browse/KAFKA-16536
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: Alyssa Huang
>Priority: Major
>
> Change the leader's implementation of BeginQuorumEpoch to behave more 
> like a heartbeat mechanism. The first implementation can just send the 
> request at some interval (half the fetch timeout).
> Future implementations can conserve resources by tracking fetch timeouts per 
> remote voter.
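
A minimal sketch of the interval idea, assuming a poll-style timer on the 
leader (all names here are hypothetical):

{code:java}
// Hypothetical sketch: re-send BeginQuorumEpoch as a heartbeat at half the
// fetch timeout, as suggested above.
public class BeginQuorumEpochHeartbeatSketch {
    private final long fetchTimeoutMs;
    private long lastSentMs = -1;

    public BeginQuorumEpochHeartbeatSketch(long fetchTimeoutMs) {
        this.fetchTimeoutMs = fetchTimeoutMs;
    }

    /** Returns true when the leader should send BeginQuorumEpoch to its voters now. */
    public boolean maybeSend(long currentTimeMs) {
        long intervalMs = fetchTimeoutMs / 2;
        if (lastSentMs < 0 || currentTimeMs - lastSentMs >= intervalMs) {
            lastSentMs = currentTimeMs;
            return true;
        }
        return false;
    }
}
{code}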



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16192: Introduce transaction.version and usage of flexible records to coordinators [kafka]

2024-06-06 Thread via GitHub


jolshan commented on code in PR #16183:
URL: https://github.com/apache/kafka/pull/16183#discussion_r1629730197


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -205,7 +205,10 @@ public enum MetadataVersion {
 IBP_3_8_IV0(20, "3.8", "IV0", true),
 
 // Introduce version 1 of the GroupVersion feature (KIP-848).
-IBP_4_0_IVO(21, "4.0", "IV0", false);
+IBP_4_0_IVO(21, "4.0", "IV0", false),

Review Comment:
   Hehe. This will be fixed when I fix conflicts. But good to keep in mind 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


