[jira] [Resolved] (KAFKA-15881) Make changes in Release Process Wiki and Release Process

2023-12-12 Thread Vedarth Sharma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vedarth Sharma resolved KAFKA-15881.

Fix Version/s: 3.7.0
 Reviewer: Manikumar
   Resolution: Fixed

Release Process is updated

> Make changes in Release Process Wiki and Release Process
> 
>
> Key: KAFKA-15881
> URL: https://issues.apache.org/jira/browse/KAFKA-15881
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Vedarth Sharma
>Assignee: Vedarth Sharma
>Priority: Major
> Fix For: 3.7.0
>
>
> Make changes to Release Process Wiki and docker README.md for detailed 
> release process instructions



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15775: New consumer listTopics and partitionsFor [kafka]

2023-12-12 Thread via GitHub


lucasbru commented on PR #14962:
URL: https://github.com/apache/kafka/pull/14962#issuecomment-1853418387

   @AndrewJSchofield Please have a look at my comments above, and if you think 
it's worth fixing, please open a follow-up PR. I merged the change since I 
don't think the feature needs to be blocked on this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15775: New consumer listTopics and partitionsFor [kafka]

2023-12-12 Thread via GitHub


lucasbru merged PR #14962:
URL: https://github.com/apache/kafka/pull/14962


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424943646


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() {
         assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records());
     }
 
+    @Test
+    public void testScheduleWriteOpWhenWriteTimesOut() throws InterruptedException {
+        MockTimer timer = new MockTimer();
+        // The partition writer only accepts one write.
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime runtime =
+            new CoordinatorRuntime.Builder()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .build();
+
+        // Loads the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Write #1. We should get a TimeoutException because the HWM will not advance.
+        CompletableFuture timedOutWrite = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(3),
+            state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"));
+
+        timer.advanceClock(DEFAULT_WRITE_TIMEOUT.toMillis() + 1);

Review Comment:
   We should use `3` here to be consistent with `Duration.ofMillis(3)`.
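   For clarity, the suggested one-line edit (a sketch against the diff above):
   ```java
   // Advance just past the 3 ms write timeout passed to scheduleWriteOperation above.
   timer.advanceClock(3 + 1);
   ```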



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite [kafka]

2023-12-12 Thread via GitHub


yashmayya commented on code in PR #14966:
URL: https://github.com/apache/kafka/pull/14966#discussion_r1424910127


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##
@@ -70,74 +80,115 @@ public class OffsetsApiIntegrationTest {
     private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1);
     private static final long OFFSET_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30);
     private static final int NUM_WORKERS = 3;
-    private static final String CONNECTOR_NAME = "test-connector";
-    private static final String TOPIC = "test-topic";
     private static final int NUM_TASKS = 2;
     private static final int NUM_RECORDS_PER_PARTITION = 10;
-    private Map<String, String> workerProps;
-    private EmbeddedConnectCluster.Builder connectBuilder;
+    private static final Map<Map<String, String>, EmbeddedConnectCluster> CONNECT_CLUSTERS = new ConcurrentHashMap<>();
+    @Rule
+    public TestName currentTest = new TestName();
     private EmbeddedConnectCluster connect;
+    private String connectorName;
+    private String topic;
 
     @Before
     public void setup() {
-        Properties brokerProps = new Properties();
-        brokerProps.put("transaction.state.log.replication.factor", "1");
-        brokerProps.put("transaction.state.log.min.isr", "1");
-
-        // setup Connect worker properties
-        workerProps = new HashMap<>();
-        workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
-
-        // build a Connect cluster backed by Kafka and Zk
-        connectBuilder = new EmbeddedConnectCluster.Builder()
-                .name("connect-cluster")
-                .numWorkers(NUM_WORKERS)
-                .brokerProps(brokerProps)
-                .workerProps(workerProps);
+        connectorName = currentTest.getMethodName();
+        topic = currentTest.getMethodName();
+        connect = defaultConnectCluster();
     }
 
     @After
     public void tearDown() {
-        connect.stop();
+        Set<String> remainingConnectors = new HashSet<>(connect.connectors());
+        if (remainingConnectors.remove(connectorName)) {
+            connect.deleteConnector(connectorName);
+        }
+        try {
+            assertEquals(
+                    "Some connectors were not properly cleaned up after this test",
+                    Collections.emptySet(),
+                    remainingConnectors
+            );
+        } finally {
+            // Make a last-ditch effort to clean up the leaked connectors
+            // so as not to interfere with other test cases
+            remainingConnectors.forEach(connect::deleteConnector);
+        }
+    }
+
+    @AfterClass
+    public static void close() {
+        // stop all Connect, Kafka and Zk threads.
+        CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop);
+    }
+
+    private static EmbeddedConnectCluster createOrReuseConnectWithWorkerProps(Map<String, String> workerProps) {
+        return CONNECT_CLUSTERS.computeIfAbsent(workerProps, props -> {
+            Properties brokerProps = new Properties();
+            brokerProps.put("transaction.state.log.replication.factor", "1");
+            brokerProps.put("transaction.state.log.min.isr", "1");
+
+            // Have to declare a new map since the passed-in one may be immutable
+            Map<String, String> workerPropsWithDefaults = new HashMap<>(workerProps);
+            // Enable fast offset commits by default
+            workerPropsWithDefaults.putIfAbsent(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
+
+            EmbeddedConnectCluster result = new EmbeddedConnectCluster.Builder()
+                    .name("connect-cluster")
+                    .numWorkers(NUM_WORKERS)
+                    .brokerProps(brokerProps)
+                    .workerProps(workerPropsWithDefaults)
+                    .build();
+
+            result.start();
+
+            return result;
+        });
+    }
+
+    private static EmbeddedConnectCluster defaultConnectCluster() {
+        return createOrReuseConnectWithWorkerProps(Collections.emptyMap());
+    }
+
+    private static EmbeddedConnectCluster exactlyOnceSourceConnectCluster() {
+        Map<String, String> workerProps = Collections.singletonMap(
+                DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG,
+                "enabled"
+        );
+        return createOrReuseConnectWithWorkerProps(workerProps);
     }
 
     @Test
     public void testGetNonExistentConnectorOffsets() {
-        connect = connectBuilder.build();
-        connect.start();
         ConnectRestException e = assertThrows(ConnectRestException.class,
                 () -> connect.connectorOffsets("non-existent-connector"));
         assertEquals(404, e.errorCode());
     }
 
     @Test
     public void testGetSinkConnectorOffsets() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
 

Re: [PR] MINOR: Rename and update test files for docker image [kafka]

2023-12-12 Thread via GitHub


omkreddy merged PR #14991:
URL: https://github.com/apache/kafka/pull/14991


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


vamossagar12 commented on PR #14981:
URL: https://github.com/apache/kafka/pull/14981#issuecomment-1853271332

   Thanks for the review @dajac . I have addressed the comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15558) Determine if Timer should be used elsewhere in PrototypeAsyncConsumer.updateFetchPositions()

2023-12-12 Thread Phuc Hong Tran (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phuc Hong Tran reassigned KAFKA-15558:
--

Assignee: Phuc Hong Tran

> Determine if Timer should be used elsewhere in 
> PrototypeAsyncConsumer.updateFetchPositions()
> 
>
> Key: KAFKA-15558
> URL: https://issues.apache.org/jira/browse/KAFKA-15558
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-preview
>
> This is a followup ticket based on a question from [~junrao] when reviewing 
> the [fetch request manager pull 
> request|https://github.com/apache/kafka/pull/14406]:
> {quote}It still seems weird that we only use the timer for 
> {{{}refreshCommittedOffsetsIfNeeded{}}}, but not for other cases where we 
> don't have valid fetch positions. For example, if all partitions are in 
> {{AWAIT_VALIDATION}} state, it seems that {{PrototypeAsyncConsumer.poll()}} 
> will just go in a busy loop, which is not efficient.
> {quote}
> The goal here is to determine if we should also be propagating the Timer to 
> the validate positions and reset positions operations.
> Note: we should also investigate if the existing {{KafkaConsumer}} 
> implementation should be fixed, too.
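A minimal sketch (not the Kafka internals) of the deadline-bounded pattern the ticket asks about, to avoid the busy loop described above; the helper name and signature are hypothetical:
{code:java}
import java.time.Duration;
import java.util.function.BooleanSupplier;

public final class DeadlineBoundedWait {
    // Retry until the condition holds or the caller's timeout expires,
    // instead of spinning.
    public static boolean await(BooleanSupplier hasValidPositions, Duration timeout)
            throws InterruptedException {
        long deadlineMs = System.currentTimeMillis() + timeout.toMillis();
        while (!hasValidPositions.getAsBoolean()) {
            long remainingMs = deadlineMs - System.currentTimeMillis();
            if (remainingMs <= 0) {
                return false; // timer expired; let poll() return instead of busy-looping
            }
            Thread.sleep(Math.min(remainingMs, 100)); // back off rather than spin
        }
        return true;
    }
}
{code}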



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15364: Replay BrokerRegistrationChangeRecord.logDirs [kafka]

2023-12-12 Thread via GitHub


soarez commented on PR #14998:
URL: https://github.com/apache/kafka/pull/14998#issuecomment-1853247402

   > 36 tests have failed. There are 0 new tests failing, 36 existing failing, 
and 271 skipped.
   
   @rondagostino PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15883: Implement RemoteCopyLagBytes [kafka]

2023-12-12 Thread via GitHub


showuon commented on PR #14832:
URL: https://github.com/apache/kafka/pull/14832#issuecomment-1853214842

   @clolov , I took the liberty of fixing the checkstyle error. Let's see if 
the CI build passes. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Upgrade jqwik to version 1.8.0 [kafka]

2023-12-12 Thread via GitHub


github-actions[bot] commented on PR #14365:
URL: https://github.com/apache/kafka/pull/14365#issuecomment-1853213017

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Minor : Increase Config change throwable log info to error [kafka]

2023-12-12 Thread via GitHub


github-actions[bot] commented on PR #14380:
URL: https://github.com/apache/kafka/pull/14380#issuecomment-1853212990

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15883: Implement RemoteCopyLagBytes [kafka]

2023-12-12 Thread via GitHub


showuon commented on PR #14832:
URL: https://github.com/apache/kafka/pull/14832#issuecomment-1853210629

   @clolov , there are checkstyle errors; please correct them:
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14832/6/pipeline
   
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15981: update Group size only when groups size changes [kafka]

2023-12-12 Thread via GitHub


jeffkbkim commented on PR #14988:
URL: https://github.com/apache/kafka/pull/14988#issuecomment-1853155239

   The previous build failed on JDK 8 with an error in 
`testNoGroupEpochBumpWhenStaticMemberTemporarilyLeaves() – 
org.apache.kafka.coordinator.group.GroupMetadataManagerTest`:
   ```
   expected: <[Record(key=ApiMessageAndVersion(ConsumerGroupCurrentMemberAssignmentKey(groupId='fooup', memberId='_hAb4Z1NSHG2Xp42s7KO9Q') at version 8), value=ApiMessageAndVersion(ConsumerGroupCurrentMemberAssignmentValue(memberEpoch=-2, previousMemberEpoch=9, targetMemberEpoch=10, assignedPartitions=[TopicPartitions(topicId=YX9w07JjSp2Yw1ra6Bfkaw, partitions=[2]), TopicPartitions(topicId=brQVJtH8RsyF4BayziKeBQ, partitions=[3, 4, 5])], partitionsPendingRevocation=[], partitionsPendingAssignment=[], error=0, metadataVersion=0, metadataBytes=[]) at version 0))]>
   but was: <[Record(key=ApiMessageAndVersion(ConsumerGroupCurrentMemberAssignmentKey(groupId='fooup', memberId='_hAb4Z1NSHG2Xp42s7KO9Q') at version 8), value=ApiMessageAndVersion(ConsumerGroupCurrentMemberAssignmentValue(memberEpoch=-2, previousMemberEpoch=9, targetMemberEpoch=10, assignedPartitions=[TopicPartitions(topicId=brQVJtH8RsyF4BayziKeBQ, partitions=[3, 4, 5]), TopicPartitions(topicId=YX9w07JjSp2Yw1ra6Bfkaw, partitions=[2])], partitionsPendingRevocation=[], partitionsPendingAssignment=[], error=0, metadataVersion=0, metadataBytes=[]) at version 0))]>
   ```
   
   The diff is the ordering of the two `TopicPartitions` entries in the 
`assignedPartitions` field. I don't think my code touched this. Was this 
already flaky?
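   If the assignment order is indeed nondeterministic, one option is to normalize it before asserting; a sketch (hypothetical helper, not the test's current code):
   ```java
   import java.util.Comparator;
   import java.util.List;
   import java.util.stream.Collectors;

   // Sort the TopicPartitions entries of an assignedPartitions list by topic id
   // so that a nondeterministic assignment order (as in the diff above) cannot
   // fail an otherwise-correct assertion.
   static List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> normalized(
           List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> assigned) {
       return assigned.stream()
               .sorted(Comparator.comparing(tp -> tp.topicId().toString()))
               .collect(Collectors.toList());
   }
   ```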


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: docs for rack aware assignment [kafka]

2023-12-12 Thread via GitHub


lihaosky opened a new pull request, #14999:
URL: https://github.com/apache/kafka/pull/14999

   Docs for the new `balance_subtopology` config.
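   For reference, a minimal sketch of enabling the strategy being documented (the config key is per KIP-925; treat it as an assumption here rather than a definitive reference):
   ```java
   import java.util.Properties;

   Properties props = new Properties();
   // Rack-aware task assignment for Kafka Streams (KIP-925);
   // "balance_subtopology" balances the tasks of each subtopology across racks.
   props.put("rack.aware.assignment.strategy", "balance_subtopology");
   ```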
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15366: Modify LogDirFailureTest for KRaft [kafka]

2023-12-12 Thread via GitHub


soarez commented on code in PR #14977:
URL: https://github.com/apache/kafka/pull/14977#discussion_r1424698314


##
core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala:
##
@@ -191,12 +204,27 @@ class LogDirFailureTest extends IntegrationTestHarness {
     TestUtils.pollUntilAtLeastNumRecords(consumer, 1)
 
     // There should be no remaining LogDirEventNotification znode
-    assertTrue(zkClient.getAllLogDirEventNotifications.isEmpty)
+    if (quorum == "zk") {
+      assertTrue(zkClient.getAllLogDirEventNotifications.isEmpty)
+    }
 
-    // The controller should have marked the replica on the original leader as offline
-    val controllerServer = servers.find(_.kafkaController.isActive).get
-    val offlineReplicas = controllerServer.kafkaController.controllerContext.replicasInState(topic, OfflineReplica)
-    assertTrue(offlineReplicas.contains(PartitionAndReplica(new TopicPartition(topic, 0), leaderServerId)))
+    if (quorum == "kraft") {
+      waitUntilTrue(() => {
+        brokers.exists(broker => {
+          val hasOfflineDir = broker.asInstanceOf[BrokerServer].logDirFailureChannel.hasOfflineLogDir(failedLogDir.toPath.toString)
+          hasOfflineDir && broker.asInstanceOf[BrokerServer]
+            .replicaManager
+            .metadataCache
+            .getClusterMetadata(broker.clusterId, broker.config.interBrokerListenerName)
+            .partition(new TopicPartition(topic, 0)).offlineReplicas().map(_.id()).contains(originalLeaderServerId)

Review Comment:
   Hey @viktorsomogyi, at least one of the issues here was this -> #14998



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15364: Replay BrokerRegistrationChangeRecord.logDirs [kafka]

2023-12-12 Thread via GitHub


soarez opened a new pull request, #14998:
URL: https://github.com/apache/kafka/pull/14998

   Any directory changes must be considered when replaying 
BrokerRegistrationChangeRecord. This is necessary
   to persist directory failures in the cluster metadata, which #14902 missed.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15111) Correction kafka examples

2023-12-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-15111.
-
Resolution: Duplicate

> Correction kafka examples
> -
>
> Key: KAFKA-15111
> URL: https://issues.apache.org/jira/browse/KAFKA-15111
> Project: Kafka
>  Issue Type: Task
>Reporter: Dmitry
>Priority: Minor
> Fix For: 3.6.0
>
>
> Need set TOPIC_NAME = topic1 in KafkaConsumerProducerDemo class and remove 
> unused TOPIC field from KafkaProperties.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15111) Correction kafka examples

2023-12-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15111:

Fix Version/s: 3.6.0
   (was: 3.7.0)

> Correction kafka examples
> -
>
> Key: KAFKA-15111
> URL: https://issues.apache.org/jira/browse/KAFKA-15111
> Project: Kafka
>  Issue Type: Task
>Reporter: Dmitry
>Priority: Minor
> Fix For: 3.6.0
>
>
> Need set TOPIC_NAME = topic1 in KafkaConsumerProducerDemo class and remove 
> unused TOPIC field from KafkaProperties.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15372) MM2 rolling restart can drop configuration changes silently

2023-12-12 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795958#comment-17795958
 ] 

Greg Harris commented on KAFKA-15372:
-

This is now set to release for 3.7 and 3.6, but I had some issues with the 3.5 
backport that I had to revert. In particular, the DedicatedMirrorTest has this 
persistent failure:
{noformat}
    org.apache.kafka.test.NoRetryException
        at app//org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.lambda$awaitTaskConfigurations$8(DedicatedMirrorIntegrationTest.java:363)
        at app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
        at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
        at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
        at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
        at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:308)
        at app//org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.awaitTaskConfigurations(DedicatedMirrorIntegrationTest.java:353)
        at app//org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testMultiNodeCluster(DedicatedMirrorIntegrationTest.java:301)

    Caused by:
        java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.distributed.RebalanceNeededException: Request cannot be completed because a rebalance is expected
            at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:123)
            at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:115)
            at org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.lambda$awaitTaskConfigurations$8(DedicatedMirrorIntegrationTest.java:357)
            ... 7 more

    Caused by:
        org.apache.kafka.connect.runtime.distributed.RebalanceNeededException: Request cannot be completed because a rebalance is expected{noformat}

> MM2 rolling restart can drop configuration changes silently
> ---
>
> Key: KAFKA-15372
> URL: https://issues.apache.org/jira/browse/KAFKA-15372
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Daniel Urban
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.7.0, 3.6.2
>
>
> When MM2 is restarted, it tries to update the Connector configuration in all 
> flows. This is a one-time trial, and fails if the Connect worker is not the 
> leader of the group.
> In a distributed setup and with a rolling restart, it is possible that for a 
> specific flow, the Connect worker of the just restarted MM2 instance is not 
> the leader, meaning that Connector configurations can get dropped.
> For example, assuming 2 MM2 instances, and one flow A->B:
>  # MM2 instance 1 is restarted, the worker inside MM2 instance 2 becomes the 
> leader of A->B Connect group.
>  # MM2 instance 1 tries to update the Connector configurations, but fails 
> (instance 2 has the leader, not instance 1)
>  # MM2 instance 2 is restarted, leadership moves to worker in MM2 instance 1
>  # MM2 instance 2 tries to update the Connector configurations, but fails
> At this point, the configuration changes before the restart are never 
> applied. Many times, this can also happen silently, without any indication.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15372) MM2 rolling restart can drop configuration changes silently

2023-12-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15372:

Fix Version/s: 3.6.2

> MM2 rolling restart can drop configuration changes silently
> ---
>
> Key: KAFKA-15372
> URL: https://issues.apache.org/jira/browse/KAFKA-15372
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Daniel Urban
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.7.0, 3.6.2
>
>
> When MM2 is restarted, it tries to update the Connector configuration in all 
> flows. This is a one-time trial, and fails if the Connect worker is not the 
> leader of the group.
> In a distributed setup and with a rolling restart, it is possible that for a 
> specific flow, the Connect worker of the just restarted MM2 instance is not 
> the leader, meaning that Connector configurations can get dropped.
> For example, assuming 2 MM2 instances, and one flow A->B:
>  # MM2 instance 1 is restarted, the worker inside MM2 instance 2 becomes the 
> leader of A->B Connect group.
>  # MM2 instance 1 tries to update the Connector configurations, but fails 
> (instance 2 has the leader, not instance 1)
>  # MM2 instance 2 is restarted, leadership moves to worker in MM2 instance 1
>  # MM2 instance 2 tries to update the Connector configurations, but fails
> At this point, the configuration changes before the restart are never 
> applied. Many times, this can also happen silently, without any indication.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MINOR: Give FETCH request own MetadataVersion [kafka]

2023-12-12 Thread via GitHub


jolshan opened a new pull request, #14997:
URL: https://github.com/apache/kafka/pull/14997

   
https://github.com/apache/kafka/commit/c8f687ac1505456cb568de2b60df235eb1ceb5f0 
was incorrect in reusing the same metadata version to enable the new fetch 
request. 
   
   This fixes the issue by giving the version bump its own metadata version.
   
   Note -- this is a no-op in running code. The fetch request to replica 
fetchers will be the same semantically. We just wanted to fix upgrades with MV 
3_7_IV0 that could encounter unknown fetch versions when upgrading from a 
version that didn't contain this change to one that did.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15372: Reconfigure dedicated MM2 connectors after leadership change [kafka]

2023-12-12 Thread via GitHub


gharris1727 merged PR #14293:
URL: https://github.com/apache/kafka/pull/14293


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15372: Reconfigure dedicated MM2 connectors after leadership change [kafka]

2023-12-12 Thread via GitHub


gharris1727 commented on PR #14293:
URL: https://github.com/apache/kafka/pull/14293#issuecomment-1852872688

   Test failures appear unrelated, and the mirror and runtime tests pass 
locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]

2023-12-12 Thread via GitHub


hachikuji commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1424609415


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -1352,6 +1360,28 @@ object GroupMetadataManager {
   "%X".format(BigInt(1, bytes))
   }
 
+  def maybeConvertError(error: Errors) : Errors = {

Review Comment:
   nit: can we choose a more descriptive name? Perhaps include `OffsetCommit` 
somewhere?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-12 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1424609554


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java:
##
@@ -102,6 +103,14 @@ public long append(
 }
 }
 
+@Override
+public long completeTransaction(
+TopicPartition tp,
+WriteTxnMarkersRequest.TxnMarkerEntry marker
+) throws KafkaException {
+throw new IllegalStateException("Not implemented");

Review Comment:
   What is this class used for again? And is this planned in a future PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-12 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1424597195


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -135,7 +136,22 @@ class CoordinatorLoaderImpl[T](
 
     memoryRecords.batches.forEach { batch =>
       if (batch.isControlBatch) {
-        throw new IllegalStateException("Control batches are not supported yet.")
+        batch.asScala.foreach { record =>

Review Comment:
   Is it always the case that we need to complete the transaction on loading?
   Is this because loading rebuilds in-memory state from the log, so we play 
through the whole transaction again?
   Maybe I'm just getting confused by the "completeTransaction" method name as 
well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-12 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1424589872


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -285,44 +286,6 @@ void testEnsureEventsAreCompleted() {
 assertTrue(applicationEventsQueue.isEmpty());
 }
 
-@Test

Review Comment:
   removed because they are irrelevant now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-12 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1424585777


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -810,89 +854,87 @@ public void testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceId
         final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
         props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
         final ConsumerConfig config = new ConsumerConfig(props);
-        try (final AsyncKafkaConsumer consumer =
-                new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
-
-            final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
-
-            assertEquals(groupId, groupMetadata.groupId());
-            assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId());
-            assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
-            assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());
-        }
+        final AsyncKafkaConsumer consumer =
+            new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer());
+        final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
+        assertEquals(groupId, groupMetadata.groupId());
+        assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId());
+        assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
+        assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());
+        consumer.close(Duration.ZERO);
     }
 
     @Test
     public void testGroupMetadataUpdateSingleCall() {
         final String groupId = "consumerGroupA";
         final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
         final LinkedBlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>();
-        try (final AsyncKafkaConsumer consumer =

Review Comment:
   ok makes sense, I can revert these changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15471 [MINOR]: Fix backward-compatibility bug [kafka]

2023-12-12 Thread via GitHub


hni61223 commented on code in PR #14996:
URL: https://github.com/apache/kafka/pull/14996#discussion_r1424548302


##
bin/kafka-server-stop.sh:
##
@@ -36,7 +36,7 @@ else
     declare -a AbsolutePathToConfigArray
     for ((i = 0; i < ${#RelativePathArray[@]}; i++)); do
        AbsolutePathToConfig=$(readlink -f "${RelativePathArray[i]}")
-        if [ -z "$AbsolutePathToConfig" ]; then
+        if [ -z "$AbsolutePathToConfig" ] && [ -n "$INPUT_PROCESS_ROLE" ] || [ -n "$INPUT_NID" ]; then

Review Comment:
   yeah that will work. Let me change that.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15471 [MINOR]: Fix backward-compatibility bug [kafka]

2023-12-12 Thread via GitHub


rondagostino commented on code in PR #14996:
URL: https://github.com/apache/kafka/pull/14996#discussion_r1424547476


##
bin/kafka-server-stop.sh:
##
@@ -36,7 +36,7 @@ else
     declare -a AbsolutePathToConfigArray
     for ((i = 0; i < ${#RelativePathArray[@]}; i++)); do
        AbsolutePathToConfig=$(readlink -f "${RelativePathArray[i]}")
-        if [ -z "$AbsolutePathToConfig" ]; then
+        if [ -z "$AbsolutePathToConfig" ] && [ -n "$INPUT_PROCESS_ROLE" ] || [ -n "$INPUT_NID" ]; then

Review Comment:
   It might actually be best to rework things a bit and move the whole 
"AbsolutePathToConfigArray" section down until after line 53.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15471 [MINOR]: Fix backward-compatibility bug [kafka]

2023-12-12 Thread via GitHub


hni61223 commented on code in PR #14996:
URL: https://github.com/apache/kafka/pull/14996#discussion_r1424547291


##
bin/kafka-server-stop.sh:
##
@@ -36,7 +36,7 @@ else
     declare -a AbsolutePathToConfigArray
     for ((i = 0; i < ${#RelativePathArray[@]}; i++)); do
        AbsolutePathToConfig=$(readlink -f "${RelativePathArray[i]}")
-        if [ -z "$AbsolutePathToConfig" ]; then
+        if [ -z "$AbsolutePathToConfig" ] && [ -n "$INPUT_PROCESS_ROLE" ] || [ -n "$INPUT_NID" ]; then

Review Comment:
   You are right. Let me fix this.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15912) Parallelize conversion and transformation steps in Connect

2023-12-12 Thread Vojtech Juranek (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795926#comment-17795926
 ] 

Vojtech Juranek commented on KAFKA-15912:
-

I'd be careful about doing the parallelization per SMT/converter, as moving data 
between threads may end up being more expensive, as you already mentioned. 
Also, if there is some bottleneck, e.g. the value converter, running it in a single 
thread won't give any significant speed-up. This is actually what we 
(the Debezium project) observe, both in our perf tests and in user reports 
(e.g. KAFKA-15996 and [DBZ-7240|https://issues.redhat.com/browse/DBZ-7240]). 
It would IMHO be more useful, if possible, to form record pipelines and run 
records through these pipelines in parallel. With this approach, 
thread safety can be solved e.g. by creating SMT/converter copies for each 
processing pipeline. The issue with stateful transformations however remains. 
There is also an issue with record ordering, though it is quite easily solvable 
when processing is done in batches. I'm currently doing some experiments with 
Debezium server to see if such pipelines are possible there and whether they give 
any significant performance boost (still WIP, no results yet). So I'm wondering 
whether doing something similar for Kafka Connect makes sense to you, or whether 
it seems too complicated to be worth the effort/possible backward-compatibility 
issues/etc.

> Parallelize conversion and transformation steps in Connect
> --
>
> Key: KAFKA-15912
> URL: https://issues.apache.org/jira/browse/KAFKA-15912
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Mickael Maison
>Priority: Major
>
> In busy Connect pipelines, the conversion and transformation steps can 
> sometimes have a very significant impact on performance. This is especially 
> true with large records with complex schemas, for example with CDC connectors 
> like Debezium.
> Today, in order to always preserve ordering, converters and transformations 
> are called on one record at a time in a single thread in the Connect worker. 
> Since Connect usually handles records in batches (up to max.poll.records in sink 
> pipelines; for source pipelines it really depends on the connector, but 
> most connectors I've seen still tend to return multiple records each loop), 
> it could be highly beneficial to attempt running the converters and the 
> transformation chain in parallel via a pool of processing threads.
> It should be possible to do some of these steps in parallel and still keep 
> exact ordering. I'm even considering whether an option to lose ordering but 
> allow even faster processing would make sense.
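To make the idea concrete, a minimal sketch of per-batch parallel processing that preserves record order, assuming the converter/SMT chain is thread-safe or copied per thread ({{RecordProcessor}} is a hypothetical stand-in, not the Connect API):
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

interface RecordProcessor<R> {
    R process(R record);
}

final class ParallelBatchProcessor<R> {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    // invokeAll returns futures in the same order as the submitted tasks, so
    // the batch's record order is preserved even though records are
    // converted/transformed concurrently.
    List<R> processBatch(List<R> batch, RecordProcessor<R> processor) throws Exception {
        List<Callable<R>> tasks = new ArrayList<>(batch.size());
        for (R record : batch) {
            tasks.add(() -> processor.process(record));
        }
        List<R> out = new ArrayList<>(batch.size());
        for (Future<R> f : pool.invokeAll(tasks)) {
            out.add(f.get()); // rethrows any per-record failure
        }
        return out;
    }
}
{code}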



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15471 [MINOR]: Fix backward-compatibility bug [kafka]

2023-12-12 Thread via GitHub


rondagostino commented on code in PR #14996:
URL: https://github.com/apache/kafka/pull/14996#discussion_r1424545171


##
bin/kafka-server-stop.sh:
##
@@ -36,7 +36,7 @@ else
     declare -a AbsolutePathToConfigArray
     for ((i = 0; i < ${#RelativePathArray[@]}; i++)); do
        AbsolutePathToConfig=$(readlink -f "${RelativePathArray[i]}")
-        if [ -z "$AbsolutePathToConfig" ]; then
+        if [ -z "$AbsolutePathToConfig" ] && [ -n "$INPUT_PROCESS_ROLE" ] || [ -n "$INPUT_NID" ]; then

Review Comment:
   Is there a precedence issue here?
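   For reference, a sketch of the parsing pitfall: in POSIX shell, `&&` and `||` have equal precedence and associate left to right, so the proposed condition groups the first two tests together. The intended grouping below is an assumption:
   ```bash
   # The condition in the diff parses as:
   #   { [ -z "$AbsolutePathToConfig" ] && [ -n "$INPUT_PROCESS_ROLE" ]; } || [ -n "$INPUT_NID" ]
   # so a non-empty $INPUT_NID takes the branch regardless of the other two tests.
   # Explicit grouping restores the (presumably) intended reading:
   if [ -z "$AbsolutePathToConfig" ] && { [ -n "$INPUT_PROCESS_ROLE" ] || [ -n "$INPUT_NID" ]; }; then
       echo "no resolved config, but a role or node-id filter was supplied"
   fi
   ```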



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15471 [MINOR]: Fix backward-compatibility bug [kafka]

2023-12-12 Thread via GitHub


hni61223 opened a new pull request, #14996:
URL: https://github.com/apache/kafka/pull/14996

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-12 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1424518210


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -892,6 +895,43 @@ public void replay(
 }
 }
 
+/**
+ * Applies the given transaction marker.
+ *
+ * @param producerIdThe producer id.
+ * @param resultThe result of the transaction.
+ * @throws RuntimeException if the transaction can not be completed.
+ */
+public void completeTransaction(
+long producerId,
+TransactionResult result
+) throws RuntimeException {
+Offsets pendingOffsets = 
pendingTransactionalOffsets.remove(producerId);
+
+if (result == TransactionResult.COMMIT) {
+log.debug("Committed transactional offset commits for producer id 
{}.", producerId);
+if (pendingOffsets == null) return;
+
+pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {

Review Comment:
   Is there a reason why we need to store all the offsets in these nested maps 
if we are just going to commit all the data anyway? I was wondering if the same 
would be accomplished if we had a list of all the info for each producer id. I 
guess it is the same complexity and we can reuse the object. I was just curious 
whether we ever need access to a random specific partition for a given producer 
ID.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16001) Migrate ConsumerNetworkThreadTestBuilder away from ConsumerTestBuilder

2023-12-12 Thread Lucas Brutschy (Jira)
Lucas Brutschy created KAFKA-16001:
--

 Summary: Migrate ConsumerNetworkThreadTestBuilder away from 
ConsumerTestBuilder
 Key: KAFKA-16001
 URL: https://issues.apache.org/jira/browse/KAFKA-16001
 Project: Kafka
  Issue Type: Sub-task
Reporter: Lucas Brutschy






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-12 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1424518210


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -892,6 +895,43 @@ public void replay(
 }
 }
 
+/**
+ * Applies the given transaction marker.
+ *
+ * @param producerIdThe producer id.
+ * @param resultThe result of the transaction.
+ * @throws RuntimeException if the transaction can not be completed.
+ */
+public void completeTransaction(
+long producerId,
+TransactionResult result
+) throws RuntimeException {
+Offsets pendingOffsets = 
pendingTransactionalOffsets.remove(producerId);
+
+if (result == TransactionResult.COMMIT) {
+log.debug("Committed transactional offset commits for producer id 
{}.", producerId);
+if (pendingOffsets == null) return;
+
+pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {

Review Comment:
   Is there a reason why we need to store all the offsets in these nested maps 
if we are just going to commit all the data anyway? I was wondering if the same 
would be accomplished if we had a list of all the info for each producer id. I 
guess it is the same complexity. I was just curious whether we ever need access 
to a random specific partition for a given producer ID.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16000) Migrate MembershipManagerImpl away from ConsumerTestBuilder

2023-12-12 Thread Lucas Brutschy (Jira)
Lucas Brutschy created KAFKA-16000:
--

 Summary: Migrate MembershipManagerImpl away from 
ConsumerTestBuilder
 Key: KAFKA-16000
 URL: https://issues.apache.org/jira/browse/KAFKA-16000
 Project: Kafka
  Issue Type: Sub-task
Reporter: Lucas Brutschy






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15999) Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder

2023-12-12 Thread Lucas Brutschy (Jira)
Lucas Brutschy created KAFKA-15999:
--

 Summary: Migrate HeartbeatRequestManagerTest away from 
ConsumerTestBuilder
 Key: KAFKA-15999
 URL: https://issues.apache.org/jira/browse/KAFKA-15999
 Project: Kafka
  Issue Type: Sub-task
Reporter: Lucas Brutschy






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15913) Migrate AsyncKafkaConsumerTest away from ConsumerTestBuilder

2023-12-12 Thread Lucas Brutschy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795918#comment-17795918
 ] 

Lucas Brutschy commented on KAFKA-15913:


Splitting off the other tests

> Migrate AsyncKafkaConsumerTest away from ConsumerTestBuilder
> 
>
> Key: KAFKA-15913
> URL: https://issues.apache.org/jira/browse/KAFKA-15913
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: consumer-threading-refactor
>
> ConsumerTestBuilder is meant to be a unit testing utility; however, we seem 
> to use Mockito#spy quite liberally.  This is not the right testing strategy 
> because we basically turn unit testing into integration testing.
>  
> While the current unit tests run fine, we should probably use Mockito#mock 
> by default and test each dependency independently.
>  
> The ask here is
>  # Make mock(class) the default
>  # Provide a more flexible interface for the testBuilder to allow the user to 
> configure spy or mock.  Or, let the user pass in their own mock.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15913) Migrate AsyncKafkaConsumerTest away from ConsumerTestBuilder

2023-12-12 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-15913:
--

Assignee: Lucas Brutschy
 Summary: Migrate AsyncKafkaConsumerTest away from ConsumerTestBuilder  
(was: Remove excessive use of spy in ConsumerTestBuilder)

> Migrate AsyncKafkaConsumerTest away from ConsumerTestBuilder
> 
>
> Key: KAFKA-15913
> URL: https://issues.apache.org/jira/browse/KAFKA-15913
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: consumer-threading-refactor
>
> ConsumerTestBuilder is meant to be a unit testing utility; however, we seem 
> to use Mockito#spy quite liberally.  This is not the right testing strategy 
> because we basically turn unit testing into integration testing.
>  
> While the current unit tests run fine, we should probably use Mockito#mock 
> by default and test each dependency independently.
>  
> The ask here is
>  # Make mock(class) the default
>  # Provide a more flexible interface for the testBuilder to allow the user to 
> configure spy or mock.  Or, let the user pass in their own mock.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-12 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1424512647


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -862,6 +863,7 @@ public void replay(
 }
 
 if (producerId == RecordBatch.NO_PRODUCER_ID) {
+log.debug("Replaying offset commit with producer id {}, key 
{}, value {}", producerId, key, value);

Review Comment:
   Do we care to print out the producer ID if it is "no producer id"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15998) EAGER rebalance onPartitionsAssigned() called with no previous onPartitionsLost() nor onPartitionsRevoked()

2023-12-12 Thread Jonathan Haapala (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Haapala updated KAFKA-15998:
-
Description: 
I ran into a case where {{onPartitionsAssigned()}} was called without first 
calling {{onPartitionsRevoked()}}, and there is no indication that 
{{onPartitionsLost()}} was called or had any reason to be called. We are using 
the *EAGER* rebalance protocol and the *StickyAssignor* on Kafka 3.4.0.

Our services rely on the API contract that {{{}onPartitionsRevoked(){}}}:
{quote}In eager rebalancing, it will always be called at the start of a 
rebalance and after the consumer stops fetching data.
{quote}
We internally keep track of partition states with a state machine, and rely on 
these APIs to assert which states we expect to be in. So when a partition is 
Revoked and then re-Assigned, we know that we kept ownership. Moreover, if we 
are assigned partitions in EAGER rebalancing, we expect the entire assignment 
to be passed to {{{}onPartitionsAssigned(){}}}: if {{onPartitionsRevoked()}} is 
always called at the start of a rebalance and the EAGER protocol always revokes 
the entire assignment, then by the time we hit {{onPartitionsAssigned()}} there 
should be nothing assigned from the consumer's point of view, and therefore the 
entire assignment is newly added.
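For illustration, a minimal sketch of a listener that relies on this contract (our real state machine is more involved); the invariant checked in {{onPartitionsAssigned()}} is the one that was violated:
{code:java}
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

class TrackingListener implements ConsumerRebalanceListener {
    private final Set<TopicPartition> owned = new HashSet<>();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        owned.removeAll(partitions);
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        owned.removeAll(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Under EAGER, the full assignment should have been revoked (or lost)
        // before a new assignment arrives, so `owned` should be empty here.
        if (!owned.isEmpty()) {
            throw new IllegalStateException("Assignment arrived while still owning " + owned);
        }
        owned.addAll(partitions);
    }
}
{code}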

However, we recently ran into a situation where we received an assignment while 
the consumer's existing assignment was non-empty:
|     *Pod*|                                       *Message*|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Notifying assignor about the {*}new 
Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78, 
topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117, 
topic-123, topic-130, topic-137, topic-141])|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: topic-26, 
topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, 
topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137, 
topic-141|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:31,923\{UTC} 
[kafka-coordinator-heartbeat-thread \\| metric-aggregator] INFO  
o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] *Request joining group* due to: group is already 
rebalancing|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,132\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Successfully joined group with generation 
Generation\{generationId=12417, 
memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223',
 protocol='sticky'}|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,134\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Successfully synced group in generation 
Generation{generationId={*}12417{*}, 
memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223',
 protocol='sticky'}|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Notifying assignor about the {*}new 
Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78, 
topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117])|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: |

Here you can see we get assigned partitions:

  26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117, 123, 130, 137, 141

And promptly see them all as newly added when passed to 
{{{}onPartitionsAssigned(){}}}. 6 seconds later the heartbeat thread notices 
another rebalance and requests to join. It quickly succeeds and then almost 
immediately successfully syncs. We then get a new assignment:

  26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117

This is a subset of the partitions we were assigned previously, missing 123, 
130, 137, and 141. Because {{onPartitionsRevoked()}} was not called at the 
beginning of this rebalance, the consumer still has the old assignment as its 
current 

[jira] [Created] (KAFKA-15998) EAGER rebalance onPartitionsAssigned() called with no previous onPartitionsLost() nor onPartitionsRevoked()

2023-12-12 Thread Jonathan Haapala (Jira)
Jonathan Haapala created KAFKA-15998:


 Summary: EAGER rebalance onPartitionsAssigned() called with no 
previous onPartitionsLost() nor onPartitionsRevoked()
 Key: KAFKA-15998
 URL: https://issues.apache.org/jira/browse/KAFKA-15998
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.4.0
Reporter: Jonathan Haapala


I ran into a case where {{onPartitionsAssigned()}} was called without first 
calling {{onPartitionsRevoked()}} and there is no indication that 
{{onPartitionsLost()}} was called or had any reason to be called. We are using 
the *EAGER* rebalance protocol and the *StickyAssignor* on kafka 3.4.0.

Our services rely on the API contract that {{{}onPartitionsRevoked(){}}}:
{quote}In eager rebalancing, it will always be called at the start of a 
rebalance and after the consumer stops fetching data.{quote}
We internally keep track of partition states with a state machine, and rely on 
these APIs to assert what expected states we are in. So when a partition is 
Revoked and then re-Assigned, we know that we kept ownership. Moreover, if we 
are assigned partitions in EAGER rebalancing, we expect that the entire assignment 
is passed to {{{}onPartitionsAssigned(){}}}, because if 
{{onPartitionsRevoked()}} is always called at the start of a rebalance and 
EAGER protocol always revokes the entire assignment, then by the time we hit 
{{onPartitionsAssigned()}} there should be nothing assigned from the consumer's 
point of view, and therefore the entire assignment is newly added.

However, we recently ran into a situation where we received an assignment while 
the consumer's existing assignment was non-empty:
|     *Pod*|                                      *Message*|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Notifying assignor about the {*}new 
Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78, 
topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117, 
topic-123, topic-130, topic-137, topic-141])|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: topic-26, 
topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, 
topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137, 
topic-141|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:31,923\{UTC} 
[kafka-coordinator-heartbeat-thread \| metric-aggregator] INFO  
o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] *Request joining group* due to: group is already 
rebalancing|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,132\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Successfully joined group with generation 
Generation\{generationId=12417, 
memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223',
 protocol='sticky'}|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,134\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Successfully synced group in generation 
Generation{generationId={*}12417{*}, 
memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223',
 protocol='sticky'}|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Notifying assignor about the {*}new 
Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78, 
topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117])|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: |

Here you can see we get assigned partitions:

  26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117, 123, 130, 137, 141

And promptly see them all as newly added when passed to 
{{{}onPartitionsAssigned(){}}}. 6 seconds later the heartbeat thread notices 
another rebalance and requests to join. It quickly succeeds and then almost 
immediately successfully syncs. We then get a new assignment:

  26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117

This is a subset of the partitions we were assigned previously, missing 123, 
130, 137, and 141. Because {{onPartitionsRevoked()}} was not called at the 
beginning of this rebalance, the consumer still has the old assignment as its 
current 
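A minimal sketch of the listener contract the report relies on, using the
public Java consumer API (the class and the assertion are illustrative, not
code from the report):

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Under EAGER rebalancing, onPartitionsRevoked() revokes the entire
// assignment before onPartitionsAssigned() delivers the new one, so the
// owned set should be empty whenever a new assignment arrives.
public class OwnershipTrackingListener implements ConsumerRebalanceListener {
    private final Set<TopicPartition> owned = new HashSet<>();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        owned.removeAll(partitions);
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        owned.removeAll(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // The invariant this issue observed being violated:
        if (!owned.isEmpty()) {
            throw new IllegalStateException(
                "Assigned " + partitions + " while still owning " + owned);
        }
        owned.addAll(partitions);
    }
}
```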

Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]

2023-12-12 Thread via GitHub


hachikuji commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1424500490


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -1352,6 +1360,28 @@ object GroupMetadataManager {
   "%X".format(BigInt(1, bytes))
   }
 
+  def maybeConvertError(error: Errors) : Errors = {
+error match {
+  case Errors.UNKNOWN_TOPIC_OR_PARTITION
+   | Errors.NOT_ENOUGH_REPLICAS
+   | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+Errors.COORDINATOR_NOT_AVAILABLE
+
+  case Errors.NOT_LEADER_OR_FOLLOWER
+   | Errors.KAFKA_STORAGE_ERROR =>
+Errors.NOT_COORDINATOR
+
+  case Errors.MESSAGE_TOO_LARGE
+   | Errors.RECORD_LIST_TOO_LARGE
+   | Errors.INVALID_FETCH_SIZE =>
+Errors.INVALID_COMMIT_OFFSET_SIZE

Review Comment:
   Right. I would think UNKNOWN_SERVER_ERROR or something like that would be 
more appropriate for a case like this. It doesn't say anything about the size 
of the offset commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]

2023-12-12 Thread via GitHub


jolshan commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1424497060


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -1352,6 +1360,28 @@ object GroupMetadataManager {
   "%X".format(BigInt(1, bytes))
   }
 
+  def maybeConvertError(error: Errors) : Errors = {
+error match {
+  case Errors.UNKNOWN_TOPIC_OR_PARTITION
+   | Errors.NOT_ENOUGH_REPLICAS
+   | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+Errors.COORDINATOR_NOT_AVAILABLE
+
+  case Errors.NOT_LEADER_OR_FOLLOWER
+   | Errors.KAFKA_STORAGE_ERROR =>
+Errors.NOT_COORDINATOR
+
+  case Errors.MESSAGE_TOO_LARGE
+   | Errors.RECORD_LIST_TOO_LARGE
+   | Errors.INVALID_FETCH_SIZE =>
+Errors.INVALID_COMMIT_OFFSET_SIZE

Review Comment:
   I think one is a known error to the group coordinator and the other isn't?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424473429


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -135,6 +138,11 @@ public Builder withTimer(Timer timer) {
 return this;
 }
 
+public Builder withDefaultWriteTimeOut(Duration timeout) {

Review Comment:
   nit: `timeout` -> `defaultWriteTimeout`.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -595,13 +609,15 @@ class CoordinatorWriteEvent implements 
CoordinatorEvent, DeferredEvent {
 /**
  * Constructor.
  *
- * @param name  The operation name.
- * @param tpThe topic partition that the operation is applied to.
- * @param opThe write operation.
+ * @param name  The operation name.
+ * @param tpThe topic partition that the operation 
is applied to.
+ * @param defaultWriteTimeout   The default write operation timeout
+ * @param opThe write operation.
  */
 CoordinatorWriteEvent(
 String name,
 TopicPartition tp,
+Duration defaultWriteTimeout,

Review Comment:
   ditto.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -581,6 +590,11 @@ class CoordinatorWriteEvent implements 
CoordinatorEvent, DeferredEvent {
  */
 final CompletableFuture future;
 
+/**
+ * Timeout value for the write operation
+ */
+final Duration defaultWriteTimeout;

Review Comment:
   nit: This one should actually be `writeTimeout`.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -581,6 +590,11 @@ class CoordinatorWriteEvent implements 
CoordinatorEvent, DeferredEvent {
  */
 final CompletableFuture future;
 
+/**
+ * Timeout value for the write operation

Review Comment:
   nit: `.`.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() {
 assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().records());
 }
 
+@Test
+public void testScheduleWriteOpWhenWriteTimesOut() throws 
InterruptedException {
+MockTimer timer = new MockTimer();
+// The partition writer only accepts one write.
+MockPartitionWriter writer = new MockPartitionWriter();
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+// Loads the coordinator.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Verify the initial state.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+assertEquals(0, ctx.coordinator.lastWrittenOffset());
+assertEquals(0, ctx.coordinator.lastCommittedOffset());
+assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+
+// Write #1. We should get a TimeoutException because the HWM will not 
advance.
+CompletableFuture timedOutWrite = 
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1"));
+
+timer.advanceClock(DEFAULT_WRITE_TIMEOUT.toMillis() * 2);

Review Comment:
   nit: I would just do `timeout + 1` if possible.
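   For readers unfamiliar with the test setup, the deterministic-timeout
pattern the test relies on can be sketched independently (a hypothetical
`FakeTimer`, not Kafka's actual MockTimer):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of a test-controlled clock: scheduled tasks fire only when the test
// advances time past their deadline, which makes timeout paths like the one
// in the test above fully deterministic.
final class FakeTimer {
    private static final class Task {
        final long deadlineMs;
        final Runnable action;

        Task(long deadlineMs, Runnable action) {
            this.deadlineMs = deadlineMs;
            this.action = action;
        }
    }

    private long nowMs = 0;
    private final List<Task> tasks = new ArrayList<>();

    void schedule(long delayMs, Runnable action) {
        tasks.add(new Task(nowMs + delayMs, action));
        tasks.sort(Comparator.comparingLong((Task t) -> t.deadlineMs));
    }

    // Fire everything due at or before the new time, in deadline order.
    void advanceClock(long deltaMs) {
        nowMs += deltaMs;
        while (!tasks.isEmpty() && tasks.get(0).deadlineMs <= nowMs) {
            tasks.remove(0).action.run();
        }
    }

    public static void main(String[] args) {
        FakeTimer timer = new FakeTimer();
        timer.schedule(30_000, () -> System.out.println("write#1 timed out"));
        timer.advanceClock(30_001); // prints: write#1 timed out
    }
}
```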



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -610,26 +626,29 @@ class CoordinatorWriteEvent implements 
CoordinatorEvent, DeferredEvent {
 null,
 RecordBatch.NO_PRODUCER_ID,
 RecordBatch.NO_PRODUCER_EPOCH,
+defaultWriteTimeout,
 op
 );
 }
 
 /**
  * Constructor.
  *
- * @param name  The operation name.
- * @param tpThe topic partition that the operation is 
applied to.
- * @param 

Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]

2023-12-12 Thread via GitHub


hachikuji commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1424481055


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -1352,6 +1360,28 @@ object GroupMetadataManager {
   "%X".format(BigInt(1, bytes))
   }
 
+  def maybeConvertError(error: Errors) : Errors = {
+error match {
+  case Errors.UNKNOWN_TOPIC_OR_PARTITION
+   | Errors.NOT_ENOUGH_REPLICAS
+   | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+Errors.COORDINATOR_NOT_AVAILABLE
+
+  case Errors.NOT_LEADER_OR_FOLLOWER
+   | Errors.KAFKA_STORAGE_ERROR =>
+Errors.NOT_COORDINATOR
+
+  case Errors.MESSAGE_TOO_LARGE
+   | Errors.RECORD_LIST_TOO_LARGE
+   | Errors.INVALID_FETCH_SIZE =>
+Errors.INVALID_COMMIT_OFFSET_SIZE

Review Comment:
   We don't have to address here, but this error list handling seems dubious. 
Not sure why `INVALID_FETCH_SIZE` would translate to 
`INVALID_COMMIT_OFFSET_SIZE`.
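   For readers skimming the thread, a Java rendering of the mapping in the
hunk above, with the reviewer's suggested alternative noted inline (a sketch
only; the PR's actual code is the Scala `maybeConvertError`):

```java
import org.apache.kafka.common.protocol.Errors;

// Sketch: translate raw log-append errors into the errors the group
// coordinator exposes to offset-commit callers.
final class OffsetCommitErrorMapper {
    static Errors convert(Errors error) {
        switch (error) {
            case UNKNOWN_TOPIC_OR_PARTITION:
            case NOT_ENOUGH_REPLICAS:
            case NOT_ENOUGH_REPLICAS_AFTER_APPEND:
                return Errors.COORDINATOR_NOT_AVAILABLE;
            case NOT_LEADER_OR_FOLLOWER:
            case KAFKA_STORAGE_ERROR:
                return Errors.NOT_COORDINATOR;
            case MESSAGE_TOO_LARGE:
            case RECORD_LIST_TOO_LARGE:
            case INVALID_FETCH_SIZE:
                // The review questions this branch; a generic
                // Errors.UNKNOWN_SERVER_ERROR was floated instead.
                return Errors.INVALID_COMMIT_OFFSET_SIZE;
            default:
                return error;
        }
    }
}
```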



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2023-12-12 Thread via GitHub


tinaselenge opened a new pull request, #14995:
URL: https://github.com/apache/kafka/pull/14995

   This PR implements 
[KIP-993](https://cwiki.apache.org/confluence/display/KAFKA/KIP-993%3A+Allow+restricting+files+accessed+by+File+and+Directory+ConfigProviders)
 for restricting files accessed by File and Directory ConfigProviders. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]

2023-12-12 Thread via GitHub


gharris1727 commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1424413335


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -350,13 +353,16 @@ private void assertRequestTimesOut(String 
requestDescription, ThrowingRunnable r
 }
 
 private static class Block {

Review Comment:
   can you make this public to allow OffsetsApiIntegrationTest to use the latch?
   
   and do you think that maybe these connectors should be moved out of this 
test to a common reusable class?



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -368,31 +374,54 @@ private static ConfigDef config() {
 );
 }
 
+/**
+ * {@link CountDownLatch#await() Wait} for the connector/task to reach 
the point in its lifecycle where
+ * it will block.
+ */
 public static void waitForBlock() throws InterruptedException, 
TimeoutException {
+CountDownLatch awaitBlockLatch;
 synchronized (Block.class) {
-if (blockLatch == null) {
-throw new IllegalArgumentException("No connector has been 
created yet");
-}
+awaitBlockLatch = Block.awaitBlockLatch;
+}
+
+if (awaitBlockLatch == null) {
+throw new IllegalArgumentException("No connector has been 
created yet");

Review Comment:
   Is this an opportunity for a flaky failure, if the test thread advances 
before the connector is created? It seems very rare; I don't see any instances 
on the Gradle dashboard.



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -368,31 +374,54 @@ private static ConfigDef config() {
 );
 }
 
+/**
+ * {@link CountDownLatch#await() Wait} for the connector/task to reach 
the point in its lifecycle where
+ * it will block.
+ */
 public static void waitForBlock() throws InterruptedException, 
TimeoutException {
+CountDownLatch awaitBlockLatch;
 synchronized (Block.class) {
-if (blockLatch == null) {
-throw new IllegalArgumentException("No connector has been 
created yet");
-}
+awaitBlockLatch = Block.awaitBlockLatch;
+}
+
+if (awaitBlockLatch == null) {
+throw new IllegalArgumentException("No connector has been 
created yet");
 }
 
 log.debug("Waiting for connector to block");
-if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
+if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
 throw new TimeoutException("Timed out waiting for connector to 
block.");

Review Comment:
   
   Since scanning creates connector instances, and validation caches the 
connector instance, how do you ensure that the right awaitBlockLatch is being 
waited on here?



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -139,7 +141,8 @@ public void setup() throws Exception {
 public void close() {
 // stop all Connect, Kafka and Zk threads.
 connect.stop();
-Block.resetBlockLatch();
+// unblock everything so that we don't leak threads after each test run
+Block.reset();

Review Comment:
   WDYT about resetting before stopping the workers, to allow a normal shutdown 
to happen?
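   For context on the latch handoff being reviewed, a self-contained sketch of
the snapshot-then-await pattern (hypothetical names, not the test's actual
code):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// The connector thread publishes a latch and counts it down when it blocks;
// the test thread snapshots the field under the class lock, then awaits it
// outside the lock so the connector is never blocked from counting down.
final class BlockSketch {
    private static CountDownLatch awaitBlockLatch;

    static synchronized void onConnectorCreated() {
        awaitBlockLatch = new CountDownLatch(1);
    }

    static void onConnectorBlocked() {
        CountDownLatch latch;
        synchronized (BlockSketch.class) {
            latch = awaitBlockLatch;
        }
        if (latch != null) latch.countDown();
    }

    static void waitForBlock(long timeoutMs)
            throws InterruptedException, TimeoutException {
        CountDownLatch latch;
        synchronized (BlockSketch.class) {
            latch = awaitBlockLatch; // snapshot, as in the PR
        }
        if (latch == null)
            throw new IllegalStateException("No connector has been created yet");
        if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS))
            throw new TimeoutException("Timed out waiting for connector to block.");
    }
}
```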



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite [kafka]

2023-12-12 Thread via GitHub


C0urante commented on code in PR #14966:
URL: https://github.com/apache/kafka/pull/14966#discussion_r1424453814


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##
@@ -70,74 +80,115 @@ public class OffsetsApiIntegrationTest {
 private static final long OFFSET_COMMIT_INTERVAL_MS = 
TimeUnit.SECONDS.toMillis(1);
 private static final long OFFSET_READ_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(30);
 private static final int NUM_WORKERS = 3;
-private static final String CONNECTOR_NAME = "test-connector";
-private static final String TOPIC = "test-topic";
 private static final int NUM_TASKS = 2;
 private static final int NUM_RECORDS_PER_PARTITION = 10;
-private Map workerProps;
-private EmbeddedConnectCluster.Builder connectBuilder;
+private static final Map, EmbeddedConnectCluster> 
CONNECT_CLUSTERS = new ConcurrentHashMap<>();
+@Rule
+public TestName currentTest = new TestName();
 private EmbeddedConnectCluster connect;
+private String connectorName;
+private String topic;
 
 @Before
 public void setup() {
-Properties brokerProps = new Properties();
-brokerProps.put("transaction.state.log.replication.factor", "1");
-brokerProps.put("transaction.state.log.min.isr", "1");
-
-// setup Connect worker properties
-workerProps = new HashMap<>();
-workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, 
String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
-
-// build a Connect cluster backed by Kafka and Zk
-connectBuilder = new EmbeddedConnectCluster.Builder()
-.name("connect-cluster")
-.numWorkers(NUM_WORKERS)
-.brokerProps(brokerProps)
-.workerProps(workerProps);
+connectorName = currentTest.getMethodName();
+topic = currentTest.getMethodName();
+connect = defaultConnectCluster();
 }
 
 @After
 public void tearDown() {
-connect.stop();
+Set remainingConnectors = new HashSet<>(connect.connectors());
+if (remainingConnectors.remove(connectorName)) {
+connect.deleteConnector(connectorName);
+}
+try {

Review Comment:
   Ah, got it. Yeah, the idea behind the assertion was to catch any stray 
connectors that may be left behind by future tests. It's pointless now but it 
may be a nice guardrail later on.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite [kafka]

2023-12-12 Thread via GitHub


sudeshwasnik commented on code in PR #14966:
URL: https://github.com/apache/kafka/pull/14966#discussion_r1424446893


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##
@@ -70,74 +80,115 @@ public class OffsetsApiIntegrationTest {
 private static final long OFFSET_COMMIT_INTERVAL_MS = 
TimeUnit.SECONDS.toMillis(1);
 private static final long OFFSET_READ_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(30);
 private static final int NUM_WORKERS = 3;
-private static final String CONNECTOR_NAME = "test-connector";
-private static final String TOPIC = "test-topic";
 private static final int NUM_TASKS = 2;
 private static final int NUM_RECORDS_PER_PARTITION = 10;
-private Map workerProps;
-private EmbeddedConnectCluster.Builder connectBuilder;
+private static final Map, EmbeddedConnectCluster> 
CONNECT_CLUSTERS = new ConcurrentHashMap<>();
+@Rule
+public TestName currentTest = new TestName();
 private EmbeddedConnectCluster connect;
+private String connectorName;
+private String topic;
 
 @Before
 public void setup() {
-Properties brokerProps = new Properties();
-brokerProps.put("transaction.state.log.replication.factor", "1");
-brokerProps.put("transaction.state.log.min.isr", "1");
-
-// setup Connect worker properties
-workerProps = new HashMap<>();
-workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, 
String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
-
-// build a Connect cluster backed by Kafka and Zk
-connectBuilder = new EmbeddedConnectCluster.Builder()
-.name("connect-cluster")
-.numWorkers(NUM_WORKERS)
-.brokerProps(brokerProps)
-.workerProps(workerProps);
+connectorName = currentTest.getMethodName();
+topic = currentTest.getMethodName();
+connect = defaultConnectCluster();
 }
 
 @After
 public void tearDown() {
-connect.stop();
+Set remainingConnectors = new HashSet<>(connect.connectors());
+if (remainingConnectors.remove(connectorName)) {
+connect.deleteConnector(connectorName);
+}
+try {

Review Comment:
   sounds good! thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite [kafka]

2023-12-12 Thread via GitHub


sudeshwasnik commented on code in PR #14966:
URL: https://github.com/apache/kafka/pull/14966#discussion_r142423


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##
@@ -70,74 +80,115 @@ public class OffsetsApiIntegrationTest {
 private static final long OFFSET_COMMIT_INTERVAL_MS = 
TimeUnit.SECONDS.toMillis(1);
 private static final long OFFSET_READ_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(30);
 private static final int NUM_WORKERS = 3;
-private static final String CONNECTOR_NAME = "test-connector";
-private static final String TOPIC = "test-topic";
 private static final int NUM_TASKS = 2;
 private static final int NUM_RECORDS_PER_PARTITION = 10;
-private Map workerProps;
-private EmbeddedConnectCluster.Builder connectBuilder;
+private static final Map, EmbeddedConnectCluster> 
CONNECT_CLUSTERS = new ConcurrentHashMap<>();
+@Rule
+public TestName currentTest = new TestName();
 private EmbeddedConnectCluster connect;
+private String connectorName;
+private String topic;
 
 @Before
 public void setup() {
-Properties brokerProps = new Properties();
-brokerProps.put("transaction.state.log.replication.factor", "1");
-brokerProps.put("transaction.state.log.min.isr", "1");
-
-// setup Connect worker properties
-workerProps = new HashMap<>();
-workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, 
String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
-
-// build a Connect cluster backed by Kafka and Zk
-connectBuilder = new EmbeddedConnectCluster.Builder()
-.name("connect-cluster")
-.numWorkers(NUM_WORKERS)
-.brokerProps(brokerProps)
-.workerProps(workerProps);
+connectorName = currentTest.getMethodName();
+topic = currentTest.getMethodName();
+connect = defaultConnectCluster();
 }
 
 @After
 public void tearDown() {
-connect.stop();
+Set remainingConnectors = new HashSet<>(connect.connectors());
+if (remainingConnectors.remove(connectorName)) {
+connect.deleteConnector(connectorName);
+}
+try {

Review Comment:
   line 102 -> `remainingConnectors.remove(connectorName)` will remove elements 
anyway, so line 106's assertEquals will always be true - so the assertion is 
pointless. Which I thought wasn't okay. 
   
   but your point is valid too! feel free to resolve this comment
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: add entity type to topic names in ConsumerGroupDescribeResponse.json [kafka]

2023-12-12 Thread via GitHub


dajac commented on PR #14986:
URL: https://github.com/apache/kafka/pull/14986#issuecomment-1852606155

   Merged to trunk and 3.7.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: add entity type to topic names in ConsumerGroupDescribeResponse.json [kafka]

2023-12-12 Thread via GitHub


dajac merged PR #14986:
URL: https://github.com/apache/kafka/pull/14986


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-12 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1424433876


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1050,6 +1062,81 @@ private void close(Duration timeout, boolean 
swallowException) {
 }
 }
 
+/**
+ * Prior to closing the network thread, we need to make sure the following 
operations happen in the right sequence:
+ * 1. autocommit offsets
+ * 2. revoke all partitions
+ */
+private void prepareShutdown(final Timer timer, final 
AtomicReference firstException) {
+if (!groupMetadata.isPresent())
+return;
+maybeAutoCommitSync(timer, firstException);
+timer.update();
+waitOnEventCompletion(new 
ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.COMMIT, 
timer.remainingMs()), timer, firstException);
+maybeInvokeCommitCallbacks();
+maybeRevokePartitions(timer, firstException);
+waitOnEventCompletion(new 
ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.LEAVE_GROUP, 
timer.remainingMs()), timer, firstException);
+}
+
+private void waitOnEventCompletion(final ConsumerCloseApplicationEvent 
event,
+   final Timer timer,
+   final AtomicReference 
firstException) {
+try {
+applicationEventHandler.addAndGet(event, timer);
+} catch (TimeoutException e) {

Review Comment:
   @kirktrue and I discussed the potential tasks for dealing with zero timeout. 
This needs to be examined, perhaps after the preview, so we will spin off a 
Jira ticket for this specific issue.
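   The sequencing above shares a single deadline across commit, revoke, and
leave-group. A standalone sketch of that budget pattern (hypothetical names;
Kafka's real `Timer` utility differs in detail):

```java
import java.time.Duration;

// One close() budget consumed by successive shutdown steps, so a slow
// auto-commit leaves less time for revocation and leave-group.
final class CloseBudget {
    private final long deadlineNs;

    CloseBudget(Duration total) {
        this.deadlineNs = System.nanoTime() + total.toNanos();
    }

    long remainingMs() {
        return Math.max(0L, (deadlineNs - System.nanoTime()) / 1_000_000L);
    }

    public static void main(String[] args) {
        CloseBudget budget = new CloseBudget(Duration.ofSeconds(30));
        commitOffsets(budget.remainingMs());    // step 1: auto-commit offsets
        revokePartitions(budget.remainingMs()); // step 2: revocation callbacks
        leaveGroup(budget.remainingMs());       // step 3: send the leave-group
    }

    static void commitOffsets(long timeoutMs) { /* bounded by timeoutMs */ }
    static void revokePartitions(long timeoutMs) { /* bounded by timeoutMs */ }
    static void leaveGroup(long timeoutMs) { /* bounded by timeoutMs */ }
}
```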



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-12 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1424414983


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -178,27 +171,11 @@ static void runAtClose(final Collection> requ
final NetworkClientDelegate networkClientDelegate,
final Timer timer) {
 // These are the optional outgoing requests at the
-List pollResults = 
requestManagers.stream()
+requestManagers.stream()

Review Comment:
   This is not a conflict, actually - these are just some changes to how the 
fetch request manager closes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-12 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1424413992


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1026,15 +1038,13 @@ private void close(Duration timeout, boolean 
swallowException) {
 final Timer closeTimer = time.timer(timeout);
 clientTelemetryReporter.ifPresent(reporter -> 
reporter.initiateClose(timeout.toMillis()));
 closeTimer.update();
-
+// Prepare shutting down the network thread
+prepareShutdown(closeTimer, firstException);
+closeTimer.update();
 if (applicationEventHandler != null)
-closeQuietly(() -> 
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), 
"Failed to close application event handler with a timeout(ms)=" + 
closeTimer.remainingMs(), firstException);
-
-// Invoke all callbacks after the background thread exists in case if 
there are unsent async
-// commits
-maybeInvokeCommitCallbacks();
-
-closeQuietly(fetchBuffer, "Failed to close the fetch buffer", 
firstException);
+closeQuietly(() -> 
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), 
"Failed shutting down network thread", firstException);
+closeTimer.update();
+// Ensure all async commit callbacks are invoked

Review Comment:
   might not even need this comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15373: fix exception thrown in Admin#describeTopics for unknown ID [kafka]

2023-12-12 Thread via GitHub


MikeEdgar commented on PR #14599:
URL: https://github.com/apache/kafka/pull/14599#issuecomment-1852564791

   Thanks for the feedback @jolshan. If I'm following your comments on the 
broken test, the assertion
   ```scala
   assertThrows(classOf[ExecutionException], () => 
results.get(nonExistingTopicId).get).getCause.isInstanceOf[UnknownTopicIdException]
 
   ```
   must be replaced with:
   ```scala
   assertFutureExceptionTypeEquals(results.get(nonExistingTopicId), 
classOf[UnknownTopicIdException])
   ```
   
   Correct?
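   For context, the form being replaced evaluates `isInstanceOf` as an
expression and discards the result, so only the outer `ExecutionException` is
actually asserted. A Java sketch of the difference (hypothetical future;
`assertInstanceOf` requires JUnit 5.8+):

```java
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.errors.UnknownTopicIdException;

class CauseAssertionSketch {
    // Hypothetical stand-in for the per-topic-ID future from describeTopics.
    static CompletableFuture<Void> failedFuture() {
        CompletableFuture<Void> f = new CompletableFuture<>();
        f.completeExceptionally(new UnknownTopicIdException("no such topic id"));
        return f;
    }

    void weakAssertion() {
        CompletableFuture<Void> f = failedFuture();
        // The instanceof check is evaluated and thrown away: the test cannot
        // fail here even if the cause has the wrong type.
        boolean ignored = assertThrows(ExecutionException.class, f::get)
            .getCause() instanceof UnknownTopicIdException;
    }

    void strictAssertion() {
        CompletableFuture<Void> f = failedFuture();
        // Asserts the cause type explicitly, like the suggested test helper.
        assertInstanceOf(UnknownTopicIdException.class,
            assertThrows(ExecutionException.class, f::get).getCause());
    }
}
```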


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15942: Implement ConsumerInterceptor [kafka]

2023-12-12 Thread via GitHub


vamossagar12 commented on PR #14963:
URL: https://github.com/apache/kafka/pull/14963#issuecomment-1852550354

   Thanks for the changes @Joker-5 . I will take a look this week.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15373: fix exception thrown in Admin#describeTopics for unknown ID [kafka]

2023-12-12 Thread via GitHub


jolshan commented on PR #14599:
URL: https://github.com/apache/kafka/pull/14599#issuecomment-1852550819

   I synced with @ijuma offline. I think it makes sense to return the 
UnknownTopicId exception since that is what we do for the deleteTopics api and 
what the server is using. 
   
   It is a bit annoying that we can't use the topicError directly and we 
convert to the cluster object, which loses all the detail about the topic IDs 
and their error responses. But fixing that requires a larger refactor.
   
   For now let's just fix 
https://github.com/apache/kafka/blob/2a5fbf28820ddcde5ead605e070391059d5d2e18/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala#L200


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite [kafka]

2023-12-12 Thread via GitHub


vamossagar12 commented on code in PR #14966:
URL: https://github.com/apache/kafka/pull/14966#discussion_r1424389303


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##
@@ -70,74 +80,115 @@ public class OffsetsApiIntegrationTest {
 private static final long OFFSET_COMMIT_INTERVAL_MS = 
TimeUnit.SECONDS.toMillis(1);
 private static final long OFFSET_READ_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(30);
 private static final int NUM_WORKERS = 3;
-private static final String CONNECTOR_NAME = "test-connector";
-private static final String TOPIC = "test-topic";
 private static final int NUM_TASKS = 2;
 private static final int NUM_RECORDS_PER_PARTITION = 10;
-private Map workerProps;
-private EmbeddedConnectCluster.Builder connectBuilder;
+private static final Map, EmbeddedConnectCluster> 
CONNECT_CLUSTERS = new ConcurrentHashMap<>();
+@Rule
+public TestName currentTest = new TestName();
 private EmbeddedConnectCluster connect;
+private String connectorName;
+private String topic;
 
 @Before
 public void setup() {
-Properties brokerProps = new Properties();
-brokerProps.put("transaction.state.log.replication.factor", "1");
-brokerProps.put("transaction.state.log.min.isr", "1");
-
-// setup Connect worker properties
-workerProps = new HashMap<>();
-workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, 
String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
-
-// build a Connect cluster backed by Kafka and Zk
-connectBuilder = new EmbeddedConnectCluster.Builder()
-.name("connect-cluster")
-.numWorkers(NUM_WORKERS)
-.brokerProps(brokerProps)
-.workerProps(workerProps);
+connectorName = currentTest.getMethodName();
+topic = currentTest.getMethodName();
+connect = defaultConnectCluster();
 }
 
 @After
 public void tearDown() {
-connect.stop();
+Set remainingConnectors = new HashSet<>(connect.connectors());
+if (remainingConnectors.remove(connectorName)) {
+connect.deleteConnector(connectorName);
+}
+try {
+assertEquals(
+"Some connectors were not properly cleaned up after this 
test",
+Collections.emptySet(),
+remainingConnectors
+);
+} finally {
+// Make a last-ditch effort to clean up the leaked connectors
+// so as not to interfere with other test cases
+remainingConnectors.forEach(connect::deleteConnector);
+}
+}
+
+@AfterClass
+public static void close() {
+// stop all Connect, Kafka and Zk threads.
+CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop);
+}
+
+private static EmbeddedConnectCluster 
createOrReuseConnectWithWorkerProps(Map workerProps) {
+return CONNECT_CLUSTERS.computeIfAbsent(workerProps, props -> {
+Properties brokerProps = new Properties();
+brokerProps.put("transaction.state.log.replication.factor", "1");
+brokerProps.put("transaction.state.log.min.isr", "1");
+
+// Have to declare a new map since the passed-in one may be 
immutable
+Map workerPropsWithDefaults = new 
HashMap<>(workerProps);
+// Enable fast offset commits by default
+
workerPropsWithDefaults.putIfAbsent(OFFSET_COMMIT_INTERVAL_MS_CONFIG, 
String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
+
+EmbeddedConnectCluster result = new 
EmbeddedConnectCluster.Builder()

Review Comment:
   Ok, I saw `connect` in `ConnectRestartAPI` being used, that's why I thought 
we could use the same name. We can retain this.
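   The `computeIfAbsent` call in the hunk is the heart of the reuse: identical
worker configs share one cluster, created at most once. A stripped-down sketch
with a hypothetical `Cluster` type:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class ClusterPool {
    // Hypothetical stand-in for EmbeddedConnectCluster.
    interface Cluster {
        void stop();
    }

    private static final Map<Map<String, String>, Cluster> CLUSTERS =
        new ConcurrentHashMap<>();

    // One cluster per distinct config map; computeIfAbsent invokes the
    // factory at most once per key, even under concurrent callers.
    static Cluster getOrCreate(Map<String, String> workerProps,
                               Function<Map<String, String>, Cluster> factory) {
        return CLUSTERS.computeIfAbsent(workerProps, factory);
    }

    // Analogue of the @AfterClass hook: tear everything down at the end.
    static void stopAll() {
        CLUSTERS.values().forEach(Cluster::stop);
        CLUSTERS.clear();
    }
}
```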



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15942: Implement ConsumerInterceptor [kafka]

2023-12-12 Thread via GitHub


vamossagar12 commented on PR #14963:
URL: https://github.com/apache/kafka/pull/14963#issuecomment-1852542077

   hmm, the JDK21 build failed with this error =>
   ```
   > Task :examples:spotbugsMain
   
   Cannot contact jenkins-shared-ubuntu-3: java.lang.InterruptedException
   
   > Task :core:compileScala
   
   > Task :group-coordinator:classes
   
   > Task :metadata:classes
   
   > Task :examples:spotbugsTest SKIPPED
   
   > Task :examples:check
   
   > Task :group-coordinator:checkstyleMain
   
   > Task :clients:testClasses
   
   > Task :clients:checkstyleTest
   
   > Task :clients:spotbugsMain
   
   
   
   The message received from the daemon indicates that the daemon has 
disappeared.
   
   Build request sent: Build{id=f24da9a8-313c-42d5-bc18-9f578f44c4a3, 
currentDir=/home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-14963}
   
   Attempting to read last messages from the daemon log...
   
   Daemon pid: 2004981
   
 log file: /home/jenkins/.gradle/daemon/8.5/daemon-2004981.out.log
   
   - Last  20 lines from daemon log file - daemon-2004981.out.log -
   
   2023-12-11T15:25:02.911+ [DEBUG] 
[org.gradle.launcher.daemon.server.SynchronizedDispatchConnection] thread 25: 
received class org.gradle.launcher.daemon.protocol.CloseInput
   
   2023-12-11T15:25:02.911+ [DEBUG] 
[org.gradle.launcher.daemon.server.DefaultDaemonConnection] thread 25: Received 
IO message from client: org.gradle.launcher.daemon.protocol.CloseInput@41f596f2
   
   2023-12-11T15:25:02.945+ [DEBUG] 
[org.gradle.launcher.daemon.server.exec.RequestStopIfSingleUsedDaemon] 
Requesting daemon stop after processing 
Build{id=f24da9a8-313c-42d5-bc18-9f578f44c4a3, 
currentDir=/home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-14963}
   
   2023-12-11T15:25:02.965+ [LIFECYCLE] 
[org.gradle.launcher.daemon.server.DaemonStateCoordinator] Daemon will be 
stopped at the end of the build 
   
   2023-12-11T15:25:02.968+ [DEBUG] 
[org.gradle.launcher.daemon.server.DaemonStateCoordinator] Stop as soon as idle 
requested. The daemon is busy: true
   
   2023-12-11T15:25:02.969+ [DEBUG] 
[org.gradle.launcher.daemon.server.DaemonStateCoordinator] daemon stop has been 
requested. Sleeping until state changes.
   
   2023-12-11T15:25:02.970+ [DEBUG] 
[org.gradle.launcher.daemon.server.exec.ExecuteBuild] The daemon has started 
executing the build.
   
   2023-12-11T15:25:02.971+ [DEBUG] 
[org.gradle.launcher.daemon.server.exec.ExecuteBuild] Executing build with 
daemon context: 
DefaultDaemonContext[uid=c8e3ee8e-33ef-46b7-a5a1-3d91a0e54ca4,javaHome=/usr/local/asfpackages/java/adoptium-jdk-21.0.1+12,daemonRegistryDir=/home/jenkins/.gradle/daemon,pid=2004981,idleTimeout=12,priority=NORMAL,applyInstrumentationAgent=true,daemonOpts=-Xss4m,-XX:+UseParallelGC,--add-opens=java.base/java.util=ALL-UNNAMED,--add-opens=java.base/java.lang=ALL-UNNAMED,--add-opens=java.base/java.lang.invoke=ALL-UNNAMED,--add-opens=java.prefs/java.util.prefs=ALL-UNNAMED,--add-opens=java.base/java.nio.charset=ALL-UNNAMED,--add-opens=java.base/java.net=ALL-UNNAMED,--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED,-Xmx2g,-Dfile.encoding=UTF-8,-Duser.country,-Duser.language=en,-Duser.variant]
   
   2023-12-11T15:25:02.972+ [INFO] 
[org.gradle.launcher.daemon.server.exec.ForwardClientInput] Closing daemon's 
stdin at end of input.
   
   2023-12-11T15:25:02.973+ [INFO] 
[org.gradle.launcher.daemon.server.exec.ForwardClientInput] The daemon will no 
longer process any standard input.
   
   Starting build with version 3.7.0-SNAPSHOT (commit id 22d2b462) using Gradle 
8.5, Java 21 and Scala 2.13.12
   
   Build properties: maxParallelForks=4, maxScalacThreads=4, maxTestRetries=0
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


vamossagar12 commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424381464


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -719,20 +721,26 @@ public void run() {
 result.records()
 );
 
-timer.add(new TimerTask(timeout) {
+timer.add(new TimerTask(timeout.toMillis()) {
 @Override
 public void run() {
-scheduleInternalOperation(name, tp, () -> 
complete(new TimeoutException("Writing records to the log timed out")));
+if (!future.isDone()) {
+scheduleInternalOperation(
+"LogAppendEvent(name=" + name + ", 
tp=" + tp + ")",
+tp,
+() -> complete(new 
TimeoutException("Log append event " + name + "timed out for TopicPartition " + 
tp))
+);
+}
 }
 });
 
-
context.coordinator.updateLastWrittenOffset(offset);
-
 // Add the response to the deferred queue.
 if (!future.isDone()) {
 context.deferredEventQueue.add(offset, this);
+
context.coordinator.updateLastWrittenOffset(offset);

Review Comment:
   Ok. yes, that was a mistake.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15883: Implement RemoteCopyLagBytes [kafka]

2023-12-12 Thread via GitHub


kamalcph commented on code in PR #14832:
URL: https://github.com/apache/kafka/pull/14832#discussion_r1424357202


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -341,9 +341,10 @@ public void onLeadershipChange(Set 
partitionsBecomeLeader,
 
 leaderPartitions.forEach(this::cacheTopicPartitionIds);
 followerPartitions.forEach(this::cacheTopicPartitionIds);
+followerPartitions.forEach(
+topicIdPartition -> 
brokerTopicStats.topicStats(topicIdPartition.topic()).removeRemoteCopyBytesLag(topicIdPartition.partition()));
 
-
remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions, 
followerPartitions);
-followerPartitions.forEach(topicIdPartition ->
+
remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions, 
followerPartitions);followerPartitions.forEach(topicIdPartition ->

Review Comment:
   please revert this change. Two statements in the same line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424354426


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() {
 assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().records());
 }
 
+@Test
+public void testScheduleWriteOpWhenWriteTimesout() throws 
InterruptedException {
+MockTimer timer = new MockTimer();
+// The partition writer only accepts one write.
+MockPartitionWriter writer = new MockPartitionWriter();
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+// Loads the coordinator.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Verify the initial state.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+assertEquals(0, ctx.coordinator.lastWrittenOffset());
+assertEquals(0, ctx.coordinator.lastCommittedOffset());
+assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+
+// Write #1. We should get a TimeoutException because the HWM will not 
advance

Review Comment:
   nit: `.` at the end.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424353506


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() {
 assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().records());
 }
 
+@Test
+public void testScheduleWriteOpWhenWriteTimesout() throws 
InterruptedException {

Review Comment:
   nit: `TimesOut`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424352569


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -186,6 +187,7 @@ public void testConsumerGroupHeartbeat() throws 
ExecutionException, InterruptedE
 when(runtime.scheduleWriteOperation(
 ArgumentMatchers.eq("consumer-group-heartbeat"),
 ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ArgumentMatchers.any(),

Review Comment:
   Could we actually put the expected value for all of those?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424351506


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1092,6 +1134,7 @@ private CoordinatorRuntime(
 CoordinatorShardBuilderSupplier coordinatorShardBuilderSupplier,
 Time time,
 Timer timer,
+Duration timeout,

Review Comment:
   nit `defaultWriteTimeout`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424350048


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -581,6 +591,12 @@ class CoordinatorWriteEvent implements 
CoordinatorEvent, DeferredEvent {
  */
 final CompletableFuture future;
 
+/**
+ * Timeout value for the write operation
+ */
+
+final Duration timeout;

Review Comment:
   Should it be `defaultWriteTimeout` too?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -595,13 +611,15 @@ class CoordinatorWriteEvent implements 
CoordinatorEvent, DeferredEvent {
 /**
  * Constructor.
  *
- * @param name  The operation name.
- * @param tpThe topic partition that the operation is applied to.
- * @param opThe write operation.
+ * @param name  The operation name.
+ * @param tpThe topic partition that the operation is applied 
to.
+ * @param timeout   The write operation timeout
+ * @param opThe write operation.
  */
 CoordinatorWriteEvent(
 String name,
 TopicPartition tp,
+Duration timeout,

Review Comment:
   Same question about the name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424351314


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1019,6 +1055,11 @@ public void onHighWatermarkUpdated(
  */
 private final Timer timer;
 
+/**
+ * The write operation timeout
+ */
+private final Duration timeout;

Review Comment:
   nit: `defaultWriteTimeout`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424350465


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -698,13 +720,27 @@ public void run() {
 producerEpoch,
 result.records()
 );
-
context.coordinator.updateLastWrittenOffset(offset);
+
+timer.add(new TimerTask(timeout.toMillis()) {
+@Override
+public void run() {
+if (!future.isDone()) {
+scheduleInternalOperation(
+"LogAppendEvent(name=" + name + ", 
tp=" + tp + ")",
+tp,
+() -> complete(new 
TimeoutException("Log append event " + name + "timed out for TopicPartition " + 
tp))
+);
+}
+}
+});
 
 // Add the response to the deferred queue.
 if (!future.isDone()) {
 context.deferredEventQueue.add(offset, this);
+
context.coordinator.updateLastWrittenOffset(offset);
 } else {
 complete(null);
+

Review Comment:
   nit: We could remove this empty line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424350233


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -595,13 +611,15 @@ class CoordinatorWriteEvent implements 
CoordinatorEvent, DeferredEvent {
 /**
  * Constructor.
  *
- * @param name  The operation name.
- * @param tpThe topic partition that the operation is applied to.
- * @param opThe write operation.
+ * @param name  The operation name.
+ * @param tpThe topic partition that the operation is applied 
to.
+ * @param timeout   The write operation timeout
+ * @param opThe write operation.
  */
 CoordinatorWriteEvent(
 String name,
 TopicPartition tp,
+Duration timeout,

Review Comment:
   Same question about the name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424349399


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -135,6 +138,11 @@ public Builder withTimer(Timer timer) {
 return this;
 }
 
+public Builder withWriteTimeOut(Duration timeout) {

Review Comment:
   nit: `withDefaultWriteTimeout`?
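
   For context, the rename nits in this review all point at one design: the
   builder carries a runtime-wide default timeout, while individual write
   events may later carry their own value. A minimal sketch of that
   default-plus-override shape, assuming the reviewer's suggested naming
   (the fallback helper is illustrative, not the PR's actual API):

   ```java
   import java.time.Duration;

   final class RuntimeBuilderSketch {
       // Illustrative default; the real value would come from configuration.
       private Duration defaultWriteTimeout = Duration.ofSeconds(30);

       RuntimeBuilderSketch withDefaultWriteTimeout(Duration timeout) {
           this.defaultWriteTimeout = timeout;
           return this; // fluent builder style
       }

       /** A per-operation timeout falls back to the runtime-wide default. */
       Duration effectiveTimeout(Duration perOperationTimeout) {
           return perOperationTimeout != null ? perOperationTimeout : defaultWriteTimeout;
       }
   }
   ```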



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424349887


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -581,6 +591,12 @@ class CoordinatorWriteEvent implements 
CoordinatorEvent, DeferredEvent {
  */
 final CompletableFuture future;
 
+/**
+ * Timeout value for the write operation
+ */
+

Review Comment:
   nit: Remove empty line.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -581,6 +591,12 @@ class CoordinatorWriteEvent implements 
CoordinatorEvent, DeferredEvent {
  */
 final CompletableFuture future;
 
+/**
+ * Timeout value for the write operation
+ */
+
+final Duration timeout;

Review Comment:
   Should it be `defaultWriteTimeout` too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424345912


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -719,20 +721,26 @@ public void run() {
 result.records()
 );
 
-timer.add(new TimerTask(timeout) {
+timer.add(new TimerTask(timeout.toMillis()) {
 @Override
 public void run() {
-scheduleInternalOperation(name, tp, () -> 
complete(new TimeoutException("Writing records to the log timed out")));
+if (!future.isDone()) {
+scheduleInternalOperation(
+"LogAppendEvent(name=" + name + ", 
tp=" + tp + ")",
+tp,
+() -> complete(new 
TimeoutException("Log append event " + name + "timed out for TopicPartition " + 
tp))
+);
+}
 }
 });
 
-
context.coordinator.updateLastWrittenOffset(offset);
-
 // Add the response to the deferred queue.
 if (!future.isDone()) {
 context.deferredEventQueue.add(offset, this);
+
context.coordinator.updateLastWrittenOffset(offset);

Review Comment:
   Why did you move this? I think that it should stay where it was and the 
timer must be added here, no?
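
   For readers following the thread: the mechanism under discussion completes a
   pending write's future exceptionally if the log append has not committed in
   time. A simplified sketch of that pattern using a plain
   ScheduledExecutorService (not Kafka's internal Timer/TimerTask API; names
   are illustrative):

   ```java
   import java.util.concurrent.*;

   final class DeferredWriteTimeoutSketch {
       private static final ScheduledExecutorService TIMER =
           Executors.newSingleThreadScheduledExecutor();

       /** Fails the future with a TimeoutException unless the commit path wins the race. */
       static <T> CompletableFuture<T> withTimeout(
               CompletableFuture<T> future, String name, long timeoutMs) {
           TIMER.schedule(() -> {
               // No-op if the commit path already completed the future.
               future.completeExceptionally(
                   new TimeoutException("Write operation " + name + " timed out"));
           }, timeoutMs, TimeUnit.MILLISECONDS);
           return future;
       }
   }
   ```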



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`

2023-12-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9545.

Resolution: Fixed

> Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
> --
>
> Key: KAFKA-9545
> URL: https://issues.apache.org/jira/browse/KAFKA-9545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Ashwin Pankaj
>Priority: Major
>
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/
> {code}
> java.lang.AssertionError: Condition not met within timeout 15000. Stream 
> tasks not updated
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337)
>   at 
> org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15997) Ensure fairness in the uniform assignor

2023-12-12 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-15997:
---

Assignee: Ritika Reddy

> Ensure fairness in the uniform assignor
> ---
>
> Key: KAFKA-15997
> URL: https://issues.apache.org/jira/browse/KAFKA-15997
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Emanuele Sabellico
>Assignee: Ritika Reddy
>Priority: Minor
>
>  
>  
> Fairness has to be ensured in uniform assignor as it was in 
> cooperative-sticky one.
> There's this test 0113 subtest u_multiple_subscription_changes in librdkafka 
> where 8 consumers are subscribing to the same topic, and it's verifying that 
> all of them are getting 2 partitions assigned. But with new protocol it seems 
> two consumers get assigned 3 partitions and 1 has zero partitions. The test 
> doesn't configure any client.rack.
> {code:java}
> [0113_cooperative_rebalance  /478.183s] Consumer assignments 
> (subscription_variation 0) (stabilized) (no rebalance cb):
> [0113_cooperative_rebalance  /478.183s] Consumer C_0#consumer-3 assignment 
> (2): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [5] (2000msgs), 
> rdkafkatest_rnd24419cc75e59d8de_0113u_1 [8] (4000msgs)
> [0113_cooperative_rebalance  /478.183s] Consumer C_1#consumer-4 assignment 
> (3): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [0] (1000msgs), 
> rdkafkatest_rnd24419cc75e59d8de_0113u_1 [3] (2000msgs), 
> rdkafkatest_rnd24419cc75e59d8de_0113u_1 [13] (1000msgs)
> [0113_cooperative_rebalance  /478.184s] Consumer C_2#consumer-5 assignment 
> (2): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [6] (1000msgs), 
> rdkafkatest_rnd24419cc75e59d8de_0113u_1 [10] (2000msgs)
> [0113_cooperative_rebalance  /478.184s] Consumer C_3#consumer-6 assignment 
> (2): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [7] (1000msgs), 
> rdkafkatest_rnd24419cc75e59d8de_0113u_1 [9] (2000msgs)
> [0113_cooperative_rebalance  /478.184s] Consumer C_4#consumer-7 assignment 
> (2): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [11] (1000msgs), 
> rdkafkatest_rnd24419cc75e59d8de_0113u_1 [14] (3000msgs)
> [0113_cooperative_rebalance  /478.184s] Consumer C_5#consumer-8 assignment 
> (3): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [1] (2000msgs), 
> rdkafkatest_rnd24419cc75e59d8de_0113u_1 [2] (2000msgs), 
> rdkafkatest_rnd24419cc75e59d8de_0113u_1 [4] (1000msgs)
> [0113_cooperative_rebalance  /478.184s] Consumer C_6#consumer-9 assignment 
> (0): 
> [0113_cooperative_rebalance  /478.184s] Consumer C_7#consumer-10 assignment 
> (2): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [12] (1000msgs), 
> rdkafkatest_rnd24419cc75e59d8de_0113u_1 [15] (1000msgs)
> [0113_cooperative_rebalance  /478.184s] 16/32 partitions assigned
> [0113_cooperative_rebalance  /478.184s] Consumer C_0#consumer-3 has 2 
> assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions
> [0113_cooperative_rebalance  /478.184s] Consumer C_1#consumer-4 has 3 
> assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions
> [0113_cooperative_rebalance  /478.184s] Consumer C_2#consumer-5 has 2 
> assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions
> [0113_cooperative_rebalance  /478.184s] Consumer C_3#consumer-6 has 2 
> assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions
> [0113_cooperative_rebalance  /478.184s] Consumer C_4#consumer-7 has 2 
> assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions
> [0113_cooperative_rebalance  /478.184s] Consumer C_5#consumer-8 has 3 
> assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions
> [0113_cooperative_rebalance  /478.184s] Consumer C_6#consumer-9 has 0 
> assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions
> [0113_cooperative_rebalance  /478.184s] Consumer C_7#consumer-10 has 2 
> assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions
> [                      /479.057s] 1 test(s) running: 
> 0113_cooperative_rebalance
> [                      /480.057s] 1 test(s) running: 
> 0113_cooperative_rebalance
> [                      /481.057s] 1 test(s) running: 
> 0113_cooperative_rebalance
> [0113_cooperative_rebalance  /482.498s] TEST FAILURE
> ### Test "0113_cooperative_rebalance (u_multiple_subscription_changes:2390: 
> use_rebalance_cb: 0, subscription_variation: 0)" failed at 
> test.c:1243:check_test_timeouts() at Thu Dec  7 15:52:15 2023: ###
> Test 0113_cooperative_rebalance (u_multiple_subscription_changes:2390: 
> use_rebalance_cb: 0, subscription_variation: 0) timed out (timeout set to 480 
> seconds)
> ./run-test.sh: line 62: 3512920 Killed                  $TEST $ARGS
> ###
> ### Test ./test-runner in bare mode FAILED! (return code 137) ###
> ###{code}
>  
>  
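
For reference on the invariant being violated: with 16 partitions and 8 members
subscribed to one topic, a fair assignor gives each member exactly 2 partitions;
in general every member should own either floor(P/N) or ceil(P/N). A minimal
sketch of that balance check, assuming a plain per-member partition count
(illustrative only, not the assignor's actual API):
{code:java}
import java.util.Collection;

final class FairnessCheck {
    /** True iff every member owns floor(P/N) or ceil(P/N) partitions (N > 0). */
    static boolean isBalanced(Collection<Integer> partitionsPerMember,
                              int totalPartitions) {
        int members = partitionsPerMember.size();
        int floor = totalPartitions / members;
        int ceil = (totalPartitions + members - 1) / members;
        return partitionsPerMember.stream()
            .allMatch(n -> n == floor || n == ceil);
    }
    // Example: 16 partitions over 8 members -> every member must own exactly 2.
}
{code}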



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15997) Ensure fairness in the uniform assignor

2023-12-12 Thread Emanuele Sabellico (Jira)
Emanuele Sabellico created KAFKA-15997:
--

 Summary: Ensure fairness in the uniform assignor
 Key: KAFKA-15997
 URL: https://issues.apache.org/jira/browse/KAFKA-15997
 Project: Kafka
  Issue Type: Sub-task
Reporter: Emanuele Sabellico


 

 

Fairness has to be ensured in uniform assignor as it was in cooperative-sticky 
one.

There's this test 0113 subtest u_multiple_subscription_changes in librdkafka 
where 8 consumers are subscribing to the same topic, and it's verifying that 
all of them are getting 2 partitions assigned. But with new protocol it seems 
two consumers get assigned 3 partitions and 1 has zero partitions. The test 
doesn't configure any client.rack.


{code:java}
[0113_cooperative_rebalance  /478.183s] Consumer assignments 
(subscription_variation 0) (stabilized) (no rebalance cb):
[0113_cooperative_rebalance  /478.183s] Consumer C_0#consumer-3 assignment (2): 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [5] (2000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [8] (4000msgs)
[0113_cooperative_rebalance  /478.183s] Consumer C_1#consumer-4 assignment (3): 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [0] (1000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [3] (2000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [13] (1000msgs)
[0113_cooperative_rebalance  /478.184s] Consumer C_2#consumer-5 assignment (2): 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [6] (1000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [10] (2000msgs)
[0113_cooperative_rebalance  /478.184s] Consumer C_3#consumer-6 assignment (2): 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [7] (1000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [9] (2000msgs)
[0113_cooperative_rebalance  /478.184s] Consumer C_4#consumer-7 assignment (2): 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [11] (1000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [14] (3000msgs)
[0113_cooperative_rebalance  /478.184s] Consumer C_5#consumer-8 assignment (3): 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [1] (2000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [2] (2000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [4] (1000msgs)
[0113_cooperative_rebalance  /478.184s] Consumer C_6#consumer-9 assignment (0): 
[0113_cooperative_rebalance  /478.184s] Consumer C_7#consumer-10 assignment 
(2): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [12] (1000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [15] (1000msgs)
[0113_cooperative_rebalance  /478.184s] 16/32 partitions assigned
[0113_cooperative_rebalance  /478.184s] Consumer C_0#consumer-3 has 2 assigned 
partitions (1 subscribed topic(s)), expecting 2 assigned partitions
[0113_cooperative_rebalance  /478.184s] Consumer C_1#consumer-4 has 3 assigned 
partitions (1 subscribed topic(s)), expecting 2 assigned partitions
[0113_cooperative_rebalance  /478.184s] Consumer C_2#consumer-5 has 2 assigned 
partitions (1 subscribed topic(s)), expecting 2 assigned partitions
[0113_cooperative_rebalance  /478.184s] Consumer C_3#consumer-6 has 2 assigned 
partitions (1 subscribed topic(s)), expecting 2 assigned partitions
[0113_cooperative_rebalance  /478.184s] Consumer C_4#consumer-7 has 2 assigned 
partitions (1 subscribed topic(s)), expecting 2 assigned partitions
[0113_cooperative_rebalance  /478.184s] Consumer C_5#consumer-8 has 3 assigned 
partitions (1 subscribed topic(s)), expecting 2 assigned partitions
[0113_cooperative_rebalance  /478.184s] Consumer C_6#consumer-9 has 0 assigned 
partitions (1 subscribed topic(s)), expecting 2 assigned partitions
[0113_cooperative_rebalance  /478.184s] Consumer C_7#consumer-10 has 2 assigned 
partitions (1 subscribed topic(s)), expecting 2 assigned partitions
[                      /479.057s] 1 test(s) running: 
0113_cooperative_rebalance
[                      /480.057s] 1 test(s) running: 
0113_cooperative_rebalance
[                      /481.057s] 1 test(s) running: 
0113_cooperative_rebalance
[0113_cooperative_rebalance  /482.498s] TEST FAILURE
### Test "0113_cooperative_rebalance (u_multiple_subscription_changes:2390: 
use_rebalance_cb: 0, subscription_variation: 0)" failed at 
test.c:1243:check_test_timeouts() at Thu Dec  7 15:52:15 2023: ###
Test 0113_cooperative_rebalance (u_multiple_subscription_changes:2390: 
use_rebalance_cb: 0, subscription_variation: 0) timed out (timeout set to 480 
seconds)
./run-test.sh: line 62: 3512920 Killed                  $TEST $ARGS
###
### Test ./test-runner in bare mode FAILED! (return code 137) ###
###{code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]

2023-12-12 Thread via GitHub


lianetm commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1424320386


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -169,7 +169,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   startingTimestamp = startingTimestamp)

Review Comment:
   FYI, the max poll interval PR has been merged, so I guess we can now enable 
the existing callback integration tests that depended on it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15870: Move new group coordinator metrics from Yammer to Metrics [kafka]

2023-12-12 Thread via GitHub


jeffkbkim commented on code in PR #14848:
URL: https://github.com/apache/kafka/pull/14848#discussion_r1424319049


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java:
##
@@ -77,29 +70,13 @@ public class GroupCoordinatorMetrics extends 
CoordinatorMetrics implements AutoC
 
 private final MetricsRegistry registry;
 private final Metrics metrics;
-private final Map shards = new 
HashMap<>();
-private static final AtomicLong 
NUM_GENERIC_GROUPS_PREPARING_REBALANCE_COUNTER = new AtomicLong(0);
-private static final AtomicLong 
NUM_GENERIC_GROUPS_COMPLETING_REBALANCE_COUNTER = new AtomicLong(0);
-private static final AtomicLong NUM_GENERIC_GROUPS_STABLE_COUNTER = new 
AtomicLong(0);
-private static final AtomicLong NUM_GENERIC_GROUPS_DEAD_COUNTER = new 
AtomicLong(0);
-private static final AtomicLong NUM_GENERIC_GROUPS_EMPTY_COUNTER = new 
AtomicLong(0);
+private final Map shards = 
new ConcurrentHashMap<>();

Review Comment:
   we may have concurrent modification when the metric thread scrapes a shard 
while the shard is removed from `shards` on deactivate.
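
   A hedged illustration of the race: with a plain HashMap, the metrics thread
   iterating `shards` while `deactivate` removes an entry can throw
   ConcurrentModificationException, whereas ConcurrentHashMap's weakly
   consistent iterators tolerate concurrent removal (simplified sketch; the
   key/value types stand in for the real shard types):

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   final class ShardScrapeSketch {
       // Weakly consistent iteration: safe while another thread removes entries.
       private final Map<String, Long> shards = new ConcurrentHashMap<>();

       long scrapeTotal() {                  // called by the metrics thread
           long total = 0;
           for (long v : shards.values()) {  // never throws ConcurrentModificationException
               total += v;
           }
           return total;
       }

       void deactivate(String shard) {       // called on coordinator shutdown
           shards.remove(shard);
       }
   }
   ```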



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-12 Thread via GitHub


mjsax commented on PR #14936:
URL: https://github.com/apache/kafka/pull/14936#issuecomment-1852434603

   @stanislavkozlovski There is no 3.7 branch yet, so I merged this one. It must 
go into the 3.7 release (it has been ready for days, but Jenkins did not 
cooperate...) -- If your cut does not include it, I'll cherry-pick to 3.7 later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-12 Thread via GitHub


mjsax merged PR #14936:
URL: https://github.com/apache/kafka/pull/14936


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]

2023-12-12 Thread via GitHub


kirktrue commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1424297697


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -790,6 +812,197 @@ public void 
testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
 verify(membershipManager, never()).transitionToJoining();
 }
 
+@Test
+public void testListenerCallbacksBasic() {

Review Comment:
   I've updated the `MembershipManagerImplTest` to include these latest 
changes. Thanks for clarifying the desired behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15366: Modify LogDirFailureTest for KRaft [kafka]

2023-12-12 Thread via GitHub


soarez commented on code in PR #14977:
URL: https://github.com/apache/kafka/pull/14977#discussion_r1424296842


##
core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala:
##
@@ -191,12 +204,27 @@ class LogDirFailureTest extends IntegrationTestHarness {
 TestUtils.pollUntilAtLeastNumRecords(consumer, 1)
 
 // There should be no remaining LogDirEventNotification znode
-assertTrue(zkClient.getAllLogDirEventNotifications.isEmpty)
+if (quorum == "zk") {
+  assertTrue(zkClient.getAllLogDirEventNotifications.isEmpty)
+}
 
-// The controller should have marked the replica on the original leader as 
offline
-val controllerServer = servers.find(_.kafkaController.isActive).get
-val offlineReplicas = 
controllerServer.kafkaController.controllerContext.replicasInState(topic, 
OfflineReplica)
-assertTrue(offlineReplicas.contains(PartitionAndReplica(new 
TopicPartition(topic, 0), leaderServerId)))
+if (quorum == "kraft") {
+  waitUntilTrue(() => {
+brokers.exists(broker => {
+  val hasOfflineDir = 
broker.asInstanceOf[BrokerServer].logDirFailureChannel.hasOfflineLogDir(failedLogDir.toPath.toString)
+  hasOfflineDir && broker.asInstanceOf[BrokerServer]
+.replicaManager
+.metadataCache
+.getClusterMetadata(broker.clusterId, 
broker.config.interBrokerListenerName)
+.partition(new TopicPartition(topic, 
0)).offlineReplicas().map(_.id()).contains(originalLeaderServerId)

Review Comment:
   Hi @viktorsomogyi 
   
   `ReplicaManager.maybeUpdateTopicAssignment` sends `AssignReplicasToDirs` as 
necessary after a partition is created. PR #14982 fixed an integration test — 
`kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs(String).quorum=kraft`
 — which relies on `AssignReplicasToDirs` being sent. So I'd expect that part 
to be working.
   
   It could also be that the log directory failure somehow isn't being 
propagated (via BrokerHeartbeatRequest), or that the metadata cache in the 
broker isn't being updated. To troubleshoot, I suggest verifying the following 
via logging/breakpoints:
   
   1. Check that the assignment is being sent in 
`AssignmentsManager.onAssignment`
   2. Check that the controller is receiving the assignment in 
`ReplicationControlManager.handleAssignReplicasToDirs`
   3. Check that the directory failure is being propagated by the broker in 
`BrokerLifecycleManager.propagateDirectoryFailure`
   4. Check that the controller is processing the directory failure in 
`ReplicationControlManager.handleDirectoriesOffline`
   5. If all of the above is happening, then there must be some issue with the 
broker catching up with metadata.
   




Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1424300228


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -860,6 +871,78 @@ public void testGroupMetadataUpdateSingleCall() {
 }
 }
 
+/**
+ * Tests that the consumer correctly invokes the callbacks for {@link 
ConsumerRebalanceListener} that was
+ * specified. We don't go through the full effort to emulate heartbeats 
and correct group management here. We're
+ * simply exercising that the background {@link EventProcessor} does the 
correct thing when
+ * {@link AsyncKafkaConsumer#poll(Duration)} is called.
+ *
+ * Note that we test {@link ConsumerRebalanceListener} that throws errors 
in its different callbacks. Failed
+ * callback execution does not immediately errors. Instead, those 
errors are forwarded to the
+ * application event thread for the {@link MembershipManagerImpl} to 
handle.
+ */
+@ParameterizedTest
+@MethodSource("listenerCallbacksInvokeSource")
+public void 
testListenerCallbacksInvoke(List 
methodNames,
+Optional 
revokedError,
+Optional 
assignedError,
+Optional 
lostError,
+int expectedRevokedCount,
+int expectedAssignedCount,
+int expectedLostCount) {
+CounterConsumerRebalanceListener consumerRebalanceListener = new 
CounterConsumerRebalanceListener(
+revokedError,
+assignedError,
+lostError
+);
+consumer.subscribe(Collections.singletonList("topic"), 
consumerRebalanceListener);
+SortedSet partitions = Collections.emptySortedSet();
+
+for (ConsumerRebalanceListenerMethodName methodName : methodNames) {
+CompletableBackgroundEvent e = new 
ConsumerRebalanceListenerCallbackNeededEvent(methodName, partitions);
+backgroundEventQueue.add(e);
+
+// This will trigger the background event queue to process our 
background event message.
+consumer.poll(Duration.ZERO);
+}
+
+assertEquals(expectedRevokedCount, 
consumerRebalanceListener.revokedCount());
+assertEquals(expectedAssignedCount, 
consumerRebalanceListener.assignedCount());
+assertEquals(expectedLostCount, consumerRebalanceListener.lostCount());

Review Comment:
   I see. Ideally, we should mock the background part of it here. If this is 
too hard, we can address it separately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]

2023-12-12 Thread via GitHub


kirktrue commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1424285645


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -860,6 +871,78 @@ public void testGroupMetadataUpdateSingleCall() {
 }
 }
 
+/**
+ * Tests that the consumer correctly invokes the callbacks for {@link 
ConsumerRebalanceListener} that was
+ * specified. We don't go through the full effort to emulate heartbeats 
and correct group management here. We're
+ * simply exercising that the background {@link EventProcessor} does the 
correct thing when
+ * {@link AsyncKafkaConsumer#poll(Duration)} is called.
+ *
+ * Note that we test {@link ConsumerRebalanceListener} that throws errors 
in its different callbacks. Failed
+ * callback execution does not immediately errors. Instead, those 
errors are forwarded to the
+ * application event thread for the {@link MembershipManagerImpl} to 
handle.
+ */
+@ParameterizedTest
+@MethodSource("listenerCallbacksInvokeSource")
+public void 
testListenerCallbacksInvoke(List 
methodNames,
+Optional 
revokedError,
+Optional 
assignedError,
+Optional 
lostError,
+int expectedRevokedCount,
+int expectedAssignedCount,
+int expectedLostCount) {
+CounterConsumerRebalanceListener consumerRebalanceListener = new 
CounterConsumerRebalanceListener(
+revokedError,
+assignedError,
+lostError
+);
+consumer.subscribe(Collections.singletonList("topic"), 
consumerRebalanceListener);
+SortedSet partitions = Collections.emptySortedSet();
+
+for (ConsumerRebalanceListenerMethodName methodName : methodNames) {
+CompletableBackgroundEvent e = new 
ConsumerRebalanceListenerCallbackNeededEvent(methodName, partitions);
+backgroundEventQueue.add(e);
+
+// This will trigger the background event queue to process our 
background event message.
+consumer.poll(Duration.ZERO);
+}
+
+assertEquals(expectedRevokedCount, 
consumerRebalanceListener.revokedCount());
+assertEquals(expectedAssignedCount, 
consumerRebalanceListener.assignedCount());
+assertEquals(expectedLostCount, consumerRebalanceListener.lostCount());

Review Comment:
   I attempted this, but due to the way the code and tests are 
structured, it's very awkward to do so.
   
   Basically, when we create an `AsyncKafkaConsumer`, we start the background 
network I/O thread. That background thread pulls the events off of the queue, 
so there's a timing issue which could make the test flaky. I'll make another 
attempt at this today.
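
   For what it's worth, one generic way to defuse that kind of timing race is a
   latch the listener counts down, with the test driving poll() until the
   expected callbacks have fired -- a sketch under the assumption that the
   listener under test can signal each invocation (not the consumer's actual
   test utilities):

   ```java
   import java.time.Duration;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;

   final class CallbackLatch {
       private final CountDownLatch latch;

       CallbackLatch(int expectedCallbacks) {
           this.latch = new CountDownLatch(expectedCallbacks);
       }

       void onCallbackInvoked() {  // invoked from the rebalance listener under test
           latch.countDown();
       }

       /** Keeps driving the consumer until the callbacks fire or the deadline passes. */
       void await(Runnable pollOnce, Duration timeout) throws InterruptedException {
           long deadline = System.nanoTime() + timeout.toNanos();
           while (latch.getCount() > 0 && System.nanoTime() < deadline) {
               pollOnce.run();                          // e.g. consumer.poll(Duration.ZERO)
               latch.await(10, TimeUnit.MILLISECONDS);  // brief pause between polls
           }
           if (latch.getCount() > 0) {
               throw new AssertionError("callbacks did not complete in time");
           }
       }
   }
   ```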



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]

2023-12-12 Thread via GitHub


kirktrue commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1424283196


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -790,6 +791,297 @@ public void 
testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
 verify(membershipManager, never()).transitionToJoining();
 }
 
+@Test
+public void testListenerCallbacksBasic() {
+// Step 1: set up mocks
+MembershipManagerImpl membershipManager = createMemberInStableState();
+CounterConsumerRebalanceListener listener = new 
CounterConsumerRebalanceListener();
+ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
+
+String topicName = "topic1";
+Uuid topicId = Uuid.randomUuid();
+
+
when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet());
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, 
topicName));
+
+// Step 2: put the state machine into the appropriate... state
+
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, 
topicName));
+receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
+assertEquals(MemberState.RECONCILING, membershipManager.state());
+assertEquals(Collections.emptySet(), 
membershipManager.currentAssignment());
+assertTrue(membershipManager.reconciliationInProgress());
+assertEquals(0, listener.revokedCounter.get());
+assertEquals(0, listener.assignedCounter.get());
+assertEquals(0, listener.lostCounter.get());
+
+assertTrue(membershipManager.reconciliationInProgress());
+
+// Step 3: assign partitions
+performCallback(
+membershipManager,
+invoker,
+ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+topicPartitions(topicName, 0, 1)
+);
+
+assertFalse(membershipManager.reconciliationInProgress());
+
+// Step 4: Send ack and make sure we're done and our listener was 
called appropriately
+membershipManager.onHeartbeatRequestSent();
+assertEquals(MemberState.STABLE, membershipManager.state());
+assertEquals(topicIdPartitions(topicId, topicName, 0, 1), 
membershipManager.currentAssignment());
+
+assertEquals(0, listener.revokedCounter.get());
+assertEquals(1, listener.assignedCounter.get());
+assertEquals(0, listener.lostCounter.get());
+
+// Step 5: receive an empty assignment, which means we should call 
revoke
+
when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName,
 0, 1));
+receiveEmptyAssignment(membershipManager);
+assertEquals(MemberState.RECONCILING, membershipManager.state());
+assertTrue(membershipManager.reconciliationInProgress());
+
+// Step 6: revoke partitions
+performCallback(
+membershipManager,
+invoker,
+ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+topicPartitions(topicName, 0, 1)
+);
+assertTrue(membershipManager.reconciliationInProgress());
+
+// Step 7: assign partitions should still be called, even though it's 
empty
+performCallback(
+membershipManager,
+invoker,
+ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+Collections.emptySortedSet()
+);
+assertFalse(membershipManager.reconciliationInProgress());
+
+// Step 8: Send ack and make sure we're done and our listener was 
called appropriately
+membershipManager.onHeartbeatRequestSent();
+assertEquals(MemberState.STABLE, membershipManager.state());
+assertFalse(membershipManager.reconciliationInProgress());
+
+assertEquals(1, listener.revokedCounter.get());
+assertEquals(2, listener.assignedCounter.get());
+assertEquals(0, listener.lostCounter.get());
+}
+
+// TODO: Reconciliation needs to support when a listener throws an error 
on onPartitionsRevoked(). When that
+//   happens, the assignment step is skipped, which means 
onPartitionsAssigned() is never run.
+//   The jury is out on whether or not this is a bug or intentional.
+//
+//   See 
https://github.com/apache/kafka/pull/14640#discussion_r1421253120 for more 
details.
+// @Test
+public void testListenerCallbacksThrowsErrorOnPartitionsRevoked() {
+// Step 1: set up mocks
+MembershipManagerImpl membershipManager = 

Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-12 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1424277821


##
streams/src/test/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValueTest.java:
##
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+@RunWith(Enclosed.class)
+public class ReadonlyPartiallyDeserializedSegmentValueTest {
+
+/**
+ * Non-exceptional scenarios which are expected to occur during regular 
store operation.
+ */
+@RunWith(Parameterized.class)
+public static class ExpectedCasesTest {
+
+private static final List TEST_CASES = new ArrayList<>();
+
+static {
+// test cases are expected to have timestamps in strictly 
decreasing order (except for the degenerate case)
+TEST_CASES.add(new TestCase("single record", 10, new 
TestRecord("foo".getBytes(), 1)));
+TEST_CASES.add(new TestCase("multiple records", 10, new 
TestRecord("foo".getBytes(), 8), new TestRecord("bar".getBytes(), 3), new 
TestRecord("baz".getBytes(), 0)));
+TEST_CASES.add(new TestCase("single tombstone", 10, new 
TestRecord(null, 1)));
+TEST_CASES.add(new TestCase("multiple tombstone", 10, new 
TestRecord(null, 4), new TestRecord(null, 1)));
+TEST_CASES.add(new TestCase("tombstones and records (r, t, r)", 
10, new TestRecord("foo".getBytes(), 5), new TestRecord(null, 2), new 
TestRecord("bar".getBytes(), 1)));
+TEST_CASES.add(new TestCase("tombstones and records (t, r, t)", 
10, new TestRecord(null, 5), new TestRecord("foo".getBytes(), 2), new 
TestRecord(null, 1)));
+TEST_CASES.add(new TestCase("tombstones and records (r, r, t, t)", 
10, new TestRecord("foo".getBytes(), 6), new TestRecord("bar".getBytes(), 5), 
new TestRecord(null, 2), new TestRecord(null, 1)));
+TEST_CASES.add(new TestCase("tombstones and records (t, t, r, r)", 
10, new TestRecord(null, 7), new TestRecord(null, 6), new 
TestRecord("foo".getBytes(), 2), new TestRecord("bar".getBytes(), 1)));
+TEST_CASES.add(new TestCase("record with empty bytes", 10, new 
TestRecord(new byte[0], 1)));
+TEST_CASES.add(new TestCase("records with empty bytes (r, e)", 10, 
new TestRecord("foo".getBytes(), 4), new TestRecord(new byte[0], 1)));
+TEST_CASES.add(new TestCase("records with empty bytes (e, e, r)", 
10, new TestRecord(new byte[0], 8), new TestRecord(new byte[0], 2), new 
TestRecord("foo".getBytes(), 1)));
+}
+
+private final TestCase testCase;
+
+public ExpectedCasesTest(final TestCase testCase) {
+this.testCase = testCase;
+}
+
+@Parameterized.Parameters(name = "{0}")
+public static Collection data() {
+return TEST_CASES;
+}
+
+@Test
+public void shouldFindInTimeRangesWithDifferentOrders() {
+
+// create a list of timestamps in ascending order to use them in 
combination for starting and ending point of the time range.
+final List timestamps = 
createTimestampsFromTestRecords(testCase);
+
+// verify results
+final List orders = 
Arrays.asList(ResultOrder.ASCENDING, ResultOrder.ANY, ResultOrder.DESCENDING);
+for (final ResultOrder order: orders) {
+for (final Long from : timestamps) {
+for 

[PR] MINOR: Few cleanups to JaasContext/Utils classes [kafka]

2023-12-12 Thread via GitHub


q-ryanamiri opened a new pull request, #14994:
URL: https://github.com/apache/kafka/pull/14994

   Reviewers: Rajini Sivaram 
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-12 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1424265799


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##
@@ -69,23 +73,49 @@ public boolean hasNext() {
 if (!open) {
 throw new IllegalStateException("The iterator is out of scope.");
 }
-// since data is stored in descending order in the segments, check 
whether there is any previous record, if the order is Ascending.
-final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? 
iterator.hasPrevious() : iterator.hasNext();
-return hasStillLoad || maybeFillIterator();
+if (this.next != null) {
+return true;
+}
+
+while ((currentDeserializedSegmentValue != null || 
currentRawSegmentValue != null || segmentIterator.hasNext()) && this.next == 
null) {
+boolean hasSegmentValue = currentDeserializedSegmentValue != null 
|| currentRawSegmentValue != null;
+if (!hasSegmentValue) {
+hasSegmentValue = maybeFillCurrentSegmentValue();
+}
+if (hasSegmentValue) {
+this.next  = getNextRecord();
+if (this.next == null) {
+prepareToFetchNextSegment();
+}
+}
+}
+return this.next != null;
 }
 
 @Override
-public Object next() {
-if (hasNext()) {
-// since data is stored in descending order in the segments, 
retrieve previous record, if the order is Ascending.
-return order.equals(ResultOrder.ASCENDING) ? iterator.previous() : 
iterator.next();
+public VersionedRecord next() {
+if (this.next == null) {
+if (!hasNext()) {
+throw new NoSuchElementException();
+}
 }
-throw new NoSuchElementException();
+assert this.next != null;

Review Comment:
   > We usually don't use `assert`. Can be removed.
   
   Otherwise, there will be a warning, and we have to suppress it. But I will 
remove it.
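
   For readers skimming the diff: the `hasNext`/`next` shape above is the
   classic look-ahead iterator pattern -- `hasNext()` eagerly computes and
   caches the next element, and `next()` hands it out and clears the cache. A
   stripped-down sketch of the pattern (generic types, not the store's actual
   classes):

   ```java
   import java.util.Iterator;
   import java.util.NoSuchElementException;
   import java.util.function.Supplier;

   /** Look-ahead iterator: hasNext() materializes the next element in advance. */
   final class LookaheadIterator<T> implements Iterator<T> {
       private final Supplier<T> fetcher; // returns null when exhausted
       private T next;

       LookaheadIterator(Supplier<T> fetcher) {
           this.fetcher = fetcher;
       }

       @Override
       public boolean hasNext() {
           if (next == null) {
               next = fetcher.get(); // may internally skip empty segments
           }
           return next != null;
       }

       @Override
       public T next() {
           if (!hasNext()) {
               throw new NoSuchElementException();
           }
           T result = next;
           next = null; // force a fresh fetch on the following hasNext()
           return result;
       }
   }
   ```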



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




Re: [PR] MINOR: Allow Block.resetBlockLatch to release blocked operation for end-of-test cleanup [kafka]

2023-12-12 Thread via GitHub


gharris1727 commented on PR #14987:
URL: https://github.com/apache/kafka/pull/14987#issuecomment-1852376741

   Closing in favor of #12290 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Allow Block.resetBlockLatch to release blocked operation for end-of-test cleanup [kafka]

2023-12-12 Thread via GitHub


gharris1727 closed pull request #14987: MINOR: Allow Block.resetBlockLatch to 
release blocked operation for end-of-test cleanup
URL: https://github.com/apache/kafka/pull/14987


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-12 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1424245975


##
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+final class ReadonlyPartiallyDeserializedSegmentValue {
+
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+private byte[] segmentValue;
+private long nextTimestamp;
+private long minTimestamp;
+
+private int deserIndex = -1; // index up through which this segment has 
been deserialized (inclusive)
+
+private Map cumulativeValueSizes;
+
+private int valuesStartingIndex = -1; // the index of the first value in 
the segment (but the last one in the list)
+private Map unpackedTimestampAndValueSizes 
= new HashMap<>();
+private int recordNumber = -1; // number of segment records
+
+
+ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+this.segmentValue = segmentValue;
+this.nextTimestamp =
+
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+this.minTimestamp =
+
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+resetDeserHelpers();
+}
+
+
+public long minTimestamp() {
+return minTimestamp;
+}
+
+public long nextTimestamp() {
+return nextTimestamp;
+}
+
+public byte[] serialize() {
+return segmentValue;
+}
+
+
+public 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
find(
+final long fromTime, final long toTime, final ResultOrder order, 
final int index) {
+
+// this segment does not have any record in query specified time range
+if (toTime < minTimestamp || fromTime > nextTimestamp) {
+return null;
+}
+
+final boolean isAscending = order.equals(ResultOrder.ASCENDING);
+
+if (isAscending && valuesStartingIndex == -1) {
+findValuesStartingIndex();
+deserIndex = recordNumber;
+}
+
+long currTimestamp = -1;
+long currNextTimestamp = -1;
+int currIndex = initializeCurrentIndex(index, isAscending);
+int cumValueSize = initializeCumValueSize(index, currIndex, 
isAscending);
+int currValueSize;
+
+
+while (hasStillRecord(currTimestamp, currNextTimestamp, order)) {
+if (hasBeenDeserialized(isAscending, currIndex)) {
+final TimestampAndValueSize curr;
+curr = unpackedTimestampAndValueSizes.get(currIndex);
+currTimestamp = curr.timestamp;
+cumValueSize = cumulativeValueSizes.get(currIndex);
+currValueSize = curr.valueSize;
+
+// update currValueSize
+if (currValueSize == Integer.MIN_VALUE) {
+final int timestampSegmentIndex = getTimestampIndex(order, 
currIndex);
+currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
+unpackedTimestampAndValueSizes.put(currIndex, new 
TimestampAndValueSize(currTimestamp, cumValueSize));

Review Comment:
   > `cumValueSize` is not updated with `currValueSize`. Why?
   
   Yes; in fact, in this branch (when deserIndex is ahead of currIndex) we must 
already have everything ready. Therefore, `if (currValueSize == Integer.MIN_VALUE)` 
does not fit here. I removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15695: Update the local log start offset of a log after rebuilding the auxiliary state [kafka]

2023-12-12 Thread via GitHub


satishd merged PR #14649:
URL: https://github.com/apache/kafka/pull/14649


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]

2023-12-12 Thread via GitHub


aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1424136033


##
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+final class ReadonlyPartiallyDeserializedSegmentValue {
+
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+private byte[] segmentValue;
+private long nextTimestamp;
+private long minTimestamp;
+
+private int deserIndex = -1; // index up through which this segment has 
been deserialized (inclusive)
+
+private Map cumulativeValueSizes;
+
+private int valuesStartingIndex = -1; // the index of the first value in 
the segment (but the last one in the list)
+private Map unpackedTimestampAndValueSizes 
= new HashMap<>();
+private int recordNumber = -1; // number of segment records
+
+
+ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+this.segmentValue = segmentValue;
+this.nextTimestamp =
+
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+this.minTimestamp =
+
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+resetDeserHelpers();
+}
+
+
+public long minTimestamp() {
+return minTimestamp;
+}
+
+public long nextTimestamp() {
+return nextTimestamp;
+}
+
+public byte[] serialize() {
+return segmentValue;
+}
+
+
+public 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
find(
+final long fromTime, final long toTime, final ResultOrder order, 
final int index) {
+
+// this segment does not have any record in query specified time range
+if (toTime < minTimestamp || fromTime > nextTimestamp) {
+return null;
+}
+
+final boolean isAscending = order.equals(ResultOrder.ASCENDING);
+
+if (isAscending && valuesStartingIndex == -1) {
+findValuesStartingIndex();
+deserIndex = recordNumber;
+}
+
+long currTimestamp = -1;
+long currNextTimestamp = -1;
+int currIndex = initializeCurrentIndex(index, isAscending);
+int cumValueSize = initializeCumValueSize(index, currIndex, 
isAscending);

Review Comment:
   > Not sure why we need to init `cumValueSize`? It seems below we never read 
the value set here?
   
   We must do that. Look at line#106 (`cumValueSize += Math.max(currValueSize, 
0);`). 
   In fact, when we have everything ready in the cache 
(`unpackedTimestampAndValueSizes`), we do not need it. Otherwise, we need the 
previously computed cumValueSize to be able to move to the correct position in the 
segment and deserialize the next one.
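
   To make the role of the cumulative size concrete: when each record header
   stores (timestamp, valueSize) and the value bytes are packed back to back,
   the running sum of value sizes up to index i is exactly the offset of record
   i's value, so a reader can jump straight there without deserializing
   everything before it. A toy sketch of that offset arithmetic (illustrative
   layout, not the actual segment format):

   ```java
   import java.nio.ByteBuffer;

   final class PackedValuesSketch {
       private static final int TIMESTAMP_SIZE = 8;
       private static final int VALUE_SIZE = 4;

       /** Reads value #index from [ (ts, size) * n | value bytes... ] lazily. */
       static byte[] valueAt(byte[] segment, int recordCount, int index) {
           ByteBuffer buf = ByteBuffer.wrap(segment);
           int headerBytes = recordCount * (TIMESTAMP_SIZE + VALUE_SIZE);
           int cumValueSize = 0; // running offset into the value area
           for (int i = 0; i < index; i++) {
               int size = buf.getInt(i * (TIMESTAMP_SIZE + VALUE_SIZE) + TIMESTAMP_SIZE);
               cumValueSize += Math.max(size, 0); // negative size marks a tombstone in this toy layout
           }
           int size = buf.getInt(index * (TIMESTAMP_SIZE + VALUE_SIZE) + TIMESTAMP_SIZE);
           byte[] value = new byte[Math.max(size, 0)];
           buf.position(headerBytes + cumValueSize);
           buf.get(value);
           return value;
       }
   }
   ```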



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite [kafka]

2023-12-12 Thread via GitHub


yashmayya commented on PR #14966:
URL: https://github.com/apache/kafka/pull/14966#issuecomment-1852343242

   Thanks Chris, this looks like a really nice improvement! I can review 
sometime later this week.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


