[GitHub] [kafka] sudeshwasnik commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary
sudeshwasnik commented on PR #13646: URL: https://github.com/apache/kafka/pull/13646#issuecomment-1592302321 Hi @sagarrao12, a small correction to my earlier comment - ``` Since assumption 1 is incorrect, we should change the test where it doesn't expect every record that decrements recordsToCommitLatch also must have been produced. ``` It seems that `recordsRemainingLatch` only waits until X records have been returned by the source task to the framework. Say MINIMUM_MESSAGES = 1000 but MESSAGES_PER_POLL is configured as 100; we then need to wait until the source task delivers 1000 messages to the framework. This helps in not including `production` time in the `awaitCommits` assertion timeout (?) wdyt? Also, the reason this PR passes the test now is that it doesn't validate that X records are present in the topic `when` the X countDowns for `awaitCommits` are done. It'll wait `until` X records are produced into the topic -> by then there could've been many more `commitRecord`s. This assertion doesn't help now because the connector is running continuously, so there `WILL` be MINIMUM_MESSAGES produced eventually. `` assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + sourceRecords.count(), sourceRecords.count() >= MINIMUM_MESSAGES); `` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
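The distinction drawn in the comment above, between a latch that counts records handed to the framework and the number of records actually produced, can be sketched as follows. This is only an illustration and not the Connect test code: the class `LatchVsProduced`, the counters `handedOff` and `produced`, and the "half of each batch is acknowledged" rule are all assumptions made up for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchVsProduced {
    // Hypothetical stand-ins for the constants named in the comment.
    static final int MINIMUM_MESSAGES = 1000;
    static final int MESSAGES_PER_POLL = 100;

    /** Returns {handedOff, produced} after the simulated task has finished. */
    public static int[] run() {
        CountDownLatch recordsRemaining = new CountDownLatch(MINIMUM_MESSAGES);
        AtomicInteger handedOff = new AtomicInteger();
        AtomicInteger produced = new AtomicInteger();

        Thread task = new Thread(() -> {
            while (recordsRemaining.getCount() > 0) {
                // One simulated poll(): every record counts down the latch
                // as soon as it is handed to the "framework".
                for (int i = 0; i < MESSAGES_PER_POLL; i++) {
                    handedOff.incrementAndGet();
                    recordsRemaining.countDown();
                }
                // Assumption for illustration: production lags behind, so
                // only half of each batch has been acknowledged so far.
                produced.addAndGet(MESSAGES_PER_POLL / 2);
            }
        });
        task.start();
        try {
            if (!recordsRemaining.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("latch did not open in time");
            }
            task.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {handedOff.get(), produced.get()};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("handed off=" + counts[0] + ", produced=" + counts[1]);
    }
}
```

In this sketch the latch opening proves only that MINIMUM_MESSAGES records were handed off; an assertion on the produced count at that moment could still fail, which is the gap the comment describes.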
[GitHub] [kafka] showuon commented on pull request #13773: KAFKA-14712: Produce correct error msg with correct metadataversion
showuon commented on PR #13773: URL: https://github.com/apache/kafka/pull/13773#issuecomment-1592269594 @Owen-CH-Leung , thanks for the patch, could you add tests for this change?
[GitHub] [kafka] showuon commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions
showuon commented on code in PR #13665: URL: https://github.com/apache/kafka/pull/13665#discussion_r1230345113 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -127,6 +127,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // hold onto request for committed offset requests to enable async calls. private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null; +// holds the offset metadata for assigned partitions to reduce remote calls thus speeding up fetching partition metadata +private final Map committedTopicPartitionOffsetsCache; Review Comment: nit: the comment above should mention this is the `committed offset metadata` ## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java: ## @@ -922,13 +922,9 @@ public void testCommitsFetchedDuringAssign() { // fetch offset for two topics Map offsets = new HashMap<>(); -offsets.put(tp0, offset1); -client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator); -assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset()); - -offsets.remove(tp0); offsets.put(tp1, offset2); Review Comment: Could we add a comment above about why we only need to respond with `tp1, offset2`? Something about it's been cached in previous committed offset fetch. 
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() { AtomicBoolean success = new AtomicBoolean(false); -Map offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello")); +OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello"); +Map offsets = singletonMap(t1p, offsetAndMetadata); coordinator.commitOffsetsAsync(offsets, callback(offsets, success)); coordinator.invokeCompletedOffsetCommitCallbacks(); assertTrue(success.get()); assertEquals(coordinator.inFlightAsyncCommits.get(), 0); +Map cache = coordinator.committedOffsetsCache(); +assertEquals(cache.size(), 1); +assertEquals(cache.get(t1p), offsetAndMetadata); +} + +@Test +public void testCommitOffsetMetadataSync() { Review Comment: Thanks for adding the sync test ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() { AtomicBoolean success = new AtomicBoolean(false); -Map offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello")); +OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello"); +Map offsets = singletonMap(t1p, offsetAndMetadata); coordinator.commitOffsetsAsync(offsets, callback(offsets, success)); coordinator.invokeCompletedOffsetCommitCallbacks(); assertTrue(success.get()); assertEquals(coordinator.inFlightAsyncCommits.get(), 0); +Map cache = coordinator.committedOffsetsCache(); +assertEquals(cache.size(), 1); +assertEquals(cache.get(t1p), offsetAndMetadata); Review Comment: assertEquals method signature is `assertEquals(int expected, int actual)`. Putting the parameter in the correct order will output the reasonable error message if any. 
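To illustrate the point about argument order: JUnit 5's `assertEquals` takes the expected value first and the actual value second, so swapping them produces a misleading failure message. The sketch below uses a local stand-in helper rather than JUnit itself so that it runs without dependencies; the class `AssertOrder`, the helper `failureMessage`, and the exact message wording are assumptions for illustration.

```java
import java.util.Objects;

public class AssertOrder {
    // Local stand-in for an assertEquals(expected, actual) helper.
    // It is NOT the JUnit API; it only imitates the style of its message.
    public static void assertEquals(Object expected, Object actual) {
        if (!Objects.equals(expected, actual)) {
            throw new AssertionError(
                "expected: <" + expected + "> but was: <" + actual + ">");
        }
    }

    // Runs a check and returns the failure message, or "passed".
    public static String failureMessage(Runnable check) {
        try {
            check.run();
            return "passed";
        } catch (AssertionError e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        int cacheSize = 0; // pretend the cache was unexpectedly empty

        // Correct order: the message names 1 as the expectation.
        System.out.println(failureMessage(() -> assertEquals(1, cacheSize)));

        // Swapped order: the message wrongly claims 0 was expected.
        System.out.println(failureMessage(() -> assertEquals(cacheSize, 1)));
    }
}
```

Applied to the diff above, this would suggest writing `assertEquals(1, cache.size())` and `assertEquals(offsetAndMetadata, cache.get(t1p))`.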
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() { AtomicBoolean success = new AtomicBoolean(false); -Map offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello")); +OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello"); +Map offsets = singletonMap(t1p, offsetAndMetadata); coordinator.commitOffsetsAsync(offsets, callback(offsets, success)); coordinator.invokeCompletedOffsetCommitCallbacks(); assertTrue(success.get()); assertEquals(coordinator.inFlightAsyncCommits.get(), 0); +Map cache = coordinator.committedOffsetsCache(); +assertEquals(cache.size(), 1); +assertEquals(cache.get(t1p), offsetAndMetadata); Review Comment: Also, could we assert cache is empty before we `commitOffsetsAsync`? i.e. ``` assertTrue(cache.isEmpty()); coordinator.commitOffsetsAsync(...) ... ``` ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -351,6 +359,10 @@ private Exception invokePartitionsRevoked(final SortedSet revoke final long startMs = time.milliseconds(); listener.onPartitionsRevoked(revokedPartitions);
[jira] [Commented] (KAFKA-15085) Make Timer.java implement AutoCloseable
[ https://issues.apache.org/jira/browse/KAFKA-15085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17732824#comment-17732824 ] Owen C.H. Leung commented on KAFKA-15085: - Hi [~divijvaidya] , can you assign this issue to me ? I'm eager to pick this up. Also, can I ask if you can help to review my PR which aims to solve KAFKA-14712 ? https://github.com/apache/kafka/pull/13773 > Make Timer.java implement AutoCloseable > --- > > Key: KAFKA-15085 > URL: https://issues.apache.org/jira/browse/KAFKA-15085 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Divij Vaidya >Priority: Minor > Labels: Newbie, newbie > > Many automatic bug finders will flag a warning if an object of class which is > marked as AutoCloseable but is not closed properly in the code. Hence, as a > best practice we should implement AutoCloseable for classes which require > resources to be released after shutdown. > Timer.java should implement AutoCloseable and ShutDown should be replaced > with close() method. > Note that this interface change does not require a KIP since Timer.java is > not a user facing public class. > This was discussed here: > [https://github.com/apache/kafka/pull/13820#discussion_r1222654614] -- This message was sent by Atlassian Jira (v8.20.10#820010)
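As a rough illustration of what the issue asks for, the sketch below shows a timer-like class that implements `AutoCloseable` so that callers can rely on try-with-resources. It is not Kafka's `Timer`: the class `SketchTimer`, its executor field, and the `isClosed()` helper are hypothetical names chosen for the example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class AutoCloseableTimerSketch {

    // Hypothetical timer that owns a thread and therefore needs cleanup.
    public static class SketchTimer implements AutoCloseable {
        private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

        public boolean isClosed() {
            return executor.isShutdown();
        }

        // close() takes the role that a shutdown() method would otherwise play.
        // It declares no checked exception, so callers need no catch block.
        @Override
        public void close() {
            executor.shutdownNow();
        }
    }

    // Uses the timer in try-with-resources and reports whether it was closed.
    public static boolean closedAfterUse() {
        SketchTimer ref;
        try (SketchTimer timer = new SketchTimer()) {
            ref = timer; // keep a reference to inspect after the block
        }
        return ref.isClosed();
    }

    public static void main(String[] args) {
        System.out.println("closed after try-with-resources: " + closedAfterUse());
    }
}
```

With this shape, static analysis tools that track `AutoCloseable` resources can flag call sites that forget to close the timer.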
[jira] [Resolved] (KAFKA-14709) Move content in connect/mirror/README.md to the docs
[ https://issues.apache.org/jira/browse/KAFKA-14709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-14709. --- Fix Version/s: 3.6.0 Resolution: Fixed > Move content in connect/mirror/README.md to the docs > > > Key: KAFKA-14709 > URL: https://issues.apache.org/jira/browse/KAFKA-14709 > Project: Kafka > Issue Type: Improvement > Components: docs, mirrormaker >Reporter: Mickael Maison >Assignee: Gantigmaa Selenge >Priority: Major > Fix For: 3.6.0 > > > We should move all the content in > https://github.com/apache/kafka/blob/trunk/connect/mirror/README.md to the > relevant doc sections.
[GitHub] [kafka] showuon merged pull request #13650: KAFKA-14709: Move content in connect/mirror/README.md to the docs
showuon merged PR #13650: URL: https://github.com/apache/kafka/pull/13650
[GitHub] [kafka] showuon commented on pull request #13650: KAFKA-14709: Move content in connect/mirror/README.md to the docs
showuon commented on PR #13650: URL: https://github.com/apache/kafka/pull/13650#issuecomment-1592238006 Failed tests are unrelated: ``` Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testSyncTopicConfigs() Build / JDK 11 and Scala 2.13 / kafka.admin.DescribeConsumerGroupTest.testDescribeOffsetsOfExistingGroupWithNoMembers() Build / JDK 11 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testMultiNodeCluster() Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testClose() Build / JDK 8 and Scala 2.12 / kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs() Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi ```
[GitHub] [kafka] showuon commented on a diff in pull request #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version
showuon commented on code in PR #13782: URL: https://github.com/apache/kafka/pull/13782#discussion_r1230335416 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1007,6 +1007,30 @@ public static void flushDir(Path path) throws IOException { } } +/** + * Flushes dirty file to guarantee crash consistency. + * + * @throws IOException if flushing the file fails. + */ +public static void flushFile(Path path) throws IOException { +if (path != null) { +try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) { +fileChannel.force(true); +} +} +} + +/** + * Flushes dirty file quietly, logs warning when exception happens. + */ +public static void flushFileQuietly(Path path, String name) { +try { +flushFile(path); +} catch (IOException e) { +log.warn("Failed to flush {} at path {}", name, path); Review Comment: You should still put `e` in the 3rd parameter, like this: `log.warn("Failed to flush {} at path {}", name, path, e);`
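A self-contained sketch of the two helpers under review, with the exception passed to the logger as the review comment suggests. Two things here are assumptions rather than the PR's code: `java.util.logging` is swapped in for the SLF4J logger used in the diff so that the example runs on a plain JDK, and `flushFileQuietly` returns a boolean purely so that the outcome can be checked. The class name `FlushSketch` and the `demo()` method are hypothetical as well.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.logging.Level;
import java.util.logging.Logger;

public class FlushSketch {
    private static final Logger LOG = Logger.getLogger(FlushSketch.class.getName());

    // Mirrors the diff: open the file read-only and force its content
    // and metadata to the storage device.
    public static void flushFile(Path path) throws IOException {
        if (path != null) {
            try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
                channel.force(true);
            }
        }
    }

    // Logs instead of throwing. The exception is handed to the logger so
    // the stack trace is not lost. Returns true if the flush succeeded.
    public static boolean flushFileQuietly(Path path, String name) {
        try {
            flushFile(path);
            return true;
        } catch (IOException e) {
            LOG.log(Level.WARNING, "Failed to flush " + name + " at path " + path, e);
            return false;
        }
    }

    // Writes a temporary file, flushes it, and cleans up afterwards.
    public static boolean demo() {
        try {
            Path tmp = Files.createTempFile("flush-sketch", ".log");
            try {
                Files.write(tmp, "payload".getBytes(StandardCharsets.UTF_8));
                return flushFileQuietly(tmp, "temp file");
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("flushed: " + demo());
    }
}
```

With SLF4J, the equivalent of the `LOG.log(..., e)` call is the form quoted in the review: a trailing `Throwable` argument that has no matching `{}` placeholder is logged as the exception.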
[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService
jolshan commented on code in PR #13812: URL: https://github.com/apache/kafka/pull/13812#discussion_r1230295895 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.CoordinatorNotAvailableException; +import org.apache.kafka.common.errors.InvalidFetchSizeException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.NotEnoughReplicasException; +import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.errors.RecordBatchTooLargeException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.server.util.FutureUtils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentMatchers; + +import java.util.Collections; +import java.util.OptionalInt; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; + +import static org.apache.kafka.coordinator.group.TestUtil.requestContext; +import static org.apache.kafka.test.TestUtils.assertFutureThrows; 
+import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class GroupCoordinatorServiceTest { + +@SuppressWarnings("unchecked") +private CoordinatorRuntime mockRuntime() { +return (CoordinatorRuntime) mock(CoordinatorRuntime.class); +} + +private GroupCoordinatorConfig createConfig() { +return new GroupCoordinatorConfig( +1, +45, +5, +Integer.MAX_VALUE, +Collections.singletonList(new RangeAssignor()), +1000 +); +} + +@Test +public void testStartupShutdown() throws Exception { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); + +service.startup(() -> 1); +service.shutdown(); + +verify(runtime, times(1)).close(); +} + +@Test +public void testConsumerGroupHeartbeatWhenNotStarted() { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); + +ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData() +.setGroupId("foo"); + +assertFutureThrows( +service.consumerGroupHeartbeat( +requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT), +request +), +CoordinatorNotAvailableException.class +); +} + +
[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService
jolshan commented on code in PR #13812: URL: https://github.com/apache/kafka/pull/13812#discussion_r1230294539 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinatorTest.java: ## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.apache.kafka.coordinator.group.TestUtil.requestContext; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class 
ReplicatedGroupCoordinatorTest { + +@Test +public void testConsumerGroupHeartbeat() { +GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); +ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator( +groupMetadataManager +); + +RequestContext context = requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT); +ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData(); +CoordinatorResult result = new CoordinatorResult<>( +Collections.emptyList(), +new ConsumerGroupHeartbeatResponseData() +); + +when(coordinator.consumerGroupHeartbeat( +context, +request +)).thenReturn(result); + +assertEquals(result, coordinator.consumerGroupHeartbeat(context, request)); Review Comment: Is this just a test that we don't throw errors?
[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService
jolshan commented on code in PR #13812: URL: https://github.com/apache/kafka/pull/13812#discussion_r1230293541 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.CoordinatorNotAvailableException; +import org.apache.kafka.common.errors.InvalidFetchSizeException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.NotEnoughReplicasException; +import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.errors.RecordBatchTooLargeException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.server.util.FutureUtils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentMatchers; + +import java.util.Collections; +import java.util.OptionalInt; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; + +import static org.apache.kafka.coordinator.group.TestUtil.requestContext; +import static org.apache.kafka.test.TestUtils.assertFutureThrows; 
+import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class GroupCoordinatorServiceTest { + +@SuppressWarnings("unchecked") +private CoordinatorRuntime mockRuntime() { +return (CoordinatorRuntime) mock(CoordinatorRuntime.class); +} + +private GroupCoordinatorConfig createConfig() { +return new GroupCoordinatorConfig( +1, +45, +5, +Integer.MAX_VALUE, +Collections.singletonList(new RangeAssignor()), +1000 +); +} + +@Test +public void testStartupShutdown() throws Exception { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); + +service.startup(() -> 1); +service.shutdown(); + +verify(runtime, times(1)).close(); +} + +@Test +public void testConsumerGroupHeartbeatWhenNotStarted() { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); + +ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData() +.setGroupId("foo"); + +assertFutureThrows( +service.consumerGroupHeartbeat( +requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT), +request +), +CoordinatorNotAvailableException.class +); +} + +
[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService
jolshan commented on code in PR #13812: URL: https://github.com/apache/kafka/pull/13812#discussion_r1230281526 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -0,0 +1,581 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.InvalidFetchSizeException; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.NotEnoughReplicasException; +import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.errors.RecordBatchTooLargeException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.message.DeleteGroupsResponseData; +import org.apache.kafka.common.message.DescribeGroupsResponseData; +import org.apache.kafka.common.message.HeartbeatRequestData; +import org.apache.kafka.common.message.HeartbeatResponseData; +import org.apache.kafka.common.message.JoinGroupRequestData; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.LeaveGroupRequestData; +import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.ListGroupsRequestData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitResponseData; +import org.apache.kafka.common.message.OffsetDeleteRequestData; +import org.apache.kafka.common.message.OffsetDeleteResponseData; +import org.apache.kafka.common.message.OffsetFetchRequestData; +import org.apache.kafka.common.message.OffsetFetchResponseData; +import org.apache.kafka.common.message.SyncGroupRequestData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import 
org.apache.kafka.common.message.TxnOffsetCommitRequestData; +import org.apache.kafka.common.message.TxnOffsetCommitResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.TransactionResult; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier; +import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime; +import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.server.util.FutureUtils; +import org.slf4j.Logger; + +import java.util.List; +import java.util.OptionalInt; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntSupplier; + +/** + * The group coordinator service. + */ +public class GroupCoordinatorService implements GroupCoordinator { + +public static class Builder { +private final int nodeId; +private final GroupCoordinatorConfig config; +private PartitionWriter writer; +private CoordinatorLoader loader; + +public Builder( +int nodeId, +GroupCoordinatorConfig config +) { +this.nodeId = nodeId; +this.config = config; +} + +public Builder withWriter(PartitionWriter writer) { +this.writer =
[GitHub] [kafka] wcarlson5 commented on pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)
wcarlson5 commented on PR #13855: URL: https://github.com/apache/kafka/pull/13855#issuecomment-1592137217 @cadonna could you give this a look? @vcrfxia has looked at [this](https://github.com/wcarlson5/kafka/pull/1) and there is a check of the table's history retention that I am missing but other than that it should be good.
[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService
jolshan commented on code in PR #13812: URL: https://github.com/apache/kafka/pull/13812#discussion_r1230246630 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -0,0 +1,581 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.InvalidFetchSizeException; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.NotEnoughReplicasException; +import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.errors.RecordBatchTooLargeException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.message.DeleteGroupsResponseData; +import org.apache.kafka.common.message.DescribeGroupsResponseData; +import org.apache.kafka.common.message.HeartbeatRequestData; +import org.apache.kafka.common.message.HeartbeatResponseData; +import org.apache.kafka.common.message.JoinGroupRequestData; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.LeaveGroupRequestData; +import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.ListGroupsRequestData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitResponseData; +import org.apache.kafka.common.message.OffsetDeleteRequestData; +import org.apache.kafka.common.message.OffsetDeleteResponseData; +import org.apache.kafka.common.message.OffsetFetchRequestData; +import org.apache.kafka.common.message.OffsetFetchResponseData; +import org.apache.kafka.common.message.SyncGroupRequestData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import 
org.apache.kafka.common.message.TxnOffsetCommitRequestData; +import org.apache.kafka.common.message.TxnOffsetCommitResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.TransactionResult; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier; +import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime; +import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.server.util.FutureUtils; +import org.slf4j.Logger; + +import java.util.List; +import java.util.OptionalInt; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntSupplier; + +/** + * The group coordinator service. + */ +public class GroupCoordinatorService implements GroupCoordinator { + +public static class Builder { +private final int nodeId; +private final GroupCoordinatorConfig config; +private PartitionWriter writer; +private CoordinatorLoader loader; + +public Builder( +int nodeId, +GroupCoordinatorConfig config +) { +this.nodeId = nodeId; +this.config = config; +} + +public Builder withWriter(PartitionWriter writer) { +this.writer =
[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService
jolshan commented on code in PR #13812: URL: https://github.com/apache/kafka/pull/13812#discussion_r1230243216 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -0,0 +1,581 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.InvalidFetchSizeException; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.NotEnoughReplicasException; +import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.errors.RecordBatchTooLargeException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.message.DeleteGroupsResponseData; +import org.apache.kafka.common.message.DescribeGroupsResponseData; +import org.apache.kafka.common.message.HeartbeatRequestData; +import org.apache.kafka.common.message.HeartbeatResponseData; +import org.apache.kafka.common.message.JoinGroupRequestData; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.LeaveGroupRequestData; +import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.ListGroupsRequestData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitResponseData; +import org.apache.kafka.common.message.OffsetDeleteRequestData; +import org.apache.kafka.common.message.OffsetDeleteResponseData; +import org.apache.kafka.common.message.OffsetFetchRequestData; +import org.apache.kafka.common.message.OffsetFetchResponseData; +import org.apache.kafka.common.message.SyncGroupRequestData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import 
org.apache.kafka.common.message.TxnOffsetCommitRequestData; +import org.apache.kafka.common.message.TxnOffsetCommitResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.TransactionResult; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier; +import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime; +import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.server.util.FutureUtils; +import org.slf4j.Logger; + +import java.util.List; +import java.util.OptionalInt; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntSupplier; + +/** + * The group coordinator service. + */ +public class GroupCoordinatorService implements GroupCoordinator { + +public static class Builder { +private final int nodeId; +private final GroupCoordinatorConfig config; +private PartitionWriter writer; +private CoordinatorLoader loader; + +public Builder( +int nodeId, +GroupCoordinatorConfig config +) { +this.nodeId = nodeId; +this.config = config; +} + +public Builder withWriter(PartitionWriter writer) { +this.writer =
[GitHub] [kafka] cmccabe commented on a diff in pull request #13826: KAFKA-15060: fix the ApiVersionManager interface
cmccabe commented on code in PR #13826:
URL: https://github.com/apache/kafka/pull/13826#discussion_r1230241309

## core/src/main/scala/kafka/server/ControllerApis.scala: ##
@@ -441,23 +441,14 @@ class ControllerApis(val requestChannel: RequestChannel,
 if (apiVersionRequest.hasUnsupportedRequestVersion) {
   requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, UNSUPPORTED_VERSION.exception))
-  CompletableFuture.completedFuture[Unit](())
 } else if (!apiVersionRequest.isValid) {
   requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception))
-  CompletableFuture.completedFuture[Unit](())
 } else {
-  val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty())
-  controller.finalizedFeatures(context).handle { (result, exception) =>
-    requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-      if (exception != null) {
-        apiVersionRequest.getErrorResponse(requestThrottleMs, exception)
-      } else {
-        apiVersionManager.apiVersionResponse(requestThrottleMs, result.featureMap().asScala.toMap, result.epoch())
-      }
-    })
-  }
+  requestHelper.sendResponseMaybeThrottle(request,
+    requestThrottleMs => apiVersionManager.apiVersionResponse(requestThrottleMs))

Review Comment:
I will change this not to start the listener until we've caught up to the local HWM. That will avoid this and some other gotchas.
[jira] [Commented] (KAFKA-13988) Mirrormaker 2 auto.offset.reset=latest not working
[ https://issues.apache.org/jira/browse/KAFKA-13988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17732778#comment-17732778 ]

Ravindranath Kakarla commented on KAFKA-13988:

Does the issue have anything to do with source cluster being old version (0.10)? Did someone face this issue with latest versions of Kafka?

> Mirrormaker 2 auto.offset.reset=latest not working
>
> Key: KAFKA-13988
> URL: https://issues.apache.org/jira/browse/KAFKA-13988
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect, mirrormaker
> Affects Versions: 3.2.0
> Environment: Source Kafka cluster running on Ubuntu 20
>   Source Kafka cluster Kafka v0.10
>   Target Kafka cluster running in AWS MSK
>   Target Kafka cluster Kafka v2.6.2
>   Mirrormaker version 3.2.0 running on Ubuntu 20.
> Reporter: Daniel Florek
> Assignee: Justinwins
> Priority: Major
> Fix For: 3.2.0
>
> Hi.
> I have problem setting up mirroring with MM2 from latest offset between 2 clusters. In logs I can see that Consumer that is consuming topics has auto.offset.reset property set to latest. But still topics are read from offset 0. I am using following configuration:
>
> {code:java}
> clusters = A, B
> A.bootstrap.servers = broker-01A:9092
> B.bootstrap.servers = broker-01B:9092,broker-02B:9092,broker-03B:9092
> replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy
> #Enable replication between clusters and define topics which should be replicated
> A->B.enabled = true
> A->B.topics = .*
> A->B.replication.factor=3
> A->B.emit.heartbeats.enabled = true
> A->B.emit.checkpoints.enabled = true
> auto.offset.reset=latest
> consumer.auto.offset.reset=latest
> A.consumer.auto.offset.reset=latest
> B.consumer.auto.offset.reset=latest
> refresh.topics.enabled=true
> heartbeats.topic.replication.factor=1
> checkpoints.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> config.storage.replication.factor = 1
> offset.storage.replication.factor = 1
> status.storage.replication.factor = 1
> {code}
>
> I am using Kafka 3.2.0 for Mirrormaker 2. Source kafka cluster is 1 broker running on EC2 instance in AWS (quite an old version I think 0.10). Target kafka cluster contains 3 brokers running in AWS MSK (version 2.6.2).
> Could you point me what I am doing wrong? Or is this possibly a bug?

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService
jolshan commented on code in PR #13812: URL: https://github.com/apache/kafka/pull/13812#discussion_r1230229603 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -0,0 +1,581 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.InvalidFetchSizeException; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.NotEnoughReplicasException; +import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.errors.RecordBatchTooLargeException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.message.DeleteGroupsResponseData; +import org.apache.kafka.common.message.DescribeGroupsResponseData; +import org.apache.kafka.common.message.HeartbeatRequestData; +import org.apache.kafka.common.message.HeartbeatResponseData; +import org.apache.kafka.common.message.JoinGroupRequestData; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.LeaveGroupRequestData; +import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.ListGroupsRequestData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitResponseData; +import org.apache.kafka.common.message.OffsetDeleteRequestData; +import org.apache.kafka.common.message.OffsetDeleteResponseData; +import org.apache.kafka.common.message.OffsetFetchRequestData; +import org.apache.kafka.common.message.OffsetFetchResponseData; +import org.apache.kafka.common.message.SyncGroupRequestData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import 
org.apache.kafka.common.message.TxnOffsetCommitRequestData; +import org.apache.kafka.common.message.TxnOffsetCommitResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.TransactionResult; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier; +import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime; +import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.server.util.FutureUtils; +import org.slf4j.Logger; + +import java.util.List; +import java.util.OptionalInt; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntSupplier; + +/** + * The group coordinator service. + */ +public class GroupCoordinatorService implements GroupCoordinator { + +public static class Builder { +private final int nodeId; +private final GroupCoordinatorConfig config; +private PartitionWriter writer; +private CoordinatorLoader loader; + +public Builder( +int nodeId, +GroupCoordinatorConfig config +) { +this.nodeId = nodeId; +this.config = config; +} + +public Builder withWriter(PartitionWriter writer) { +this.writer =
[GitHub] [kafka] dimitarndimitrov opened a new pull request, #13856: [DRAFT] KAFKA-15087 Move/rewrite InterBrokerSendThread to server-commons
dimitarndimitrov opened a new pull request, #13856:
URL: https://github.com/apache/kafka/pull/13856

The Java rewrite is kept relatively close to the Scala original to minimize potential newly introduced bugs and to make reviewing simpler. The following details might be of note:
- The `Logging` trait moved to `InterBrokerSendThread` with the rewrite of `ShutdownableThread` has been similarly moved to any subclasses that currently use it. `InterBrokerSendThread`'s own logging has been made to use `ShutdownableThread`'s logger which mimics the prefix/log identifier that the trait provided.
- The `RequestAndCompletionHandler` case class has been made a separate POJO class and the internal-use `UnsentRequests` class has been kept as a static nested class.
- The relatively commonly used but internal (not part of the public API) clients classes that `InterBrokerSendThread` relies on have been allowlisted in the server-common import control.
- The accompanying test class has also been moved and rewritten with one new test added and most of the pre-existing tests made stricter.
- The main abstract method of `InterBrokerSendThread` - `generateRequests()` - now returns a Java collection so in the initial draft `asScala` has been generously sprinkled around its usages. I'll further look into whether there's something more preferable for that.

The local test run after the changes ended with _7661 tests completed, 2 failed, 3 skipped_, with the failures seemingly unrelated and not in server-common or core.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService
jolshan commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1230195090

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java: ##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The group coordinator configurations.
+ */
+public class GroupCoordinatorConfig {
+    public static class Builder {
+        private int numThreads = 1;

Review Comment:
I was also wondering about how config defs worked with documentation. Thanks for filing this JIRA.
[GitHub] [kafka] jolshan commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService
jolshan commented on code in PR #13812: URL: https://github.com/apache/kafka/pull/13812#discussion_r1230193218 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java: ## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.runtime.Coordinator; +import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilder; +import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; + +/** + * The group coordinator replicated state machine that manages the metadata of all generic and + * consumer groups. It holds the hard and the soft state of the groups. 
This class has two kinds + * of methods: + * 1) The request handlers which handle the requests and generate a response and records to + *mutate the hard state. Those records will be written by the runtime and applied to the + *hard state via the replay methods. + * 2) The replay methods which apply records to the hard state. Those are used in the request + *handling as well as during the initial loading of the records from the partitions. + */ +public class ReplicatedGroupCoordinator implements Coordinator { Review Comment: I guess it depends on what the other implementations of coordinator will be
[GitHub] [kafka] wcarlson5 opened a new pull request, #13855: Grace period added
wcarlson5 opened a new pull request, #13855:
URL: https://github.com/apache/kafka/pull/13855

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-15092) KafkaClusterTestKit in test jar depends on MockFaultHandler
Gary Russell created KAFKA-15092:

Summary: KafkaClusterTestKit in test jar depends on MockFaultHandler
Key: KAFKA-15092
URL: https://issues.apache.org/jira/browse/KAFKA-15092
Project: Kafka
Issue Type: Bug
Affects Versions: 3.5.0
Reporter: Gary Russell

{noformat}
java.lang.NoClassDefFoundError: org/apache/kafka/server/fault/MockFaultHandler
    at kafka.testkit.KafkaClusterTestKit$SimpleFaultHandlerFactory.<init>(KafkaClusterTestKit.java:119)
    at kafka.testkit.KafkaClusterTestKit$Builder.<init>(KafkaClusterTestKit.java:143)
{noformat}

MockFaultHandler is missing from the test jar.

This PR https://github.com/apache/kafka/pull/13375/files seems to work around it by adding the {code}server-common sourcesets.test.output{code} to the class path.

The class needs to be available for third parties to create an embedded KRaft broker.
[GitHub] [kafka] wcarlson5 merged pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 merged PR #13756: URL: https://github.com/apache/kafka/pull/13756
[GitHub] [kafka] wcarlson5 commented on pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on PR #13756: URL: https://github.com/apache/kafka/pull/13756#issuecomment-1591908695 https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13756/19/tests/ Test failures are unrelated
[GitHub] [kafka] mumrah commented on a diff in pull request #13724: MINOR: more KRaft Metadata Image tests
mumrah commented on code in PR #13724:
URL: https://github.com/apache/kafka/pull/13724#discussion_r1230101055

## metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java: ##
@@ -95,6 +95,60 @@ public static void replayOne(
         replayAll(target, Collections.singletonList(recordAndVersion));
     }

+    public interface TestThroughAllIntermediateImagesLeadingToFinalImageHelper {

Review Comment:
Could we make this a concrete class? We could take in a supplier for an empty image and a function for creating a delta from an image. Also, I think we could add type parameters to avoid some of the casting. E.g.,

```java
class HelperThing<I, D> {
    HelperThing(Supplier<I> emptyImageSupplier, Function<I, D> imageToDelta)

    void test(I image, List<ApiMessageAndVersion> fromRecords);
}
```

then use it like

```java
HelperThing<TopicsImage, TopicsDelta> helper = new HelperThing<>(() -> TopicsImage.EMPTY, image -> new TopicsDelta(image));
helper.test(image, fromRecords);
```
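To make the shape of that suggestion concrete, here is a minimal, self-contained sketch of a generic helper that replays records through every intermediate image. It is only an illustration: plain lists stand in for the Kafka image and delta types, and the names `ImageTestHelper`, `replay`, `apply`, and `demo` are assumptions made for this example, not code from the PR or from `RecordTestUtils`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper: I = image type, D = delta type, R = record type.
final class ImageTestHelper<I, D, R> {
    private final Supplier<I> emptyImageSupplier;
    private final Function<I, D> imageToDelta;
    private final BiConsumer<D, R> replay; // assumption: replays one record onto a delta
    private final Function<D, I> apply;    // assumption: turns a delta into a new image

    ImageTestHelper(Supplier<I> emptyImageSupplier, Function<I, D> imageToDelta,
                    BiConsumer<D, R> replay, Function<D, I> apply) {
        this.emptyImageSupplier = emptyImageSupplier;
        this.imageToDelta = imageToDelta;
        this.replay = replay;
        this.apply = apply;
    }

    // Builds a new image by replaying the given records on top of a base image.
    private I build(I base, List<R> records) {
        D delta = imageToDelta.apply(base);
        for (R record : records) {
            replay.accept(delta, record);
        }
        return apply.apply(delta);
    }

    // Checks that every split of the record list reaches the same final image.
    // Returns the number of split points that were checked.
    int test(I finalImage, List<R> fromRecords) {
        int checked = 0;
        for (int split = 0; split <= fromRecords.size(); split++) {
            I intermediate = build(emptyImageSupplier.get(), fromRecords.subList(0, split));
            I result = build(intermediate, fromRecords.subList(split, fromRecords.size()));
            if (!finalImage.equals(result)) {
                throw new AssertionError("split " + split + " produced " + result);
            }
            checked++;
        }
        return checked;
    }

    // Stand-in usage: an "image" is an unmodifiable list, a "delta" is a mutable copy.
    static int demo() {
        ImageTestHelper<List<String>, List<String>, String> helper =
            new ImageTestHelper<List<String>, List<String>, String>(
                Collections::emptyList,
                image -> new ArrayList<>(image),
                (delta, record) -> delta.add(record),
                delta -> Collections.unmodifiableList(new ArrayList<>(delta)));
        List<String> records = Arrays.asList("a", "b", "c");
        return helper.test(Arrays.asList("a", "b", "c"), records);
    }
}
```

Because the image and delta types are type parameters, the caller never casts; the cost is that the replay and apply steps have to be passed in explicitly, which the two-argument constructor in the review comment does not do.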
[GitHub] [kafka] kirktrue commented on pull request #6420: KAFKA-3881: use plain topic tag in Fetcher metrics
kirktrue commented on PR #6420: URL: https://github.com/apache/kafka/pull/6420#issuecomment-1591846803 @dbrinegar @tombentley @guozhangwang @junrao Can this PR be resurrected?
[jira] [Commented] (KAFKA-3881) Remove the replacing logic from "." to "_" in Fetcher
[ https://issues.apache.org/jira/browse/KAFKA-3881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17732694#comment-17732694 ]

Kirk True commented on KAFKA-3881:

Two questions:
1. Is there any reason the PR wasn't merged?
2. Will making this change now break users who've coded this discrepancy into their applications/tools?

> Remove the replacing logic from "." to "_" in Fetcher
>
> Key: KAFKA-3881
> URL: https://issues.apache.org/jira/browse/KAFKA-3881
> Project: Kafka
> Issue Type: Bug
> Components: consumer, metrics
> Reporter: Guozhang Wang
> Assignee: Tom Bentley
> Priority: Major
> Labels: newbie, patch-available
>
> The logic of replacing "." to "_" in metrics names / tags was originally introduced in the core package's metrics since Graphite treats "." as hierarchy separators (see KAFKA-1902); for the client metrics, it is supposed that the GraphiteReported should take care of this itself rather than letting Kafka metrics to special handle for it. In addition, right now only consumer Fetcher had replace, but producer Sender does not have it actually.
> So we should consider removing this logic in the consumer Fetcher's metrics package. NOTE that this is a public API backward incompatible change.
[jira] [Commented] (KAFKA-3881) Remove the replacing logic from "." to "_" in Fetcher
[ https://issues.apache.org/jira/browse/KAFKA-3881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17732693#comment-17732693 ] Kirk True commented on KAFKA-3881: -- We have a user running into this issue. It appears that only the fetcher-related metrics preemptively convert {{.}} to {{_}}. My initial research led me to the fetcher metric names, which seems to jibe with the direction of [~tombentley] and [~dbrinegar]'s solutions in their respective pull requests. > Remove the replacing logic from "." to "_" in Fetcher > - > > Key: KAFKA-3881 > URL: https://issues.apache.org/jira/browse/KAFKA-3881 > Project: Kafka > Issue Type: Bug > Components: consumer, metrics >Reporter: Guozhang Wang >Assignee: Tom Bentley >Priority: Major > Labels: newbie, patch-available > > The logic of replacing "." to "_" in metrics names / tags was originally > introduced in the core package's metrics since Graphite treats "." as > hierarchy separators (see KAFKA-1902); for the client metrics, it is supposed > that the GraphiteReported should take care of this itself rather than letting > Kafka metrics to special handle for it. In addition, right now only consumer > Fetcher had replace, but producer Sender does not have it actually. > So we should consider removing this logic in the consumer Fetcher's metrics > package. NOTE that this is a public API backward incompatible change. -- This message was sent by Atlassian Jira (v8.20.10#820010)
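To make the discrepancy discussed in KAFKA-3881 concrete, here is a minimal sketch of the two tagging behaviours. The class and method names (`MetricTagSketch`, `fetcherStyleTag`, `plainTag`) are hypothetical and are not Kafka's API; the only thing taken from the issue is that the Fetcher replaces "." with "_" while the producer Sender does not.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; none of these helpers exist in Kafka.
final class MetricTagSketch {

    // Mimics the Fetcher behaviour described in the issue: "." becomes "_".
    static String fetcherStyleTag(String topic) {
        return topic.replace('.', '_');
    }

    // Mimics the plain behaviour the issue attributes to the producer Sender.
    static String plainTag(String topic) {
        return topic;
    }

    // Builds a tag map the way a metrics reporter might receive it.
    static Map<String, String> tags(String topic, boolean replaceDots) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("topic", replaceDots ? fetcherStyleTag(topic) : plainTag(topic));
        return tags;
    }

    public static void main(String[] args) {
        System.out.println(tags("orders.v1", true));
        System.out.println(tags("orders.v1", false));
    }
}
```

One consequence visible in the sketch is that topics named `a.b` and `a_b` end up with the same tag value under the replacing behaviour, and any tool that already expects the replaced form would see a different tag once the replacement is removed. That is the backward-compatibility concern raised in the thread.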
[GitHub] [kafka] novosibman commented on a diff in pull request #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version
novosibman commented on code in PR #13782:
URL: https://github.com/apache/kafka/pull/13782#discussion_r1230002264

## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ##
@@ -976,6 +976,30 @@ public static void flushDir(Path path) throws IOException {
         }
     }
+    /**
+     * Flushes dirty file to guarantee crash consistency.
+     *
+     * @throws IOException if flushing the file fails.
+     */
+    public static void flushFile(Path path) throws IOException {
+        if (path != null) {
+            try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) {
+                fileChannel.force(true);
+            }
+        }
+    }
+
+    /**
+     * Flushes dirty file quietly, logs warning when exception happens.
+     */
+    public static void flushFileQuietly(Path path, String name) {
+        try {
+            flushFile(path);
+        } catch (IOException e) {
+            log.warn("Failed to flush {} at path {} with exception {}", name, path, e);

Review Comment:
Third parameter removed.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
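For readers following the review comment, the flush-and-log pattern in the diff can be sketched as below. This is not the PR's code: it uses `java.util.logging` instead of Kafka's logger so that it stays self-contained, the class name `FlushSketch` is hypothetical, and the `boolean` return value is an addition made only so the outcome can be observed. The exception is passed as the logger's dedicated throwable argument rather than formatted into the message, which is presumably the effect of dropping the third placeholder.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the helpers shown in the diff above.
final class FlushSketch {
    private static final Logger LOG = Logger.getLogger(FlushSketch.class.getName());

    // Forces file content and metadata to storage; a null path is a no-op.
    static void flushFile(Path path) throws IOException {
        if (path != null) {
            try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
                channel.force(true);
            }
        }
    }

    // Same as flushFile, but logs instead of propagating the failure.
    // Returns true when the flush succeeded (an assumption of this sketch;
    // the method in the diff returns void).
    static boolean flushFileQuietly(Path path, String name) {
        try {
            flushFile(path);
            return true;
        } catch (IOException e) {
            // The exception goes in as the throwable, not as a message parameter.
            LOG.log(Level.WARNING, "Failed to flush " + name + " at path " + path, e);
            return false;
        }
    }
}
```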
[jira] [Commented] (KAFKA-15091) Javadocs for SourceTask::commit are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-15091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17732666#comment-17732666 ] Yash Mayya commented on KAFKA-15091: {quote}it does not have anything to do with the offsets returned from {{SourceTask::poll}} and is instead just a general, periodically-invoked hook to let the task know that an offset commit has taken place (but with no guarantees as to which offsets have been committed and which ones correspond to still-in-flight records). {quote} The SourceTask::commit method doesn't seem like a particularly useful hook in its current shape; I wonder whether we should consider deprecating it...? > Javadocs for SourceTask::commit are incorrect > - > > Key: KAFKA-15091 > URL: https://issues.apache.org/jira/browse/KAFKA-15091 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chris Egerton >Priority: Major > > The Javadocs for {{SourceTask::commit}} state that the method should: > {quote}Commit the offsets, up to the offsets that have been returned by > [{{poll()}}|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()]. > {quote} > However, this is obviously incorrect given how the Connect runtime (when not > configured with exactly-once support for source connectors) performs polling > and offset commits on separate threads. There's also some extensive > discussion on the semantics of that method in KAFKA-5716 where it's made > clear that altering the behavior of the runtime to align with the documented > semantics of that method is not a viable option. > We should update the Javadocs for this method to state that it does not have > anything to do with the offsets returned from {{SourceTask::poll}} and is > instead just a general, periodically-invoked hook to let the task know that > an offset commit has taken place (but with no guarantees as to which offsets > have been committed and which ones correspond to still-in-flight records). 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] gharris1727 commented on pull request #13838: MINOR: Reduce MM2 integration test flakiness due to missing dummy offset commits
gharris1727 commented on PR #13838: URL: https://github.com/apache/kafka/pull/13838#issuecomment-1591734588 Yep, I was able to reproduce some flaky failures in testReplication, and they appear to have a lot of failures in CI. I'll debug those further and remediate them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on pull request #13839: MINOR:Fill missing parameter annotations for some LogCleaner methods
jlprat commented on PR #13839: URL: https://github.com/apache/kafka/pull/13839#issuecomment-1591700252 Thanks for the PR @hudeqi. I think it's always good to improve JavaDocs or ScalaDocs. I was looking at the `LogCleaner.scala` file and I saw there are plenty of methods that are public which have only a very general documentation and they don't have any parameter annotation with documentation. For example `abortCleaning` in line 220 and some of the following methods. For the sake of completion, would you be up to adding the missing annotations to the methods that are public? Extra mile for all the ones that are package-log-protected (`private[log]`) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on pull request #13724: MINOR: more KRaft Metadata Image tests
rondagostino commented on PR #13724: URL: https://github.com/apache/kafka/pull/13724#issuecomment-1591687192 4 test failures on latest build are unrelated. This PR just changes tests, and all of the affected tests passed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1229930816 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? 
getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +try (final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) { +while (iterator.hasNext() && predicate.get()) { +keyValue = iterator.next(); + +final BufferValue bufferValue = BufferValue.deserialize(ByteBuffer.wrap(keyValue.value)); +final K key = keySerde.deserializer().deserialize(topic, + PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get())); +minTimestamp = bufferValue.context().timestamp(); + +final V value = valueSerde.deserializer().deserialize(topic, bufferValue.newValue()); + +if (bufferValue.context().timestamp() < minTimestamp) { +throw new IllegalStateException( +"minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" + +bufferValue.context().timestamp() + "]" +); +} Review Comment: I need to swap those lines. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at:
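The ordering problem behind "I need to swap those lines" can be sketched in isolation. In the quoted diff, `minTimestamp` is assigned from the record before it is compared with that same record's timestamp, so the `IllegalStateException` branch can never fire. The class below is a hypothetical reduction, not the PR's code, and it assumes the intended fix is to validate first and assign afterwards.

```java
// Hypothetical reduction of the evictWhile() timestamp check.
final class MinTimestampSketch {
    long minTimestamp;

    MinTimestampSketch(long initialMin) {
        this.minTimestamp = initialMin;
    }

    // Order as quoted in the diff: assign first, so the comparison is always false.
    void evictAssignThenCheck(long recordTimestamp) {
        minTimestamp = recordTimestamp;
        if (recordTimestamp < minTimestamp) {
            throw new IllegalStateException("unreachable: values were just made equal");
        }
    }

    // Swapped order: compare against the tracked minimum, then advance it.
    void evictCheckThenAssign(long recordTimestamp) {
        if (recordTimestamp < minTimestamp) {
            throw new IllegalStateException(
                "minTimestamp [" + minTimestamp + "] did not match the actual min timestamp ["
                    + recordTimestamp + "]");
        }
        minTimestamp = recordTimestamp;
    }
}
```

With a tracked minimum of 10 and an evicted record at timestamp 5, the first variant silently overwrites the minimum, while the second reports the inconsistency.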
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1229930160 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? 
getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +try (final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) { +while (iterator.hasNext() && predicate.get()) { +keyValue = iterator.next(); + +final BufferValue bufferValue = BufferValue.deserialize(ByteBuffer.wrap(keyValue.value)); +final K key = keySerde.deserializer().deserialize(topic, + PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get())); +minTimestamp = bufferValue.context().timestamp(); + +final V value = valueSerde.deserializer().deserialize(topic, bufferValue.newValue()); + +if (bufferValue.context().timestamp() < minTimestamp) { +throw new IllegalStateException( +"minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" + +bufferValue.context().timestamp() + "]" +); +} + +callback.accept(new Eviction<>(key, value, bufferValue.context())); + +wrapped().remove(keyValue.key); +numRecords--; +bufferSize = bufferSize - computeRecordSize(keyValue.key, bufferValue); +} +if (numRecords == 0) { +
[GitHub] [kafka] gharris1727 commented on pull request #13838: MINOR: Reduce MM2 integration test flakiness due to missing dummy offset commits
gharris1727 commented on PR #13838: URL: https://github.com/apache/kafka/pull/13838#issuecomment-1591647109

> This change seems to help with MirrorCheckpoint failing to start. I now get a lot of failures due to MirrorHeartbeat.

I'll try to reproduce this locally, I hadn't noticed that.

> Looking at the logs, it seems a significant amount of time is spent failing to load faulty test plugins, for example:

This log message is from #13182 and the faulty plugins started appearing after #13467. I think that the 14 seconds might be a little exaggerated as the scanning is re-done for each connect worker, and this test instantiates 6 workers across 2 clusters. I'll look into the performance impact of the new plugin instantiations to see if it's more than I expected.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13838: MINOR: Reduce MM2 integration test flakiness due to missing dummy offset commits
gharris1727 commented on code in PR #13838:
URL: https://github.com/apache/kafka/pull/13838#discussion_r1229900453

## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ##
@@ -1183,17 +1187,31 @@ private void createTopics() {
         }
     }
-    /*
-     * Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
+    /**
+     * Commit offset 0 for all partitions of test-topic-1 for the specified consumer groups on primary and backup clusters.
+     * This is done to force the MirrorCheckpointConnector to start at a task which checkpoints this group.
+     * Must be called before {@link #waitUntilMirrorMakerIsRunning} to prevent that method from timing out.
      */
-    protected void warmUpConsumer(Map consumerProps) {
-        try (Consumer dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
-            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            dummyConsumer.commitSync();
-        }
-        try (Consumer dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
-            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            dummyConsumer.commitSync();
+    protected void prepareConsumerGroup(Map consumerProps) {
+        prepareConsumerGroup(primary.kafka(), consumerProps, "test-topic-1");
+        prepareConsumerGroup(backup.kafka(), consumerProps, "test-topic-1");
+    }
+
+    private void prepareConsumerGroup(EmbeddedKafkaCluster cluster, Map consumerProps, String topic) {
+        try (Admin client = cluster.createAdminClient()) {
+            Map topics = client.describeTopics(Collections.singleton(topic))
+                    .allTopicNames()
+                    .get(REQUEST_TIMEOUT_DURATION_MS, TimeUnit.MILLISECONDS);
+            Map collect = topics.get(topic)
+                    .partitions()
+                    .stream()
+                    .collect(Collectors.toMap(
+                            tpi -> new TopicPartition(topic, tpi.partition()),
+                            ignored -> new OffsetAndMetadata(0L)));
+            AlterConsumerGroupOffsetsResult alterResult = client.alterConsumerGroupOffsets((String) consumerProps.get("group.id"), collect);
+            alterResult.all().get(REQUEST_TIMEOUT_DURATION_MS, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException | InterruptedException | TimeoutException e) {

Review Comment:
I was just avoiding changing the throws signatures, but I see now that it's just 3 sites. I'll change this to avoid wrapping the exception.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary
vamossagar12 commented on PR #13646: URL: https://github.com/apache/kafka/pull/13646#issuecomment-1591623006 Thanks for the assessment, @sudeshwasnik. Continuing that line of thought: if you check the `MonitorableSourceConnector#poll` method, it appears to me that we decrement `recordsRemainingLatch` by the number of records in the batch, regardless of whether those records are part of a transaction that may later be aborted: https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java#L215-L216 Because of this, `awaitRecords` can see the latch counted down to 0 within the timeout. However, as you rightly pointed out, `recordsToCommitLatch` is decremented whether the transaction is aborted or committed, which means `awaitCommits` also passes within the timeout. Could this make the test flaky, since the bound on `recordsRemainingLatch` doesn't seem strong enough to guarantee that the same number of records actually reaches the topic? Would it help to decrement `recordsRemainingLatch` only for committed transactions? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
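The concern about the latch bound can be sketched with a small single-threaded simulation. Everything here is hypothetical (`LatchBoundSketch`, `simulate`, and the abort pattern are illustrative and not taken from `MonitorableSourceConnector`); it only contrasts a latch counted down for every polled record with one counted down for committed records only.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical simulation; not the connector's actual bookkeeping.
final class LatchBoundSketch {

    // aborted[i] says whether the transaction containing batch i is aborted.
    // Returns {remaining on the poll-time latch,
    //          remaining on the commit-only latch,
    //          records that would actually be visible in the topic}.
    static long[] simulate(int minimumMessages, int messagesPerPoll, boolean[] aborted) {
        CountDownLatch countedAtPoll = new CountDownLatch(minimumMessages);
        CountDownLatch countedOnCommit = new CountDownLatch(minimumMessages);
        long committedRecords = 0;
        for (boolean abort : aborted) {
            for (int i = 0; i < messagesPerPoll; i++) {
                countedAtPoll.countDown();       // decremented regardless of outcome
                if (!abort) {
                    countedOnCommit.countDown(); // decremented only if the txn commits
                }
            }
            if (!abort) {
                committedRecords += messagesPerPoll;
            }
        }
        return new long[] {countedAtPoll.getCount(), countedOnCommit.getCount(), committedRecords};
    }
}
```

With 10 batches of 100 records and 3 aborted transactions, the poll-time latch reaches zero even though only 700 records are committed, whereas the commit-only latch still has 300 outstanding. This is the gap the comment suggests may explain the flakiness.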
[jira] [Created] (KAFKA-15091) Javadocs for SourceTask::commit are incorrect
Chris Egerton created KAFKA-15091: - Summary: Javadocs for SourceTask::commit are incorrect Key: KAFKA-15091 URL: https://issues.apache.org/jira/browse/KAFKA-15091 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Chris Egerton The Javadocs for {{SourceTask::commit}} state that the method should: {quote}Commit the offsets, up to the offsets that have been returned by [{{poll()}}|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()]. {quote} However, this is obviously incorrect given how the Connect runtime (when not configured with exactly-once support for source connectors) performs polling and offset commits on separate threads. There's also some extensive discussion on the semantics of that method in KAFKA-5716 where it's made clear that altering the behavior of the runtime to align with the documented semantics of that method is not a viable option. We should update the Javadocs for this method to state that it does not have anything to do with the offsets returned from {{SourceTask::poll}} and is instead just a general, periodically-invoked hook to let the task know that an offset commit has taken place (but with no guarantees as to which offsets have been committed and which ones correspond to still-in-flight records). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] divijvaidya closed pull request #13778: Feature/shuai add comment
divijvaidya closed pull request #13778: Feature/shuai add comment URL: https://github.com/apache/kafka/pull/13778 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #13778: Feature/shuai add comment
divijvaidya commented on PR #13778: URL: https://github.com/apache/kafka/pull/13778#issuecomment-1591600444 I am closing this PR. Please refer to the comment above for the reason - https://github.com/apache/kafka/pull/13778#issuecomment-1567994161 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac merged pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac merged PR #13820: URL: https://github.com/apache/kafka/pull/13820 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
cadonna commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1229867769 ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java: ## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.test.MockInternalNewProcessorContext; +import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +public class RocksDBTimeOrderedKeyValueBufferTest { +public RocksDBTimeOrderedKeyValueBuffer buffer; +@Mock +public SerdeGetter serdeGetter; +public InternalProcessorContext context; +public StreamsMetricsImpl streamsMetrics; +@Mock +public Sensor sensor; +public long offset; + +@Before +public void setUp() { +when(serdeGetter.keySerde()).thenReturn(new Serdes.StringSerde()); +when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde()); +final Metrics metrics = new Metrics(); +offset = 0; +streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", 
StreamsConfig.METRICS_LATEST, new MockTime()); +context = new MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory()); +} + +private void createBuffer(final Duration grace) { +final RocksDBTimeOrderedKeyValueBytesStore store = new RocksDBTimeOrderedKeyValueBytesStoreSupplier("testing", 100).get(); +buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, grace, "testing"); +buffer.setSerdesIfNull(serdeGetter); +buffer.init((StateStoreContext) context, store); +} + +private void pipeRecord(final String key, final String value, final long time) { +final Record record = new Record<>(key, value, time); +context.setRecordContext(new ProcessorRecordContext(time, offset++, 0, "testing", new RecordHeaders())); +buffer.put(time, record, context.recordContext()); +} + +@Test +public void shouldPutInBufferAndUpdateFields() { +createBuffer(Duration.ofMinutes(1)); +assertNumSizeAndTimestamp(buffer, 0, Long.MAX_VALUE, 0); +pipeRecord("1", "0", 0L); +assertNumSizeAndTimestamp(buffer, 1, 0, 42); +pipeRecord("3", "0", 2L); +assertNumSizeAndTimestamp(buffer, 2, 0, 84); + Review Comment: ```suggestion ``` ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache
divijvaidya commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1229867810

## core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala: ##
@@ -266,13 +309,21 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM
     getIndexEntry(remoteLogSegmentMetadata).lookupTimestamp(timestamp, startingOffset).position
   }
+  /**
+   * Close should synchronously cleanup the resources used by this cache.
+   * This index is closed when [[RemoteLogManager]] is closed.
+   */
   def close(): Unit = {
-    closed = true
-    cleanerThread.shutdown()
-    // Close all the opened indexes.
-    lock synchronized {
-      entries.values().stream().forEach(entry => entry.close())
+    // make close idempotent
+    if (!closed.getAndSet(true)) {
+      // Initiate shutdown for cleaning thread
+      val shutdownRequired = cleanerThread.initiateShutdown()
+      // Close all the opened indexes to force unload mmap memory. This does not delete the index files from disk.
+      internalCache.asMap().forEach((_, entry) => entry.close())
+      // Perform any pending activities required by the cache for cleanup
+      internalCache.cleanUp()

Review Comment:
Thank you for jumping into this review and providing your expert opinion on this PR. Appreciate it!

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ben-manes commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache
ben-manes commented on code in PR #13850: URL: https://github.com/apache/kafka/pull/13850#discussion_r1229867001 ## core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala: ## @@ -266,13 +309,21 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM getIndexEntry(remoteLogSegmentMetadata).lookupTimestamp(timestamp, startingOffset).position } + /** + * Close should synchronously cleanup the resources used by this cache. + * This index is closed when [[RemoteLogManager]] is closed. + */ def close(): Unit = { -closed = true -cleanerThread.shutdown() -// Close all the opened indexes. -lock synchronized { - entries.values().stream().forEach(entry => entry.close()) +// make close idempotent +if (!closed.getAndSet(true)) { + // Initiate shutdown for cleaning thread + val shutdownRequired = cleanerThread.initiateShutdown() + // Close all the opened indexes to force unload mmap memory. This does not delete the index files from disk. + internalCache.asMap().forEach((_, entry) => entry.close()) + // Perform any pending activities required by the cache for cleanup + internalCache.cleanUp() Review Comment: Yep, it is just a fancy in-memory hash map. Caffeine doesn’t create threads, files, whatever. The cleanUp() method simply runs the maintenance work like discarding expired or weak/soft collected entries, which might occur before it realizes otherwise. It is fully gc’able.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache
divijvaidya commented on code in PR #13850: URL: https://github.com/apache/kafka/pull/13850#discussion_r1229862041 ## core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala: ## @@ -266,13 +309,21 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM getIndexEntry(remoteLogSegmentMetadata).lookupTimestamp(timestamp, startingOffset).position } + /** + * Close should synchronously cleanup the resources used by this cache. + * This index is closed when [[RemoteLogManager]] is closed. + */ def close(): Unit = { -closed = true -cleanerThread.shutdown() -// Close all the opened indexes. -lock synchronized { - entries.values().stream().forEach(entry => entry.close()) +// make close idempotent +if (!closed.getAndSet(true)) { + // Initiate shutdown for cleaning thread + val shutdownRequired = cleanerThread.initiateShutdown() + // Close all the opened indexes to force unload mmap memory. This does not delete the index files from disk. + internalCache.asMap().forEach((_, entry) => entry.close()) + // Perform any pending activities required by the cache for cleanup + internalCache.cleanUp() Review Comment: Great call out. I did not realise that. I do not want the removal listener to execute on shutdown. I will not do anything at all here and just let GC clean it up. Am I correct in assuming that no specific "cleanup" is necessary to release the resources acquired by the cache?
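Editorial note (illustrative only, not code from PR #13850): the "make close idempotent" guard discussed in this thread can be sketched in plain Java roughly as follows. `DemoEntry` and `DemoIndexCache` are hypothetical stand-ins, and a `ConcurrentHashMap` is assumed in place of the Caffeine cache so that the sketch has no dependencies.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an index entry that owns a resource (for example an mmap).
final class DemoEntry implements AutoCloseable {
    final AtomicInteger closeCalls = new AtomicInteger();

    @Override
    public void close() {
        closeCalls.incrementAndGet();
    }
}

// Hypothetical cache wrapper; this is not the real RemoteIndexCache.
final class DemoIndexCache implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Map<String, DemoEntry> entries = new ConcurrentHashMap<>();

    void put(String key, DemoEntry entry) {
        if (closed.get()) {
            throw new IllegalStateException("cache is closed");
        }
        entries.put(key, entry);
    }

    @Override
    public void close() {
        // getAndSet returns the previous value, so only the first caller sees false.
        if (!closed.getAndSet(true)) {
            // Release the resources held by each entry exactly once.
            entries.values().forEach(DemoEntry::close);
            // The map itself is ordinary heap memory and is simply left to the GC.
        }
    }
}
```

Calling `close()` twice on a `DemoIndexCache` closes each entry only once, which is the property the guard is meant to provide.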
[jira] [Created] (KAFKA-15090) Source tasks are no longer stopped on a separate thread
Chris Egerton created KAFKA-15090: Summary: Source tasks are no longer stopped on a separate thread Key: KAFKA-15090 URL: https://issues.apache.org/jira/browse/KAFKA-15090 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 3.3.2, 3.3.1, 3.2.3, 3.2.2, 3.4.0, 3.2.1, 3.1.2, 3.0.2, 3.3.0, 3.1.1, 3.2.0, 3.0.1, 3.0.0, 3.1.0, 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 3.3.3, 3.6.0, 3.5.1 Reporter: Chris Egerton Assignee: Chris Egerton Before [https://github.com/apache/kafka/pull/9669], in distributed mode, the {{SourceTask::stop}} method would be invoked on the herder tick thread, which is a separate thread from the dedicated thread which was responsible for polling data from the task and producing it to Kafka. This aligned with the Javadocs for {{SourceTask::poll}}, which state: {quote}The task will be stopped on a separate thread, and when that happens this method is expected to unblock, quickly finish up any remaining processing, and return. {quote} However, it came with the downside that the herder's tick thread would be blocked until the invocation of {{SourceTask::stop}} completed, which could result in major parts of the worker's REST API becoming unavailable and even the worker falling out of the cluster. As a result, in [https://github.com/apache/kafka/pull/9669], we changed the logic for task shutdown to cause {{SourceTask::stop}} to be invoked on the dedicated thread for the task (i.e., the one responsible for polling data from it and producing that data to Kafka). This altered the semantics for {{SourceTask::poll}} and {{SourceTask::stop}} and may have broken connectors that block during {{poll}} with the expectation that {{stop}} can and will be invoked concurrently as a signal that any ongoing polls should be interrupted immediately. 
Although reverting the fix is likely not a viable option (blocking the herder thread on interactions with user-written plugins is high-risk and we have tried to eliminate all instances of this where feasible), we may try to restore the expected contract by spinning up a separate thread exclusively for invoking {{SourceTask::stop}} separately from the dedicated thread for the task and the herder's thread. -- This message was sent by Atlassian Jira (v8.20.10#820010)
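Editorial note (illustrative only): the contract described in this ticket can be mimicked with standard-library classes. The sketch below does not use the Kafka Connect API; `BlockingDemoTask` and `StopThreadDemo` are hypothetical names, and the single-thread executors are only an assumed simplification of the worker's task thread and of a dedicated stop thread.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical task that blocks in poll() until stop() is called from another thread.
final class BlockingDemoTask {
    private final CountDownLatch stopRequested = new CountDownLatch(1);

    List<String> poll() throws InterruptedException {
        stopRequested.await(); // relies on stop() arriving concurrently
        return Collections.emptyList();
    }

    void stop() {
        stopRequested.countDown();
    }
}

final class StopThreadDemo {
    // Returns true if poll() returned within timeoutMs after stop() was scheduled.
    static boolean pollUnblocks(boolean separateStopThread, long timeoutMs) {
        BlockingDemoTask task = new BlockingDemoTask();
        ExecutorService taskThread = Executors.newSingleThreadExecutor();
        ExecutorService stopThread =
            separateStopThread ? Executors.newSingleThreadExecutor() : taskThread;
        try {
            Callable<List<String>> pollCall = task::poll;
            Future<List<String>> pending = taskThread.submit(pollCall);
            Runnable stopCall = task::stop;
            // When both share one thread, stop() is queued behind the blocked poll().
            stopThread.execute(stopCall);
            pending.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            taskThread.shutdownNow();
            stopThread.shutdownNow();
        }
    }
}
```

With `separateStopThread` set to true the blocked `poll()` is released; with false it stays blocked until the timeout, which is the failure mode the ticket describes for connectors that depend on a concurrent `stop`.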
[GitHub] [kafka] divijvaidya commented on pull request #13705: MINOR:Refactor the metric names into constants in ReplicaManager
divijvaidya commented on PR #13705: URL: https://github.com/apache/kafka/pull/13705#issuecomment-1591550112 I triggered the CI tests again since the last run wasn't complete. Will merge this in after they are successful.
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1229836485 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? 
getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +try (final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) { +while (iterator.hasNext() && predicate.get()) { +keyValue = iterator.next(); + +final BufferValue bufferValue = BufferValue.deserialize(ByteBuffer.wrap(keyValue.value)); +final K key = keySerde.deserializer().deserialize(topic, + PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get())); +minTimestamp = bufferValue.context().timestamp(); + +final V value = valueSerde.deserializer().deserialize(topic, bufferValue.newValue()); + +if (bufferValue.context().timestamp() < minTimestamp) { +throw new IllegalStateException( +"minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" + +bufferValue.context().timestamp() + "]" +); +} + +callback.accept(new Eviction<>(key, value, bufferValue.context())); + +wrapped().remove(keyValue.key); +numRecords--; +bufferSize = bufferSize - computeRecordSize(keyValue.key, bufferValue); +} +if (numRecords == 0) { +
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1229824269 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? 
getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +try (final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) { Review Comment: I think so but I can't prove it. If not, we might have to deal with slight misordering until we can find a solution.
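Editorial note (illustrative only): the eviction loop quoted above fetches records up to observed stream time minus the grace period and hands them to a callback while a predicate holds. A rough in-memory sketch of that idea follows. `DemoTimeOrderedBuffer` is a hypothetical class backed by a `TreeMap` rather than RocksDB, it keeps one value per timestamp for brevity, and the inclusive upper bound is an assumption rather than something the thread confirms.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical in-memory stand-in for a time-ordered buffer.
final class DemoTimeOrderedBuffer {
    private final TreeMap<Long, String> byTimestamp = new TreeMap<>();
    private final long gracePeriodMs;
    private long observedStreamTime = -1L; // assumes non-negative record timestamps

    DemoTimeOrderedBuffer(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    void put(long timestamp, String value) {
        byTimestamp.put(timestamp, value);
        // Stream time only moves forward, even if records arrive out of order.
        observedStreamTime = Math.max(observedStreamTime, timestamp);
    }

    void evictWhile(Supplier<Boolean> predicate, BiConsumer<Long, String> callback) {
        // Only records at or before (stream time - grace period) are candidates.
        Iterator<Map.Entry<Long, String>> candidates =
            byTimestamp.headMap(observedStreamTime - gracePeriodMs, true).entrySet().iterator();
        while (candidates.hasNext() && predicate.get()) {
            Map.Entry<Long, String> entry = candidates.next();
            callback.accept(entry.getKey(), entry.getValue());
            candidates.remove(); // removes from the backing TreeMap as well
        }
    }

    int size() {
        return byTimestamp.size();
    }
}
```

Because the `TreeMap` iterates in ascending key order, records inserted out of order are still evicted oldest first, which is the ordering property the review comment is unsure about for the RocksDB-backed store.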
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1229732758 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedKeyValueBytesStoreSupplier.java: ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +public class RocksDbTimeOrderedKeyValueBytesStoreSupplier { Review Comment: oops I changed RocksDBKeyValueBytesStoreSupplier Instead, got this one as well ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private Serde valueSerde; +private final String topic; +private int seqnum; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +seqnum = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void 
setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); Review Comment: Well, my thought was that we already have the store to be initialized from the constructor, so we shouldn't risk a different store being passed in. We could use the root, I suppose, but I see no reason to. ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF
[GitHub] [kafka] dajac commented on pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on PR #13820: URL: https://github.com/apache/kafka/pull/13820#issuecomment-1591401484 Failed tests seem unrelated: ``` Build / JDK 11 and Scala 2.13 / testReplication() – org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest 1m 45s Build / JDK 11 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest 1m 2s Build / JDK 11 and Scala 2.13 / testRestartReplication() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest 3m 8s Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest 1m 29s Build / JDK 11 and Scala 2.13 / testDescribeAtMinIsrPartitions(String).quorum=zk – kafka.admin.TopicCommandIntegrationTest 6s Build / JDK 11 and Scala 2.13 / [1] Type=ZK, Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest 14s Build / JDK 8 and Scala 2.12 / testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest 1m 20s Build / JDK 8 and Scala 2.12 / [1] Type=ZK, Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest 15s Build / JDK 17 and Scala 2.13 / testBalancePartitionLeaders() – org.apache.kafka.controller.QuorumControllerTest 13s Build / JDK 17 and Scala 2.13 / shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2] – org.apache.kafka.streams.integration.EosIntegrationTest ```
[GitHub] [kafka] dajac opened a new pull request, #13854: MINOR: rat should depend on processMessages task
dajac opened a new pull request, #13854: URL: https://github.com/apache/kafka/pull/13854 This fixes the following issue that we occasionally see in [builds](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13848/4/pipeline/13/). ``` [2023-06-14T11:41:50.769Z] * What went wrong: [2023-06-14T11:41:50.769Z] A problem was found with the configuration of task ':rat' (type 'RatTask'). [2023-06-14T11:41:50.769Z] - Gradle detected a problem with the following location: '/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13848'. [2023-06-14T11:41:50.769Z] [2023-06-14T11:41:50.769Z] Reason: Task ':rat' uses this output of task ':clients:processTestMessages' without declaring an explicit or implicit dependency. This can lead to incorrect results being produced, depending on what order the tasks are executed. [2023-06-14T11:41:50.769Z] [2023-06-14T11:41:50.769Z] Possible solutions: [2023-06-14T11:41:50.769Z] 1. Declare task ':clients:processTestMessages' as an input of ':rat'. [2023-06-14T11:41:50.769Z] 2. Declare an explicit dependency on ':clients:processTestMessages' from ':rat' using Task#dependsOn. [2023-06-14T11:41:50.769Z] 3. Declare an explicit dependency on ':clients:processTestMessages' from ':rat' using Task#mustRunAfter. [2023-06-14T11:41:50.769Z] [2023-06-14T11:41:50.769Z] Please refer to https://docs.gradle.org/8.1.1/userguide/validation_problems.html#implicit_dependency for more details about this problem. ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dajac commented on pull request #13854: MINOR: rat should depend on processMessages task
dajac commented on PR #13854: URL: https://github.com/apache/kafka/pull/13854#issuecomment-1591397811 Similar to https://github.com/apache/kafka/pull/13316.
[GitHub] [kafka] hudeqi commented on pull request #13705: MINOR:Refactor the metric names into constants in ReplicaManager
hudeqi commented on PR #13705: URL: https://github.com/apache/kafka/pull/13705#issuecomment-1591356625 > Looks good to me. One minor thing is to update the PR name (since we use the same for commit messages). As I understand, we are not really optimizing anything here. We are refactoring the metric names into constants. > > Can you please change the PR description to reflect the same? OK, I see. I have updated the PR name. @divijvaidya
[GitHub] [kafka] mimaison commented on a diff in pull request #13838: MINOR: Reduce MM2 integration test flakiness due to missing dummy offset commits
mimaison commented on code in PR #13838:
URL: https://github.com/apache/kafka/pull/13838#discussion_r1229669181

## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ##
@@ -1183,17 +1187,31 @@ private void createTopics() {
         }
     }
-    /*
-     * Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
+    /**
+     * Commit offset 0 for all partitions of test-topic-1 for the specified consumer groups on primary and backup clusters.
+     * This is done to force the MirrorCheckpointConnector to start at a task which checkpoints this group.
+     * Must be called before {@link #waitUntilMirrorMakerIsRunning} to prevent that method from timing out.
      */
-    protected void warmUpConsumer(Map<String, Object> consumerProps) {
-        try (Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
-            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            dummyConsumer.commitSync();
-        }
-        try (Consumer<byte[], byte[]> dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
-            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            dummyConsumer.commitSync();
+    protected void prepareConsumerGroup(Map<String, Object> consumerProps) {
+        prepareConsumerGroup(primary.kafka(), consumerProps, "test-topic-1");
+        prepareConsumerGroup(backup.kafka(), consumerProps, "test-topic-1");
+    }
+
+    private void prepareConsumerGroup(EmbeddedKafkaCluster cluster, Map<String, Object> consumerProps, String topic) {
+        try (Admin client = cluster.createAdminClient()) {
+            Map<String, TopicDescription> topics = client.describeTopics(Collections.singleton(topic))
+                    .allTopicNames()
+                    .get(REQUEST_TIMEOUT_DURATION_MS, TimeUnit.MILLISECONDS);
+            Map<TopicPartition, OffsetAndMetadata> collect = topics.get(topic)
+                    .partitions()
+                    .stream()
+                    .collect(Collectors.toMap(
+                            tpi -> new TopicPartition(topic, tpi.partition()),
+                            ignored -> new OffsetAndMetadata(0L)));
+            AlterConsumerGroupOffsetsResult alterResult = client.alterConsumerGroupOffsets((String) consumerProps.get("group.id"), collect);
+            alterResult.all().get(REQUEST_TIMEOUT_DURATION_MS, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException | InterruptedException | TimeoutException e) {

Review Comment:
Is there a reason we're catching and rethrowing as RuntimeException here?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
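For readers following the question above, the trade-off can be shown outside Kafka. This is only a sketch and not the PR's code or its eventual resolution: `AwaitSketch`, `awaitWrapped` and `awaitPropagating` are hypothetical names, and a `CompletableFuture` stands in for the admin-client result. It contrasts wrapping the three checked exceptions in a `RuntimeException` with simply declaring them, which a test helper can usually afford when its callers are test methods declared with `throws Exception`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration; none of these names come from the Kafka code base.
public final class AwaitSketch {

    // Variant 1: catch the checked exceptions and rethrow unchecked, keeping the cause.
    public static <T> T awaitWrapped(CompletableFuture<T> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for callers
            throw new RuntimeException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    // Variant 2: declare the checked exceptions and let the caller deal with them.
    public static <T> T awaitPropagating(CompletableFuture<T> future, long timeoutMs)
            throws ExecutionException, InterruptedException, TimeoutException {
        return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("done");
        System.out.println(awaitWrapped(ok, 100));
        System.out.println(awaitPropagating(ok, 100));

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        try {
            awaitWrapped(failed, 100);
        } catch (RuntimeException e) {
            // The original failure sits two levels down:
            // RuntimeException -> ExecutionException -> IllegalStateException
            System.out.println(e.getCause().getCause().getMessage());
        }
    }
}
```

With the wrapping variant the call site stays free of `throws` clauses, at the cost of one extra layer in the stack trace; the propagating variant keeps the original exception types visible to the test runner.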
[GitHub] [kafka] C0urante commented on pull request #13819: KAFKA-15059: Remove pending rebalance check when fencing zombie source connector tasks
C0urante commented on PR #13819: URL: https://github.com/apache/kafka/pull/13819#issuecomment-1591326215 @mimaison @viktorsomogyi sorry for the extra ping--this issue is actually a regression and is leading to increased failure rates on Jenkins; would it be possible to take a look sometime this week or the next?
[GitHub] [kafka] divijvaidya commented on pull request #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version
divijvaidya commented on PR #13782: URL: https://github.com/apache/kafka/pull/13782#issuecomment-1591281777 Hey @novosibman, could you please respond to the rest of the comments at https://github.com/apache/kafka/pull/13782#pullrequestreview-1461194326 and https://github.com/apache/kafka/pull/13782#discussion_r1216711941
[GitHub] [kafka] machi1990 commented on pull request #13700: KAFKA-14959: remove delayed queue and exempt sensors during ClientQuota and ClientRequestQuota managers shutdown
machi1990 commented on PR #13700: URL: https://github.com/apache/kafka/pull/13700#issuecomment-1591262415 @divijvaidya since you worked on https://github.com/apache/kafka/pull/13623, would you be open to giving this PR a review? Thanks
[GitHub] [kafka] machi1990 commented on pull request #13624: MINOR: remove unused ProcessorNode#time field and remove unused imports
machi1990 commented on PR #13624: URL: https://github.com/apache/kafka/pull/13624#issuecomment-1591235123 Thank you @divijvaidya for the review and merge!
[GitHub] [kafka] divijvaidya commented on pull request #13831: KAFKA-15053: Use case insensitive validator for security.protocol config
divijvaidya commented on PR #13831: URL: https://github.com/apache/kafka/pull/13831#issuecomment-1591232505

> Also, what's the common process to update the documentation?

As part of this PR, please modify the documentation at: https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/docs/security.html#L69
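The PR title refers to a case-insensitive validator for `security.protocol`. The PR's actual change is not quoted in this message, so the following is only a minimal sketch of the idea under stated assumptions: `CaseInsensitiveValidStringSketch` is a hypothetical stand-in class, not Kafka's validator, and the error text is invented for illustration. The four accepted values are Kafka's security protocol names.

```java
import java.util.LinkedHashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical stand-in for a config validator; not the class used by the PR.
public final class CaseInsensitiveValidStringSketch {
    private final Set<String> validUpperCase = new LinkedHashSet<>();

    private CaseInsensitiveValidStringSketch(String... validValues) {
        for (String value : validValues) {
            // Locale.ROOT avoids locale-specific case mappings (e.g. the Turkish dotless i).
            validUpperCase.add(value.toUpperCase(Locale.ROOT));
        }
    }

    public static CaseInsensitiveValidStringSketch in(String... validValues) {
        return new CaseInsensitiveValidStringSketch(validValues);
    }

    // Throws IllegalArgumentException unless the value matches an allowed string, ignoring case.
    public void ensureValid(String name, String value) {
        if (value == null || !validUpperCase.contains(value.toUpperCase(Locale.ROOT))) {
            throw new IllegalArgumentException("Invalid value " + value + " for configuration " + name
                + ": must be one of (case insensitive) " + String.join(", ", validUpperCase));
        }
    }

    public static void main(String[] args) {
        CaseInsensitiveValidStringSketch validator =
            in("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL");
        validator.ensureValid("security.protocol", "sasl_ssl"); // accepted regardless of case
        try {
            validator.ensureValid("security.protocol", "bogus");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```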
[GitHub] [kafka] hudeqi commented on a diff in pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method
hudeqi commented on code in PR #13719:
URL: https://github.com/apache/kafka/pull/13719#discussion_r1229633097

## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ##
@@ -680,12 +680,16 @@ abstract class AbstractFetcherThread(name: String,
        */
       val offsetAndEpoch = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
       val leaderStartOffset = offsetAndEpoch.offset
-      warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
-        s"leader's start offset $leaderStartOffset")
       val offsetToFetch = Math.max(leaderStartOffset, replicaEndOffset)
       // Only truncate log when current leader's log start offset is greater than follower's log end offset.
-      if (leaderStartOffset > replicaEndOffset)
+      if (leaderStartOffset > replicaEndOffset) {
+        warn(s"Truncate fully and start at leader's start offset $leaderStartOffset for partition $topicPartition " +
+          s"because the local replica logEndOffset $replicaEndOffset is smaller than leader's start offset.")

Review Comment:
> What do you think about the following wording to be more aligned with the other log message?
>
> ```
> Truncate fully and reset fetch offset for partition $topicPartition from $current to the current leader's start offset $new because the local replica's end offset is smaller than the current leader's start offsets.
> ```
>

In contrast, the overall meaning is similar, I don't feel a huge difference.
[jira] [Commented] (KAFKA-7802) Connection to Broker Disconnected Taking Down the Whole Cluster
[ https://issues.apache.org/jira/browse/KAFKA-7802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17732540#comment-17732540 ]

Lubos Hozzan commented on KAFKA-7802:

Hello. The problem still persists. We are using version *3.4.1* in KRaft mode (3 instances in a Kubernetes cluster). The warning looks like this (in fact, there is an avalanche of identical or very similar records from each Kafka instance):

{noformat}
[2023-06-13 12:21:25,771] WARN [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={tsttopic-1=PartitionData(topicId=4UDBTCegTPy5mdCbL5fLyg, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[6], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=INVALID, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
{noformat}

Please focus on the {{metadata}}: *{{sessionId=INVALID, epoch=INITIAL}}*

The problem begins in an empty cluster (i.e. the PVCs for the instance pods are empty) at first start. All instances are affected. I attempted:
- restarting the instances one by one = no change
- stopping all instances at once and starting them again at once = no change
- stopping all instances and deleting the PVCs (the Kafka cluster starts empty) = the problem sometimes disappeared

Is the problem in the stored data? Once the problem has disappeared, restarting the instances has no effect and the cluster runs fine. I mean, if the instances created their folder structure correctly, they work without any problems.

The next strange thing, based on my observation, is the metrics, in particular {{BrokerTopicMetrics BytesInPerSec}}:

!BytesInput.png!

As you can see, while the problem is occurring (before 14:20), it looks as if the cluster has two leaders (the correct leader and a fake one), and both are generating the metrics. After the problem disappeared (after 14:20), the metrics are emitted from only one instance, the one that was the elected leader at the time.

I hope this helps get closer to solving the problem.

Best regards.

> Connection to Broker Disconnected Taking Down the Whole Cluster
> ---
>
> Key: KAFKA-7802
> URL: https://issues.apache.org/jira/browse/KAFKA-7802
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.1.0
> Reporter: Candice Wan
> Priority: Critical
> Attachments: BytesInput.png, thread_dump.log
>
> We recently upgraded to 2.1.0. Since then, several times per day, we observe some brokers were disconnected when other brokers were trying to fetch the replicas. This issue took down the whole cluster, making all the producers and consumers not able to publish or consume messages. It could be quickly fixed by restarting the problematic broker.
> Here is an example of what we're seeing in the broker which was trying to send fetch request to the problematic one:
> 2019-01-09 08:05:10.445 [ReplicaFetcherThread-0-3] INFO o.a.k.clients.FetchSessionHandler - [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=937967566, epoch=1599941) to node 3: java.io.IOException: Connection to 3 was disconnected before the response was read.
> 2019-01-09 08:05:10.445 [ReplicaFetcherThread-1-3] INFO o.a.k.clients.FetchSessionHandler - [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=1] Error sending fetch request (sessionId=506217047, epoch=1375749) to node 3: java.io.IOException: Connection to 3 was disconnected before the response was read.
> 2019-01-09 08:05:10.445 [ReplicaFetcherThread-0-3] WARN kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-11=(offset=421032847, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[178])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=937967566, epoch=1599941))
> java.io.IOException: Connection to 3 was disconnected before the response was read
> at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
> at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
> at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:199)
> at kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
> at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
> at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
> at
[GitHub] [kafka] divijvaidya merged pull request #13624: MINOR: remove unused ProcessorNode#time field and remove unused imports
divijvaidya merged PR #13624: URL: https://github.com/apache/kafka/pull/13624
[jira] [Commented] (KAFKA-12458) Implementation of Tiered Storage Integration with Azure Storage (ADLS + Blob Storage)
[ https://issues.apache.org/jira/browse/KAFKA-12458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17732539#comment-17732539 ]

Ivan Yurchenko commented on KAFKA-12458:

FYI: I've been working on a {{RemoteStorageManager}} implementation that currently supports only AWS S3, but Azure support is planned for the near future. [https://github.com/aiven/tiered-storage-for-apache-kafka]

> Implementation of Tiered Storage Integration with Azure Storage (ADLS + Blob Storage)
> ---
>
> Key: KAFKA-12458
> URL: https://issues.apache.org/jira/browse/KAFKA-12458
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Israel Ekpo
> Assignee: Israel Ekpo
> Priority: Major
>
> Task to cover integration support for Azure Storage
> * Azure Blob Storage
> * Azure Data Lake Store
> Will split task up later into distinct tracks and components

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-9565) Implementation of Tiered Storage SPI to integrate with S3
[ https://issues.apache.org/jira/browse/KAFKA-9565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17732536#comment-17732536 ]

Ivan Yurchenko commented on KAFKA-9565:

AFAIU, concrete {{RemoteStorageManager}} implementations won't be hosted in the Apache Kafka repo, so this ticket should probably be closed as wont-fix.

I've been working on a {{RemoteStorageManager}} implementation that supports AWS S3 and, in the future, other cloud object storage services (at least Azure and GCS): [https://github.com/aiven/tiered-storage-for-apache-kafka|https://github.com/aiven/tiered-storage-for-apache-kafka]

> Implementation of Tiered Storage SPI to integrate with S3
> ---
>
> Key: KAFKA-9565
> URL: https://issues.apache.org/jira/browse/KAFKA-9565
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Alexandre Dupriez
> Assignee: Ivan Yurchenko
> Priority: Major
[GitHub] [kafka] dajac commented on a diff in pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method
dajac commented on code in PR #13719:
URL: https://github.com/apache/kafka/pull/13719#discussion_r1229581401

## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ##
@@ -680,12 +680,16 @@ abstract class AbstractFetcherThread(name: String,
        */
       val offsetAndEpoch = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
       val leaderStartOffset = offsetAndEpoch.offset
-      warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
-        s"leader's start offset $leaderStartOffset")
       val offsetToFetch = Math.max(leaderStartOffset, replicaEndOffset)
       // Only truncate log when current leader's log start offset is greater than follower's log end offset.
-      if (leaderStartOffset > replicaEndOffset)
+      if (leaderStartOffset > replicaEndOffset) {
+        warn(s"Truncate fully and start at leader's start offset $leaderStartOffset for partition $topicPartition " +
+          s"because the local replica logEndOffset $replicaEndOffset is smaller than leader's start offset.")

Review Comment:
What do you think about the following wording to be more aligned with the other log message?

```
Truncate fully and reset fetch offset for partition $topicPartition from $current to the current leader's start offset $new because the local replica's end offset is smaller than the current leader's start offsets.
```
[jira] [Updated] (KAFKA-7802) Connection to Broker Disconnected Taking Down the Whole Cluster
[ https://issues.apache.org/jira/browse/KAFKA-7802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lubos Hozzan updated KAFKA-7802:

Attachment: BytesInput.png

> Connection to Broker Disconnected Taking Down the Whole Cluster
> ---
>
> Key: KAFKA-7802
> URL: https://issues.apache.org/jira/browse/KAFKA-7802
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.1.0
> Reporter: Candice Wan
> Priority: Critical
> Attachments: BytesInput.png, thread_dump.log
>
> We recently upgraded to 2.1.0. Since then, several times per day, we observe some brokers were disconnected when other brokers were trying to fetch the replicas. This issue took down the whole cluster, making all the producers and consumers not able to publish or consume messages. It could be quickly fixed by restarting the problematic broker.
> Here is an example of what we're seeing in the broker which was trying to send fetch request to the problematic one:
> 2019-01-09 08:05:10.445 [ReplicaFetcherThread-0-3] INFO o.a.k.clients.FetchSessionHandler - [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=937967566, epoch=1599941) to node 3: java.io.IOException: Connection to 3 was disconnected before the response was read.
> 2019-01-09 08:05:10.445 [ReplicaFetcherThread-1-3] INFO o.a.k.clients.FetchSessionHandler - [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=1] Error sending fetch request (sessionId=506217047, epoch=1375749) to node 3: java.io.IOException: Connection to 3 was disconnected before the response was read.
> 2019-01-09 08:05:10.445 [ReplicaFetcherThread-0-3] WARN kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-11=(offset=421032847, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[178])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=937967566, epoch=1599941))
> java.io.IOException: Connection to 3 was disconnected before the response was read
> at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
> at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
> at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:199)
> at kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
> at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
> at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
> at scala.Option.foreach(Option.scala:257)
> at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>
> Below is the suspicious log of the problematic broker when the issue happened:
> 2019-01-09 08:04:50.177 [executor-Heartbeat] INFO k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Member consumer-2-7d46fda9-afef-4705-b632-17f0255d5045 in group talon-instance1 has failed, removing it from the group
> 2019-01-09 08:04:50.177 [executor-Heartbeat] INFO k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Preparing to rebalance group talon-instance1 in state PreparingRebalance with old generation 270 (__consumer_offsets-47) (reason: removing member consumer-2-7d46fda9-afef-4705-b632-17f0255d5045 on heartbeat expiration)
> 2019-01-09 08:04:50.297 [executor-Heartbeat] INFO k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Member consumer-5-94b7eb6d-bc39-48ed-99b8-2e0f55edd60b in group Notifications.ASIA1546980352799 has failed, removing it from the group
> 2019-01-09 08:04:50.297 [executor-Heartbeat] INFO k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Preparing to rebalance group Notifications.ASIA1546980352799 in state PreparingRebalance with old generation 1 (__consumer_offsets-44) (reason: removing member consumer-5-94b7eb6d-bc39-48ed-99b8-2e0f55edd60b on heartbeat expiration)
> 2019-01-09 08:04:50.297 [executor-Heartbeat] INFO k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Group Notifications.ASIA1546980352799 with generation 2 is now empty (__consumer_offsets-44)
> 2019-01-09 08:04:50.388 [executor-Heartbeat] INFO k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Member
[GitHub] [kafka] dajac commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService
dajac commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1229600983

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java: ##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The group coordinator configurations.
+ */
+public class GroupCoordinatorConfig {
+    public static class Builder {
+        private int numThreads = 1;

Review Comment:
I have played around with this idea a bit. It creates a pretty big diff, so I have decided to tackle it separately from this PR. I filed https://issues.apache.org/jira/browse/KAFKA-15089 for this purpose. For this patch, I have reduced `GroupCoordinatorConfig` to a simple POJO for now.
[jira] [Created] (KAFKA-15089) Consolidate all the group coordinator configs
David Jacot created KAFKA-15089:

Summary: Consolidate all the group coordinator configs
Key: KAFKA-15089
URL: https://issues.apache.org/jira/browse/KAFKA-15089
Project: Kafka
Issue Type: Sub-task
Reporter: David Jacot
Assignee: David Jacot

The group coordinator configurations are defined in KafkaConfig at the moment. As KafkaConfig is defined in the core module, we can't pass it to the new Java modules to pass the configurations along. A suggestion here is to centralize all the configurations of a module in the module itself, similar to what we have done for RemoteLogManagerConfig and RaftConfig. We also need a mechanism to add all the properties defined in the module to KafkaConfig's ConfigDef.
[GitHub] [kafka] hudeqi commented on a diff in pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method
hudeqi commented on code in PR #13719:
URL: https://github.com/apache/kafka/pull/13719#discussion_r1229575083

## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ##
@@ -680,12 +680,15 @@ abstract class AbstractFetcherThread(name: String,
        */
       val offsetAndEpoch = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
       val leaderStartOffset = offsetAndEpoch.offset
-      warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
-        s"leader's start offset $leaderStartOffset")
       val offsetToFetch = Math.max(leaderStartOffset, replicaEndOffset)
+      info(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to " +

Review Comment:
Updated according to your comment.
[GitHub] [kafka] hudeqi commented on a diff in pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method
hudeqi commented on code in PR #13719:
URL: https://github.com/apache/kafka/pull/13719#discussion_r1229573502

## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ##
@@ -680,12 +680,15 @@ abstract class AbstractFetcherThread(name: String,
        */
       val offsetAndEpoch = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
       val leaderStartOffset = offsetAndEpoch.offset
-      warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
-        s"leader's start offset $leaderStartOffset")
       val offsetToFetch = Math.max(leaderStartOffset, replicaEndOffset)
+      info(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to " +

Review Comment:
> I see. In this case, would it make sense to move this log to the else branch of the if at L687? It is a bit weird to log this and then potentially the warning.
>
> If we do this, we could also rewrite it as follows:
>
> ```
> Reset fetch offset for partition $topicPartition from $replicaEndOffset to the current local replica's end offset $offsetToFetch
> ```
>
> Would this make sense?

Thanks, I think it makes more sense this way. @dajac
[GitHub] [kafka] dajac commented on a diff in pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method
dajac commented on code in PR #13719:
URL: https://github.com/apache/kafka/pull/13719#discussion_r1229555487

## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ##
@@ -680,12 +680,15 @@ abstract class AbstractFetcherThread(name: String,
        */
       val offsetAndEpoch = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
       val leaderStartOffset = offsetAndEpoch.offset
-      warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
-        s"leader's start offset $leaderStartOffset")
       val offsetToFetch = Math.max(leaderStartOffset, replicaEndOffset)
+      info(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to " +

Review Comment:
I see. In this case, would it make sense to move this log to the else branch of the if at L687? It is a bit weird to log this and then potentially the warning.

If we do this, we could also rewrite it as follows:

```
Reset fetch offset for partition $topicPartition from $replicaEndOffset to the current local replica's end offset $offsetToFetch
```

Would this make sense?
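The branch structure proposed in this thread can be sketched as follows. This is a hedged illustration in plain Java rather than the Scala of `AbstractFetcherThread`: `FetchOffsetResetSketch` and `Decision` are hypothetical names, the log wording is adapted from the suggestions in the review comments, and the merged code may differ. The point is that exactly one message is produced per call: a warning when the follower must truncate fully, and an informational message otherwise.

```java
// Hypothetical illustration of the if/else logging discussed in the review.
public final class FetchOffsetResetSketch {

    /** Outcome of the decision; the field names are illustrative. */
    public static final class Decision {
        public final long offsetToFetch;
        public final boolean truncateFully;
        public final String logLevel;
        public final String message;

        Decision(long offsetToFetch, boolean truncateFully, String logLevel, String message) {
            this.offsetToFetch = offsetToFetch;
            this.truncateFully = truncateFully;
            this.logLevel = logLevel;
            this.message = message;
        }
    }

    public static Decision decide(String topicPartition, long leaderStartOffset, long replicaEndOffset) {
        long offsetToFetch = Math.max(leaderStartOffset, replicaEndOffset);
        if (leaderStartOffset > replicaEndOffset) {
            // The follower's log ends before the leader's log starts: truncate and restart there.
            return new Decision(offsetToFetch, true, "WARN",
                "Truncate fully and reset fetch offset for partition " + topicPartition
                    + " from " + replicaEndOffset + " to the current leader's start offset "
                    + leaderStartOffset + " because the local replica's end offset is smaller"
                    + " than the current leader's start offset.");
        } else {
            // The follower's log overlaps the leader's log: keep it and resume from its end.
            return new Decision(offsetToFetch, false, "INFO",
                "Reset fetch offset for partition " + topicPartition + " from " + replicaEndOffset
                    + " to the current local replica's end offset " + offsetToFetch);
        }
    }

    public static void main(String[] args) {
        Decision behind = decide("topic-0", 100L, 40L);
        System.out.println(behind.logLevel + " " + behind.message);
        Decision overlapping = decide("topic-0", 10L, 40L);
        System.out.println(overlapping.logLevel + " " + overlapping.message);
    }
}
```

Returning the decision as a value, instead of logging directly, is a choice made here only so that the two branches are easy to exercise in isolation.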
[GitHub] [kafka] hudeqi commented on pull request #13839: MINOR:Fill missing parameter annotations for some LogCleaner methods
hudeqi commented on PR #13839: URL: https://github.com/apache/kafka/pull/13839#issuecomment-159368 Hello, could you help to review this PR? @jlprat
[GitHub] [kafka] hudeqi commented on pull request #13705: MINOR:Optimize the use of metrics in ReplicaManager and remove checks
hudeqi commented on PR #13705: URL: https://github.com/apache/kafka/pull/13705#issuecomment-1591092269 Hi, @divijvaidya, it has been updated according to your comment, thank you!
[GitHub] [kafka] showuon commented on a diff in pull request #13650: KAFKA-14709: Move content in connect/mirror/README.md to the docs
showuon commented on code in PR #13650:
URL: https://github.com/apache/kafka/pull/13650#discussion_r1229514718

## connect/mirror/README.md: ##
@@ -1,297 +0,0 @@
-
-# MirrorMaker 2.0
-
-MM2 leverages the Connect framework to replicate topics between Kafka
-clusters. MM2 includes several new features, including:
-
- - both topics and consumer groups are replicated
- - topic configuration and ACLs are replicated
- - cross-cluster offsets are synchronized
- - partitioning is preserved
-
-## Replication flows
-
-MM2 replicates topics and consumer groups from upstream source clusters
-to downstream target clusters. These directional flows are notated
-`A->B`.
-
-It's possible to create complex replication topologies based on these
-`source->target` flows, including:
-
- - *fan-out*, e.g. `K->A, K->B, K->C`
- - *aggregation*, e.g. `A->K, B->K, C->K`
- - *active/active*, e.g. `A->B, B->A`
-
-Each replication flow can be configured independently, e.g. to replicate
-specific topics or groups:
-
-    A->B.topics = topic-1, topic-2
-    A->B.groups = group-1, group-2
-
-By default, all topics and consumer groups are replicated (except
-excluded ones), across all enabled replication flows. Each
-replication flow must be explicitly enabled to begin replication:
-
-    A->B.enabled = true
-    B->A.enabled = true
-
-## Starting an MM2 process
-
-You can run any number of MM2 processes as needed. Any MM2 processes
-which are configured to replicate the same Kafka clusters will find each
-other, share configuration, load balance, etc.
-
-To start an MM2 process, first specify Kafka cluster information in a
-configuration file as follows:
-
-    # mm2.properties
-    clusters = us-west, us-east
-    us-west.bootstrap.servers = host1:9092
-    us-east.bootstrap.servers = host2:9092
-
-You can list any number of clusters this way.
-
-Optionally, you can override default MirrorMaker properties:
-
-    topics = .*
-    groups = group1, group2
-    emit.checkpoints.interval.seconds = 10
-
-These will apply to all replication flows. You can also override default
-properties for specific clusters or replication flows:
-
-    # configure a specific cluster
-    us-west.offset.storage.topic = mm2-offsets
-
-    # configure a specific source->target replication flow
-    us-west->us-east.emit.heartbeats = false
-
-Next, enable individual replication flows as follows:
-
-    us-west->us-east.enabled = true # disabled by default
-
-Finally, launch one or more MirrorMaker processes with the `connect-mirror-maker.sh`
-script:
-
-    $ ./bin/connect-mirror-maker.sh mm2.properties
-
-## Multicluster environments
-
-MM2 supports replication between multiple Kafka clusters, whether in the
-same data center or across multiple data centers. A single MM2 cluster
-can span multiple data centers, but it is recommended to keep MM2's producers
-as close as possible to their target clusters. To do so, specify a subset
-of clusters for each MM2 node as follows:
-
-    # in west DC:
-    $ ./bin/connect-mirror-maker.sh mm2.properties --clusters west-1 west-2
-
-This signals to the node that the given clusters are nearby, and prevents the
-node from sending records or configuration to clusters in other data centers.
-
-### Example
-
-Say there are three data centers (west, east, north) with two Kafka
-clusters in each data center (west-1, west-2 etc). We can configure MM2
-for active/active replication within each data center, as well as cross data
-center replication (XDCR) as follows:
-
-    # mm2.properties
-    clusters: west-1, west-2, east-1, east-2, north-1, north-2
-
-    west-1.bootstrap.servers = ...
-    ---%<---
-
-    # active/active in west
-    west-1->west-2.enabled = true
-    west-2->west-1.enabled = true
-
-    # active/active in east
-    east-1->east-2.enabled = true
-    east-2->east-1.enabled = true
-
-    # active/active in north
-    north-1->north-2.enabled = true
-    north-2->north-1.enabled = true
-
-    # XDCR via west-1, east-1, north-1
-    west-1->east-1.enabled = true
-    west-1->north-1.enabled = true
-    east-1->west-1.enabled = true
-    east-1->north-1.enabled = true
-    north-1->west-1.enabled = true
-    north-1->east-1.enabled = true
-
-Then, launch MM2 in each data center as follows:
-
-    # in west:
-    $ ./bin/connect-mirror-maker.sh mm2.properties --clusters west-1 west-2
-
-    # in east:
-    $ ./bin/connect-mirror-maker.sh mm2.properties --clusters east-1 east-2
-
-    # in north:
-    $ ./bin/connect-mirror-maker.sh mm2.properties --clusters north-1 north-2
-
-With this configuration, records produced to any cluster will be replicated
-within the data center, as well as across to other data centers. By providing
-the `--clusters` parameter, we ensure that each node only produces records to
-nearby clusters.
-
-N.B. that the `--clusters` parameter is not technically required here. MM2 will work fine without it; however, throughput may suffer from "producer lag" between
[jira] [Assigned] (KAFKA-15083) Passing "remote.log.metadata.*" configs into RLMM
[ https://issues.apache.org/jira/browse/KAFKA-15083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-15083: - Assignee: Satish Duggana > Passing "remote.log.metadata.*" configs into RLMM > - > > Key: KAFKA-15083 > URL: https://issues.apache.org/jira/browse/KAFKA-15083 > Project: Kafka > Issue Type: Sub-task > Reporter: Luke Chen > Assignee: Satish Duggana > Priority: Major > > Based on the > [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]: > |_remote.log.metadata.*_|Default RLMM > implementation creates producer and consumer instances. Common client > properties can be configured with `remote.log.metadata.common.client.` > prefix. User can also pass properties specific to > producer/consumer with `remote.log.metadata.producer.` > and `remote.log.metadata.consumer.` prefixes. These will override properties > with `remote.log.metadata.common.client.` prefix. > Any other properties should be prefixed with > "remote.log.metadata." and these will be passed to > RemoteLogMetadataManager#configure(Map props). > For ex: Security configuration to connect to the local broker > for the listener name configured are passed with props.| > > This is missing from the current implementation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
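The override order described in the quoted KIP-405 text (common client properties first, then producer- or consumer-specific ones on top) can be sketched as below. This is only an illustration of that wording, not the RLMM implementation: the class `RlmmPrefixSketch` and the helpers `withPrefixStripped` and `clientProps` are hypothetical names, and the property values are made up.

```java
import java.util.HashMap;
import java.util.Map;

public class RlmmPrefixSketch {
    // Prefixes as quoted from KIP-405 in the issue text.
    static final String COMMON = "remote.log.metadata.common.client.";
    static final String PRODUCER = "remote.log.metadata.producer.";
    static final String CONSUMER = "remote.log.metadata.consumer.";

    // Hypothetical helper: collects the entries under `prefix`, with the prefix stripped.
    static Map<String, Object> withPrefixStripped(Map<String, Object> props, String prefix) {
        Map<String, Object> out = new HashMap<>();
        for (Map.Entry<String, Object> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }

    // Hypothetical helper: common client properties first, then the
    // producer- or consumer-specific ones, which override on conflict.
    static Map<String, Object> clientProps(Map<String, Object> props, String specificPrefix) {
        Map<String, Object> out = withPrefixStripped(props, COMMON);
        out.putAll(withPrefixStripped(props, specificPrefix));
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(COMMON + "security.protocol", "SSL");
        props.put(COMMON + "client.id", "rlmm");
        props.put(PRODUCER + "client.id", "rlmm-producer");

        System.out.println(clientProps(props, PRODUCER));
        System.out.println(clientProps(props, CONSUMER));
    }
}
```

With these inputs the producer view sees the overridden `client.id`, while the consumer view falls back to the common value.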
[GitHub] [kafka] showuon commented on pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm
showuon commented on PR #13828: URL: https://github.com/apache/kafka/pull/13828#issuecomment-1591076070 > > I think we don't have this implemented. We should pass remote.log.metadata.* into RLMM based on KIP-405. Created [KAFKA-15083](https://issues.apache.org/jira/browse/KAFKA-15083) for this issue. > > @showuon This is no more valid, KIP needs to be updated with the prefix based configs for RSM and RLMM. Will update the KIP with those details. Good to know, thanks Satish. I've assigned KAFKA-15083 to you. You can close it once you've updated the KIP. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #13853: KAFKA-15088: Fixing Incorrect Reference Usage in Connector State Changes
vamossagar12 commented on PR #13853: URL: https://github.com/apache/kafka/pull/13853#issuecomment-1591072071 @daehokimm ,nice catch, thanks for the PR. LGTM -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] daehokimm opened a new pull request, #13853: KAFKA-15088: Fixing Incorrect Reference Usage in Connector State Changes
daehokimm opened a new pull request, #13853: URL: https://github.com/apache/kafka/pull/13853 Currently, in the AbstractHerder class, the behavior of state changes for Connectors and Tasks is handled by implementing Listener using the ConnectorStatus and TaskStatus classes, which inherit from AbstractStatus. However, the code implementing the state change behavior in ConnectorStatus refers to and uses an inappropriate State enum class. Both ConnectorStatus and TaskStatus inherit and implement AbstractStatus, and as a result, they share the State class. However, there is a need to make modifications for clear referencing. We need to fix the implementation in ConnectorStatus to use the appropriate reference for state changes. This bug ticket will address the task of modifying the code to use the correct reference for Connector state changes. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
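To illustrate why the mismatched reference described above compiles and behaves the same at runtime, here is a minimal sketch. The classes are stand-ins written for this example (they only mirror the names in the description, and the enum constants are placeholders), not the Connect sources.

```java
public class SharedStateEnumSketch {
    // Stand-ins for the Connect classes named above; only the shape matters here.
    static abstract class AbstractStatus<T> {
        public enum State { UNASSIGNED, RUNNING, PAUSED, FAILED }
    }

    static class ConnectorStatus extends AbstractStatus<String> { }

    static class TaskStatus extends AbstractStatus<Integer> { }

    public static void main(String[] args) {
        // Both qualified names resolve to the same inherited enum type,
        // so using TaskStatus.State in connector code is only a readability issue.
        AbstractStatus.State viaConnector = ConnectorStatus.State.RUNNING;
        AbstractStatus.State viaTask = TaskStatus.State.RUNNING;
        System.out.println(viaConnector == viaTask); // prints true
    }
}
```

Because the enum is declared once on the shared base class, the fix is about referencing it through the class that matches the context rather than about changing behavior.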
[jira] [Created] (KAFKA-15088) Fixing Incorrect Reference Usage in Connector State Changes
Daeho Kim created KAFKA-15088: - Summary: Fixing Incorrect Reference Usage in Connector State Changes Key: KAFKA-15088 URL: https://issues.apache.org/jira/browse/KAFKA-15088 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Daeho Kim Currently, in the AbstractHerder class, the behavior of state changes for Connectors and Tasks is handled by implementing Listener using the ConnectorStatus and TaskStatus classes, which inherit from AbstractStatus. However, the code implementing the state change behavior in ConnectorStatus refers to and uses an inappropriate State enum class. Both ConnectorStatus and TaskStatus inherit and implement AbstractStatus, and as a result, they share the State class. However, there is a need to make modifications for clear referencing. We need to fix the implementation in ConnectorStatus to use the appropriate reference for state changes. This bug ticket will address the task of modifying the code to use the correct reference for Connector state changes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] KarboniteKream commented on pull request #13762: MINOR: Do not print an empty line when no topics exist
KarboniteKream commented on PR #13762: URL: https://github.com/apache/kafka/pull/13762#issuecomment-1591027643 Sure, it's no problem for me to wait. I just wanted to confirm what you had in mind. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #13762: MINOR: Do not print an empty line when no topics exist
vamossagar12 commented on PR #13762: URL: https://github.com/apache/kafka/pull/13762#issuecomment-1591024476 > Sure, I understand the concerns. > > > you might want to align your PR with that PR? > > Do you mean waiting for that PR to get merged, then rebase my changes? Or add my change to that PR? You could wait for the PR to get merged if that's ok with you. It's your call TBH :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hudeqi commented on pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method
hudeqi commented on PR #13719: URL: https://github.com/apache/kafka/pull/13719#issuecomment-1591024699 Hi, are there any suggestions or required changes for this PR now? @dajac @satishd @divijvaidya -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on code in PR #13820: URL: https://github.com/apache/kafka/pull/13820#discussion_r1229446218 ## server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util.timer; + +public abstract class TimerTask implements Runnable { +private volatile TimerTaskEntry timerTaskEntry; +// timestamp in millisecond +public final long delayMs; + +public TimerTask(long delayMs) { +this.delayMs = delayMs; +} + +public void cancel() { +synchronized (this) { +if (timerTaskEntry != null) timerTaskEntry.remove(); +timerTaskEntry = null; +} +} + +void setTimerTaskEntry(TimerTaskEntry entry) { +synchronized (this) { +// if this timerTask is already held by an existing timer task entry, +// we will remove such an entry first. +if (timerTaskEntry != null && !timerTaskEntry.equals(entry)) { Review Comment: Fixed in last commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hudeqi opened a new pull request, #13852: KAFKA-15086:Set a reasonable segment size upper limit for MM2 internal topics
hudeqi opened a new pull request, #13852: URL: https://github.com/apache/kafka/pull/13852 ### Motivation The config 'segment.bytes' for the MM2-related topics (such as offset.storage.topic, config.storage.topic, status.storage.topic) either follows the broker default or may be set even larger. When the MM cluster then runs many complicated tasks, the log volume of 'offset.storage.topic' in particular becomes very large, and this slows down the restart of the MM workers. After investigation, the reason is that a consumer needs to be started at startup to read the data of 'offset.storage.topic'. Although this topic is set to compact, if the segment size is set to a large value, such as the default value of 1G, the topic may hold tens of gigabytes of data that cannot be compacted and has to be read from the earliest offset (because the active segment cannot be cleaned). This consumes a lot of time: in our online environment, we found that this topic stored 13G of data and it took nearly half an hour for all of it to be consumed, which left the worker unable to start and execute tasks for a long time. ### Solution The number of consumer threads can also be adjusted, but I think it may be easier to set an upper limit for the segment size, for example by following the default value of __consumer_offsets: 100MB -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
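The proposed upper limit can be pictured with a small sketch. It assumes the limit is applied to a topic-config map before the internal topic is created; the class `SegmentBytesCapSketch` and the helper `capSegmentBytes` are hypothetical, and only the 100MB figure and the `segment.bytes` key come from the description above.

```java
import java.util.HashMap;
import java.util.Map;

public class SegmentBytesCapSketch {
    // 100 MB, the figure the PR description cites for __consumer_offsets.
    static final long MAX_SEGMENT_BYTES = 100L * 1024 * 1024;

    // Hypothetical helper: returns a copy of the topic config whose
    // segment.bytes never exceeds the cap. A missing value is set to the cap
    // so that a larger broker default does not apply.
    static Map<String, String> capSegmentBytes(Map<String, String> topicConfig) {
        Map<String, String> out = new HashMap<>(topicConfig);
        long configured = Long.parseLong(
                out.getOrDefault("segment.bytes", String.valueOf(MAX_SEGMENT_BYTES)));
        out.put("segment.bytes", String.valueOf(Math.min(configured, MAX_SEGMENT_BYTES)));
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("cleanup.policy", "compact");
        config.put("segment.bytes", "1073741824"); // 1G
        System.out.println(capSegmentBytes(config).get("segment.bytes"));
    }
}
```

A smaller active segment means less uncompacted data for the worker to replay when it starts, which is the effect the description is after.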
[GitHub] [kafka] daehokimm closed pull request #9259: KAFKA-10466: Allow regex for MaskField SMT to replacement
daehokimm closed pull request #9259: KAFKA-10466: Allow regex for MaskField SMT to replacement URL: https://github.com/apache/kafka/pull/9259 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
cadonna commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1229370444 ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.test.MockInternalNewProcessorContext; +import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +public class RocksDBTimeOrderedKeyValueBufferTest { +public RocksDBTimeOrderedKeyValueBuffer buffer; +@Mock +public SerdeGetter serdeGetter; +public InternalProcessorContext context; +public StreamsMetricsImpl streamsMetrics; +@Mock +public Sensor sensor; +public long offset; + +@Before +public void setUp() { +when(serdeGetter.keySerde()).thenReturn(new Serdes.StringSerde()); +when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde()); +final Metrics metrics = new Metrics(); +offset = 0; +streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", 
StreamsConfig.METRICS_LATEST, new MockTime()); +context = new MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory()); +} + +public void createJoin(final Duration grace) { +final RocksDBTimeOrderedKeyValueBytesStore store = new RocksDbTimeOrderedKeyValueBytesStoreSupplier("testing", 100).get(); +buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, grace, "testing"); +buffer.setSerdesIfNull(serdeGetter); +store.init((StateStoreContext) context, store); +buffer.init((StateStoreContext) context, store); +} + +private void pipeRecord(final String key, final String value, final long time) { +final Record record = new Record<>(key, value, time); +context.setRecordContext(new ProcessorRecordContext(time, offset++, 0, "testing", new RecordHeaders())); +buffer.put(time, record, context.recordContext()); +} + +@Test +public void shouldAddAndEvictRecord() { +createJoin(Duration.ZERO); +final AtomicInteger count = new AtomicInteger(0); +pipeRecord("1", "0", 0L); +buffer.evictWhile(() -> buffer.numRecords() > 0, r -> count.getAndIncrement()); +assertThat(count.get(), equalTo(1)); +} Review Comment: Could you please add verifications that `numRecords()`, `bufferSize()`, and `minTimestamp()` are updated correctly? ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the
[jira] [Created] (KAFKA-15087) Move InterBrokerSendThread to server-commons module
Dimitar Dimitrov created KAFKA-15087: Summary: Move InterBrokerSendThread to server-commons module Key: KAFKA-15087 URL: https://issues.apache.org/jira/browse/KAFKA-15087 Project: Kafka Issue Type: Task Reporter: Dimitar Dimitrov Assignee: Dimitar Dimitrov Similar to the move of {{ShutdownableThread}} done with KAFKA-14706. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1229411879 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,10 +284,61 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } +boolean containsTombstones = values.containsValue(null); + +// If there are tombstone offsets, then the failure to write to secondary store will +// not be ignored. Also, for tombstone records, we first write to secondary store and +// then to primary stores. +if (secondaryStore != null && containsTombstones) { +AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>(); +Future secondaryWriteFuture = secondaryStore.set(values, (secondaryWriteError, ignored) -> { +try (LoggingContext context = loggingContext()) { +if (secondaryWriteError != null) { +log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError); +secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError); +} else { +log.debug("Successfully flushed tombstone offsets to secondary backing store"); +} +} +}); +try { +// For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for +// commits. We still need to wait because we want to fail the offset commit for cases when +// tombstone records fail to be written to the secondary store. Note that while commitTransaction +// already waits for all records to be sent and ack'ed, in this case we do need to add an explicit +// blocking call. In case of ALOS, we wait for the same duration as `offset.commit.timeout.ms` +// and throw that exception which would allow the offset commit to fail. 
+if (isEOSEnabled) { +secondaryWriteFuture.get(); Review Comment: While I have explained in the code comments the reasoning for using an explicit `get()` call here, one thing to note is that I couldn't test the scenario where the secondary store write fails and the doFlush callback is able to see the error. This is because the `MockProducer#send` method throws any exception that you pass on to it without creating or executing the callback for it. This makes the `set` method itself throw the error, which, from what I have understood, is not what is going to happen. Ideally I could have extended `send()` in `MockProducer` to handle this case properly, but I didn't want to make any assumptions around it. If the reasoning stated above seems fine, I think it should be ok to do what I have done here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
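The waiting strategy described in the diff comment (block without a timeout for EOS, wait a bounded time otherwise) can be sketched in isolation as follows. This is not the PR's code: `SecondaryWriteWaitSketch` and `awaitSecondaryWrite` are hypothetical names, and a plain `CompletableFuture` stands in for the secondary store's write future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SecondaryWriteWaitSketch {
    // Hypothetical helper: returns null on success, or the error that
    // should cause the offset commit to fail.
    static Throwable awaitSecondaryWrite(Future<Void> write, boolean eosEnabled, long timeoutMs) {
        try {
            if (eosEnabled) {
                write.get(); // EOS: no commit timeout, wait as long as needed
            } else {
                write.get(timeoutMs, TimeUnit.MILLISECONDS); // otherwise: bounded wait
            }
            return null;
        } catch (ExecutionException e) {
            return e.getCause(); // the write itself failed
        } catch (TimeoutException e) {
            return e; // the write did not finish in time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("secondary store write failed"));
        System.out.println(awaitSecondaryWrite(failed, true, 0L));

        CompletableFuture<Void> pending = new CompletableFuture<>();
        System.out.println(awaitSecondaryWrite(pending, false, 50L));
    }
}
```

Completing the future exceptionally, rather than throwing from the call that creates it, is the kind of failure path the review comment says could not be exercised through `MockProducer#send`.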
[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on code in PR #13820: URL: https://github.com/apache/kafka/pull/13820#discussion_r1229407264 ## server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java: ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util.timer; + +public class TimerTaskEntry { Review Comment: As explained in the other comment, this is not required any more. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1229405980 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,10 +280,33 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } +boolean containsTombstones = values.entrySet() +.stream() +.anyMatch(offset -> offset.getValue() == null); + +AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>(); + +// If there are tombstone offsets, then the failure to write to secondary store will +// not be ignored. Also, for tombstone records, we first write to secondary store and +// then to primary stores. +if (secondaryStore != null && containsTombstones) { +secondaryStore.set(values, (secondaryWriteError, ignored) -> { +try (LoggingContext context = loggingContext()) { +if (secondaryWriteError != null) { +log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError); +secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError); +} else { +log.debug("Successfully flushed tombstone offsets to secondary backing store"); +} +} +}); +} + return primaryStore.set(values, (primaryWriteError, ignored) -> { -if (secondaryStore != null) { +// Secondary store writes have already happened for tombstone records Review Comment: Plz check my comment here: https://github.com/apache/kafka/pull/13801#discussion_r1229405593 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1229405593 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -302,7 +326,12 @@ public Future set(Map values, Callback callb } } try (LoggingContext context = loggingContext()) { -callback.onCompletion(primaryWriteError, ignored); +Throwable secondaryWriteError = secondaryStoreTombstoneWriteError.get(); +if (secondaryStore != null && containsTombstones && secondaryWriteError != null) { Review Comment: That's true and I have tweaked the logic to wait for secondary store write in case of tombstone records before proceeding with primary store writes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1229404145 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java: ## @@ -192,6 +197,220 @@ public void testCancelAfterAwaitFlush() throws Exception { flushFuture.get(1000, TimeUnit.MILLISECONDS); } +@Test Review Comment: I kept them here because eventually the error that should manifest in the callbacks should be via the `doFlush` method (which is what the actual `commitOffsets` method would be seeing). Let me know if that sounds ok. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1229402835 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,10 +280,33 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } +boolean containsTombstones = values.entrySet() +.stream() +.anyMatch(offset -> offset.getValue() == null); Review Comment: @ashwinpankaj , I am thinking it should be ok to scan the map because it's not going to be huge. Moreover, the flag `containsTombstones` is used when writing to primary store as well to check if secondary store writes have already happened. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
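For reference, the tombstone scan discussed above can be written either as a stream over the entries or with `Map.containsValue(null)`; both walk the map once. The sketch below assumes a `HashMap`, which permits null values (some `Map` implementations, such as those returned by `Map.of`, do not). The class and method names are made up for the example, and `String` keys and values stand in for the real offset types.

```java
import java.util.HashMap;
import java.util.Map;

public class TombstoneCheckSketch {
    // Stream form, as in the diff under discussion.
    static boolean viaStream(Map<String, String> values) {
        return values.entrySet().stream().anyMatch(e -> e.getValue() == null);
    }

    // Equivalent check for maps that permit null values.
    static boolean viaContainsValue(Map<String, String> values) {
        return values.containsValue(null);
    }

    public static void main(String[] args) {
        Map<String, String> offsets = new HashMap<>();
        offsets.put("partition-0", "42");
        System.out.println(viaStream(offsets) + " " + viaContainsValue(offsets));

        offsets.put("partition-1", null); // a tombstone: key present, value null
        System.out.println(viaStream(offsets) + " " + viaContainsValue(offsets));
    }
}
```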
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1229399358 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,10 +280,33 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } Review Comment: Sure, will do once we are closer to merging this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org