[GitHub] [kafka] kowshik edited a comment on pull request #10280: KAFKA-12554: Refactor Log layer
kowshik edited a comment on pull request #10280: URL: https://github.com/apache/kafka/pull/10280#issuecomment-808514958 @junrao Just a heads-up: I'm working on the following changes in separate PRs. They are related to refactoring the recovery logic (KAFKA-12553):
* KAFKA-12552: (https://github.com/apache/kafka/pull/10401) to extract the segments map **[MERGED]**
* KAFKA-12571: (https://github.com/apache/kafka/pull/10426) to eliminate the LeaderEpochFileCache constructor's dependency on logEndOffset **[MERGED]**
* KAFKA-12575: (https://github.com/apache/kafka/pull/10430) to eliminate the Log.isLogDirOffline boolean attribute **[MERGED]**
* KAFKA-12553: (https://github.com/apache/kafka/pull/10478) to refactor the recovery logic and introduce LogLoader **[MERGED]**

It seems better to merge those into trunk ahead of the current PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #10591: Fix minor bugs in the existing documentation
showuon commented on a change in pull request #10591: URL: https://github.com/apache/kafka/pull/10591#discussion_r620956779 ## File path: docs/ops.html ## @@ -78,7 +78,7 @@ auto.leader.rebalance.enable=true You can also set this to false, but you will then need to manually restore leadership to the restored replicas by running the command: -> bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port +> bin/kafka-leader-election.sh --bootstrap-server broker_host:port Review comment: This paragraph is about updating the `preferred replicas`, so why do we need to switch to `kafka-leader-election.sh`? ## File path: docs/security.html ## @@ -319,7 +319,7 @@ SSL key and certificates in PEM format We need to configure the following property in server.properties, which must have one or more comma-separated values: listeners -If SSL is not enabled for inter-broker communication (see below for how to enable it), both PLAINTEXT and SSL ports will be necessary. +If SSL is enabled for inter-broker communication (see below for how to enable it), both PLAINTEXT and SSL ports will be necessary. Review comment: I think the section is saying: > If SSL is not enabled, we need to set both PLAINTEXT and SSL ports, but if SSL is enabled, we only need SSL ports. What do you think? ## File path: docs/upgrade.html ## @@ -420,11 +420,11 @@ Upgrading from 0.8.x, 0.9.x, 0.1 if there are no snapshot files in 3.4 data directory. For more details about the workaround please refer to https://cwiki.apache.org/confluence/display/ZOOKEEPER/Upgrade+FAQ";>ZooKeeper Upgrade FAQ. -An embedded Jetty based http://zookeeper.apache.org/doc/r3.5.6/zookeeperAdmin.html#sc_adminserver";>AdminServer added in ZooKeeper 3.5. +An embedded Jetty based http://zookeeper.apache.org/doc/r3.5.7/zookeeperAdmin.html#sc_adminserver";>AdminServer added in ZooKeeper 3.5. Review comment: Actually, we're using ZooKeeper 3.5.9. Please update it here and below as well. Thanks.
## File path: docs/upgrade.html ## @@ -337,8 +337,8 @@ < https://github.com/apache/kafka/tree/2.5/examples";>examples folder. Check out https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics";>KIP-447 for the full details. -Added a new public api KafkaStreams.queryMetadataForKey(String, K, Serializer) to get detailed information on the key being queried. -It provides information about the partition number where the key resides in addition to hosts containing the active and standby partitions for the key. +Added a new public api KafkaStreams.queryMetadataForKey(String, K, Serializer) to get detailed information on the key being queried. Review comment: nice catch! ## File path: docs/toc.html ## @@ -79,11 +79,15 @@ Modifying topics Graceful shutdown Balancing leadership -Checking consumer position +Balancing Replicas Across Racks Mirroring data between clusters +Checking consumer position +Managing Consumer Groups Review comment: Nice improvement to add missing TOC! ## File path: docs/upgrade.html ## @@ -136,8 +136,8 @@ Notable changes in 2 The 2.7.0 release includes the core Raft implementation specified in https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum";>KIP-595. There is a separate "raft" module containing most of the logic. Until integration with the -controller is complete, there is a standalone server that users can use for testing the p -erformance of the Raft implementation. See the README.md in the raft module for details Review comment: Nice catch!
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r620996380 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java ## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; + +import java.util.HashMap; +import java.util.Map; + +/** + * This class provides serialization and deserialization for {@link RemoteLogMetadata}. This is the root serde + * for the messages that are stored in internal remote log metadata topic. + */ +public class RemoteLogMetadataSerde { +private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new RemoteLogSegmentMetadataRecord().apiKey(); +private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = new RemoteLogSegmentMetadataUpdateRecord().apiKey(); +private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey(); + +private static final Map REMOTE_LOG_STORAGE_CLASS_TO_API_KEY = createRemoteLogStorageClassToApiKeyMap(); +private static final Map KEY_TO_TRANSFORM = createRemoteLogMetadataTransforms(); + +private static final BytesApiMessageSerde BYTES_API_MESSAGE_SERDE = new BytesApiMessageSerde() { +@Override +public ApiMessage apiMessageFor(short apiKey) { Review comment: Changed the remote log metadata record type to `metadata` and enabled metadata record generation.
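The serde in this file maps each `RemoteLogMetadata` subtype to a record api key and dispatches on that key when reading. A minimal sketch of that dispatch pattern using only the JDK, under stated assumptions: the record types, the numeric keys (0 and 1), and the UTF-8 string payload are hypothetical stand-ins, not the generated Kafka records or `BytesApiMessageSerde`. The serde is an ordinary, non-final class rather than a singleton, so a test could construct or subclass its own instance.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for the metadata types; the real classes carry many more fields.
interface Metadata { String payload(); }

final class SegmentMetadata implements Metadata {
    private final String payload;
    SegmentMetadata(String payload) { this.payload = payload; }
    public String payload() { return payload; }
}

final class PartitionDeleteMetadata implements Metadata {
    private final String payload;
    PartitionDeleteMetadata(String payload) { this.payload = payload; }
    public String payload() { return payload; }
}

// Root serde sketch: writes a short type key before the payload and dispatches on it when reading.
class MetadataSerde {
    private final Map<Class<? extends Metadata>, Short> classToKey = new HashMap<>();
    private final Map<Short, Function<String, Metadata>> keyToDecoder = new HashMap<>();

    MetadataSerde() {
        register(SegmentMetadata.class, (short) 0, SegmentMetadata::new);
        register(PartitionDeleteMetadata.class, (short) 1, PartitionDeleteMetadata::new);
    }

    // Registers both directions of the mapping so serialize and deserialize stay in sync.
    protected final void register(Class<? extends Metadata> type, short key, Function<String, Metadata> decoder) {
        classToKey.put(type, key);
        keyToDecoder.put(key, decoder);
    }

    byte[] serialize(Metadata metadata) {
        Short key = classToKey.get(metadata.getClass());
        if (key == null) {
            throw new IllegalArgumentException("Unsupported metadata type: " + metadata.getClass());
        }
        byte[] body = metadata.payload().getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Short.BYTES + body.length).putShort(key).put(body).array();
    }

    Metadata deserialize(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        short key = buffer.getShort();
        Function<String, Metadata> decoder = keyToDecoder.get(key);
        if (decoder == null) {
            throw new IllegalArgumentException("Unknown api key: " + key);
        }
        byte[] body = new byte[buffer.remaining()];
        buffer.get(body);
        return decoder.apply(new String(body, StandardCharsets.UTF_8));
    }
}
```

Keeping the class-to-key and key-to-decoder maps populated by one `register` call is a design choice of this sketch; it avoids the two lookup tables drifting apart when a new record type is added.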
[GitHub] [kafka] dajac commented on a change in pull request #10599: KAFKA-12716; Add `Admin` API to abort transactions
dajac commented on a change in pull request #10599: URL: https://github.com/apache/kafka/pull/10599#discussion_r620974574 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.internals.KafkaFutureImpl; + +import java.util.Map; +import java.util.concurrent.ExecutionException; + +@InterfaceStability.Evolving +public class AbortTransactionResult { Review comment: Should we add some javadoc to the classes/methods published in our public API? ## File path: clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionSpec.java ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Objects; + +@InterfaceStability.Evolving +public class AbortTransactionSpec { +private final TopicPartition topicPartition; +private final long producerId; +private final short producerEpoch; +private final int coordinatorEpoch; Review comment: In the KIP, you also mentioned `transactionStartOffset`. Is it not required anymore or do you plan to add it later on? ## File path: clients/src/test/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandlerTest.java ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.clients.admin.AbortTransactionSpec; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ClusterAuthorizationException; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.message.WriteTxnMarkersRequestData; +import org.apache.kafka.common.message.WriteTxnMarkersResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.WriteTxnMarkersRequest; +import org.apache.kafka.common.requests.WriteTxnMarkersResponse; +import org.apache.kafka.common.utils.LogContext; +import org.junit.jupiter.api.Test; + +import static java.util.Collections.emptyList; +import static j
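The review asks for javadoc on the new public classes and whether `transactionStartOffset` from the KIP is still needed. As a sketch only, this is one way a documented, immutable spec could look with the four fields visible in the `AbortTransactionSpec` diff. The class name, the `String topic`/`int partition` pair (used in place of `TopicPartition` so it compiles without Kafka), and the javadoc wording are assumptions; `transactionStartOffset` is left out, mirroring the open question rather than answering it.

```java
import java.util.Objects;

/**
 * Hypothetical sketch: identifies one open transaction to abort on a single partition.
 * Field names follow the AbortTransactionSpec diff; this is not the actual Kafka class.
 */
final class AbortTransactionSpecSketch {
    private final String topic;
    private final int partition;
    private final long producerId;
    private final short producerEpoch;
    private final int coordinatorEpoch;

    /**
     * @param topic            topic of the partition that holds the transaction to abort
     * @param partition        partition index within {@code topic}
     * @param producerId       producer id that owns the transaction
     * @param producerEpoch    producer epoch of the transaction
     * @param coordinatorEpoch coordinator epoch to include in the abort marker
     */
    AbortTransactionSpecSketch(String topic, int partition, long producerId,
                               short producerEpoch, int coordinatorEpoch) {
        this.topic = Objects.requireNonNull(topic, "topic cannot be null");
        this.partition = partition;
        this.producerId = producerId;
        this.producerEpoch = producerEpoch;
        this.coordinatorEpoch = coordinatorEpoch;
    }

    /** @return the producer id that owns the transaction */
    long producerId() { return producerId; }

    /** @return the producer epoch of the transaction */
    short producerEpoch() { return producerEpoch; }

    // Value semantics: two specs are equal when every field matches.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        AbortTransactionSpecSketch that = (AbortTransactionSpecSketch) o;
        return partition == that.partition
            && producerId == that.producerId
            && producerEpoch == that.producerEpoch
            && coordinatorEpoch == that.coordinatorEpoch
            && topic.equals(that.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition, producerId, producerEpoch, coordinatorEpoch);
    }

    @Override
    public String toString() {
        return "AbortTransactionSpecSketch(topic=" + topic + ", partition=" + partition
            + ", producerId=" + producerId + ", producerEpoch=" + producerEpoch
            + ", coordinatorEpoch=" + coordinatorEpoch + ")";
    }
}
```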
[GitHub] [kafka] daehokimm commented on pull request #9259: KAFKA-10466: Allow regex for MaskField SMT to replacement
daehokimm commented on pull request #9259: URL: https://github.com/apache/kafka/pull/9259#issuecomment-827442242 please review this :) 🙏
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r621028610 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java ## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; + +import java.util.HashMap; +import java.util.Map; + +/** + * This class provides serialization and deserialization for {@link RemoteLogMetadata}. This is the root serde + * for the messages that are stored in internal remote log metadata topic. + */ +public class RemoteLogMetadataSerde { Review comment: I prefer avoiding singletons if possible. I have changed this class to be open and extensible. :) This can be useful in the future if we need to extend this for tests or other purposes.
[GitHub] [kafka] vitojeng commented on pull request #10597: KAFKA-5876: Apply StreamsNotStartedException for Interactive Queries
vitojeng commented on pull request #10597: URL: https://github.com/apache/kafka/pull/10597#issuecomment-827475511 @ableegoldman Please take a look. :) Once all the work in this PR is complete, I will update the KIP and the discussion thread.
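The PR title describes throwing a dedicated exception when an interactive query is made before the application has been started. A minimal sketch of that idea under a heavily simplified, hypothetical state model: `QueryableApp`, `NotStartedException`, and the three states are stand-ins, not `KafkaStreams` or `StreamsNotStartedException`, and the exact conditions under which Kafka throws are not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical exception standing in for StreamsNotStartedException.
class NotStartedException extends RuntimeException {
    NotStartedException(String message) { super(message); }
}

// Hypothetical, heavily simplified application with queryable stores.
class QueryableApp {
    enum State { CREATED, RUNNING, NOT_RUNNING }

    private State state = State.CREATED;
    private final Map<String, Map<String, Long>> stores = new HashMap<>();

    void start() {
        state = State.RUNNING;
    }

    void close() {
        state = State.NOT_RUNNING;
    }

    Map<String, Long> store(String name) {
        // A dedicated exception lets the caller tell "start() was never called"
        // apart from other reasons a store is unavailable.
        if (state == State.CREATED) {
            throw new NotStartedException("Application has not been started; call start() before querying " + name);
        }
        if (state != State.RUNNING) {
            throw new IllegalStateException("Application is not running; cannot query " + name);
        }
        return stores.computeIfAbsent(name, n -> new HashMap<>());
    }
}
```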
[GitHub] [kafka] chia7712 commented on pull request #10592: MINOR: Remove redudant test files and close LogSegment after test
chia7712 commented on pull request #10592: URL: https://github.com/apache/kafka/pull/10592#issuecomment-827477372 @dengziming nice find and thanks for your patch. Those files are noisy to me :(
[GitHub] [kafka] chia7712 commented on pull request #10446: KAFKA-12661 ConfigEntry#equal does not compare other fields when value is NOT null
chia7712 commented on pull request #10446: URL: https://github.com/apache/kafka/pull/10446#issuecomment-827478300
```
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync()
```
These failures are unrelated. @ijuma @dajac please take a look if you have free cycles.
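The PR title describes an `equals` that stops comparing the other fields once `value` is non-null. One way to produce exactly that symptom is a conditional-operator precedence slip, because `&&` binds tighter than `?:`. The sketch below assumes that cause (it is not taken from the PR) and uses a hypothetical class with a reduced set of fields to contrast the pitfall with an `Objects.equals`-based version.

```java
import java.util.Objects;

// Hypothetical, simplified config entry used only to illustrate the equals() pitfall.
final class ConfigEntrySketch {
    final String name;
    final String value;
    final boolean isSensitive;
    final boolean isReadOnly;

    ConfigEntrySketch(String name, String value, boolean isSensitive, boolean isReadOnly) {
        this.name = Objects.requireNonNull(name, "name cannot be null");
        this.value = value;
        this.isSensitive = isSensitive;
        this.isReadOnly = isReadOnly;
    }

    // Pitfall: this parses as
    //   (name equal && value != null) ? value.equals(that.value) : (that.value == null && flags equal)
    // so when both names match and value is non-null, only name and value are compared.
    boolean buggyEquals(ConfigEntrySketch that) {
        return this.name.equals(that.name) && this.value != null
                ? this.value.equals(that.value)
                : that.value == null && this.isSensitive == that.isSensitive && this.isReadOnly == that.isReadOnly;
    }

    // Fixed version: every field is compared; Objects.equals handles a null value.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ConfigEntrySketch that = (ConfigEntrySketch) o;
        return name.equals(that.name)
            && Objects.equals(value, that.value)
            && isSensitive == that.isSensitive
            && isReadOnly == that.isReadOnly;
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, value, isSensitive, isReadOnly);
    }
}
```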
[GitHub] [kafka] Nathan22177 commented on a change in pull request #10548: KAFKA-12396 added a nullcheck before trying to retrieve a key
Nathan22177 commented on a change in pull request #10548: URL: https://github.com/apache/kafka/pull/10548#discussion_r621061574 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -234,13 +247,15 @@ public V delete(final K key) { @Override public , P> KeyValueIterator prefixScan(final P prefix, final PS prefixKeySerializer) { - +Objects.requireNonNull(prefix, "key cannot be null"); Review comment: Oh, I see.
[GitHub] [kafka] Nathan22177 commented on a change in pull request #10548: KAFKA-12396 added a nullcheck before trying to retrieve a key
Nathan22177 commented on a change in pull request #10548: URL: https://github.com/apache/kafka/pull/10548#discussion_r621063125 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java ## @@ -472,11 +472,26 @@ public void shouldThrowNullPointerOnRemoveIfKeyIsNull() { assertThrows(NullPointerException.class, () -> store.remove(null)); } +@Test +public void shouldThrowNullPointerOnPutIfWrappedKeyIsNull() { +assertThrows(NullPointerException.class, () -> store.put(new Windowed<>(null, new SessionWindow(0, 0)), "a")); Review comment: Yeah, sorry, I forget to do that sometimes.
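The KAFKA-12396 changes above add `Objects.requireNonNull` checks at the store boundary, including the case where a windowed key wraps a null key. A minimal sketch of that pattern, assuming hypothetical stand-ins: `SessionKey` and `CheckedSessionStore` are not Kafka's `Windowed`/`SessionWindow` or metered stores, and the exception messages are illustrative. The point is that both the wrapper and the wrapped key are validated up front, so a caller gets a descriptive `NullPointerException` at the API boundary.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a session-windowed key (not Kafka's Windowed/SessionWindow).
final class SessionKey<K> {
    private final K key;
    private final long start;
    private final long end;

    SessionKey(K key, long start, long end) {
        this.key = key; // deliberately allows null so the store-level check can be exercised
        this.start = start;
        this.end = end;
    }

    K key() { return key; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SessionKey)) return false;
        SessionKey<?> that = (SessionKey<?>) o;
        return start == that.start && end == that.end && Objects.equals(key, that.key);
    }

    @Override
    public int hashCode() { return Objects.hash(key, start, end); }
}

// Hypothetical store wrapper that validates the wrapper and the wrapped key before delegating.
class CheckedSessionStore<K, V> {
    private final Map<SessionKey<K>, V> inner = new HashMap<>();

    void put(SessionKey<K> sessionKey, V value) {
        Objects.requireNonNull(sessionKey, "sessionKey cannot be null");
        Objects.requireNonNull(sessionKey.key(), "sessionKey.key() cannot be null");
        inner.put(sessionKey, value);
    }

    V fetch(SessionKey<K> sessionKey) {
        Objects.requireNonNull(sessionKey, "sessionKey cannot be null");
        Objects.requireNonNull(sessionKey.key(), "sessionKey.key() cannot be null");
        return inner.get(sessionKey);
    }
}
```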
[jira] [Created] (KAFKA-12719) Kafka MirrorMaker 2 can only mirror in one direction in Active/Active config
Bui Thanh MInh created KAFKA-12719:
--
Summary: Kafka MirrorMaker 2 can only mirror in one direction in Active/Active config
Key: KAFKA-12719
URL: https://issues.apache.org/jira/browse/KAFKA-12719
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Affects Versions: 2.7.0
Reporter: Bui Thanh MInh

Config:
```
clusters = DC1, DC2
NPS.bootstrap.servers =
NTL.bootstrap.servers =
# Source and target clusters configurations.
config.storage.replication.factor = 3
offset.storage.replication.factor = 3
status.storage.replication.factor = 3
DC1->DC2.enabled = true
DC2->DC1.enabled = true
# Mirror maker configurations.
offset-syncs.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
checkpoints.topic.replication.factor = 3
topics = .*
groups = .*
```
In my test case, I shut down the whole DC1 cluster and the clients switch to DC2. After that, I bring the DC1 cluster back and restart MM2 with no errors, but I find that no topics are replicated from DC2->DC1. I don't know why this happens or how to check it in this case. What's wrong with my configuration?

-- This message was sent by Atlassian Jira (v8.3.4#803005)
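One thing worth checking in the reported config: `clusters` declares `DC1` and `DC2`, but `bootstrap.servers` is set for `NPS` and `NTL`. That may simply be redaction in the report; if it is not, the aliases do not line up. A minimal, hypothetical checker (not part of MirrorMaker 2) that flags such mismatches is sketched below. It only parses the properties text with `java.util.Properties`; it does not connect to any cluster and cannot by itself explain why replication stopped in one direction.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical diagnostic helper: inspects MM2-style properties text only.
final class Mm2ConfigCheck {
    private Mm2ConfigCheck() { }

    static List<String> problems(String configText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(configText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        // Every alias listed in "clusters" needs a non-empty "<alias>.bootstrap.servers".
        List<String> aliases = new ArrayList<>();
        List<String> problems = new ArrayList<>();
        for (String raw : props.getProperty("clusters", "").split(",")) {
            String alias = raw.trim();
            if (alias.isEmpty()) {
                continue;
            }
            aliases.add(alias);
            if (props.getProperty(alias + ".bootstrap.servers", "").trim().isEmpty()) {
                problems.add("missing " + alias + ".bootstrap.servers");
            }
        }

        // Every enabled "A->B" flow must refer only to aliases declared in "clusters".
        for (String key : props.stringPropertyNames()) {
            if (!key.contains("->") || !key.endsWith(".enabled")) {
                continue;
            }
            if (!Boolean.parseBoolean(props.getProperty(key).trim())) {
                continue;
            }
            String flow = key.substring(0, key.length() - ".enabled".length());
            for (String end : flow.split("->", 2)) {
                if (!aliases.contains(end)) {
                    problems.add("flow " + flow + " uses undeclared alias " + end);
                }
            }
        }
        return problems;
    }
}
```

Run against the reported settings, a check like this would list `DC1` and `DC2` as having no `bootstrap.servers`, which is worth ruling out before looking at MM2 itself.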
[jira] [Updated] (KAFKA-12719) Kafka MirrorMaker 2 can only mirror in one direction in Active/Active config
[ https://issues.apache.org/jira/browse/KAFKA-12719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bui Thanh MInh updated KAFKA-12719:
---
Description:
Config:
_clusters = DC1, DC2_
_NPS.bootstrap.servers = _
_NTL.bootstrap.servers = _
_# Source and target clusters configurations._
_config.storage.replication.factor = 3_
_offset.storage.replication.factor = 3_
_status.storage.replication.factor = 3_
_DC1->DC2.enabled = true_
_DC2->DC1.enabled = true_
_# Mirror maker configurations._
_offset-syncs.topic.replication.factor = 3_
_heartbeats.topic.replication.factor = 3_
_checkpoints.topic.replication.factor = 3_
_topics = .*_
_groups = .*_

In my test case, I shut down the whole DC1 cluster and the clients switch to DC2. After that, I bring the DC1 cluster back and restart MM2 with no errors, but I find that no topics are replicated from DC2->DC1. I don't know why this happens or how to check it in this case. What's wrong with my configuration?

was:
Config:
```
clusters = DC1, DC2
NPS.bootstrap.servers =
NTL.bootstrap.servers =
# Source and target clusters configurations.
config.storage.replication.factor = 3
offset.storage.replication.factor = 3
status.storage.replication.factor = 3
DC1->DC2.enabled = true
DC2->DC1.enabled = true
# Mirror maker configurations.
offset-syncs.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
checkpoints.topic.replication.factor = 3
topics = .*
groups = .*
```
In my test case, I shut down the whole DC1 cluster and the clients switch to DC2. After that, I bring the DC1 cluster back and restart MM2 with no errors, but I find that no topics are replicated from DC2->DC1. I don't know why this happens or how to check it in this case. What's wrong with my configuration?
> Kafka MirrorMaker 2 can only mirror in one direction in Active/Active config
>
> Key: KAFKA-12719
> URL: https://issues.apache.org/jira/browse/KAFKA-12719
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 2.7.0
> Reporter: Bui Thanh MInh
> Priority: Major
>
> Config:
> _clusters = DC1, DC2_
> _NPS.bootstrap.servers = _
> _NTL.bootstrap.servers = _
> _# Source and target clusters configurations._
> _config.storage.replication.factor = 3_
> _offset.storage.replication.factor = 3_
> _status.storage.replication.factor = 3_
> _DC1->DC2.enabled = true_
> _DC2->DC1.enabled = true_
> _# Mirror maker configurations._
> _offset-syncs.topic.replication.factor = 3_
> _heartbeats.topic.replication.factor = 3_
> _checkpoints.topic.replication.factor = 3_
> _topics = .*_
> _groups = .*_
>
> In my test case, I shut down the whole DC1 cluster and the clients switch to DC2. After that, I bring the DC1 cluster back and restart MM2 with no errors, but I find that no topics are replicated from DC2->DC1. I don't know why this happens or how to check it in this case. What's wrong with my configuration?
[jira] [Updated] (KAFKA-12719) Kafka MirrorMaker 2 can only mirror in one direction in Active/Active config
[ https://issues.apache.org/jira/browse/KAFKA-12719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bui Thanh MInh updated KAFKA-12719:
---
Description:
Config:
```
clusters = DC1, DC2
NPS.bootstrap.servers =
NTL.bootstrap.servers =
# Source and target clusters configurations.
config.storage.replication.factor = 3
offset.storage.replication.factor = 3
status.storage.replication.factor = 3
DC1->DC2.enabled = true
DC2->DC1.enabled = true
# Mirror maker configurations.
offset-syncs.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
checkpoints.topic.replication.factor = 3
topics = .*
groups = .*
```
In my test case, I shut down the whole DC1 cluster and the clients switch to DC2. After that, I bring the DC1 cluster back and restart MM2 with no errors, but I find that no topics are replicated from DC2->DC1. I don't know why this happens or how to check it in this case. What's wrong with my configuration?

was:
Config:
```
clusters = DC1, DC2
NPS.bootstrap.servers =
NTL.bootstrap.servers =
# Source and target clusters configurations.
config.storage.replication.factor = 3
offset.storage.replication.factor = 3
status.storage.replication.factor = 3
DC1->DC2.enabled = true
DC2->DC1.enabled = true
# Mirror maker configurations.
offset-syncs.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
checkpoints.topic.replication.factor = 3
topics = .*
groups = .*
```
In my test case, I shut down the whole DC1 cluster and the clients switch to DC2. After that, I bring the DC1 cluster back and restart MM2 with no errors, but I find that no topics are replicated from DC2->DC1. I don't know why this happens or how to check it in this case. What's wrong with my configuration?
> Kafka MirrorMaker 2 can only mirror in one direction in Active/Active config
>
> Key: KAFKA-12719
> URL: https://issues.apache.org/jira/browse/KAFKA-12719
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 2.7.0
> Reporter: Bui Thanh MInh
> Priority: Major
>
> Config:
> ```
> clusters = DC1, DC2
> NPS.bootstrap.servers =
> NTL.bootstrap.servers =
> # Source and target clusters configurations.
> config.storage.replication.factor = 3
> offset.storage.replication.factor = 3
> status.storage.replication.factor = 3
> DC1->DC2.enabled = true
> DC2->DC1.enabled = true
> # Mirror maker configurations.
> offset-syncs.topic.replication.factor = 3
> heartbeats.topic.replication.factor = 3
> checkpoints.topic.replication.factor = 3
> topics = .*
> groups = .*
> ```
> In my test case, I shut down the whole DC1 cluster and the clients switch to DC2. After that, I bring the DC1 cluster back and restart MM2 with no errors, but I find that no topics are replicated from DC2->DC1. I don't know why this happens or how to check it in this case. What's wrong with my configuration?
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r620996380 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java ## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; + +import java.util.HashMap; +import java.util.Map; + +/** + * This class provides serialization and deserialization for {@link RemoteLogMetadata}. This is the root serde + * for the messages that are stored in internal remote log metadata topic. + */ +public class RemoteLogMetadataSerde { +private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new RemoteLogSegmentMetadataRecord().apiKey(); +private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = new RemoteLogSegmentMetadataUpdateRecord().apiKey(); +private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey(); + +private static final Map REMOTE_LOG_STORAGE_CLASS_TO_API_KEY = createRemoteLogStorageClassToApiKeyMap(); +private static final Map KEY_TO_TRANSFORM = createRemoteLogMetadataTransforms(); + +private static final BytesApiMessageSerde BYTES_API_MESSAGE_SERDE = new BytesApiMessageSerde() { +@Override +public ApiMessage apiMessageFor(short apiKey) { Review comment: Good point. Changed the remote log metadata record type to `metadata` and enabled metadata record generation.
[GitHub] [kafka] dengziming commented on pull request #10598: MINOR: rename wrong topic id variable name and description
dengziming commented on pull request #10598: URL: https://github.com/apache/kafka/pull/10598#issuecomment-827554412 @showuon Thank you. I also think it's not reasonable to fix it here, since there are other occurrences as well, so I reverted the JMap change.
[GitHub] [kafka] chia7712 commented on a change in pull request #10547: KAFKA-12284: increase request timeout to make tests reliable
chia7712 commented on a change in pull request #10547: URL: https://github.com/apache/kafka/pull/10547#discussion_r621167591 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java ## @@ -337,18 +337,28 @@ public void createTopic(String topic) { * @param topic The name of the topic. */ public void createTopic(String topic, int partitions) { -createTopic(topic, partitions, 1, new HashMap<>()); +createTopic(topic, partitions, 1, new HashMap<>(), new Properties()); Review comment: How about replacing this with `createTopic(topic, partitions, 1, Collections.emptyMap());`?
[GitHub] [kafka] showuon commented on a change in pull request #10547: KAFKA-12284: increase request timeout to make tests reliable
showuon commented on a change in pull request #10547: URL: https://github.com/apache/kafka/pull/10547#discussion_r621231344 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java ## @@ -337,18 +337,28 @@ public void createTopic(String topic) { * @param topic The name of the topic. */ public void createTopic(String topic, int partitions) { -createTopic(topic, partitions, 1, new HashMap<>()); +createTopic(topic, partitions, 1, new HashMap<>(), new Properties()); Review comment: Looks good to me. I'll work on it tomorrow! Thanks.
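The suggestion is to delegate the two-argument overload with `Collections.emptyMap()` rather than allocating a fresh `HashMap` on every call. A minimal sketch of that overload chain follows. `TopicCreatorSketch` and its `last*` fields are hypothetical and only record the delegated arguments; the four- and five-argument signatures are assumed from the shape of the quoted diff, not taken from `EmbeddedKafkaCluster`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class TopicCreatorSketch {
    // Records the last delegated call so the chain can be inspected; illustration only.
    String lastTopic;
    int lastPartitions;
    int lastReplication;
    Map<String, String> lastTopicConfig;
    Properties lastAdminClientConfig;

    public void createTopic(String topic, int partitions) {
        // Shared immutable empty map: no per-call allocation, and accidental writes fail fast.
        createTopic(topic, partitions, 1, Collections.emptyMap());
    }

    public void createTopic(String topic, int partitions, int replication, Map<String, String> topicConfig) {
        createTopic(topic, partitions, replication, topicConfig, new Properties());
    }

    public void createTopic(String topic, int partitions, int replication,
                            Map<String, String> topicConfig, Properties adminClientConfig) {
        this.lastTopic = topic;
        this.lastPartitions = partitions;
        this.lastReplication = replication;
        // Defensive copy, so callers may safely pass immutable maps such as Collections.emptyMap().
        this.lastTopicConfig = new HashMap<>(topicConfig);
        this.lastAdminClientConfig = adminClientConfig;
    }
}
```

The design trade-off is that an immutable empty map is only safe if the callee never mutates the map it receives, which is why the sketch copies it before storing.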
[GitHub] [kafka] Mia-jeong opened a new pull request #10600: [WIP]MINOR: add abstract keywords to util classes
Mia-jeong opened a new pull request #10600: URL: https://github.com/apache/kafka/pull/10600 I think utility classes are intended to be used through their static methods without being instantiated. To make this explicit, I added the `abstract` keyword to some utility classes that have no private constructor. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
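One caveat worth weighing against the PR's approach: marking a class `abstract` does not stop it from being instantiated, because any subclass can still be created. The common Java idiom for a non-instantiable utility class is a `final` class with a private constructor. A minimal sketch contrasting the two; all class names here are hypothetical.

```java
public class UtilityClassSketch {
    // 'abstract' alone does not prevent instantiation: any subclass can be created.
    public abstract static class AbstractUtils {
        public static int twice(int x) {
            return 2 * x;
        }
    }

    // This compiles, so an "abstract utility class" can still be instantiated indirectly.
    public static class Sneaky extends AbstractUtils {
    }

    // Common alternative: a final class whose only constructor is private.
    public static final class FinalUtils {
        private FinalUtils() {
            // Also fails if the constructor is invoked reflectively.
            throw new AssertionError("FinalUtils is not instantiable");
        }

        public static int twice(int x) {
            return 2 * x;
        }
    }

    public static AbstractUtils instantiateViaSubclass() {
        return new Sneaky();
    }
}
```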
[jira] [Resolved] (KAFKA-12719) Kafka MirrorMaker 2 can only mirror in one direction in Active/Active config
[ https://issues.apache.org/jira/browse/KAFKA-12719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bui Thanh MInh resolved KAFKA-12719. Resolution: Auto Closed. Need to restart all MM2 instances. > Kafka MirrorMaker 2 can only mirror in one direction in Active/Active config > > > Key: KAFKA-12719 > URL: https://issues.apache.org/jira/browse/KAFKA-12719 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.7.0 >Reporter: Bui Thanh MInh >Priority: Major > > Config: > _clusters = DC1, DC2_ > _NPS.bootstrap.servers = _ > _NTL.bootstrap.servers = _ > _# Source and target clusters configurations._ > _config.storage.replication.factor = 3_ > _offset.storage.replication.factor = 3_ > _status.storage.replication.factor = 3_ > _DC1->DC2.enabled = true_ > _DC2->DC1.enabled = true_ > _# Mirror maker configurations._ > _offset-syncs.topic.replication.factor = 3_ > _heartbeats.topic.replication.factor = 3_ > _checkpoints.topic.replication.factor = 3_ > _topics = .*_ > _groups = .*_ > > In my test case, I turned off the whole DC1 and clients switched to DC2. After > that I brought the DC1 cluster back and restarted MM2 with no errors, but > realized that no topics were replicated from DC2->DC1. I don't know why, or how > to investigate this case. What's wrong in my configuration? > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] Mia-jeong closed pull request #10600: [WIP]MINOR: add abstract keywords to util classes
Mia-jeong closed pull request #10600: URL: https://github.com/apache/kafka/pull/10600
[GitHub] [kafka] satishd commented on pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on pull request #10271: URL: https://github.com/apache/kafka/pull/10271#issuecomment-827691407 @junrao Thanks for the review comments. Addressed them with the commit https://github.com/apache/kafka/pull/10271/commits/44cb1f374701cc6eca5d1df19dc9cd7c14497b3e
[GitHub] [kafka] junrao commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
junrao commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r621418419 ## File path: storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +public class RemoteLogMetadataSerdeTest { + +public static final String TOPIC = "foo"; +private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0)); +private final Time time = new MockTime(1); + +@Test +public void testRemoteLogSegmentMetadataSerde() { +RemoteLogSegmentMetadata remoteLogSegmentMetadata = createRemoteLogSegmentMetadata(); + +doTestRemoteLogMetadataSerde(remoteLogSegmentMetadata); +} + +@Test +public void testRemoteLogSegmentMetadataUpdateSerde() { +RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate = createRemoteLogSegmentMetadataUpdate(); + +doTestRemoteLogMetadataSerde(remoteLogSegmentMetadataUpdate); +} + +@Test +public void testRemotePartitionDeleteMetadataSerde() { +RemotePartitionDeleteMetadata remotePartitionDeleteMetadata = createRemotePartitionDeleteMetadata(); + +doTestRemoteLogMetadataSerde(remotePartitionDeleteMetadata); +} + 
+private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() { +Map<Integer, Long> segLeaderEpochs = new HashMap<>(); +segLeaderEpochs.put(0, 0L); +segLeaderEpochs.put(1, 20L); +segLeaderEpochs.put(2, 80L); +RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); +return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1, +time.milliseconds(), 1024, segLeaderEpochs); +} + +private RemoteLogSegmentMetadataUpdate createRemoteLogSegmentMetadataUpdate() { +RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); +return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, time.milliseconds(), + RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 2); +} + +private RemotePartitionDeleteMetadata createRemotePartitionDeleteMetadata() { +return new RemotePartitionDeleteMetadata(TP0, RemotePartitionDeleteState.DELETE_PARTITION_MARKED, + time.milliseconds(), 0); +} + +private void doTestRemoteLogMetadataSerde(RemoteLogMetadata remoteLogMetadata) { +RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); + +// Serialize metadata and get the bytes. +byte[] metadataBytes = serde.serialize(remoteLogMetadata); + +// Deserialize the bytes and check the RemoteLogMetadata object is as expected. +RemoteLogMetadataSerde deserializer = new RemoteLogMetadataSerde(); Review comment: Do we need to instantiate the Serde a second time?
[GitHub] [kafka] ableegoldman commented on a change in pull request #10597: KAFKA-5876: Apply StreamsNotStartedException for Interactive Queries
ableegoldman commented on a change in pull request #10597: URL: https://github.com/apache/kafka/pull/10597#discussion_r621436955 ## File path: docs/streams/upgrade-guide.html ## @@ -94,7 +94,14 @@ Upgrade Guide and API Changes Streams API changes in 3.0.0 -A new exception may be thrown from KafkaStreams#store(). If the specified store name does not exist in the topology, an UnknownStateStoreException will be thrown instead of the former InvalidStateStoreException. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors";>KIP-216 for more information. +Interactive Query may throw different new exceptions for different errors: Review comment: ```suggestion Interactive Queries may throw new exceptions for different errors: ``` ## File path: docs/streams/upgrade-guide.html ## @@ -94,7 +94,14 @@ Upgrade Guide and API Changes Streams API changes in 3.0.0 -A new exception may be thrown from KafkaStreams#store(). If the specified store name does not exist in the topology, an UnknownStateStoreException will be thrown instead of the former InvalidStateStoreException. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors";>KIP-216 for more information. +Interactive Query may throw different new exceptions for different errors: + + + UnknownStateStoreException: If the specified store name does not exist in the topology, an UnknownStateStoreException will be thrown instead of the former InvalidStateStoreException. + StreamsNotStartedException: If Streams state is not REBALANCING or REBALANCING, an StreamsNotStartedException will be thrown instead of the former IllegalStateException. Review comment: ```suggestion StreamsNotStartedException: If Streams state is not REBALANCING or REBALANCING, a StreamsNotStartedException will be thrown instead of the former IllegalStateException. 
``` ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -344,7 +345,7 @@ private boolean isRunningOrRebalancing() { private void validateIsRunningOrRebalancing() { if (!isRunningOrRebalancing()) { -throw new IllegalStateException("KafkaStreams is not running. State is " + state + "."); +throw new StreamsNotStartedException("KafkaStreams is not running. State is " + state + "."); Review comment: It seems a bit odd to throw a StreamsNotStartedException whenever KafkaStreams is not in RUNNING or REBALANCING. That seems to imply we would throw a StreamsNotStartedException if the KafkaStreams had indeed been started, but then crashed or was closed so that the state is now one of PENDING_{SHUTDOWN/ERROR} or NOT_RUNNING/ERROR. The thing I'm wondering about is whether we should (a) adopt yet another exception to cover this case specifically (e.g. StreamsAlreadyClosedException or something), or (b) change the name of StreamsNotStarted to something a bit more generic, e.g. StreamsNotRunningException, and describe the specific state in the error message of the exception, or (c) continue throwing StreamsNotStartedException only when the state is CREATED, and just continue to throw IllegalStateException if the state is one of the closed states I listed above. Personally I think (c) makes the most sense here: then we don't need to update the KIP (other than to clarify that we'll throw this from the other methods, such as metadataForKey, in addition to #store). But mainly, IllegalStateException actually does seem appropriate for all states other than CREATED, RUNNING or REBALANCING -- all those other states are either terminal, or transition into a terminal state, so there's basically no way to recover and retry at this point. You'd most likely need to bounce the app.
So from a user perspective, you might want to catch StreamsNotStartedException and retry, whereas an IllegalStateException perhaps should in fact kill the app so it can be restarted (e.g. k8s restarts the process/pod). I'm not sure we'd want to introduce another exception whose handling would be pretty much identical to not handling it at all. But I can be convinced, and maybe you have other ideas or opinions I haven't considered yet. Thoughts?
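Option (c) from the comment above can be sketched as a state check that throws the retriable exception only in CREATED and keeps IllegalStateException for every other non-running state. This is a minimal sketch, not the PR's code: the `State` enum mirrors the KafkaStreams state names mentioned in the review, and the nested `StreamsNotStartedException` is a local stand-in rather than the real class.

```java
public class StateCheckSketch {
    // Local stand-in mirroring the KafkaStreams state names discussed in the review.
    public enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    // Local stand-in for the exception proposed in the PR.
    public static class StreamsNotStartedException extends RuntimeException {
        public StreamsNotStartedException(String message) {
            super(message);
        }
    }

    public static void validateIsRunningOrRebalancing(State state) {
        if (state == State.RUNNING || state == State.REBALANCING) {
            return;
        }
        if (state == State.CREATED) {
            // Retriable: the caller can wait for start() to be called and try again.
            throw new StreamsNotStartedException("KafkaStreams has not been started. State is " + state + ".");
        }
        // Terminal, or transitioning to a terminal state: retrying will not help.
        throw new IllegalStateException("KafkaStreams is not running. State is " + state + ".");
    }
}
```

Under this split, callers can catch only the stand-in `StreamsNotStartedException` for retry and let IllegalStateException propagate, which matches the handling argued for above.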
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r621474682 ## File path: storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +public class RemoteLogMetadataSerdeTest { + +public static final String TOPIC = "foo"; +private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0)); +private final Time time = new MockTime(1); + +@Test +public void testRemoteLogSegmentMetadataSerde() { +RemoteLogSegmentMetadata remoteLogSegmentMetadata = createRemoteLogSegmentMetadata(); + +doTestRemoteLogMetadataSerde(remoteLogSegmentMetadata); +} + +@Test +public void testRemoteLogSegmentMetadataUpdateSerde() { +RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate = createRemoteLogSegmentMetadataUpdate(); + +doTestRemoteLogMetadataSerde(remoteLogSegmentMetadataUpdate); +} + +@Test +public void testRemotePartitionDeleteMetadataSerde() { +RemotePartitionDeleteMetadata remotePartitionDeleteMetadata = createRemotePartitionDeleteMetadata(); + +doTestRemoteLogMetadataSerde(remotePartitionDeleteMetadata); +} + 
+private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() { +Map<Integer, Long> segLeaderEpochs = new HashMap<>(); +segLeaderEpochs.put(0, 0L); +segLeaderEpochs.put(1, 20L); +segLeaderEpochs.put(2, 80L); +RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); +return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1, +time.milliseconds(), 1024, segLeaderEpochs); +} + +private RemoteLogSegmentMetadataUpdate createRemoteLogSegmentMetadataUpdate() { +RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); +return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, time.milliseconds(), + RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 2); +} + +private RemotePartitionDeleteMetadata createRemotePartitionDeleteMetadata() { +return new RemotePartitionDeleteMetadata(TP0, RemotePartitionDeleteState.DELETE_PARTITION_MARKED, + time.milliseconds(), 0); +} + +private void doTestRemoteLogMetadataSerde(RemoteLogMetadata remoteLogMetadata) { +RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); + +// Serialize metadata and get the bytes. +byte[] metadataBytes = serde.serialize(remoteLogMetadata); + +// Deserialize the bytes and check the RemoteLogMetadata object is as expected. +RemoteLogMetadataSerde deserializer = new RemoteLogMetadataSerde(); Review comment: Created another `RemoteLogMetadataSerde` instance to depict the real use case of the serializer and deserializer having their own instances. Added a comment o
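The point of using a second instance is that the writer and the reader of the metadata topic normally live in different processes, so a round-trip test is only meaningful if the serde carries no per-instance state. A minimal sketch of such a round trip follows. `SegmentInfo` and `SegmentInfoSerde` are hypothetical stand-ins with a hand-rolled `DataOutputStream` layout, not the `RemoteLogMetadataSerde` wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

public class SerdeRoundTripSketch {
    // Hypothetical value type standing in for a metadata record.
    public static final class SegmentInfo {
        final long startOffset;
        final long endOffset;
        final String segmentId;

        public SegmentInfo(long startOffset, long endOffset, String segmentId) {
            this.startOffset = startOffset;
            this.endOffset = endOffset;
            this.segmentId = segmentId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof SegmentInfo)) return false;
            SegmentInfo that = (SegmentInfo) o;
            return startOffset == that.startOffset && endOffset == that.endOffset
                    && segmentId.equals(that.segmentId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(startOffset, endOffset, segmentId);
        }
    }

    // Hypothetical serde that keeps no per-instance state.
    public static final class SegmentInfoSerde {
        public byte[] serialize(SegmentInfo info) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(baos)) {
                out.writeLong(info.startOffset);
                out.writeLong(info.endOffset);
                out.writeUTF(info.segmentId);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return baos.toByteArray();
        }

        public SegmentInfo deserialize(byte[] bytes) {
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
                // Java evaluates arguments left to right, matching the write order.
                return new SegmentInfo(in.readLong(), in.readLong(), in.readUTF());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Writer and reader use separate instances, as a producer and a consumer would.
    public static SegmentInfo roundTrip(SegmentInfo original) {
        byte[] bytes = new SegmentInfoSerde().serialize(original);
        return new SegmentInfoSerde().deserialize(bytes);
    }
}
```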
[GitHub] [kafka] yeralin commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde
yeralin commented on a change in pull request #6592: URL: https://github.com/apache/kafka/pull/6592#discussion_r621475071 ## File path: clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java ## @@ -77,21 +87,39 @@ public void configure(Map<String, ?> configs, boolean isKey) { } } +private void serializeNullIndexList(final DataOutputStream out, List<Inner> data) throws IOException { +List<Integer> nullIndexList = IntStream.range(0, data.size()) +.filter(i -> data.get(i) == null) +.boxed().collect(Collectors.toList()); +out.writeInt(nullIndexList.size()); +for (int i : nullIndexList) out.writeInt(i); +} + @Override public byte[] serialize(String topic, List<Inner> data) { if (data == null) { return null; } -final int size = data.size(); try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final DataOutputStream out = new DataOutputStream(baos)) { +out.writeByte(serStrategy.ordinal()); // write serialization strategy flag +if (serStrategy == SerializationStrategy.NULL_INDEX_LIST) { +serializeNullIndexList(out, data); +} +final int size = data.size(); out.writeInt(size); for (Inner entry : data) { -final byte[] bytes = inner.serialize(topic, entry); -if (!isFixedLength) { -out.writeInt(bytes.length); +if (entry == null) { +if (serStrategy == SerializationStrategy.NEGATIVE_SIZE) { +out.writeInt(Serdes.ListSerde.NEGATIVE_SIZE_VALUE); +} +} else { +final byte[] bytes = inner.serialize(topic, entry); +if (!isFixedLength || serStrategy == SerializationStrategy.NEGATIVE_SIZE) { +out.writeInt(bytes.length); Review comment: OK, in this case, I think the best course of action is to remove the `SerializationStrategy` flag completely and replace it with a simple boolean. We would not expose it to the user, and would choose the strategy automatically based on the type of data. If you agree, I'll go ahead and make the change.
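The two strategies in the diff differ in where null information lives: NULL_INDEX_LIST writes the null positions up front, so fixed-length entries need no per-entry size, while NEGATIVE_SIZE gives every entry a size and uses a negative size for null. A minimal sketch of choosing between them with a single boolean, as proposed above, follows. It operates on already-serialized `byte[]` entries; the marker value -1 and the byte layout are assumptions for illustration, not the wire format that `ListSerializer` ended up with, and there is no leading strategy byte because both sides are assumed to derive the boolean from the inner type.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ListNullEncodingSketch {
    // Assumed marker for a null entry in variable-length mode.
    static final int NULL_SIZE_MARKER = -1;

    // Entries are already-serialized inner values; null entries are allowed.
    public static byte[] serialize(List<byte[]> entries, boolean fixedLength) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            if (fixedLength) {
                // Fixed-length inner type: list null positions first, then the
                // non-null payloads back to back without per-entry sizes.
                List<Integer> nullIndexes = new ArrayList<>();
                for (int i = 0; i < entries.size(); i++) {
                    if (entries.get(i) == null) nullIndexes.add(i);
                }
                out.writeInt(nullIndexes.size());
                for (int index : nullIndexes) out.writeInt(index);
                out.writeInt(entries.size());
                for (byte[] entry : entries) {
                    if (entry != null) out.write(entry);
                }
            } else {
                // Variable-length inner type: every entry carries a size; a negative size means null.
                out.writeInt(entries.size());
                for (byte[] entry : entries) {
                    if (entry == null) {
                        out.writeInt(NULL_SIZE_MARKER);
                    } else {
                        out.writeInt(entry.length);
                        out.write(entry);
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    public static List<byte[]> deserialize(byte[] data, boolean fixedLength, int entrySize) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            Set<Integer> nullIndexes = new HashSet<>();
            if (fixedLength) {
                int nullCount = in.readInt();
                for (int i = 0; i < nullCount; i++) nullIndexes.add(in.readInt());
            }
            int size = in.readInt();
            List<byte[]> result = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                int length = fixedLength ? entrySize : in.readInt();
                if (fixedLength ? nullIndexes.contains(i) : length == NULL_SIZE_MARKER) {
                    result.add(null);
                    continue;
                }
                byte[] entry = new byte[length];
                in.readFully(entry);
                result.add(entry);
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```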
[GitHub] [kafka] hachikuji commented on a change in pull request #10599: KAFKA-12716; Add `Admin` API to abort transactions
hachikuji commented on a change in pull request #10599: URL: https://github.com/apache/kafka/pull/10599#discussion_r621477651 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionSpec.java ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Objects; + +@InterfaceStability.Evolving +public class AbortTransactionSpec { +private final TopicPartition topicPartition; +private final long producerId; +private final short producerEpoch; +private final int coordinatorEpoch; Review comment: Yeah, I am planning to leave it for later. I recall running into some complication when I tried to implement the broker-side changes for this, but I don't remember specifically what it was. In any case, I think it can come after the tool is checked in since we need the old API for compatibility anyway.
[GitHub] [kafka] satishd commented on pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on pull request #10271: URL: https://github.com/apache/kafka/pull/10271#issuecomment-827811882 Thanks @junrao for the comments. Fixed checkstyle in the raft module and added a comment in `RemoteLogMetadataSerde`.
[GitHub] [kafka] hachikuji commented on a change in pull request #10599: KAFKA-12716; Add `Admin` API to abort transactions
hachikuji commented on a change in pull request #10599: URL: https://github.com/apache/kafka/pull/10599#discussion_r621484136 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandlerTest.java ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.clients.admin.AbortTransactionSpec; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ClusterAuthorizationException; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.message.WriteTxnMarkersRequestData; +import org.apache.kafka.common.message.WriteTxnMarkersResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.WriteTxnMarkersRequest; +import org.apache.kafka.common.requests.WriteTxnMarkersResponse; +import org.apache.kafka.common.utils.LogContext; +import org.junit.jupiter.api.Test; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class AbortTransactionHandlerTest { +private final LogContext logContext = new LogContext(); +private final TopicPartition topicPartition = new TopicPartition("foo", 5); +private final AbortTransactionSpec abortSpec = new AbortTransactionSpec( +topicPartition, 12345L, (short) 15, 4321); + +@Test +public void testInvalidBuildRequestCall() { +AbortTransactionHandler handler = new AbortTransactionHandler(abortSpec, logContext); +assertThrows(IllegalArgumentException.class, () -> handler.buildRequest(1, +emptySet())); +assertThrows(IllegalArgumentException.class, () -> handler.buildRequest(1, +mkSet(new TopicPartition("foo", 1))));
+assertThrows(IllegalArgumentException.class, () -> handler.buildRequest(1, +mkSet(topicPartition, new TopicPartition("foo", 1)))); +} + +@Test +public void testValidBuildRequestCall() { +AbortTransactionHandler handler = new AbortTransactionHandler(abortSpec, logContext); +WriteTxnMarkersRequest.Builder request = handler.buildRequest(1, singleton(topicPartition)); +assertEquals(1, request.data.markers().size()); + +WriteTxnMarkersRequestData.WritableTxnMarker markerRequest = request.data.markers().get(0); +assertEquals(abortSpec.producerId(), markerRequest.producerId()); +assertEquals(abortSpec.producerEpoch(), markerRequest.producerEpoch()); +assertEquals(abortSpec.coordinatorEpoch(), markerRequest.coordinatorEpoch()); +assertEquals(1, markerRequest.topics().size()); + +WriteTxnMarkersRequestData.WritableTxnMarkerTopic topicRequest = markerRequest.topics().get(0); +assertEquals(abortSpec.topicPartition().topic(), topicRequest.name()); +assertEquals(singletonList(abortSpec.topicPartition().partition()), topicRequest.partitionIndexes()); +} + +@Test +public void testInvalidHandleResponseCall() { +AbortTransactionHandler handler = new AbortTransactionHandler(abortSpec, logContext); +WriteTxnMarkersResponseData response = new WriteTxnMarkersResponseData(); +assertThrows(IllegalArgumentException.class, () -> handler.handleResponse(1, +emptySet(), new WriteTxnMarkersResponse(response))); +assertThrows(IllegalArgumentException.class, () -> handler.handleResponse(1, +mkSet(new TopicPartition("foo", 1)), new WriteTxnMarkersResponse(response))); +assertThrows(IllegalArgumentException.class, () -> handler.handleResponse(1,
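The invalid-request tests above pin down a simple contract: the handler accepts only a key set that is exactly the one partition named in the abort spec, and rejects an empty set, a different partition, or an extra partition with IllegalArgumentException. A minimal generic sketch of such a guard follows; `SingleKeyGuardSketch.requireOnlyKey` is a hypothetical helper, and string keys like "foo-5" stand in for `TopicPartition`. It is not the handler's actual implementation.

```java
import java.util.Collections;
import java.util.Set;

public class SingleKeyGuardSketch {
    // Rejects any key set other than exactly {expected}.
    // Set.equals compares contents, so any Set implementation works here.
    public static <K> void requireOnlyKey(Set<K> keys, K expected) {
        if (!keys.equals(Collections.singleton(expected))) {
            throw new IllegalArgumentException(
                "Received unexpected keys " + keys + "; expected only " + expected);
        }
    }
}
```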
[GitHub] [kafka] hachikuji commented on a change in pull request #10599: KAFKA-12716; Add `Admin` API to abort transactions
hachikuji commented on a change in pull request #10599: URL: https://github.com/apache/kafka/pull/10599#discussion_r621503064 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.internals.KafkaFutureImpl; + +import java.util.Map; +import java.util.concurrent.ExecutionException; + +@InterfaceStability.Evolving +public class AbortTransactionResult { +private final Map> futures; + +AbortTransactionResult(Map> futures) { +this.futures = futures; +} + +public KafkaFuture all() { Review comment: Yeah, that's kind of the typical `all()` contract. We are missing a separate API to get the partition-level results. I can add that. I have also been debating whether to add a placeholder result value just in case we need to return something in the future. What do you think?
[GitHub] [kafka] dejan2609 commented on a change in pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`
dejan2609 commented on a change in pull request #10466: URL: https://github.com/apache/kafka/pull/10466#discussion_r621504000 ## File path: build.gradle ## @@ -1491,13 +1491,14 @@ project(':streams') { } tasks.create(name: "copyDependantLibs", type: Copy) { -from (configurations.testRuntime) { Review comment: Ok, we have a deal then.
[jira] [Created] (KAFKA-12720) Ecosystem wiki page: Kafka Manager renamed CMAK (Cluster Manager for Apache Kafka)
Philippe Cloutier created KAFKA-12720: - Summary: Ecosystem wiki page: Kafka Manager renamed CMAK (Cluster Manager for Apache Kafka) Key: KAFKA-12720 URL: https://issues.apache.org/jira/browse/KAFKA-12720 Project: Kafka Issue Type: Task Components: documentation Reporter: Philippe Cloutier The Management Consoles section of [the Ecosystem wiki page|https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem] starts with: {quote}Kafka Manager - A tool for managing Apache Kafka.{quote} Kafka Manager was renamed "CMAK" (Cluster Manager for Apache Kafka). By the way, I recommend being more specific in that description (CMAK could be described as a web GUI rather than just as a tool). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dejan2609 commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`
dejan2609 commented on pull request #10466: URL: https://github.com/apache/kafka/pull/10466#issuecomment-827844379 @ijuma the previous commits are rebased/squashed and the PR branch is force-pushed. Overall, the Gradle build looks OK (on my laptop, that is). Some warnings do appear (related to the Spotless plugin and Scala compilation), but I think those can be solved elsewhere (I volunteer to create JIRA tickets and see what can be done). I opted to change the commit message to emphasize the Gradle version upgrade (if that is OK with you, we can also change the summaries of the JIRA ticket and this PR). One more note: Gradle patch _**7.0.1**_ is around the corner (with lots of issues being solved: https://github.com/gradle/gradle/milestone/173).
[GitHub] [kafka] jeqo commented on pull request #10294: KAFKA-12450: Remove deprecated methods from ReadOnlyWindowStore
jeqo commented on pull request #10294: URL: https://github.com/apache/kafka/pull/10294#issuecomment-827844833 @guozhangwang, it should be ready for review now. Thanks!
[GitHub] [kafka] ijuma commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`
ijuma commented on pull request #10466: URL: https://github.com/apache/kafka/pull/10466#issuecomment-827846345 Can you remove the Gradle version bump from this PR then? We can merge the other change and see if there's any impact, and then go straight to Gradle 7.0.1 once it's released. Judging from the link you shared, that may take 1-3 weeks.
[GitHub] [kafka] dejan2609 commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`
dejan2609 commented on pull request #10466: URL: https://github.com/apache/kafka/pull/10466#issuecomment-827849319 Sure, let's go step by step then.
[GitHub] [kafka] dejan2609 commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`
dejan2609 commented on pull request #10466: URL: https://github.com/apache/kafka/pull/10466#issuecomment-827855369 Done, changes are trimmed down.
[GitHub] [kafka] JoelWee commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter
JoelWee commented on a change in pull request #8923: URL: https://github.com/apache/kafka/pull/8923#discussion_r621537903 ## File path: docs/streams/developer-guide/app-reset-tool.html ## @@ -78,6 +78,9 @@ Step 1: Run the application reset tool Invoke the application reset tool from the command line/bin/kafka-streams-application-reset +Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this once with --dry-run to preview your changes before making them. + /bin/kafka-streams-application-reset Review comment: Thanks! I have updated this now; it looks like this: https://user-images.githubusercontent.com/32009741/116301476-fa873f80-a797-11eb-9a19-de59d6771ac5.png
[GitHub] [kafka] JoelWee commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter
JoelWee commented on a change in pull request #8923: URL: https://github.com/apache/kafka/pull/8923#discussion_r621538528 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java ## @@ -205,6 +206,34 @@ private void add10InputElements() { } } +@Test +public void testResetWhenInternalTopicsAreSpecified() throws Exception { +final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); +streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); + +// RUN +streams = new KafkaStreams(setupTopologyWithIntermediateTopic(true, OUTPUT_TOPIC_2), streamsConfig); +streams.start(); + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); + +streams.close(); +waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); + +// RESET +streams.cleanUp(); + +final List internalTopics = cluster.getAllTopicsInCluster().stream() +.filter(topic -> topic.startsWith(appID + "-")) Review comment: done now :)
[GitHub] [kafka] JoelWee commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter
JoelWee commented on a change in pull request #8923: URL: https://github.com/apache/kafka/pull/8923#discussion_r621542393 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java ## @@ -151,6 +151,22 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() { Assert.assertEquals(1, exitCode); } +@Test +public void shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() { Review comment: It seems more natural to group it together with the other `shouldNotAllowToResetWhen...` tests, e.g. `shouldNotAllowToResetWhenIntermediateTopicAbsent`, `shouldNotAllowToResetWhenInputTopicAbsent`, etc. That said, I'm happy to shift it over to AbstractResetIntegrationTest.java.
[GitHub] [kafka] hachikuji commented on a change in pull request #10599: KAFKA-12716; Add `Admin` API to abort transactions
hachikuji commented on a change in pull request #10599: URL: https://github.com/apache/kafka/pull/10599#discussion_r621576098 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.internals.KafkaFutureImpl; + +import java.util.Map; +import java.util.concurrent.ExecutionException; + +@InterfaceStability.Evolving +public class AbortTransactionResult { +private final Map> futures; + +AbortTransactionResult(Map> futures) { +this.futures = futures; +} + +public KafkaFuture all() { Review comment: I realized what I was doing here. Right now, the API is taking only a single `AbortTransactionSpec`, so `all()` has no ambiguity. In the future, we could decide to add batching, but I cannot think of a strong reason for it. It does not make much sense for the hanging transaction cleanup use case. 
However, if we do decide to, then it will complicate the types in here a bit because the key will probably have to be the `AbortTransactionSpec` itself since `WriteTxnMarkers` does batching both by topic partition and the tuple of `(producerId, producerEpoch, coordinatorEpoch)`. Here I decided to try and keep it simple without committing to a more granular API. Does that seem reasonable?
[GitHub] [kafka] hachikuji commented on a change in pull request #10599: KAFKA-12716; Add `Admin` API to abort transactions
hachikuji commented on a change in pull request #10599: URL: https://github.com/apache/kafka/pull/10599#discussion_r621576098 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.internals.KafkaFutureImpl; + +import java.util.Map; +import java.util.concurrent.ExecutionException; + +@InterfaceStability.Evolving +public class AbortTransactionResult { +private final Map> futures; + +AbortTransactionResult(Map> futures) { +this.futures = futures; +} + +public KafkaFuture all() { Review comment: I realized what I was doing here. Right now, the API is taking only a single `AbortTransactionSpec`, so `all()` has no ambiguity. In the future, we could decide to add batching, but I cannot think of a strong reason for it. It does not make much sense for the hanging transaction cleanup use case. 
However, if we do decide to, then it will complicate the types in here a bit because the key will probably have to be the `AbortTransactionSpec` itself since `WriteTxnMarkers` does batching both by topic partition and the tuple of `(producerId, producerEpoch, coordinatorEpoch)`. Here I decided to try and keep it simple and avoid committing to a more granular API that we may never need. Does that seem reasonable?
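To see why batching would push the result key toward the spec itself, here is a hypothetical sketch. `Spec` is an invented stand-in for `AbortTransactionSpec` (one partition plus producer ID, producer epoch and coordinator epoch), not the real class. It shows that two aborts can target the same partition, so a partition-keyed result map would collide, and it groups specs by the `(producerId, producerEpoch, coordinatorEpoch)` tuple that the comment says `WriteTxnMarkers` batches on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class AbortBatchingSketch {
    // Hypothetical stand-in for AbortTransactionSpec; not the real class.
    static final class Spec {
        final String partition;
        final long producerId;
        final short producerEpoch;
        final int coordinatorEpoch;

        Spec(String partition, long producerId, short producerEpoch, int coordinatorEpoch) {
            this.partition = partition;
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
            this.coordinatorEpoch = coordinatorEpoch;
        }

        // Value equality so a Spec can serve as the key for per-abort results.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Spec)) return false;
            Spec other = (Spec) o;
            return partition.equals(other.partition)
                && producerId == other.producerId
                && producerEpoch == other.producerEpoch
                && coordinatorEpoch == other.coordinatorEpoch;
        }

        @Override
        public int hashCode() {
            return Objects.hash(partition, producerId, producerEpoch, coordinatorEpoch);
        }
    }

    // Groups partitions by the (producerId, producerEpoch, coordinatorEpoch) tuple,
    // i.e. the partitions that could share one transaction marker.
    static Map<List<Object>, List<String>> groupByMarker(List<Spec> specs) {
        Map<List<Object>, List<String>> groups = new LinkedHashMap<>();
        for (Spec spec : specs) {
            List<Object> key = Arrays.asList(spec.producerId, spec.producerEpoch, spec.coordinatorEpoch);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(spec.partition);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Spec> specs = Arrays.asList(
            new Spec("foo-0", 1L, (short) 0, 7),
            new Spec("foo-1", 1L, (short) 0, 7),
            new Spec("foo-0", 2L, (short) 3, 7));

        // Keyed by partition, the two aborts on foo-0 collide; keyed by spec they do not.
        Map<String, String> byPartition = new HashMap<>();
        Map<Spec, String> bySpec = new HashMap<>();
        for (Spec spec : specs) {
            byPartition.put(spec.partition, "pending");
            bySpec.put(spec, "pending");
        }
        System.out.println("results keyed by partition: " + byPartition.size());
        System.out.println("results keyed by spec: " + bySpec.size());
        System.out.println("marker groups: " + groupByMarker(specs));
    }
}
```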
[GitHub] [kafka] gharris1727 commented on a change in pull request #10475: KAFKA-12610: Implement PluginClassLoader::getResource
gharris1727 commented on a change in pull request #10475: URL: https://github.com/apache/kafka/pull/10475#discussion_r621586391 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java ## @@ -98,22 +97,56 @@ * to load internal classes, and samples information about their initialization. */ public static final String SERVICE_LOADER = "test.plugins.ServiceLoaderPlugin"; +/** + * Class name of a plugin which reads a version string from resource. + */ +public static final String READ_VERSION_FROM_RESOURCE = "test.plugins.ReadVersionFromResource"; private static final Logger log = LoggerFactory.getLogger(TestPlugins.class); -private static final Map PLUGIN_JARS; +private static final Map PLUGIN_JARS; private static final Throwable INITIALIZATION_EXCEPTION; +private static final class PluginJar { +String className; +File jarFile; + +public PluginJar(String className, File jarFile) { +this.className = className; +this.jarFile = jarFile; +} + +public String getClassName() { +return className; +} + +public File getJarFile() { +return jarFile; +} +} + static { Throwable err = null; -HashMap pluginJars = new HashMap<>(); +Map pluginJars = new HashMap<>(); try { -pluginJars.put(ALWAYS_THROW_EXCEPTION, createPluginJar("always-throw-exception")); -pluginJars.put(ALIASED_STATIC_FIELD, createPluginJar("aliased-static-field")); -pluginJars.put(SAMPLING_CONVERTER, createPluginJar("sampling-converter")); -pluginJars.put(SAMPLING_CONFIGURABLE, createPluginJar("sampling-configurable")); -pluginJars.put(SAMPLING_HEADER_CONVERTER, createPluginJar("sampling-header-converter")); -pluginJars.put(SAMPLING_CONFIG_PROVIDER, createPluginJar("sampling-config-provider")); -pluginJars.put(SERVICE_LOADER, createPluginJar("service-loader")); +pluginJars.put("always-throw-exception", +new PluginJar(ALWAYS_THROW_EXCEPTION, createPluginJar("always-throw-exception"))); +pluginJars.put("aliased-static-field", +new PluginJar(ALIASED_STATIC_FIELD, 
createPluginJar("aliased-static-field"))); +pluginJars.put("sampling-converter", +new PluginJar(SAMPLING_CONVERTER, createPluginJar("sampling-converter"))); +pluginJars.put("sampling-configurable", +new PluginJar(SAMPLING_CONFIGURABLE, createPluginJar("sampling-configurable"))); +pluginJars.put("sampling-header-converter", +new PluginJar(SAMPLING_HEADER_CONVERTER, createPluginJar("sampling-header-converter"))); +pluginJars.put("sampling-config-provider", +new PluginJar(SAMPLING_CONFIG_PROVIDER, createPluginJar("sampling-config-provider"))); +pluginJars.put("service-loader", +new PluginJar(SERVICE_LOADER, createPluginJar("service-loader"))); +// Create two versions of the same plugin reading version string from a resource +pluginJars.put("read-version-from-resource-v1", Review comment: Since these string literals are now relevant elsewhere, we should make them reusable constants. Perhaps they should be enums? I realize now that perhaps the class names should have also been enums but 🤷. ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java ## @@ -143,16 +176,35 @@ public static void assertAvailable() throws AssertionError { public static List pluginPath() { Review comment: The existing structure of this class bakes in the assumption that each plugin only appears once on the plugin path, and that the common use-case plugin path (returned by this method) is then valid. This change would make this method return an invalid plugin path, with flaky behavior when loading the duplicated plugin (which plugin gets loaded would be determined by iteration order over the PLUGIN_JARS hashmap). There are a couple of ways out: 1. avoid tackling this now, and separate the two plugins with different class names. 2. have this method pick one of the duplicates to drop deterministically, so that the Plugins class doesn't have undefined loading behavior. 3.
allow/deny some of these plugins from being included in this default plugin path, and keep some plugins back for the more specific tests 4. remove this method, and have PluginsTest explicitly include the needed plugins in each test, and/or a default list to include if none are specifically requested. ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java ## @@ -98,22 +9
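Two of the suggestions above (enums instead of string literals, and option 2, dropping duplicates deterministically) could look roughly like the hypothetical sketch below. Only the class names and resource names visible in the diff are used. The `read-version-from-resource-v2` resource name is an assumption, and the enum is not the one in `TestPlugins`.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TestPluginsSketch {
    // Hypothetical enum pairing each plugin resource directory with its class name.
    enum TestPlugin {
        SERVICE_LOADER("service-loader", "test.plugins.ServiceLoaderPlugin"),
        READ_VERSION_FROM_RESOURCE_V1("read-version-from-resource-v1", "test.plugins.ReadVersionFromResource"),
        // Assumed resource name for the second copy of the same plugin class.
        READ_VERSION_FROM_RESOURCE_V2("read-version-from-resource-v2", "test.plugins.ReadVersionFromResource");

        final String resourceDir;
        final String className;

        TestPlugin(String resourceDir, String className) {
            this.resourceDir = resourceDir;
            this.className = className;
        }
    }

    // Keeps exactly one plugin per class name, always the one with the
    // lexicographically smallest resource directory, so the default plugin path
    // no longer depends on HashMap iteration order.
    static List<TestPlugin> defaultPluginPath(Iterable<TestPlugin> plugins) {
        Map<String, TestPlugin> byClassName = new TreeMap<>();
        for (TestPlugin plugin : plugins) {
            byClassName.merge(plugin.className, plugin,
                (kept, candidate) -> kept.resourceDir.compareTo(candidate.resourceDir) <= 0 ? kept : candidate);
        }
        return new ArrayList<>(byClassName.values());
    }

    public static void main(String[] args) {
        for (TestPlugin plugin : defaultPluginPath(EnumSet.allOf(TestPlugin.class))) {
            System.out.println(plugin.className + " -> " + plugin.resourceDir);
        }
    }
}
```

Tests that need the other duplicate would then request it explicitly, which is close to what options 3 and 4 describe.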
[jira] [Resolved] (KAFKA-6435) Application Reset Tool might delete incorrect internal topics
[ https://issues.apache.org/jira/browse/KAFKA-6435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-6435. --- Resolution: Fixed > Application Reset Tool might delete incorrect internal topics > - > > Key: KAFKA-6435 > URL: https://issues.apache.org/jira/browse/KAFKA-6435 > Project: Kafka > Issue Type: Bug > Components: streams, tools >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Joel Wee >Priority: Major > Labels: bug, help-wanted, newbie > Fix For: 3.0.0 > > > The Streams application reset tool deletes all topics that start with > {{-}}. > If people have two versions of the same application and name them {{"app"}} > and {{"app-v2"}}, resetting {{"app"}} would also delete the internal topics > of {{"app-v2"}}. > We either need to disallow the dash in the application ID, or improve the > topic identification logic in the reset tool to fix this.
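The prefix ambiguity described in this issue is easy to reproduce in isolation. The sketch below is hypothetical: the topic names are invented, the naive filter mirrors the `appID + "-"` prefix check used in the integration test earlier in this thread, and the explicit-list variant only approximates the internal-topics option from KIP-623 (the real tool's validation may differ).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ResetTopicSelectionSketch {
    // Naive selection: every topic whose name starts with "<applicationId>-".
    static List<String> prefixMatch(List<String> topics, String applicationId) {
        String prefix = applicationId + "-";
        return topics.stream().filter(t -> t.startsWith(prefix)).collect(Collectors.toList());
    }

    // Hypothetical explicit selection: the user names the internal topics to delete,
    // and the request is rejected if any of them does not exist.
    static List<String> explicitSelection(List<String> topics, List<String> requested) {
        List<String> missing = new ArrayList<>();
        for (String topic : requested) {
            if (!topics.contains(topic)) {
                missing.add(topic);
            }
        }
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Internal topics not found: " + missing);
        }
        return new ArrayList<>(requested);
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList(
            "input",
            "app-store-changelog",      // internal topic of application "app"
            "app-v2-store-changelog");  // internal topic of application "app-v2"

        // Resetting "app" with the prefix rule also picks up the topic of "app-v2".
        System.out.println("prefix match: " + prefixMatch(topics, "app"));
        System.out.println("explicit: " + explicitSelection(topics, Arrays.asList("app-store-changelog")));
    }
}
```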
[jira] [Updated] (KAFKA-6435) Application Reset Tool might delete incorrect internal topics
[ https://issues.apache.org/jira/browse/KAFKA-6435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-6435: -- Fix Version/s: 3.0.0
[jira] [Created] (KAFKA-12721) Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsAllTopicsAllGroups
A. Sophie Blee-Goldman created KAFKA-12721: -- Summary: Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsAllTopicsAllGroups Key: KAFKA-12721 URL: https://issues.apache.org/jira/browse/KAFKA-12721 Project: Kafka Issue Type: Bug Components: consumer Reporter: A. Sophie Blee-Goldman https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-8923/6/testReport/kafka.admin/ResetConsumerGroupOffsetTest/Build___JDK_8_and_Scala_2_12___testResetOffsetsAllTopicsAllGroups__/ Stacktrace org.opentest4j.AssertionFailedError: Expected that consumer group has consumed all messages from topic/partition. Expected offset: 100. Actual offset: 0 at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39) at org.junit.jupiter.api.Assertions.fail(Assertions.java:117) at kafka.admin.ResetConsumerGroupOffsetTest.awaitConsumerProgress(ResetConsumerGroupOffsetTest.scala:455) at kafka.admin.ResetConsumerGroupOffsetTest.$anonfun$testResetOffsetsAllTopicsAllGroups$5(ResetConsumerGroupOffsetTest.scala:140) at kafka.admin.ResetConsumerGroupOffsetTest.$anonfun$testResetOffsetsAllTopicsAllGroups$5$adapted(ResetConsumerGroupOffsetTest.scala:137) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at kafka.admin.ResetConsumerGroupOffsetTest.$anonfun$testResetOffsetsAllTopicsAllGroups$4(ResetConsumerGroupOffsetTest.scala:137) at kafka.admin.ResetConsumerGroupOffsetTest.$anonfun$testResetOffsetsAllTopicsAllGroups$4$adapted(ResetConsumerGroupOffsetTest.scala:136) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsAllTopicsAllGroups(ResetConsumerGroupOffsetTest.scala:136)
[jira] [Resolved] (KAFKA-12666) Fix flaky kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic
[ https://issues.apache.org/jira/browse/KAFKA-12666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-12666. Resolution: Duplicate > Fix flaky > kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic > > > Key: KAFKA-12666 > URL: https://issues.apache.org/jira/browse/KAFKA-12666 > Project: Kafka > Issue Type: Test >Reporter: Bruno Cadonna >Priority: Major > > Found two similar failures of this test on a PR that was unrelated: > {code:java} > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, > deadlineMs=1618341006330, tries=583, nextAllowedTryMs=1618341006437) timed > out at 1618341006337 after 583 attempt(s) > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic(RaftClusterTest.scala:94) > {code} > > {code:java} > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node > assignment. 
Call: createTopics > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic(RaftClusterTest.scala:94) > {code} > > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10529/4/testReport/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed] > > Might be related to KAFKA-12561.
[jira] [Commented] (KAFKA-12629) Flaky Test RaftClusterTest
[ https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17334174#comment-17334174 ] A. Sophie Blee-Goldman commented on KAFKA-12629: I'm seeing a lot of that exception as well, i.e. the {noformat} java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request. {noformat} Four different RaftClusterTest failures with this same error on https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-8923/6/testReport/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed > Flaky Test RaftClusterTest > -- > > Key: KAFKA-12629 > URL: https://issues.apache.org/jira/browse/KAFKA-12629 > Project: Kafka > Issue Type: Test > Components: core, unit tests >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > > {quote} {{java.util.concurrent.ExecutionException: > java.lang.ClassNotFoundException: > org.apache.kafka.controller.NoOpSnapshotWriterBuilder > at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:364) > at > kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:181)}}{quote}
[jira] [Created] (KAFKA-12722) Evaluate moving replaceSegments into LogSegments class
Kowshik Prakasam created KAFKA-12722: Summary: Evaluate moving replaceSegments into LogSegments class Key: KAFKA-12722 URL: https://issues.apache.org/jira/browse/KAFKA-12722 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam The logic to replace segments is currently present as static logic in Log.scala. Since it operates on top of `existingSegments`, we should see if we can move it into the LogSegments class, where it could be a better fit: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L2296.
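A rough, hypothetical picture of what moving the replacement into the class that owns the segments map could look like. Segments are reduced to a base offset and a name, and only the in-memory map update is shown. File-level work such as renames and deletions, which the real `replaceSegments` in Log.scala has to handle, is deliberately left out; `LogSegmentsSketch` and its methods are invented names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class LogSegmentsSketch {
    // Hypothetical, heavily simplified segment: just a base offset and a name.
    static final class Segment {
        final long baseOffset;
        final String name;

        Segment(long baseOffset, String name) {
            this.baseOffset = baseOffset;
            this.name = name;
        }

        @Override
        public String toString() {
            return name + "@" + baseOffset;
        }
    }

    private final ConcurrentNavigableMap<Long, Segment> segments = new ConcurrentSkipListMap<>();

    void add(Segment segment) {
        segments.put(segment.baseOffset, segment);
    }

    // Swaps oldSegments for newSegments inside the class that owns the map.
    // New segments are added first; an old segment is removed only if it is still
    // the mapped value, so a new segment that reuses an old base offset is not
    // removed by mistake. The swap is not atomic for concurrent readers.
    void replace(List<Segment> newSegments, List<Segment> oldSegments) {
        for (Segment segment : newSegments) {
            segments.put(segment.baseOffset, segment);
        }
        for (Segment segment : oldSegments) {
            segments.remove(segment.baseOffset, segment);
        }
    }

    List<Segment> values() {
        return new ArrayList<>(segments.values());
    }

    public static void main(String[] args) {
        LogSegmentsSketch log = new LogSegmentsSketch();
        Segment s0 = new Segment(0, "s0");
        Segment s100 = new Segment(100, "s100");
        Segment s200 = new Segment(200, "s200");
        log.add(s0);
        log.add(s100);
        log.add(s200);

        // e.g. a cleaner merged s0 and s100 into one segment starting at offset 0
        log.replace(Arrays.asList(new Segment(0, "cleaned")), Arrays.asList(s0, s100));
        System.out.println(log.values());
    }
}
```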
[GitHub] [kafka] ableegoldman commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter
ableegoldman commented on a change in pull request #8923: URL: https://github.com/apache/kafka/pull/8923#discussion_r621665639 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java ## @@ -151,6 +151,22 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() { Assert.assertEquals(1, exitCode); } +@Test +public void shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() { Review comment: Hah, maybe I should have asked why all of those tests are not also in `AbstractResetIntegrationTest`. It seems like almost everything that applies here would also be good to test in the SSL version of the test (which, AFAICT, is the only other class that extends AbstractResetIntegrationTest). But I'm OK with leaving it as is, and maybe we can look into this as follow-up work, unless there is a good reason for them to be where they are (which I can't think of, but my imagination is not endless).
[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
hachikuji commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r621603737 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2235,6 +2199,7 @@ private Long append(int epoch, List records, boolean isAtomic) { } else { offset = accumulator.append(epoch, records); } + Review comment: nit: the extra line is still there? ## File path: raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java ## @@ -354,6 +484,7 @@ public void testDrainDoesNotBlockWithConcurrentAppend() throws Exception { completedBatch.data.batches().forEach(recordBatch -> { assertEquals(leaderEpoch, recordBatch.partitionLeaderEpoch()); }); }); +acc.close(); Review comment: Out of curiosity, why do we do this here but not elsewhere in this class? ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -194,14 +196,52 @@ private void completeCurrentBatch() { MemoryRecords data = currentBatch.build(); completed.add(new CompletedBatch<>( currentBatch.baseOffset(), -currentBatch.records(), +Optional.of(currentBatch.records()), data, memoryPool, currentBatch.initialBuffer() )); currentBatch = null; } +public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimeMs) { +appendLock.lock(); +try { +forceDrain(); +ByteBuffer buffer = memoryPool.tryAllocate(256); +if (buffer != null) { +MemoryRecords data = MemoryRecords.withLeaderChangeMessage( +this.nextOffset, Review comment: nit: alignment looks a little off here. one extra indent? 
## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1859,15 +1819,17 @@ private void appendBatch( offsetAndEpoch.offset + 1, Integer.MAX_VALUE); future.whenComplete((commitTimeMs, exception) -> { -int numRecords = batch.records.size(); -if (exception != null) { -logger.debug("Failed to commit {} records at {}", numRecords, offsetAndEpoch, exception); -} else { -long elapsedTime = Math.max(0, commitTimeMs - appendTimeMs); -double elapsedTimePerRecord = (double) elapsedTime / numRecords; -kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, appendTimeMs); -logger.debug("Completed commit of {} records at {}", numRecords, offsetAndEpoch); -maybeFireHandleCommit(batch.baseOffset, epoch, batch.records); +if (batch.records.isPresent()) { Review comment: It would be nice if we didn't lose the exception for the leader change message. Perhaps one way we can do this is to add `numRecords` as a separate field in `CompletedBatch` so that we always have it available. 
Then we could change this to something like this:
```java
future.whenComplete((commitTimeMs, exception) -> {
    if (exception != null) {
        logger.debug("Failed to commit {} records at {}", batch.numRecords, offsetAndEpoch, exception);
    } else {
        long elapsedTime = Math.max(0, commitTimeMs - appendTimeMs);
        double elapsedTimePerRecord = (double) elapsedTime / batch.numRecords;
        kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, appendTimeMs);
        logger.debug("Completed commit of {} records at {}", batch.numRecords, offsetAndEpoch);
        batch.records.ifPresent(records -> maybeFireHandleCommit(batch.baseOffset, epoch, records));
    }
});
```
## File path: raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java ## @@ -65,6 +66,135 @@ ); } +@Test +public void testLeaderChangeMessageWritten() { +int leaderEpoch = 17; +long baseOffset = 0; +int lingerMs = 50; +int maxBatchSize = 512; + +ByteBuffer buffer = ByteBuffer.allocate(256); +Mockito.when(memoryPool.tryAllocate(256)) +.thenReturn(buffer); + +BatchAccumulator acc = buildAccumulator( +leaderEpoch, +baseOffset, +lingerMs, +maxBatchSize +); + +acc.appendLeaderChangeMessage(new LeaderChangeMessage(), time.milliseconds()); + +List> batches = acc.drain(); +assertEquals(1, batches.size()); + +BatchAccumulator.CompletedBatch batch = batches.get(0); +batch.release(); +Mockito.verify(memoryPool).release(buffer); +} + +@Test +public void testForceDrain() { +asList(APPEND, APPEND_ATOMIC).forEach(a
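The `numRecords` suggestion can be illustrated with a hypothetical, stripped-down batch type. It assumes a leader change batch counts as one record and uses a plain `CompletableFuture<Long>` for the commit time. The field names mirror the review comment, but this is not the real `BatchAccumulator.CompletedBatch`, and `onCommit` is an invented helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class CompletedBatchSketch {
    // Hypothetical simplification of a completed batch.
    static final class CompletedBatch<T> {
        final long baseOffset;
        final Optional<List<T>> records; // empty for control batches, e.g. a leader change message
        final int numRecords;            // always populated, so logging and metrics never need records

        CompletedBatch(long baseOffset, Optional<List<T>> records, int numRecords) {
            this.baseOffset = baseOffset;
            this.records = records;
            this.numRecords = numRecords;
        }
    }

    // Registers the commit callback: failures are reported for every batch, latency is
    // averaged over numRecords, and only batches with user records reach the listener.
    static <T> void onCommit(CompletedBatch<T> batch, long appendTimeMs, CompletableFuture<Long> commitFuture,
                             List<String> log, List<List<T>> committedToListener) {
        commitFuture.whenComplete((commitTimeMs, exception) -> {
            if (exception != null) {
                log.add("Failed to commit " + batch.numRecords + " records at " + batch.baseOffset);
            } else {
                long elapsedTime = Math.max(0, commitTimeMs - appendTimeMs);
                double elapsedTimePerRecord = (double) elapsedTime / batch.numRecords;
                log.add("Committed " + batch.numRecords + " records at " + batch.baseOffset
                    + " (" + elapsedTimePerRecord + " ms/record)");
                batch.records.ifPresent(committedToListener::add);
            }
        });
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<List<String>> committed = new ArrayList<>();

        CompletedBatch<String> leaderChange = new CompletedBatch<>(10, Optional.empty(), 1);
        CompletedBatch<String> data = new CompletedBatch<>(11, Optional.of(Arrays.asList("a", "b")), 2);

        CompletableFuture<Long> leaderChangeCommit = new CompletableFuture<>();
        CompletableFuture<Long> dataCommit = new CompletableFuture<>();
        onCommit(leaderChange, 100, leaderChangeCommit, log, committed);
        onCommit(data, 100, dataCommit, log, committed);

        // The leader change failure is still logged even though it carries no records.
        leaderChangeCommit.completeExceptionally(new RuntimeException("not committed"));
        dataCommit.complete(150L);

        log.forEach(System.out::println);
        System.out.println("batches handed to listener: " + committed.size());
    }
}
```

Keeping the count separate from the optional payload means the failure branch no longer has to be skipped for control batches.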
[GitHub] [kafka] ableegoldman commented on pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter
ableegoldman commented on pull request #8923: URL: https://github.com/apache/kafka/pull/8923#issuecomment-827995691 Oof, there are a LOT of flaky test failures in this build. They're all unrelated to this PR, mostly in Connect and the RaftClusterTest, so I'll go ahead and merge, but yikes.
[GitHub] [kafka] junrao commented on a change in pull request #10564: MINOR: clean up some replication code
junrao commented on a change in pull request #10564: URL: https://github.com/apache/kafka/pull/10564#discussion_r621652737 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -1119,6 +1057,83 @@ void validateManualPartitionAssignment(List assignment, } } +/** + * Iterate over a sequence of partitions and generate ISR changes and/or leader + * changes if necessary. + * + * @param context A human-readable context string used in log4j logging. + * @param brokerToRemoveNO_LEADER if no broker is being removed; the ID of the + * broker to remove from the ISR and leadership, otherwise. + * @param brokerToAdd NO_LEADER if no broker is being added; the ID of the + * broker which is now eligible to be a leader, otherwise. + * @param records A list of records which we will append to. + * @param iterator The iterator containing the partitions to examine. + */ +void generateLeaderAndIsrUpdates(String context, + int brokerToRemove, + int brokerToAdd, + List records, + Iterator iterator) { +int oldSize = records.size(); +Function isAcceptableLeader = +r -> r == brokerToAdd || clusterControl.unfenced(r); +while (iterator.hasNext()) { +TopicIdPartition topicIdPart = iterator.next(); +TopicControlInfo topic = topics.get(topicIdPart.topicId()); +if (topic == null) { +throw new RuntimeException("Topic ID " + topicIdPart.topicId() + +" existed in isrMembers, but not in the topics map."); +} +PartitionControlInfo partition = topic.parts.get(topicIdPart.partitionId()); +if (partition == null) { +throw new RuntimeException("Partition " + topicIdPart + +" existed in isrMembers, but not in the partitions map."); +} +int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemove); +int newLeader; +if (isGoodLeader(newIsr, partition.leader)) { +// If the current leader is good, don't change. +newLeader = partition.leader; +} else { +// Choose a new leader. 
+boolean uncleanOk = configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name); +newLeader = bestLeader(partition.replicas, newIsr, uncleanOk, isAcceptableLeader); Review comment: The reason we want to remove the shutting-down replicas from the ISR is to optimize for latency. If we don't, when the broker actually shuts down, it can block the producer for up to `replica.lag.time.max.ms` before the replica is taken out of the ISR. So I think this optimization is still useful. ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -356,7 +367,9 @@ public void replay(PartitionChangeRecord record) { brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.leader, newPartitionInfo.leader); -log.debug("Applied ISR change record: {}", record.toString()); +String topicPart = topicInfo.name + "-" + record.partitionId() + " with topic ID " + Review comment: In the old controller, we bump up the leader epoch if the ISR is changed during controlled shutdown. This helps prevent the shutting-down broker from being added back to the ISR. In the Raft controller, we bump up the partitionEpoch when the ISR is changed. Do we plan to fence fetch requests with a mismatched partitionEpoch to achieve the same behavior? If so, do we have a JIRA to track that?
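To make the question concrete, here is a hypothetical sketch of fencing on `partitionEpoch`. The epoch is bumped on every ISR change, and a fetch that carries an older epoch is rejected instead of letting that replica back into the ISR. None of these types or result codes exist in Kafka; they only illustrate the idea under discussion.

```java
// Hypothetical illustration only; these types are not Kafka APIs.
final class PartitionEpochFence {
    enum FetchResult { OK, FENCED_STALE_EPOCH, UNKNOWN_NEWER_EPOCH }

    private int partitionEpoch;

    PartitionEpochFence(int initialEpoch) {
        this.partitionEpoch = initialEpoch;
    }

    // Called whenever the ISR changes (e.g. a shutting-down broker is removed).
    void onIsrChange() {
        partitionEpoch++;
    }

    int partitionEpoch() {
        return partitionEpoch;
    }

    // A fetch carrying an older epoch predates the ISR change, so it is rejected
    // rather than being used to re-add the replica to the ISR.
    FetchResult validateFetch(int requestPartitionEpoch) {
        if (requestPartitionEpoch < partitionEpoch) return FetchResult.FENCED_STALE_EPOCH;
        if (requestPartitionEpoch > partitionEpoch) return FetchResult.UNKNOWN_NEWER_EPOCH;
        return FetchResult.OK;
    }
}
```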
[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
ableegoldman commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r621502594 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set allTopics, */ private Map> constrainedAssign(Map partitionsPerTopic, Map> consumerToOwnedPartitions) { -SortedSet unassignedPartitions = getTopicPartitions(partitionsPerTopic); +if (log.isDebugEnabled()) { +log.debug(String.format("performing constrained assign. partitionsPerTopic: %s, consumerToOwnedPartitions: %s", +partitionsPerTopic, consumerToOwnedPartitions)); +} + +List sortedAllPartitions = getTopicPartitions(partitionsPerTopic); Set allRevokedPartitions = new HashSet<>(); -// Each consumer should end up in exactly one of the below -// the consumers not yet at capacity +// the consumers not yet at expected capacity List unfilledMembers = new LinkedList<>(); -// the members with exactly maxQuota partitions assigned -Queue maxCapacityMembers = new LinkedList<>(); -// the members with exactly minQuota partitions assigned -Queue minCapacityMembers = new LinkedList<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); -int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers); -int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers); +int totalPartitionsCount = sortedAllPartitions.size(); + +int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers); +int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers); +// the expected number of members with maxQuota assignment +int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers; +// the number of members with exactly maxQuota partitions assigned +int numMaxCapacityMembers = 0; -// initialize the assignment map with an empty array of size minQuota for all members +// 
initialize the assignment map with an empty array of size maxQuota for all members Map> assignment = new HashMap<>( - consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota; + consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota; +List toBeRemovedPartitions = new ArrayList<>(); // Reassign as many previously owned partitions as possible for (Map.Entry> consumerEntry : consumerToOwnedPartitions.entrySet()) { String consumer = consumerEntry.getKey(); List ownedPartitions = consumerEntry.getValue(); List consumerAssignment = assignment.get(consumer); -int i = 0; -// assign the first N partitions up to the max quota, and mark the remaining as being revoked -for (TopicPartition tp : ownedPartitions) { -if (i < maxQuota) { -consumerAssignment.add(tp); -unassignedPartitions.remove(tp); -} else { -allRevokedPartitions.add(tp); -} -++i; -} if (ownedPartitions.size() < minQuota) { +// the expected assignment size is more than consumer have now, so keep all the owned partitions +// and put this member into unfilled member list +if (ownedPartitions.size() > 0) { +consumerAssignment.addAll(ownedPartitions); +toBeRemovedPartitions.addAll(ownedPartitions); +} unfilledMembers.add(consumer); +} else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) { Review comment: nit: don't use `++` inline like this. It makes the code harder to understand, plus we'll end up incrementing it past `numExpectedMaxCapacityMembers`, so its value won't represent the actual number of members at max capacity in the end. This might be confusing, e.g. if we want to add logging that includes this value later on.
Let's increment it in the body of this `else if` ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set allTopics, */ private Map> constrainedAssign(Map partitionsPerTopic, Map> consumerToOwnedPartitions) { -SortedSet unassign
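The following standalone illustration of the counter concern uses the quota arithmetic from the diff (`minQuota`, `maxQuota`, `numExpectedMaxCapacityMembers`). The class, the helper names, and the member sizes are made up for the example. With the postfix `++` inside the condition, the counter is bumped every time the right-hand side of `&&` is evaluated, including when the comparison fails. As a result, it ends up larger than the number of members actually kept at max capacity. Also note that with `<=`, the inline form admits up to `numExpectedMaxCapacityMembers + 1` members. The body-increment version below uses `<`, which is an assumption about the intended bound worth double-checking in the PR.

```java
import java.util.List;

// Standalone illustration of the counter issue; not the actual assignor code.
final class MaxCapacityCounter {

    // Same values as Math.floor / Math.ceil on doubles in the diff, for non-negative ints.
    static int minQuota(int totalPartitions, int numConsumers) {
        return totalPartitions / numConsumers;
    }

    static int maxQuota(int totalPartitions, int numConsumers) {
        return (totalPartitions + numConsumers - 1) / numConsumers;
    }

    // The remainder is how many consumers are expected to hold maxQuota partitions.
    static int numExpectedMaxCapacityMembers(int totalPartitions, int numConsumers) {
        return totalPartitions % numConsumers;
    }

    // Inline postfix increment, as in the diff. Returns {counter, membersAdmitted}.
    static int[] countInline(List<Integer> ownedSizes, int maxQuota, int expected) {
        int numMaxCapacityMembers = 0;
        int admitted = 0;
        for (int size : ownedSizes) {
            if (size >= maxQuota && numMaxCapacityMembers++ <= expected) {
                admitted++;
            }
        }
        return new int[] {numMaxCapacityMembers, admitted};
    }

    // Suggested form: increment only in the body, so the counter equals the
    // number of members actually kept at max capacity.
    static int countInBody(List<Integer> ownedSizes, int maxQuota, int expected) {
        int numMaxCapacityMembers = 0;
        for (int size : ownedSizes) {
            if (size >= maxQuota && numMaxCapacityMembers < expected) {
                numMaxCapacityMembers++;
            }
        }
        return numMaxCapacityMembers;
    }
}
```

Consider 7 partitions and 3 consumers: `minQuota` is 2, `maxQuota` is 3, and one member is expected at max capacity. If each consumer previously owned 3 partitions, the inline form leaves the counter at 3 and admits 2 members. The body-increment form admits and counts exactly 1.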
[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r621630725 ## File path: raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java ## @@ -354,6 +484,7 @@ public void testDrainDoesNotBlockWithConcurrentAppend() throws Exception { completedBatch.data.batches().forEach(recordBatch -> { assertEquals(leaderEpoch, recordBatch.partitionLeaderEpoch()); }); }); +acc.close(); Review comment: Hmm, not sure how I ended up adding that there
[jira] [Commented] (KAFKA-12666) Fix flaky kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic
[ https://issues.apache.org/jira/browse/KAFKA-12666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17334205#comment-17334205 ] A. Sophie Blee-Goldman commented on KAFKA-12666: There are quite a few tickets for the (very frequently failing) RaftClusterTest issues; it seems we have [KAFKA-12629|https://issues.apache.org/jira/browse/KAFKA-12629] as an umbrella ticket, so can we close this one as a duplicate to keep everything in one place? > Fix flaky > kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic > > > Key: KAFKA-12666 > URL: https://issues.apache.org/jira/browse/KAFKA-12666 > Project: Kafka > Issue Type: Test >Reporter: Bruno Cadonna >Priority: Major > > Found two similar failures of this test on a PR that was unrelated: > {code:java} > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, > deadlineMs=1618341006330, tries=583, nextAllowedTryMs=1618341006437) timed > out at 1618341006337 after 583 attempt(s) > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic(RaftClusterTest.scala:94) > {code} > > {code:java} > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node > assignment.
Call: createTopics > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic(RaftClusterTest.scala:94) > {code} > > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10529/4/testReport/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed] > > Might be related to KAFKA-12561. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jsancio commented on a change in pull request #10497: KAFKA-12342; Merge RaftClient and MetaLogManager interfaces and remove shim
jsancio commented on a change in pull request #10497: URL: https://github.com/apache/kafka/pull/10497#discussion_r621670833 ## File path: core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala ## @@ -108,18 +109,29 @@ class BrokerMetadataListenerTest { assertEquals(localPartitions, localRemoved.map(_.toTopicPartition).toSet) } + private def applyBatch( +records: List[ApiMessageAndVersion] + ): Unit = { +val baseOffset = lastMetadataOffset + 1 Review comment: Hmm. This is minor, but does this mean that a `baseOffset` of 0 is not possible since `lastMetadataOffset` is initialized to `0`? Is this also true for a "regular" Kafka topic partition? Or is this just a side effect of how this test is constructed?
[GitHub] [kafka] cmccabe commented on a change in pull request #10564: MINOR: clean up some replication code
cmccabe commented on a change in pull request #10564: URL: https://github.com/apache/kafka/pull/10564#discussion_r621639045 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -1119,6 +1057,83 @@ void validateManualPartitionAssignment(List assignment, } } +/** + * Iterate over a sequence of partitions and generate ISR changes and/or leader + * changes if necessary. + * + * @param context A human-readable context string used in log4j logging. + * @param brokerToRemoveNO_LEADER if no broker is being removed; the ID of the Review comment: I think it's easier to use NO_LEADER since then I don't have to special-case the comparisons (I can just compare with NO_LEADER and it will fail when examining ISR members, for example). ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -1119,6 +1057,83 @@ void validateManualPartitionAssignment(List assignment, } } +/** + * Iterate over a sequence of partitions and generate ISR changes and/or leader + * changes if necessary. + * + * @param context A human-readable context string used in log4j logging. + * @param brokerToRemoveNO_LEADER if no broker is being removed; the ID of the + * broker to remove from the ISR and leadership, otherwise. + * @param brokerToAdd NO_LEADER if no broker is being added; the ID of the + * broker which is now eligible to be a leader, otherwise. + * @param records A list of records which we will append to. + * @param iterator The iterator containing the partitions to examine. 
+ */ +void generateLeaderAndIsrUpdates(String context, + int brokerToRemove, + int brokerToAdd, + List records, + Iterator iterator) { +int oldSize = records.size(); +Function isAcceptableLeader = +r -> r == brokerToAdd || clusterControl.unfenced(r); +while (iterator.hasNext()) { +TopicIdPartition topicIdPart = iterator.next(); +TopicControlInfo topic = topics.get(topicIdPart.topicId()); +if (topic == null) { +throw new RuntimeException("Topic ID " + topicIdPart.topicId() + +" existed in isrMembers, but not in the topics map."); +} +PartitionControlInfo partition = topic.parts.get(topicIdPart.partitionId()); +if (partition == null) { +throw new RuntimeException("Partition " + topicIdPart + +" existed in isrMembers, but not in the partitions map."); +} +int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemove); +int newLeader; +if (isGoodLeader(newIsr, partition.leader)) { +// If the current leader is good, don't change. +newLeader = partition.leader; +} else { +// Choose a new leader. +boolean uncleanOk = configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name); +newLeader = bestLeader(partition.replicas, newIsr, uncleanOk, isAcceptableLeader); Review comment: One thing I've been considering for controlled shutdown is that perhaps we could just keep the shutting down replicas in the ISR but move the leaders. I think that would be a bit more graceful than what we have now, but we'd have to do some extra work to get there. Anyway, let's consider this later, as you suggested...
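This sketch illustrates why the sentinel keeps the comparisons uniform. It assumes `NO_LEADER` is `-1` (treat the value as an assumption here) and a `copyWithout` helper that behaves like the `Replicas.copyWithout` call in the diff; the reimplementation below is hypothetical. It also uses `IntPredicate` in place of the diff's `Function`. Since no real broker has ID `-1`, passing `NO_LEADER` as `brokerToRemove` leaves the ISR unchanged without a special case. Likewise, `r == brokerToAdd` never matches a real replica.

```java
import java.util.Arrays;
import java.util.function.IntPredicate;

// Hypothetical re-implementations for illustration; not the Kafka sources.
final class NoLeaderSentinel {
    static final int NO_LEADER = -1; // assumed sentinel; real broker IDs are non-negative

    // Returns a copy of the array with every occurrence of value removed.
    static int[] copyWithout(int[] replicas, int value) {
        return Arrays.stream(replicas).filter(r -> r != value).toArray();
    }

    // Same shape as isAcceptableLeader in the diff: the newly added broker is
    // acceptable; otherwise fall back to the unfenced check.
    static IntPredicate isAcceptableLeader(int brokerToAdd, IntPredicate unfenced) {
        return r -> r == brokerToAdd || unfenced.test(r);
    }
}
```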
[GitHub] [kafka] jsancio commented on a change in pull request #10497: KAFKA-12342; Merge RaftClient and MetaLogManager interfaces and remove shim
jsancio commented on a change in pull request #10497: URL: https://github.com/apache/kafka/pull/10497#discussion_r621649450 ## File path: core/src/main/scala/kafka/raft/RaftManager.scala ## @@ -126,10 +130,10 @@ class KafkaRaftManager[T]( private val dataDir = createDataDir() private val metadataLog = buildMetadataLog() private val netChannel = buildNetworkChannel() - private val raftClient = buildRaftClient() - private val raftIoThread = new RaftIoThread(raftClient, threadNamePrefix) + val client: KafkaRaftClient[T] = buildRaftClient() + private val raftIoThread = new RaftIoThread(client, threadNamePrefix) - def kafkaRaftClient: KafkaRaftClient[T] = raftClient + def kafkaRaftClient: KafkaRaftClient[T] = client Review comment: I think that since you added a new method `client: RaftClient[T]` to `RaftManager[T]`, and `KafkaRaftManager` overrides it as `client: KafkaRaftClient[T]`, we should be able to remove this `KafkaRaftManager[T]`-only public method. ## File path: core/src/main/scala/kafka/raft/RaftManager.scala ## @@ -126,10 +130,10 @@ class KafkaRaftManager[T]( private val dataDir = createDataDir() private val metadataLog = buildMetadataLog() private val netChannel = buildNetworkChannel() - private val raftClient = buildRaftClient() - private val raftIoThread = new RaftIoThread(raftClient, threadNamePrefix) + val client: KafkaRaftClient[T] = buildRaftClient() Review comment: Did you mean to override the return type from `RaftClient[T]` to `KafkaRaftClient[T]`? ## File path: core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala ## @@ -108,18 +109,29 @@ class BrokerMetadataListenerTest { assertEquals(localPartitions, localRemoved.map(_.toTopicPartition).toSet) } + private def applyBatch( +records: List[ApiMessageAndVersion] + ): Unit = { +val baseOffset = lastMetadataOffset + 1 Review comment: Hmm. This is minor, but does this mean that a `baseOffset` of 0 is not possible since `lastMetadataOffset` is initialized to `0`?
Is this also true for a "regular" Kafka topic partition? Or is this just a side effect of how this test is constructed? ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2408,19 +2418,32 @@ private void fireHandleCommit(BatchReader reader) { listener.handleCommit(reader); } -void maybeFireHandleClaim(int epoch, long epochStartOffset) { -// We can fire `handleClaim` as soon as the listener has caught -// up to the start of the leader epoch. This guarantees that the -// state machine has seen the full committed state before it becomes -// leader and begins writing to the log. -if (epoch > claimedEpoch && nextOffset() >= epochStartOffset) { -claimedEpoch = epoch; -listener.handleClaim(epoch); +void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) { +if (shouldFireLeaderChange(leaderAndEpoch)) { +lastFiredLeaderChange = leaderAndEpoch; +listener.handleLeaderChange(leaderAndEpoch); } } -void fireHandleResign(int epoch) { -listener.handleResign(epoch); +private boolean shouldFireLeaderChange(LeaderAndEpoch leaderAndEpoch) { +if (leaderAndEpoch.equals(lastFiredLeaderChange)) { +return false; +} else if (leaderAndEpoch.epoch > lastFiredLeaderChange.epoch) { +return true; Review comment: I see. We want to fire this event even if the `leader` is `Optional.empty()` because we use this event to propagate loss of leadership. ## File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java ## @@ -331,22 +342,40 @@ public void beginShutdown() { } @Override -public void close() throws InterruptedException { +public void close() { log.debug("Node {}: closing.", nodeId); beginShutdown(); -eventQueue.close(); + +try { +eventQueue.close(); +} catch (InterruptedException e) { +Thread.currentThread().interrupt(); Review comment: Can you explain why we are doing this?
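For context on the question above, this is the standard Java idiom for a `close()` whose signature no longer declares `InterruptedException`. Catching the exception clears the current thread's interrupt status, so calling `Thread.currentThread().interrupt()` again lets callers further up the stack still observe that an interrupt happened. The class below is a hypothetical stand-in, not `LocalLogManager`.

```java
// Hypothetical stand-in illustrating the interrupt-restoring idiom.
final class QuietCloser implements AutoCloseable {
    private final Thread worker;

    QuietCloser(Thread worker) {
        this.worker = worker;
    }

    @Override
    public void close() { // no checked exceptions in the signature
        worker.interrupt();
        try {
            worker.join(); // throws InterruptedException if the calling thread is interrupted
        } catch (InterruptedException e) {
            // Catching the exception cleared this thread's interrupt status;
            // restore it so callers can still see that an interrupt happened.
            Thread.currentThread().interrupt();
        }
    }
}
```

Swallowing the exception without restoring the flag would silently discard the interrupt. Rethrowing is not an option once `close()` drops the checked exception.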
## File path: raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java ## @@ -28,6 +28,10 @@ public LeaderAndEpoch(OptionalInt leaderId, int epoch) { this.epoch = epoch; } +public boolean isLeader(int nodeId) { +return leaderId.isPresent() && leaderId.getAsInt() == nodeId; Review comment: Minor but how about `return leaderId.equals(OptionalInt.of(nodeId));` ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2408,19 +2418,32 @@ private void fireHandleCommit(BatchReader reader) { listener.handleCommit(
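The two forms of `isLeader` are equivalent because `OptionalInt.equals` returns true only when both values are empty or both hold the same int. This quick check uses hypothetical standalone copies of the two method bodies.

```java
import java.util.OptionalInt;

// Hypothetical standalone copies of the two isLeader variants from the review.
final class IsLeaderForms {
    static boolean isLeaderExplicit(OptionalInt leaderId, int nodeId) {
        return leaderId.isPresent() && leaderId.getAsInt() == nodeId;
    }

    static boolean isLeaderEquals(OptionalInt leaderId, int nodeId) {
        // OptionalInt.equals compares presence and value, so an empty
        // leaderId never equals OptionalInt.of(nodeId).
        return leaderId.equals(OptionalInt.of(nodeId));
    }
}
```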
[GitHub] [kafka] ableegoldman commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde
ableegoldman commented on a change in pull request #6592: URL: https://github.com/apache/kafka/pull/6592#discussion_r621676170 ## File path: clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java ## @@ -77,21 +87,39 @@ public void configure(Map configs, boolean isKey) { } } +private void serializeNullIndexList(final DataOutputStream out, List data) throws IOException { +List nullIndexList = IntStream.range(0, data.size()) +.filter(i -> data.get(i) == null) +.boxed().collect(Collectors.toList()); +out.writeInt(nullIndexList.size()); +for (int i : nullIndexList) out.writeInt(i); +} + @Override public byte[] serialize(String topic, List data) { if (data == null) { return null; } -final int size = data.size(); try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final DataOutputStream out = new DataOutputStream(baos)) { +out.writeByte(serStrategy.ordinal()); // write serialization strategy flag +if (serStrategy == SerializationStrategy.NULL_INDEX_LIST) { +serializeNullIndexList(out, data); +} +final int size = data.size(); out.writeInt(size); for (Inner entry : data) { -final byte[] bytes = inner.serialize(topic, entry); -if (!isFixedLength) { -out.writeInt(bytes.length); +if (entry == null) { +if (serStrategy == SerializationStrategy.NEGATIVE_SIZE) { +out.writeInt(Serdes.ListSerde.NEGATIVE_SIZE_VALUE); +} +} else { +final byte[] bytes = inner.serialize(topic, entry); +if (!isFixedLength || serStrategy == SerializationStrategy.NEGATIVE_SIZE) { +out.writeInt(bytes.length); Review comment: Just to clarify, you mean don't expose this to the user at all, right? That sounds completely fine to me. If there are enough people trying to serialize lists of custom classes with a constant data size who want this optimization exposed for general use, then someone will request the feature and we can go back and add it in. Then we can debate what the API should look like at that time, and keep things simple for now.
Personally, I suspect the vast majority of non-primitive data types are not going to be constant size anyway. Given the above, I think whether to track the strategy as an actual `SerializationStrategy` enum or a boolean flag becomes a matter of code style and personal preference, since it's no longer exposed to the user. So it's up to you whether you find the enum or the flag more readable or clean.
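The following is a simplified sketch of the two null-encoding layouts in the diff, for a list of fixed-size `Integer` elements. The byte layout follows the diff. It writes a strategy flag byte, then either a null-index list (count followed by indexes) before the size, or a per-entry length prefix where a null is written as a negative size. Several details are assumptions: the class name, the `-1` marker standing in for `Serdes.ListSerde.NEGATIVE_SIZE_VALUE` (whose value isn't shown here), and the 4-byte big-endian element encoding.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the two layouts; not the Kafka ListSerializer.
final class NullableIntListSketch {
    enum Strategy { NULL_INDEX_LIST, NEGATIVE_SIZE }

    static final int NEGATIVE_SIZE_MARKER = -1; // assumed stand-in for NEGATIVE_SIZE_VALUE

    static byte[] serialize(List<Integer> data, Strategy strategy) {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(baos)) {
            out.writeByte(strategy.ordinal()); // strategy flag
            if (strategy == Strategy.NULL_INDEX_LIST) {
                List<Integer> nullIndexes = new ArrayList<>();
                for (int i = 0; i < data.size(); i++) {
                    if (data.get(i) == null) nullIndexes.add(i);
                }
                out.writeInt(nullIndexes.size());
                for (int i : nullIndexes) out.writeInt(i);
            }
            out.writeInt(data.size());
            for (Integer entry : data) {
                if (entry == null) {
                    // NULL_INDEX_LIST writes nothing here; the index list already covers it.
                    if (strategy == Strategy.NEGATIVE_SIZE) out.writeInt(NEGATIVE_SIZE_MARKER);
                } else {
                    // Integers are fixed length, so only NEGATIVE_SIZE writes a length prefix.
                    if (strategy == Strategy.NEGATIVE_SIZE) out.writeInt(Integer.BYTES);
                    out.writeInt(entry);
                }
            }
            out.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For `[1, null, 3]`, the null-index layout takes 21 bytes and the negative-size layout takes 25. For a fixed-size element type, the null-index layout avoids a length prefix per element, which is the optimization being discussed. The negative-size layout pays 4 bytes per entry but needs no upfront pass over the list.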
[GitHub] [kafka] hachikuji commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
hachikuji commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r621734730 ## File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala ## @@ -984,19 +1003,26 @@ class LogCleanerTest { def distinctValuesBySegment = log.logSegments.map(s => s.log.records.asScala.map(record => TestUtils.readString(record.value)).toSet.size).toSeq -val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment +val distinctValuesBySegmentBeforeClean = distinctValuesBySegment assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N), "Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.") +log.updateHighWatermark(log.activeSegment.baseOffset) cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, firstUncleanableOffset)) val distinctValuesBySegmentAfterClean = distinctValuesBySegment - assertTrue(disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean) - .take(numCleanableSegments).forall { case (before, after) => after < before }, +// One segment should have been completely deleted, so there will be fewer segments. +assertTrue(distinctValuesBySegmentAfterClean.size < distinctValuesBySegmentBeforeClean.size) + +// Drop the first segment from before cleaning since it was removed. Also subtract 1 from numCleanableSegments +val normalizedDistinctValuesBySegmentBeforeClean = distinctValuesBySegmentBeforeClean.drop(1) Review comment: The logic in this test case has become rather obscure after the change. Maybe we could do something simpler than comparing segment by segment. As far as I can tell, all the test is doing is ensuring that the first uncleanable offset is respected. Maybe a simpler test would just write the same key over and over and then assert that all records below the uncleanable offset are removed and all records above that offset are retained?
[GitHub] [kafka] ableegoldman merged pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter
ableegoldman merged pull request #8923: URL: https://github.com/apache/kafka/pull/8923
[GitHub] [kafka] hachikuji commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
hachikuji commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r621668165 ## File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ## @@ -340,17 +343,18 @@ private FilterResult(ByteBuffer outputBuffer) { private void updateRetainedBatchMetadata(MutableRecordBatch retainedBatch, int numMessagesInBatch, boolean headerOnly) { int bytesRetained = headerOnly ? DefaultRecordBatch.RECORD_BATCH_OVERHEAD : retainedBatch.sizeInBytes(); updateRetainedBatchMetadata(retainedBatch.maxTimestamp(), retainedBatch.lastOffset(), -retainedBatch.lastOffset(), numMessagesInBatch, bytesRetained); +retainedBatch.lastOffset(), retainedBatch.baseOffset(), numMessagesInBatch, bytesRetained); } private void updateRetainedBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset, -int messagesRetained, int bytesRetained) { +long baseOffset, int messagesRetained, int bytesRetained) { Review comment: nit: this looks misaligned ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -700,11 +716,18 @@ private[log] class Cleaner(val id: Int, // if any messages are to be retained, write them out val outputBuffer = result.outputBuffer if (outputBuffer.position() > 0) { +if (destSegment.isEmpty) { + // create a new segment with a suffix appended to the name of the log and indexes + destSegment = Some(LogCleaner.createNewCleanedSegment(log.dir, log.config, result.baseOffsetOfFirstBatch())) + transactionMetadata.cleanedIndex = Some(destSegment.get.txnIndex) + transactionMetadata.appendTransactionIndex() +} + outputBuffer.flip() val retained = MemoryRecords.readableRecords(outputBuffer) // it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads // after `Log.replaceSegments` (which acquires the lock) is called -dest.append(largestOffset = result.maxOffset, +destSegment.get.append(largestOffset = result.maxOffset, Review comment: nit: we could 
probably do something like this to avoid the nasty `get` calls in here ```scala val segment = destSegment.getOrElse { val newSegment = LogCleaner.createNewCleanedSegment(log.dir, log.config, result.baseOffsetOfFirstBatch()) transactionMetadata.cleanedIndex = Some(newSegment.txnIndex) transactionMetadata.appendTransactionIndex() destSegment = Some(newSegment) newSegment } ``` ## File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala ## @@ -984,19 +1003,26 @@ class LogCleanerTest { def distinctValuesBySegment = log.logSegments.map(s => s.log.records.asScala.map(record => TestUtils.readString(record.value)).toSet.size).toSeq -val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment +val distinctValuesBySegmentBeforeClean = distinctValuesBySegment assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N), "Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.") +log.updateHighWatermark(log.activeSegment.baseOffset) cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, firstUncleanableOffset)) val distinctValuesBySegmentAfterClean = distinctValuesBySegment - assertTrue(disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean) - .take(numCleanableSegments).forall { case (before, after) => after < before }, +// One segment should have been completely deleted, so there will be fewer segments. +assertTrue(distinctValuesBySegmentAfterClean.size < distinctValuesBySegmentBeforeClean.size) + +// Drop the first segment from before cleaning since it was removed. Also subtract 1 from numCleanableSegments +val normalizedDistinctValuesBySegmentBeforeClean = distinctValuesBySegmentBeforeClean.drop(1) Review comment: The logic in this test case has become rather obscure after the change. Maybe we could do something simpler than comparing segment by segment. As far as I can tell, all the test is doing is ensuring that the first uncleanable offset is respected. 
Maybe a simpler test would just write the same key over and over and then assert that all records below the uncleanable offset are removed and all values above that offset are retained? ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -1161,6 +1191,14 @@ private[log] class CleanedTransactionMetadata { } } + /** + * Apply transactions that accumulated before cleanedIndex was applied + */ + def appendTransactionIndex(): Uni
[GitHub] [kafka] showuon commented on a change in pull request #10547: KAFKA-12284: increase request timeout to make tests reliable
showuon commented on a change in pull request #10547: URL: https://github.com/apache/kafka/pull/10547#discussion_r621757453 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java ## @@ -337,18 +337,28 @@ public void createTopic(String topic) { * @param topic The name of the topic. */ public void createTopic(String topic, int partitions) { -createTopic(topic, partitions, 1, new HashMap<>()); +createTopic(topic, partitions, 1, new HashMap<>(), new Properties()); Review comment: Updated. Thanks.
[GitHub] [kafka] chia7712 commented on a change in pull request #10547: KAFKA-12284: increase request timeout to make tests reliable
chia7712 commented on a change in pull request #10547: URL: https://github.com/apache/kafka/pull/10547#discussion_r621784059 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java ## @@ -337,18 +337,28 @@ public void createTopic(String topic) { * @param topic The name of the topic. */ public void createTopic(String topic, int partitions) { -createTopic(topic, partitions, 1, new HashMap<>()); +createTopic(topic, partitions, 1, Collections.emptyMap(), new Properties()); Review comment: It seems we don't need to pass `new Properties()`, right?
[GitHub] [kafka] showuon commented on a change in pull request #10547: KAFKA-12284: increase request timeout to make tests reliable
showuon commented on a change in pull request #10547: URL: https://github.com/apache/kafka/pull/10547#discussion_r621788834 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java ## @@ -337,18 +337,28 @@ public void createTopic(String topic) { * @param topic The name of the topic. */ public void createTopic(String topic, int partitions) { -createTopic(topic, partitions, 1, new HashMap<>()); +createTopic(topic, partitions, 1, Collections.emptyMap(), new Properties()); Review comment: Yes, you're right! Sorry, I didn't catch your point at first. Updated. Thanks.
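The `createTopic` thread concludes that the shorter overload can delegate with an immutable empty map. It does not need to spell out `new Properties()` as well. The Java sketch below is hypothetical: `TopicHelperSketch` is not `EmbeddedKafkaCluster`. It assumes that a four-argument overload exists and supplies the default admin client properties itself. Its most general overload only records its arguments, so the delegation chain can be inspected.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper illustrating telescoping overloads; not EmbeddedKafkaCluster.
public class TopicHelperSketch {
    // Arguments of the most recent call, kept so the delegation can be inspected.
    String lastTopic;
    int lastPartitions;
    int lastReplicationFactor;
    Map<String, String> lastTopicConfig;
    Properties lastAdminClientConfig;

    public void createTopic(String topic, int partitions) {
        // Shared immutable empty map: no allocation and no accidental mutation.
        createTopic(topic, partitions, 1, Collections.emptyMap());
    }

    public void createTopic(String topic, int partitions, int replicationFactor,
                            Map<String, String> topicConfig) {
        // Only this overload supplies the default admin client properties,
        // so shorter overloads never pass new Properties() themselves.
        createTopic(topic, partitions, replicationFactor, topicConfig, new Properties());
    }

    public void createTopic(String topic, int partitions, int replicationFactor,
                            Map<String, String> topicConfig, Properties adminClientConfig) {
        // A real helper would call an admin client here; the sketch only records.
        lastTopic = topic;
        lastPartitions = partitions;
        lastReplicationFactor = replicationFactor;
        lastTopicConfig = topicConfig;
        lastAdminClientConfig = adminClientConfig;
    }

    public static void main(String[] args) {
        TopicHelperSketch helper = new TopicHelperSketch();
        helper.createTopic("test-topic", 3);
        System.out.println(helper.lastTopic + " partitions=" + helper.lastPartitions
                + " rf=" + helper.lastReplicationFactor
                + " config=" + helper.lastTopicConfig
                + " adminProps=" + helper.lastAdminClientConfig);
    }
}
```

One trade-off to keep in mind: `Collections.emptyMap()` is immutable. The pattern only works if the delegate never adds entries to the map it receives.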
[GitHub] [kafka] vitojeng commented on a change in pull request #10597: KAFKA-5876: Apply StreamsNotStartedException for Interactive Queries
vitojeng commented on a change in pull request #10597: URL: https://github.com/apache/kafka/pull/10597#discussion_r621794196 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -344,7 +345,7 @@ private boolean isRunningOrRebalancing() { private void validateIsRunningOrRebalancing() { if (!isRunningOrRebalancing()) { -throw new IllegalStateException("KafkaStreams is not running. State is " + state + "."); +throw new StreamsNotStartedException("KafkaStreams is not running. State is " + state + "."); Review comment: > So from a user perspective you would want to catch and retry maybe on StreamsNotStartedException, but IllegalStateException maybe even should in fact kill the app so it can be restarted (eg k8s restarts the process/pod). Totally agree; sorry I didn't point this out before. The user can simply catch StreamsNotStartedException and retry when it is thrown, which is different from IllegalStateException. I don't think we need to introduce another exception, so I'm +1 on (c).
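The distinction drawn in this comment can be sketched as a small retry helper. A "not started yet" error is retried, while a genuine `IllegalStateException` propagates and can fail the application. The exception class, the `queryWithRetry` helper and its retry parameters are hypothetical stand-ins, not Kafka Streams API. In real code, the caller would catch the `StreamsNotStartedException` introduced in this PR around its interactive query.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch: retry only on a "not started yet" condition.
public class RetryOnNotStartedSketch {

    /** Stand-in for a retriable "not started" exception; not the Kafka class. */
    static class NotStartedException extends RuntimeException {
        NotStartedException(String message) {
            super(message);
        }
    }

    /**
     * Runs the query, retrying up to maxAttempts times while it throws
     * NotStartedException. Any other exception (e.g. IllegalStateException)
     * propagates immediately so the application can fail and be restarted.
     */
    static <T> T queryWithRetry(Supplier<T> query, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        NotStartedException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return query.get();
            } catch (NotStartedException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMs);
                    } catch (InterruptedException ie) {
                        // Restore the interrupt flag and stop retrying.
                        Thread.currentThread().interrupt();
                        throw e;
                    }
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        // Simulated store that becomes queryable on the third call.
        AtomicInteger calls = new AtomicInteger();
        Integer value = queryWithRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new NotStartedException("not running yet");
            }
            return 42;
        }, 5, 10);
        System.out.println("value=" + value + " after " + calls.get() + " calls");

        // An IllegalStateException is not retried.
        AtomicInteger fatalCalls = new AtomicInteger();
        try {
            queryWithRetry(() -> {
                fatalCalls.incrementAndGet();
                throw new IllegalStateException("fatal");
            }, 5, 10);
        } catch (IllegalStateException e) {
            System.out.println("propagated after " + fatalCalls.get() + " call(s)");
        }
    }
}
```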
[GitHub] [kafka] showuon commented on pull request #10547: KAFKA-12284: increase request timeout to make tests reliable
showuon commented on pull request #10547: URL: https://github.com/apache/kafka/pull/10547#issuecomment-828180085 The failed tests are all unrelated (none of them is in MirrorMakerIntegrationTest):
```
Build / JDK 11 and Scala 2.13 / kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryOnlyActivePartitionStoresByDefault
Build / JDK 15 and Scala 2.13 / kafka.api.TransactionsTest.testCommitTransactionTimeout()
Build / JDK 15 and Scala 2.13 / org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable
```
[GitHub] [kafka] dengziming commented on pull request #10598: MINOR: rename wrong topic id variable name and description
dengziming commented on pull request #10598: URL: https://github.com/apache/kafka/pull/10598#issuecomment-828185212 > @dengziming nice improvement. LGTM > > BTW, line#203 (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L203) has weird description ( `offset id` ). Could you update it also? Thanks for the reminder. Done!