[GitHub] [kafka] sudeshwasnik commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

2023-06-14 Thread via GitHub


sudeshwasnik commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1592302321

   Hi @sagarrao12, a small correction to my earlier comment:
   ```
   Since assumption 1 is incorrect, we should change the test where it doesn't 
expect every record that decrements recordsToCommitLatch also must have been 
produced.
   ```
   It seems that `recordsRemainingLatch` only waits until X records 
have been returned by the source task to the framework. Say MINIMUM_MESSAGES = 1000 
but MESSAGES_PER_POLL is configured as 100; then we need to wait until the source task 
delivers 1000 messages to the framework. This keeps the `production` 
time out of the `awaitCommits` assertion timeout (?). WDYT?
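
A minimal sketch of the latch arithmetic described above, assuming a simplified stand-in for the source task (the class `LatchSketch` and the method `deliver` are hypothetical and not part of the Connect test). With MINIMUM_MESSAGES = 1000 and MESSAGES_PER_POLL = 100, the latch reaches zero after ten polls, and waiting on it only tells us that the records were handed to the framework:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchSketch {
    // Values taken from the example in the comment above.
    static final int MINIMUM_MESSAGES = 1000;
    static final int MESSAGES_PER_POLL = 100;

    // Hypothetical stand-in for a source task: each "poll" hands MESSAGES_PER_POLL
    // records to the framework and counts the latch down once per record.
    static int deliver(CountDownLatch recordsRemainingLatch) {
        int polls = 0;
        while (recordsRemainingLatch.getCount() > 0) {
            for (int i = 0; i < MESSAGES_PER_POLL; i++) {
                recordsRemainingLatch.countDown();
            }
            polls++;
        }
        return polls;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch recordsRemainingLatch = new CountDownLatch(MINIMUM_MESSAGES);
        Thread task = new Thread(() -> deliver(recordsRemainingLatch));
        task.start();
        // Returns once MINIMUM_MESSAGES records have been handed over. It says nothing
        // about whether those records were produced to the topic or committed.
        boolean delivered = recordsRemainingLatch.await(10, TimeUnit.SECONDS);
        task.join();
        System.out.println("delivered=" + delivered);
    }
}
```

Because the wait covers only delivery, any timeout applied afterwards to a commit latch would not have to absorb the time spent producing.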
   
   Also, the reason this PR passes the test now is that it doesn't 
validate that X records are present in the topic `when` the X countDowns for `awaitCommits` 
are done. It waits `until` X records are produced into the topic, and by then 
there could have been many more `commitRecord` calls.  
   This assertion doesn't help now because the connector is running continuously, so 
there `WILL` be MINIMUM_MESSAGES produced eventually. 
   ```
   assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + sourceRecords.count(),
   sourceRecords.count() >= MINIMUM_MESSAGES);
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13773: KAFKA-14712: Produce correct error msg with correct metadataversion

2023-06-14 Thread via GitHub


showuon commented on PR #13773:
URL: https://github.com/apache/kafka/pull/13773#issuecomment-1592269594

   @Owen-CH-Leung, thanks for the patch. Could you add tests for this change?





[GitHub] [kafka] showuon commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

2023-06-14 Thread via GitHub


showuon commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230345113


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -127,6 +127,8 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 
 // hold onto request for committed offset requests to enable async 
calls.
 private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;
+// holds the offset metadata for assigned partitions to reduce remote 
calls thus speeding up fetching partition metadata
+private final Map 
committedTopicPartitionOffsetsCache;

Review Comment:
   nit: the comment above should mention this is the `committed offset metadata`



##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -922,13 +922,9 @@ public void testCommitsFetchedDuringAssign() {
 
 // fetch offset for two topics
 Map offsets = new HashMap<>();
-offsets.put(tp0, offset1);
-client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), 
coordinator);
-assertEquals(offset1, 
consumer.committed(Collections.singleton(tp0)).get(tp0).offset());
-
-offsets.remove(tp0);
 offsets.put(tp1, offset2);

Review Comment:
   Could we add a comment above explaining why we only need to respond with `tp1, 
offset2`? Something like: it was already cached by the previous committed offset fetch.
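
As a rough illustration of why a cached lookup needs a response for only the uncached partition, here is a minimal sketch. It is not the PR's implementation: `CommittedOffsetCacheSketch`, its string partition keys, and the `remoteFetch` function are all hypothetical, and cache invalidation is left out entirely.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

class CommittedOffsetCacheSketch {
    private final Map<String, Long> cache = new HashMap<>();
    // Hypothetical stand-in for the remote committed-offset fetch.
    private final Function<Set<String>, Map<String, Long>> remoteFetch;
    // Partitions asked for in the most recent remote call.
    Set<String> lastRequested = new HashSet<>();

    CommittedOffsetCacheSketch(Function<Set<String>, Map<String, Long>> remoteFetch) {
        this.remoteFetch = remoteFetch;
    }

    Map<String, Long> committed(Set<String> partitions) {
        Map<String, Long> result = new HashMap<>();
        Set<String> missing = new HashSet<>();
        for (String tp : partitions) {
            Long cached = cache.get(tp);
            if (cached != null) {
                result.put(tp, cached);   // served locally, no remote call needed
            } else {
                missing.add(tp);
            }
        }
        if (!missing.isEmpty()) {
            lastRequested = missing;
            Map<String, Long> fetched = remoteFetch.apply(missing);
            cache.putAll(fetched);
            result.putAll(fetched);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> broker = new HashMap<>();
        broker.put("tp0", 5L);
        broker.put("tp1", 7L);
        CommittedOffsetCacheSketch consumer = new CommittedOffsetCacheSketch(requested -> {
            Map<String, Long> response = new HashMap<>();
            for (String tp : requested) {
                response.put(tp, broker.get(tp));
            }
            return response;
        });
        consumer.committed(Collections.singleton("tp0"));
        consumer.committed(new HashSet<>(Arrays.asList("tp0", "tp1")));
        // The second call only had to ask the remote side for tp1.
        System.out.println(consumer.lastRequested); // prints [tp1]
    }
}
```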



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() {
 
 AtomicBoolean success = new AtomicBoolean(false);
 
-Map offsets = singletonMap(t1p, new 
OffsetAndMetadata(100L, "hello"));
+OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, 
"hello");
+Map offsets = singletonMap(t1p, 
offsetAndMetadata);
 coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
 coordinator.invokeCompletedOffsetCommitCallbacks();
 assertTrue(success.get());
 assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+Map cache = 
coordinator.committedOffsetsCache();
+assertEquals(cache.size(), 1);
+assertEquals(cache.get(t1p), offsetAndMetadata);
+}
+
+@Test
+public void testCommitOffsetMetadataSync() {

Review Comment:
   Thanks for adding the sync test



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() {
 
 AtomicBoolean success = new AtomicBoolean(false);
 
-Map offsets = singletonMap(t1p, new 
OffsetAndMetadata(100L, "hello"));
+OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, 
"hello");
+Map offsets = singletonMap(t1p, 
offsetAndMetadata);
 coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
 coordinator.invokeCompletedOffsetCommitCallbacks();
 assertTrue(success.get());
 assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+Map cache = 
coordinator.committedOffsetsCache();
+assertEquals(cache.size(), 1);
+assertEquals(cache.get(t1p), offsetAndMetadata);

Review Comment:
   The `assertEquals` method signature is `assertEquals(int expected, int actual)`. 
Putting the parameters in the correct order will produce a sensible error 
message if the assertion fails.
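
To make the point about argument order concrete, here is a small sketch. It does not use JUnit itself: `AssertOrderSketch.assertEquals` is a hypothetical stand-in that mimics the "expected: <x> but was: <y>" wording of JUnit 5 failure messages, so the example runs without any dependency.

```java
import java.util.Objects;

class AssertOrderSketch {
    // Stand-in for the failure message wording; not the JUnit implementation.
    static String failureMessage(Object expected, Object actual) {
        return "expected: <" + expected + "> but was: <" + actual + ">";
    }

    // Hypothetical stand-in for assertEquals(expected, actual).
    static void assertEquals(Object expected, Object actual) {
        if (!Objects.equals(expected, actual)) {
            throw new AssertionError(failureMessage(expected, actual));
        }
    }

    public static void main(String[] args) {
        int cacheSize = 2; // pretend the cache unexpectedly holds two entries

        try {
            assertEquals(1, cacheSize); // correct order: expected first
        } catch (AssertionError e) {
            System.out.println(e.getMessage()); // expected: <1> but was: <2>
        }

        try {
            assertEquals(cacheSize, 1); // swapped order, as in the diff under review
        } catch (AssertionError e) {
            System.out.println(e.getMessage()); // expected: <2> but was: <1>
        }
    }
}
```

With the swapped order the assertion still fails when it should, but the message points the reader at the wrong value.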



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() {
 
 AtomicBoolean success = new AtomicBoolean(false);
 
-Map offsets = singletonMap(t1p, new 
OffsetAndMetadata(100L, "hello"));
+OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, 
"hello");
+Map offsets = singletonMap(t1p, 
offsetAndMetadata);
 coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
 coordinator.invokeCompletedOffsetCommitCallbacks();
 assertTrue(success.get());
 assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+Map cache = 
coordinator.committedOffsetsCache();
+assertEquals(cache.size(), 1);
+assertEquals(cache.get(t1p), offsetAndMetadata);

Review Comment:
   Also, could we assert that the cache is empty before we call `commitOffsetsAsync`? I.e.:
   ```
   assertTrue(cache.isEmpty());
   coordinator.commitOffsetsAsync(...)
   ...
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -351,6 +359,10 @@ private Exception invokePartitionsRevoked(final 
SortedSet revoke
 final long startMs = time.milliseconds();
 listener.onPartitionsRevoked(revokedPartitions);
 

[jira] [Commented] (KAFKA-15085) Make Timer.java implement AutoCloseable

2023-06-14 Thread Owen C.H. Leung (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17732824#comment-17732824
 ] 

Owen C.H. Leung commented on KAFKA-15085:
-

Hi [~divijvaidya], can you assign this issue to me? I'm eager to pick this up.

Also, could you help review my PR, which aims to solve KAFKA-14712?

https://github.com/apache/kafka/pull/13773

> Make Timer.java implement AutoCloseable
> ---
>
> Key: KAFKA-15085
> URL: https://issues.apache.org/jira/browse/KAFKA-15085
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Divij Vaidya
>Priority: Minor
>  Labels: Newbie, newbie
>
> Many automatic bug finders will flag a warning if an object of a class that is 
> marked as AutoCloseable is not closed properly in the code. Hence, as a 
> best practice, we should implement AutoCloseable for classes which require 
> resources to be released after shutdown.
> Timer.java should implement AutoCloseable, and ShutDown should be replaced 
> with a close() method.
> Note that this interface change does not require a KIP since Timer.java is 
> not a user facing public class.
> This was discussed here: 
> [https://github.com/apache/kafka/pull/13820#discussion_r1222654614] 
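
A minimal sketch of the pattern the issue asks for, assuming a simplified timer backed by a scheduled executor. `SketchTimer` is hypothetical and is not Kafka's `Timer`; it only shows how implementing `AutoCloseable` lets callers (and static analysis tools) rely on try-with-resources:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SketchTimer implements AutoCloseable {
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor();

    void schedule(Runnable task, long delayMs) {
        executor.schedule(task, delayMs, TimeUnit.MILLISECONDS);
    }

    boolean isClosed() {
        return executor.isShutdown();
    }

    // Takes the place of a dedicated shutdown method: releases the worker thread
    // and discards tasks that have not started yet.
    @Override
    public void close() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        SketchTimer timer = new SketchTimer();
        try (SketchTimer t = timer) {
            t.schedule(() -> System.out.println("tick"), 1000);
        } // close() is called here, even if the body throws
        System.out.println("closed=" + timer.isClosed()); // prints closed=true
    }
}
```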



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14709) Move content in connect/mirror/README.md to the docs

2023-06-14 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-14709.
---
Fix Version/s: 3.6.0
   Resolution: Fixed

> Move content in connect/mirror/README.md to the docs
> 
>
> Key: KAFKA-14709
> URL: https://issues.apache.org/jira/browse/KAFKA-14709
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs, mirrormaker
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Major
> Fix For: 3.6.0
>
>
> We should move all the content in 
> https://github.com/apache/kafka/blob/trunk/connect/mirror/README.md to the 
> relevant doc sections. 





[GitHub] [kafka] showuon merged pull request #13650: KAFKA-14709: Move content in connect/mirror/README.md to the docs

2023-06-14 Thread via GitHub


showuon merged PR #13650:
URL: https://github.com/apache/kafka/pull/13650





[GitHub] [kafka] showuon commented on pull request #13650: KAFKA-14709: Move content in connect/mirror/README.md to the docs

2023-06-14 Thread via GitHub


showuon commented on PR #13650:
URL: https://github.com/apache/kafka/pull/13650#issuecomment-1592238006

   Failed tests are unrelated:
   ```
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testSyncTopicConfigs()
   Build / JDK 11 and Scala 2.13 / 
kafka.admin.DescribeConsumerGroupTest.testDescribeOffsetsOfExistingGroupWithNoMembers()
   Build / JDK 11 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] 
Type=ZK, Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, 
Security=PLAINTEXT
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testMultiNodeCluster()
   Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testClose()
   Build / JDK 8 and Scala 2.12 / 
kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi
   ```





[GitHub] [kafka] showuon commented on a diff in pull request #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version

2023-06-14 Thread via GitHub


showuon commented on code in PR #13782:
URL: https://github.com/apache/kafka/pull/13782#discussion_r1230335416


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1007,6 +1007,30 @@ public static void flushDir(Path path) throws 
IOException {
 }
 }
 
+/**
+ * Flushes dirty file to guarantee crash consistency.
+ *
+ * @throws IOException if flushing the file fails.
+ */
+public static void flushFile(Path path) throws IOException {
+if (path != null) {
+try (FileChannel fileChannel = FileChannel.open(path, 
StandardOpenOption.READ)) {
+fileChannel.force(true);
+}
+}
+}
+
+/**
+ * Flushes dirty file quietly, logs warning when exception happens.
+ */
+public static void flushFileQuietly(Path path, String name) {
+try {
+flushFile(path);
+} catch (IOException e) {
+log.warn("Failed to flush {} at path {}", name, path);

Review Comment:
   You should still pass `e` as the last argument so that the exception is logged, like this:
   `log.warn("Failed to flush {} at path {}", name, path, e);`
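
A self-contained sketch of the same idea, using `java.util.logging` as a stand-in for the SLF4J logger in the diff so that it runs without extra dependencies. `FlushQuietlySketch` and its boolean return value are assumptions added for illustration; the point is that the caught exception is handed to the logger rather than dropped:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.logging.Level;
import java.util.logging.Logger;

class FlushQuietlySketch {
    private static final Logger LOG = Logger.getLogger(FlushQuietlySketch.class.getName());

    // Mirrors the helper in the diff: open the file and force its contents to disk.
    static void flushFile(Path path) throws IOException {
        if (path != null) {
            try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
                channel.force(true);
            }
        }
    }

    // Returns true on success (an addition for this sketch). On failure the
    // exception is passed to the logger so the cause and stack trace are kept.
    static boolean flushFileQuietly(Path path, String name) {
        try {
            flushFile(path);
            return true;
        } catch (IOException e) {
            LOG.log(Level.WARNING, "Failed to flush " + name + " at path " + path, e);
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("flush-sketch", ".log");
        Files.write(file, "data".getBytes(StandardCharsets.UTF_8));
        System.out.println(flushFileQuietly(file, "log segment")); // true

        Files.delete(file);
        // The file is gone, so opening it fails and the warning includes the exception.
        System.out.println(flushFileQuietly(file, "log segment")); // false
    }
}
```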






[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-14 Thread via GitHub


jolshan commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1230295895


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.ArgumentMatchers;
+
+import java.util.Collections;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.coordinator.group.TestUtil.requestContext;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class GroupCoordinatorServiceTest {
+
+@SuppressWarnings("unchecked")
+private CoordinatorRuntime 
mockRuntime() {
+return (CoordinatorRuntime) 
mock(CoordinatorRuntime.class);
+}
+
+private GroupCoordinatorConfig createConfig() {
+return new GroupCoordinatorConfig(
+1,
+45,
+5,
+Integer.MAX_VALUE,
+Collections.singletonList(new RangeAssignor()),
+1000
+);
+}
+
+@Test
+public void testStartupShutdown() throws Exception {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime
+);
+
+service.startup(() -> 1);
+service.shutdown();
+
+verify(runtime, times(1)).close();
+}
+
+@Test
+public void testConsumerGroupHeartbeatWhenNotStarted() {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime
+);
+
+ConsumerGroupHeartbeatRequestData request = new 
ConsumerGroupHeartbeatRequestData()
+.setGroupId("foo");
+
+assertFutureThrows(
+service.consumerGroupHeartbeat(
+requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT),
+request
+),
+CoordinatorNotAvailableException.class
+);
+}
+
+

[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-14 Thread via GitHub


jolshan commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1230294539


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinatorTest.java:
##
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.RequestContext;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.apache.kafka.coordinator.group.TestUtil.requestContext;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ReplicatedGroupCoordinatorTest {
+
+@Test
+public void testConsumerGroupHeartbeat() {
+GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+ReplicatedGroupCoordinator coordinator = new 
ReplicatedGroupCoordinator(
+groupMetadataManager
+);
+
+RequestContext context = 
requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT);
+ConsumerGroupHeartbeatRequestData request = new 
ConsumerGroupHeartbeatRequestData();
+CoordinatorResult result = 
new CoordinatorResult<>(
+Collections.emptyList(),
+new ConsumerGroupHeartbeatResponseData()
+);
+
+when(coordinator.consumerGroupHeartbeat(
+context,
+request
+)).thenReturn(result);
+
+assertEquals(result, coordinator.consumerGroupHeartbeat(context, 
request));

Review Comment:
   Is this just a test that we don't throw errors?






[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-14 Thread via GitHub


jolshan commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1230293541


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.ArgumentMatchers;
+
+import java.util.Collections;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.coordinator.group.TestUtil.requestContext;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class GroupCoordinatorServiceTest {
+
+@SuppressWarnings("unchecked")
+private CoordinatorRuntime 
mockRuntime() {
+return (CoordinatorRuntime) 
mock(CoordinatorRuntime.class);
+}
+
+private GroupCoordinatorConfig createConfig() {
+return new GroupCoordinatorConfig(
+1,
+45,
+5,
+Integer.MAX_VALUE,
+Collections.singletonList(new RangeAssignor()),
+1000
+);
+}
+
+@Test
+public void testStartupShutdown() throws Exception {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime
+);
+
+service.startup(() -> 1);
+service.shutdown();
+
+verify(runtime, times(1)).close();
+}
+
+@Test
+public void testConsumerGroupHeartbeatWhenNotStarted() {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime
+);
+
+ConsumerGroupHeartbeatRequestData request = new 
ConsumerGroupHeartbeatRequestData()
+.setGroupId("foo");
+
+assertFutureThrows(
+service.consumerGroupHeartbeat(
+requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT),
+request
+),
+CoordinatorNotAvailableException.class
+);
+}
+
+

[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-14 Thread via GitHub


jolshan commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1230281526


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+
+/**
+ * The group coordinator service.
+ */
+public class GroupCoordinatorService implements GroupCoordinator {
+
+public static class Builder {
+private final int nodeId;
+private final GroupCoordinatorConfig config;
+private PartitionWriter writer;
+private CoordinatorLoader loader;
+
+public Builder(
+int nodeId,
+GroupCoordinatorConfig config
+) {
+this.nodeId = nodeId;
+this.config = config;
+}
+
+public Builder withWriter(PartitionWriter writer) {
+this.writer = 

[GitHub] [kafka] wcarlson5 commented on pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

2023-06-14 Thread via GitHub


wcarlson5 commented on PR #13855:
URL: https://github.com/apache/kafka/pull/13855#issuecomment-1592137217

   @cadonna could you take a look at this? @vcrfxia has already reviewed 
[this](https://github.com/wcarlson5/kafka/pull/1). I am still missing a check of the 
table's history retention, but other than that it should be 
good.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-14 Thread via GitHub


jolshan commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1230246630


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+
+/**
+ * The group coordinator service.
+ */
+public class GroupCoordinatorService implements GroupCoordinator {
+
+public static class Builder {
+private final int nodeId;
+private final GroupCoordinatorConfig config;
+private PartitionWriter writer;
+private CoordinatorLoader loader;
+
+public Builder(
+int nodeId,
+GroupCoordinatorConfig config
+) {
+this.nodeId = nodeId;
+this.config = config;
+}
+
+public Builder withWriter(PartitionWriter writer) {
+this.writer = 

[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-14 Thread via GitHub


jolshan commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1230243216



[GitHub] [kafka] cmccabe commented on a diff in pull request #13826: KAFKA-15060: fix the ApiVersionManager interface

2023-06-14 Thread via GitHub


cmccabe commented on code in PR #13826:
URL: https://github.com/apache/kafka/pull/13826#discussion_r1230241309


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -441,23 +441,14 @@ class ControllerApis(val requestChannel: RequestChannel,
       if (apiVersionRequest.hasUnsupportedRequestVersion) {
         requestHelper.sendResponseMaybeThrottle(request,
           requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, UNSUPPORTED_VERSION.exception))
-        CompletableFuture.completedFuture[Unit](())
       } else if (!apiVersionRequest.isValid) {
         requestHelper.sendResponseMaybeThrottle(request,
           requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception))
-        CompletableFuture.completedFuture[Unit](())
       } else {
-        val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty())
-        controller.finalizedFeatures(context).handle { (result, exception) =>
-          requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-            if (exception != null) {
-              apiVersionRequest.getErrorResponse(requestThrottleMs, exception)
-            } else {
-              apiVersionManager.apiVersionResponse(requestThrottleMs, result.featureMap().asScala.toMap, result.epoch())
-            }
-          })
-        }
+        requestHelper.sendResponseMaybeThrottle(request,
+          requestThrottleMs => apiVersionManager.apiVersionResponse(requestThrottleMs))

Review Comment:
   I will change this not to start the listener until we've caught up to the 
local HWM. That will avoid this and some other gotchas.






[jira] [Commented] (KAFKA-13988) Mirrormaker 2 auto.offset.reset=latest not working

2023-06-14 Thread Ravindranath Kakarla (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17732778#comment-17732778
 ] 

Ravindranath Kakarla commented on KAFKA-13988:
--

Does the issue have anything to do with the source cluster running an old version 
(0.10)? Has anyone faced this issue with recent versions of Kafka?

> Mirrormaker 2 auto.offset.reset=latest not working
> --
>
> Key: KAFKA-13988
> URL: https://issues.apache.org/jira/browse/KAFKA-13988
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 3.2.0
> Environment: Source Kafka cluster running on Ubuntu 20
> Source Kafka cluster Kafka v0.10
> Target Kafka cluster running in AWS MSK
> Target Kafka cluster Kafka v2.6.2
> Mirrormaker version 3.2.0 running on Ubuntu 20.
>Reporter: Daniel Florek
>Assignee: Justinwins
>Priority: Major
> Fix For: 3.2.0
>
>
> Hi. 
> I have a problem setting up mirroring with MM2 from the latest offset between 2 
> clusters. In the logs I can see that the consumer reading the topics has the 
> auto.offset.reset property set to latest, but the topics are still read from 
> offset 0. I am using the following configuration:
>  
> {code:java}
> clusters = A, B
> A.bootstrap.servers = broker-01A:9092
> B.bootstrap.servers = broker-01B:9092,broker-02B:9092,broker-03B:9092
> replication.policy.class = 
> org.apache.kafka.connect.mirror.IdentityReplicationPolicy
> #Enable replication between clusters and define topics which should be 
> replicated
> A->B.enabled = true
> A->B.topics = .*
> A->B.replication.factor=3
> A->B.emit.heartbeats.enabled = true
> A->B.emit.checkpoints.enabled = true
> auto.offset.reset=latest
> consumer.auto.offset.reset=latest
> A.consumer.auto.offset.reset=latest
> B.consumer.auto.offset.reset=latest
> refresh.topics.enabled=true
> heartbeats.topic.replication.factor=1
> checkpoints.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> config.storage.replication.factor = 1
> offset.storage.replication.factor = 1
> status.storage.replication.factor = 1 {code}
> I am using Kafka 3.2.0 for MirrorMaker 2. The source Kafka cluster is 1 broker 
> running on an EC2 instance in AWS (quite an old version, I think 0.10). The target 
> Kafka cluster contains 3 brokers running in AWS MSK (version 2.6.2). 
> Could you point out what I am doing wrong? Or is this possibly a bug?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-14 Thread via GitHub


jolshan commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1230229603



[GitHub] [kafka] dimitarndimitrov opened a new pull request, #13856: [DRAFT] KAFKA-15087 Move/rewrite InterBrokerSendThread to server-commons

2023-06-14 Thread via GitHub


dimitarndimitrov opened a new pull request, #13856:
URL: https://github.com/apache/kafka/pull/13856

   The Java rewrite is kept relatively close to the Scala original to minimize 
potential newly introduced bugs and to make reviewing simpler. The following 
details might be of note:
   - The `Logging` trait moved to `InterBrokerSendThread` with the rewrite of 
`ShutdownableThread` has been similarly moved to any subclasses that currently 
use it. `InterBrokerSendThread`'s own logging has been made to use 
`ShutdownableThread`'s logger which mimics the prefix/log identifier that the 
trait provided.
   - The `RequestAndCompletionHandler` case class has been made a separate POJO 
class, and the internal-use `UnsentRequests` class has been kept as a static 
nested class.
   - The relatively commonly used but internal (not part of the public API) 
clients classes that `InterBrokerSendThread` relies on have been allowlisted in 
the server-common import control.
   - The accompanying test class has also been moved and rewritten with one new 
test added and most of the pre-existing tests made stricter.
   - The main abstract method of `InterBrokerSendThread`, `generateRequests()`, 
now returns a Java collection, so in the initial draft `asScala` has been 
sprinkled generously around its usages. I'll look further into whether there is 
a better alternative.
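
To make the class layout described in the list above easier to picture, here is a minimal, self-contained sketch. It is not the code in this PR. Only the names `RequestAndCompletionHandler`, `UnsentRequests`, and `generateRequests()` come from the description; `SendThreadSketch`, `drainOnce()`, and all fields are hypothetical, and nothing is actually sent over the network.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;

/** Hypothetical stand-in for the POJO that replaces the Scala case class. */
final class RequestAndCompletionHandler {
    final String destination; // assumed field, for illustration only
    final String payload;     // assumed field, for illustration only
    final Runnable handler;   // invoked when the request completes

    RequestAndCompletionHandler(String destination, String payload, Runnable handler) {
        this.destination = destination;
        this.payload = payload;
        this.handler = handler;
    }
}

/** Hypothetical stand-in for the send thread; it only models the class layout. */
abstract class SendThreadSketch {

    /** Internal-use queue kept as a static nested class. */
    static final class UnsentRequests {
        private final Deque<RequestAndCompletionHandler> queue = new ArrayDeque<>();

        void put(RequestAndCompletionHandler request) {
            queue.addLast(request);
        }

        RequestAndCompletionHandler poll() {
            return queue.pollFirst();
        }
    }

    private final UnsentRequests unsentRequests = new UnsentRequests();

    /** The main abstract method: it returns a Java collection. */
    abstract Collection<RequestAndCompletionHandler> generateRequests();

    /** One pass: enqueue the generated requests, then complete each one in order. */
    int drainOnce() {
        for (RequestAndCompletionHandler request : generateRequests()) {
            unsentRequests.put(request);
        }
        int completed = 0;
        RequestAndCompletionHandler next;
        while ((next = unsentRequests.poll()) != null) {
            next.handler.run();
            completed++;
        }
        return completed;
    }
}
```

A Scala subclass that builds its requests with Scala collections would have to convert them before returning, which is where the conversions mentioned above come in.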
   
   The local test run after the changes ended with _7661 tests completed, 2 
failed, 3 skipped_. The failures appear to be unrelated and are not in 
server-common or core.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-14 Thread via GitHub


jolshan commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1230195090


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The group coordinator configurations.
+ */
+public class GroupCoordinatorConfig {
+public static class Builder {
+private int numThreads = 1;

Review Comment:
   I was also wondering about how config defs worked with documentation. Thanks 
for filing this JIRA.






[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-14 Thread via GitHub


jolshan commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1230193218


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java:
##
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.runtime.Coordinator;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilder;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+/**
+ * The group coordinator replicated state machine that manages the metadata of all generic and
+ * consumer groups. It holds the hard and the soft state of the groups. This class has two kinds
+ * of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class ReplicatedGroupCoordinator implements Coordinator {

Review Comment:
   I guess it depends on what the other implementations of coordinator will be 
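
For readers unfamiliar with the split that the quoted Javadoc describes (request handlers that only produce a response plus records, and replay methods that are the only place the hard state changes), here is a minimal, self-contained sketch. Every name in it (`MemberJoinedRecord`, `ResultSketch`, `CoordinatorSketch`, `handleJoin`) is hypothetical and chosen for illustration; it does not reproduce the classes in this PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical record meaning "member joined group". */
final class MemberJoinedRecord {
    final String groupId;
    final String memberId;

    MemberJoinedRecord(String groupId, String memberId) {
        this.groupId = groupId;
        this.memberId = memberId;
    }
}

/** What a request handler returns: the records to persist plus the response. */
final class ResultSketch<T> {
    final List<MemberJoinedRecord> records;
    final T response;

    ResultSketch(List<MemberJoinedRecord> records, T response) {
        this.records = records;
        this.response = response;
    }
}

final class CoordinatorSketch {
    // The "hard state": it changes only through replay().
    private final Map<String, List<String>> membersByGroup = new HashMap<>();

    /** Request handler: builds a response and records, but does not touch the state. */
    ResultSketch<String> handleJoin(String groupId, String memberId) {
        List<MemberJoinedRecord> records =
            Collections.singletonList(new MemberJoinedRecord(groupId, memberId));
        return new ResultSketch<>(records, "joined:" + memberId);
    }

    /** Replay method: applies one record; used after a write and when loading. */
    void replay(MemberJoinedRecord record) {
        membersByGroup
            .computeIfAbsent(record.groupId, g -> new ArrayList<>())
            .add(record.memberId);
    }

    List<String> members(String groupId) {
        return membersByGroup.getOrDefault(groupId, Collections.emptyList());
    }
}
```

In this sketch the caller plays the role of the runtime: it takes the records from `handleJoin`, persists them (omitted here), and then passes each one to `replay`. Loading from a partition would call `replay` on the stored records without going through a handler.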
 






[GitHub] [kafka] wcarlson5 opened a new pull request, #13855: Grace period added

2023-06-14 Thread via GitHub


wcarlson5 opened a new pull request, #13855:
URL: https://github.com/apache/kafka/pull/13855

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-15092) KafkaClusterTestKit in test jar depends on MockFaultHandler

2023-06-14 Thread Gary Russell (Jira)
Gary Russell created KAFKA-15092:


 Summary: KafkaClusterTestKit in test jar depends on 
MockFaultHandler
 Key: KAFKA-15092
 URL: https://issues.apache.org/jira/browse/KAFKA-15092
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.5.0
Reporter: Gary Russell


{noformat}
java.lang.NoClassDefFoundError: org/apache/kafka/server/fault/MockFaultHandler
	at kafka.testkit.KafkaClusterTestKit$SimpleFaultHandlerFactory.<init>(KafkaClusterTestKit.java:119)
	at kafka.testkit.KafkaClusterTestKit$Builder.<init>(KafkaClusterTestKit.java:143)
{noformat}

MockFaultHandler is missing from the test jar.

This PR https://github.com/apache/kafka/pull/13375/files seems to work around 
it by adding the {code}server-common sourcesets.test.output{code} to the 
classpath.

The class needs to be available for third parties to create an embedded KRaft 
broker.





[GitHub] [kafka] wcarlson5 merged pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-14 Thread via GitHub


wcarlson5 merged PR #13756:
URL: https://github.com/apache/kafka/pull/13756





[GitHub] [kafka] wcarlson5 commented on pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-14 Thread via GitHub


wcarlson5 commented on PR #13756:
URL: https://github.com/apache/kafka/pull/13756#issuecomment-1591908695

   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13756/19/tests/
 
   
   Test failures are unrelated





[GitHub] [kafka] mumrah commented on a diff in pull request #13724: MINOR: more KRaft Metadata Image tests

2023-06-14 Thread via GitHub


mumrah commented on code in PR #13724:
URL: https://github.com/apache/kafka/pull/13724#discussion_r1230101055


##
metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java:
##
@@ -95,6 +95,60 @@ public static void replayOne(
 replayAll(target, Collections.singletonList(recordAndVersion));
 }
 
+public interface TestThroughAllIntermediateImagesLeadingToFinalImageHelper 
{

Review Comment:
   Could we make this a concrete class? We could take in a supplier for an 
empty image and a function for creating a delta from an image. Also, I think we 
could add type parameters to avoid some of the casting. E.g., 
   
   ```java
   class HelperThing {
 HelperThing(Supplier emptyImageSupplier, Function imageToDelta) 
   
 void test(I image, List fromRecords);
   }
   ```
   
   then use it like
   ```java
   HelperThing helper = new HelperThing(() -> TopicsImage.EMPTY,  image -> new 
TopicsDelta(image));
   helper.test(image, fromRecords);
   ```
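
A minimal sketch of how such a generic helper could look, assuming type parameters `I` (image) and `D` (delta). The class name `ImageDeltaHelper`, the extra record type parameter `R`, and the `replayRecord` / `applyDelta` hooks are hypothetical and are not taken from the PR; the real helper would work with the metadata image and delta classes directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch, not code from the PR.
// I = image type, D = delta type, R = record type (all assumed type parameters).
final class ImageDeltaHelper<I, D, R> {
    private final Supplier<I> emptyImageSupplier;
    private final Function<I, D> imageToDelta;
    private final BiConsumer<D, R> replayRecord; // assumed hook: replay one record into a delta
    private final Function<D, I> applyDelta;     // assumed hook: turn a delta into the next image

    ImageDeltaHelper(Supplier<I> emptyImageSupplier,
                     Function<I, D> imageToDelta,
                     BiConsumer<D, R> replayRecord,
                     Function<D, I> applyDelta) {
        this.emptyImageSupplier = emptyImageSupplier;
        this.imageToDelta = imageToDelta;
        this.replayRecord = replayRecord;
        this.applyDelta = applyDelta;
    }

    // Starts from the empty image, replays the records one at a time,
    // and returns the image produced after each record.
    List<I> intermediateImages(List<R> fromRecords) {
        List<I> images = new ArrayList<>();
        I current = emptyImageSupplier.get();
        for (R record : fromRecords) {
            D delta = imageToDelta.apply(current);
            replayRecord.accept(delta, record);
            current = applyDelta.apply(delta);
            images.add(current);
        }
        return images;
    }
}
```

Because the image and delta types are fixed at construction time, callers get typed images back and no casts are needed at the call site.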
   






[GitHub] [kafka] kirktrue commented on pull request #6420: KAFKA-3881: use plain topic tag in Fetcher metrics

2023-06-14 Thread via GitHub


kirktrue commented on PR #6420:
URL: https://github.com/apache/kafka/pull/6420#issuecomment-1591846803

   @dbrinegar @tombentley @guozhangwang @junrao Can this PR be resurrected?





[jira] [Commented] (KAFKA-3881) Remove the replacing logic from "." to "_" in Fetcher

2023-06-14 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17732694#comment-17732694
 ] 

Kirk True commented on KAFKA-3881:
--

Two questions:
 # Is there any reason the PR wasn't merged?
 # Will making this change now break users who've coded this discrepancy into 
their applications/tools?

> Remove the replacing logic from "." to "_" in Fetcher
> -
>
> Key: KAFKA-3881
> URL: https://issues.apache.org/jira/browse/KAFKA-3881
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, metrics
>Reporter: Guozhang Wang
>Assignee: Tom Bentley
>Priority: Major
>  Labels: newbie, patch-available
>
> The logic of replacing "." with "_" in metrics names / tags was originally 
> introduced in the core package's metrics since Graphite treats "." as a 
> hierarchy separator (see KAFKA-1902); for the client metrics, the 
> GraphiteReporter is supposed to take care of this itself rather than having 
> Kafka metrics special-case it. In addition, right now only the consumer 
> Fetcher does the replacement; the producer Sender does not.
> So we should consider removing this logic in the consumer Fetcher's metrics 
> package. NOTE that this is a backward-incompatible change to a public API.





[jira] [Commented] (KAFKA-3881) Remove the replacing logic from "." to "_" in Fetcher

2023-06-14 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17732693#comment-17732693
 ] 

Kirk True commented on KAFKA-3881:
--

We have a user running into this issue. It appears that only the 
fetcher-related metrics preemptively convert {{.}} to {{_}}.

My initial research led me down the path to the fetcher metric names, which 
looks to jibe with the direction of [~tombentley] and [~dbrinegar]'s solutions 
in their respective pull requests.
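
To make the discrepancy concrete, here is a small sketch of the kind of replacement the issue describes. It assumes the tag value is derived with a plain character replacement; `TopicTagSketch` and `sanitizedTopicTag` are hypothetical names, not the Fetcher's actual code.

```java
// Hypothetical sketch of the replacement described in KAFKA-3881; not the Fetcher's actual code.
final class TopicTagSketch {
    // Assumed behaviour: every '.' in the topic name becomes '_' in the metric tag.
    static String sanitizedTopicTag(String topic) {
        return topic.replace('.', '_');
    }
}
```

Under that assumption, a tool that looks up a metric by the real topic name (say `orders.v1`) will not find the tag `orders_v1`, and two distinct topics such as `orders.v1` and `orders_v1` end up sharing one tag value.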

> Remove the replacing logic from "." to "_" in Fetcher
> -
>
> Key: KAFKA-3881
> URL: https://issues.apache.org/jira/browse/KAFKA-3881
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, metrics
>Reporter: Guozhang Wang
>Assignee: Tom Bentley
>Priority: Major
>  Labels: newbie, patch-available
>
> The logic of replacing "." with "_" in metrics names / tags was originally 
> introduced in the core package's metrics since Graphite treats "." as a 
> hierarchy separator (see KAFKA-1902); for the client metrics, the 
> GraphiteReporter is supposed to take care of this itself rather than having 
> Kafka metrics special-case it. In addition, right now only the consumer 
> Fetcher does the replacement; the producer Sender does not.
> So we should consider removing this logic in the consumer Fetcher's metrics 
> package. NOTE that this is a backward-incompatible change to a public API.





[GitHub] [kafka] novosibman commented on a diff in pull request #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version

2023-06-14 Thread via GitHub


novosibman commented on code in PR #13782:
URL: https://github.com/apache/kafka/pull/13782#discussion_r1230002264


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -976,6 +976,30 @@ public static void flushDir(Path path) throws IOException {
 }
 }
 
+/**
+ * Flushes dirty file to guarantee crash consistency.
+ *
+ * @throws IOException if flushing the file fails.
+ */
+public static void flushFile(Path path) throws IOException {
+if (path != null) {
+try (FileChannel fileChannel = FileChannel.open(path, 
StandardOpenOption.READ)) {
+fileChannel.force(true);
+}
+}
+}
+
+/**
+ * Flushes dirty file quietly, logs warning when exception happens.
+ */
+public static void flushFileQuietly(Path path, String name) {
+try {
+flushFile(path);
+} catch (IOException e) {
+log.warn("Failed to flush {} at path {} with exception {}", name, 
path, e);

Review Comment:
   Third parameter removed.
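
For anyone who wants to try the flush helper from the diff in isolation, a self-contained sketch follows. `flushFile` mirrors the method shown above; `FlushFileSketch`, `writeAndFlush` and the temp-file prefix are hypothetical additions for illustration, and the sketch assumes a platform where `force` is permitted on a channel opened read-only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical wrapper class; only flushFile mirrors the helper in the diff.
final class FlushFileSketch {
    // Forces file content and metadata to the storage device; a null path is a no-op.
    static void flushFile(Path path) throws IOException {
        if (path != null) {
            try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
                channel.force(true);
            }
        }
    }

    // Writes a small temporary file, flushes it, and returns its size in bytes.
    static long writeAndFlush(String content) {
        try {
            Path file = Files.createTempFile("flush-sketch", ".log");
            try {
                Files.write(file, content.getBytes(StandardCharsets.UTF_8));
                flushFile(file);
                return Files.size(file);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```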






[jira] [Commented] (KAFKA-15091) Javadocs for SourceTask::commit are incorrect

2023-06-14 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17732666#comment-17732666
 ] 

Yash Mayya commented on KAFKA-15091:


{quote}it does not have anything to do with the offsets returned from 
{{SourceTask:poll}} and is instead just a general, periodically-invoked hook to 
let the task know that an offset commit has taken place (but with no guarantees 
as to which offsets have been committed and which ones correspond to 
still-in-flight records).
{quote}
 

The SourceTask::commit method doesn't seem like a particularly useful hook in 
its current shape; I wonder whether we should consider deprecating it...?

> Javadocs for SourceTask::commit are incorrect
> -
>
> Key: KAFKA-15091
> URL: https://issues.apache.org/jira/browse/KAFKA-15091
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Major
>
> The Javadocs for {{SourceTask::commit}} state that the method should:
> {quote}Commit the offsets, up to the offsets that have been returned by 
> [{{poll()}}|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()].
> {quote}
> However, this is obviously incorrect given how the Connect runtime (when not 
> configured with exactly-once support for source connectors) performs polling 
> and offset commits on separate threads. There's also some extensive 
> discussion on the semantics of that method in KAFKA-5716 where it's made 
> clear that altering the behavior of the runtime to align with the documented 
> semantics of that method is not a viable option.
> We should update the Javadocs for this method to state that it does not have 
> anything to do with the offsets returned from {{SourceTask:poll}} and is 
> instead just a general, periodically-invoked hook to let the task know that 
> an offset commit has taken place (but with no guarantees as to which offsets 
> have been committed and which ones correspond to still-in-flight records).





[GitHub] [kafka] gharris1727 commented on pull request #13838: MINOR: Reduce MM2 integration test flakiness due to missing dummy offset commits

2023-06-14 Thread via GitHub


gharris1727 commented on PR #13838:
URL: https://github.com/apache/kafka/pull/13838#issuecomment-1591734588

   Yep, I was able to reproduce some flaky failures in testReplication, and 
that test also appears to fail a lot in CI. I'll debug those further and 
remediate them.





[GitHub] [kafka] jlprat commented on pull request #13839: MINOR:Fill missing parameter annotations for some LogCleaner methods

2023-06-14 Thread via GitHub


jlprat commented on PR #13839:
URL: https://github.com/apache/kafka/pull/13839#issuecomment-1591700252

   Thanks for the PR @hudeqi. I think it's always good to improve JavaDocs or 
ScalaDocs.
   
   I was looking at the `LogCleaner.scala` file and saw that plenty of public 
methods have only very general documentation and no parameter annotations. See, 
for example, `abortCleaning` at line 220 and some of the methods that follow it.
   
   For the sake of completeness, would you be up for adding the missing 
annotations to the public methods? Extra mile for all the ones that are 
package-private to `log` (`private[log]`).





[GitHub] [kafka] rondagostino commented on pull request #13724: MINOR: more KRaft Metadata Image tests

2023-06-14 Thread via GitHub


rondagostino commented on PR #13724:
URL: https://github.com/apache/kafka/pull/13724#issuecomment-1591687192

   4 test failures on latest build are unrelated.  This PR just changes tests, 
and all of the affected tests passed.





[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-14 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1229930816


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore 
implements TimeOrderedKeyValueBuffer {
+
+private final long gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRecords;
+private Serde keySerde;
+private Serde valueSerde;
+private final String topic;
+private int seqnum;
+
+public RocksDBTimeOrderedKeyValueBuffer(final 
RocksDBTimeOrderedKeyValueBytesStore store,
+final Duration gracePeriod,
+final String topic) {
+super(store);
+this.gracePeriod = gracePeriod.toMillis();
+minTimestamp = Long.MAX_VALUE;
+numRecords = 0;
+bufferSize = 0;
+seqnum = 0;
+this.topic = topic;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void setSerdesIfNull(final SerdeGetter getter) {
+keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde;
+valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde;
+}
+
+@Deprecated
+@Override
+public void init(final ProcessorContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void init(final StateStoreContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void evictWhile(final Supplier predicate, final 
Consumer> callback) {
+KeyValue keyValue;
+
+if (predicate.get()) {
+try (final KeyValueIterator iterator = wrapped()
+.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) {
+while (iterator.hasNext() && predicate.get()) {
+keyValue = iterator.next();
+
+final BufferValue bufferValue = 
BufferValue.deserialize(ByteBuffer.wrap(keyValue.value));
+final K key = keySerde.deserializer().deserialize(topic,
+
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get()));
+minTimestamp = bufferValue.context().timestamp();
+
+final V value = 
valueSerde.deserializer().deserialize(topic, bufferValue.newValue());
+
+if (bufferValue.context().timestamp() < minTimestamp) {
+throw new IllegalStateException(
+"minTimestamp [" + minTimestamp + "] did not match 
the actual min timestamp [" +
+bufferValue.context().timestamp() + "]"
+);
+}

Review Comment:
   I need to swap those lines.
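
A sketch of the ordering issue, assuming "those lines" refers to the `minTimestamp` assignment and the `IllegalStateException` check in the hunk above: if the assignment runs first, the check compares the timestamp with itself and can never fire. `MinTimestampSketch` and its methods are hypothetical stand-ins for the buffer's bookkeeping, not the class from the PR.

```java
// Hypothetical stand-in for the buffer's bookkeeping; not the class from the PR.
final class MinTimestampSketch {
    private long minTimestamp = Long.MAX_VALUE;

    // Called when a record is buffered: remember the smallest timestamp seen.
    void onPut(long timestamp) {
        minTimestamp = Math.min(minTimestamp, timestamp);
    }

    // Called when a record is evicted in time order: validate first, then advance.
    void onEvict(long timestamp) {
        if (timestamp < minTimestamp) {
            throw new IllegalStateException(
                "minTimestamp [" + minTimestamp + "] did not match the actual min timestamp ["
                    + timestamp + "]");
        }
        minTimestamp = timestamp;
    }

    long minTimestamp() {
        return minTimestamp;
    }
}
```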



[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-14 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1229930160


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore 
implements TimeOrderedKeyValueBuffer {
+
+private final long gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRecords;
+private Serde keySerde;
+private Serde valueSerde;
+private final String topic;
+private int seqnum;
+
+public RocksDBTimeOrderedKeyValueBuffer(final 
RocksDBTimeOrderedKeyValueBytesStore store,
+final Duration gracePeriod,
+final String topic) {
+super(store);
+this.gracePeriod = gracePeriod.toMillis();
+minTimestamp = Long.MAX_VALUE;
+numRecords = 0;
+bufferSize = 0;
+seqnum = 0;
+this.topic = topic;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void setSerdesIfNull(final SerdeGetter getter) {
+keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde;
+valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde;
+}
+
+@Deprecated
+@Override
+public void init(final ProcessorContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void init(final StateStoreContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void evictWhile(final Supplier predicate, final 
Consumer> callback) {
+KeyValue keyValue;
+
+if (predicate.get()) {
+try (final KeyValueIterator iterator = wrapped()
+.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) {
+while (iterator.hasNext() && predicate.get()) {
+keyValue = iterator.next();
+
+final BufferValue bufferValue = 
BufferValue.deserialize(ByteBuffer.wrap(keyValue.value));
+final K key = keySerde.deserializer().deserialize(topic,
+
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get()));
+minTimestamp = bufferValue.context().timestamp();
+
+final V value = 
valueSerde.deserializer().deserialize(topic, bufferValue.newValue());
+
+if (bufferValue.context().timestamp() < minTimestamp) {
+throw new IllegalStateException(
+"minTimestamp [" + minTimestamp + "] did not match 
the actual min timestamp [" +
+bufferValue.context().timestamp() + "]"
+);
+}
+
+callback.accept(new Eviction<>(key, value, 
bufferValue.context()));
+
+wrapped().remove(keyValue.key);
+numRecords--;
+bufferSize = bufferSize - computeRecordSize(keyValue.key, 
bufferValue);
+}
+if (numRecords == 0) {
+

[GitHub] [kafka] gharris1727 commented on pull request #13838: MINOR: Reduce MM2 integration test flakiness due to missing dummy offset commits

2023-06-14 Thread via GitHub


gharris1727 commented on PR #13838:
URL: https://github.com/apache/kafka/pull/13838#issuecomment-1591647109

   > This change seems to help with MirrorCheckpoint failing to start. I now 
get a lot of failures due to MirrorHeartbeat.
   
   I'll try to reproduce this locally, I hadn't noticed that.
   
   > Looking at the logs, it seems a significant amount of time is spent 
failing to load faulty test plugins, for example:
   
   This log message is from #13182 and the faulty plugins started appearing 
after #13467. I think that the 14 seconds might be a little exaggerated as the 
scanning is re-done for each connect worker, and this test instantiates 6 
workers across 2 clusters. I'll look into the performance impact of the new 
plugin instantiations to see if it's more than I expected.





[GitHub] [kafka] gharris1727 commented on a diff in pull request #13838: MINOR: Reduce MM2 integration test flakiness due to missing dummy offset commits

2023-06-14 Thread via GitHub


gharris1727 commented on code in PR #13838:
URL: https://github.com/apache/kafka/pull/13838#discussion_r1229900453


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -1183,17 +1187,31 @@ private void createTopics() {
 }
 }
 
-/*
- * Generate some consumer activity on both clusters to ensure the 
checkpoint connector always starts promptly
+/**
+ * Commit offset 0 for all partitions of test-topic-1 for the specified 
consumer groups on primary and backup clusters.
+ * This is done to force the MirrorCheckpointConnector to start at a 
task which checkpoints this group.
+ * Must be called before {@link #waitUntilMirrorMakerIsRunning} to prevent 
that method from timing out.
  */
-protected void warmUpConsumer(Map consumerProps) {
-try (Consumer dummyConsumer = 
primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
-dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-dummyConsumer.commitSync();
-}
-try (Consumer dummyConsumer = 
backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
-dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-dummyConsumer.commitSync();
+protected void prepareConsumerGroup(Map consumerProps) {
+prepareConsumerGroup(primary.kafka(), consumerProps, "test-topic-1");
+prepareConsumerGroup(backup.kafka(), consumerProps, "test-topic-1");
+}
+
+private void prepareConsumerGroup(EmbeddedKafkaCluster cluster, 
Map consumerProps, String topic) {
+try (Admin client = cluster.createAdminClient()) {
+Map topics = 
client.describeTopics(Collections.singleton(topic))
+.allTopicNames()
+.get(REQUEST_TIMEOUT_DURATION_MS, TimeUnit.MILLISECONDS);
+Map collect = topics.get(topic)
+.partitions()
+.stream()
+.collect(Collectors.toMap(
+tpi -> new TopicPartition(topic, tpi.partition()),
+ignored -> new OffsetAndMetadata(0L)));
+AlterConsumerGroupOffsetsResult alterResult = 
client.alterConsumerGroupOffsets((String) consumerProps.get("group.id"), 
collect);
+alterResult.all().get(REQUEST_TIMEOUT_DURATION_MS, 
TimeUnit.MILLISECONDS);
+} catch (ExecutionException | InterruptedException | TimeoutException 
e) {

Review Comment:
   I was just avoiding changing the throws signatures, but I see now that it's 
just 3 sites. I'll change this to avoid wrapping the exception.






[GitHub] [kafka] vamossagar12 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

2023-06-14 Thread via GitHub


vamossagar12 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1591623006

   Thanks @sudeshwasnik for the assessment. Continuing the line of thought, 
if you check the `MonitorableSourceConnector#poll` method, it appears to me 
that we decrement the `recordsRemainingLatch` by the number of records in the 
batch, irrespective of whether those records are going to be part of a 
transaction that can be aborted: 
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java#L215-L216
   
   Because of this, the `awaitRecords` method sees the latch decremented 
enough times to count down to 0 within the timeout. 
   
   However, as you also rightly pointed out, the `recordsToCommitLatch` is 
decremented whether the txn aborts or commits, which means `awaitCommits` 
also passes within the timeout.
   
   Could this make the test flaky, since the bound on the 
`recordsRemainingLatch` doesn't seem strong enough to ensure that the actual 
number of records in the topic matches it? Would it help if we decremented the 
`recordsRemainingLatch` only for committed transactions?
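
The difference between the two counting strategies can be sketched with plain `CountDownLatch` instances. This is an illustration only: `LatchCountingSketch` and both method names are hypothetical, and the batches and commit outcomes are supplied by hand rather than coming from the Connect test harness.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration; not code from MonitorableSourceConnector.
final class LatchCountingSketch {
    // Counts down once per record as soon as a batch is returned from poll,
    // whether or not its transaction later commits.
    static long remainingWhenCountingOnPoll(int expected, int[] batchSizes) {
        CountDownLatch latch = new CountDownLatch(expected);
        for (int size : batchSizes) {
            for (int i = 0; i < size; i++) {
                latch.countDown();
            }
        }
        return latch.getCount();
    }

    // Counts down only for records in batches whose transaction committed.
    static long remainingWhenCountingOnCommit(int expected, int[] batchSizes, boolean[] committed) {
        CountDownLatch latch = new CountDownLatch(expected);
        for (int b = 0; b < batchSizes.length; b++) {
            if (!committed[b]) {
                continue; // aborted batch: its records never become visible in the topic
            }
            for (int i = 0; i < batchSizes[b]; i++) {
                latch.countDown();
            }
        }
        return latch.getCount();
    }
}
```

With two batches of two records, the second one aborted, and three records expected, the poll-based latch reaches zero while the commit-based latch still has one record outstanding.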
   





[jira] [Created] (KAFKA-15091) Javadocs for SourceTask::commit are incorrect

2023-06-14 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15091:
-

 Summary: Javadocs for SourceTask::commit are incorrect
 Key: KAFKA-15091
 URL: https://issues.apache.org/jira/browse/KAFKA-15091
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Chris Egerton


The Javadocs for {{SourceTask::commit}} state that the method should:
{quote}Commit the offsets, up to the offsets that have been returned by 
[{{poll()}}|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()].
{quote}
However, this is obviously incorrect given how the Connect runtime (when not 
configured with exactly-once support for source connectors) performs polling 
and offset commits on separate threads. There's also some extensive discussion 
on the semantics of that method in KAFKA-5716 where it's made clear that 
altering the behavior of the runtime to align with the documented semantics of 
that method is not a viable option.

We should update the Javadocs for this method to state that it does not have 
anything to do with the offsets returned from {{SourceTask:poll}} and is 
instead just a general, periodically-invoked hook to let the task know that an 
offset commit has taken place (but with no guarantees as to which offsets have 
been committed and which ones correspond to still-in-flight records).





[GitHub] [kafka] divijvaidya closed pull request #13778: Feature/shuai add comment

2023-06-14 Thread via GitHub


divijvaidya closed pull request #13778: Feature/shuai add comment
URL: https://github.com/apache/kafka/pull/13778





[GitHub] [kafka] divijvaidya commented on pull request #13778: Feature/shuai add comment

2023-06-14 Thread via GitHub


divijvaidya commented on PR #13778:
URL: https://github.com/apache/kafka/pull/13778#issuecomment-1591600444

   I am closing this PR. Please refer to the comment above for the reason - 
https://github.com/apache/kafka/pull/13778#issuecomment-1567994161





[GitHub] [kafka] dajac merged pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-14 Thread via GitHub


dajac merged PR #13820:
URL: https://github.com/apache/kafka/pull/13820





[GitHub] [kafka] cadonna commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-14 Thread via GitHub


cadonna commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1229867769


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java:
##
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class RocksDBTimeOrderedKeyValueBufferTest {
+public RocksDBTimeOrderedKeyValueBuffer buffer;
+@Mock
+public SerdeGetter serdeGetter;
+public InternalProcessorContext context;
+public StreamsMetricsImpl streamsMetrics;
+@Mock
+public Sensor sensor;
+public long offset;
+
+@Before
+public void setUp() {
+when(serdeGetter.keySerde()).thenReturn(new Serdes.StringSerde());
+when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde());
+final Metrics metrics = new Metrics();
+offset = 0;
+streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", 
StreamsConfig.METRICS_LATEST, new MockTime());
+context = new 
MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new 
TaskId(0, 0), TestUtils.tempDirectory());
+}
+
+private void createBuffer(final Duration grace) {
+final RocksDBTimeOrderedKeyValueBytesStore store = new 
RocksDBTimeOrderedKeyValueBytesStoreSupplier("testing",  100).get();
+buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, grace, 
"testing");
+buffer.setSerdesIfNull(serdeGetter);
+buffer.init((StateStoreContext) context, store);
+}
+
+private void pipeRecord(final String key, final String value, final long 
time) {
+final Record record = new Record<>(key, value, time);
+context.setRecordContext(new ProcessorRecordContext(time, offset++, 0, 
"testing", new RecordHeaders()));
+buffer.put(time, record, context.recordContext());
+}
+
+@Test
+public void shouldPutInBufferAndUpdateFields() {
+createBuffer(Duration.ofMinutes(1));
+assertNumSizeAndTimestamp(buffer, 0, Long.MAX_VALUE, 0);
+pipeRecord("1", "0", 0L);
+assertNumSizeAndTimestamp(buffer, 1, 0, 42);
+pipeRecord("3", "0", 2L);
+assertNumSizeAndTimestamp(buffer, 2, 0, 84);
+

Review Comment:
   ```suggestion
   ```



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may 

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache

2023-06-14 Thread via GitHub


divijvaidya commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1229867810


##
core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala:
##
@@ -266,13 +309,21 @@ class RemoteIndexCache(maxSize: Int = 1024, 
remoteStorageManager: RemoteStorageM
 getIndexEntry(remoteLogSegmentMetadata).lookupTimestamp(timestamp, 
startingOffset).position
   }
 
+  /**
+   * Close should synchronously cleanup the resources used by this cache.
+   * This index is closed when [[RemoteLogManager]] is closed.
+   */
   def close(): Unit = {
-closed = true
-cleanerThread.shutdown()
-// Close all the opened indexes.
-lock synchronized {
-  entries.values().stream().forEach(entry => entry.close())
+// make close idempotent
+if (!closed.getAndSet(true)) {
+  // Initiate shutdown for cleaning thread
+  val shutdownRequired = cleanerThread.initiateShutdown()
+  // Close all the opened indexes to force unload mmap memory. This does 
not delete the index files from disk.
+  internalCache.asMap().forEach((_, entry) => entry.close())
+  // Perform any pending activities required by the cache for cleanup
+  internalCache.cleanUp()

Review Comment:
   Thank you for jumping into this review and providing your expert opinion on 
this PR. Appreciate it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ben-manes commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache

2023-06-14 Thread via GitHub


ben-manes commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1229867001


##
core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala:
##
@@ -266,13 +309,21 @@ class RemoteIndexCache(maxSize: Int = 1024, 
remoteStorageManager: RemoteStorageM
 getIndexEntry(remoteLogSegmentMetadata).lookupTimestamp(timestamp, 
startingOffset).position
   }
 
+  /**
+   * Close should synchronously cleanup the resources used by this cache.
+   * This index is closed when [[RemoteLogManager]] is closed.
+   */
   def close(): Unit = {
-closed = true
-cleanerThread.shutdown()
-// Close all the opened indexes.
-lock synchronized {
-  entries.values().stream().forEach(entry => entry.close())
+// make close idempotent
+if (!closed.getAndSet(true)) {
+  // Initiate shutdown for cleaning thread
+  val shutdownRequired = cleanerThread.initiateShutdown()
+  // Close all the opened indexes to force unload mmap memory. This does 
not delete the index files from disk.
+  internalCache.asMap().forEach((_, entry) => entry.close())
+  // Perform any pending activities required by the cache for cleanup
+  internalCache.cleanUp()

Review Comment:
   Yep, it is just a fancy in-memory hash map. Caffeine doesn’t create threads, 
files, whatever. The cleanUp() method simply runs the maintenance work like 
discarding expired or weak/soft collected entries, which might occur before it 
realizes otherwise. It is fully gc’able.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache

2023-06-14 Thread via GitHub


divijvaidya commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1229862041


##
core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala:
##
@@ -266,13 +309,21 @@ class RemoteIndexCache(maxSize: Int = 1024, 
remoteStorageManager: RemoteStorageM
 getIndexEntry(remoteLogSegmentMetadata).lookupTimestamp(timestamp, 
startingOffset).position
   }
 
+  /**
+   * Close should synchronously cleanup the resources used by this cache.
+   * This index is closed when [[RemoteLogManager]] is closed.
+   */
   def close(): Unit = {
-closed = true
-cleanerThread.shutdown()
-// Close all the opened indexes.
-lock synchronized {
-  entries.values().stream().forEach(entry => entry.close())
+// make close idempotent
+if (!closed.getAndSet(true)) {
+  // Initiate shutdown for cleaning thread
+  val shutdownRequired = cleanerThread.initiateShutdown()
+  // Close all the opened indexes to force unload mmap memory. This does 
not delete the index files from disk.
+  internalCache.asMap().forEach((_, entry) => entry.close())
+  // Perform any pending activities required by the cache for cleanup
+  internalCache.cleanUp()

Review Comment:
   Great callout. I did not realise that. I do not want the removal listener to 
execute on shutdown. I will not do anything at all here and just let the GC 
clean it up.
   
   Am I correct in assuming that no specific "cleanup" is necessary to release 
the resources acquired by the cache?
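
For readers following the thread, the "make close idempotent" guard in the diff above can be sketched in isolation. This is only a minimal illustration in plain Java, not the RemoteIndexCache code: the `Cache` and `Entry` classes here are hypothetical stand-ins, and only the `closed.getAndSet(true)` check mirrors the quoted change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class IdempotentCloseSketch {

    /** Hypothetical stand-in for a cached index entry that holds a resource. */
    static final class Entry implements AutoCloseable {
        private int closeCalls = 0;

        @Override
        public void close() {
            closeCalls++;
        }

        int closeCalls() {
            return closeCalls;
        }
    }

    /** Hypothetical cache wrapper; only the guard in close() mirrors the diff. */
    static final class Cache implements AutoCloseable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final List<Entry> entries = new ArrayList<>();

        Entry add() {
            Entry entry = new Entry();
            entries.add(entry);
            return entry;
        }

        @Override
        public void close() {
            // getAndSet returns the previous value, so only the first caller sees false.
            if (!closed.getAndSet(true)) {
                entries.forEach(Entry::close);
            }
        }
    }

    public static void main(String[] args) {
        Cache cache = new Cache();
        Entry entry = cache.add();
        cache.close();
        cache.close(); // second call is a no-op
        System.out.println("entry closed " + entry.closeCalls() + " time(s)");
    }
}
```

Because the flag is flipped atomically, two threads racing into `close()` would still run the cleanup body only once, which is the property the diff comment asks for.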






[jira] [Created] (KAFKA-15090) Source tasks are no longer stopped on a separate thread

2023-06-14 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15090:
-

 Summary: Source tasks are no longer stopped on a separate thread
 Key: KAFKA-15090
 URL: https://issues.apache.org/jira/browse/KAFKA-15090
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.3.2, 3.3.1, 3.2.3, 3.2.2, 3.4.0, 3.2.1, 3.1.2, 3.0.2, 
3.3.0, 3.1.1, 3.2.0, 3.0.1, 3.0.0, 3.1.0, 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 
3.3.3, 3.6.0, 3.5.1
Reporter: Chris Egerton
Assignee: Chris Egerton


Before [https://github.com/apache/kafka/pull/9669], in distributed mode, the 
{{SourceTask::stop}} method would be invoked on the herder tick thread, which 
is a separate thread from the dedicated thread which was responsible for 
polling data from the task and producing it to Kafka.

This aligned with the Javadocs for {{SourceTask::poll}}, which state:
{quote}The task will be stopped on a separate thread, and when that happens 
this method is expected to unblock, quickly finish up any remaining processing, 
and return.
{quote}
However, it came with the downside that the herder's tick thread would be 
blocked until the invocation of {{SourceTask::stop}} completed, which could 
result in major parts of the worker's REST API becoming unavailable and even 
the worker falling out of the cluster.

As a result, in [https://github.com/apache/kafka/pull/9669], we changed the 
logic for task shutdown to cause {{SourceTask::stop}} to be invoked on the 
dedicated thread for the task (i.e., the one responsible for polling data from 
it and producing that data to Kafka).

This altered the semantics for {{SourceTask::poll}} and {{SourceTask::stop}} and 
may have broken connectors that block during {{poll}} with the expectation that 
{{stop}} can and will be invoked concurrently as a signal that any ongoing 
polls should be interrupted immediately.

Although reverting the fix is likely not a viable option (blocking the herder 
thread on interactions with user-written plugins is high-risk and we have tried 
to eliminate all instances of this where feasible), we may try to restore the 
expected contract by spinning up a separate thread exclusively for invoking 
{{SourceTask::stop}} separately from the dedicated thread for the task and the 
herder's thread.
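
To illustrate the contract described above, here is a rough sketch of a task whose poll blocks until stop is invoked from a different thread. It is not the Connect API: {{BlockingTask}}, its methods, and the two single-thread executors are hypothetical names chosen for the example, and the dedicated stop thread only approximates the proposed fix.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StopOnSeparateThreadSketch {

    /** Hypothetical task (not the Connect SourceTask API): poll() blocks until stop() is called. */
    static final class BlockingTask {
        private final CountDownLatch stopped = new CountDownLatch(1);

        List<String> poll() throws InterruptedException {
            stopped.await(); // no data available, so wait for the stop signal
            return Collections.emptyList();
        }

        void stop() {
            stopped.countDown();
        }
    }

    /** Returns true if poll() returned within the timeout after stop() ran on its own thread. */
    static boolean pollUnblocksWhenStoppedConcurrently(long timeoutMs) {
        BlockingTask task = new BlockingTask();
        ExecutorService taskThread = Executors.newSingleThreadExecutor();
        ExecutorService stopThread = Executors.newSingleThreadExecutor();
        try {
            Callable<List<String>> pollCall = task::poll;
            Runnable stopCall = task::stop;
            Future<List<String>> poll = taskThread.submit(pollCall);
            // stop() runs on a thread that is neither the task thread nor the caller.
            stopThread.submit(stopCall).get(timeoutMs, TimeUnit.MILLISECONDS);
            poll.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            taskThread.shutdownNow();
            stopThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("poll unblocked: " + pollUnblocksWhenStoppedConcurrently(2000));
    }
}
```

If stop were instead queued behind poll on the same single thread, a task written like this would never return from poll, which is the breakage this ticket describes.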



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on pull request #13705: MINOR:Refactor the metric names into constants in ReplicaManager

2023-06-14 Thread via GitHub


divijvaidya commented on PR #13705:
URL: https://github.com/apache/kafka/pull/13705#issuecomment-1591550112

   I triggered the CI tests again since the last run wasn't complete. Will merge 
this in after they are successful. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-14 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1229836485


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore 
implements TimeOrderedKeyValueBuffer {
+
+private final long gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRecords;
+private Serde keySerde;
+private Serde valueSerde;
+private final String topic;
+private int seqnum;
+
+public RocksDBTimeOrderedKeyValueBuffer(final 
RocksDBTimeOrderedKeyValueBytesStore store,
+final Duration gracePeriod,
+final String topic) {
+super(store);
+this.gracePeriod = gracePeriod.toMillis();
+minTimestamp = Long.MAX_VALUE;
+numRecords = 0;
+bufferSize = 0;
+seqnum = 0;
+this.topic = topic;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void setSerdesIfNull(final SerdeGetter getter) {
+keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde;
+valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde;
+}
+
+@Deprecated
+@Override
+public void init(final ProcessorContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void init(final StateStoreContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void evictWhile(final Supplier predicate, final 
Consumer> callback) {
+KeyValue keyValue;
+
+if (predicate.get()) {
+try (final KeyValueIterator iterator = wrapped()
+.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) {
+while (iterator.hasNext() && predicate.get()) {
+keyValue = iterator.next();
+
+final BufferValue bufferValue = 
BufferValue.deserialize(ByteBuffer.wrap(keyValue.value));
+final K key = keySerde.deserializer().deserialize(topic,
+
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get()));
+minTimestamp = bufferValue.context().timestamp();
+
+final V value = 
valueSerde.deserializer().deserialize(topic, bufferValue.newValue());
+
+if (bufferValue.context().timestamp() < minTimestamp) {
+throw new IllegalStateException(
+"minTimestamp [" + minTimestamp + "] did not match 
the actual min timestamp [" +
+bufferValue.context().timestamp() + "]"
+);
+}
+
+callback.accept(new Eviction<>(key, value, 
bufferValue.context()));
+
+wrapped().remove(keyValue.key);
+numRecords--;
+bufferSize = bufferSize - computeRecordSize(keyValue.key, 
bufferValue);
+}
+if (numRecords == 0) {
+

[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-14 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1229824269


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore 
implements TimeOrderedKeyValueBuffer {
+
+private final long gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRecords;
+private Serde keySerde;
+private Serde valueSerde;
+private final String topic;
+private int seqnum;
+
+public RocksDBTimeOrderedKeyValueBuffer(final 
RocksDBTimeOrderedKeyValueBytesStore store,
+final Duration gracePeriod,
+final String topic) {
+super(store);
+this.gracePeriod = gracePeriod.toMillis();
+minTimestamp = Long.MAX_VALUE;
+numRecords = 0;
+bufferSize = 0;
+seqnum = 0;
+this.topic = topic;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void setSerdesIfNull(final SerdeGetter getter) {
+keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde;
+valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde;
+}
+
+@Deprecated
+@Override
+public void init(final ProcessorContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void init(final StateStoreContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void evictWhile(final Supplier predicate, final 
Consumer> callback) {
+KeyValue keyValue;
+
+if (predicate.get()) {
+try (final KeyValueIterator iterator = wrapped()
+.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) {

Review Comment:
   I think so, but I can't prove it. If not, we might have to deal with slight 
misordering until we can find a solution.






[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-14 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1229732758


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedKeyValueBytesStoreSupplier.java:
##
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+public class RocksDbTimeOrderedKeyValueBytesStoreSupplier {

Review Comment:
   Oops, I changed RocksDBKeyValueBytesStoreSupplier instead. Got this one as 
well.



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore implements TimeOrderedKeyValueBuffer {
+
+private final long gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRecords;
+private Serde keySerde;
+private Serde valueSerde;
+private final String topic;
+private int seqnum;
+
+public RocksDBTimeOrderedKeyValueBuffer(final 
RocksDBTimeOrderedKeyValueSegmentedBytesStore store,
+final Duration gracePeriod,
+final String topic) {
+super(store);
+this.gracePeriod = gracePeriod.toMillis();
+minTimestamp = Long.MAX_VALUE;
+numRecords = 0;
+bufferSize = 0;
+seqnum = 0;
+this.topic = topic;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void setSerdesIfNull(final SerdeGetter getter) {
+keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde;
+valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde;
+}
+
+@Deprecated
+@Override
+public void init(final ProcessorContext context, final StateStore root) {
+wrapped().init(context, wrapped());

Review Comment:
   Well, my thought was that we already have the store to be initialized from 
the constructor, so we shouldn't risk a different store being passed in. We 
could use the root, I suppose, but I see no reason to.
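
The reasoning above can be shown with a small stand-alone sketch. The `Store` interface and the two classes are hypothetical simplifications (the real `init` takes a `StateStoreContext`, replaced here by a plain `String`); the point is only that the wrapper ignores the `root` argument and always initializes the store it received in its constructor.

```java
public class WrappedInitSketch {

    /** Hypothetical, simplified store interface; the context is reduced to a String. */
    interface Store {
        void init(String context, Store root);

        boolean isInitialized();
    }

    /** Hypothetical inner store that just records whether init was called. */
    static final class InnerStore implements Store {
        private boolean initialized = false;

        @Override
        public void init(String context, Store root) {
            initialized = true;
        }

        @Override
        public boolean isInitialized() {
            return initialized;
        }
    }

    /** Wrapper that always initializes the store passed to its constructor. */
    static final class Buffer implements Store {
        private final Store wrapped;

        Buffer(Store wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public void init(String context, Store root) {
            // root is deliberately ignored, so a mismatched store cannot be initialized by mistake.
            wrapped.init(context, wrapped);
        }

        @Override
        public boolean isInitialized() {
            return wrapped.isInitialized();
        }
    }

    public static void main(String[] args) {
        InnerStore inner = new InnerStore();
        InnerStore unrelated = new InnerStore();
        Buffer buffer = new Buffer(inner);
        buffer.init("ctx", unrelated);
        System.out.println("inner=" + inner.isInitialized() + " unrelated=" + unrelated.isInitialized());
    }
}
```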



##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF 

[GitHub] [kafka] dajac commented on pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-14 Thread via GitHub


dajac commented on PR #13820:
URL: https://github.com/apache/kafka/pull/13820#issuecomment-1591401484

   Failed tests seem unrelated:
   ```
   Build / JDK 11 and Scala 2.13 / testReplication() – 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest
   1m 45s
   Build / JDK 11 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() 
– 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
   1m 2s
   Build / JDK 11 and Scala 2.13 / testRestartReplication() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
   3m 8s
   Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest
   1m 29s
   Build / JDK 11 and Scala 2.13 / 
testDescribeAtMinIsrPartitions(String).quorum=zk – 
kafka.admin.TopicCommandIntegrationTest
   6s
   Build / JDK 11 and Scala 2.13 / [1] Type=ZK, 
Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, 
Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest
   14s
   Build / JDK 8 and Scala 2.12 / 
testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
   1m 20s
   Build / JDK 8 and Scala 2.12 / [1] Type=ZK, 
Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, 
Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest
   15s
   Build / JDK 17 and Scala 2.13 / testBalancePartitionLeaders() – 
org.apache.kafka.controller.QuorumControllerTest
   13s
   Build / JDK 17 and Scala 2.13 / 
shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2] – 
org.apache.kafka.streams.integration.EosIntegrationTest
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac opened a new pull request, #13854: MINOR: rat should depend on processMessages task

2023-06-14 Thread via GitHub


dajac opened a new pull request, #13854:
URL: https://github.com/apache/kafka/pull/13854

   This fixes the following issue that we occasionally see in 
[builds](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13848/4/pipeline/13/).
   
   ```
   [2023-06-14T11:41:50.769Z] * What went wrong:
   [2023-06-14T11:41:50.769Z] A problem was found with the configuration of 
task ':rat' (type 'RatTask').
   [2023-06-14T11:41:50.769Z]   - Gradle detected a problem with the following 
location: '/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13848'.
   [2023-06-14T11:41:50.769Z] 
   [2023-06-14T11:41:50.769Z] Reason: Task ':rat' uses this output of task 
':clients:processTestMessages' without declaring an explicit or implicit 
dependency. This can lead to incorrect results being produced, depending on 
what order the tasks are executed.
   [2023-06-14T11:41:50.769Z] 
   [2023-06-14T11:41:50.769Z] Possible solutions:
   [2023-06-14T11:41:50.769Z]   1. Declare task 
':clients:processTestMessages' as an input of ':rat'.
   [2023-06-14T11:41:50.769Z]   2. Declare an explicit dependency on 
':clients:processTestMessages' from ':rat' using Task#dependsOn.
   [2023-06-14T11:41:50.769Z]   3. Declare an explicit dependency on 
':clients:processTestMessages' from ':rat' using Task#mustRunAfter.
   [2023-06-14T11:41:50.769Z] 
   [2023-06-14T11:41:50.769Z] Please refer to 
https://docs.gradle.org/8.1.1/userguide/validation_problems.html#implicit_dependency
 for more details about this problem.
   ```
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] dajac commented on pull request #13854: MINOR: rat should depend on processMessages task

2023-06-14 Thread via GitHub


dajac commented on PR #13854:
URL: https://github.com/apache/kafka/pull/13854#issuecomment-1591397811

   Similar to https://github.com/apache/kafka/pull/13316.





[GitHub] [kafka] hudeqi commented on pull request #13705: MINOR:Refactor the metric names into constants in ReplicaManager

2023-06-14 Thread via GitHub


hudeqi commented on PR #13705:
URL: https://github.com/apache/kafka/pull/13705#issuecomment-1591356625

   > Looks good to me. One minor thing is to update the PR name (since we use 
the same for commit messages). As I understand, we are not really optimizing 
anything here. We are refactoring the metric names into constants. 
   > 
   > Can you please change the PR description to reflect the same? 
   
   ok, I see, updated the pr name. @divijvaidya 





[GitHub] [kafka] mimaison commented on a diff in pull request #13838: MINOR: Reduce MM2 integration test flakiness due to missing dummy offset commits

2023-06-14 Thread via GitHub


mimaison commented on code in PR #13838:
URL: https://github.com/apache/kafka/pull/13838#discussion_r1229669181


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -1183,17 +1187,31 @@ private void createTopics() {
 }
 }
 
-/*
- * Generate some consumer activity on both clusters to ensure the 
checkpoint connector always starts promptly
+/**
+ * Commit offset 0 for all partitions of test-topic-1 for the specified 
consumer groups on primary and backup clusters.
+ * This is done to force the MirrorCheckpointConnector to start at a 
task which checkpoints this group.
+ * Must be called before {@link #waitUntilMirrorMakerIsRunning} to prevent 
that method from timing out.
  */
-protected void warmUpConsumer(Map consumerProps) {
-try (Consumer dummyConsumer = 
primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
-dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-dummyConsumer.commitSync();
-}
-try (Consumer dummyConsumer = 
backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
-dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-dummyConsumer.commitSync();
+protected void prepareConsumerGroup(Map consumerProps) {
+prepareConsumerGroup(primary.kafka(), consumerProps, "test-topic-1");
+prepareConsumerGroup(backup.kafka(), consumerProps, "test-topic-1");
+}
+
+private void prepareConsumerGroup(EmbeddedKafkaCluster cluster, 
Map consumerProps, String topic) {
+try (Admin client = cluster.createAdminClient()) {
+Map topics = 
client.describeTopics(Collections.singleton(topic))
+.allTopicNames()
+.get(REQUEST_TIMEOUT_DURATION_MS, TimeUnit.MILLISECONDS);
+Map collect = topics.get(topic)
+.partitions()
+.stream()
+.collect(Collectors.toMap(
+tpi -> new TopicPartition(topic, tpi.partition()),
+ignored -> new OffsetAndMetadata(0L)));
+AlterConsumerGroupOffsetsResult alterResult = 
client.alterConsumerGroupOffsets((String) consumerProps.get("group.id"), 
collect);
+alterResult.all().get(REQUEST_TIMEOUT_DURATION_MS, 
TimeUnit.MILLISECONDS);
+} catch (ExecutionException | InterruptedException | TimeoutException 
e) {

Review Comment:
   Is there a reason we're catching and rethrowing as RuntimeException here?
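
To make the trade-off behind this question concrete, here is a minimal sketch of the two usual options when a blocking `Future#get` call can fail: wrapping the checked exceptions in a `RuntimeException`, or declaring them and letting the caller decide. The class and method names are hypothetical and are not taken from the PR; only `java.util.concurrent` calls are used.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; not code from the PR under review.
final class RethrowSketch {

    // Option 1: wrap checked exceptions so callers need no throws clause.
    static String getWrapped(Future<String> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before wrapping.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    // Option 2: propagate the checked exceptions unchanged.
    static String getPropagating(Future<String> future, long timeoutMs)
            throws ExecutionException, InterruptedException, TimeoutException {
        return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        Future<String> done = CompletableFuture.completedFuture("ok");
        System.out.println(getWrapped(done, 100L));
        System.out.println(getPropagating(done, 100L));
    }
}
```

In a test helper, option 2 usually keeps the original exception type visible in the failure report, while option 1 avoids adding a throws clause to every caller.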






[GitHub] [kafka] C0urante commented on pull request #13819: KAFKA-15059: Remove pending rebalance check when fencing zombie source connector tasks

2023-06-14 Thread via GitHub


C0urante commented on PR #13819:
URL: https://github.com/apache/kafka/pull/13819#issuecomment-1591326215

   @mimaison @viktorsomogyi sorry for the extra ping--this issue is actually a 
regression and is leading to increased failure rates on Jenkins; would it be 
possible to take a look sometime this week or the next?





[GitHub] [kafka] divijvaidya commented on pull request #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version

2023-06-14 Thread via GitHub


divijvaidya commented on PR #13782:
URL: https://github.com/apache/kafka/pull/13782#issuecomment-1591281777

   Hey @novosibman, could you please respond to the rest of the comments at 
https://github.com/apache/kafka/pull/13782#pullrequestreview-1461194326 and 
https://github.com/apache/kafka/pull/13782#discussion_r1216711941





[GitHub] [kafka] machi1990 commented on pull request #13700: KAFKA-14959: remove delayed queue and exempt sensors during ClientQuota and ClientRequestQuota managers shutdown

2023-06-14 Thread via GitHub


machi1990 commented on PR #13700:
URL: https://github.com/apache/kafka/pull/13700#issuecomment-1591262415

   @divijvaidya since you worked on https://github.com/apache/kafka/pull/13623, 
would you be open to giving this PR a review? Thanks





[GitHub] [kafka] machi1990 commented on pull request #13624: MINOR: remove unused ProcessorNode#time field and remove unused imports

2023-06-14 Thread via GitHub


machi1990 commented on PR #13624:
URL: https://github.com/apache/kafka/pull/13624#issuecomment-1591235123

   Thank you @divijvaidya for the review and merge!





[GitHub] [kafka] divijvaidya commented on pull request #13831: KAFKA-15053: Use case insensitive validator for security.protocol config

2023-06-14 Thread via GitHub


divijvaidya commented on PR #13831:
URL: https://github.com/apache/kafka/pull/13831#issuecomment-1591232505

   > Also, what's the common process to update the documentation?
   
   As part of this PR, please modify the documentation at:
   
https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/docs/security.html#L69





[GitHub] [kafka] hudeqi commented on a diff in pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method

2023-06-14 Thread via GitHub


hudeqi commented on code in PR #13719:
URL: https://github.com/apache/kafka/pull/13719#discussion_r1229633097


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -680,12 +680,16 @@ abstract class AbstractFetcherThread(name: String,
*/
   val offsetAndEpoch = leader.fetchEarliestOffset(topicPartition, 
currentLeaderEpoch)
   val leaderStartOffset = offsetAndEpoch.offset
-  warn(s"Reset fetch offset for partition $topicPartition from 
$replicaEndOffset to current " +
-s"leader's start offset $leaderStartOffset")
   val offsetToFetch = Math.max(leaderStartOffset, replicaEndOffset)
   // Only truncate log when current leader's log start offset is greater 
than follower's log end offset.
-  if (leaderStartOffset > replicaEndOffset)
+  if (leaderStartOffset > replicaEndOffset) {
+warn(s"Truncate fully and start at leader's start offset 
$leaderStartOffset for partition $topicPartition " +
+  s"because the local replica logEndOffset $replicaEndOffset is 
smaller than leader's start offset.")

Review Comment:
   > What do you think about the following wording to be more aligned with the 
other log message?
   > 
   > ```
   > Truncate fully and reset fetch offset for partition $topicPartition from 
$current to the current leader's start offset $new because the local replica's 
end offset is smaller than the current leader's start offsets.
   > ```
   > 
   
   By comparison, the overall meaning is similar; I don't see a big difference.






[jira] [Commented] (KAFKA-7802) Connection to Broker Disconnected Taking Down the Whole Cluster

2023-06-14 Thread Lubos Hozzan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17732540#comment-17732540
 ] 

Lubos Hozzan commented on KAFKA-7802:
-

Hello.

The problem still persists. We are using version *3.4.1* in KRaft mode (3 
instances in a Kubernetes cluster). The warning looks like this (in fact, it is an 
avalanche of identical or very similar records from each Kafka instance):

{noformat}
[2023-06-13 12:21:25,771] WARN [ReplicaFetcher replicaId=0, leaderId=2, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, 
fetchData={tsttopic-1=PartitionData(topicId=4UDBTCegTPy5mdCbL5fLyg, 
fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[6], lastFetchedEpoch=Optional.empty)}, 
isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
metadata=(sessionId=INVALID, epoch=INITIAL), rackId=) 
(kafka.server.ReplicaFetcherThread)
{noformat}

Please focus on the {{metadata}}: *{{sessionId=INVALID, epoch=INITIAL}}*

The problem begins in an empty cluster (= the PVCs for the instance pods are 
empty) at first start. All instances are affected.

I attempted:
- restarting the instances one by one = no change
- stopping all instances at once and starting them again at once = no change
- stopping all instances and deleting the PVCs (the Kafka cluster starts empty) = 
the problem sometimes disappeared

Is the problem in the stored data? Once the problem has disappeared, restarting 
the instances has no effect and the cluster runs fine. I mean, if the instances 
created their folder structure correctly, they work without any problems.

The next strange thing, based on my observation, is the metrics, in particular 
{{BrokerTopicMetrics BytesInPerSec}}:

!BytesInput.png!

As you can see, while the problem is occurring (before 14:20), it looks like the 
cluster has two leaders (the correct leader and some fake one) and both are 
generating the metrics. Once the problem disappeared (after 14:20), the metrics 
were emitted from only one instance, the one that was the elected leader at the time.

I hope this helps get closer to solving the problem.

Best regards.

> Connection to Broker Disconnected Taking Down the Whole Cluster
> ---
>
> Key: KAFKA-7802
> URL: https://issues.apache.org/jira/browse/KAFKA-7802
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Candice Wan
>Priority: Critical
> Attachments: BytesInput.png, thread_dump.log
>
>
> We recently upgraded to 2.1.0. Since then, several times per day, we observe 
> some brokers were disconnected when other brokers were trying to fetch the 
> replicas. This issue took down the whole cluster, making all the producers 
> and consumers not able to publish or consume messages. It could be quickly 
> fixed by restarting the problematic broker.
> Here is an example of what we're seeing in the broker which was trying to 
> send fetch request to the problematic one:
> 2019-01-09 08:05:10.445 [ReplicaFetcherThread-0-3] INFO 
> o.a.k.clients.FetchSessionHandler - [ReplicaFetcher replicaId=1, leaderId=3, 
> fetcherId=0] Error sending fetch request (sessionId=937967566, epoch=1599941) 
> to node 3: java.io.IOException: Connection to 3 was disconnected before the 
> response was read.
>  2019-01-09 08:05:10.445 [ReplicaFetcherThread-1-3] INFO 
> o.a.k.clients.FetchSessionHandler - [ReplicaFetcher replicaId=1, leaderId=3, 
> fetcherId=1] Error sending fetch request (sessionId=506217047, epoch=1375749) 
> to node 3: java.io.IOException: Connection to 3 was disconnected before the 
> response was read.
>  2019-01-09 08:05:10.445 [ReplicaFetcherThread-0-3] WARN 
> kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=1, leaderId=3, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-11=(offset=421032847, logStartOffset=0, 
> maxBytes=1048576, currentLeaderEpoch=Optional[178])}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=937967566, 
> epoch=1599941))
>  java.io.IOException: Connection to 3 was disconnected before the response 
> was read
>  at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
>  at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:199)
>  at 
> kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
>  at 

[GitHub] [kafka] divijvaidya merged pull request #13624: MINOR: remove unused ProcessorNode#time field and remove unused imports

2023-06-14 Thread via GitHub


divijvaidya merged PR #13624:
URL: https://github.com/apache/kafka/pull/13624





[jira] [Commented] (KAFKA-12458) Implementation of Tiered Storage Integration with Azure Storage (ADLS + Blob Storage)

2023-06-14 Thread Ivan Yurchenko (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17732539#comment-17732539
 ] 

Ivan Yurchenko commented on KAFKA-12458:


FYI: I've been working on a {{RemoteStorageManager}} implementation that 
currently supports only AWS S3, but Azure support is planned for the near future. 
[https://github.com/aiven/tiered-storage-for-apache-kafka]

> Implementation of Tiered Storage Integration with Azure Storage (ADLS + Blob 
> Storage)
> -
>
> Key: KAFKA-12458
> URL: https://issues.apache.org/jira/browse/KAFKA-12458
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Israel Ekpo
>Assignee: Israel Ekpo
>Priority: Major
>
> Task to cover integration support for Azure Storage
>  * Azure Blob Storage
>  * Azure Data Lake Store
> Will split task up later into distinct tracks and components



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-9565) Implementation of Tiered Storage SPI to integrate with S3

2023-06-14 Thread Ivan Yurchenko (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17732536#comment-17732536
 ] 

Ivan Yurchenko commented on KAFKA-9565:
---

AFAIU, concrete {{RemoteStorageManager}} implementations won't be hosted in the 
Apache Kafka repo. So this ticket should probably be closed as wont-fix.

I've been working on a {{RemoteStorageManager}} implementation that supports 
AWS S3 and, in the future, other cloud object stores (at least Azure and GCS): 
[https://github.com/aiven/tiered-storage-for-apache-kafka|https://github.com/aiven/tiered-storage-for-apache-kafka]

> Implementation of Tiered Storage SPI to integrate with S3
> -
>
> Key: KAFKA-9565
> URL: https://issues.apache.org/jira/browse/KAFKA-9565
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Alexandre Dupriez
>Assignee: Ivan Yurchenko
>Priority: Major
>






[GitHub] [kafka] dajac commented on a diff in pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method

2023-06-14 Thread via GitHub


dajac commented on code in PR #13719:
URL: https://github.com/apache/kafka/pull/13719#discussion_r1229581401


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -680,12 +680,16 @@ abstract class AbstractFetcherThread(name: String,
*/
   val offsetAndEpoch = leader.fetchEarliestOffset(topicPartition, 
currentLeaderEpoch)
   val leaderStartOffset = offsetAndEpoch.offset
-  warn(s"Reset fetch offset for partition $topicPartition from 
$replicaEndOffset to current " +
-s"leader's start offset $leaderStartOffset")
   val offsetToFetch = Math.max(leaderStartOffset, replicaEndOffset)
   // Only truncate log when current leader's log start offset is greater 
than follower's log end offset.
-  if (leaderStartOffset > replicaEndOffset)
+  if (leaderStartOffset > replicaEndOffset) {
+warn(s"Truncate fully and start at leader's start offset 
$leaderStartOffset for partition $topicPartition " +
+  s"because the local replica logEndOffset $replicaEndOffset is 
smaller than leader's start offset.")

Review Comment:
   What do you think about the following wording to be more aligned with the 
other log message?
   
   ```
   Truncate fully and reset fetch offset for partition $topicPartition from 
$current to the current leader's start offset $new because the local replica's 
end offset is smaller than the current leader's start offsets.
   ```
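
For readers following the wording discussion, the decision visible in the quoted diff can be sketched as below. This is a simplified, hypothetical stand-in written in Java (the real code is Scala inside `AbstractFetcherThread`); the class name, the `describe` helper, and the non-truncating message are assumptions made for illustration. Only the `max` computation and the `leaderStartOffset > replicaEndOffset` condition come from the diff.

```java
// Hypothetical sketch of the offset decision shown in the diff; not the actual Kafka code.
final class FetchOffsetSketch {

    // The follower resumes fetching from the larger of the two offsets.
    static long offsetToFetch(long leaderStartOffset, long replicaEndOffset) {
        return Math.max(leaderStartOffset, replicaEndOffset);
    }

    // Full truncation is needed only when the leader's log starts beyond the follower's end.
    static boolean shouldTruncateFully(long leaderStartOffset, long replicaEndOffset) {
        return leaderStartOffset > replicaEndOffset;
    }

    // Builds a message using the wording proposed above for the truncating case.
    // The message for the non-truncating case is an assumption made for this sketch.
    static String describe(String topicPartition, long leaderStartOffset, long replicaEndOffset) {
        long target = offsetToFetch(leaderStartOffset, replicaEndOffset);
        if (shouldTruncateFully(leaderStartOffset, replicaEndOffset)) {
            return "Truncate fully and reset fetch offset for partition " + topicPartition
                + " from " + replicaEndOffset + " to the current leader's start offset " + target
                + " because the local replica's end offset is smaller than the current"
                + " leader's start offset.";
        }
        return "No truncation needed for partition " + topicPartition
            + "; continue fetching from " + target + ".";
    }

    public static void main(String[] args) {
        System.out.println(describe("topic-0", 100L, 40L));
        System.out.println(describe("topic-0", 10L, 40L));
    }
}
```

Logging the warning only inside the truncating branch is what keeps the message consistent with what actually happened to the log.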
   






[jira] [Updated] (KAFKA-7802) Connection to Broker Disconnected Taking Down the Whole Cluster

2023-06-14 Thread Lubos Hozzan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lubos Hozzan updated KAFKA-7802:

Attachment: BytesInput.png

> Connection to Broker Disconnected Taking Down the Whole Cluster
> ---
>
> Key: KAFKA-7802
> URL: https://issues.apache.org/jira/browse/KAFKA-7802
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Candice Wan
>Priority: Critical
> Attachments: BytesInput.png, thread_dump.log
>
>
> We recently upgraded to 2.1.0. Since then, several times per day, we observe 
> some brokers were disconnected when other brokers were trying to fetch the 
> replicas. This issue took down the whole cluster, making all the producers 
> and consumers not able to publish or consume messages. It could be quickly 
> fixed by restarting the problematic broker.
> Here is an example of what we're seeing in the broker which was trying to 
> send fetch request to the problematic one:
> 2019-01-09 08:05:10.445 [ReplicaFetcherThread-0-3] INFO 
> o.a.k.clients.FetchSessionHandler - [ReplicaFetcher replicaId=1, leaderId=3, 
> fetcherId=0] Error sending fetch request (sessionId=937967566, epoch=1599941) 
> to node 3: java.io.IOException: Connection to 3 was disconnected before the 
> response was read.
>  2019-01-09 08:05:10.445 [ReplicaFetcherThread-1-3] INFO 
> o.a.k.clients.FetchSessionHandler - [ReplicaFetcher replicaId=1, leaderId=3, 
> fetcherId=1] Error sending fetch request (sessionId=506217047, epoch=1375749) 
> to node 3: java.io.IOException: Connection to 3 was disconnected before the 
> response was read.
>  2019-01-09 08:05:10.445 [ReplicaFetcherThread-0-3] WARN 
> kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=1, leaderId=3, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-11=(offset=421032847, logStartOffset=0, 
> maxBytes=1048576, currentLeaderEpoch=Optional[178])}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=937967566, 
> epoch=1599941))
>  java.io.IOException: Connection to 3 was disconnected before the response 
> was read
>  at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
>  at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:199)
>  at 
> kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
>  at scala.Option.foreach(Option.scala:257)
>  at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>  
>  
>  Below is the suspicious log of the problematic broker when the issue 
> happened:
> 2019-01-09 08:04:50.177 [executor-Heartbeat] INFO 
> k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Member 
> consumer-2-7d46fda9-afef-4705-b632-17f0255d5045 in group talon-instance1 has 
> failed, rem
>  oving it from the group
>  2019-01-09 08:04:50.177 [executor-Heartbeat] INFO 
> k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Preparing to 
> rebalance group talon-instance1 in state PreparingRebalance with old 
> generation 27
>  0 (__consumer_offsets-47) (reason: removing member 
> consumer-2-7d46fda9-afef-4705-b632-17f0255d5045 on heartbeat expiration)
>  2019-01-09 08:04:50.297 [executor-Heartbeat] INFO 
> k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Member 
> consumer-5-94b7eb6d-bc39-48ed-99b8-2e0f55edd60b in group 
> Notifications.ASIA1546980352799 has failed, removing it from the group
>  2019-01-09 08:04:50.297 [executor-Heartbeat] INFO 
> k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Preparing to 
> rebalance group Notifications.ASIA1546980352799 in state PreparingRebalance 
> with old generation 1 (__consumer_offsets-44) (reason: removing member 
> consumer-5-94b7eb6d-bc39-48ed-99b8-2e0f55edd60b on heartbeat expiration)
>  2019-01-09 08:04:50.297 [executor-Heartbeat] INFO 
> k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Group 
> Notifications.ASIA1546980352799 with generation 2 is now empty 
> (__consumer_offsets-44)
>  2019-01-09 08:04:50.388 [executor-Heartbeat] INFO 
> k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Member 
> 

[GitHub] [kafka] dajac commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-14 Thread via GitHub


dajac commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1229600983


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The group coordinator configurations.
+ */
+public class GroupCoordinatorConfig {
+public static class Builder {
+private int numThreads = 1;

Review Comment:
   I have played around a bit with this idea. It creates a pretty big diff, so I 
have decided to tackle it separately from this PR. I filed 
https://issues.apache.org/jira/browse/KAFKA-15089 for this purpose.
   
   For this patch, I have reduced `GroupCoordinatorConfig` to a simple POJO for 
now.






[jira] [Created] (KAFKA-15089) Consolidate all the group coordinator configs

2023-06-14 Thread David Jacot (Jira)
David Jacot created KAFKA-15089:
---

 Summary: Consolidate all the group coordinator configs
 Key: KAFKA-15089
 URL: https://issues.apache.org/jira/browse/KAFKA-15089
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot
Assignee: David Jacot


The group coordinator configurations are defined in KafkaConfig at the moment. 
As KafkaConfig is defined in the core module, we can't pass it to the new java 
modules to pass the configurations along.

A suggestion here is to centralize all the configurations of a module in the 
module itself, similarly to what we do for RemoteLogManagerConfig and 
RaftConfig. We also need a mechanism to add all the properties defined in the 
module to the KafkaConfig's ConfigDef.
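
A minimal sketch of the idea, in plain Java rather than Kafka's real ConfigDef API: the module owns its keys and defaults, and contributes them to a central definition. Every name below (`GroupCoordinatorConfigSketch`, `CentralConfigDef`, the key name and its default) is hypothetical and only illustrates the shape of the mechanism.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical module-owned config class; not Kafka's actual API.
final class GroupCoordinatorConfigSketch {
    // Illustrative key and default, assumed for this sketch only.
    static final String NUM_THREADS_CONFIG = "example.group.coordinator.num.threads";
    static final int NUM_THREADS_DEFAULT = 1;

    final int numThreads;

    GroupCoordinatorConfigSketch(Map<String, Object> props) {
        Object value = props.getOrDefault(NUM_THREADS_CONFIG, NUM_THREADS_DEFAULT);
        this.numThreads = Integer.parseInt(value.toString());
    }

    // The module contributes its own definitions to the central registry.
    static void addTo(CentralConfigDef def) {
        def.define(NUM_THREADS_CONFIG, NUM_THREADS_DEFAULT);
    }
}

// Hypothetical stand-in for the broker-wide definition.
final class CentralConfigDef {
    private final Map<String, Object> defaults = new LinkedHashMap<>();

    void define(String name, Object defaultValue) {
        if (defaults.putIfAbsent(name, defaultValue) != null) {
            throw new IllegalArgumentException("Config " + name + " is defined twice");
        }
    }

    // Starts from the defaults and overlays the user values for defined keys only.
    Map<String, Object> parse(Map<String, Object> userProps) {
        Map<String, Object> parsed = new LinkedHashMap<>(defaults);
        for (Map.Entry<String, Object> e : userProps.entrySet()) {
            if (defaults.containsKey(e.getKey())) {
                parsed.put(e.getKey(), e.getValue());
            }
        }
        return parsed;
    }
}
```

With this shape the core module would only call `GroupCoordinatorConfigSketch.addTo(def)` and hand the parsed map to the module, without the module depending on core.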



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hudeqi commented on a diff in pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method

2023-06-14 Thread via GitHub


hudeqi commented on code in PR #13719:
URL: https://github.com/apache/kafka/pull/13719#discussion_r1229575083


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -680,12 +680,15 @@ abstract class AbstractFetcherThread(name: String,
*/
   val offsetAndEpoch = leader.fetchEarliestOffset(topicPartition, 
currentLeaderEpoch)
   val leaderStartOffset = offsetAndEpoch.offset
-  warn(s"Reset fetch offset for partition $topicPartition from 
$replicaEndOffset to current " +
-s"leader's start offset $leaderStartOffset")
   val offsetToFetch = Math.max(leaderStartOffset, replicaEndOffset)
+  info(s"Reset fetch offset for partition $topicPartition from 
$replicaEndOffset to " +

Review Comment:
   Updated according to your comment.






[GitHub] [kafka] hudeqi commented on a diff in pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method

2023-06-14 Thread via GitHub


hudeqi commented on code in PR #13719:
URL: https://github.com/apache/kafka/pull/13719#discussion_r1229573502


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -680,12 +680,15 @@ abstract class AbstractFetcherThread(name: String,
*/
   val offsetAndEpoch = leader.fetchEarliestOffset(topicPartition, 
currentLeaderEpoch)
   val leaderStartOffset = offsetAndEpoch.offset
-  warn(s"Reset fetch offset for partition $topicPartition from 
$replicaEndOffset to current " +
-s"leader's start offset $leaderStartOffset")
   val offsetToFetch = Math.max(leaderStartOffset, replicaEndOffset)
+  info(s"Reset fetch offset for partition $topicPartition from 
$replicaEndOffset to " +

Review Comment:
   > I see. In this case, would it make sense to move this log to the else 
branch of the if at L687? It is a bit weird to log this and then potentially 
the warning.
   > 
   > If we do this, we could also rewrite it as follows:
   > 
   > ```
   > Reset fetch offset for partition $topicPartition from $replicaEndOffset to 
the current local replica's end offset $offsetToFetch
   > ```
   > 
   > Would this make sense?
   
   Thanks, I think it makes more sense this way. @dajac 






[GitHub] [kafka] dajac commented on a diff in pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method

2023-06-14 Thread via GitHub


dajac commented on code in PR #13719:
URL: https://github.com/apache/kafka/pull/13719#discussion_r1229555487


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -680,12 +680,15 @@ abstract class AbstractFetcherThread(name: String,
*/
   val offsetAndEpoch = leader.fetchEarliestOffset(topicPartition, 
currentLeaderEpoch)
   val leaderStartOffset = offsetAndEpoch.offset
-  warn(s"Reset fetch offset for partition $topicPartition from 
$replicaEndOffset to current " +
-s"leader's start offset $leaderStartOffset")
   val offsetToFetch = Math.max(leaderStartOffset, replicaEndOffset)
+  info(s"Reset fetch offset for partition $topicPartition from 
$replicaEndOffset to " +

Review Comment:
   I see. In this case, would it make sense to move this log to the else branch 
of the if at L687? It is a bit weird to log this and then potentially the 
warning.
   
   If we do this, we could also rewrite it as follows:
   ```
   Reset fetch offset for partition $topicPartition from $replicaEndOffset to 
the current local replica's end offset $offsetToFetch
   ```
   
   Would this make sense?
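
A rough Java sketch of the suggested placement, so that only one of the two messages is logged per reset. It assumes the `if` mentioned above checks whether the leader's start offset exceeds the replica's end offset; the class name, method name and the WARN wording are hypothetical, and the method returns the log line instead of logging it.

```java
// Hypothetical helper illustrating the suggested log placement; not the actual Kafka code.
final class FetchOffsetResetSketch {
    // Returns the single log line that would be emitted for the given offsets.
    // Assumption: the branch condition is leaderStartOffset > replicaEndOffset.
    static String describeReset(String topicPartition, long leaderStartOffset, long replicaEndOffset) {
        long offsetToFetch = Math.max(leaderStartOffset, replicaEndOffset);
        if (leaderStartOffset > replicaEndOffset) {
            return "WARN Truncating " + topicPartition
                + " fully; fetching from leader's start offset " + offsetToFetch;
        } else {
            return "INFO Reset fetch offset for partition " + topicPartition + " from "
                + replicaEndOffset + " to the current local replica's end offset " + offsetToFetch;
        }
    }
}
```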






[GitHub] [kafka] hudeqi commented on pull request #13839: MINOR:Fill missing parameter annotations for some LogCleaner methods

2023-06-14 Thread via GitHub


hudeqi commented on PR #13839:
URL: https://github.com/apache/kafka/pull/13839#issuecomment-159368

   Hello, could you help to review this PR? @jlprat 





[GitHub] [kafka] hudeqi commented on pull request #13705: MINOR:Optimize the use of metrics in ReplicaManager and remove checks

2023-06-14 Thread via GitHub


hudeqi commented on PR #13705:
URL: https://github.com/apache/kafka/pull/13705#issuecomment-1591092269

   Hi, @divijvaidya,  it has been updated according to your comment, thank you!





[GitHub] [kafka] showuon commented on a diff in pull request #13650: KAFKA-14709: Move content in connect/mirror/README.md to the docs

2023-06-14 Thread via GitHub


showuon commented on code in PR #13650:
URL: https://github.com/apache/kafka/pull/13650#discussion_r1229514718


##
connect/mirror/README.md:
##
@@ -1,297 +0,0 @@
-
-# MirrorMaker 2.0
-
-MM2 leverages the Connect framework to replicate topics between Kafka
-clusters. MM2 includes several new features, including:
-
- - both topics and consumer groups are replicated
- - topic configuration and ACLs are replicated
- - cross-cluster offsets are synchronized
- - partitioning is preserved
-
-## Replication flows
-
-MM2 replicates topics and consumer groups from upstream source clusters
-to downstream target clusters. These directional flows are notated
-`A->B`.
-
-It's possible to create complex replication topologies based on these
-`source->target` flows, including:
-
- - *fan-out*, e.g. `K->A, K->B, K->C`
- - *aggregation*, e.g. `A->K, B->K, C->K`
- - *active/active*, e.g. `A->B, B->A`
-
-Each replication flow can be configured independently, e.g. to replicate
-specific topics or groups:
-
-A->B.topics = topic-1, topic-2
-A->B.groups = group-1, group-2
-
-By default, all topics and consumer groups are replicated (except
-excluded ones), across all enabled replication flows. Each
-replication flow must be explicitly enabled to begin replication:
-
-A->B.enabled = true
-B->A.enabled = true
-
-## Starting an MM2 process
-
-You can run any number of MM2 processes as needed. Any MM2 processes
-which are configured to replicate the same Kafka clusters will find each
-other, share configuration, load balance, etc.
-
-To start an MM2 process, first specify Kafka cluster information in a
-configuration file as follows:
-
-# mm2.properties
-clusters = us-west, us-east
-us-west.bootstrap.servers = host1:9092
-us-east.bootstrap.servers = host2:9092
-
-You can list any number of clusters this way.
-
-Optionally, you can override default MirrorMaker properties:
-
-topics = .*
-groups = group1, group2
-emit.checkpoints.interval.seconds = 10
-
-These will apply to all replication flows. You can also override default
-properties for specific clusters or replication flows:
-
-# configure a specific cluster
-us-west.offset.storage.topic = mm2-offsets
-
-# configure a specific source->target replication flow
-us-west->us-east.emit.heartbeats = false
-
-Next, enable individual replication flows as follows:
-
-us-west->us-east.enabled = true # disabled by default
-
-Finally, launch one or more MirrorMaker processes with the `connect-mirror-maker.sh`
-script:
-
-$ ./bin/connect-mirror-maker.sh mm2.properties
-
-## Multicluster environments
-
-MM2 supports replication between multiple Kafka clusters, whether in the
-same data center or across multiple data centers. A single MM2 cluster
-can span multiple data centers, but it is recommended to keep MM2's producers
-as close as possible to their target clusters. To do so, specify a subset
-of clusters for each MM2 node as follows:
-
-# in west DC:
-$ ./bin/connect-mirror-maker.sh mm2.properties --clusters west-1 west-2
-
-This signals to the node that the given clusters are nearby, and prevents the
-node from sending records or configuration to clusters in other data centers.
-
-### Example
-
-Say there are three data centers (west, east, north) with two Kafka
-clusters in each data center (west-1, west-2 etc). We can configure MM2
-for active/active replication within each data center, as well as cross data
-center replication (XDCR) as follows:
-
-# mm2.properties
-clusters: west-1, west-2, east-1, east-2, north-1, north-2
-
-west-1.bootstrap.servers = ...
----%<---
-
-# active/active in west
-west-1->west-2.enabled = true
-west-2->west-1.enabled = true
-
-# active/active in east
-east-1->east-2.enabled = true
-east-2->east-1.enabled = true
-
-# active/active in north
-north-1->north-2.enabled = true
-north-2->north-1.enabled = true
-
-# XDCR via west-1, east-1, north-1
-west-1->east-1.enabled = true
-west-1->north-1.enabled = true
-east-1->west-1.enabled = true
-east-1->north-1.enabled = true
-north-1->west-1.enabled = true
-north-1->east-1.enabled = true
-
-Then, launch MM2 in each data center as follows:
-
-# in west:
-$ ./bin/connect-mirror-maker.sh mm2.properties --clusters west-1 west-2
-
-# in east:
-$ ./bin/connect-mirror-maker.sh mm2.properties --clusters east-1 east-2
-
-# in north:
-$ ./bin/connect-mirror-maker.sh mm2.properties --clusters north-1 north-2
-
-With this configuration, records produced to any cluster will be replicated
-within the data center, as well as across to other data centers. By providing
-the `--clusters` parameter, we ensure that each node only produces records to
-nearby clusters.
-
-N.B. that the `--clusters` parameter is not technically required here. MM2 
will work fine without it; however, throughput may suffer from "producer lag" 
between

[jira] [Assigned] (KAFKA-15083) Passing "remote.log.metadata.*" configs into RLMM

2023-06-14 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-15083:
-

Assignee: Satish Duggana

> Passing "remote.log.metadata.*" configs into RLMM
> -
>
> Key: KAFKA-15083
> URL: https://issues.apache.org/jira/browse/KAFKA-15083
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Satish Duggana
>Priority: Major
>
> Based on the 
> [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
> |_remote.log.metadata.*_|The default RLMM implementation creates producer 
> and consumer instances. Common client properties can be configured with the 
> `remote.log.metadata.common.client.` prefix. Users can also pass properties 
> specific to the producer/consumer with the `remote.log.metadata.producer.` 
> and `remote.log.metadata.consumer.` prefixes. These will override properties 
> with the `remote.log.metadata.common.client.` prefix.
> Any other properties should be prefixed with "remote.log.metadata." and 
> these will be passed to RemoteLogMetadataManager#configure(Map props).
> For example, the security configuration to connect to the local broker for 
> the configured listener name is passed with props.|
>  
> This is missing from the current implementation.
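
A minimal sketch of the override rule described in the quoted KIP text: common-client entries are applied first and producer-specific entries win on collisions. The class and method names are hypothetical, and this is plain map handling rather than the actual RLMM code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; only the two prefixes are taken from the KIP text above.
final class PrefixedClientProps {
    static final String COMMON = "remote.log.metadata.common.client.";
    static final String PRODUCER = "remote.log.metadata.producer.";

    // Builds producer properties: common-client entries first, then producer-specific
    // entries, which override the common ones when the stripped keys collide.
    static Map<String, Object> producerProps(Map<String, Object> brokerProps) {
        Map<String, Object> result = new HashMap<>();
        copyWithPrefixStripped(brokerProps, COMMON, result);
        copyWithPrefixStripped(brokerProps, PRODUCER, result);
        return result;
    }

    private static void copyWithPrefixStripped(Map<String, Object> source, String prefix,
                                               Map<String, Object> target) {
        for (Map.Entry<String, Object> e : source.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                target.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
    }
}
```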





[GitHub] [kafka] showuon commented on pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm

2023-06-14 Thread via GitHub


showuon commented on PR #13828:
URL: https://github.com/apache/kafka/pull/13828#issuecomment-1591076070

   > > I think we don't have this implemented. We should pass 
remote.log.metadata.* into RLMM based on KIP-405. Created 
[KAFKA-15083](https://issues.apache.org/jira/browse/KAFKA-15083) for this issue.
   > 
   > @showuon This is no longer valid; the KIP needs to be updated with the 
prefix-based configs for RSM and RLMM. Will update the KIP with those details.
   
   Good to know, thanks Satish. I've assigned KAFKA-15083 to you. You can close 
it once you've updated the KIP. 





[GitHub] [kafka] vamossagar12 commented on pull request #13853: KAFKA-15088: Fixing Incorrect Reference Usage in Connector State Changes

2023-06-14 Thread via GitHub


vamossagar12 commented on PR #13853:
URL: https://github.com/apache/kafka/pull/13853#issuecomment-1591072071

   @daehokimm, nice catch, thanks for the PR. LGTM





[GitHub] [kafka] daehokimm opened a new pull request, #13853: KAFKA-15088: Fixing Incorrect Reference Usage in Connector State Changes

2023-06-14 Thread via GitHub


daehokimm opened a new pull request, #13853:
URL: https://github.com/apache/kafka/pull/13853

   Currently, in the AbstractHerder class, the behavior of state changes for 
Connectors and Tasks is handled by implementing Listener using the 
ConnectorStatus and TaskStatus classes, which inherit from AbstractStatus. 
However, the code implementing the state change behavior in ConnectorStatus 
refers to and uses an inappropriate State enum class.
   
   Both ConnectorStatus and TaskStatus inherit and implement AbstractStatus, 
and as a result, they share the State class. However, there is a need to make 
modifications for clear referencing. We need to fix the implementation in 
ConnectorStatus to use the appropriate reference for state changes.
   
   This bug ticket will address the task of modifying the code to use the 
correct reference for Connector state changes.
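
   To illustrate why this compiles today: a nested enum declared on the parent is inherited by both subclasses, so either subclass name can be used to reach it. The classes below are simplified stand-ins with hypothetical names and a reduced set of states, not the real Connect classes.

```java
// Simplified stand-ins; only the inheritance shape is modelled.
abstract class AbstractStatusSketch {
    enum State { UNASSIGNED, RUNNING, PAUSED, FAILED }

    final State state;

    AbstractStatusSketch(State state) {
        this.state = state;
    }
}

final class ConnectorStatusSketch extends AbstractStatusSketch {
    ConnectorStatusSketch(State state) {
        super(state);
    }
}

final class TaskStatusSketch extends AbstractStatusSketch {
    TaskStatusSketch(State state) {
        super(state);
    }
}

final class StateReferenceDemo {
    // Compiles and behaves correctly, but reads as if a connector had a task state.
    static ConnectorStatusSketch misleading() {
        return new ConnectorStatusSketch(TaskStatusSketch.State.RUNNING);
    }

    // Same value, referenced through the class it belongs with.
    static ConnectorStatusSketch clear() {
        return new ConnectorStatusSketch(ConnectorStatusSketch.State.RUNNING);
    }
}
```

   Both methods produce the same state, which is why the change is about clear referencing rather than behavior.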
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-15088) Fixing Incorrect Reference Usage in Connector State Changes

2023-06-14 Thread Daeho Kim (Jira)
Daeho Kim created KAFKA-15088:
-

 Summary: Fixing Incorrect Reference Usage in Connector State 
Changes
 Key: KAFKA-15088
 URL: https://issues.apache.org/jira/browse/KAFKA-15088
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Daeho Kim


Currently, in the AbstractHerder class, the behavior of state changes for 
Connectors and Tasks is handled by implementing Listener using the 
ConnectorStatus and TaskStatus classes, which inherit from AbstractStatus. 
However, the code implementing the state change behavior in ConnectorStatus 
refers to and uses an inappropriate State enum class.

Both ConnectorStatus and TaskStatus inherit and implement AbstractStatus, and 
as a result, they share the State class. However, there is a need to make 
modifications for clear referencing. We need to fix the implementation in 
ConnectorStatus to use the appropriate reference for state changes.

This bug ticket will address the task of modifying the code to use the correct 
reference for Connector state changes.





[GitHub] [kafka] KarboniteKream commented on pull request #13762: MINOR: Do not print an empty line when no topics exist

2023-06-14 Thread via GitHub


KarboniteKream commented on PR #13762:
URL: https://github.com/apache/kafka/pull/13762#issuecomment-1591027643

   Sure, it's no problem for me to wait. I just wanted to confirm what you had 
in mind.





[GitHub] [kafka] vamossagar12 commented on pull request #13762: MINOR: Do not print an empty line when no topics exist

2023-06-14 Thread via GitHub


vamossagar12 commented on PR #13762:
URL: https://github.com/apache/kafka/pull/13762#issuecomment-1591024476

   > Sure, I understand the concerns.
   > 
   > > you might want to align your PR with that PR?
   > 
   > Do you mean waiting for that PR to get merged, then rebase my changes? Or 
add my change to that PR?
   
   You could wait for the PR to get merged if that's OK with you. It's your 
call TBH :) 





[GitHub] [kafka] hudeqi commented on pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method

2023-06-14 Thread via GitHub


hudeqi commented on PR #13719:
URL: https://github.com/apache/kafka/pull/13719#issuecomment-1591024699

   Hi, are there any suggestions or must-dos for this PR now? @dajac @satishd 
@divijvaidya 





[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-14 Thread via GitHub


dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1229446218


##
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+public abstract class TimerTask implements Runnable {
+    private volatile TimerTaskEntry timerTaskEntry;
+    // timestamp in millisecond
+    public final long delayMs;
+
+    public TimerTask(long delayMs) {
+        this.delayMs = delayMs;
+    }
+
+    public void cancel() {
+        synchronized (this) {
+            if (timerTaskEntry != null) timerTaskEntry.remove();
+            timerTaskEntry = null;
+        }
+    }
+
+    void setTimerTaskEntry(TimerTaskEntry entry) {
+        synchronized (this) {
+            // if this timerTask is already held by an existing timer task entry,
+            // we will remove such an entry first.
+            if (timerTaskEntry != null && !timerTaskEntry.equals(entry)) {

Review Comment:
   Fixed in last commit.






[GitHub] [kafka] hudeqi opened a new pull request, #13852: KAFKA-15086:Set a reasonable segment size upper limit for MM2 internal topics

2023-06-14 Thread via GitHub


hudeqi opened a new pull request, #13852:
URL: https://github.com/apache/kafka/pull/13852

   ### Motivation
   If the 'segment.bytes' config of the topics related to MM2 (such as 
offset.storage.topic, config.storage.topic, status.storage.topic) follows the 
broker's default configuration or is set larger, then when the MM cluster runs 
many complicated tasks, and especially when the log volume of 
'offset.storage.topic' is very large, the restart speed of the MM workers 
suffers.
   
   After investigation, the reason is that a consumer needs to be started to 
read the data of 'offset.storage.topic' at startup. Although this topic is set 
to compact, if the segment size is set to a large value, such as the default 
value of 1G, this topic may hold tens of gigabytes of data that cannot be 
compacted (because the active segment cannot be cleaned) and has to be read 
from the earliest offset. This consumes a lot of time: in our online 
environment this topic stored 13G of data, and it took nearly half an hour to 
consume all of it, which left the worker unable to start and execute tasks for 
a long time.
   
   ### Solution
   The number of consumer threads could also be adjusted, but I think it may be 
easier to set an upper limit for the segment size, for example the default 
value used for __consumer_offsets: 100MB.
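
   A minimal sketch of such a cap, assuming the internal topic settings are available as a string map before the topic is created. The class and method names are hypothetical, and treating an unset `segment.bytes` as "use the cap" is an assumption of the sketch, not something the PR states.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; only the 100MB limit is taken from the description above.
final class InternalTopicSegmentCap {
    static final long MAX_SEGMENT_BYTES = 100L * 1024 * 1024;

    // Returns a copy of the topic settings with segment.bytes capped at the upper limit.
    // Assumption: an unset segment.bytes falls back to the cap, not the broker default.
    static Map<String, String> withCappedSegmentBytes(Map<String, String> topicSettings) {
        Map<String, String> capped = new HashMap<>(topicSettings);
        String configured = capped.get("segment.bytes");
        long requested = configured == null ? MAX_SEGMENT_BYTES : Long.parseLong(configured);
        capped.put("segment.bytes", Long.toString(Math.min(requested, MAX_SEGMENT_BYTES)));
        return capped;
    }
}
```

   Smaller user-provided values pass through unchanged, so only oversized segments are affected.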





[GitHub] [kafka] daehokimm closed pull request #9259: KAFKA-10466: Allow regex for MaskField SMT to replacement

2023-06-14 Thread via GitHub


daehokimm closed pull request #9259: KAFKA-10466: Allow regex for MaskField SMT 
to replacement
URL: https://github.com/apache/kafka/pull/9259





[GitHub] [kafka] cadonna commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-14 Thread via GitHub


cadonna commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1229370444


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class RocksDBTimeOrderedKeyValueBufferTest {
+    public RocksDBTimeOrderedKeyValueBuffer buffer;
+    @Mock
+    public SerdeGetter serdeGetter;
+    public InternalProcessorContext context;
+    public StreamsMetricsImpl streamsMetrics;
+    @Mock
+    public Sensor sensor;
+    public long offset;
+
+    @Before
+    public void setUp() {
+        when(serdeGetter.keySerde()).thenReturn(new Serdes.StringSerde());
+        when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde());
+        final Metrics metrics = new Metrics();
+        offset = 0;
+        streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
+        context = new MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory());
+    }
+
+    public void createJoin(final Duration grace) {
+        final RocksDBTimeOrderedKeyValueBytesStore store = new RocksDbTimeOrderedKeyValueBytesStoreSupplier("testing",  100).get();
+        buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, grace, "testing");
+        buffer.setSerdesIfNull(serdeGetter);
+        store.init((StateStoreContext) context, store);
+        buffer.init((StateStoreContext) context, store);
+    }
+
+    private void pipeRecord(final String key, final String value, final long time) {
+        final Record record = new Record<>(key, value, time);
+        context.setRecordContext(new ProcessorRecordContext(time, offset++, 0, "testing", new RecordHeaders()));
+        buffer.put(time, record, context.recordContext());
+    }
+
+    @Test
+    public void shouldAddAndEvictRecord() {
+        createJoin(Duration.ZERO);
+        final AtomicInteger count = new AtomicInteger(0);
+        pipeRecord("1", "0", 0L);
+        buffer.evictWhile(() -> buffer.numRecords() > 0, r -> count.getAndIncrement());
+        assertThat(count.get(), equalTo(1));
+    }

Review Comment:
   Could you please add verifications that `numRecords()`, `bufferSize()`, and 
`minTimestamp()` are updated correctly? 
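
   As a rough illustration of the kind of checks meant here, the toy buffer below tracks the three values across a put and an eviction. It is an in-memory stand-in with hypothetical names, sizes counted in characters, and `Long.MAX_VALUE` assumed as the "empty" timestamp; it does not model the RocksDB-backed class.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy in-memory stand-in for a time-ordered buffer, for illustration only.
final class ToyTimeOrderedBuffer {
    private final TreeMap<Long, String> byTime = new TreeMap<>();
    private long bytes = 0;

    void put(long time, String value) {
        String previous = byTime.put(time, value);
        if (previous != null) {
            bytes -= previous.length();
        }
        bytes += value.length();
    }

    // Evicts the oldest record; returns false when the buffer is empty.
    boolean evictOldest() {
        Map.Entry<Long, String> first = byTime.pollFirstEntry();
        if (first == null) {
            return false;
        }
        bytes -= first.getValue().length();
        return true;
    }

    int numRecords() {
        return byTime.size();
    }

    long bufferSize() {
        return bytes;
    }

    // Long.MAX_VALUE signals "no records", an assumption of this sketch.
    long minTimestamp() {
        return byTime.isEmpty() ? Long.MAX_VALUE : byTime.firstKey();
    }
}
```

   A test in this style would assert all three accessors once after the put and once more after the eviction.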



##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the 

[jira] [Created] (KAFKA-15087) Move InterBrokerSendThread to server-commons module

2023-06-14 Thread Dimitar Dimitrov (Jira)
Dimitar Dimitrov created KAFKA-15087:


 Summary: Move InterBrokerSendThread to server-commons module
 Key: KAFKA-15087
 URL: https://issues.apache.org/jira/browse/KAFKA-15087
 Project: Kafka
  Issue Type: Task
Reporter: Dimitar Dimitrov
Assignee: Dimitar Dimitrov


Similar to the move of {{ShutdownableThread}} done with KAFKA-14706.





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-14 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1229411879


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +284,61 @@ public Future set(Map values, Callback callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            Future secondaryWriteFuture = secondaryStore.set(values, (secondaryWriteError, ignored) -> {
+                try (LoggingContext context = loggingContext()) {
+                    if (secondaryWriteError != null) {
+                        log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                        secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError);
+                    } else {
+                        log.debug("Successfully flushed tombstone offsets to secondary backing store");
+                    }
+                }
+            });
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when
+                // tombstone records fail to be written to the secondary store. Note that while commitTransaction
+                // already waits for all records to be sent and ack'ed, in this case we do need to add an explicit
+                // blocking call. In case of ALOS, we wait for the same duration as `offset.commit.timeout.ms`
+                // and throw that exception which would allow the offset commit to fail.
+                if (isEOSEnabled) {
+                    secondaryWriteFuture.get();

Review Comment:
   While the code comments explain the reasoning for the explicit `get()` call
here, note that I couldn't test the scenario where the secondary store write
fails and the `doFlush` callback sees the error. This is because
`MockProducer#send` throws any exception passed to it without creating or
executing the callback for it. As a result, the `set` method itself throws the
error, which, from what I understand, is not what would happen in practice.
   
   Ideally, I could have extended `send()` in `MockProducer` to handle this
case properly, but I didn't want to make any assumptions about it. If the
reasoning above seems fine, I think what I have done here should be OK.
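
   To make the two waiting modes concrete, here is a minimal sketch, not the
code in this PR. It assumes a hypothetical helper `awaitSecondaryWrite` and a
plain `Future<Void>` standing in for the future returned by
`secondaryStore.set(...)`. It only shows how a failure delivered through the
future (rather than thrown by `set` itself) would be observed, with an
unbounded wait for EOS and a bounded wait otherwise.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SecondaryWriteWait {
    // Hypothetical helper (not part of Kafka): blocks on the secondary write and
    // returns the failure that was observed, or null if the write succeeded.
    public static Throwable awaitSecondaryWrite(Future<Void> secondaryWriteFuture,
                                                boolean isEOSEnabled,
                                                long offsetCommitTimeoutMs) {
        try {
            if (isEOSEnabled) {
                // EOS: no timeout, wait as long as the write takes.
                secondaryWriteFuture.get();
            } else {
                // ALOS: wait for at most the offset commit timeout.
                secondaryWriteFuture.get(offsetCommitTimeoutMs, TimeUnit.MILLISECONDS);
            }
            return null;
        } catch (ExecutionException e) {
            // The write failed asynchronously; the real error is the cause.
            return e.getCause();
        } catch (TimeoutException e) {
            return e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("secondary write failed"));
        System.out.println(awaitSecondaryWrite(failed, true, 100L));

        CompletableFuture<Void> neverCompletes = new CompletableFuture<>();
        System.out.println(awaitSecondaryWrite(neverCompletes, false, 50L));
    }
}
```

   A test double that completes the returned future exceptionally, instead of
throwing from the send path, would exercise the first branch shown in `main`.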



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-14 Thread via GitHub


dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1229407264


##
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+public class TimerTaskEntry {

Review Comment:
   As explained in the other comment, this is not required any more.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-14 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1229405980


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +280,33 @@ public Future set(Map values, Callback callb
 throw new IllegalStateException("At least one non-null offset store must be provided");
 }
 
+boolean containsTombstones = values.entrySet()
+.stream()
+.anyMatch(offset -> offset.getValue() == null);
+
+AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>();
+
+// If there are tombstone offsets, then the failure to write to secondary store will
+// not be ignored. Also, for tombstone records, we first write to secondary store and
+// then to primary stores.
+if (secondaryStore != null && containsTombstones) {
+secondaryStore.set(values, (secondaryWriteError, ignored) -> {
+try (LoggingContext context = loggingContext()) {
+if (secondaryWriteError != null) {
+log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError);
+} else {
+log.debug("Successfully flushed tombstone offsets to secondary backing store");
+}
+}
+});
+}
+
 return primaryStore.set(values, (primaryWriteError, ignored) -> {
-if (secondaryStore != null) {
+// Secondary store writes have already happened for tombstone records

Review Comment:
   Please check my comment here: 
https://github.com/apache/kafka/pull/13801#discussion_r1229405593






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-14 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1229405593


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -302,7 +326,12 @@ public Future set(Map values, Callback callb
 }
 }
 try (LoggingContext context = loggingContext()) {
-callback.onCompletion(primaryWriteError, ignored);
+Throwable secondaryWriteError = secondaryStoreTombstoneWriteError.get();
+if (secondaryStore != null && containsTombstones && secondaryWriteError != null) {

Review Comment:
   That's true. I have tweaked the logic so that, for tombstone records, we
wait for the secondary store write to complete before proceeding with the
primary store write.
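
   The ordering described above could be sketched roughly as follows. This is
an illustration under stated assumptions, not the PR's implementation: `Store`
is a hypothetical stand-in for an offset backing store, offsets are simplified
to `Map<String, String>`, and the non-tombstone path (where the secondary write
may still follow the primary write) is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

public class TombstoneWriteOrder {
    // Hypothetical stand-in for an offset store: accepts offsets, returns a future for the write.
    public interface Store extends Function<Map<String, String>, CompletableFuture<Void>> { }

    public static void set(Map<String, String> values, Store primary, Store secondary) {
        boolean containsTombstones = values.containsValue(null);
        if (secondary != null && containsTombstones) {
            try {
                // Tombstones present: the secondary write must finish first.
                secondary.apply(values).join();
            } catch (CompletionException e) {
                // Fail the whole operation; the primary store is never touched.
                throw new IllegalStateException("secondary store write failed", e.getCause());
            }
        }
        primary.apply(values).join();
    }

    public static void main(String[] args) {
        Map<String, String> values = new HashMap<>();
        values.put("partition-0", null); // a tombstone offset

        Store primary = v -> {
            System.out.println("primary write");
            return CompletableFuture.completedFuture(null);
        };
        Store failingSecondary = v -> {
            CompletableFuture<Void> f = new CompletableFuture<>();
            f.completeExceptionally(new RuntimeException("secondary unavailable"));
            return f;
        };

        try {
            set(values, primary, failingSecondary);
        } catch (IllegalStateException e) {
            System.out.println("flush failed: " + e.getCause().getMessage());
        }
    }
}
```

   In this sketch a failed secondary write prevents the primary write
altogether, which is the property the tombstone path relies on.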






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-14 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1229404145


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##
@@ -192,6 +197,220 @@ public void testCancelAfterAwaitFlush() throws Exception {
 flushFuture.get(1000, TimeUnit.MILLISECONDS);
 }
 
+@Test

Review Comment:
   I kept them here because the error should ultimately surface in the
callbacks via the `doFlush` method (which is what the actual `commitOffsets`
method would see). Let me know if that sounds OK.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-14 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1229402835


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +280,33 @@ public Future set(Map values, Callback callb
 throw new IllegalStateException("At least one non-null offset store must be provided");
 }
 
+boolean containsTombstones = values.entrySet()
+.stream()
+.anyMatch(offset -> offset.getValue() == null);

Review Comment:
   @ashwinpankaj, I think it should be OK to scan the map because it's not
going to be huge. Moreover, the `containsTombstones` flag is also used when
writing to the primary store, to check whether the secondary store writes have
already happened. WDYT?
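
   For reference, the scan amounts to a single pass over the entries. A
minimal standalone sketch, with the key and value types simplified to `String`
purely for illustration (the class name `TombstoneScan` is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class TombstoneScan {
    // Returns true if any offset value is null, i.e. the map carries a tombstone.
    public static <K, V> boolean containsTombstones(Map<K, V> values) {
        return values.entrySet()
                .stream()
                .anyMatch(offset -> offset.getValue() == null);
    }

    public static void main(String[] args) {
        Map<String, String> offsets = new HashMap<>();
        offsets.put("partition-0", "42");
        System.out.println(containsTombstones(offsets)); // prints "false"

        offsets.put("partition-1", null);
        System.out.println(containsTombstones(offsets)); // prints "true"
    }
}
```

   `anyMatch` short-circuits on the first null value, so the cost is at most
one pass over the map.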






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-14 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1229399358


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +280,33 @@ public Future set(Map values, Callback callb
 throw new IllegalStateException("At least one non-null offset store must be provided");
 }
 

Review Comment:
   Sure, will do once we are closer to merging this.





