[GitHub] [kafka] kowshik edited a comment on pull request #10280: KAFKA-12554: Refactor Log layer

2021-04-27 Thread GitBox


kowshik edited a comment on pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#issuecomment-808514958


   @junrao Just a heads up: I'm working on the following changes in separate 
PRs. They are related to refactoring the recovery logic (KAFKA-12553):
* KAFKA-12552 (https://github.com/apache/kafka/pull/10401) to extract 
segments map **[MERGED]**
* KAFKA-12571: (https://github.com/apache/kafka/pull/10426) to eliminate 
LeaderEpochFileCache constructor dependency on logEndOffset **[MERGED]**
* KAFKA-12575: (https://github.com/apache/kafka/pull/10430) to eliminate 
Log.isLogDirOffline boolean attribute **[MERGED]**
* KAFKA-12553: (https://github.com/apache/kafka/pull/10478) Refactor 
recovery logic to introduce LogLoader  **[MERGED]**
   
   It seems better if we merge those into trunk ahead of the current PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10591: Fix minor bugs in the existing documentation

2021-04-27 Thread GitBox


showuon commented on a change in pull request #10591:
URL: https://github.com/apache/kafka/pull/10591#discussion_r620956779



##
File path: docs/ops.html
##
@@ -78,7 +78,7 @@   
auto.leader.rebalance.enable=true
 You can also set this to false, but you will then need to manually restore 
leadership to the restored replicas by running the command:
-> bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port
+> bin/kafka-leader-election.sh --bootstrap-server broker_host:port

Review comment:
   The paragraph is talking about updating `preferred replicas`. Why do we 
need to change it to use `kafka-leader-election.sh`? 

##
File path: docs/security.html
##
@@ -319,7 +319,7 @@ SSL key and certificates in PEM format
 We need to configure the following property in server.properties, 
which must have one or more comma-separated values:
 listeners
 
-If SSL is not enabled for inter-broker communication (see below 
for how to enable it), both PLAINTEXT and SSL ports will be necessary.
+If SSL is enabled for inter-broker communication (see below for 
how to enable it), both PLAINTEXT and SSL ports will be necessary.

Review comment:
   I think the section is saying:
   > If SSL is not enabled, we need to set both  PLAINTEXT and SSL ports, but 
if SSL is enabled, we only need SSL ports. 
   
   What do you think?

##
File path: docs/upgrade.html
##
@@ -420,11 +420,11 @@ Upgrading 
from 0.8.x, 0.9.x, 0.1
 if there are no snapshot files in 3.4 data directory. For more details 
about the workaround please refer to https://cwiki.apache.org/confluence/display/ZOOKEEPER/Upgrade+FAQ";>ZooKeeper
 Upgrade FAQ.
 
 
-An embedded Jetty based <a href="http://zookeeper.apache.org/doc/r3.5.6/zookeeperAdmin.html#sc_adminserver">AdminServer</a> added in ZooKeeper 3.5.
+An embedded Jetty based <a href="http://zookeeper.apache.org/doc/r3.5.7/zookeeperAdmin.html#sc_adminserver">AdminServer</a> added in ZooKeeper 3.5.

Review comment:
   Actually, we're using zookeeper 3.5.9. Please help update it here and 
below. Thanks.

##
File path: docs/upgrade.html
##
@@ -337,8 +337,8 @@ <
 https://github.com/apache/kafka/tree/2.5/examples";>examples folder. 
Check out
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics";>KIP-447
 for the full details.
-Added a new public api KafkaStreams.queryMetadataForKey(String, 
K, Serializer) to get detailed information on the key being queried.
-It provides information about the partition number where the key 
resides in addition to hosts containing the active and standby partitions for 
the key.
+Added a new public api KafkaStreams.queryMetadataForKey(String, 
K, Serializer) to get detailed information on the key being queried.

Review comment:
   nice catch!

##
File path: docs/toc.html
##
@@ -79,11 +79,15 @@
 Modifying 
topics
 Graceful 
shutdown
 Balancing 
leadership
-Checking 
consumer position
+Balancing Replicas 
Across Racks
 Mirroring data 
between clusters
+Checking 
consumer position
+Managing 
Consumer Groups

Review comment:
   Nice improvement, adding the missing TOC entries!

##
File path: docs/upgrade.html
##
@@ -136,8 +136,8 @@ Notable changes in 2
 The 2.7.0 release includes the core Raft implementation specified in
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum";>KIP-595.
 There is a separate "raft" module containing most of the logic. Until 
integration with the
-controller is complete, there is a standalone server that users can 
use for testing the p
-erformance of the Raft implementation.  See the README.md in the raft 
module for details

Review comment:
   Nice catch!








[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-27 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r620996380



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class provides serialization and deserialization for {@link 
RemoteLogMetadata}. This is the root serde
+ * for the messages that are stored in internal remote log metadata topic.
+ */
+public class RemoteLogMetadataSerde {
+private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new 
RemoteLogSegmentMetadataRecord().apiKey();
+private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = 
new RemoteLogSegmentMetadataUpdateRecord().apiKey();
+private static final short REMOTE_PARTITION_DELETE_API_KEY = new 
RemotePartitionDeleteMetadataRecord().apiKey();
+
+private static final Map 
REMOTE_LOG_STORAGE_CLASS_TO_API_KEY = createRemoteLogStorageClassToApiKeyMap();
+private static final Map 
KEY_TO_TRANSFORM = createRemoteLogMetadataTransforms();
+
+private static final BytesApiMessageSerde BYTES_API_MESSAGE_SERDE = new 
BytesApiMessageSerde() {
+@Override
+public ApiMessage apiMessageFor(short apiKey) {

Review comment:
   Changed the remote log metadata record type to `metadata` and enabled 
metadata record generation.
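
For readers following the diff context above, here is a minimal sketch of dispatching on an API key to create the matching record object, in the spirit of `apiMessageFor(short apiKey)`. Everything in it is an assumption for illustration: the `Message` interface, the three record classes, and the key values 0, 1 and 2 are hypothetical and are not the generated Kafka classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch; none of these types are the real generated Kafka records.
final class ApiKeyDispatchSketch {
    // Stand-in for a generated message type that knows its own API key.
    interface Message {
        short apiKey();
    }

    // Assumed key values (0, 1, 2) are illustrative only.
    static final class SegmentRecord implements Message {
        public short apiKey() { return 0; }
    }

    static final class SegmentUpdateRecord implements Message {
        public short apiKey() { return 1; }
    }

    static final class PartitionDeleteRecord implements Message {
        public short apiKey() { return 2; }
    }

    // Maps each API key to a factory that creates an empty record of that type.
    private static final Map<Short, Supplier<Message>> KEY_TO_FACTORY = new HashMap<>();

    static {
        register(SegmentRecord::new);
        register(SegmentUpdateRecord::new);
        register(PartitionDeleteRecord::new);
    }

    // Asks a throwaway instance for its key so the key is defined in one place only.
    private static void register(Supplier<Message> factory) {
        KEY_TO_FACTORY.put(factory.get().apiKey(), factory);
    }

    // Returns a fresh record for the given key, or fails loudly for unknown keys.
    static Message messageFor(short apiKey) {
        Supplier<Message> factory = KEY_TO_FACTORY.get(apiKey);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown api key: " + apiKey);
        }
        return factory.get();
    }
}
```

Deriving the key from the record instance, as the constants in the diff do with `new ...Record().apiKey()`, avoids hard-coding the numbers in the dispatch table.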








[GitHub] [kafka] dajac commented on a change in pull request #10599: KAFKA-12716; Add `Admin` API to abort transactions

2021-04-27 Thread GitBox


dajac commented on a change in pull request #10599:
URL: https://github.com/apache/kafka/pull/10599#discussion_r620974574



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+@InterfaceStability.Evolving
+public class AbortTransactionResult {

Review comment:
   Should we add some javadoc to the classes/methods published in our 
public API?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionSpec.java
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Objects;
+
+@InterfaceStability.Evolving
+public class AbortTransactionSpec {
+private final TopicPartition topicPartition;
+private final long producerId;
+private final short producerEpoch;
+private final int coordinatorEpoch;

Review comment:
   In the KIP, you also mentioned `transactionStartOffset`. Is it not 
required anymore or do you plan to add it later on?

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandlerTest.java
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.AbortTransactionSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
+import org.apache.kafka.common.message.WriteTxnMarkersRequestData;
+import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
+import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Collections.emptyList;
+import static j

[GitHub] [kafka] daehokimm commented on pull request #9259: KAFKA-10466: Allow regex for MaskField SMT to replacement

2021-04-27 Thread GitBox


daehokimm commented on pull request #9259:
URL: https://github.com/apache/kafka/pull/9259#issuecomment-827442242


   please review this :) 🙏 






[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-27 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r621028610



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class provides serialization and deserialization for {@link 
RemoteLogMetadata}. This is the root serde
+ * for the messages that are stored in internal remote log metadata topic.
+ */
+public class RemoteLogMetadataSerde {

Review comment:
   I prefer avoiding singletons if possible. I have changed this class to 
be open and extensible. :) This can be useful in the future if we need to 
extend this for tests or other purposes. 
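
To illustrate the point about keeping the class open rather than a singleton, here is a minimal sketch. Both classes are hypothetical stand-ins (a string-based serde, not the real `RemoteLogMetadataSerde`), and the counting subclass is just one assumed example of what a test might want to add.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical serde with a normal constructor and overridable methods.
class OpenSerdeSketch {
    byte[] serialize(String metadata) {
        return metadata.getBytes(StandardCharsets.UTF_8);
    }

    String deserialize(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}

// A test-only subclass that counts serialize calls; this would not be
// possible if the serde were a final singleton with a private constructor.
final class CountingSerdeSketch extends OpenSerdeSketch {
    int serializeCalls = 0;

    @Override
    byte[] serialize(String metadata) {
        serializeCalls++;
        return super.serialize(metadata);
    }
}
```

Because callers create their own instance, a test can pass in `CountingSerdeSketch` wherever an `OpenSerdeSketch` is expected.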
   








[GitHub] [kafka] vitojeng commented on pull request #10597: KAFKA-5876: Apply StreamsNotStartedException for Interactive Queries

2021-04-27 Thread GitBox


vitojeng commented on pull request #10597:
URL: https://github.com/apache/kafka/pull/10597#issuecomment-827475511


   @ableegoldman Please take a look. :)
   If all work in this PR is completed, I will update the KIP and discussion 
thread.






[GitHub] [kafka] chia7712 commented on pull request #10592: MINOR: Remove redudant test files and close LogSegment after test

2021-04-27 Thread GitBox


chia7712 commented on pull request #10592:
URL: https://github.com/apache/kafka/pull/10592#issuecomment-827477372


   @dengziming nice find and thanks for your patch. Those files are noisy to me 
:(






[GitHub] [kafka] chia7712 commented on pull request #10446: KAFKA-12661 ConfigEntry#equal does not compare other fields when value is NOT null

2021-04-27 Thread GitBox


chia7712 commented on pull request #10446:
URL: https://github.com/apache/kafka/pull/10446#issuecomment-827478300


   ```
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync()
   ```
   
   These failures are unrelated. @ijuma @dajac please take a look if you have free cycles.






[GitHub] [kafka] Nathan22177 commented on a change in pull request #10548: KAFKA-12396 added a nullcheck before trying to retrieve a key

2021-04-27 Thread GitBox


Nathan22177 commented on a change in pull request #10548:
URL: https://github.com/apache/kafka/pull/10548#discussion_r621061574



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -234,13 +247,15 @@ public V delete(final K key) {
 
 @Override
 public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
-
+Objects.requireNonNull(prefix, "key cannot be null");

Review comment:
   Oh, I see.
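
As a small illustration of the guard added in the diff above, the sketch below shows what `Objects.requireNonNull` with a message does when the argument is null. The class and the simplified `prefixScan(String)` signature are hypothetical; only the `requireNonNull` call and its message are taken from the diff.

```java
import java.util.Objects;

// Hypothetical stand-in; the real method lives in MeteredKeyValueStore.
final class PrefixGuardSketch {
    // Only the argument check is modeled; the return value is a placeholder.
    static String prefixScan(String prefix) {
        // Fails fast with a NullPointerException carrying this message.
        Objects.requireNonNull(prefix, "key cannot be null");
        return "scan:" + prefix;
    }

    public static void main(String[] args) {
        try {
            prefixScan(null);
        } catch (NullPointerException e) {
            // The message passed to requireNonNull is what the caller sees.
            System.out.println(e.getMessage());
        }
    }
}
```

The message string is what surfaces in the exception, so its wording is the first thing a caller reads when they pass a null prefix.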








[GitHub] [kafka] Nathan22177 commented on a change in pull request #10548: KAFKA-12396 added a nullcheck before trying to retrieve a key

2021-04-27 Thread GitBox


Nathan22177 commented on a change in pull request #10548:
URL: https://github.com/apache/kafka/pull/10548#discussion_r621063125



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
##
@@ -472,11 +472,26 @@ public void shouldThrowNullPointerOnRemoveIfKeyIsNull() {
 assertThrows(NullPointerException.class, () -> store.remove(null));
 }
 
+@Test
+public void shouldThrowNullPointerOnPutIfWrappedKeyIsNull() {
+assertThrows(NullPointerException.class, () -> store.put(new 
Windowed<>(null, new SessionWindow(0, 0)), "a"));

Review comment:
   Yeah, sorry, I forget to do that sometimes.
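
The test in the diff above expects a `NullPointerException` when the key inside the window wrapper is null, not just when the wrapper itself is null. A minimal sketch of such a two-level check follows; `WindowedKey` and `put` here are hypothetical simplifications, not the Kafka Streams `Windowed` class or the real store method.

```java
import java.util.Objects;

// Hypothetical sketch of checking both a wrapper and the key it carries.
final class WrappedKeyGuardSketch {
    // Minimal stand-in for a windowed key: a key plus window bounds.
    static final class WindowedKey<K> {
        final K key;
        final long start;
        final long end;

        WindowedKey(K key, long start, long end) {
            this.key = key;
            this.start = start;
            this.end = end;
        }
    }

    // Rejects a null wrapper and a null wrapped key before doing any work.
    static <K> void put(WindowedKey<K> sessionKey, String value) {
        Objects.requireNonNull(sessionKey, "sessionKey can't be null");
        Objects.requireNonNull(sessionKey.key, "sessionKey.key can't be null");
        // A real store would serialize and write the entry here.
    }
}
```

Checking the wrapped key up front turns a failure that would otherwise appear deep inside serialization into an immediate, clearly labeled one.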








[jira] [Created] (KAFKA-12719) Kafka MirrorMaker 2 can only mirror in one direction in Active/Active config

2021-04-27 Thread Bui Thanh MInh (Jira)
Bui Thanh MInh created KAFKA-12719:
--

 Summary: Kafka MirrorMaker 2 can only mirror in one direction in 
Active/Active config
 Key: KAFKA-12719
 URL: https://issues.apache.org/jira/browse/KAFKA-12719
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.7.0
Reporter: Bui Thanh MInh


Config:

```

clusters = DC1, DC2
NPS.bootstrap.servers = 
NTL.bootstrap.servers = 

# Source and target clusters configurations.
config.storage.replication.factor = 3

offset.storage.replication.factor = 3

status.storage.replication.factor = 3

DC1->DC2.enabled = true
DC2->DC1.enabled = true

# Mirror maker configurations.
offset-syncs.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
checkpoints.topic.replication.factor = 3

topics = .*
groups = .*

```

In my test case, I shut down all of DC1 and the clients switched to DC2. After 
that, I brought the DC1 cluster back and restarted MM2 without errors, but found 
that no topics were replicated from DC2->DC1. I don't know why this happens or how 
to investigate it. What's wrong with my configuration? 
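
One thing in the config as posted can be checked mechanically. MirrorMaker 2 expects each alias listed in `clusters` to have a matching `<alias>.bootstrap.servers` entry, while the config above lists `DC1, DC2` but defines servers under `NPS` and `NTL` (this may simply be an artifact of anonymizing the report). A minimal sketch of such a check, assuming a hypothetical helper class `Mm2AliasCheckSketch` that is not part of Kafka:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper; it only inspects property names, it does not contact any cluster.
final class Mm2AliasCheckSketch {
    // Returns the aliases from `clusters` that lack a `<alias>.bootstrap.servers` entry.
    static List<String> aliasesWithoutBootstrap(Properties props) {
        List<String> missing = new ArrayList<>();
        String clusters = props.getProperty("clusters", "");
        for (String alias : clusters.split(",")) {
            String trimmed = alias.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate an empty list or stray commas
            }
            if (props.getProperty(trimmed + ".bootstrap.servers") == null) {
                missing.add(trimmed);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("clusters", "DC1, DC2");
        // Placeholder addresses; the report leaves the real values blank.
        props.setProperty("NPS.bootstrap.servers", "host-a:9092");
        props.setProperty("NTL.bootstrap.servers", "host-b:9092");
        System.out.println(aliasesWithoutBootstrap(props));
    }
}
```

If the real file uses consistent aliases, this check returns an empty list and the cause of the one-way replication lies elsewhere.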

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12719) Kafka MirrorMaker 2 can only mirror in one direction in Active/Active config

2021-04-27 Thread Bui Thanh MInh (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bui Thanh MInh updated KAFKA-12719:
---
Description: 
Config:

_clusters = DC1, DC2_
_NPS.bootstrap.servers = _
_NTL.bootstrap.servers = _

_# Source and target clusters configurations._
_config.storage.replication.factor = 3_

_offset.storage.replication.factor = 3_

_status.storage.replication.factor = 3_

_DC1->DC2.enabled = true_
_DC2->DC1.enabled = true_

_# Mirror maker configurations._
_offset-syncs.topic.replication.factor = 3_
_heartbeats.topic.replication.factor = 3_
_checkpoints.topic.replication.factor = 3_

_topics = .*_
_groups = .*_

 

In my test case, I shut down all of DC1 and the clients switched to DC2. After 
that, I brought the DC1 cluster back and restarted MM2 without errors, but found 
that no topics were replicated from DC2->DC1. I don't know why this happens or how 
to investigate it. What's wrong with my configuration? 

 

  was:
Config:

```

clusters = DC1, DC2
NPS.bootstrap.servers = 
NTL.bootstrap.servers = 

# Source and target clusters configurations.
config.storage.replication.factor = 3

offset.storage.replication.factor = 3

status.storage.replication.factor = 3

DC1->DC2.enabled = true
DC2->DC1.enabled = true

# Mirror maker configurations.
offset-syncs.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
checkpoints.topic.replication.factor = 3

topics = .*
groups = .*

 

```

In my test case, I shut down all of DC1 and the clients switched to DC2. After 
that, I brought the DC1 cluster back and restarted MM2 without errors, but found 
that no topics were replicated from DC2->DC1. I don't know why this happens or how 
to investigate it. What's wrong with my configuration? 

 


> Kafka MirrorMaker 2 can only mirror in one direction in Active/Active config
> 
>
> Key: KAFKA-12719
> URL: https://issues.apache.org/jira/browse/KAFKA-12719
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Bui Thanh MInh
>Priority: Major
>
> Config:
> _clusters = DC1, DC2_
> _NPS.bootstrap.servers = _
> _NTL.bootstrap.servers = _
> _# Source and target clusters configurations._
> _config.storage.replication.factor = 3_
> _offset.storage.replication.factor = 3_
> _status.storage.replication.factor = 3_
> _DC1->DC2.enabled = true_
> _DC2->DC1.enabled = true_
> _# Mirror maker configurations._
> _offset-syncs.topic.replication.factor = 3_
> _heartbeats.topic.replication.factor = 3_
> _checkpoints.topic.replication.factor = 3_
> _topics = .*_
> _groups = .*_
>  
> In my test case, I shut down all of DC1 and the clients switched to DC2. After 
> that, I brought the DC1 cluster back and restarted MM2 without errors, but found 
> that no topics were replicated from DC2->DC1. I don't know why this happens or how 
> to investigate it. What's wrong with my configuration? 
>  





[jira] [Updated] (KAFKA-12719) Kafka MirrorMaker 2 can only mirror in one direction in Active/Active config

2021-04-27 Thread Bui Thanh MInh (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bui Thanh MInh updated KAFKA-12719:
---
Description: 
Config:

```

clusters = DC1, DC2
NPS.bootstrap.servers = 
NTL.bootstrap.servers = 

# Source and target clusters configurations.
config.storage.replication.factor = 3

offset.storage.replication.factor = 3

status.storage.replication.factor = 3

DC1->DC2.enabled = true
DC2->DC1.enabled = true

# Mirror maker configurations.
offset-syncs.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
checkpoints.topic.replication.factor = 3

topics = .*
groups = .*

 

```

In my test case, I shut down all of DC1 and the clients switched to DC2. After 
that, I brought the DC1 cluster back and restarted MM2 without errors, but found 
that no topics were replicated from DC2->DC1. I don't know why this happens or how 
to investigate it. What's wrong with my configuration? 

 

  was:
Config:

```

clusters = DC1, DC2
NPS.bootstrap.servers = 
NTL.bootstrap.servers = 

# Source and target clusters configurations.
config.storage.replication.factor = 3

offset.storage.replication.factor = 3

status.storage.replication.factor = 3

DC1->DC2.enabled = true
DC2->DC1.enabled = true

# Mirror maker configurations.
offset-syncs.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
checkpoints.topic.replication.factor = 3

topics = .*
groups = .*

```

In my test case, I shut down all of DC1 and the clients switched to DC2. After 
that, I brought the DC1 cluster back and restarted MM2 without errors, but found 
that no topics were replicated from DC2->DC1. I don't know why this happens or how 
to investigate it. What's wrong with my configuration? 

 


> Kafka MirrorMaker 2 can only mirror in one direction in Active/Active config
> 
>
> Key: KAFKA-12719
> URL: https://issues.apache.org/jira/browse/KAFKA-12719
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Bui Thanh MInh
>Priority: Major
>
> Config:
> ```
> clusters = DC1, DC2
> NPS.bootstrap.servers = 
> NTL.bootstrap.servers = 
> # Source and target clusters configurations.
> config.storage.replication.factor = 3
> offset.storage.replication.factor = 3
> status.storage.replication.factor = 3
> DC1->DC2.enabled = true
> DC2->DC1.enabled = true
> # Mirror maker configurations.
> offset-syncs.topic.replication.factor = 3
> heartbeats.topic.replication.factor = 3
> checkpoints.topic.replication.factor = 3
> topics = .*
> groups = .*
>  
> ```
> In my test case, I shut down all of DC1 and the clients switched to DC2. After 
> that, I brought the DC1 cluster back and restarted MM2 without errors, but found 
> that no topics were replicated from DC2->DC1. I don't know why this happens or how 
> to investigate it. What's wrong with my configuration? 
>  





[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-27 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r620996380



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class provides serialization and deserialization for {@link 
RemoteLogMetadata}. This is the root serde
+ * for the messages that are stored in internal remote log metadata topic.
+ */
+public class RemoteLogMetadataSerde {
+private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new 
RemoteLogSegmentMetadataRecord().apiKey();
+private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = 
new RemoteLogSegmentMetadataUpdateRecord().apiKey();
+private static final short REMOTE_PARTITION_DELETE_API_KEY = new 
RemotePartitionDeleteMetadataRecord().apiKey();
+
+private static final Map 
REMOTE_LOG_STORAGE_CLASS_TO_API_KEY = createRemoteLogStorageClassToApiKeyMap();
+private static final Map 
KEY_TO_TRANSFORM = createRemoteLogMetadataTransforms();
+
+private static final BytesApiMessageSerde BYTES_API_MESSAGE_SERDE = new 
BytesApiMessageSerde() {
+@Override
+public ApiMessage apiMessageFor(short apiKey) {

Review comment:
   Good point. Changed the remote log metadata record type to `metadata` and 
enabled metadata record generation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #10598: MINOR: rename wrong topic id variable name and description

2021-04-27 Thread GitBox


dengziming commented on pull request #10598:
URL: https://github.com/apache/kafka/pull/10598#issuecomment-827554412


   @showuon Thank you. I agree it's not reasonable to fix it here since 
there are other occurrences as well, so I reverted the JMap change.






[GitHub] [kafka] chia7712 commented on a change in pull request #10547: KAFKA-12284: increase request timeout to make tests reliable

2021-04-27 Thread GitBox


chia7712 commented on a change in pull request #10547:
URL: https://github.com/apache/kafka/pull/10547#discussion_r621167591



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
##
@@ -337,18 +337,28 @@ public void createTopic(String topic) {
  * @param topic The name of the topic.
  */
 public void createTopic(String topic, int partitions) {
-createTopic(topic, partitions, 1, new HashMap<>());
+createTopic(topic, partitions, 1, new HashMap<>(), new Properties());

Review comment:
   How about replacing this with `createTopic(topic, partitions, 1, 
Collections.emptyMap());`?
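
   For illustration, the delegation pattern under discussion might look like the sketch below. `TopicCreator` and its recorded fields are hypothetical stand-ins, not the real `EmbeddedKafkaCluster`; only the overload shape and the use of `Collections.emptyMap()` come from the suggestion above.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for a test cluster helper; not the real EmbeddedKafkaCluster.
class TopicCreator {
    // Recorded arguments of the last call, so the delegation can be observed.
    String lastTopic;
    int lastPartitions;
    int lastReplication;
    Map<String, String> lastConfig;

    // Convenience overload: delegates with a shared immutable empty map
    // instead of allocating a new HashMap on every call.
    void createTopic(String topic, int partitions) {
        createTopic(topic, partitions, 1, Collections.emptyMap());
    }

    // Full overload; a real helper would talk to the cluster here.
    void createTopic(String topic, int partitions, int replication, Map<String, String> topicConfig) {
        this.lastTopic = topic;
        this.lastPartitions = partitions;
        this.lastReplication = replication;
        this.lastConfig = topicConfig;
    }
}
```

   One caveat of this choice: the map returned by `Collections.emptyMap()` is immutable, so it only fits if the callee never tries to add entries to it.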








[GitHub] [kafka] showuon commented on a change in pull request #10547: KAFKA-12284: increase request timeout to make tests reliable

2021-04-27 Thread GitBox


showuon commented on a change in pull request #10547:
URL: https://github.com/apache/kafka/pull/10547#discussion_r621231344



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
##
@@ -337,18 +337,28 @@ public void createTopic(String topic) {
  * @param topic The name of the topic.
  */
 public void createTopic(String topic, int partitions) {
-createTopic(topic, partitions, 1, new HashMap<>());
+createTopic(topic, partitions, 1, new HashMap<>(), new Properties());

Review comment:
   Sounds good to me. I'll work on it tomorrow! Thanks.








[GitHub] [kafka] Mia-jeong opened a new pull request #10600: [WIP]MINOR: add abstract keywords to util classes

2021-04-27 Thread GitBox


Mia-jeong opened a new pull request #10600:
URL: https://github.com/apache/kafka/pull/10600


   Util classes are intended to be used through their static methods without 
instantiating the class.
   To make that explicit, I added the `abstract` keyword to some Util classes that 
have no private constructor.
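
   As a rough sketch of the trade-off, the two ways of preventing instantiation can be compared side by side. Both classes below are hypothetical examples, not classes from the Kafka code base.

```java
// Hypothetical utility classes for illustration only.

// Approach described in this PR: `abstract` blocks direct instantiation,
// but a subclass could still be declared and instantiated.
abstract class AbstractStringUtils {
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }
}

// Common alternative: a final class with a private constructor,
// which blocks both instantiation and subclassing.
final class FinalStringUtils {
    private FinalStringUtils() {
        throw new AssertionError("no instances");
    }

    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }
}
```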
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Resolved] (KAFKA-12719) Kafka MirrorMaker 2 can only mirror in one direction in Active/Active config

2021-04-27 Thread Bui Thanh MInh (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bui Thanh MInh resolved KAFKA-12719.

Resolution: Auto Closed

Need to restart all mm2 instances

> Kafka MirrorMaker 2 can only mirror in one direction in Active/Active config
> 
>
> Key: KAFKA-12719
> URL: https://issues.apache.org/jira/browse/KAFKA-12719
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Bui Thanh MInh
>Priority: Major
>
> Config:
> _clusters = DC1, DC2_
> _NPS.bootstrap.servers = _
> _NTL.bootstrap.servers = _
> _# Source and target clusters configurations._
> _config.storage.replication.factor = 3_
> _offset.storage.replication.factor = 3_
> _status.storage.replication.factor = 3_
> _DC1->DC2.enabled = true_
> _DC2->DC1.enabled = true_
> _# Mirror maker configurations._
> _offset-syncs.topic.replication.factor = 3_
> _heartbeats.topic.replication.factor = 3_
> _checkpoints.topic.replication.factor = 3_
> _topics = .*_
> _groups = .*_
>  
> In my test case, I turned off the whole of DC1, and clients switched to DC2. After 
> that I brought the cluster in DC1 back and restarted MM2 with no errors, but found that 
> no topics were replicated from DC2->DC1. I don't know why, or how to investigate 
> this case. What's wrong in my configuration? 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] Mia-jeong closed pull request #10600: [WIP]MINOR: add abstract keywords to util classes

2021-04-27 Thread GitBox


Mia-jeong closed pull request #10600:
URL: https://github.com/apache/kafka/pull/10600


   






[GitHub] [kafka] satishd commented on pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-27 Thread GitBox


satishd commented on pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#issuecomment-827691407


   @junrao Thanks for the review comments. Addressed them with the commit 
https://github.com/apache/kafka/pull/10271/commits/44cb1f374701cc6eca5d1df19dc9cd7c14497b3e






[GitHub] [kafka] junrao commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-27 Thread GitBox


junrao commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r621418419



##
File path: 
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class RemoteLogMetadataSerdeTest {
+
+public static final String TOPIC = "foo";
+private static final TopicIdPartition TP0 = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0));
+private final Time time = new MockTime(1);
+
+@Test
+public void testRemoteLogSegmentMetadataSerde() {
+RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
createRemoteLogSegmentMetadata();
+
+doTestRemoteLogMetadataSerde(remoteLogSegmentMetadata);
+}
+
+@Test
+public void testRemoteLogSegmentMetadataUpdateSerde() {
+RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate = 
createRemoteLogSegmentMetadataUpdate();
+
+doTestRemoteLogMetadataSerde(remoteLogSegmentMetadataUpdate);
+}
+
+@Test
+public void testRemotePartitionDeleteMetadataSerde() {
+RemotePartitionDeleteMetadata remotePartitionDeleteMetadata = 
createRemotePartitionDeleteMetadata();
+
+doTestRemoteLogMetadataSerde(remotePartitionDeleteMetadata);
+}
+
+private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
+Map<Integer, Long> segLeaderEpochs = new HashMap<>();
+segLeaderEpochs.put(0, 0L);
+segLeaderEpochs.put(1, 20L);
+segLeaderEpochs.put(2, 80L);
+RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 
1,
+time.milliseconds(), 1024, 
segLeaderEpochs);
+}
+
+private RemoteLogSegmentMetadataUpdate 
createRemoteLogSegmentMetadataUpdate() {
+RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, 
time.milliseconds(),
+  
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 2);
+}
+
+private RemotePartitionDeleteMetadata 
createRemotePartitionDeleteMetadata() {
+return new RemotePartitionDeleteMetadata(TP0, 
RemotePartitionDeleteState.DELETE_PARTITION_MARKED,
+ time.milliseconds(), 0);
+}
+
+private void doTestRemoteLogMetadataSerde(RemoteLogMetadata 
remoteLogMetadata) {
+RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+// Serialize metadata and get the bytes.
+byte[] metadataBytes = serde.serialize(remoteLogMetadata);
+
+// Deserialize the bytes and check the RemoteLogMetadata object is as 
expected.
+RemoteLogMetadataSerde deserializer = new RemoteLogMetadataSerde();

Review comment:
   Do we need to instantiate the Serde a second time?





[GitHub] [kafka] ableegoldman commented on a change in pull request #10597: KAFKA-5876: Apply StreamsNotStartedException for Interactive Queries

2021-04-27 Thread GitBox


ableegoldman commented on a change in pull request #10597:
URL: https://github.com/apache/kafka/pull/10597#discussion_r621436955



##
File path: docs/streams/upgrade-guide.html
##
@@ -94,7 +94,14 @@ Upgrade Guide and API Changes
 
 Streams API changes in 3.0.0
 
-A new exception may be thrown from KafkaStreams#store(). 
If the specified store name does not exist in the topology, an 
UnknownStateStoreException will be thrown instead of the former 
InvalidStateStoreException. See KIP-216 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors)
 for more information.
+Interactive Query may throw different new exceptions for different 
errors:

Review comment:
   ```suggestion
   Interactive Queries may throw new exceptions for different errors:
   ```

##
File path: docs/streams/upgrade-guide.html
##
@@ -94,7 +94,14 @@ Upgrade Guide and API Changes
 
 Streams API changes in 3.0.0
 
-A new exception may be thrown from KafkaStreams#store(). 
If the specified store name does not exist in the topology, an 
UnknownStateStoreException will be thrown instead of the former 
InvalidStateStoreException. See KIP-216 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors)
 for more information.
+Interactive Query may throw different new exceptions for different 
errors:
+
+
+ UnknownStateStoreException: If the specified store 
name does not exist in the topology, an UnknownStateStoreException 
will be thrown instead of the former 
InvalidStateStoreException.
+ StreamsNotStartedException: If Streams state is not 
REBALANCING or REBALANCING, an 
StreamsNotStartedException will be thrown instead of the former 
IllegalStateException.

Review comment:
   ```suggestion
StreamsNotStartedException: If Streams state is 
not RUNNING or REBALANCING, a 
StreamsNotStartedException will be thrown instead of the former 
IllegalStateException.
   ```

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -344,7 +345,7 @@ private boolean isRunningOrRebalancing() {
 
 private void validateIsRunningOrRebalancing() {
 if (!isRunningOrRebalancing()) {
-throw new IllegalStateException("KafkaStreams is not running. 
State is " + state + ".");
+throw new StreamsNotStartedException("KafkaStreams is not running. 
State is " + state + ".");

Review comment:
   It seems a bit odd to throw a StreamsNotStartedException when the 
KafkaStreams is not in RUNNING or REBALANCING. That implies we would 
throw a StreamsNotStartedException even if the KafkaStreams had indeed been 
started, but then crashed or was closed so that the state is now one of 
PENDING_{SHUTDOWN/ERROR} or NOT_RUNNING/ERROR.
   The thing I'm wondering about is whether we should (a) adopt yet another 
exception to cover this case specifically (eg StreamsAlreadyClosedException or 
something), or (b) change the name of StreamsNotStarted to be a bit more 
generic, eg StreamsNotRunningException, and describe which state specifically 
in the error message of the exception, or (c) throw 
StreamsNotStartedException only when the state is CREATED, and continue to 
throw IllegalStateException if the state is one of the closed states I listed 
above.
   
   Personally I think (c) makes the most sense here. First, we then don't need to 
update the KIP (other than to clarify we'll throw this for the other 
metadataForKey, etc methods in addition to #store). But mainly, 
IllegalStateException actually does seem appropriate for all states other than 
CREATED, RUNNING, or REBALANCING: all those other states are either 
terminal, or transition into a terminal state, so there's basically no way to 
recover and retry at that point. You'd most likely need to bounce the app. So 
from a user perspective you might want to catch and retry on 
StreamsNotStartedException, but an IllegalStateException should arguably 
kill the app so it can be restarted (eg k8s restarts the process/pod). I'm not 
sure we'd want to introduce another exception where the handling would be 
pretty much identical to not handling it at all. But I can be convinced, and 
maybe you have other ideas or opinions I haven't considered yet. Thoughts?
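
   To make option (c) concrete, here is a minimal sketch of what the state check could look like. It is a simplified, hypothetical model: the `State` enum and the nested exception are local stand-ins, not the real `KafkaStreams` or KIP-216 classes.

```java
// Simplified, hypothetical model of the state check; not the real KafkaStreams class.
class StreamsStateCheck {
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    // Local stand-in for the exception introduced by KIP-216.
    static class StreamsNotStartedException extends RuntimeException {
        StreamsNotStartedException(String message) {
            super(message);
        }
    }

    static void validateIsRunningOrRebalancing(State state) {
        if (state == State.RUNNING || state == State.REBALANCING) {
            return; // queries are allowed
        }
        if (state == State.CREATED) {
            // Not started yet: the caller can reasonably wait and retry.
            throw new StreamsNotStartedException("KafkaStreams has not been started. State is " + state + ".");
        }
        // Terminal or shutting-down states: retrying cannot succeed.
        throw new IllegalStateException("KafkaStreams is not running. State is " + state + ".");
    }
}
```

   Under this split, a caller would catch the not-started exception to retry, and let `IllegalStateException` propagate so the process can be restarted.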








[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-27 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r621474682



##
File path: 
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class RemoteLogMetadataSerdeTest {
+
+public static final String TOPIC = "foo";
+private static final TopicIdPartition TP0 = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0));
+private final Time time = new MockTime(1);
+
+@Test
+public void testRemoteLogSegmentMetadataSerde() {
+RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
createRemoteLogSegmentMetadata();
+
+doTestRemoteLogMetadataSerde(remoteLogSegmentMetadata);
+}
+
+@Test
+public void testRemoteLogSegmentMetadataUpdateSerde() {
+RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate = 
createRemoteLogSegmentMetadataUpdate();
+
+doTestRemoteLogMetadataSerde(remoteLogSegmentMetadataUpdate);
+}
+
+@Test
+public void testRemotePartitionDeleteMetadataSerde() {
+RemotePartitionDeleteMetadata remotePartitionDeleteMetadata = 
createRemotePartitionDeleteMetadata();
+
+doTestRemoteLogMetadataSerde(remotePartitionDeleteMetadata);
+}
+
+private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
+Map<Integer, Long> segLeaderEpochs = new HashMap<>();
+segLeaderEpochs.put(0, 0L);
+segLeaderEpochs.put(1, 20L);
+segLeaderEpochs.put(2, 80L);
+RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 
1,
+time.milliseconds(), 1024, 
segLeaderEpochs);
+}
+
+private RemoteLogSegmentMetadataUpdate 
createRemoteLogSegmentMetadataUpdate() {
+RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, 
time.milliseconds(),
+  
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 2);
+}
+
+private RemotePartitionDeleteMetadata 
createRemotePartitionDeleteMetadata() {
+return new RemotePartitionDeleteMetadata(TP0, 
RemotePartitionDeleteState.DELETE_PARTITION_MARKED,
+ time.milliseconds(), 0);
+}
+
+private void doTestRemoteLogMetadataSerde(RemoteLogMetadata 
remoteLogMetadata) {
+RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+// Serialize metadata and get the bytes.
+byte[] metadataBytes = serde.serialize(remoteLogMetadata);
+
+// Deserialize the bytes and check the RemoteLogMetadata object is as 
expected.
+RemoteLogMetadataSerde deserializer = new RemoteLogMetadataSerde();

Review comment:
   Created another `RemoteLogMetadataSerde` instance to depict the real 
use case of serializer and deserializer having their own instances. Added a 
comment o

[GitHub] [kafka] yeralin commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde

2021-04-27 Thread GitBox


yeralin commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r621475071



##
File path: 
clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
##
@@ -77,21 +87,39 @@ public void configure(Map configs, boolean 
isKey) {
 }
 }
 
+private void serializeNullIndexList(final DataOutputStream out, 
List<Inner> data) throws IOException {
+List<Integer> nullIndexList = IntStream.range(0, data.size())
+.filter(i -> data.get(i) == null)
+.boxed().collect(Collectors.toList());
+out.writeInt(nullIndexList.size());
+for (int i : nullIndexList) out.writeInt(i);
+}
+
 @Override
 public byte[] serialize(String topic, List<Inner> data) {
 if (data == null) {
 return null;
 }
-final int size = data.size();
 try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
  final DataOutputStream out = new DataOutputStream(baos)) {
+out.writeByte(serStrategy.ordinal()); // write serialization 
strategy flag
+if (serStrategy == SerializationStrategy.NULL_INDEX_LIST) {
+serializeNullIndexList(out, data);
+}
+final int size = data.size();
 out.writeInt(size);
 for (Inner entry : data) {
-final byte[] bytes = inner.serialize(topic, entry);
-if (!isFixedLength) {
-out.writeInt(bytes.length);
+if (entry == null) {
+if (serStrategy == SerializationStrategy.NEGATIVE_SIZE) {
+out.writeInt(Serdes.ListSerde.NEGATIVE_SIZE_VALUE);
+}
+} else {
+final byte[] bytes = inner.serialize(topic, entry);
+if (!isFixedLength || serStrategy == 
SerializationStrategy.NEGATIVE_SIZE) {
+out.writeInt(bytes.length);

Review comment:
   Ok, in this case, I think the best course of action is to completely 
remove the `SerializationStrategy` flag and replace it with a simple boolean. We would 
not expose it to the user, and would automatically choose the strategy based on the 
type of data.
   
   If you agree, I'll go ahead and make the change.
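
   A rough sketch of what "a simple boolean" could mean is shown below. This is a hypothetical, simplified encoder, not the `ListSerializer` from the PR: it takes already-serialized entries, and it assumes that fixed-length element types use the null-index list while variable-length types use the negative-size marker.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

// Hypothetical, simplified encoder; not the ListSerializer from the PR.
class NullableListEncoder {
    static final int NEGATIVE_SIZE_VALUE = -1;

    // Each element is an already-serialized entry; null stands for a null list entry.
    static byte[] encode(List<byte[]> entries, boolean fixedLength) {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(baos)) {
            out.writeBoolean(fixedLength); // one flag instead of a user-facing strategy enum
            if (fixedLength) {
                // Null-index list: number of nulls, then the position of each null.
                int nullCount = 0;
                for (byte[] e : entries) {
                    if (e == null) nullCount++;
                }
                out.writeInt(nullCount);
                for (int i = 0; i < entries.size(); i++) {
                    if (entries.get(i) == null) out.writeInt(i);
                }
            }
            out.writeInt(entries.size());
            for (byte[] e : entries) {
                if (e == null) {
                    // Variable-length entries mark null with a negative size.
                    if (!fixedLength) out.writeInt(NEGATIVE_SIZE_VALUE);
                } else {
                    // Only variable-length entries need a per-entry size prefix.
                    if (!fixedLength) out.writeInt(e.length);
                    out.write(e);
                }
            }
            out.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

   The design point is that fixed-length entries avoid a four-byte size prefix per entry and pay only for the nulls that actually occur.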








[GitHub] [kafka] hachikuji commented on a change in pull request #10599: KAFKA-12716; Add `Admin` API to abort transactions

2021-04-27 Thread GitBox


hachikuji commented on a change in pull request #10599:
URL: https://github.com/apache/kafka/pull/10599#discussion_r621477651



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionSpec.java
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Objects;
+
+@InterfaceStability.Evolving
+public class AbortTransactionSpec {
+private final TopicPartition topicPartition;
+private final long producerId;
+private final short producerEpoch;
+private final int coordinatorEpoch;

Review comment:
   Yeah, I am planning to leave it for later. I recall running into some 
complication when I tried to implement the broker-side changes for this, but I 
don't remember specifically what it was. In any case, I think it can come after 
the tool is checked in since we need the old API for compatibility anyway.








[GitHub] [kafka] satishd commented on pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-27 Thread GitBox


satishd commented on pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#issuecomment-827811882


   Thanks @junrao for the comments. Fixed checkstyle in raft module and added a 
comment in `RemoteLogMetadataSerde`.  






[GitHub] [kafka] hachikuji commented on a change in pull request #10599: KAFKA-12716; Add `Admin` API to abort transactions

2021-04-27 Thread GitBox


hachikuji commented on a change in pull request #10599:
URL: https://github.com/apache/kafka/pull/10599#discussion_r621484136



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandlerTest.java
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.AbortTransactionSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
+import org.apache.kafka.common.message.WriteTxnMarkersRequestData;
+import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
+import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AbortTransactionHandlerTest {
+private final LogContext logContext = new LogContext();
+private final TopicPartition topicPartition = new TopicPartition("foo", 5);
+private final AbortTransactionSpec abortSpec = new AbortTransactionSpec(
+topicPartition, 12345L, (short) 15, 4321);
+
+@Test
+public void testInvalidBuildRequestCall() {
+AbortTransactionHandler handler = new 
AbortTransactionHandler(abortSpec, logContext);
+assertThrows(IllegalArgumentException.class, () -> 
handler.buildRequest(1,
+emptySet()));
+assertThrows(IllegalArgumentException.class, () -> 
handler.buildRequest(1,
+mkSet(new TopicPartition("foo", 1))));
+assertThrows(IllegalArgumentException.class, () -> 
handler.buildRequest(1,
+mkSet(topicPartition, new TopicPartition("foo", 1))));
+}
+
+@Test
+public void testValidBuildRequestCall() {
+AbortTransactionHandler handler = new 
AbortTransactionHandler(abortSpec, logContext);
+WriteTxnMarkersRequest.Builder request = handler.buildRequest(1, 
singleton(topicPartition));
+assertEquals(1, request.data.markers().size());
+
+WriteTxnMarkersRequestData.WritableTxnMarker markerRequest = 
request.data.markers().get(0);
+assertEquals(abortSpec.producerId(), markerRequest.producerId());
+assertEquals(abortSpec.producerEpoch(), markerRequest.producerEpoch());
+assertEquals(abortSpec.coordinatorEpoch(), 
markerRequest.coordinatorEpoch());
+assertEquals(1, markerRequest.topics().size());
+
+WriteTxnMarkersRequestData.WritableTxnMarkerTopic topicRequest = 
markerRequest.topics().get(0);
+assertEquals(abortSpec.topicPartition().topic(), topicRequest.name());
+assertEquals(singletonList(abortSpec.topicPartition().partition()), 
topicRequest.partitionIndexes());
+}
+
+@Test
+public void testInvalidHandleResponseCall() {
+AbortTransactionHandler handler = new 
AbortTransactionHandler(abortSpec, logContext);
+WriteTxnMarkersResponseData response = new 
WriteTxnMarkersResponseData();
+assertThrows(IllegalArgumentException.class, () -> 
handler.handleResponse(1,
+emptySet(), new WriteTxnMarkersResponse(response)));
+assertThrows(IllegalArgumentException.class, () -> 
handler.handleResponse(1,
+mkSet(new TopicPartition("foo", 1)), new 
WriteTxnMarkersResponse(response)));
+assertThrows(IllegalArgumentException.class, () -> 
handler.handleResponse(1,

[GitHub] [kafka] hachikuji commented on a change in pull request #10599: KAFKA-12716; Add `Admin` API to abort transactions

2021-04-27 Thread GitBox


hachikuji commented on a change in pull request #10599:
URL: https://github.com/apache/kafka/pull/10599#discussion_r621503064



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+@InterfaceStability.Evolving
+public class AbortTransactionResult {
+private final Map> futures;
+
+AbortTransactionResult(Map> futures) 
{
+this.futures = futures;
+}
+
+public KafkaFuture all() {

Review comment:
   Yeah, that's kind of the typical `all()` contract. We are missing a 
separate API to get the partition-level results. I can add that. I have also 
been debating whether to add a placeholder result value just in case we need to 
return something in the future. What do you think?
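
   For illustration, the usual `all()` contract next to a partition-level accessor could look roughly like the sketch below. It is only a sketch under stated assumptions: `java.util.concurrent.CompletableFuture` stands in for `KafkaFuture`, plain strings stand in for `TopicPartition`, and the names `SketchResult` and `partitionResult` are hypothetical and not part of this PR.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a result class; CompletableFuture replaces KafkaFuture here.
final class SketchResult {
    private final Map<String, CompletableFuture<Void>> futures;

    SketchResult(Map<String, CompletableFuture<Void>> futures) {
        this.futures = futures;
    }

    // Completes once every per-partition future has completed;
    // completes exceptionally if any of them failed.
    CompletableFuture<Void> all() {
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
    }

    // Partition-level result (hypothetical accessor name).
    CompletableFuture<Void> partitionResult(String partition) {
        CompletableFuture<Void> future = futures.get(partition);
        if (future == null) {
            throw new IllegalArgumentException("Unknown partition: " + partition);
        }
        return future;
    }
}
```

   With this shape, a caller who only cares about overall success waits on `all()`, while a caller who needs to know which partition failed inspects the individual futures.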




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 commented on a change in pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`

2021-04-27 Thread GitBox


dejan2609 commented on a change in pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#discussion_r621504000



##
File path: build.gradle
##
@@ -1491,13 +1491,14 @@ project(':streams') {
   }
 
   tasks.create(name: "copyDependantLibs", type: Copy) {
-from (configurations.testRuntime) {

Review comment:
   Ok, we have a deal then. 








[jira] [Created] (KAFKA-12720) Ecosystem wiki page: Kafka Manager renamed CMAK (Cluster Manager for Apache Kafka)

2021-04-27 Thread Philippe Cloutier (Jira)
Philippe Cloutier created KAFKA-12720:
-

 Summary: Ecosystem wiki page: Kafka Manager renamed CMAK (Cluster 
Manager for Apache Kafka)
 Key: KAFKA-12720
 URL: https://issues.apache.org/jira/browse/KAFKA-12720
 Project: Kafka
  Issue Type: Task
  Components: documentation
Reporter: Philippe Cloutier


The Management Consoles section of [the Ecosystem wiki 
page|https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem] starts with:
{quote}Kafka Manager - A tool for managing Apache Kafka.{quote}

Kafka Manager was renamed "CMAK" (Cluster Manager for Apache Kafka).


By the way, I recommend being more specific in that description (CMAK could be 
described as a web GUI rather than just as a tool).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dejan2609 commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`

2021-04-27 Thread GitBox


dejan2609 commented on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-827844379


   @ijuma previous commits are rebased/squashed and PR branch is force-pushed.
   
   Overall: Gradle build looks ok (on my laptop, that is). Some warnings do 
appear (related to spotless plugin / scala compilation) but I guess those 
things can be solved elsewhere (I volunteer to create JIRA tickets and see what 
can be done).
   
   I opted to change commit message in order to emphasize Gradle version 
upgrade (if that is ok with you we can also change summaries for JIRA ticket 
and this PR).
   
   One more note: Gradle patch _**7.0.1**_ is around the corner (with lots of 
issues being solved: https://github.com/gradle/gradle/milestone/173).







[GitHub] [kafka] jeqo commented on pull request #10294: KAFKA-12450: Remove deprecated methods from ReadOnlyWindowStore

2021-04-27 Thread GitBox


jeqo commented on pull request #10294:
URL: https://github.com/apache/kafka/pull/10294#issuecomment-827844833


   @guozhangwang, it should be ready for review now. thanks!






[GitHub] [kafka] ijuma commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`

2021-04-27 Thread GitBox


ijuma commented on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-827846345


   Can you remove the Gradle version bump from this PR then? We can merge the 
other change and see if there's any impact. We can then go straight to Gradle 
7.0.1 once it's released. It may take 1-3 weeks from the link you shared.






[GitHub] [kafka] dejan2609 commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`

2021-04-27 Thread GitBox


dejan2609 commented on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-827849319


   Sure, let's go step-by-step then.






[GitHub] [kafka] dejan2609 commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`

2021-04-27 Thread GitBox


dejan2609 commented on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-827855369


   Done, changes are trimmed down.






[GitHub] [kafka] JoelWee commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

2021-04-27 Thread GitBox


JoelWee commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r621537903



##
File path: docs/streams/developer-guide/app-reset-tool.html
##
@@ -78,6 +78,9 @@
 Step 1: Run the application reset tool
 Invoke the application reset tool from the command line
 /bin/kafka-streams-application-reset
+Warning! This tool makes irreversible changes to your 
application. It is strongly recommended that you run this once with --dry-run to preview 
your changes before making them.
+/bin/kafka-streams-application-reset

Review comment:
   Thanks! Have updated this now to look like:
   
   https://user-images.githubusercontent.com/32009741/116301476-fa873f80-a797-11eb-9a19-de59d6771ac5.png
   








[GitHub] [kafka] JoelWee commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

2021-04-27 Thread GitBox


JoelWee commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r621538528



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##
@@ -205,6 +206,34 @@ private void add10InputElements() {
 }
 }
 
+@Test
+public void testResetWhenInternalTopicsAreSpecified() throws Exception {
+final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+
+// RUN
+streams = new KafkaStreams(setupTopologyWithIntermediateTopic(true, 
OUTPUT_TOPIC_2), streamsConfig);
+streams.start();
+
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, 
OUTPUT_TOPIC, 10);
+
+streams.close();
+waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * 
STREAMS_CONSUMER_TIMEOUT);
+
+// RESET
+streams.cleanUp();
+
+final List internalTopics = 
cluster.getAllTopicsInCluster().stream()
+.filter(topic -> topic.startsWith(appID + "-"))

Review comment:
   done now :) 








[GitHub] [kafka] JoelWee commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

2021-04-27 Thread GitBox


JoelWee commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r621542393



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
##
@@ -151,6 +151,22 @@ public void 
shouldNotAllowToResetWhenIntermediateTopicAbsent() {
 Assert.assertEquals(1, exitCode);
 }
 
+@Test
+public void shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() {

Review comment:
   It seems more natural to group it together with the other 
`shouldNotAllowToResetWhen...` tests. E.g. 
shouldNotAllowToResetWhenIntermediateTopicAbsent, 
shouldNotAllowToResetWhenInputTopicAbsent, etc.
   
   Happy to shift it over to AbstractResetIntegrationTest.java








[GitHub] [kafka] hachikuji commented on a change in pull request #10599: KAFKA-12716; Add `Admin` API to abort transactions

2021-04-27 Thread GitBox


hachikuji commented on a change in pull request #10599:
URL: https://github.com/apache/kafka/pull/10599#discussion_r621576098



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+@InterfaceStability.Evolving
+public class AbortTransactionResult {
+private final Map> futures;
+
+AbortTransactionResult(Map> futures) 
{
+this.futures = futures;
+}
+
+public KafkaFuture all() {

Review comment:
   I realized what I was doing here. Right now, the API is taking only a 
single `AbortTransactionSpec`, so `all()` has no ambiguity. In the future, we 
could decide to add batching, but I cannot think of a strong reason for it. It 
does not make much sense for the hanging transaction cleanup use case. However, 
if we do decide to, then it will complicate the types in here a bit because the 
key will probably have to be the `AbortTransactionSpec` itself since 
`WriteTxnMarkers` does batching both by topic partition and the tuple of 
`(producerId, producerEpoch, coordinatorEpoch)`. Here I decided to try and keep 
it simple without committing to a more granular API. Does that seem reasonable?
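
   To make the batching concern concrete, here is a rough sketch of grouping specs the way the comment describes, first by the `(producerId, producerEpoch, coordinatorEpoch)` tuple and then by topic. Everything in it is an assumption for illustration: `SpecSketch` is a simplified stand-in for `AbortTransactionSpec`, and `MarkerBatcher`, `markerKey` and the string key format are hypothetical names, not code from this PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for AbortTransactionSpec.
final class SpecSketch {
    final String topic;
    final int partition;
    final long producerId;
    final short producerEpoch;
    final int coordinatorEpoch;

    SpecSketch(String topic, int partition, long producerId, short producerEpoch, int coordinatorEpoch) {
        this.topic = topic;
        this.partition = partition;
        this.producerId = producerId;
        this.producerEpoch = producerEpoch;
        this.coordinatorEpoch = coordinatorEpoch;
    }

    // One marker per distinct (producerId, producerEpoch, coordinatorEpoch) tuple.
    String markerKey() {
        return producerId + ":" + producerEpoch + ":" + coordinatorEpoch;
    }
}

final class MarkerBatcher {
    // Returns: marker key -> topic name -> partition indexes.
    static Map<String, Map<String, List<Integer>>> batch(List<SpecSketch> specs) {
        Map<String, Map<String, List<Integer>>> markers = new LinkedHashMap<>();
        for (SpecSketch spec : specs) {
            markers.computeIfAbsent(spec.markerKey(), k -> new LinkedHashMap<>())
                   .computeIfAbsent(spec.topic, k -> new ArrayList<>())
                   .add(spec.partition);
        }
        return markers;
    }
}
```

   Note that the same topic partition can show up under two different marker keys, which is why a result map keyed only by topic partition would become ambiguous once batching is allowed.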








[GitHub] [kafka] hachikuji commented on a change in pull request #10599: KAFKA-12716; Add `Admin` API to abort transactions

2021-04-27 Thread GitBox


hachikuji commented on a change in pull request #10599:
URL: https://github.com/apache/kafka/pull/10599#discussion_r621576098



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+@InterfaceStability.Evolving
+public class AbortTransactionResult {
+private final Map> futures;
+
+AbortTransactionResult(Map> futures) 
{
+this.futures = futures;
+}
+
+public KafkaFuture all() {

Review comment:
   I realized what I was doing here. Right now, the API is taking only a 
single `AbortTransactionSpec`, so `all()` has no ambiguity. In the future, we 
could decide to add batching, but I cannot think of a strong reason for it. It 
does not make much sense for the hanging transaction cleanup use case. However, 
if we do decide to, then it will complicate the types in here a bit because the 
key will probably have to be the `AbortTransactionSpec` itself since 
`WriteTxnMarkers` does batching both by topic partition and the tuple of 
`(producerId, producerEpoch, coordinatorEpoch)`. Here I decided to try and keep 
it simple and avoid committing to a more granular API that we may never need. 
Does that seem reasonable?








[GitHub] [kafka] gharris1727 commented on a change in pull request #10475: KAFKA-12610: Implement PluginClassLoader::getResource

2021-04-27 Thread GitBox


gharris1727 commented on a change in pull request #10475:
URL: https://github.com/apache/kafka/pull/10475#discussion_r621586391



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
##
@@ -98,22 +97,56 @@
  * to load internal classes, and samples information about their 
initialization.
  */
 public static final String SERVICE_LOADER = 
"test.plugins.ServiceLoaderPlugin";
+/**
+ * Class name of a plugin which reads a version string from resource.
+ */
+public static final String READ_VERSION_FROM_RESOURCE = 
"test.plugins.ReadVersionFromResource";
 
 private static final Logger log = 
LoggerFactory.getLogger(TestPlugins.class);
-private static final Map<String, File> PLUGIN_JARS;
+private static final Map<String, PluginJar> PLUGIN_JARS;
 private static final Throwable INITIALIZATION_EXCEPTION;
 
+private static final class PluginJar {
+String className;
+File jarFile;
+
+public PluginJar(String className, File jarFile) {
+this.className = className;
+this.jarFile = jarFile;
+}
+
+public String getClassName() {
+return className;
+}
+
+public File getJarFile() {
+return jarFile;
+}
+}
+
 static {
 Throwable err = null;
-HashMap<String, File> pluginJars = new HashMap<>();
+Map<String, PluginJar> pluginJars = new HashMap<>();
 try {
-pluginJars.put(ALWAYS_THROW_EXCEPTION, 
createPluginJar("always-throw-exception"));
-pluginJars.put(ALIASED_STATIC_FIELD, 
createPluginJar("aliased-static-field"));
-pluginJars.put(SAMPLING_CONVERTER, 
createPluginJar("sampling-converter"));
-pluginJars.put(SAMPLING_CONFIGURABLE, 
createPluginJar("sampling-configurable"));
-pluginJars.put(SAMPLING_HEADER_CONVERTER, 
createPluginJar("sampling-header-converter"));
-pluginJars.put(SAMPLING_CONFIG_PROVIDER, 
createPluginJar("sampling-config-provider"));
-pluginJars.put(SERVICE_LOADER, createPluginJar("service-loader"));
+pluginJars.put("always-throw-exception",
+new PluginJar(ALWAYS_THROW_EXCEPTION, 
createPluginJar("always-throw-exception")));
+pluginJars.put("aliased-static-field",
+new PluginJar(ALIASED_STATIC_FIELD, 
createPluginJar("aliased-static-field")));
+pluginJars.put("sampling-converter",
+new PluginJar(SAMPLING_CONVERTER, 
createPluginJar("sampling-converter")));
+pluginJars.put("sampling-configurable",
+new PluginJar(SAMPLING_CONFIGURABLE, 
createPluginJar("sampling-configurable")));
+pluginJars.put("sampling-header-converter",
+new PluginJar(SAMPLING_HEADER_CONVERTER, 
createPluginJar("sampling-header-converter")));
+pluginJars.put("sampling-config-provider",
+new PluginJar(SAMPLING_CONFIG_PROVIDER, 
createPluginJar("sampling-config-provider")));
+pluginJars.put("service-loader",
+new PluginJar(SERVICE_LOADER, 
createPluginJar("service-loader")));
+// Create two versions of the same plugin reading version string 
from a resource
+pluginJars.put("read-version-from-resource-v1",

Review comment:
   Since these string literals are now relevant elsewhere, we should make 
them reusable constants. Perhaps they should be enums? I realize now that 
perhaps the class names should have also been enums but 🤷.
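
   One possible shape for that, purely as a sketch: an enum that pairs each resource directory with its plugin class name, so both become reusable constants. The enum name `TestPluginSketch`, its accessors, and the `read-version-from-resource-v2` directory are assumptions for illustration; only the two class names and the `-v1` directory appear in the diff above.

```java
// Hypothetical sketch: one constant per test plugin jar.
enum TestPluginSketch {
    SERVICE_LOADER("service-loader", "test.plugins.ServiceLoaderPlugin"),
    // Two jars that deliberately contain the same plugin class.
    READ_VERSION_FROM_RESOURCE_V1("read-version-from-resource-v1", "test.plugins.ReadVersionFromResource"),
    READ_VERSION_FROM_RESOURCE_V2("read-version-from-resource-v2", "test.plugins.ReadVersionFromResource");

    private final String resourceDir;
    private final String className;

    TestPluginSketch(String resourceDir, String className) {
        this.resourceDir = resourceDir;
        this.className = className;
    }

    // Directory under the test resources that the jar is built from.
    String resourceDir() {
        return resourceDir;
    }

    // Fully qualified name of the plugin class inside the jar.
    String className() {
        return className;
    }
}
```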

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
##
@@ -143,16 +176,35 @@ public static void assertAvailable() throws 
AssertionError {
 public static List pluginPath() {

Review comment:
   The existing structure of this class bakes in the assumption that each 
plugin only appears once on the plugin path, and that the common use-case 
plugin path (returned by this method) is then valid. This change would make 
this method return an invalid plugin path, with flaky behavior when loading 
the duplicated plugin (which plugin gets loaded would be determined by 
iteration order over the PLUGIN_JARS hashmap).
   
   There are a couple of ways out:
   1. avoid tackling this now, and separate the two plugins with different 
class names
   2. have this method pick one of the duplicates to drop deterministically, so 
that the Plugins class doesn't have undefined loading behavior.
   3. allow/deny some of these plugins from being included in this default 
plugin path, and keep some plugins back for the more specific tests
   4. remove this method, and have PluginsTest explicitly include the needed 
plugins in each test, and/or a default list to include if none are specifically 
requested.
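
   As a rough illustration of option 2, the sketch below keeps exactly one jar per plugin class by walking the jars in sorted key order instead of relying on `HashMap` iteration order. It is a sketch under assumptions: `PluginPathSketch` and `defaultPluginPath` are hypothetical names, and the map is simplified to resource name -> class name rather than the `PluginJar` type from the diff.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PluginPathSketch {
    // pluginJars maps a resource name to the plugin class it contains.
    // Returns the resource names to put on the default plugin path, keeping
    // for each class only the lexicographically smallest resource name.
    static List<String> defaultPluginPath(Map<String, String> pluginJars) {
        Map<String, String> firstByClass = new LinkedHashMap<>();
        // TreeMap iterates in sorted key order, independent of the input map's order.
        for (Map.Entry<String, String> entry : new TreeMap<>(pluginJars).entrySet()) {
            firstByClass.putIfAbsent(entry.getValue(), entry.getKey());
        }
        return new ArrayList<>(firstByClass.values());
    }
}
```

   The tie-break rule (smallest resource name wins) is arbitrary; any rule works as long as it does not depend on hash iteration order.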

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
##
@@ -98,22 +9

[jira] [Resolved] (KAFKA-6435) Application Reset Tool might delete incorrect internal topics

2021-04-27 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-6435.
---
Resolution: Fixed

> Application Reset Tool might delete incorrect internal topics
> -
>
> Key: KAFKA-6435
> URL: https://issues.apache.org/jira/browse/KAFKA-6435
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Joel Wee
>Priority: Major
>  Labels: bug, help-wanted, newbie
> Fix For: 3.0.0
>
>
> The streams application reset tool deletes all topics that start with 
> {{-}}.
> If people have two versions of the same application and name them {{"app"}} 
> and {{"app-v2"}}, resetting {{"app"}} would also delete the internal topics 
> of {{"app-v2"}}.
> We either need to disallow the dash in the application ID, or improve the 
> topic identification logic in the reset tool to fix this.
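
The prefix problem described in the ticket can be reproduced with a few lines. This is only a sketch: `ResetPrefixSketch` and `topicsMatchingPrefix` are hypothetical names, the filter mirrors the `startsWith(appID + "-")` check rather than the reset tool's actual code, and the topic names used with it are made up.

```java
import java.util.List;
import java.util.stream.Collectors;

final class ResetPrefixSketch {
    // Selects every topic whose name starts with "<appId>-".
    static List<String> topicsMatchingPrefix(List<String> allTopics, String appId) {
        return allTopics.stream()
            .filter(topic -> topic.startsWith(appId + "-"))
            .collect(Collectors.toList());
    }
}
```

Given the hypothetical topics `app-counts-changelog`, `app-v2-counts-changelog` and `input`, resetting `app` selects the first two, so the internal topic of `app-v2` is caught as well.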





[jira] [Updated] (KAFKA-6435) Application Reset Tool might delete incorrect internal topics

2021-04-27 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-6435:
--
Fix Version/s: 3.0.0

> Application Reset Tool might delete incorrect internal topics
> -
>
> Key: KAFKA-6435
> URL: https://issues.apache.org/jira/browse/KAFKA-6435
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Joel Wee
>Priority: Major
>  Labels: bug, help-wanted, newbie
> Fix For: 3.0.0
>
>
> The streams application reset tool deletes all topics that start with 
> {{-}}.
> If people have two versions of the same application and name them {{"app"}} 
> and {{"app-v2"}}, resetting {{"app"}} would also delete the internal topics 
> of {{"app-v2"}}.
> We either need to disallow the dash in the application ID, or improve the 
> topic identification logic in the reset tool to fix this.





[jira] [Created] (KAFKA-12721) Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsAllTopicsAllGroups

2021-04-27 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12721:
--

 Summary: Flaky Test 
ResetConsumerGroupOffsetTest.testResetOffsetsAllTopicsAllGroups
 Key: KAFKA-12721
 URL: https://issues.apache.org/jira/browse/KAFKA-12721
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: A. Sophie Blee-Goldman


https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-8923/6/testReport/kafka.admin/ResetConsumerGroupOffsetTest/Build___JDK_8_and_Scala_2_12___testResetOffsetsAllTopicsAllGroups__/

Stacktrace
org.opentest4j.AssertionFailedError: Expected that consumer group has consumed 
all messages from topic/partition. Expected offset: 100. Actual offset: 0
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39)
at org.junit.jupiter.api.Assertions.fail(Assertions.java:117)
at 
kafka.admin.ResetConsumerGroupOffsetTest.awaitConsumerProgress(ResetConsumerGroupOffsetTest.scala:455)
at 
kafka.admin.ResetConsumerGroupOffsetTest.$anonfun$testResetOffsetsAllTopicsAllGroups$5(ResetConsumerGroupOffsetTest.scala:140)
at 
kafka.admin.ResetConsumerGroupOffsetTest.$anonfun$testResetOffsetsAllTopicsAllGroups$5$adapted(ResetConsumerGroupOffsetTest.scala:137)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at 
kafka.admin.ResetConsumerGroupOffsetTest.$anonfun$testResetOffsetsAllTopicsAllGroups$4(ResetConsumerGroupOffsetTest.scala:137)
at 
kafka.admin.ResetConsumerGroupOffsetTest.$anonfun$testResetOffsetsAllTopicsAllGroups$4$adapted(ResetConsumerGroupOffsetTest.scala:136)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at 
kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsAllTopicsAllGroups(ResetConsumerGroupOffsetTest.scala:136)





[jira] [Resolved] (KAFKA-12666) Fix flaky kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic

2021-04-27 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12666.

Resolution: Duplicate

> Fix flaky 
> kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic
> 
>
> Key: KAFKA-12666
> URL: https://issues.apache.org/jira/browse/KAFKA-12666
> Project: Kafka
>  Issue Type: Test
>Reporter: Bruno Cadonna
>Priority: Major
>
> Found two similar failures of this test on a PR that was unrelated:
> {code:java}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, 
> deadlineMs=1618341006330, tries=583, nextAllowedTryMs=1618341006437) timed 
> out at 1618341006337 after 583 attempt(s)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>   at 
> kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic(RaftClusterTest.scala:94)
>  {code}
>  
> {code:java}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
> assignment. Call: createTopics
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>   at 
> kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic(RaftClusterTest.scala:94)
>  {code}
>  
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10529/4/testReport/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed]
>  
> Might be related to KAFKA-12561.





[jira] [Commented] (KAFKA-12629) Flaky Test RaftClusterTest

2021-04-27 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17334174#comment-17334174
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12629:


I'm seeing a lot of that exception as well, ie the 

{noformat}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.UnknownServerException: The server experienced 
an unexpected error when processing the request.
{noformat}

Four different RaftClusterTest failures with this same error on 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-8923/6/testReport/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed


> Flaky Test RaftClusterTest
> --
>
> Key: KAFKA-12629
> URL: https://issues.apache.org/jira/browse/KAFKA-12629
> Project: Kafka
>  Issue Type: Test
>  Components: core, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
>
> {quote} {{java.util.concurrent.ExecutionException: 
> java.lang.ClassNotFoundException: 
> org.apache.kafka.controller.NoOpSnapshotWriterBuilder
>   at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
>   at 
> kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:364)
>   at 
> kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:181)}}{quote}





[jira] [Created] (KAFKA-12722) Evaluate moving replaceSegments into LogSegments class

2021-04-27 Thread Kowshik Prakasam (Jira)
Kowshik Prakasam created KAFKA-12722:


 Summary: Evaluate moving replaceSegments into LogSegments class
 Key: KAFKA-12722
 URL: https://issues.apache.org/jira/browse/KAFKA-12722
 Project: Kafka
  Issue Type: Sub-task
Reporter: Kowshik Prakasam


The logic to replace segments is currently present as static logic in 
Log.scala. Since it is operating on top of `existingSegments`, we should see if 
we can move it into LogSegments class where it could be a better fit: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L2296.





[GitHub] [kafka] ableegoldman commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

2021-04-27 Thread GitBox


ableegoldman commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r621665639



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
##
@@ -151,6 +151,22 @@ public void 
shouldNotAllowToResetWhenIntermediateTopicAbsent() {
 Assert.assertEquals(1, exitCode);
 }
 
+@Test
+public void shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() {

Review comment:
   Hah, maybe I should have asked why are all of those tests not also in 
`AbstractResetIntegrationTest`. Seems like almost everything that applies here 
would also be good to test in the SSL version of the test (which AFAICT is the 
only other one to extend the AbstractResetIntegrationTest).
   
   But I'm ok with leaving it as is, and maybe we can just look into this as 
followup work unless there is a good reason for them to be where they are 
(which I can't think of but my imagination is not endless)








[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-27 Thread GitBox


hachikuji commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r621603737



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2235,6 +2199,7 @@ private Long append(int epoch, List records, boolean 
isAtomic) {
 } else {
 offset = accumulator.append(epoch, records);
 }
+

Review comment:
   nit: the extra line is still there?

##
File path: 
raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
##
@@ -354,6 +484,7 @@ public void testDrainDoesNotBlockWithConcurrentAppend() 
throws Exception {
 completedBatch.data.batches().forEach(recordBatch -> {
 assertEquals(leaderEpoch, recordBatch.partitionLeaderEpoch()); 
});
 });
+acc.close();

Review comment:
   Out of curiosity, why do we do this here but not elsewhere in this class?

##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -194,14 +196,52 @@ private void completeCurrentBatch() {
 MemoryRecords data = currentBatch.build();
 completed.add(new CompletedBatch<>(
 currentBatch.baseOffset(),
-currentBatch.records(),
+Optional.of(currentBatch.records()),
 data,
 memoryPool,
 currentBatch.initialBuffer()
 ));
 currentBatch = null;
 }
 
+public void appendLeaderChangeMessage(LeaderChangeMessage 
leaderChangeMessage, long currentTimeMs) {
+appendLock.lock();
+try {
+forceDrain();
+ByteBuffer buffer = memoryPool.tryAllocate(256);
+if (buffer != null) {
+MemoryRecords data = MemoryRecords.withLeaderChangeMessage(
+this.nextOffset, 

Review comment:
   nit: alignment looks a little off here. one extra indent?

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1859,15 +1819,17 @@ private void appendBatch(
 offsetAndEpoch.offset + 1, Integer.MAX_VALUE);
 
 future.whenComplete((commitTimeMs, exception) -> {
-int numRecords = batch.records.size();
-if (exception != null) {
-logger.debug("Failed to commit {} records at {}", 
numRecords, offsetAndEpoch, exception);
-} else {
-long elapsedTime = Math.max(0, commitTimeMs - 
appendTimeMs);
-double elapsedTimePerRecord = (double) elapsedTime / 
numRecords;
-kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, 
appendTimeMs);
-logger.debug("Completed commit of {} records at {}", 
numRecords, offsetAndEpoch);
-maybeFireHandleCommit(batch.baseOffset, epoch, 
batch.records);
+if (batch.records.isPresent()) {

Review comment:
   It would be nice if we didn't lose the exception for the leader change 
message. Perhaps one way we can do this is to add `numRecords` as a separate 
field in `CompletedBatch` so that we always have it available. Then we could 
change this to something like this:
   ```java
   future.whenComplete((commitTimeMs, exception) -> {
 if (exception != null) {
   logger.debug("Failed to commit {} records at {}", batch.numRecords, 
offsetAndEpoch, exception);
 } else {
   long elapsedTime = Math.max(0, commitTimeMs - appendTimeMs);
   double elapsedTimePerRecord = (double) elapsedTime / batch.numRecords;
   kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, appendTimeMs);
   logger.debug("Completed commit of {} records at {}", batch.numRecords, 
offsetAndEpoch);
   batch.records.ifPresent(records -> 
maybeFireHandleCommit(batch.baseOffset, epoch, records));
 }
   });
   ```
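
   A minimal sketch of the idea above, assuming a simplified stand-in for 
`CompletedBatch`. The class `BatchSketch`, its factory methods, and 
`describeCommit` are hypothetical names for illustration, not the actual Kafka 
types. It shows how a separate `numRecords` field stays available for logging 
even when the batch carries no application records:

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-in for BatchAccumulator.CompletedBatch.
final class BatchSketch<T> {
    final long baseOffset;
    final int numRecords;             // always available, even for control batches
    final Optional<List<T>> records;  // empty for a leader change (control) batch

    private BatchSketch(long baseOffset, int numRecords, Optional<List<T>> records) {
        this.baseOffset = baseOffset;
        this.numRecords = numRecords;
        this.records = records;
    }

    // A regular batch: the record count is derived from the list.
    static <T> BatchSketch<T> ofRecords(long baseOffset, List<T> records) {
        return new BatchSketch<>(baseOffset, records.size(), Optional.of(records));
    }

    // A control batch holding a single leader change message and no app records.
    static <T> BatchSketch<T> ofControlRecord(long baseOffset) {
        return new BatchSketch<>(baseOffset, 1, Optional.empty());
    }

    // Mirrors the shape of the suggested callback: a failure is reported for
    // every batch, regardless of whether application records are present.
    String describeCommit(Throwable exception) {
        if (exception != null) {
            return "Failed to commit " + numRecords + " records at " + baseOffset
                + ": " + exception.getMessage();
        }
        return "Completed commit of " + numRecords + " records at " + baseOffset;
    }
}
```

   With this shape, only the `maybeFireHandleCommit` call needs to be guarded 
by `records.ifPresent(...)`.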

##
File path: 
raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
##
@@ -65,6 +66,135 @@
 );
 }
 
+@Test
+public void testLeaderChangeMessageWritten() {
+int leaderEpoch = 17;
+long baseOffset = 0;
+int lingerMs = 50;
+int maxBatchSize = 512;
+
+ByteBuffer buffer = ByteBuffer.allocate(256);
+Mockito.when(memoryPool.tryAllocate(256))
+.thenReturn(buffer);
+
+BatchAccumulator acc = buildAccumulator(
+leaderEpoch,
+baseOffset,
+lingerMs,
+maxBatchSize
+);
+
+acc.appendLeaderChangeMessage(new LeaderChangeMessage(), 
time.milliseconds());
+
+List> batches = acc.drain();
+assertEquals(1, batches.size());
+
+BatchAccumulator.CompletedBatch batch = batches.get(0);
+batch.release();
+Mockito.verify(memoryPool).release(buffer);
+}
+
+@Test
+public void testForceDrain() {
+asList(APPEND, APPEND_ATOMIC).forEach(a

[GitHub] [kafka] ableegoldman commented on pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

2021-04-27 Thread GitBox


ableegoldman commented on pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#issuecomment-827995691


   Oof, there are a LOT of flaky test failures in this build. They're all 
unrelated to this PR, mostly in Connect and the RaftClusterTest, so I'll go 
ahead and merge, but yikes. 






[GitHub] [kafka] junrao commented on a change in pull request #10564: MINOR: clean up some replication code

2021-04-27 Thread GitBox


junrao commented on a change in pull request #10564:
URL: https://github.com/apache/kafka/pull/10564#discussion_r621652737



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1119,6 +1057,83 @@ void validateManualPartitionAssignment(List 
assignment,
 }
 }
 
+/**
+ * Iterate over a sequence of partitions and generate ISR changes and/or 
leader
+ * changes if necessary.
+ *
+ * @param context   A human-readable context string used in log4j 
logging.
+ * @param brokerToRemove  NO_LEADER if no broker is being removed; the 
ID of the
+ *  broker to remove from the ISR and leadership, 
otherwise.
+ * @param brokerToAdd   NO_LEADER if no broker is being added; the ID 
of the
+ *  broker which is now eligible to be a leader, 
otherwise.
+ * @param records   A list of records which we will append to.
+ * @param iterator  The iterator containing the partitions to 
examine.
+ */
+void generateLeaderAndIsrUpdates(String context,
+ int brokerToRemove,
+ int brokerToAdd,
+ List records,
+ Iterator iterator) {
+int oldSize = records.size();
+Function isAcceptableLeader =
+r -> r == brokerToAdd || clusterControl.unfenced(r);
+while (iterator.hasNext()) {
+TopicIdPartition topicIdPart = iterator.next();
+TopicControlInfo topic = topics.get(topicIdPart.topicId());
+if (topic == null) {
+throw new RuntimeException("Topic ID " + topicIdPart.topicId() 
+
+" existed in isrMembers, but not in the topics map.");
+}
+PartitionControlInfo partition = 
topic.parts.get(topicIdPart.partitionId());
+if (partition == null) {
+throw new RuntimeException("Partition " + topicIdPart +
+" existed in isrMembers, but not in the partitions map.");
+}
+int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemove);
+int newLeader;
+if (isGoodLeader(newIsr, partition.leader)) {
+// If the current leader is good, don't change.
+newLeader = partition.leader;
+} else {
+// Choose a new leader.
+boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+newLeader = bestLeader(partition.replicas, newIsr, uncleanOk, 
isAcceptableLeader);

Review comment:
   The reason that we want to remove the shutting down replicas from ISR is 
to optimize for latency. If we don't do that, when the broker actually shuts 
down, it can block the producer for up to `replica.lag.time.max.ms` before the 
replica can be taken out of ISR. So, I think this optimization is still useful.

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -356,7 +367,9 @@ public void replay(PartitionChangeRecord record) {
 brokersToIsrs.update(record.topicId(), record.partitionId(),
 prevPartitionInfo.isr, newPartitionInfo.isr, 
prevPartitionInfo.leader,
 newPartitionInfo.leader);
-log.debug("Applied ISR change record: {}", record.toString());
+String topicPart = topicInfo.name + "-" + record.partitionId() + " 
with topic ID " +

Review comment:
   In the old controller, we bump up the leader epoch if the ISR is changed 
during the controlled shutdown. This helps prevent the shutting down broker 
from being added to ISR again. In the raft controller, we bump up the 
partitionEpoch when the ISR is changed. Do we plan to fence a fetch request 
with unmatched partitionEpoch to achieve the same logic? If so, do we have a 
jira to track that?








[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-27 Thread GitBox


ableegoldman commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r621502594



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
  */
 private Map> constrainedAssign(Map partitionsPerTopic,
 Map> consumerToOwnedPartitions) {
-SortedSet unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+if (log.isDebugEnabled()) {
+log.debug(String.format("performing constrained assign. 
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+partitionsPerTopic, consumerToOwnedPartitions));
+}
+
+List sortedAllPartitions = 
getTopicPartitions(partitionsPerTopic);
 
 Set allRevokedPartitions = new HashSet<>();
 
-// Each consumer should end up in exactly one of the below
-// the consumers not yet at capacity
+// the consumers not yet at expected capacity
 List unfilledMembers = new LinkedList<>();
-// the members with exactly maxQuota partitions assigned
-Queue maxCapacityMembers = new LinkedList<>();
-// the members with exactly minQuota partitions assigned
-Queue minCapacityMembers = new LinkedList<>();
 
 int numberOfConsumers = consumerToOwnedPartitions.size();
-int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+int totalPartitionsCount = sortedAllPartitions.size();
+
+int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+// the expected number of members with maxQuota assignment
+int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+// the number of members with exactly maxQuota partitions assigned
+int numMaxCapacityMembers = 0;
 
-// initialize the assignment map with an empty array of size minQuota 
for all members
+// initialize the assignment map with an empty array of size maxQuota 
for all members
 Map> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
+List toBeRemovedPartitions = new ArrayList<>();
 // Reassign as many previously owned partitions as possible
 for (Map.Entry> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
 String consumer = consumerEntry.getKey();
 List ownedPartitions = consumerEntry.getValue();
 
 List consumerAssignment = assignment.get(consumer);
-int i = 0;
-// assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-for (TopicPartition tp : ownedPartitions) {
-if (i < maxQuota) {
-consumerAssignment.add(tp);
-unassignedPartitions.remove(tp);
-} else {
-allRevokedPartitions.add(tp);
-}
-++i;
-}
 
 if (ownedPartitions.size() < minQuota) {
+// the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+// and put this member into unfilled member list
+if (ownedPartitions.size() > 0) {
+consumerAssignment.addAll(ownedPartitions);
+toBeRemovedPartitions.addAll(ownedPartitions);
+}
 unfilledMembers.add(consumer);
+} else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {

Review comment:
   nit: don't use `++` inline like this. It makes the code harder to 
understand, and we'll end up incrementing it past 
`numExpectedMaxCapacityMembers`, so its value won't represent the actual number 
of members at max capacity in the end. This might be confusing, e.g. if we want 
to add logging that includes this value later on. Let's increment it in the 
body of this `else if`.
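
   To illustrate the concern, here is a small sketch under simplifying 
assumptions: the class and method names are hypothetical, each member is 
reduced to its owned-partition count, and the comparison uses `<` rather than 
the `<=` in the diff. It contrasts incrementing inside the condition with 
incrementing in the branch body:

```java
// Hypothetical helper; names loosely follow the diff above.
final class InlineIncrementSketch {
    // Increment inside the condition: the counter moves every time the second
    // operand is evaluated, even when the comparison itself fails.
    static int countInline(int[] ownedSizes, int maxQuota, int expectedMax) {
        int numMaxCapacityMembers = 0;
        for (int owned : ownedSizes) {
            if (owned >= maxQuota && numMaxCapacityMembers++ < expectedMax) {
                // the member would be kept at max capacity here
            }
        }
        return numMaxCapacityMembers;
    }

    // Increment in the body: the counter only moves when the branch is taken,
    // so it never exceeds expectedMax.
    static int countInBody(int[] ownedSizes, int maxQuota, int expectedMax) {
        int numMaxCapacityMembers = 0;
        for (int owned : ownedSizes) {
            if (owned >= maxQuota && numMaxCapacityMembers < expectedMax) {
                numMaxCapacityMembers++;
            }
        }
        return numMaxCapacityMembers;
    }
}
```

   For four members that each own `maxQuota` partitions and an expected count 
of 2, the inline version ends at 4 while the in-body version ends at 2.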

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
  */
 private Map> constrainedAssign(Map partitionsPerTopic,
 Map> consumerToOwnedPartitions) {
-SortedSet unassign

[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-27 Thread GitBox


dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r621630725



##
File path: 
raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
##
@@ -354,6 +484,7 @@ public void testDrainDoesNotBlockWithConcurrentAppend() 
throws Exception {
 completedBatch.data.batches().forEach(recordBatch -> {
 assertEquals(leaderEpoch, recordBatch.partitionLeaderEpoch()); 
});
 });
+acc.close();

Review comment:
   Hmm, not sure how I ended up adding that there








[jira] [Commented] (KAFKA-12666) Fix flaky kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic

2021-04-27 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17334205#comment-17334205
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12666:


There are quite a few tickets for the (very frequently failing) RaftClusterTest 
issues. It seems we have 
[KAFKA-12629|https://issues.apache.org/jira/browse/KAFKA-12629] as an umbrella 
ticket, so can we close this one as a duplicate to keep everything in one place? 

> Fix flaky 
> kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic
> 
>
> Key: KAFKA-12666
> URL: https://issues.apache.org/jira/browse/KAFKA-12666
> Project: Kafka
>  Issue Type: Test
>Reporter: Bruno Cadonna
>Priority: Major
>
> Found two similar failures of this test on a PR that was unrelated:
> {code:java}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, 
> deadlineMs=1618341006330, tries=583, nextAllowedTryMs=1618341006437) timed 
> out at 1618341006337 after 583 attempt(s)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>   at 
> kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic(RaftClusterTest.scala:94)
>  {code}
>  
> {code:java}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
> assignment. Call: createTopics
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>   at 
> kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic(RaftClusterTest.scala:94)
>  {code}
>  
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10529/4/testReport/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed]
>  
> Might be related to KAFKA-12561.





[GitHub] [kafka] jsancio commented on a change in pull request #10497: KAFKA-12342; Merge RaftClient and MetaLogManager interfaces and remove shim

2021-04-27 Thread GitBox


jsancio commented on a change in pull request #10497:
URL: https://github.com/apache/kafka/pull/10497#discussion_r621670833



##
File path: 
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
##
@@ -108,18 +109,29 @@ class BrokerMetadataListenerTest {
 assertEquals(localPartitions, localRemoved.map(_.toTopicPartition).toSet)
   }
 
+  private def applyBatch(
+records: List[ApiMessageAndVersion]
+  ): Unit = {
+val baseOffset = lastMetadataOffset + 1

Review comment:
   Hmm. This is minor, but does this mean that a `baseOffset` of 0 is not 
possible since `lastMetadataOffset` is initialized to `0`? Is this also true 
for a "regular" Kafka topic partition? Or is this just a side effect of how 
this test gets constructed?








[GitHub] [kafka] cmccabe commented on a change in pull request #10564: MINOR: clean up some replication code

2021-04-27 Thread GitBox


cmccabe commented on a change in pull request #10564:
URL: https://github.com/apache/kafka/pull/10564#discussion_r621639045



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1119,6 +1057,83 @@ void validateManualPartitionAssignment(List 
assignment,
 }
 }
 
+/**
+ * Iterate over a sequence of partitions and generate ISR changes and/or 
leader
+ * changes if necessary.
+ *
+ * @param context   A human-readable context string used in log4j 
logging.
+ * @param brokerToRemove  NO_LEADER if no broker is being removed; the 
ID of the

Review comment:
   I think it's easier to use NO_LEADER since then I don't have to 
special-case the comparisons (I can just compare with NO_LEADER and it will 
fail when examining ISR members, for example).
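
   A rough sketch of why the sentinel avoids special-casing, assuming 
`NO_LEADER` is a negative value such as -1 that can never be a real broker ID. 
The `copyWithout` below is a hypothetical re-implementation for illustration, 
not the actual `Replicas.copyWithout`:

```java
import java.util.Arrays;

// Hypothetical illustration of the sentinel approach.
final class SentinelSketch {
    // Assumed sentinel: never equal to a valid broker ID.
    static final int NO_LEADER = -1;

    // Returns a copy of replicas with every occurrence of value removed.
    // Passing NO_LEADER matches nothing, so the input comes back unchanged
    // without a separate "nothing to remove" code path.
    static int[] copyWithout(int[] replicas, int value) {
        return Arrays.stream(replicas).filter(r -> r != value).toArray();
    }
}
```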

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1119,6 +1057,83 @@ void validateManualPartitionAssignment(List 
assignment,
 }
 }
 
+/**
+ * Iterate over a sequence of partitions and generate ISR changes and/or 
leader
+ * changes if necessary.
+ *
+ * @param context   A human-readable context string used in log4j 
logging.
+ * @param brokerToRemove  NO_LEADER if no broker is being removed; the 
ID of the
+ *  broker to remove from the ISR and leadership, 
otherwise.
+ * @param brokerToAdd   NO_LEADER if no broker is being added; the ID 
of the
+ *  broker which is now eligible to be a leader, 
otherwise.
+ * @param records   A list of records which we will append to.
+ * @param iterator  The iterator containing the partitions to 
examine.
+ */
+void generateLeaderAndIsrUpdates(String context,
+ int brokerToRemove,
+ int brokerToAdd,
+ List records,
+ Iterator iterator) {
+int oldSize = records.size();
+Function isAcceptableLeader =
+r -> r == brokerToAdd || clusterControl.unfenced(r);
+while (iterator.hasNext()) {
+TopicIdPartition topicIdPart = iterator.next();
+TopicControlInfo topic = topics.get(topicIdPart.topicId());
+if (topic == null) {
+throw new RuntimeException("Topic ID " + topicIdPart.topicId() 
+
+" existed in isrMembers, but not in the topics map.");
+}
+PartitionControlInfo partition = 
topic.parts.get(topicIdPart.partitionId());
+if (partition == null) {
+throw new RuntimeException("Partition " + topicIdPart +
+" existed in isrMembers, but not in the partitions map.");
+}
+int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemove);
+int newLeader;
+if (isGoodLeader(newIsr, partition.leader)) {
+// If the current leader is good, don't change.
+newLeader = partition.leader;
+} else {
+// Choose a new leader.
+boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+newLeader = bestLeader(partition.replicas, newIsr, uncleanOk, 
isAcceptableLeader);

Review comment:
   One thing I've been considering for controlled shutdown is that perhaps 
we could just keep the shutting down replicas in the ISR but move the leaders.  
I think that would be a bit more graceful than what we have now, but we'd have 
to do some extra work to get there.
   
   Anyway, let's consider this later, as you suggested...








[GitHub] [kafka] jsancio commented on a change in pull request #10497: KAFKA-12342; Merge RaftClient and MetaLogManager interfaces and remove shim

2021-04-27 Thread GitBox


jsancio commented on a change in pull request #10497:
URL: https://github.com/apache/kafka/pull/10497#discussion_r621649450



##
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##
@@ -126,10 +130,10 @@ class KafkaRaftManager[T](
   private val dataDir = createDataDir()
   private val metadataLog = buildMetadataLog()
   private val netChannel = buildNetworkChannel()
-  private val raftClient = buildRaftClient()
-  private val raftIoThread = new RaftIoThread(raftClient, threadNamePrefix)
+  val client: KafkaRaftClient[T] = buildRaftClient()
+  private val raftIoThread = new RaftIoThread(client, threadNamePrefix)
 
-  def kafkaRaftClient: KafkaRaftClient[T] = raftClient
+  def kafkaRaftClient: KafkaRaftClient[T] = client

Review comment:
   I think that since you added a new method `client: RaftClient[T]` to 
`RaftManager[T]` and `KafkaRaftManager` overrides it as `client: 
KafkaRaftClient[T]`, we should be able to remove this public method, which 
exists only on `KafkaRaftManager[T]`.

##
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##
@@ -126,10 +130,10 @@ class KafkaRaftManager[T](
   private val dataDir = createDataDir()
   private val metadataLog = buildMetadataLog()
   private val netChannel = buildNetworkChannel()
-  private val raftClient = buildRaftClient()
-  private val raftIoThread = new RaftIoThread(raftClient, threadNamePrefix)
+  val client: KafkaRaftClient[T] = buildRaftClient()

Review comment:
   Did you mean to override the return type from `RaftClient[T]` to 
`KafkaRaftClient[T]`?

##
File path: 
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
##
@@ -108,18 +109,29 @@ class BrokerMetadataListenerTest {
 assertEquals(localPartitions, localRemoved.map(_.toTopicPartition).toSet)
   }
 
+  private def applyBatch(
+records: List[ApiMessageAndVersion]
+  ): Unit = {
+val baseOffset = lastMetadataOffset + 1

Review comment:
   Hmm. This is minor, but does this mean that a `baseOffset` of 0 is not 
possible since `lastMetadataOffset` is initialized to `0`? Is this also true 
for a "regular" Kafka topic partition? Or is this just a side effect of how 
this test gets constructed?

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2408,19 +2418,32 @@ private void fireHandleCommit(BatchReader reader) {
 listener.handleCommit(reader);
 }
 
-void maybeFireHandleClaim(int epoch, long epochStartOffset) {
-// We can fire `handleClaim` as soon as the listener has caught
-// up to the start of the leader epoch. This guarantees that the
-// state machine has seen the full committed state before it 
becomes
-// leader and begins writing to the log.
-if (epoch > claimedEpoch && nextOffset() >= epochStartOffset) {
-claimedEpoch = epoch;
-listener.handleClaim(epoch);
+void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
+if (shouldFireLeaderChange(leaderAndEpoch)) {
+lastFiredLeaderChange = leaderAndEpoch;
+listener.handleLeaderChange(leaderAndEpoch);
 }
 }
 
-void fireHandleResign(int epoch) {
-listener.handleResign(epoch);
+private boolean shouldFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
+if (leaderAndEpoch.equals(lastFiredLeaderChange)) {
+return false;
+} else if (leaderAndEpoch.epoch > lastFiredLeaderChange.epoch) {
+return true;

Review comment:
   I see. We want to fire this event even if the `leader` is 
`Optional.empty()` because we use this event to propagate loss of leadership.

##
File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
##
@@ -331,22 +342,40 @@ public void beginShutdown() {
 }
 
 @Override
-public void close() throws InterruptedException {
+public void close() {
 log.debug("Node {}: closing.", nodeId);
 beginShutdown();
-eventQueue.close();
+
+try {
+eventQueue.close();
+} catch (InterruptedException e) {
+Thread.currentThread().interrupt();

Review comment:
   Can you explain why we are doing this?
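
   For context, a common reason for this pattern (not necessarily the 
author's) is that a method which can no longer declare `InterruptedException` 
restores the thread's interrupt flag so callers can still observe it. A 
minimal sketch, where `InterruptSketch` and `closeQuietly` are hypothetical 
names and `Thread.sleep` stands in for a blocking call such as 
`eventQueue.close()`:

```java
// Hypothetical illustration of restoring the interrupt flag.
final class InterruptSketch {
    // Cannot throw InterruptedException, so it re-asserts the flag instead of
    // silently swallowing the interruption.
    static void closeQuietly(long blockMillis) {
        try {
            Thread.sleep(blockMillis); // stand-in for a blocking close()
        } catch (InterruptedException e) {
            // Catching the exception clears the flag; set it again for callers.
            Thread.currentThread().interrupt();
        }
    }
}
```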

##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
##
@@ -28,6 +28,10 @@ public LeaderAndEpoch(OptionalInt leaderId, int epoch) {
 this.epoch = epoch;
 }
 
+public boolean isLeader(int nodeId) {
+return leaderId.isPresent() && leaderId.getAsInt() == nodeId;

Review comment:
   Minor but how about `return leaderId.equals(OptionalInt.of(nodeId));`
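
   A quick check that the two forms agree, as a sketch with hypothetical 
method names. `OptionalInt.of(nodeId)` is never empty, so an empty `leaderId` 
compares unequal to it:

```java
import java.util.OptionalInt;

// Hypothetical side-by-side of the two formulations.
final class LeaderIdSketch {
    // The form in the diff.
    static boolean isLeaderExplicit(OptionalInt leaderId, int nodeId) {
        return leaderId.isPresent() && leaderId.getAsInt() == nodeId;
    }

    // The suggested form: equals() is true only when both are present
    // with the same value, or both are empty.
    static boolean isLeaderViaEquals(OptionalInt leaderId, int nodeId) {
        return leaderId.equals(OptionalInt.of(nodeId));
    }
}
```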

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2408,19 +2418,32 @@ private void fireHandleCommit(BatchReader reader) {
 listener.handleCommit(

[GitHub] [kafka] ableegoldman commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde

2021-04-27 Thread GitBox


ableegoldman commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r621676170



##
File path: 
clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
##
@@ -77,21 +87,39 @@ public void configure(Map configs, boolean 
isKey) {
 }
 }
 
+private void serializeNullIndexList(final DataOutputStream out, 
List data) throws IOException {
+List nullIndexList = IntStream.range(0, data.size())
+.filter(i -> data.get(i) == null)
+.boxed().collect(Collectors.toList());
+out.writeInt(nullIndexList.size());
+for (int i : nullIndexList) out.writeInt(i);
+}
+
 @Override
 public byte[] serialize(String topic, List data) {
 if (data == null) {
 return null;
 }
-final int size = data.size();
 try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
  final DataOutputStream out = new DataOutputStream(baos)) {
+out.writeByte(serStrategy.ordinal()); // write serialization 
strategy flag
+if (serStrategy == SerializationStrategy.NULL_INDEX_LIST) {
+serializeNullIndexList(out, data);
+}
+final int size = data.size();
 out.writeInt(size);
 for (Inner entry : data) {
-final byte[] bytes = inner.serialize(topic, entry);
-if (!isFixedLength) {
-out.writeInt(bytes.length);
+if (entry == null) {
+if (serStrategy == SerializationStrategy.NEGATIVE_SIZE) {
+out.writeInt(Serdes.ListSerde.NEGATIVE_SIZE_VALUE);
+}
+} else {
+final byte[] bytes = inner.serialize(topic, entry);
+if (!isFixedLength || serStrategy == 
SerializationStrategy.NEGATIVE_SIZE) {
+out.writeInt(bytes.length);

Review comment:
   Just to clarify you mean don't expose this to the user at all, right? 
That sounds completely fine to me. If there are enough people trying to 
serialize lists of custom classes with all constant data size who want this 
optimization exposed for general use, then someone will request the feature and 
we can go back and add it in. Then we can debate what the API should look like 
at that time, and keep things simple for now. Personally I suspect the vast 
majority of non-primitive data types are not going to be constant size anyways.
   
   Given the above, I think whether to track the strategy as an actual 
`SerializationStrategy` enum vs a boolean flag becomes a matter of code style 
and personal preference, since it's no longer exposed to the user. So it's up 
to you whether you find the enum or the flag to be more readable or clean
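
   For reference, a simplified sketch of the negative-size approach discussed 
here, assuming string entries and a marker value of -1. The class name, the 
constant's value, and the omission of the strategy flag byte are assumptions 
for illustration and do not reflect the PR's final layout:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical, simplified list serializer using a negative size for nulls.
final class NegativeSizeSketch {
    static final int NEGATIVE_SIZE_VALUE = -1; // assumed marker for a null entry

    static byte[] serialize(List<String> data) {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(data.size()); // number of entries
            for (String entry : data) {
                if (entry == null) {
                    // A null entry is written as a negative length and no payload.
                    out.writeInt(NEGATIVE_SIZE_VALUE);
                } else {
                    byte[] bytes = entry.getBytes(StandardCharsets.UTF_8);
                    out.writeInt(bytes.length); // length prefix on every entry
                    out.write(bytes);
                }
            }
            out.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

   Because every entry carries a length prefix in this scheme, a reader can 
tell null entries apart without a separate index list, at the cost of four 
bytes per entry even for fixed-length types.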








[GitHub] [kafka] hachikuji commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-04-27 Thread GitBox


hachikuji commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r621734730



##
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##
@@ -984,19 +1003,26 @@ class LogCleanerTest {
 
 def distinctValuesBySegment = log.logSegments.map(s => 
s.log.records.asScala.map(record => 
TestUtils.readString(record.value)).toSet.size).toSeq
 
-val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment
+val distinctValuesBySegmentBeforeClean = distinctValuesBySegment
 assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N),
   "Test is not effective unless each segment contains duplicates. Increase 
segment size or decrease number of keys.")
 
+log.updateHighWatermark(log.activeSegment.baseOffset)
 cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, 
firstUncleanableOffset))
 
 val distinctValuesBySegmentAfterClean = distinctValuesBySegment
 
-
assertTrue(disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
-  .take(numCleanableSegments).forall { case (before, after) => after < 
before },
+// One segment should have been completely deleted, so there will be fewer 
segments.
+assertTrue(distinctValuesBySegmentAfterClean.size < 
distinctValuesBySegmentBeforeClean.size)
+
+// Drop the first segment from before cleaning since it was removed. Also 
subtract 1 from numCleanableSegments
+val normalizedDistinctValuesBySegmentBeforeClean = 
distinctValuesBySegmentBeforeClean.drop(1)

Review comment:
   The logic in this test case has become rather obscure after the change. 
Maybe we could do something simpler than comparing segment by segment. As far 
as I can tell, all the test is doing is ensuring that the first uncleanable 
offset is respected. Maybe a simpler test would just write the same key over 
and over and then assert that all records below the uncleanable offset are 
removed and all records above that offset are retained?








[GitHub] [kafka] ableegoldman merged pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

2021-04-27 Thread GitBox


ableegoldman merged pull request #8923:
URL: https://github.com/apache/kafka/pull/8923


   






[GitHub] [kafka] hachikuji commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-04-27 Thread GitBox


hachikuji commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r621668165



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##
@@ -340,17 +343,18 @@ private FilterResult(ByteBuffer outputBuffer) {
 private void updateRetainedBatchMetadata(MutableRecordBatch retainedBatch, int numMessagesInBatch, boolean headerOnly) {
 int bytesRetained = headerOnly ? DefaultRecordBatch.RECORD_BATCH_OVERHEAD : retainedBatch.sizeInBytes();
 updateRetainedBatchMetadata(retainedBatch.maxTimestamp(), retainedBatch.lastOffset(),
-retainedBatch.lastOffset(), numMessagesInBatch, bytesRetained);
+retainedBatch.lastOffset(), retainedBatch.baseOffset(), numMessagesInBatch, bytesRetained);
 }
 
 private void updateRetainedBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset,
-int messagesRetained, int bytesRetained) {
+long baseOffset, int messagesRetained, int bytesRetained) {

Review comment:
   nit: this looks misaligned

##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -700,11 +716,18 @@ private[log] class Cleaner(val id: Int,
   // if any messages are to be retained, write them out
   val outputBuffer = result.outputBuffer
   if (outputBuffer.position() > 0) {
+if (destSegment.isEmpty) {
+  // create a new segment with a suffix appended to the name of the log and indexes
+  destSegment = Some(LogCleaner.createNewCleanedSegment(log.dir, log.config, result.baseOffsetOfFirstBatch()))
+  transactionMetadata.cleanedIndex = Some(destSegment.get.txnIndex)
+  transactionMetadata.appendTransactionIndex()
+}
+
 outputBuffer.flip()
 val retained = MemoryRecords.readableRecords(outputBuffer)
 // it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
 // after `Log.replaceSegments` (which acquires the lock) is called
-dest.append(largestOffset = result.maxOffset,
+destSegment.get.append(largestOffset = result.maxOffset,

Review comment:
   nit: we could probably do something like this to avoid the nasty `get` 
calls in here
   ```scala
   val segment = destSegment.getOrElse {
     val newSegment = LogCleaner.createNewCleanedSegment(log.dir, log.config, result.baseOffsetOfFirstBatch())
     transactionMetadata.cleanedIndex = Some(newSegment.txnIndex)
     transactionMetadata.appendTransactionIndex()
     destSegment = Some(newSegment)
     newSegment
   }
   ```

##
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##
@@ -984,19 +1003,26 @@ class LogCleanerTest {
 
 def distinctValuesBySegment = log.logSegments.map(s => s.log.records.asScala.map(record => TestUtils.readString(record.value)).toSet.size).toSeq
 
-val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment
+val distinctValuesBySegmentBeforeClean = distinctValuesBySegment
 assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N),
   "Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.")
 
+log.updateHighWatermark(log.activeSegment.baseOffset)
 cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, firstUncleanableOffset))
 
 val distinctValuesBySegmentAfterClean = distinctValuesBySegment
 
-assertTrue(disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
-  .take(numCleanableSegments).forall { case (before, after) => after < before },
+// One segment should have been completely deleted, so there will be fewer segments.
+assertTrue(distinctValuesBySegmentAfterClean.size < distinctValuesBySegmentBeforeClean.size)
+
+// Drop the first segment from before cleaning since it was removed. Also subtract 1 from numCleanableSegments
+val normalizedDistinctValuesBySegmentBeforeClean = distinctValuesBySegmentBeforeClean.drop(1)

Review comment:
   The logic in this test case has become rather obscure after the change. 
Maybe we could do something simpler than comparing segment by segment. As far 
as I can tell, all the test is doing is ensuring that the first uncleanable 
offset is respected. Maybe a simpler test would just write the same key over 
and over and then assert that all records below the uncleanable offset are 
removed and all values above that offset are retained?
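
The simpler check suggested above can be sketched against a toy model. This is a minimal Java sketch, not Kafka's `LogCleaner` or its test utilities: `ToyCompaction`, `Rec`, `clean`, and `offsetsAfterClean` are hypothetical names, and the compaction rule (keep the newest record per key within the cleanable range, keep everything at or above the first uncleanable offset) is an assumption of the model. Under that assumption the newest cleanable record for the key survives, so "removed" here means every older duplicate below the uncleanable offset.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of key-based compaction. This is NOT Kafka's LogCleaner; every
// name here (ToyCompaction, Rec, clean, offsetsAfterClean) is hypothetical.
final class ToyCompaction {

    // A record reduced to the two fields the check needs.
    static final class Rec {
        final long offset;
        final String key;

        Rec(long offset, String key) {
            this.offset = offset;
            this.key = key;
        }
    }

    // Compacts only the records below firstUncleanableOffset. The input is
    // assumed to be sorted by offset, as a log is.
    static List<Rec> clean(List<Rec> log, long firstUncleanableOffset) {
        // Newest offset per key, considering the cleanable range only.
        Map<String, Long> latestCleanable = new HashMap<>();
        for (Rec r : log) {
            if (r.offset < firstUncleanableOffset) {
                latestCleanable.put(r.key, r.offset);
            }
        }
        List<Rec> retained = new ArrayList<>();
        for (Rec r : log) {
            boolean uncleanable = r.offset >= firstUncleanableOffset;
            boolean newestForKey = Long.valueOf(r.offset).equals(latestCleanable.get(r.key));
            if (uncleanable || newestForKey) {
                retained.add(r);
            }
        }
        return retained;
    }

    // Writes the same key numRecords times, cleans, and returns the surviving offsets.
    static List<Long> offsetsAfterClean(int numRecords, long firstUncleanableOffset) {
        List<Rec> log = new ArrayList<>();
        for (long offset = 0; offset < numRecords; offset++) {
            log.add(new Rec(offset, "same-key"));
        }
        List<Long> offsets = new ArrayList<>();
        for (Rec r : clean(log, firstUncleanableOffset)) {
            offsets.add(r.offset);
        }
        return offsets;
    }

    public static void main(String[] args) {
        System.out.println(offsetsAfterClean(10, 6)); // prints [5, 6, 7, 8, 9]
    }
}
```

With a single key, the assertion collapses to one comparison on the surviving offsets, which is easier to read than a segment-by-segment comparison of distinct value counts.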

##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -1161,6 +1191,14 @@ private[log] class CleanedTransactionMetadata {
 }
   }
 
+  /**
+   * Apply transactions that accumulated before cleanedIndex was applied
+   */
+  def appendTransactionIndex(): Uni

[GitHub] [kafka] showuon commented on a change in pull request #10547: KAFKA-12284: increase request timeout to make tests reliable

2021-04-27 Thread GitBox


showuon commented on a change in pull request #10547:
URL: https://github.com/apache/kafka/pull/10547#discussion_r621757453



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
##
@@ -337,18 +337,28 @@ public void createTopic(String topic) {
  * @param topic The name of the topic.
  */
 public void createTopic(String topic, int partitions) {
-createTopic(topic, partitions, 1, new HashMap<>());
+createTopic(topic, partitions, 1, new HashMap<>(), new Properties());

Review comment:
   Updated. Thanks.








[GitHub] [kafka] chia7712 commented on a change in pull request #10547: KAFKA-12284: increase request timeout to make tests reliable

2021-04-27 Thread GitBox


chia7712 commented on a change in pull request #10547:
URL: https://github.com/apache/kafka/pull/10547#discussion_r621784059



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
##
@@ -337,18 +337,28 @@ public void createTopic(String topic) {
  * @param topic The name of the topic.
  */
 public void createTopic(String topic, int partitions) {
-createTopic(topic, partitions, 1, new HashMap<>());
+createTopic(topic, partitions, 1, Collections.emptyMap(), new Properties());

Review comment:
   It seems we don't need to pass `new Properties()`, right?








[GitHub] [kafka] showuon commented on a change in pull request #10547: KAFKA-12284: increase request timeout to make tests reliable

2021-04-27 Thread GitBox


showuon commented on a change in pull request #10547:
URL: https://github.com/apache/kafka/pull/10547#discussion_r621788834



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
##
@@ -337,18 +337,28 @@ public void createTopic(String topic) {
  * @param topic The name of the topic.
  */
 public void createTopic(String topic, int partitions) {
-createTopic(topic, partitions, 1, new HashMap<>());
+createTopic(topic, partitions, 1, Collections.emptyMap(), new Properties());

Review comment:
   Yes, you're right! Sorry, I misunderstood your suggestion earlier. Updated. Thanks.








[GitHub] [kafka] vitojeng commented on a change in pull request #10597: KAFKA-5876: Apply StreamsNotStartedException for Interactive Queries

2021-04-27 Thread GitBox


vitojeng commented on a change in pull request #10597:
URL: https://github.com/apache/kafka/pull/10597#discussion_r621794196



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -344,7 +345,7 @@ private boolean isRunningOrRebalancing() {
 
 private void validateIsRunningOrRebalancing() {
 if (!isRunningOrRebalancing()) {
-throw new IllegalStateException("KafkaStreams is not running. State is " + state + ".");
+throw new StreamsNotStartedException("KafkaStreams is not running. State is " + state + ".");

Review comment:
   > So from a user perspective you would want to catch and retry maybe on 
StreamsNotStartedException, but IllegalStateException maybe even should in fact 
kill the app so it can be restarted (eg k8s restarts the process/pod).
   
   Totally agree, sorry that I didn't point this out before. The user can just 
catch and retry when StreamsNotStartedException is thrown. This is different from 
IllegalStateException. I don't think we need to introduce another exception.
   
   So I'm +1 on (c).
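
The catch-and-retry distinction described above could look roughly like the following from the caller's side. This is a minimal sketch that runs without Kafka on the classpath: `NotStartedYet` is a hypothetical stand-in for `StreamsNotStartedException`, and `RetryUntilStarted` and `queryWithRetry` are made-up names. The loop retries immediately to keep the example short; a real caller would presumably wait or back off between attempts.

```java
import java.util.function.Supplier;

// Caller-side sketch: retry on a "not started yet" condition, but let
// IllegalStateException propagate so the process can fail and be restarted.
final class RetryUntilStarted {

    // Hypothetical stand-in for StreamsNotStartedException.
    static final class NotStartedYet extends RuntimeException {
        NotStartedYet(String message) {
            super(message);
        }
    }

    static <T> T queryWithRetry(Supplier<T> query, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        NotStartedYet last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return query.get();
            } catch (NotStartedYet e) {
                // Transient condition: remember it and try again.
                last = e;
            }
            // Any other exception, including IllegalStateException, is not
            // caught here and reaches the caller on the first attempt.
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String value = queryWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new NotStartedYet("KafkaStreams is not running.");
            }
            return "ok";
        }, 5);
        System.out.println(value + " after " + calls[0] + " attempts"); // prints "ok after 3 attempts"
    }
}
```

Keeping the retryable condition on its own exception type is what lets the caller write a narrow `catch` instead of inspecting the message of an `IllegalStateException`.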








[GitHub] [kafka] showuon commented on pull request #10547: KAFKA-12284: increase request timeout to make tests reliable

2021-04-27 Thread GitBox


showuon commented on pull request #10547:
URL: https://github.com/apache/kafka/pull/10547#issuecomment-828180085


   The failed tests are all unrelated (none are in MirrorMakerIntegrationTest).
   ```
   Build / JDK 11 and Scala 2.13 / kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
   Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryOnlyActivePartitionStoresByDefault
   Build / JDK 15 and Scala 2.13 / kafka.api.TransactionsTest.testCommitTransactionTimeout()
   Build / JDK 15 and Scala 2.13 / org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable
   ```






[GitHub] [kafka] dengziming commented on pull request #10598: MINOR: rename wrong topic id variable name and description

2021-04-27 Thread GitBox


dengziming commented on pull request #10598:
URL: https://github.com/apache/kafka/pull/10598#issuecomment-828185212


   > @dengziming nice improvement. LGTM
   > 
   > BTW, line #203 (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L203) has a weird description (`offset id`). Could you update it as well?
   
   Thank you for the reminder. Done!

