[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-28 Thread GitBox


showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r622780715



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -163,127 +159,181 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
  */
 private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
 Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
-SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic);
+if (log.isDebugEnabled()) {
+log.debug("performing constrained assign. partitionsPerTopic: {}, consumerToOwnedPartitions: {}",
+partitionsPerTopic, consumerToOwnedPartitions);
+}
 
 Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-// Each consumer should end up in exactly one of the below
-// the consumers not yet at capacity
+// the consumers not yet at expected capacity
 List<String> unfilledMembers = new LinkedList<>();
-// the members with exactly maxQuota partitions assigned
-Queue<String> maxCapacityMembers = new LinkedList<>();
-// the members with exactly minQuota partitions assigned
-Queue<String> minCapacityMembers = new LinkedList<>();
 
 int numberOfConsumers = consumerToOwnedPartitions.size();
-int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers);
-int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers);
+int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
+int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
+// the expected number of members with maxQuota assignment
+int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers;
+// the number of members with exactly maxQuota partitions assigned
+int numMaxCapacityMembers = 0;
 
-// initialize the assignment map with an empty array of size minQuota for all members
+// initialize the assignment map with an empty array of size maxQuota for all members
 Map<String, List<TopicPartition>> assignment = new HashMap<>(
-consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));
+consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));
 
+List<TopicPartition> assignedPartitions = new ArrayList<>();
 // Reassign as many previously owned partitions as possible
 for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
 String consumer = consumerEntry.getKey();
 List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
 List<TopicPartition> consumerAssignment = assignment.get(consumer);
-int i = 0;
-// assign the first N partitions up to the max quota, and mark the remaining as being revoked
-for (TopicPartition tp : ownedPartitions) {
-if (i < maxQuota) {
-consumerAssignment.add(tp);
-unassignedPartitions.remove(tp);
-} else {
-allRevokedPartitions.add(tp);
-}
-++i;
-}
 
 if (ownedPartitions.size() < minQuota) {
+// the expected assignment size is more than the consumer has now, so keep all the owned partitions
+// and put this member into the unfilled member list
+if (ownedPartitions.size() > 0) {
+consumerAssignment.addAll(ownedPartitions);
+assignedPartitions.addAll(ownedPartitions);
+}
 unfilledMembers.add(consumer);
+} else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers < numExpectedMaxCapacityMembers) {
+// consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected max capacity members
+// so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
+numMaxCapacityMembers++;
+List<TopicPartition> maxQuotaPartitions = ownedPartitions.subList(0, maxQuota);
+consumerAssignment.addAll(maxQuotaPartitions);
+assignedPartitions.addAll(maxQuotaPartitions);
+allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
 } else {
-// It's possible for a consumer to be at both min and max capacity if minQuota == maxQuota
-if (consumerAssignment.size() == minQuota)
-minCapacityMembers.add(consumer);
-if (consumerAssi
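
For reference, the quota math introduced in this hunk can be illustrated in isolation. The following is only a sketch (not part of the patch) that reproduces the minQuota/maxQuota/numExpectedMaxCapacityMembers computation for hypothetical inputs; it assumes Java 9+ for `Map.of`:

```java
import java.util.Map;

public class QuotaMathSketch {
    // Illustrates the quota computation from constrainedAssign (sketch only).
    public static void main(String[] args) {
        // Hypothetical inputs: 3 topics with 2, 3 and 5 partitions, shared by 4 consumers.
        Map<String, Integer> partitionsPerTopic = Map.of("t1", 2, "t2", 3, "t3", 5);
        int numberOfConsumers = 4;

        int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum); // 10
        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);    // 2
        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);     // 3
        // Only this many members are expected to end up with maxQuota partitions.
        int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers;            // 2

        System.out.printf("total=%d min=%d max=%d expectedAtMax=%d%n",
                totalPartitionsCount, minQuota, maxQuota, numExpectedMaxCapacityMembers);
    }
}
```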

[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-28 Thread GitBox


showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r622779994



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
  */
 private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
 Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
-SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic);
+if (log.isDebugEnabled()) {
+log.debug(String.format("performing constrained assign. partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+partitionsPerTopic, consumerToOwnedPartitions));
+}
+
+List<TopicPartition> sortedAllPartitions = getTopicPartitions(partitionsPerTopic);
 
 Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-// Each consumer should end up in exactly one of the below
-// the consumers not yet at capacity
+// the consumers not yet at expected capacity
 List<String> unfilledMembers = new LinkedList<>();
-// the members with exactly maxQuota partitions assigned
-Queue<String> maxCapacityMembers = new LinkedList<>();
-// the members with exactly minQuota partitions assigned
-Queue<String> minCapacityMembers = new LinkedList<>();
 
 int numberOfConsumers = consumerToOwnedPartitions.size();
-int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers);
-int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers);
+int totalPartitionsCount = sortedAllPartitions.size();
+
+int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
+int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
+// the expected number of members with maxQuota assignment
+int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers;
+// the number of members with exactly maxQuota partitions assigned
+int numMaxCapacityMembers = 0;
 
-// initialize the assignment map with an empty array of size minQuota for all members
+// initialize the assignment map with an empty array of size maxQuota for all members
 Map<String, List<TopicPartition>> assignment = new HashMap<>(
-consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));
+consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));
 
+List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
 // Reassign as many previously owned partitions as possible
 for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
 String consumer = consumerEntry.getKey();
 List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
 List<TopicPartition> consumerAssignment = assignment.get(consumer);
-int i = 0;
-// assign the first N partitions up to the max quota, and mark the remaining as being revoked
-for (TopicPartition tp : ownedPartitions) {
-if (i < maxQuota) {
-consumerAssignment.add(tp);
-unassignedPartitions.remove(tp);
-} else {
-allRevokedPartitions.add(tp);
-}
-++i;
-}
 
 if (ownedPartitions.size() < minQuota) {
+// the expected assignment size is more than the consumer has now, so keep all the owned partitions
+// and put this member into the unfilled member list
+if (ownedPartitions.size() > 0) {
+consumerAssignment.addAll(ownedPartitions);
+toBeRemovedPartitions.addAll(ownedPartitions);
+}
 unfilledMembers.add(consumer);
+} else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+// consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected max capacity members
+// so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
+consumerAssignment.addAll(ownedPartitions.subList(0, maxQuota));
+toBeRemovedPartitions.addAll(ownedPartitions.subList(0, maxQuota));
+allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
 } else {
-// It's possible for a consumer to be at both min and max capacity if minQuota == maxQuota
-if (consumerAssignment.size() == minQuota)
-minCapacityMembers.add(consumer);
-if (consumerAssignment.size() == m
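
The keep-vs-revoke split in this hunk relies on `List#subList` views. The following is a minimal standalone sketch of that idiom (not the patch itself), using plain partition-name strings as a stand-in for TopicPartition:

```java
import java.util.ArrayList;
import java.util.List;

public class KeepRevokeSplitSketch {
    public static void main(String[] args) {
        int maxQuota = 3;
        // Hypothetical owned partitions for one consumer (more than maxQuota).
        List<String> ownedPartitions = new ArrayList<>(List.of("t1-0", "t1-1", "t2-0", "t2-1", "t3-0"));

        // Keep the first maxQuota partitions, revoke the rest -- same subList idiom as the diff above.
        List<String> kept = ownedPartitions.subList(0, maxQuota);
        List<String> revoked = ownedPartitions.subList(maxQuota, ownedPartitions.size());

        System.out.println("kept=" + kept);       // [t1-0, t1-1, t2-0]
        System.out.println("revoked=" + revoked); // [t2-1, t3-0]
    }
}
```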

[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-28 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r622767920



##
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadata> {

Review comment:
   `RemoteLogSegmentMetadata` is a public API and we do not want to add any 
specific serialization implementation details into that. 
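
A hedged sketch of the separation being argued for here: the public domain type stays serialization-free, while a dedicated transform owns the mapping to the wire-level record. The names below (DomainType, WireRecord, DomainTypeTransform) are illustrative placeholders, not the actual Kafka interfaces:

```java
// Illustrative only: the public domain class knows nothing about serialization.
final class DomainType {
    private final String id;
    DomainType(String id) { this.id = id; }
    String id() { return id; }
}

// Illustrative wire-level record type (stands in for a generated ApiMessage).
final class WireRecord {
    final String id;
    WireRecord(String id) { this.id = id; }
}

// The serialization detail lives in a separate transform class, mirroring the
// role RemoteLogSegmentMetadataTransform plays in this PR.
final class DomainTypeTransform {
    WireRecord toWireRecord(DomainType value) { return new WireRecord(value.id()); }
    DomainType fromWireRecord(WireRecord record) { return new DomainType(record.id); }
}
```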




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-28 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r622767150



##
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/BytesApiMessageSerde.java
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.raft.metadata.AbstractApiMessageSerde;
+import java.nio.ByteBuffer;
+
+/**
+ * This class provides conversion of {@code ApiMessageAndVersion} to bytes and vice versa. This can be used as the serialization protocol for any
+ * metadata records derived from {@code ApiMessage}s. It internally uses {@link AbstractApiMessageSerde} as the serialization/deserialization
+ * mechanism.
+ * 
+ * Implementors need to extend this class and implement {@link 
#apiMessageFor(short)} method to return a respective
+ * {@code ApiMessage} for the given {@code apiKey}. This is required to 
deserialize the bytes to build the respective
+ * {@code ApiMessage} instance.
+ */
+public abstract class BytesApiMessageSerde {

Review comment:
   `BytesApiMessageSerde` is a generic implementation that can be used by others. As I said in my earlier [comment](https://github.com/apache/kafka/pull/10271#issuecomment-826991751), it will be moved to the `clients` module.
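
A minimal sketch of the extension point described in the javadoc above: a concrete serde only implements `apiMessageFor(short)` so the abstract class can rebuild the right `ApiMessage` on deserialization. The subclass name below is hypothetical; the dispatch via `MetadataRecordType` mirrors what `RemoteLogMetadataSerde` does further down in this thread:

```java
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.server.log.remote.metadata.storage.generated.MetadataRecordType;

// Sketch only: MetadataBytesSerde is a hypothetical name for a concrete
// BytesApiMessageSerde that maps an apiKey back to its generated record type.
public class MetadataBytesSerde extends BytesApiMessageSerde {
    @Override
    public ApiMessage apiMessageFor(short apiKey) {
        // Build a fresh ApiMessage instance for the given apiKey so the bytes
        // can be deserialized into the matching generated record.
        return MetadataRecordType.fromId(apiKey).newMetadataRecord();
    }
}
```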




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-28 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r622766875



##
File path: storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
##
@@ -270,9 +247,9 @@ public String toString() {
"remoteLogSegmentId=" + remoteLogSegmentId +
", startOffset=" + startOffset +
", endOffset=" + endOffset +
-   ", brokerId=" + brokerId +
+   ", brokerId=" + brokerId() +

Review comment:
   AFAIK, javac (JDK 8+) already converts that into `StringBuilder` calls, so there is no need to write `StringBuilder` concatenation code here.
   Actually, we should avoid doing that, as javac has better optimizations like [JEP-280](https://openjdk.java.net/jeps/280).
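
For context, a small sketch of what that refers to: source-level `+` concatenation is compiled by javac either into a `StringBuilder` chain (pre-JDK 9) or, since JEP 280 in JDK 9, into an `invokedynamic` call through `StringConcatFactory`, so hand-writing the builder gains nothing and pins the older strategy:

```java
public class ConcatSketch {
    // Written with '+': javac picks the concatenation strategy
    // (StringBuilder chain pre-JDK 9, invokedynamic/StringConcatFactory since JEP 280).
    static String withPlus(int brokerId, long startOffset) {
        return "RemoteLogSegmentMetadata{brokerId=" + brokerId + ", startOffset=" + startOffset + "}";
    }

    // Hand-written StringBuilder: same output, but it fixes the strategy in the
    // source and prevents the compiler from applying the newer optimization.
    static String withBuilder(int brokerId, long startOffset) {
        return new StringBuilder("RemoteLogSegmentMetadata{brokerId=").append(brokerId)
                .append(", startOffset=").append(startOffset).append("}").toString();
    }

    public static void main(String[] args) {
        System.out.println(withPlus(1, 100L).equals(withBuilder(1, 100L))); // true
    }
}
```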




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-28 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r622765854



##
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.MetadataRecordType;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class provides serialization and deserialization for {@link 
RemoteLogMetadata}. This is the root serde
+ * for the messages that are stored in internal remote log metadata topic.
+ */
+public class RemoteLogMetadataSerde {
+private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new RemoteLogSegmentMetadataRecord().apiKey();
+private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = new RemoteLogSegmentMetadataUpdateRecord().apiKey();
+private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey();
+
+private final Map<String, Short> remoteLogStorageClassToApiKey;
+private final Map<Short, RemoteLogMetadataTransform> keyToTransform;
+private final BytesApiMessageSerde bytesApiMessageSerde;
+
+public RemoteLogMetadataSerde() {
+remoteLogStorageClassToApiKey = createRemoteLogStorageClassToApiKeyMap();
+keyToTransform = createRemoteLogMetadataTransforms();
+bytesApiMessageSerde = new BytesApiMessageSerde() {
+@Override
+public ApiMessage apiMessageFor(short apiKey) {
+return newApiMessage(apiKey);
+}
+};
+}
+
+protected ApiMessage newApiMessage(short apiKey) {
+return MetadataRecordType.fromId(apiKey).newMetadataRecord();
+}
+
+protected Map<Short, RemoteLogMetadataTransform> createRemoteLogMetadataTransforms() {
+Map<Short, RemoteLogMetadataTransform> map = new HashMap<>();
+map.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new RemoteLogSegmentMetadataTransform());
+map.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new RemoteLogSegmentMetadataUpdateTransform());
+map.put(REMOTE_PARTITION_DELETE_API_KEY, new RemotePartitionDeleteMetadataTransform());
+return map;
+}
+
+protected Map<String, Short> createRemoteLogStorageClassToApiKeyMap() {
+Map<String, Short> map = new HashMap<>();
+map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY);
+map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
+map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY);
+return map;
+}
+
+public byte[] serialize(RemoteLogMetadata remoteLogMetadata) {
+Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName());

Review comment:
   `apiKey` is an internal serialization detail of one of the 
implementations. We should not leak that to public API like `RemoteLogMetadata` 
and its subclasses.
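
A stripped-down sketch of the lookup pattern being defended here: the serde keeps its own class-name-to-apiKey map, so the public metadata classes never carry an apiKey themselves. The types below are placeholders for the real RemoteLogMetadata hierarchy:

```java
import java.util.HashMap;
import java.util.Map;

public class ClassToApiKeySketch {
    // Placeholder domain types standing in for RemoteLogMetadata subclasses.
    static class MetadataA {}
    static class MetadataB {}

    // The serde owns the mapping; the domain classes expose no serialization detail.
    private static final Map<String, Short> CLASS_TO_API_KEY = new HashMap<>();
    static {
        CLASS_TO_API_KEY.put(MetadataA.class.getName(), (short) 0);
        CLASS_TO_API_KEY.put(MetadataB.class.getName(), (short) 1);
    }

    static short apiKeyFor(Object metadata) {
        Short apiKey = CLASS_TO_API_KEY.get(metadata.getClass().getName());
        if (apiKey == null) {
            throw new IllegalArgumentException("Unknown metadata type: " + metadata.getClass());
        }
        return apiKey;
    }

    public static void main(String[] args) {
        System.out.println(apiKeyFor(new MetadataA())); // 0
    }
}
```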




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kamalcph commented on pull request #10562: MINOR: Update tests to include the 2.8.0 release

2021-04-28 Thread GitBox


kamalcph commented on pull request #10562:
URL: https://github.com/apache/kafka/pull/10562#issuecomment-828958926


   I didn't notice this PR. Opened #10602 for this. BTW, why was the Kafka distro with Scala v2.12 chosen? The recommended version on the doc page is v2.13.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kamalcph commented on a change in pull request #10605: KAFKA-12726 prevent a stuck Task.stop() from blocking subsequent Task.stops()s

2021-04-28 Thread GitBox


kamalcph commented on a change in pull request #10605:
URL: https://github.com/apache/kafka/pull/10605#discussion_r622746344



##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##
@@ -839,7 +840,22 @@ private void stopTask(ConnectorTaskId taskId) {
 ClassLoader savedLoader = plugins.currentThreadLoader();
 try {
 savedLoader = Plugins.compareAndSwapLoaders(task.loader());
-task.stop();
+CountDownLatch latch = new CountDownLatch(1);
+new Thread() {
+@Override
+public void run() {
+task.stop();
+latch.countDown();
+}
+}.start();
+// Wait for thread to terminate, but not longer than timeout.
+if (timeout <= 0) {

Review comment:
   nit: This if statement is not required. The CountDownLatch won't wait at all if the timeout is <= 0.
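
A quick standalone check of that behaviour: `CountDownLatch.await(timeout, unit)` is documented to return immediately when the timeout is zero or negative (returning `false` if the count is still non-zero), so the extra guard adds nothing:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchTimeoutSketch {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        // Non-positive timeout: await() does not block and returns false
        // because the count is still 1 -- no separate "if (timeout <= 0)" needed.
        boolean completed = latch.await(0, TimeUnit.MILLISECONDS);
        System.out.println(completed); // false

        latch.countDown();
        // Count already zero: returns true immediately regardless of timeout.
        System.out.println(latch.await(-5, TimeUnit.MILLISECONDS)); // true
    }
}
```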




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kamalcph commented on a change in pull request #10589: MINOR: move topic configuration defaults

2021-04-28 Thread GitBox


kamalcph commented on a change in pull request #10589:
URL: https://github.com/apache/kafka/pull/10589#discussion_r622745172



##
File path: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
##
@@ -56,26 +66,30 @@
 "be overridden on a per-topic basis (see the 
per-topic configuration section).";
 
 public static final String FLUSH_MS_CONFIG = "flush.ms";
+public static final long FLUSH_MS_DEFAULT = Long.MAX_VALUE;
 public static final String FLUSH_MS_DOC = "This setting allows specifying 
a time interval at which we will " +
 "force an fsync of data written to the log. For example if this was 
set to 1000 " +
 "we would fsync after 1000 ms had passed. In general we recommend you 
not set " +
 "this and use replication for durability and allow the operating 
system's background " +
 "flush capabilities as it is more efficient.";
 
 public static final String RETENTION_BYTES_CONFIG = "retention.bytes";
+public static final long RETENTION_BYTES_DEFAULT = -1L;
 public static final String RETENTION_BYTES_DOC = "This configuration 
controls the maximum size a partition " +
 "(which consists of log segments) can grow to before we will discard 
old log segments to free up space if we " +
 "are using the \"delete\" retention policy. By default there is no 
size limit only a time limit. " +
 "Since this limit is enforced at the partition level, multiply it by 
the number of partitions to compute " +
 "the topic retention in bytes.";
 
 public static final String RETENTION_MS_CONFIG = "retention.ms";
+public static final long RETENTION_MS_DEFAULT = TimeUnit.DAYS.toMillis(7);
 public static final String RETENTION_MS_DOC = "This configuration controls 
the maximum time we will retain a " +
 "log before we will discard old log segments to free up space if we 
are using the " +
 "\"delete\" retention policy. This represents an SLA on how soon 
consumers must read " +
 "their data. If set to -1, no time limit is applied.";
 
 public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes";
+public static final int MAX_MESSAGE_BYTES_DEFAULT = 1024 * 1024 + 12;

Review comment:
   Why not refer to Records.LOG_OVERHEAD instead of 12?
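
For context, `Records.LOG_OVERHEAD` in `org.apache.kafka.common.record` is the 12-byte per-record prefix (8-byte offset plus 4-byte size), so the suggestion would read roughly as below. This is a sketch of the suggested form, not the committed change:

```java
import org.apache.kafka.common.record.Records;

// Sketch of the suggestion: express the default via the named constant
// rather than the literal 12.
public final class MaxMessageBytesDefaultSketch {
    public static final int MAX_MESSAGE_BYTES_DEFAULT = 1024 * 1024 + Records.LOG_OVERHEAD;
}
```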

##
File path: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
##
@@ -166,19 +191,36 @@
 "they will receive messages with a format that they don't understand.";
 
 public static final String MESSAGE_TIMESTAMP_TYPE_CONFIG = 
"message.timestamp.type";
+public static final String MESSAGE_TIMESTAMP_TYPE_DEFAULT = "CreateTime";
 public static final String MESSAGE_TIMESTAMP_TYPE_DOC = "Define whether 
the timestamp in the message is " +
 "message create time or log append time. The value should be either 
`CreateTime` or `LogAppendTime`";
 
 public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG = 
"message.timestamp.difference.max.ms";
+public static final long MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DEFAULT = 
Long.MAX_VALUE;
 public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC = "The 
maximum difference allowed between " +
 "the timestamp when a broker receives a message and the timestamp 
specified in the message. If " +
 "message.timestamp.type=CreateTime, a message will be rejected if the 
difference in timestamp " +
 "exceeds this threshold. This configuration is ignored if 
message.timestamp.type=LogAppendTime.";
 
 public static final String MESSAGE_DOWNCONVERSION_ENABLE_CONFIG = 
"message.downconversion.enable";
+public static final boolean MESSAGE_DOWNCONVERSION_ENABLE_DEFAULT = true;
 public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "This 
configuration controls whether " +
 "down-conversion of message formats is enabled to satisfy consume 
requests. When set to false, " +
 "broker will not perform down-conversion for consumers expecting an 
older message format. The broker responds " +
 "with UNSUPPORTED_VERSION error for consume requests from 
such older clients. This configuration" +
 "does not apply to any message format conversion that might be 
required for replication to followers.";
+
+public static final String LEADER_REPLICATION_THROTTLED_REPLICAS = 
"leader.replication.throttled.replicas";
+public static final Collection<String> LEADER_REPLICATION_THROTTLED_REPLICAS_DEFAULT = Collections.emptyList();
+public static final String LEADER_REPLICATION_THROTTLED_REPLICAS_DOC = "A 
list of replicas for which log " +
+"replication should be throttled on the leader side. The list should 
describe a set of replicas in the form " +
+"[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or 
alternatively the wildcard '*' can be used to throttle " +
+"all replicas for this topic.";
+
+ 

[GitHub] [kafka] kamalcph commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-04-28 Thread GitBox


kamalcph commented on a change in pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#discussion_r622727068



##
File path: tests/docker/Dockerfile
##
@@ -62,6 +62,7 @@ RUN mkdir -p "/opt/kafka-2.4.1" && chmod a+rw 
/opt/kafka-2.4.1 && curl -s "$KAFK
 RUN mkdir -p "/opt/kafka-2.5.1" && chmod a+rw /opt/kafka-2.5.1 && curl -s 
"$KAFKA_MIRROR/kafka_2.12-2.5.1.tgz" | tar xz --strip-components=1 -C 
"/opt/kafka-2.5.1"
 RUN mkdir -p "/opt/kafka-2.6.2" && chmod a+rw /opt/kafka-2.6.2 && curl -s 
"$KAFKA_MIRROR/kafka_2.12-2.6.2.tgz" | tar xz --strip-components=1 -C 
"/opt/kafka-2.6.2"
 RUN mkdir -p "/opt/kafka-2.7.0" && chmod a+rw /opt/kafka-2.7.0 && curl -s 
"$KAFKA_MIRROR/kafka_2.12-2.7.0.tgz" | tar xz --strip-components=1 -C 
"/opt/kafka-2.7.0"
+RUN mkdir -p "/opt/kafka-2.8.0" && chmod a+rw /opt/kafka-2.8.0 && curl -s 
"$KAFKA_MIRROR/kafka_2.13-2.8.0.tgz" | tar xz --strip-components=1 -C 
"/opt/kafka-2.8.0"

Review comment:
   The distribution with Scala v2.13 is recommended on the documentation page, so it was chosen.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10535: MINOR: Remove duplicate method in test classes

2021-04-28 Thread GitBox


chia7712 commented on pull request #10535:
URL: https://github.com/apache/kafka/pull/10535#issuecomment-828933051


   @dengziming Could you merge trunk to trigger QA again? There are some fixes for flaky tests which should stabilize the QA.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kamalcph commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-04-28 Thread GitBox


kamalcph commented on a change in pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#discussion_r622726584



##
File path: streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+@SuppressWarnings("unchecked")
+public static void main(final String[] args) throws Exception {
+if (args.length < 1) {
+System.err.println("StreamsUpgradeTest requires one argument 
(properties-file) but provided none");
+}
+final String propFileName = args[0];
+
+final Properties streamsProperties = Utils.loadProps(propFileName);
+
+System.out.println("StreamsTest instance started (StreamsUpgradeTest 
v2.7)");
+System.out.println("props=" + streamsProperties);
+
+final StreamsBuilder builder = new StreamsBuilder();
+final KStream dataStream = builder.stream("data");
+dataStream.process(printProcessorSupplier());
+dataStream.to("echo");
+
+final Properties config = new Properties();
+config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"StreamsUpgradeTest");
+config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+config.putAll(streamsProperties);
+
+final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+streams.start();
+
+Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+streams.close();
+System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+System.out.flush();
+}));
+}
+
+private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+return () -> new AbstractProcessor<K, V>() {
+private int numRecordsProcessed = 0;
+
+@Override
+public void init(final ProcessorContext context) {
+System.out.println("[2.6] initializing processor: topic=data 
taskId=" + context.taskId());

Review comment:
   Updated the version to 2.8.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kamalcph commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-04-28 Thread GitBox


kamalcph commented on a change in pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#discussion_r622726486



##
File path: streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+@SuppressWarnings("unchecked")
+public static void main(final String[] args) throws Exception {
+if (args.length < 1) {
+System.err.println("StreamsUpgradeTest requires one argument 
(properties-file) but provided none");
+}
+final String propFileName = args[0];
+
+final Properties streamsProperties = Utils.loadProps(propFileName);
+
+System.out.println("StreamsTest instance started (StreamsUpgradeTest 
v2.7)");

Review comment:
   Ack




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10608: MINOR: clean up some remaining locking stuff in StateDirectory

2021-04-28 Thread GitBox


ableegoldman commented on pull request #10608:
URL: https://github.com/apache/kafka/pull/10608#issuecomment-828927542


   call for review any of @wcarlson5 @lct45 @cadonna @guozhangwang @mjsax 
@vvcephei @rodesai 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10608: MINOR: clean up some remaining locking stuff in StateDirectory

2021-04-28 Thread GitBox


ableegoldman commented on a change in pull request #10608:
URL: https://github.com/apache/kafka/pull/10608#discussion_r622719786



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -431,7 +397,7 @@ private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
 if (now > lastModifiedMs + cleanupDelayMs) {
 log.info("{} Deleting obsolete state directory {} 
for task {} as {}ms has elapsed (cleanup delay is {}ms).",
 logPrefix(), dirName, id, now - 
lastModifiedMs, cleanupDelayMs);
-Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));

Review comment:
   This change seems innocuous but has a pretty sweet side effect: because 
we had to exclude the lock file during the deletion (due to some stupid Windows 
bug 🙄 ), this utility method was not actually deleting the task directory 
itself. AFAICT this means we were never deleting these empty directories at any 
point, unless the user manually called KafkaStreams#cleanUp. It's not a 
horrible bug, but it's just obnoxious so I'm glad we can finally be rid of the 
Windows ball & chain that was its `DirectoryNotEmptyException`
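
The mechanics behind that observation are plain java.nio behaviour: a directory can only be removed once it is empty, so excluding the lock file from the recursive delete necessarily left the task directory itself behind. A small sketch, independent of the Streams code:

```java
import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.Path;

public class NonEmptyDirDeleteSketch {
    public static void main(String[] args) throws IOException {
        Path taskDir = Files.createTempDirectory("task-dir");
        Path lockFile = Files.createFile(taskDir.resolve(".lock"));

        try {
            // Deleting the directory while the excluded lock file still exists fails,
            // which is why the old exclusion-based delete never removed the task directory.
            Files.delete(taskDir);
        } catch (DirectoryNotEmptyException e) {
            System.out.println("cannot delete non-empty directory: " + e.getFile());
        }

        // Once the lock file is gone, the directory itself can be removed.
        Files.delete(lockFile);
        Files.delete(taskDir);
    }
}
```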




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10608: MINOR: clean up some remaining locking stuff in StateDirectory

2021-04-28 Thread GitBox


ableegoldman commented on a change in pull request #10608:
URL: https://github.com/apache/kafka/pull/10608#discussion_r622718411



##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
##
@@ -315,7 +314,7 @@ public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusClean
 }
 
 @Test
-public void shouldCleanupObsoleteStateDirectoriesOnlyOnce() {
+public void shouldCleanupObsoleteTaskDirectoriesAndDeleteTheDirectoryItself() {

Review comment:
   This test was originally added when we [fixed a bug 
](https://github.com/apache/kafka/pull/9373/files#diff-beec0c6a88443677858de59258cf144238b4cbb352fe02a7375c0bbba929696aR311)where
 Streams was continuously attempting to clean "empty" task directories and 
logging the `"Deleting obsolete state directory..."` message. The root cause of 
this was that we couldn't delete the task directory itself due to the lock 
file, so we just left empty directories behind. This PR fixes that, so I 
adapted the PR to test this new functionality since the old test wouldn't 
really make sense any more




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-6655) CleanupThread: Failed to lock the state directory due to an unexpected exception (Windows OS)

2021-04-28 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-6655.
---
Resolution: Fixed

This should be fixed; a number of improvements were made around the locking a few versions ago, but we actually took it a step further and removed this kind of file-based locking for task directories entirely as of 3.0.

> CleanupThread: Failed to lock the state directory due to an unexpected 
> exception (Windows OS)
> -
>
> Key: KAFKA-6655
> URL: https://issues.apache.org/jira/browse/KAFKA-6655
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
> Environment: Windows Operating System
>Reporter: Srini
>Priority: Major
>
> This issue happens on Windows OS. The code works fine on Linux. This ticket is 
> related to KAFKA-6647, which also reports locking issues on Windows. However, 
> there, the issue occurs if users calls KafkaStreams#cleanUp() explicitly, 
> while this ticket is related to KafkaStreams background CleanupThread (note, 
> that both use `StateDirectory.cleanRemovedTasks`, but behavior is still 
> slightly different as different parameters are passed into the method).
> {quote}[CleanupThread] Failed to lock the state directory due to an 
> unexpected exceptionjava.nio.file.DirectoryNotEmptyException: 
> \tmp\kafka-streams\srini-20171208\0_9
>  at 
> sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266)
>  at 
> sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
>  at java.nio.file.Files.delete(Files.java:1126)
>  at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:636)
>  at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:619)
>  at java.nio.file.Files.walkFileTree(Files.java:2688)
>  at java.nio.file.Files.walkFileTree(Files.java:2742)
>  at org.apache.kafka.common.utils.Utils.delete(Utils.java:619)
>  at 
> org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:245)
>  at org.apache.kafka.streams.KafkaStreams$3.run(KafkaStreams.java:761)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #10598: MINOR: rename wrong topic id variable name and description

2021-04-28 Thread GitBox


chia7712 commented on pull request #10598:
URL: https://github.com/apache/kafka/pull/10598#issuecomment-828919194


   @dengziming thanks for your contribution. @showuon thanks for updating 
profile.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #10598: MINOR: rename wrong topic id variable name and description

2021-04-28 Thread GitBox


chia7712 merged pull request #10598:
URL: https://github.com/apache/kafka/pull/10598


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #10608: MINOR: clean up some remaining locking stuff in StateDirectory

2021-04-28 Thread GitBox


ableegoldman opened a new pull request #10608:
URL: https://github.com/apache/kafka/pull/10608


   Minor followup to https://github.com/apache/kafka/pull/10342 that I noticed 
while working on the NamedTopology stuff. Cleans up a few things:
   
   1. We no longer need locking for the global state directory either, since 
it's contained within the top-level state directory lock
   2. Clears out misc. usages of the LOCK_FILE_NAME that no longer apply
   3. Lazily delete old-and-now-unused lock files in the 
`StateDirectory#taskDirIsEmpty` method to clean up the state directory for 
applications that upgraded from an older version that still used task locking


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12729) controller leader keep disconnected and block the cluster

2021-04-28 Thread luws (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luws updated KAFKA-12729:
-
Description: 
kafka version. 2.12-2.1.1

after zk flash disconnected 

then broker6 become the controller leader

but the other brokers can't connect to the broker6 until we restart the broker6 
then the cluster become recover.

 

bandwidth is normal

!image-2021-04-29-11-33-24-303.png!

!image-2021-04-29-11-33-53-048.png!

!image-2021-04-29-11-34-18-654.png!

!image-2021-04-29-11-35-26-994.png!

!image-2021-04-29-11-35-41-393.png!

!image-2021-04-29-11-35-52-191.png!

 

 

 
{code:java}
// code placeholder
Connection to 6 was disconnected before the response was read at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
 at 
kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
 at 
kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192)
 at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
 at scala.Option.foreach(Option.scala:257) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) 
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code}
 

  was:
kafka version. 2.12-2.1.1

after zk flash disconnected 

then broker6 become the controller leader

but the other broker can't connect to the broker6 until we restart the broker6 
then the cluster become recover.

 

bandwidth is normal

!image-2021-04-29-11-33-24-303.png!

!image-2021-04-29-11-33-53-048.png!

!image-2021-04-29-11-34-18-654.png!

!image-2021-04-29-11-35-26-994.png!

!image-2021-04-29-11-35-41-393.png!

!image-2021-04-29-11-35-52-191.png!

 

 

 
{code:java}
// code placeholder
Connection to 6 was disconnected before the response was read at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
 at 
kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
 at 
kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192)
 at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
 at scala.Option.foreach(Option.scala:257) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) 
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code}
 


> controller leader keep disconnected and block the cluster
> -
>
> Key: KAFKA-12729
> URL: https://issues.apache.org/jira/browse/KAFKA-12729
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, KafkaConnect
>Affects Versions: 2.1.1
>Reporter: luws
>Priority: Critical
> Attachments: image-2021-04-29-11-33-24-303.png, 
> image-2021-04-29-11-33-53-048.png, image-2021-04-29-11-34-18-654.png, 
> image-2021-04-29-11-35-26-994.png, image-2021-04-29-11-35-41-393.png, 
> image-2021-04-29-11-35-52-191.png
>
>
> kafka version. 2.12-2.1.1
> after zk flash disconnected 
> then broker6 become the controller leader
> but the other brokers can't connect to the broker6 until we restart the 
> broker6 then the cluster become recover.
>  
> bandwidth is normal
> !image-2021-04-29-11-33-24-303.png!
> !image-2021-04-29-11-33-53-048.png!
> !image-2021-04-29-11-34-18-654.png!
> !image-2021-04-29-11-35-26-994.png!
> !image-2021-04-29-11-35-41-393.png!
> !image-2021-04-29-11-35-52-191.png!
>  
>  
>  
> {code:java}
> // code placeholder
> Connection to 6 was disconnected before the response was read at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
>  at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
>  at scala.Option.foreach(Option.scala:257) at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
>  at 
> kafka.server.AbstractFetcherTh

[jira] [Updated] (KAFKA-12729) controller leader keep disconnected and block the cluster

2021-04-28 Thread luws (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luws updated KAFKA-12729:
-
Attachment: image-2021-04-29-11-35-26-994.png

> controller leader keep disconnected and block the cluster
> -
>
> Key: KAFKA-12729
> URL: https://issues.apache.org/jira/browse/KAFKA-12729
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, KafkaConnect
>Affects Versions: 2.1.1
>Reporter: luws
>Priority: Critical
> Attachments: image-2021-04-29-11-33-24-303.png, 
> image-2021-04-29-11-33-53-048.png, image-2021-04-29-11-34-18-654.png, 
> image-2021-04-29-11-35-26-994.png, image-2021-04-29-11-35-41-393.png, 
> image-2021-04-29-11-35-52-191.png
>
>
> kafka version. 2.12-2.1.1
> after zk flash disconnected 
> then broker6 become the controller leader
> but the other broker can't connect to the broker6 until we restart the 
> broker6 then the cluster become recover.
>  
> bandwidth is normal
> !image-2021-04-29-11-24-22-704.png!  
> !image-2021-04-29-11-27-12-924.png!
> !image-2021-04-29-11-27-35-679.png!
> !image-2021-04-29-11-25-41-208.png!
> !image-2021-04-29-11-26-34-894.png!
>  
> {code:java}
> // code placeholder
> Connection to 6 was disconnected before the response was read at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
>  at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
>  at scala.Option.foreach(Option.scala:257) at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
>  at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12729) controller leader keep disconnected and block the cluster

2021-04-28 Thread luws (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luws updated KAFKA-12729:
-
Attachment: image-2021-04-29-11-35-41-393.png

> controller leader keep disconnected and block the cluster
> -
>
> Key: KAFKA-12729
> URL: https://issues.apache.org/jira/browse/KAFKA-12729
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, KafkaConnect
>Affects Versions: 2.1.1
>Reporter: luws
>Priority: Critical
> Attachments: image-2021-04-29-11-33-24-303.png, 
> image-2021-04-29-11-33-53-048.png, image-2021-04-29-11-34-18-654.png, 
> image-2021-04-29-11-35-26-994.png, image-2021-04-29-11-35-41-393.png, 
> image-2021-04-29-11-35-52-191.png
>
>
> kafka version. 2.12-2.1.1
> after zk flash disconnected 
> then broker6 become the controller leader
> but the other broker can't connect to the broker6 until we restart the 
> broker6 then the cluster become recover.
>  
> bandwidth is normal
> !image-2021-04-29-11-24-22-704.png!  
> !image-2021-04-29-11-27-12-924.png!
> !image-2021-04-29-11-27-35-679.png!
> !image-2021-04-29-11-25-41-208.png!
> !image-2021-04-29-11-26-34-894.png!
>  
> {code:java}
> // code placeholder
> Connection to 6 was disconnected before the response was read at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
>  at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
>  at scala.Option.foreach(Option.scala:257) at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
>  at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12729) controller leader keep disconnected and block the cluster

2021-04-28 Thread luws (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luws updated KAFKA-12729:
-
Attachment: image-2021-04-29-11-35-52-191.png

> controller leader keep disconnected and block the cluster
> -
>
> Key: KAFKA-12729
> URL: https://issues.apache.org/jira/browse/KAFKA-12729
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, KafkaConnect
>Affects Versions: 2.1.1
>Reporter: luws
>Priority: Critical
> Attachments: image-2021-04-29-11-33-24-303.png, 
> image-2021-04-29-11-33-53-048.png, image-2021-04-29-11-34-18-654.png, 
> image-2021-04-29-11-35-26-994.png, image-2021-04-29-11-35-41-393.png, 
> image-2021-04-29-11-35-52-191.png
>
>
> Kafka version: 2.12-2.1.1
> After a brief ZooKeeper disconnection, broker6 became the controller leader,
> but the other brokers could not connect to broker6 until we restarted
> broker6; then the cluster recovered.
>  
> bandwidth is normal
> !image-2021-04-29-11-33-24-303.png!
> !image-2021-04-29-11-33-53-048.png!
> !image-2021-04-29-11-34-18-654.png!
> !image-2021-04-29-11-35-26-994.png!
> !image-2021-04-29-11-35-41-393.png!
> !image-2021-04-29-11-35-52-191.png!
>  
>  
>  
> {code:java}
> // code placeholder
> Connection to 6 was disconnected before the response was read at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
>  at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
>  at scala.Option.foreach(Option.scala:257) at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
>  at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12729) controller leader keep disconnected and block the cluster

2021-04-28 Thread luws (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luws updated KAFKA-12729:
-
Description: 
Kafka version: 2.12-2.1.1

After a brief ZooKeeper disconnection, broker6 became the controller leader,
but the other brokers could not connect to broker6 until we restarted broker6;
then the cluster recovered.

 

bandwidth is normal

!image-2021-04-29-11-33-24-303.png!

!image-2021-04-29-11-33-53-048.png!

!image-2021-04-29-11-34-18-654.png!

!image-2021-04-29-11-35-26-994.png!

!image-2021-04-29-11-35-41-393.png!

!image-2021-04-29-11-35-52-191.png!

 

 

 
{code:java}
// code placeholder
Connection to 6 was disconnected before the response was read at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
 at 
kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
 at 
kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192)
 at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
 at scala.Option.foreach(Option.scala:257) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) 
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code}
 

  was:
Kafka version: 2.12-2.1.1

After a brief ZooKeeper disconnection, broker6 became the controller leader,
but the other brokers could not connect to broker6 until we restarted broker6;
then the cluster recovered.

 

bandwidth is normal

!image-2021-04-29-11-24-22-704.png!  

!image-2021-04-29-11-27-12-924.png!

!image-2021-04-29-11-27-35-679.png!

!image-2021-04-29-11-25-41-208.png!

!image-2021-04-29-11-26-34-894.png!

 
{code:java}
// code placeholder
Connection to 6 was disconnected before the response was read at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
 at 
kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
 at 
kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192)
 at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
 at scala.Option.foreach(Option.scala:257) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) 
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code}
 


> controller leader keep disconnected and block the cluster
> -
>
> Key: KAFKA-12729
> URL: https://issues.apache.org/jira/browse/KAFKA-12729
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, KafkaConnect
>Affects Versions: 2.1.1
>Reporter: luws
>Priority: Critical
> Attachments: image-2021-04-29-11-33-24-303.png, 
> image-2021-04-29-11-33-53-048.png, image-2021-04-29-11-34-18-654.png, 
> image-2021-04-29-11-35-26-994.png, image-2021-04-29-11-35-41-393.png, 
> image-2021-04-29-11-35-52-191.png
>
>
> Kafka version: 2.12-2.1.1
> After a brief ZooKeeper disconnection, broker6 became the controller leader,
> but the other brokers could not connect to broker6 until we restarted
> broker6; then the cluster recovered.
>  
> bandwidth is normal
> !image-2021-04-29-11-33-24-303.png!
> !image-2021-04-29-11-33-53-048.png!
> !image-2021-04-29-11-34-18-654.png!
> !image-2021-04-29-11-35-26-994.png!
> !image-2021-04-29-11-35-41-393.png!
> !image-2021-04-29-11-35-52-191.png!
>  
>  
>  
> {code:java}
> // code placeholder
> Connection to 6 was disconnected before the response was read at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
>  at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
>  at scala.Option.foreach(Option.scala:257) at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
>  at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code}

[jira] [Updated] (KAFKA-12729) controller leader keep disconnected and block the cluster

2021-04-28 Thread luws (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luws updated KAFKA-12729:
-
Attachment: image-2021-04-29-11-34-18-654.png

> controller leader keep disconnected and block the cluster
> -
>
> Key: KAFKA-12729
> URL: https://issues.apache.org/jira/browse/KAFKA-12729
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, KafkaConnect
>Affects Versions: 2.1.1
>Reporter: luws
>Priority: Critical
> Attachments: image-2021-04-29-11-33-24-303.png, 
> image-2021-04-29-11-33-53-048.png, image-2021-04-29-11-34-18-654.png
>
>
> Kafka version: 2.12-2.1.1
> After a brief ZooKeeper disconnection, broker6 became the controller leader,
> but the other brokers could not connect to broker6 until we restarted
> broker6; then the cluster recovered.
>  
> bandwidth is normal
> !image-2021-04-29-11-24-22-704.png!  
> !image-2021-04-29-11-27-12-924.png!
> !image-2021-04-29-11-27-35-679.png!
> !image-2021-04-29-11-25-41-208.png!
> !image-2021-04-29-11-26-34-894.png!
>  
> {code:java}
> // code placeholder
> Connection to 6 was disconnected before the response was read at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
>  at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
>  at scala.Option.foreach(Option.scala:257) at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
>  at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12729) controller leader keep disconnected and block the cluster

2021-04-28 Thread luws (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luws updated KAFKA-12729:
-
Attachment: image-2021-04-29-11-33-53-048.png

> controller leader keep disconnected and block the cluster
> -
>
> Key: KAFKA-12729
> URL: https://issues.apache.org/jira/browse/KAFKA-12729
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, KafkaConnect
>Affects Versions: 2.1.1
>Reporter: luws
>Priority: Critical
> Attachments: image-2021-04-29-11-33-24-303.png, 
> image-2021-04-29-11-33-53-048.png
>
>
> Kafka version: 2.12-2.1.1
> After a brief ZooKeeper disconnection, broker6 became the controller leader,
> but the other brokers could not connect to broker6 until we restarted
> broker6; then the cluster recovered.
>  
> bandwidth is normal
> !image-2021-04-29-11-24-22-704.png!  
> !image-2021-04-29-11-27-12-924.png!
> !image-2021-04-29-11-27-35-679.png!
> !image-2021-04-29-11-25-41-208.png!
> !image-2021-04-29-11-26-34-894.png!
>  
> {code:java}
> // code placeholder
> Connection to 6 was disconnected before the response was read at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
>  at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
>  at scala.Option.foreach(Option.scala:257) at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
>  at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12729) controller leader keep disconnected and block the cluster

2021-04-28 Thread luws (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luws updated KAFKA-12729:
-
Attachment: image-2021-04-29-11-33-24-303.png

> controller leader keep disconnected and block the cluster
> -
>
> Key: KAFKA-12729
> URL: https://issues.apache.org/jira/browse/KAFKA-12729
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, KafkaConnect
>Affects Versions: 2.1.1
>Reporter: luws
>Priority: Critical
> Attachments: image-2021-04-29-11-33-24-303.png, 
> image-2021-04-29-11-33-53-048.png
>
>
> Kafka version: 2.12-2.1.1
> After a brief ZooKeeper disconnection, broker6 became the controller leader,
> but the other brokers could not connect to broker6 until we restarted
> broker6; then the cluster recovered.
>  
> bandwidth is normal
> !image-2021-04-29-11-24-22-704.png!  
> !image-2021-04-29-11-27-12-924.png!
> !image-2021-04-29-11-27-35-679.png!
> !image-2021-04-29-11-25-41-208.png!
> !image-2021-04-29-11-26-34-894.png!
>  
> {code:java}
> // code placeholder
> Connection to 6 was disconnected before the response was read at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
>  at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
>  at scala.Option.foreach(Option.scala:257) at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
>  at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12718) SessionWindows are closed too early

2021-04-28 Thread Bernardo Yusti (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17335118#comment-17335118
 ] 

Bernardo Yusti commented on KAFKA-12718:


Hi [~mjsax] ,

I would like to help out with this issue.

This would be my first issue so any pointers on where to start would be greatly 
appreciated!

> SessionWindows are closed too early
> ---
>
> Key: KAFKA-12718
> URL: https://issues.apache.org/jira/browse/KAFKA-12718
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: beginner, easy-fix, newbie
> Fix For: 3.0.0
>
>
> SessionWindows are defined based on a {{gap}} parameter, and also support an 
> additional {{grace-period}} configuration to handle out-of-order data.
> To incorporate the session gap, a session window should only be closed at 
> {{window-end + gap}}, and to incorporate the grace period, the close time 
> should be pushed out further to {{window-end + gap + grace}}.
> However, at the moment we compute the window close time as {{window-end + 
> grace}}, omitting the {{gap}} parameter.
> Because the default grace period is 24h, most users might not notice this 
> issue. Even if they set a grace period explicitly (e.g., when using 
> suppress()), they would most likely set a grace period larger than the gap 
> time and not hit the issue (or maybe only realize it when inspecting the 
> behavior closely).
> However, if a user wants to disable the grace period and sets it to zero (or 
> any other value smaller than the gap time), sessions might be closed too 
> early and users might notice.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12729) controller leader keep disconnected and block the cluster

2021-04-28 Thread luws (Jira)
luws created KAFKA-12729:


 Summary: controller leader keep disconnected and block the cluster
 Key: KAFKA-12729
 URL: https://issues.apache.org/jira/browse/KAFKA-12729
 Project: Kafka
  Issue Type: Bug
  Components: controller, KafkaConnect
Affects Versions: 2.1.1
Reporter: luws


Kafka version: 2.12-2.1.1

After a brief ZooKeeper disconnection, broker6 became the controller leader,
but the other brokers could not connect to broker6 until we restarted broker6;
then the cluster recovered.

 

bandwidth is normal

!image-2021-04-29-11-24-22-704.png!  

!image-2021-04-29-11-27-12-924.png!

!image-2021-04-29-11-27-35-679.png!

!image-2021-04-29-11-25-41-208.png!

!image-2021-04-29-11-26-34-894.png!

 
{code:java}
// code placeholder
Connection to 6 was disconnected before the response was read at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
 at 
kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
 at 
kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192)
 at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
 at scala.Option.foreach(Option.scala:257) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) 
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-7757) Too many open files after java.io.IOException: Connection to n was disconnected before the response was read

2021-04-28 Thread luws (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luws updated KAFKA-7757:

Attachment: image-2021-04-29-11-27-12-924.png

> Too many open files after java.io.IOException: Connection to n was 
> disconnected before the response was read
> 
>
> Key: KAFKA-7757
> URL: https://issues.apache.org/jira/browse/KAFKA-7757
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Pedro Gontijo
>Priority: Major
> Attachments: Screen Shot 2019-01-03 at 12.20.38 PM.png, dump.txt, 
> fd-spike-threads.txt, image-2021-04-29-11-24-22-704.png, 
> image-2021-04-29-11-25-41-208.png, image-2021-04-29-11-26-34-894.png, 
> image-2021-04-29-11-27-12-924.png, image-2021-04-29-11-27-35-679.png, 
> kafka-allocated-file-handles.png, server.properties, td1.txt, td2.txt, td3.txt
>
>
> We upgraded from 0.10.2.2 to 2.1.0 (a cluster with 3 brokers)
> After a while (hours) 2 brokers start to throw:
> {code:java}
> java.io.IOException: Connection to NN was disconnected before the response 
> was read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
> at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
> at 
> kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> File descriptors start to pile up, and if I do not restart the broker, it 
> throws "Too many open files" and crashes.
> {code:java}
> ERROR Error while accepting connection (kafka.network.Acceptor)
> java.io.IOException: Too many open files in system
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> at kafka.network.Acceptor.accept(SocketServer.scala:460)
> at kafka.network.Acceptor.run(SocketServer.scala:403)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>  
>  After some hours the issue happens again... It has happened with all 
> brokers, so it is not something specific to an instance.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-7757) Too many open files after java.io.IOException: Connection to n was disconnected before the response was read

2021-04-28 Thread luws (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luws updated KAFKA-7757:

Attachment: image-2021-04-29-11-27-35-679.png

> Too many open files after java.io.IOException: Connection to n was 
> disconnected before the response was read
> 
>
> Key: KAFKA-7757
> URL: https://issues.apache.org/jira/browse/KAFKA-7757
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Pedro Gontijo
>Priority: Major
> Attachments: Screen Shot 2019-01-03 at 12.20.38 PM.png, dump.txt, 
> fd-spike-threads.txt, image-2021-04-29-11-24-22-704.png, 
> image-2021-04-29-11-25-41-208.png, image-2021-04-29-11-26-34-894.png, 
> image-2021-04-29-11-27-12-924.png, image-2021-04-29-11-27-35-679.png, 
> kafka-allocated-file-handles.png, server.properties, td1.txt, td2.txt, td3.txt
>
>
> We upgraded from 0.10.2.2 to 2.1.0 (a cluster with 3 brokers)
> After a while (hours) 2 brokers start to throw:
> {code:java}
> java.io.IOException: Connection to NN was disconnected before the response 
> was read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
> at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
> at 
> kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> File descriptors start to pile up, and if I do not restart the broker, it 
> throws "Too many open files" and crashes.
> {code:java}
> ERROR Error while accepting connection (kafka.network.Acceptor)
> java.io.IOException: Too many open files in system
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> at kafka.network.Acceptor.accept(SocketServer.scala:460)
> at kafka.network.Acceptor.run(SocketServer.scala:403)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>  
>  After some hours the issue happens again... It has happened with all 
> brokers, so it is not something specific to an instance.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-7757) Too many open files after java.io.IOException: Connection to n was disconnected before the response was read

2021-04-28 Thread luws (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luws updated KAFKA-7757:

Attachment: image-2021-04-29-11-26-34-894.png

> Too many open files after java.io.IOException: Connection to n was 
> disconnected before the response was read
> 
>
> Key: KAFKA-7757
> URL: https://issues.apache.org/jira/browse/KAFKA-7757
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Pedro Gontijo
>Priority: Major
> Attachments: Screen Shot 2019-01-03 at 12.20.38 PM.png, dump.txt, 
> fd-spike-threads.txt, image-2021-04-29-11-24-22-704.png, 
> image-2021-04-29-11-25-41-208.png, image-2021-04-29-11-26-34-894.png, 
> kafka-allocated-file-handles.png, server.properties, td1.txt, td2.txt, td3.txt
>
>
> We upgraded from 0.10.2.2 to 2.1.0 (a cluster with 3 brokers)
> After a while (hours) 2 brokers start to throw:
> {code:java}
> java.io.IOException: Connection to NN was disconnected before the response 
> was read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
> at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
> at 
> kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> File descriptors start to pile up, and if I do not restart the broker, it 
> throws "Too many open files" and crashes.
> {code:java}
> ERROR Error while accepting connection (kafka.network.Acceptor)
> java.io.IOException: Too many open files in system
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> at kafka.network.Acceptor.accept(SocketServer.scala:460)
> at kafka.network.Acceptor.run(SocketServer.scala:403)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>  
>  After some hours the issue happens again... It has happened with all 
> brokers, so it is not something specific to an instance.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-7757) Too many open files after java.io.IOException: Connection to n was disconnected before the response was read

2021-04-28 Thread luws (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luws updated KAFKA-7757:

Attachment: image-2021-04-29-11-25-41-208.png

> Too many open files after java.io.IOException: Connection to n was 
> disconnected before the response was read
> 
>
> Key: KAFKA-7757
> URL: https://issues.apache.org/jira/browse/KAFKA-7757
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Pedro Gontijo
>Priority: Major
> Attachments: Screen Shot 2019-01-03 at 12.20.38 PM.png, dump.txt, 
> fd-spike-threads.txt, image-2021-04-29-11-24-22-704.png, 
> image-2021-04-29-11-25-41-208.png, kafka-allocated-file-handles.png, 
> server.properties, td1.txt, td2.txt, td3.txt
>
>
> We upgraded from 0.10.2.2 to 2.1.0 (a cluster with 3 brokers)
> After a while (hours) 2 brokers start to throw:
> {code:java}
> java.io.IOException: Connection to NN was disconnected before the response 
> was read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
> at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
> at 
> kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> File descriptors start to pile up, and if I do not restart the broker, it 
> throws "Too many open files" and crashes.
> {code:java}
> ERROR Error while accepting connection (kafka.network.Acceptor)
> java.io.IOException: Too many open files in system
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> at kafka.network.Acceptor.accept(SocketServer.scala:460)
> at kafka.network.Acceptor.run(SocketServer.scala:403)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>  
>  After some hours the issue happens again... It has happened with all 
> brokers, so it is not something specific to an instance.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-7757) Too many open files after java.io.IOException: Connection to n was disconnected before the response was read

2021-04-28 Thread luws (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luws updated KAFKA-7757:

Attachment: image-2021-04-29-11-24-22-704.png

> Too many open files after java.io.IOException: Connection to n was 
> disconnected before the response was read
> 
>
> Key: KAFKA-7757
> URL: https://issues.apache.org/jira/browse/KAFKA-7757
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Pedro Gontijo
>Priority: Major
> Attachments: Screen Shot 2019-01-03 at 12.20.38 PM.png, dump.txt, 
> fd-spike-threads.txt, image-2021-04-29-11-24-22-704.png, 
> kafka-allocated-file-handles.png, server.properties, td1.txt, td2.txt, td3.txt
>
>
> We upgraded from 0.10.2.2 to 2.1.0 (a cluster with 3 brokers)
> After a while (hours) 2 brokers start to throw:
> {code:java}
> java.io.IOException: Connection to NN was disconnected before the response 
> was read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
> at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
> at 
> kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> File descriptors start to pile up, and if I do not restart the broker, it 
> throws "Too many open files" and crashes.
> {code:java}
> ERROR Error while accepting connection (kafka.network.Acceptor)
> java.io.IOException: Too many open files in system
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> at kafka.network.Acceptor.accept(SocketServer.scala:460)
> at kafka.network.Acceptor.run(SocketServer.scala:403)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>  
>  After some hours the issue happens again... It has happened with all 
> brokers, so it is not something specific to an instance.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-28 Thread GitBox


showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r622707404



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
  */
 private Map> constrainedAssign(Map partitionsPerTopic,
 Map> consumerToOwnedPartitions) {
-SortedSet unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+if (log.isDebugEnabled()) {
+log.debug(String.format("performing constrained assign. 
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+partitionsPerTopic, consumerToOwnedPartitions));
+}
+
+List sortedAllPartitions = 
getTopicPartitions(partitionsPerTopic);
 
 Set allRevokedPartitions = new HashSet<>();
 
-// Each consumer should end up in exactly one of the below
-// the consumers not yet at capacity
+// the consumers not yet at expected capacity
 List unfilledMembers = new LinkedList<>();
-// the members with exactly maxQuota partitions assigned
-Queue maxCapacityMembers = new LinkedList<>();
-// the members with exactly minQuota partitions assigned
-Queue minCapacityMembers = new LinkedList<>();
 
 int numberOfConsumers = consumerToOwnedPartitions.size();
-int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+int totalPartitionsCount = sortedAllPartitions.size();
+
+int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+// the expected number of members with maxQuota assignment
+int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+// the number of members with exactly maxQuota partitions assigned
+int numMaxCapacityMembers = 0;
 
-// initialize the assignment map with an empty array of size minQuota 
for all members
+// initialize the assignment map with an empty array of size maxQuota 
for all members
 Map> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota;
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota;
 
+List toBeRemovedPartitions = new ArrayList<>();
 // Reassign as many previously owned partitions as possible
 for (Map.Entry> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
 String consumer = consumerEntry.getKey();
 List ownedPartitions = consumerEntry.getValue();
 
 List consumerAssignment = assignment.get(consumer);
-int i = 0;
-// assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-for (TopicPartition tp : ownedPartitions) {
-if (i < maxQuota) {
-consumerAssignment.add(tp);
-unassignedPartitions.remove(tp);
-} else {
-allRevokedPartitions.add(tp);
-}
-++i;
-}
 
 if (ownedPartitions.size() < minQuota) {
+// the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+// and put this member into unfilled member list
+if (ownedPartitions.size() > 0) {
+consumerAssignment.addAll(ownedPartitions);
+toBeRemovedPartitions.addAll(ownedPartitions);
+}
 unfilledMembers.add(consumer);
+} else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+// consumer owned the "maxQuota" of partitions or more, and we 
still under the number of expected max capacity members
+// so keep "maxQuota" of the owned partitions, and revoke the 
rest of the partitions
+consumerAssignment.addAll(ownedPartitions.subList(0, 
maxQuota));
+toBeRemovedPartitions.addAll(ownedPartitions.subList(0, 
maxQuota));
+allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, 
ownedPartitions.size()));
 } else {
-// It's possible for a consumer to be at both min and max 
capacity if minQuota == maxQuota
-if (consumerAssignment.size() == minQuota)
-minCapacityMembers.add(consumer);
-if (consumerAssignment.size() == m

[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-28 Thread GitBox


showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r622705781



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
  */
 private Map> constrainedAssign(Map partitionsPerTopic,
 Map> consumerToOwnedPartitions) {
-SortedSet unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+if (log.isDebugEnabled()) {
+log.debug(String.format("performing constrained assign. 
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+partitionsPerTopic, consumerToOwnedPartitions));
+}
+
+List sortedAllPartitions = 
getTopicPartitions(partitionsPerTopic);
 
 Set allRevokedPartitions = new HashSet<>();
 
-// Each consumer should end up in exactly one of the below
-// the consumers not yet at capacity
+// the consumers not yet at expected capacity
 List unfilledMembers = new LinkedList<>();
-// the members with exactly maxQuota partitions assigned
-Queue maxCapacityMembers = new LinkedList<>();
-// the members with exactly minQuota partitions assigned
-Queue minCapacityMembers = new LinkedList<>();
 
 int numberOfConsumers = consumerToOwnedPartitions.size();
-int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+int totalPartitionsCount = sortedAllPartitions.size();
+
+int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+// the expected number of members with maxQuota assignment
+int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+// the number of members with exactly maxQuota partitions assigned
+int numMaxCapacityMembers = 0;
 
-// initialize the assignment map with an empty array of size minQuota 
for all members
+// initialize the assignment map with an empty array of size maxQuota 
for all members
 Map> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota;
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota;
 
+List toBeRemovedPartitions = new ArrayList<>();

Review comment:
   > And thus are "to be removed" from the list of partitions to be 
assigned -- is that where the name comes from?
   
   Haha, you're right! :) 
   
   Sorry, I should have chosen a better variable name. `assignedPartitions` 
is great! Thanks.
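
For a concrete feel for the quota variables in the hunk above, here is a small 
worked example with assumed numbers (not taken from the PR):

{code:java}
// Worked example with assumed numbers: 10 partitions shared by 3 consumers.
int totalPartitionsCount = 10;
int numberOfConsumers = 3;
int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers); // 3
int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);  // 4
int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers;         // 1
// i.e. one consumer is expected to end up with maxQuota (4) partitions and the
// other two with minQuota (3).
{code}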




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-28 Thread GitBox


showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r622704380



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
  */
 private Map> constrainedAssign(Map partitionsPerTopic,
 Map> consumerToOwnedPartitions) {
-SortedSet unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+if (log.isDebugEnabled()) {
+log.debug(String.format("performing constrained assign. 
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+partitionsPerTopic, consumerToOwnedPartitions));
+}
+
+List sortedAllPartitions = 
getTopicPartitions(partitionsPerTopic);
 
 Set allRevokedPartitions = new HashSet<>();
 
-// Each consumer should end up in exactly one of the below
-// the consumers not yet at capacity
+// the consumers not yet at expected capacity
 List unfilledMembers = new LinkedList<>();
-// the members with exactly maxQuota partitions assigned
-Queue maxCapacityMembers = new LinkedList<>();
-// the members with exactly minQuota partitions assigned
-Queue minCapacityMembers = new LinkedList<>();
 
 int numberOfConsumers = consumerToOwnedPartitions.size();
-int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+int totalPartitionsCount = sortedAllPartitions.size();
+
+int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+// the expected number of members with maxQuota assignment
+int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+// the number of members with exactly maxQuota partitions assigned
+int numMaxCapacityMembers = 0;
 
-// initialize the assignment map with an empty array of size minQuota 
for all members
+// initialize the assignment map with an empty array of size maxQuota 
for all members
 Map> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota;
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota;
 
+List toBeRemovedPartitions = new ArrayList<>();
 // Reassign as many previously owned partitions as possible
 for (Map.Entry> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
 String consumer = consumerEntry.getKey();
 List ownedPartitions = consumerEntry.getValue();
 
 List consumerAssignment = assignment.get(consumer);
-int i = 0;
-// assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-for (TopicPartition tp : ownedPartitions) {
-if (i < maxQuota) {
-consumerAssignment.add(tp);
-unassignedPartitions.remove(tp);
-} else {
-allRevokedPartitions.add(tp);
-}
-++i;
-}
 
 if (ownedPartitions.size() < minQuota) {
+// the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+// and put this member into unfilled member list
+if (ownedPartitions.size() > 0) {
+consumerAssignment.addAll(ownedPartitions);
+toBeRemovedPartitions.addAll(ownedPartitions);
+}
 unfilledMembers.add(consumer);
+} else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {

Review comment:
   Nice catch! Yes, you're right, the post-/pre-increment stuff is 
error-prone. :)
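
As a tiny standalone illustration of why the post-increment inside the 
compound condition above is easy to get wrong (assumed values, not the PR's 
code):

{code:java}
// With zero expected max-capacity members, the branch should never be taken, but:
int numMaxCapacityMembers = 0;
int numExpectedMaxCapacityMembers = 0;
// post-increment compares the old value first (0 <= 0 is true) and then bumps the
// counter, so the branch is taken once even though none was expected
boolean branchTaken = numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers; // true
// a clearer form tests first and increments inside the branch:
// if (numMaxCapacityMembers < numExpectedMaxCapacityMembers) { numMaxCapacityMembers++; ... }
{code}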




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-7572) Producer should not send requests with negative partition id

2021-04-28 Thread Wenhao Ji (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenhao Ji reassigned KAFKA-7572:


Assignee: Wenhao Ji

> Producer should not send requests with negative partition id
> 
>
> Key: KAFKA-7572
> URL: https://issues.apache.org/jira/browse/KAFKA-7572
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1, 1.1.1
>Reporter: Yaodong Yang
>Assignee: Wenhao Ji
>Priority: Major
>  Labels: patch-available
>
> h3. Issue:
> In one Kafka producer log from our users, we found the following unusual error:
> timestamp="2018-10-09T17:37:41,237-0700",level="ERROR", Message="Write to 
> Kafka failed with: ",exception="java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> topicName--2: 30042 ms has passed since batch creation plus linger time
>  at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
>  at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
>  at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 
> record(s) for topicName--2: 30042 ms has passed since batch creation plus 
> linger time"
> After a few hours of debugging, we finally understood the root cause of this 
> issue:
> # The producer used a buggy custom Partitioner, which sometimes generated 
> negative partition ids for new records.
>  # The corresponding produce requests were rejected by brokers, because it's 
> illegal to have a partition with a negative id.
>  # The client kept refreshing its local cluster metadata, but could not send 
> produce requests successfully.
>  # From the above log, we found a suspicious string "topicName--2":
>  # According to the source code, the format of this string in the log is 
> TopicName+"-"+PartitionId.
> # It's not easy to notice that there were two consecutive dashes in the above 
> log.
>  # Eventually, we found that the second dash was a negative sign. Therefore, 
> the partition id is -2, rather than 2.
> # The bug was in the custom Partitioner.
> h3. Proposal:
>  # Producer code should check the partitionId before sending requests to 
> brokers.
> # If there is a negative partition id, just throw an IllegalStateException.
>  # Such a quick check can save lots of time for people debugging their 
> producer code.
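
A minimal sketch of the kind of guard being proposed; `validatePartition` and 
its surroundings are hypothetical, introduced only for illustration:

{code:java}
// Hypothetical illustration of the proposed check; not the actual producer code.
static void validatePartition(String topic, int partition) {
    if (partition < 0) {
        throw new IllegalStateException(
            "Invalid partition id " + partition + " for topic " + topic
                + " returned by the custom partitioner");
    }
}
{code}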



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2021-04-28 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17335095#comment-17335095
 ] 

Luke Chen commented on KAFKA-9295:
--

_a longer timeout should be used in this case._

Agree!
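
A minimal sketch of what a longer timeout could look like at the call site in 
the stack trace quoted below, assuming an overload of 
`waitUntilMinKeyValueRecordsReceived` that accepts an explicit wait time 
(illustration only; the signature, topic name, and value are assumptions):

{code:java}
// Assumed call-site sketch; names and signature may differ from the real test utility.
final long longerTimeoutMs = 120_000L; // a longer wait time than the test's current default (value assumed)
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
    consumerConfig, outputTopic, 1, longerTimeoutMs);
{code}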

> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> --
>
> Key: KAFKA-9295
> URL: https://issues.apache.org/jira/browse/KAFKA-9295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.4.0, 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/]
> {quote}java.lang.AssertionError: Did not receive all 1 records from topic 
> output- within 6 ms Expected: is a value equal to or greater than <1> 
> but: <0> was less than <1> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-28 Thread GitBox


showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r622670757



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 if (log.isDebugEnabled()) {
 log.debug("final assignment: " + assignment);
 }
-
+
 return assignment;
 }
 
-private SortedSet getTopicPartitions(Map 
partitionsPerTopic) {
-SortedSet allPartitions =
-new 
TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
-for (Entry entry: partitionsPerTopic.entrySet()) {
-String topic = entry.getKey();
-for (int i = 0; i < entry.getValue(); ++i) {
+/**
+ * Get the unassigned partition list by computing the set difference of 
sortedPartitions (all partitions)
+ * and sortedToBeRemovedPartitions. We use the two-pointer technique here:
+ *
+ * We loop over sortedPartitions and compare each element with the ith element 
of the sorted toBeRemovedPartitions (i starts from 0):
+ *   - if not equal to the ith element, add it to unassignedPartitions
+ *   - if equal to the ith element, advance to the next element of 
sortedToBeRemovedPartitions
+ *
+ * @param sortedPartitions: all partitions, sorted
+ * @param sortedToBeRemovedPartitions: sorted partitions, all included in 
sortedPartitions
+ * @return the partitions not assigned to any current consumer
+ */
+private List getUnassignedPartitions(List 
sortedPartitions,

Review comment:
   Thank you very much, @ableegoldman ! I'll address your comments today, 
thank you :)
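
For context, a minimal standalone sketch of the two-pointer set difference 
described in the javadoc above (illustration only, not the PR's exact code):

{code:java}
import java.util.ArrayList;
import java.util.List;

public final class TwoPointerDifference {
    // Returns the elements of sortedAll that are not in sortedToRemove.
    // Both lists are assumed sorted by the same ordering, and sortedToRemove is
    // assumed to be a sub-list (every element also appears in sortedAll).
    static <T extends Comparable<T>> List<T> difference(List<T> sortedAll, List<T> sortedToRemove) {
        List<T> result = new ArrayList<>();
        int i = 0; // pointer into sortedToRemove
        for (T element : sortedAll) {
            if (i < sortedToRemove.size() && element.compareTo(sortedToRemove.get(i)) == 0) {
                i++;                 // matched: skip it and advance the removal pointer
            } else {
                result.add(element); // not removed: it is still unassigned
            }
        }
        return result;
    }
}
{code}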




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10598: MINOR: rename wrong topic id variable name and description

2021-04-28 Thread GitBox


showuon commented on pull request #10598:
URL: https://github.com/apache/kafka/pull/10598#issuecomment-828879776


   @chia7712 , I've updated my github profile. It's `show...@gmail.com`. Haha~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-28 Thread GitBox


guozhangwang merged pull request #10462:
URL: https://github.com/apache/kafka/pull/10462


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-28 Thread GitBox


guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r622657322



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
 }
 
 boolean needOuterJoin = outer;
+boolean joinFound = false;
 
 final long inputRecordTimestamp = context().timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+maxObservedStreamTime.advance(inputRecordTimestamp);
+
 try (final WindowStoreIterator iter = otherWindow.fetch(key, 
timeFrom, timeTo)) {
 while (iter.hasNext()) {
 needOuterJoin = false;
+joinFound = true;
 final KeyValue otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
+
+// Emit expired records before the joined record to keep 
time ordering
+emitExpiredNonJoinedOuterRecordsExcept(key, 
otherRecordTimestamp);
+
 context().forward(
 key,
 joiner.apply(key, value, otherRecord.value),
-To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+}
+
+// Emit all expired records before adding a new non-joined 
record to the store. Otherwise,
+// the put() call will advance the stream time, which causes 
records out of the retention
+// period to be deleted, thus not being emitted later.
+if (!joinFound && inputRecordTimestamp == 
maxObservedStreamTime.get()) {
+emitExpiredNonJoinedOuterRecords();
 }
 
 if (needOuterJoin) {
-context().forward(key, joiner.apply(key, value, null));
+// The maxStreamTime contains the max time observed in 
both sides of the join.
+// Having access to the time observed in the other join 
side fixes the following
+// problem:
+//
+// Say we have a window size of 5 seconds
+//  1. A non-joined record wth time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+// The record is not processed yet, and is added to 
the outer-join store
+//  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+// The record is not processed yet, and is added to 
the outer-join store
+//  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+// It is time to look at the expired records. T10 and 
T2 should be emitted, but
+// because T2 was late, then it is not fetched by the 
window store, so it is not processed
+//
+// See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+//
+// the condition below allows us to process the late 
record without the need
+// to hold it in the temporary outer store
+if (!outerJoinWindowStore.isPresent() || timeTo < 
maxObservedStreamTime.get()) {

Review comment:
   @spena seems there are a few different conditions we can consider here:
   
   1) record time < stream time - window length - grace length: the record is 
too late; we should drop it up front and also record it via the 
`droppedRecordsSensorOrExpiredWindowRecordDropSensor`.
   
   2) record time >= stream time - window length - grace length, but < stream 
time: the record is still late but joinable. Since the stream time would not 
be advanced, we would not have to check and emit non-joined records, but just 
try to join this record with the other window. Note that, as @mjsax said, for 
the returned matching record we also need to check whether the other record 
time is >= stream time - window length - grace length.
   
   3) record time > stream time: we would first try to emit non-joined 
records, and then try to join this record (see the sketch below).
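
A minimal standalone sketch that classifies a record into the three cases 
above; every name here is assumed for the example, and this is not the actual 
KStreamKStreamJoin code:

{code:java}
// Illustration only of the three cases discussed above.
static String classify(long recordTime, long streamTime, long windowMs, long graceMs) {
    long expiryBound = streamTime - windowMs - graceMs;
    if (recordTime < expiryBound) {
        // 1) too late: drop the record up front and record the dropped-records sensor
        return "drop";
    } else if (recordTime < streamTime) {
        // 2) late but joinable: stream time does not advance, so skip emitting expired
        //    non-joined records and only attempt the join (checking the matched
        //    record's timestamp against the same bound)
        return "join-only";
    } else {
        // 3) in order: first emit expired non-joined records, then attempt the join
        return "emit-expired-then-join";
    }
}
{code}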

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
 }
 
 boolean needOuterJoin = outer;
+boolean joinFound = false;
 
 final long inputRecordTimestamp = context().timestamp();
 final long timeFrom = Math.max(0L, inputRecordTime

[GitHub] [kafka] kowshik commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-28 Thread GitBox


kowshik commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r622606093



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class RemoteLogSegmentMetadataTransform implements 
RemoteLogMetadataTransform {

Review comment:
   Instead of introducing this separate class, why not have `RemoteLogSegmentMetadata` 
implement the `RemoteLogMetadataTransform<RemoteLogSegmentMetadata>` interface directly? 
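   For concreteness, a rough sketch of that alternative shape, using simplified hypothetical signatures rather than the real RLMM types in the PR:

    // Sketch only: simplified, hypothetical interface; the actual
    // ApiMessageAndVersion-based signatures in the PR may differ.
    interface RemoteLogMetadataTransform<T> {
        Object toApiMessageAndVersion(T metadata);
        T fromApiMessageAndVersion(Object payload);
    }

    // The metadata class would then carry its own (de)serialization logic.
    class RemoteLogSegmentMetadata implements RemoteLogMetadataTransform<RemoteLogSegmentMetadata> {
        @Override
        public Object toApiMessageAndVersion(RemoteLogSegmentMetadata metadata) {
            throw new UnsupportedOperationException("sketch only");
        }

        @Override
        public RemoteLogSegmentMetadata fromApiMessageAndVersion(Object payload) {
            throw new UnsupportedOperationException("sketch only");
        }
    }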




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10607: MINOR: bump version to 2.6.3-SNAPSHOT in missing files

2021-04-28 Thread GitBox


ableegoldman merged pull request #10607:
URL: https://github.com/apache/kafka/pull/10607


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #10607: MINOR: bump version to 2.6.3-SNAPSHOT in missing files

2021-04-28 Thread GitBox


ableegoldman opened a new pull request #10607:
URL: https://github.com/apache/kafka/pull/10607


   I was just editing something on the release process wiki and noticed that 
under the "Cut Branch" section, which it says to skip for bugfix releases, 
there's actually a list of files that it then says to update for both feature 
and bugfix releases. Luckily most of those files are actually covered later in 
the wiki, but two were not.
   
   Also updated the release process wiki to clear this up for future RMs


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-04-28 Thread GitBox


junrao commented on pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#issuecomment-828795518


   I think the upgrade case is similar---it's rare and transient. So, we could 
choose to have a simpler and less optimized way for handling it. I am not sure 
if we trigger metadata refresh for top level error right now. If not, we could 
probably just add the logic to refresh metadata for all topics on topicId error 
at the top level.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-28 Thread GitBox


kowshik commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r621909054



##
File path: 
raft/src/main/java/org/apache/kafka/raft/metadata/AbstractApiMessageSerde.java
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.metadata;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.protocol.Writable;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.raft.RecordSerde;
+
+/**
+ * This is an implementation of {@code RecordSerde} with {@link 
ApiMessageAndVersion} but implementors need to implement
+ * {@link #apiMessageFor(short)} to return a {@code ApiMessage} instance for 
the given {@code apiKey}.
+ *
+ * This can be used as the underlying serialization mechanism for any metadata 
kind of log storage.

Review comment:
   The grammar could be improved a bit here: `... mechanism for any 
metadata kind of log storage.` 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-28 Thread GitBox


hachikuji commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r622494008



##
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##
@@ -60,19 +61,21 @@
 @Override
 void close();
 
-class Batch {
+final class Batch implements Iterable {
 private final long baseOffset;
 private final int epoch;
+private final long lastOffset;
 private final List records;
 
-public Batch(long baseOffset, int epoch, List records) {
+private Batch(long baseOffset, int epoch, long lastOffset, List 
records) {

Review comment:
   Perhaps it is clear enough already, but maybe we should document that 
these offsets are inclusive.
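   For example, the javadoc could spell it out along these lines (a wording sketch, not the committed text):

    /**
     * A batch of records read from the log.
     *
     * @param baseOffset offset of the first record in this batch (inclusive)
     * @param lastOffset offset of the last record in this batch (inclusive)
     */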

##
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##
@@ -226,7 +232,11 @@ class TestRaftServer(
 reader.close()
   }
 
-case _ =>
+case HandleSnapshot(reader) =>
+  // Ignore snapshots; only interested on records appended by this 
leader

Review comment:
   nit: on -> in?

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -311,8 +311,18 @@ private void updateListenersProgress(long highWatermark) {
 private void updateListenersProgress(List 
listenerContexts, long highWatermark) {
 for (ListenerContext listenerContext : listenerContexts) {
 listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset 
-> {
-if (nextExpectedOffset < log.startOffset()) {
-listenerContext.fireHandleSnapshot(log.startOffset());
+if (nextExpectedOffset < log.startOffset() && 
nextExpectedOffset < highWatermark) {
+SnapshotReader snapshot = 
latestSnapshot().orElseThrow(() -> {
+return new IllegalStateException(
+String.format(
+"Snapshot expected when next offset is %s, log 
start offset is %s and high-watermark is %s",

Review comment:
   Perhaps it is useful to mention the class of the listener since we are 
referring to its next expected offset?
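   For instance, the message could be built roughly like this (self-contained sketch; the accessor for the listener's name is hypothetical):

    // Sketch of the suggested message format, including which listener is affected.
    public class SnapshotErrorMessageExample {
        static String format(String listenerClass, long nextOffset, long logStartOffset, long highWatermark) {
            return String.format(
                "Snapshot expected for listener %s when next offset is %d, log start offset is %d and high-watermark is %d",
                listenerClass, nextOffset, logStartOffset, highWatermark);
        }

        public static void main(String[] args) {
            System.out.println(format("MetadataListener", 5, 10, 20));
        }
    }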

##
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##
@@ -426,48 +422,55 @@ public RawSnapshotWriter createSnapshot(OffsetAndEpoch 
snapshotId) {
 public void onSnapshotFrozen(OffsetAndEpoch snapshotId) {}
 
 @Override
-public boolean deleteBeforeSnapshot(OffsetAndEpoch logStartSnapshotId) {
-if (logStartOffset() > logStartSnapshotId.offset ||
-highWatermark.offset < logStartSnapshotId.offset) {
-
+public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) {
+if (logStartOffset() > snapshotId.offset) {
+throw new OffsetOutOfRangeException(
+String.format(
+"New log start (%s) is less than the curent log start 
offset (%s)",
+snapshotId,
+logStartOffset()
+)
+);
+}
+if (highWatermark.offset < snapshotId.offset) {
 throw new OffsetOutOfRangeException(
 String.format(
-"New log start (%s) is less than start offset (%s) or is 
greater than the high watermark (%s)",
-logStartSnapshotId,
-logStartOffset(),
+"New log start (%s) is greater than the high watermark 
(%s)",
+snapshotId,
 highWatermark.offset
 )
 );
 }
 
 boolean updated = false;
-Optional snapshotIdOpt = latestSnapshotId();
-if (snapshotIdOpt.isPresent()) {
-OffsetAndEpoch snapshotId = snapshotIdOpt.get();
-if (startOffset() < logStartSnapshotId.offset &&
-highWatermark.offset >= logStartSnapshotId.offset &&
-snapshotId.offset >= logStartSnapshotId.offset) {
+if (snapshots.containsKey(snapshotId)) {
+snapshots.headMap(snapshotId, false).clear();
 
-snapshots.headMap(logStartSnapshotId, false).clear();
+// Update the high watermark if it is less than the new log start 
offset
+if (snapshotId.offset > highWatermark.offset) {

Review comment:
   Maybe I am missing something, but how could this be possible given the 
check above?

##
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##
@@ -184,6 +187,22 @@ Builder appendToLog(int epoch, List records) {
 return this;
 }
 
+Builder withSnapshot(OffsetAndEpoch snapshotId) throws IOException {

Review comment:
   Maybe `withEmptySnapshot`?

##
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##
@@ -363,33 +362,30 @@ public LogFetchInfo read(long startOffset, Isolation 
isolation) 

[jira] [Commented] (KAFKA-8613) Set default grace period to 0

2021-04-28 Thread Israel Ekpo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17335015#comment-17335015
 ] 

Israel Ekpo commented on KAFKA-8613:


I have the initial changes (without the Unit tests yet) here

[https://github.com/izzyacademy/kafka/commit/c7094fae666f407e413c2b9d40e980a68aebf81f]

Please let me know what you think. I will be working on the unit tests in about 
a day and will submit a PR once they are completed.

> Set default grace period to 0
> -
>
> Key: KAFKA-8613
> URL: https://issues.apache.org/jira/browse/KAFKA-8613
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Israel Ekpo
>Priority: Blocker
>  Labels: kip
> Fix For: 3.0.0
>
>
> Currently, the grace period is set to retention time if the grace period is 
> not specified explicitly. The reason for setting the default grace period to 
> retention time was backward compatibility. Topologies that were implemented 
> before the introduction of the grace period, added late arriving records to a 
> window as long as the window existed, i.e., as long as its retention time was 
> not elapsed.
> This unintuitive default grace period has already caused confusion among 
> users.
> For the next major release, we should set the default grace period to 
> {{Duration.ZERO}}.
>  
> KIP-633 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 removed a comment on pull request #10603: WIP: handle missing sink topics in a separate way

2021-04-28 Thread GitBox


wcarlson5 removed a comment on pull request #10603:
URL: https://github.com/apache/kafka/pull/10603#issuecomment-828525217


   @ableegoldman @rodesai @cadonna as a rough improvement, what do you think? I 
don't think we need to get this perfect right now, but any differentiation 
between a missing sink topic and a task-corrupted exception would be good.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 closed pull request #10603: WIP: handle missing sink topics in a separate way

2021-04-28 Thread GitBox


wcarlson5 closed pull request #10603:
URL: https://github.com/apache/kafka/pull/10603


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on pull request #10603: WIP: handle missing sink topics in a separate way

2021-04-28 Thread GitBox


wcarlson5 commented on pull request #10603:
URL: https://github.com/apache/kafka/pull/10603#issuecomment-828769426


   This will not work without significant other changes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10800) Validate the snapshot id when the state machine creates a snapshot

2021-04-28 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-10800:
---
Description: 
When the state machine attempts to create a snapshot writer we should validate 
that the following is true:
 # The end offset of the snapshot is less than or equal to the high-watermark.
 # The epoch of the snapshot is less than or equal to the quorum epoch.
 # The end offset and epoch of the snapshot is valid based on the leader epoch 
cache.

Note that this validation should not be performed when the raft client creates 
the snapshot writer because in that case the local log is out of date and the 
follower should trust the snapshot id sent by the partition leader.

  was:
When the state machine attempts to create a snapshot writer we should validate 
that the following is true:
 # The end offset of the snapshot is less than or equal to the high-watermark.
 # The epoch of the snapshot is equal to the quorum epoch.
 # The end offset and epoch of the snapshot is valid based on the leader epoch 
cache.

Note that this validation should not be performed when the raft client creates 
the snapshot writer because in that case the local log is out of date and the 
follower should trust the snapshot id sent by the partition leader.


> Validate the snapshot id when the state machine creates a snapshot
> --
>
> Key: KAFKA-10800
> URL: https://issues.apache.org/jira/browse/KAFKA-10800
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: Haoran Xuan
>Priority: Major
>
> When the state machine attempts to create a snapshot writer we should 
> validate that the following is true:
>  # The end offset of the snapshot is less than or equal to the high-watermark.
>  # The epoch of the snapshot is less than or equal to the quorum epoch.
>  # The end offset and epoch of the snapshot is valid based on the leader 
> epoch cache.
> Note that this validation should not be performed when the raft client 
> creates the snapshot writer because in that case the local log is out of date 
> and the follower should trust the snapshot id sent by the partition leader.
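For illustration, a minimal sketch of the three checks as plain validation logic (all parameter names are illustrative; this is not the actual raft snapshot-creation API):

    // Sketch: validate a snapshot id against the high-watermark, quorum epoch and
    // leader epoch cache, per the three conditions listed above.
    public class SnapshotIdValidation {
        static void validate(long snapshotEndOffset, int snapshotEpoch,
                             long highWatermark, int quorumEpoch,
                             int epochFromLeaderEpochCache) {
            if (snapshotEndOffset > highWatermark) {
                throw new IllegalArgumentException("Snapshot end offset " + snapshotEndOffset
                    + " is greater than the high-watermark " + highWatermark);
            }
            if (snapshotEpoch > quorumEpoch) {
                throw new IllegalArgumentException("Snapshot epoch " + snapshotEpoch
                    + " is greater than the quorum epoch " + quorumEpoch);
            }
            if (snapshotEpoch != epochFromLeaderEpochCache) {
                throw new IllegalArgumentException("Snapshot epoch " + snapshotEpoch
                    + " does not match the leader epoch cache at offset " + snapshotEndOffset);
            }
        }
    }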



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2

2021-04-28 Thread GitBox


ableegoldman commented on pull request #10573:
URL: https://github.com/apache/kafka/pull/10573#issuecomment-828752351


   Merged to trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2

2021-04-28 Thread GitBox


ableegoldman merged pull request #10573:
URL: https://github.com/apache/kafka/pull/10573


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2

2021-04-28 Thread GitBox


ableegoldman commented on pull request #10573:
URL: https://github.com/apache/kafka/pull/10573#issuecomment-828751235


   Tons of flaky test failures, all unrelated: mostly Connect and Raft, plus a few 
Streams tests that are known to be flaky, for which I left some thoughts on the 
ticket. Going to merge


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8613) Set default grace period to 0

2021-04-28 Thread Israel Ekpo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17334984#comment-17334984
 ] 

Israel Ekpo commented on KAFKA-8613:


I noticed that the method signature in the KIP had the incorrect return type.
 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-633:+Drop+24+hour+default+of+grace+period+in+Streams
 
Should I go ahead and update this in the KIP? I just noticed the error during 
my implementation
 
For the class org.apache.kafka.streams.kstream.SessionWindows
 
public static JoinWindows ofInactivityGapAndGrace(final Duration inactivityGap, final Duration afterWindowEnd);
 
Should have been:
 
public static SessionWindows ofInactivityGapAndGrace(final Duration inactivityGap, final Duration afterWindowEnd);
 
I will update the KIP to reflect what I think it needs to be. Please let me 
know your thoughts when you see this.
 
Thanks
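For reference, a usage sketch of the corrected signature, assuming the method lands as proposed in the KIP (values are illustrative):

    import java.time.Duration;
    import org.apache.kafka.streams.kstream.SessionWindows;

    public class SessionWindowsExample {
        public static void main(String[] args) {
            // 5-minute inactivity gap with no grace period
            SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(
                Duration.ofMinutes(5),
                Duration.ZERO);
            System.out.println(windows);
        }
    }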

> Set default grace period to 0
> -
>
> Key: KAFKA-8613
> URL: https://issues.apache.org/jira/browse/KAFKA-8613
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Israel Ekpo
>Priority: Blocker
>  Labels: kip
> Fix For: 3.0.0
>
>
> Currently, the grace period is set to retention time if the grace period is 
> not specified explicitly. The reason for setting the default grace period to 
> retention time was backward compatibility. Topologies that were implemented 
> before the introduction of the grace period, added late arriving records to a 
> window as long as the window existed, i.e., as long as its retention time was 
> not elapsed.
> This unintuitive default grace period has already caused confusion among 
> users.
> For the next major release, we should set the default grace period to 
> {{Duration.ZERO}}.
>  
> KIP-633 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-04-28 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r622505823



##
File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
##
@@ -1267,6 +1266,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(100L, latests.get(t1p0))
   }
 
+  @Test
+  def testBeginningOffsetsOnCompactedTopic(): Unit = {
+val topic0 = "topicWithoutCompaction"
+val topic1 = "topicWithCompaction"
+val t0p0 = new TopicPartition(topic0, 0)
+val t1p0 = new TopicPartition(topic1, 0)
+val t1p1 = new TopicPartition(topic1, 1)
+val partitions = Set(t0p0, t1p0, t1p1).asJava
+
+val producerProps = new Properties()
+// Each batch will hold about 10 records
+producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "256")
+val producer = createProducer(configOverrides = producerProps)
+// First topic will not have compaction. Simply a sanity test.
+createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 1, 
recordsPerPartition = 100)
+
+
+// Second topic will have compaction. 
+// The first partition will have compaction occur at offset 0 so 
beginningOffsets should be nonzero.
+// The second partition will not have compaction occur at offset 0, so 
beginningOffsets will remain 0.
+val props = new Properties()
+props.setProperty(LogConfig.MaxCompactionLagMsProp, "1")
+props.setProperty(LogConfig.CleanupPolicyProp, "compact")
+props.setProperty(LogConfig.MinCleanableDirtyRatioProp, "0.01")
+props.setProperty(LogConfig.SegmentBytesProp, "512")
+
+// Send records to first partition -- all duplicates.
+createTopic(topic1, numPartitions = 2, replicationFactor = 1, props)
+TestUtils.sendRecordsWithKey(producer, 100, 0L, new TopicPartition(topic1, 
0), "key")
+
+// Send records fo second partition -- only last 50 records are duplicates.
+sendRecords(producer, 50, t1p1)
+TestUtils.sendRecordsWithKey(producer, 50, 50L, new TopicPartition(topic1, 
1), "key")
+
+// Sleep to allow compaction to take place.
+Thread.sleep(25000)

Review comment:
   Sounds good. On further inspection, it looks like `log.cleaner.backoff.ms` 
defaults to 15 seconds, which is why this is happening. I could try to figure out 
how to change this property for this test, but it may be easier to remove it.
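   If the property route were taken, the override itself would be small (a sketch using the standard broker config name; how the test harness wires broker overrides in is not shown here):

    import java.util.Properties;

    public class CleanerBackoffOverrideExample {
        public static void main(String[] args) {
            // Lower the cleaner backoff so compaction runs sooner in the test.
            Properties brokerProps = new Properties();
            brokerProps.setProperty("log.cleaner.backoff.ms", "1000");
            System.out.println(brokerProps);
        }
    }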




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-04-28 Thread GitBox


mjsax commented on pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#issuecomment-828736200


   \cc @vvcephei who was RM for 2.8 for tracking.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-04-28 Thread GitBox


mjsax commented on a change in pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#discussion_r622492627



##
File path: 
streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+@SuppressWarnings("unchecked")
+public static void main(final String[] args) throws Exception {
+if (args.length < 1) {
+System.err.println("StreamsUpgradeTest requires one argument 
(properties-file) but provided none");
+}
+final String propFileName = args[0];
+
+final Properties streamsProperties = Utils.loadProps(propFileName);
+
+System.out.println("StreamsTest instance started (StreamsUpgradeTest 
v2.7)");
+System.out.println("props=" + streamsProperties);
+
+final StreamsBuilder builder = new StreamsBuilder();
+final KStream dataStream = builder.stream("data");
+dataStream.process(printProcessorSupplier());
+dataStream.to("echo");
+
+final Properties config = new Properties();
+config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"StreamsUpgradeTest");
+config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+config.putAll(streamsProperties);
+
+final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+streams.start();
+
+Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+streams.close();
+System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+System.out.flush();
+}));
+}
+
+private static  ProcessorSupplier printProcessorSupplier() {
+return () -> new AbstractProcessor() {
+private int numRecordsProcessed = 0;
+
+@Override
+public void init(final ProcessorContext context) {
+System.out.println("[2.6] initializing processor: topic=data 
taskId=" + context.taskId());

Review comment:
   Should be updated to 2.8 (seems to be a copy & paste error). Could you also fix 
this in `2.7` and update it to `2.7`?
   
   Maybe it's worth introducing a variable that encodes the version?
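   A sketch of that variable (illustrative only, trimmed to the two log lines in question):

    public class StreamsUpgradeTestVersionSketch {
        // Single place to encode the version this upgrade-system-test module targets,
        // so the log lines cannot drift out of sync after a copy & paste.
        private static final String TEST_VERSION = "2.8";

        public static void main(String[] args) {
            System.out.println("StreamsTest instance started (StreamsUpgradeTest v" + TEST_VERSION + ")");
            System.out.println("[" + TEST_VERSION + "] initializing processor: topic=data");
        }
    }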




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-04-28 Thread GitBox


mjsax commented on a change in pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#discussion_r622492083



##
File path: 
streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+@SuppressWarnings("unchecked")
+public static void main(final String[] args) throws Exception {
+if (args.length < 1) {
+System.err.println("StreamsUpgradeTest requires one argument 
(properties-file) but provided none");
+}
+final String propFileName = args[0];
+
+final Properties streamsProperties = Utils.loadProps(propFileName);
+
+System.out.println("StreamsTest instance started (StreamsUpgradeTest 
v2.7)");

Review comment:
   Should be updated to `2.8`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12724) Add 2.8.0 to system tests and streams upgrade tests

2021-04-28 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12724:

Component/s: system tests
 streams

> Add 2.8.0 to system tests and streams upgrade tests
> ---
>
> Key: KAFKA-12724
> URL: https://issues.apache.org/jira/browse/KAFKA-12724
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> Kafka v2.8.0 is released. We should add this version to the system tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-28 Thread GitBox


jsancio commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r622489394



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -67,6 +74,30 @@ protected LeaderState(
 }
 this.grantingVoters.addAll(grantingVoters);
 this.log = logContext.logger(LeaderState.class);
+this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+}
+
+public BatchAccumulator accumulator() {
+return this.accumulator;
+}
+
+private static List convertToVoters(Set voterIds) {
+return voterIds.stream()
+.map(follower -> new Voter().setVoterId(follower))
+.collect(Collectors.toList());
+}
+
+public void appendLeaderChangeMessage(long currentTimeMs) {
+List voters = convertToVoters(voterStates.keySet());
+List grantingVoters = convertToVoters(this.grantingVoters());
+
+LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage()
+.setLeaderId(this.election().leaderId())
+.setVoters(voters)
+.setGrantingVoters(grantingVoters);
+
+accumulator.appendLeaderChangeMessage(leaderChangeMessage, 
currentTimeMs);
+accumulator.forceDrain();

Review comment:
   Okay. We are attempting to keep the semantic for 
`appendLeaderChangeMessage` similar to that of the other `append` methods. In 
that case I am okay with this API.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12718) SessionWindows are closed too early

2021-04-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17334964#comment-17334964
 ] 

Matthias J. Sax commented on KAFKA-12718:
-

I was just looking into the `suppress()` implementation, which obviously does not 
know anything about the semantics of upstream window definitions. It computes the 
"expiry-time" based on the window-end time. Thus, I believe the right fix for 
session windows would be to enforce that gracePeriod > gap. \cc [~ableegoldman] 
as you proposed to address this inside the session window processor (I think 
this approach won't work).
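To make the intended bound concrete, a small sketch of the close-time arithmetic described in the ticket quoted below (plain arithmetic, not the actual Streams code):

    public class SessionWindowCloseTime {
        // Current behavior per the ticket: closeTime = windowEnd + grace (gap omitted).
        static long currentCloseTime(long windowEnd, long grace) {
            return windowEnd + grace;
        }

        // Intended behavior: closeTime = windowEnd + gap + grace.
        static long intendedCloseTime(long windowEnd, long gap, long grace) {
            return windowEnd + gap + grace;
        }

        public static void main(String[] args) {
            final long windowEnd = 100L, gap = 30L, grace = 0L;
            // With grace = 0, today's computation closes the session 30 ms too early.
            System.out.println(currentCloseTime(windowEnd, grace));        // 100
            System.out.println(intendedCloseTime(windowEnd, gap, grace));  // 130
        }
    }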

> SessionWindows are closed too early
> ---
>
> Key: KAFKA-12718
> URL: https://issues.apache.org/jira/browse/KAFKA-12718
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: beginner, easy-fix, newbie
> Fix For: 3.0.0
>
>
> SessionWindows are defined based on a {{gap}} parameter, and also support an 
> additional {{grace-period}} configuration to handle out-of-order data.
> To incorporate the session-gap a session window should only be closed at 
> {{window-end + gap}} and to incorporate grace-period, the close time should 
> be pushed out further to {{window-end + gap + grace}}.
> However, atm we compute the window close time as {{window-end + grace}} 
> omitting the {{gap}} parameter.
> Because default grace-period is 24h most users might not notice this issues. 
> Even if they set a grace period explicitly (eg, when using suppress()), they 
> would most likely set a grace-period larger than gap-time not hitting the 
> issue (or maybe only realize it when inspecting the behavior closely).
> However, if a user wants to disable the grace-period and sets it to zero (on 
> any other value smaller than gap-time), sessions might be close too early and 
> user might notice.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on pull request #10278: KAFKA-10526: leader fsync deferral on write

2021-04-28 Thread GitBox


jsancio commented on pull request #10278:
URL: https://github.com/apache/kafka/pull/10278#issuecomment-828720442


   @vamossagar12 Can you add a description to the PR? It is helpful when 
reviewing this PR to know what strategy you are using to manage the flush 
offset and log flush.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-28 Thread GitBox


ableegoldman commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r622474244



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 if (log.isDebugEnabled()) {
 log.debug("final assignment: " + assignment);
 }
-
+
 return assignment;
 }
 
-private SortedSet getTopicPartitions(Map 
partitionsPerTopic) {
-SortedSet allPartitions =
-new 
TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
-for (Entry entry: partitionsPerTopic.entrySet()) {
-String topic = entry.getKey();
-for (int i = 0; i < entry.getValue(); ++i) {
+/**
+ * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+ * and sortedToBeRemovedPartitions. We use two pointers technique here:
+ *
+ * We loop the sortedPartition, and compare the ith element in sorted 
toBeRemovedPartitions(i start from 0):
+ *   - if not equal to the ith element, add to unassignedPartitions
+ *   - if equal to the the ith element, get next element from 
sortedToBeRemovedPartitions
+ *
+ * @param sortedPartitions: sorted all partitions
+ * @param sortedToBeRemovedPartitions: sorted partitions, all are included 
in the sortedPartitions
+ * @return the partitions don't assign to any current consumers
+ */
+private List getUnassignedPartitions(List 
sortedPartitions,

Review comment:
   Just let me know when you've responded to my other comments and this is 
ready for another pass. Nice to see such an improvement along with making this 
the default
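   For readers following along, a self-contained sketch of the two-pointer difference computation described in the quoted javadoc (simplified to strings; the real method operates on sorted TopicPartition lists):

    import java.util.ArrayList;
    import java.util.List;

    public class SortedDifferenceSketch {
        // Walk the full sorted list once; entries not matched in the sorted
        // to-be-removed list are the unassigned ones.
        static List<String> getUnassignedPartitions(List<String> sortedPartitions,
                                                    List<String> sortedToBeRemoved) {
            final List<String> unassigned = new ArrayList<>();
            int i = 0;
            for (final String partition : sortedPartitions) {
                if (i < sortedToBeRemoved.size() && partition.equals(sortedToBeRemoved.get(i))) {
                    i++; // matched, advance the removal pointer
                } else {
                    unassigned.add(partition);
                }
            }
            return unassigned;
        }

        public static void main(String[] args) {
            List<String> all = List.of("t-0", "t-1", "t-2", "t-3");
            List<String> owned = List.of("t-1", "t-3");
            System.out.println(getUnassignedPartitions(all, owned)); // [t-0, t-2]
        }
    }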




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-04-28 Thread GitBox


hachikuji commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r622473204



##
File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
##
@@ -1267,6 +1266,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(100L, latests.get(t1p0))
   }
 
+  @Test
+  def testBeginningOffsetsOnCompactedTopic(): Unit = {
+val topic0 = "topicWithoutCompaction"
+val topic1 = "topicWithCompaction"
+val t0p0 = new TopicPartition(topic0, 0)
+val t1p0 = new TopicPartition(topic1, 0)
+val t1p1 = new TopicPartition(topic1, 1)
+val partitions = Set(t0p0, t1p0, t1p1).asJava
+
+val producerProps = new Properties()
+// Each batch will hold about 10 records
+producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "256")
+val producer = createProducer(configOverrides = producerProps)
+// First topic will not have compaction. Simply a sanity test.
+createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 1, 
recordsPerPartition = 100)
+
+
+// Second topic will have compaction. 
+// The first partition will have compaction occur at offset 0 so 
beginningOffsets should be nonzero.
+// The second partition will not have compaction occur at offset 0, so 
beginningOffsets will remain 0.
+val props = new Properties()
+props.setProperty(LogConfig.MaxCompactionLagMsProp, "1")
+props.setProperty(LogConfig.CleanupPolicyProp, "compact")
+props.setProperty(LogConfig.MinCleanableDirtyRatioProp, "0.01")
+props.setProperty(LogConfig.SegmentBytesProp, "512")
+
+// Send records to first partition -- all duplicates.
+createTopic(topic1, numPartitions = 2, replicationFactor = 1, props)
+TestUtils.sendRecordsWithKey(producer, 100, 0L, new TopicPartition(topic1, 
0), "key")
+
+// Send records fo second partition -- only last 50 records are duplicates.
+sendRecords(producer, 50, t1p1)
+TestUtils.sendRecordsWithKey(producer, 50, 50L, new TopicPartition(topic1, 
1), "key")
+
+// Sleep to allow compaction to take place.
+Thread.sleep(25000)

Review comment:
   I'd rather just remove it if we don't have a good way to keep it from 
flaking.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-28 Thread GitBox


hachikuji commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r622470077



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -67,6 +74,30 @@ protected LeaderState(
 }
 this.grantingVoters.addAll(grantingVoters);
 this.log = logContext.logger(LeaderState.class);
+this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+}
+
+public BatchAccumulator accumulator() {
+return this.accumulator;
+}
+
+private static List convertToVoters(Set voterIds) {
+return voterIds.stream()
+.map(follower -> new Voter().setVoterId(follower))
+.collect(Collectors.toList());
+}
+
+public void appendLeaderChangeMessage(long currentTimeMs) {
+List voters = convertToVoters(voterStates.keySet());
+List grantingVoters = convertToVoters(this.grantingVoters());
+
+LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage()
+.setLeaderId(this.election().leaderId())
+.setVoters(voters)
+.setGrantingVoters(grantingVoters);
+
+accumulator.appendLeaderChangeMessage(leaderChangeMessage, 
currentTimeMs);
+accumulator.forceDrain();

Review comment:
   It was my suggestion in order to keep the contract of `BatchAccumulator` 
tight. We do "know" in this usage that the accumulator should be empty, but it 
costs us very little to protect the API without relying on external assumptions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-28 Thread GitBox


ableegoldman commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r622472215



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 if (log.isDebugEnabled()) {
 log.debug("final assignment: " + assignment);
 }
-
+
 return assignment;
 }
 
-private SortedSet getTopicPartitions(Map 
partitionsPerTopic) {
-SortedSet allPartitions =
-new 
TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
-for (Entry entry: partitionsPerTopic.entrySet()) {
-String topic = entry.getKey();
-for (int i = 0; i < entry.getValue(); ++i) {
+/**
+ * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+ * and sortedToBeRemovedPartitions. We use two pointers technique here:
+ *
+ * We loop the sortedPartition, and compare the ith element in sorted 
toBeRemovedPartitions(i start from 0):
+ *   - if not equal to the ith element, add to unassignedPartitions
+ *   - if equal to the the ith element, get next element from 
sortedToBeRemovedPartitions
+ *
+ * @param sortedPartitions: sorted all partitions
+ * @param sortedToBeRemovedPartitions: sorted partitions, all are included 
in the sortedPartitions
+ * @return the partitions don't assign to any current consumers
+ */
+private List getUnassignedPartitions(List 
sortedPartitions,

Review comment:
   Thanks for getting some concrete numbers to work with! I suspected the 
theory would not match the reality due to caching primarily, although I wasn't 
aware of the improved runtime of sort on a partially-ordered list. That's good 
to know 😄  And it does make sense in hindsight given the nature of the sorting 
algorithm. 
   
   I've always found that the reality of array performance on any reasonable 
caching architecture, compared to theoretically better data structures/algorithms, 
is one of those things that people know and still 
subconsciously doubt. Probably because most people spent 4 years in college 
getting theoretical algorithmic runtimes drilled into their heads, and far less 
time looking into the underlying architecture that powers those algorithms and 
applies its own optimizations under the hood. It's an interesting psychological 
observation. There's a great talk on it somewhere but I can't remember the name
   
   Anyways, you just never know until you run the numbers. I'm sure this may 
vary somewhat with different input parameters but I think I'm convinced, let's 
stick with this improvement. If someone starts complaining about the memory 
consumption we can always go back and look for ways to cut down. Thanks for the 
enlightening discussion 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-28 Thread GitBox


hachikuji commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r622470077



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -67,6 +74,30 @@ protected LeaderState(
 }
 this.grantingVoters.addAll(grantingVoters);
 this.log = logContext.logger(LeaderState.class);
+this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+}
+
+public BatchAccumulator accumulator() {
+return this.accumulator;
+}
+
+private static List convertToVoters(Set voterIds) {
+return voterIds.stream()
+.map(follower -> new Voter().setVoterId(follower))
+.collect(Collectors.toList());
+}
+
+public void appendLeaderChangeMessage(long currentTimeMs) {
+List voters = convertToVoters(voterStates.keySet());
+List grantingVoters = convertToVoters(this.grantingVoters());
+
+LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage()
+.setLeaderId(this.election().leaderId())
+.setVoters(voters)
+.setGrantingVoters(grantingVoters);
+
+accumulator.appendLeaderChangeMessage(leaderChangeMessage, 
currentTimeMs);
+accumulator.forceDrain();

Review comment:
   It was my suggestion in order to keep the contract of `BatchAccumulator` 
tight. We do "know" in this usage that the accumulator will be empty, but it 
costs us very little to protect the API without relying on external assumptions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 opened a new pull request #10606: KAFKA-12728: [Work In Progress] gradle version upgrade: 6.8 -->> 7.0

2021-04-28 Thread GitBox


dejan2609 opened a new pull request #10606:
URL: https://github.com/apache/kafka/pull/10606


   **_JIRA ticket:_** https://issues.apache.org/jira/browse/KAFKA-12728
   
   **_Related pull requests:_** 
   - #10203 
   - #10466 
   
   Note: we will wait for a Gradle 7.0.1 patch ◀️ 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10597: KAFKA-5876: Apply StreamsNotStartedException for Interactive Queries

2021-04-28 Thread GitBox


ableegoldman commented on a change in pull request #10597:
URL: https://github.com/apache/kafka/pull/10597#discussion_r622464617



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -344,7 +345,7 @@ private boolean isRunningOrRebalancing() {
 
 private void validateIsRunningOrRebalancing() {
 if (!isRunningOrRebalancing()) {
-throw new IllegalStateException("KafkaStreams is not running. 
State is " + state + ".");
+throw new StreamsNotStartedException("KafkaStreams is not running. 
State is " + state + ".");

Review comment:
   SGTM




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2021-04-28 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17334937#comment-17334937
 ] 

A. Sophie Blee-Goldman commented on KAFKA-9295:
---

It's failing on 
{noformat}
startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(60));
{noformat}

At this point a session timeout seems unlikely, since [~showuon] observed a 
Streams instance dropping out on the heartbeat interval only a couple of times 
even when it failed, and all it has to do here is get to RUNNING once. It 
doesn't require that all KafkaStreams in the list get to RUNNING and then stay 
there, so all the instance has to do is start up and go through at least one 
successful rebalance in that time. There's nothing to restore, so the transition 
to RUNNING should be immediate after the rebalance.

Now technically 60s is a typical timeout for 
startApplicationAndWaitUntilRunning in the Streams integration tests, but the 
difference between this and other tests is that most have only one or two 
KafkaStreams to start up whereas this test has three. They're not started up 
and waited on sequentially so that shouldn't _really_ matter that much, but 
still it might just be that a longer timeout should be used in this case. I'm 
open to other theories however

Also note that we should soon have a larger default session interval, so once 
Jason's KIP for that has been implemented we'll be able to get that improvement 
for free. Even if we think the session interval is the problem with this test, 
it probably makes sense to just wait for that KIP rather than hardcode in some 
special value. If it starts to fail very frequently we can reconsider, but I 
haven't observed it doing so since the last fix was merged
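If a longer timeout turns out to be the pragmatic fix, the change is essentially one value (sketch; the exact number would need to be picked with care):

    // Same helper the test already uses, just with a larger startup timeout to
    // account for three KafkaStreams instances starting concurrently.
    startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));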

> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> --
>
> Key: KAFKA-9295
> URL: https://issues.apache.org/jira/browse/KAFKA-9295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.4.0, 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/]
> {quote}java.lang.AssertionError: Did not receive all 1 records from topic 
> output- within 6 ms Expected: is a value equal to or greater than <1> 
> but: <0> was less than <1> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-28 Thread GitBox


jsancio commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r622455380



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -67,6 +74,30 @@ protected LeaderState(
 }
 this.grantingVoters.addAll(grantingVoters);
 this.log = logContext.logger(LeaderState.class);
+this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+}
+
+public BatchAccumulator accumulator() {
+return this.accumulator;
+}
+
+private static List convertToVoters(Set voterIds) {
+return voterIds.stream()
+.map(follower -> new Voter().setVoterId(follower))
+.collect(Collectors.toList());
+}
+
+public void appendLeaderChangeMessage(long currentTimeMs) {
+List voters = convertToVoters(voterStates.keySet());
+List grantingVoters = convertToVoters(this.grantingVoters());
+
+LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage()
+.setLeaderId(this.election().leaderId())
+.setVoters(voters)
+.setGrantingVoters(grantingVoters);
+
+accumulator.appendLeaderChangeMessage(leaderChangeMessage, 
currentTimeMs);
+accumulator.forceDrain();

Review comment:
   We `forceDrain` inside `appendLeaderChangeMessage` before adding the 
leader change message to `completed`. Is this `forceDrain()` a no-op? Should 
the current batch be empty at this point? If we don't need to call this method, 
I say we remove it altogether.

##
File path: 
raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
##
@@ -65,6 +66,132 @@
 );
 }
 
+@Test
+public void testLeaderChangeMessageWritten() {
+int leaderEpoch = 17;
+long baseOffset = 0;
+int lingerMs = 50;
+int maxBatchSize = 512;
+
+ByteBuffer buffer = ByteBuffer.allocate(256);
+Mockito.when(memoryPool.tryAllocate(256))
+.thenReturn(buffer);
+
+BatchAccumulator acc = buildAccumulator(
+leaderEpoch,
+baseOffset,
+lingerMs,
+maxBatchSize
+);
+
+acc.appendLeaderChangeMessage(new LeaderChangeMessage(), 
time.milliseconds());

Review comment:
   Let's check that `needsDrain` returns true after this point.
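   The added check might look like this (sketch; assumes `needsDrain` takes the current time, as used elsewhere in these tests):

    // After appending the leader change message, the accumulator should report
    // that it needs to be drained.
    assertTrue(acc.needsDrain(time.milliseconds()));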




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12728) Update Gradle version: 6.8 -->> 7.0

2021-04-28 Thread Jira
Dejan Stojadinović created KAFKA-12728:
--

 Summary: Update Gradle version: 6.8 -->> 7.0
 Key: KAFKA-12728
 URL: https://issues.apache.org/jira/browse/KAFKA-12728
 Project: Kafka
  Issue Type: Improvement
  Components: build
Reporter: Dejan Stojadinović
Assignee: Dejan Stojadinović


Prologue/related tickets: KAFKA-12415 and KAFKA-12417

*Gradle 7.0 release notes:* [https://docs.gradle.org/7.0/release-notes.html]

Note: it makes sense to wait for the 7.0.1 patch to be released: 
[https://github.com/gradle/gradle/milestone/173]

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-04-28 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r622456807



##
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##
@@ -984,19 +1003,26 @@ class LogCleanerTest {
 
 def distinctValuesBySegment = log.logSegments.map(s => 
s.log.records.asScala.map(record => 
TestUtils.readString(record.value)).toSet.size).toSeq
 
-val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment
+val distinctValuesBySegmentBeforeClean = distinctValuesBySegment
 assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N),
   "Test is not effective unless each segment contains duplicates. Increase 
segment size or decrease number of keys.")
 
+log.updateHighWatermark(log.activeSegment.baseOffset)
 cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, 
firstUncleanableOffset))
 
 val distinctValuesBySegmentAfterClean = distinctValuesBySegment
 
-
assertTrue(disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
-  .take(numCleanableSegments).forall { case (before, after) => after < 
before },
+// One segment should have been completely deleted, so there will be fewer 
segments.
+assertTrue(distinctValuesBySegmentAfterClean.size < 
distinctValuesBySegmentBeforeClean.size)
+
+// Drop the first segment from before cleaning since it was removed. Also 
subtract 1 from numCleanableSegments
+val normalizedDistinctValuesBySegmentBeforeClean = 
distinctValuesBySegmentBeforeClean.drop(1)

Review comment:
   This test is a little tricky, but I've updated it. Now it only uses 
duplicate keys. It's a little confusing because the first uncleanable offset is 
not actually the point at which records below are cleaned. The segments cleaned 
are the full segments below the uncleanable offset (so, in this case, the 
segments before the uncleanable offset). And even then, one record (the last 
record in the last segment) will be retained due to how cleaning works. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-04-28 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r622456807



##
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##
@@ -984,19 +1003,26 @@ class LogCleanerTest {
 
 def distinctValuesBySegment = log.logSegments.map(s => 
s.log.records.asScala.map(record => 
TestUtils.readString(record.value)).toSet.size).toSeq
 
-val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment
+val distinctValuesBySegmentBeforeClean = distinctValuesBySegment
 assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N),
   "Test is not effective unless each segment contains duplicates. Increase 
segment size or decrease number of keys.")
 
+log.updateHighWatermark(log.activeSegment.baseOffset)
 cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, 
firstUncleanableOffset))
 
 val distinctValuesBySegmentAfterClean = distinctValuesBySegment
 
-
assertTrue(disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
-  .take(numCleanableSegments).forall { case (before, after) => after < 
before },
+// One segment should have been completely deleted, so there will be fewer 
segments.
+assertTrue(distinctValuesBySegmentAfterClean.size < 
distinctValuesBySegmentBeforeClean.size)
+
+// Drop the first segment from before cleaning since it was removed. Also 
subtract 1 from numCleanableSegments
+val normalizedDistinctValuesBySegmentBeforeClean = 
distinctValuesBySegmentBeforeClean.drop(1)

Review comment:
   This test is a little tricky, but I've updated it. Now it only uses 
duplicate keys. It's a little confusing because the first uncleanable offset is 
not actually the point at which records below are cleaned. The segments cleaned 
are the full segments below the uncleanable offset. And even then, one record 
(the last record in the last segment) will be retained due to how cleaning 
works. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

2021-04-28 Thread GitBox


hachikuji commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-828703078


   @tombentley Thanks for the updates. I made a few small changes in this 
patch: 
https://github.com/hachikuji/kafka/commit/27ba937cc7b7da230ccc4f0c8220f680c4a542fe.
 The main things are taking the loading itself out of `compute` (which is 
intended to be a cheap operation) and moving the `loadingPartitions` check. 
Feel free to pull in the changes if they make sense to you.
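   For reference, the general shape of that change (a generic Java illustration 
only, not the actual coordinator code) is to keep the map update cheap and run 
the expensive load outside of it:
   ```java
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;

   public class LoadOutsideComputeExample {
       private final Set<Integer> loadingPartitions = ConcurrentHashMap.newKeySet();

       public void scheduleLoad(int partition) {
           // cheap bookkeeping only: mark the partition as loading without doing any I/O
           if (loadingPartitions.add(partition)) {
               try {
                   expensiveLoad(partition); // heavy work runs outside the concurrent-map update
               } finally {
                   loadingPartitions.remove(partition);
               }
           }
       }

       private void expensiveLoad(int partition) {
           // placeholder for reading the group state from the partition
       }
   }
   ```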


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Reopened] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2021-04-28 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reopened KAFKA-9295:
---

Still failing, saw this on both the Java 8 and Java 11 builds of a PR:

Stacktrace
java.lang.AssertionError: Application did not reach a RUNNING state for all 
streams instances. Non-running instances: 
{org.apache.kafka.streams.KafkaStreams@cce6a1d=REBALANCING, 
org.apache.kafka.streams.KafkaStreams@45af6187=REBALANCING, 
org.apache.kafka.streams.KafkaStreams@5d8ba53=REBALANCING}
at org.junit.Assert.fail(Assert.java:89)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning(IntegrationTestUtils.java:904)
at 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:197)
at 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:185)


https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10573/8/testReport/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldInnerJoinMultiPartitionQueryable/

> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> --
>
> Key: KAFKA-9295
> URL: https://issues.apache.org/jira/browse/KAFKA-9295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.4.0, 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/]
> {quote}java.lang.AssertionError: Did not receive all 1 records from topic 
> output- within 6 ms Expected: is a value equal to or greater than <1> 
> but: <0> was less than <1> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12727) Flaky test ListConsumerGroupTest.testListConsumerGroupsWithStates

2021-04-28 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12727:
--

 Summary: Flaky test 
ListConsumerGroupTest.testListConsumerGroupsWithStates
 Key: KAFKA-12727
 URL: https://issues.apache.org/jira/browse/KAFKA-12727
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: A. Sophie Blee-Goldman


Stacktrace
org.opentest4j.AssertionFailedError: Expected to show groups 
Set((groupId='simple-group', isSimpleConsumerGroup=true, 
state=Optional[Empty]), (groupId='test.group', isSimpleConsumerGroup=false, 
state=Optional[Stable])), but found Set((groupId='test.group', 
isSimpleConsumerGroup=false, state=Optional[Stable]))
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39)
at org.junit.jupiter.api.Assertions.fail(Assertions.java:117)
at 
kafka.admin.ListConsumerGroupTest.testListConsumerGroupsWithStates(ListConsumerGroupTest.scala:66)

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10573/8/testReport/kafka.admin/ListConsumerGroupTest/Build___JDK_11_and_Scala_2_13___testListConsumerGroupsWithStates__/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12417) streams module should use `testRuntimeClasspath` instead of `testRuntime` configuration

2021-04-28 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-12417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dejan Stojadinović resolved KAFKA-12417.

Resolution: Done

Resolving as done (commit is merged into trunk).

> streams module should use `testRuntimeClasspath` instead of `testRuntime` 
> configuration
> ---
>
> Key: KAFKA-12417
> URL: https://issues.apache.org/jira/browse/KAFKA-12417
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Dejan Stojadinović
>Priority: Major
>
> The streams module has the following code:
> {code:java}
> tasks.create(name: "copyDependantLibs", type: Copy) {
>   from (configurations.testRuntime) {
> include('slf4j-log4j12*')
> include('log4j*jar')
> include('*hamcrest*')
>   }
>   from (configurations.runtimeClasspath) {
> exclude('kafka-clients*')
>   }
>   into "$buildDir/dependant-libs-${versions.scala}"
>   duplicatesStrategy 'exclude'
> } {code}
> {{configurations.testRuntime}} should be changed to 
> {{configurations.testRuntimeClasspath}} as the former has been removed in 
> Gradle 7.0, but this causes a cyclic build error.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dielhennr closed pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-28 Thread GitBox


dielhennr closed pull request #10480:
URL: https://github.com/apache/kafka/pull/10480


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-04-28 Thread GitBox


jsancio commented on a change in pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#discussion_r622425656



##
File path: 
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##
@@ -1335,6 +1313,57 @@ public void 
testFetchSnapshotRequestClusterIdValidation() throws Exception {
 
context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
 }
 
+@Test
+public void testCreateSnapshotWithInvalidSnapshotId() throws Exception {
+int localId = 0;
+int otherNodeId = localId + 1;
+Set voters = Utils.mkSet(localId, otherNodeId);
+int epoch = 2;
+
+List appendRecords = Arrays.asList("a", "b", "c");
+OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(3, epoch);
+
+RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+.appendToLog(epoch, appendRecords)
+.withAppendLingerMs(1)
+.build();
+
+context.becomeLeader();
+int currentEpoch = context.currentEpoch();
+
+// When creating snapshot:
+// 1.1 high watermark cannot be empty
+assertEquals(OptionalLong.empty(), context.client.highWatermark());
+assertThrows(KafkaException.class, () -> 
context.client.createSnapshot(invalidSnapshotId1));
+
+// 1.2 high watermark must larger than or equal to the snapshotId's 
endOffset
+advanceHighWatermark(context, currentEpoch, currentEpoch, otherNodeId, 
localId);

Review comment:
   I would append a couple of batches after advancing the high-watermark. 
At this point the HWM equals the LEO.
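   For example, something along these lines (a rough sketch; the exact helpers 
available on `RaftClientTestContext` may differ):
   ```java
   // append a couple of batches so the log end offset moves past the high watermark
   context.client.scheduleAppend(currentEpoch, Arrays.asList("d", "e"));
   context.client.poll();
   ```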

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2268,6 +2269,25 @@ private Long append(int epoch, List records, boolean 
isAtomic) {
 );
 }
 
+private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+Optional highWatermarkOpt = 
quorum().highWatermark();
+if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset < 
snapshotId.offset) {
+throw new KafkaException("Trying to creating snapshot with invalid 
snapshotId: " + snapshotId + " whose offset is larger than the high-watermark: 
" +
+highWatermarkOpt + ". This may necessarily mean a bug in 
the caller, since the there should be a minimum " +
+"size of records between the latest snapshot and the 
high-watermark when creating snapshot");

Review comment:
   What do you mean by ", since the there should be a minimum size of 
records between the latest snapshot and the high-watermark when creating 
snapshot"?

##
File path: 
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##
@@ -1335,6 +1313,57 @@ public void 
testFetchSnapshotRequestClusterIdValidation() throws Exception {
 
context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
 }
 
+@Test
+public void testCreateSnapshotWithInvalidSnapshotId() throws Exception {
+int localId = 0;
+int otherNodeId = localId + 1;
+Set voters = Utils.mkSet(localId, otherNodeId);
+int epoch = 2;
+
+List appendRecords = Arrays.asList("a", "b", "c");
+OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(3, epoch);
+
+RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+.appendToLog(epoch, appendRecords)
+.withAppendLingerMs(1)
+.build();
+
+context.becomeLeader();
+int currentEpoch = context.currentEpoch();
+
+// When creating snapshot:
+// 1.1 high watermark cannot be empty
+assertEquals(OptionalLong.empty(), context.client.highWatermark());
+assertThrows(KafkaException.class, () -> 
context.client.createSnapshot(invalidSnapshotId1));
+
+// 1.2 high watermark must larger than or equal to the snapshotId's 
endOffset
+advanceHighWatermark(context, currentEpoch, currentEpoch, otherNodeId, 
localId);
+assertNotEquals(OptionalLong.empty(), context.client.highWatermark());
+OffsetAndEpoch invalidSnapshotId2 = new 
OffsetAndEpoch(context.client.highWatermark().getAsLong() + 1, currentEpoch);
+assertThrows(KafkaException.class, () -> 
context.client.createSnapshot(invalidSnapshotId2));
+
+// 2 the quorum epoch must larger than or equal to the snapshotId's 
epoch
+OffsetAndEpoch invalidSnapshotId3 = new 
OffsetAndEpoch(context.client.highWatermark().getAsLong() - 1, currentEpoch + 
1);
+assertThrows(KafkaException.class, () -> 
context.client.createSnapshot(invalidSnapshotId3));
+
+// 3 the snapshotId should be validated against endOffsetForEpoch
+OffsetAndEpoch endOffsetForEpoch = 
context.log.endOffsetForEpoch(currentEpoch);

Review comment:
   Let's use `epoch` instead of `currentEpoch`.

[GitHub] [kafka] ijuma merged pull request #10466: KAFKA-12417: streams `copyDependentLibs` should not copy testRuntime configuration jars

2021-04-28 Thread GitBox


ijuma merged pull request #10466:
URL: https://github.com/apache/kafka/pull/10466


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`

2021-04-28 Thread GitBox


ijuma commented on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-828682155


   Unrelated failures, merging to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-04-28 Thread GitBox


jsancio commented on a change in pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#discussion_r620232416



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2268,6 +2269,20 @@ private Long append(int epoch, List records, boolean 
isAtomic) {
 );
 }
 
+private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+Optional highWatermarkOpt = 
quorum().highWatermark();
+if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= 
snapshotId.offset) {
+throw new KafkaException("Trying to creating snapshot with 
snapshotId: " + snapshotId + " whose offset is larger than the high-watermark: 
" +
+highWatermarkOpt + ". This may necessarily mean a bug in 
the caller, since the there should be a minimum " +
+"size of records between the latest snapshot and the 
high-watermark when creating snapshot");
+}
+int leaderEpoch = quorum().epoch();
+if (snapshotId.epoch > leaderEpoch) {
+throw new KafkaException("Trying to creating snapshot with 
snapshotId: " + snapshotId + " whose epoch is" +
+" larger than the current leader epoch: " + leaderEpoch);
+}

Review comment:
   Yeah, this is not strictly required for correctness. Oh, I see, the 
check on line 2280 is checking that `epoch > current epoch`.
   
   I mistakenly read it as `epoch != current epoch`. If we performed that check 
we would basically be saying that the caller of `createSnapshot` needs to catch 
up to the current quorum epoch before it can generate a snapshot. Yes, I think 
`epoch <= quorum epoch` is fine. Let me think about it and I'll update the Jira.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-04-28 Thread GitBox


jolshan commented on pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#issuecomment-828663698


   Thanks @junrao for taking another look  
   > A lot of the complexity is the additional logic for propagating unresolved 
partitions from FetchRequest to FetchSession and the maintenance of unresolved 
partitions within FetchSession.
   
   I agree. This has been in the back of my mind for a while, specifically 
whether all the changes in FetchSession are necessary for such cases. So thanks 
for bringing this up. I think the only things I was really concerned about were 
the roll to upgrade and the new-topic case you mentioned. But even with the 
current approach, I wasn't sure the fetch session changes were really helping. 
My biggest confusion comes from when the client will refresh metadata. Will 
returning a top-level error guarantee a refresh (vs. being given an unknown 
topic ID response)? I think the top-level approach will likely be better. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ryannedolan opened a new pull request #10605: KAFKA-12726 prevent a stuck Task.stop() from blocking subsequent Task.stops()s

2021-04-28 Thread GitBox


ryannedolan opened a new pull request #10605:
URL: https://github.com/apache/kafka/pull/10605


   A misbehaving Task.stop() can prevent other Tasks from stopping, even when a 
graceful shutdown timeout is configured. We improve the situation as follows:
   
   - prior to task.shutdown.graceful.timeout.ms expiring, the existing behavior 
is retained, except that Task.stop() is called in a new Thread.
   - after task.shutdown.graceful.timeout.ms expires, the Worker runs any 
remaining Task.stop()s concurrently.
   
   Thus, the behavior doesn't change appreciably when Tasks behave normally; 
however, if any Task.stop() gets stuck (e.g. in a retry loop) we continue with a 
best-effort shutdown.
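   Roughly, the strategy looks like the sketch below (illustrative only; the 
class and method names here are made up and are not the actual Connect worker 
code):
   ```java
   import java.util.List;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;

   public class BestEffortTaskStopper {

       public void stopAll(List<Runnable> taskStops, long gracefulTimeoutMs) {
           ExecutorService stopExecutor = Executors.newCachedThreadPool();
           long deadline = System.currentTimeMillis() + gracefulTimeoutMs;

           for (Runnable stop : taskStops) {
               // each stop() runs in its own thread, so a stuck stop() cannot block the loop forever
               Future<?> pending = stopExecutor.submit(stop);
               long remaining = deadline - System.currentTimeMillis();
               if (remaining > 0) {
                   try {
                       // before the graceful timeout expires, keep the sequential behavior:
                       // wait for this stop() before moving on to the next one
                       pending.get(remaining, TimeUnit.MILLISECONDS);
                   } catch (Exception e) {
                       // timed out or failed: fall through and let it finish in the background
                   }
               }
               // once the deadline has passed, the loop no longer waits, so any remaining
               // stop()s effectively run concurrently on the executor (best-effort shutdown)
           }
           stopExecutor.shutdown();
       }
   }
   ```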


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-28 Thread GitBox


hachikuji commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r622378516



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1859,15 +1819,14 @@ private void appendBatch(
 offsetAndEpoch.offset + 1, Integer.MAX_VALUE);
 
 future.whenComplete((commitTimeMs, exception) -> {
-int numRecords = batch.records.size();
 if (exception != null) {
-logger.debug("Failed to commit {} records at {}", 
numRecords, offsetAndEpoch, exception);
+logger.debug("Failed to commit {} records at {}", 
batch.numRecords, offsetAndEpoch, exception);
 } else {
 long elapsedTime = Math.max(0, commitTimeMs - 
appendTimeMs);
-double elapsedTimePerRecord = (double) elapsedTime / 
numRecords;
+double elapsedTimePerRecord = (double) elapsedTime / 
batch.numRecords;
 kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, 
appendTimeMs);
-logger.debug("Completed commit of {} records at {}", 
numRecords, offsetAndEpoch);
-maybeFireHandleCommit(batch.baseOffset, epoch, 
batch.records);
+logger.debug("Completed commit of {} records at {}", 
batch.numRecords, offsetAndEpoch);
+maybeFireHandleCommit(batch.baseOffset, epoch, 
batch.records.get());

Review comment:
   We still need to protect the call to `get()` here, right?
   ```java
   batch.records.ifPresent(records -> maybeFireHandleCommit(batch.baseOffset, 
epoch, records));
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-04-28 Thread GitBox


junrao commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r617932245



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -662,11 +662,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 val versionId = request.header.apiVersion
 val clientId = request.header.clientId
 val fetchRequest = request.body[FetchRequest]
+val (topicIds, topicNames) =
+  if (fetchRequest.version() >= 13)
+metadataCache.topicIdInfo()
+  else
+(Collections.emptyMap[String, Uuid](), Collections.emptyMap[Uuid, 
String]())
+
 val fetchContext = fetchManager.newContext(
+  fetchRequest.version,
   fetchRequest.metadata,
-  fetchRequest.fetchData,
-  fetchRequest.toForget,
-  fetchRequest.isFromFollower)
+  fetchRequest.isFromFollower,
+  fetchRequest.fetchDataAndError(topicNames),
+  fetchRequest.forgottenTopics(topicNames),
+  topicNames,
+  topicIds)
+

Review comment:
   extra newline.

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -110,67 +111,164 @@ public String toString() {
 }
 }
 
-private Optional optionalEpoch(int rawEpochValue) {
+private static Optional optionalEpoch(int rawEpochValue) {
 if (rawEpochValue < 0) {
 return Optional.empty();
 } else {
 return Optional.of(rawEpochValue);
 }
 }
 
+// Only used when version is lower than 13.
 private Map 
toPartitionDataMap(List fetchableTopics) {
 Map result = new LinkedHashMap<>();
 fetchableTopics.forEach(fetchTopic -> 
fetchTopic.partitions().forEach(fetchPartition -> {
 result.put(new TopicPartition(fetchTopic.topic(), 
fetchPartition.partition()),
-new PartitionData(
-fetchPartition.fetchOffset(),
-fetchPartition.logStartOffset(),
-fetchPartition.partitionMaxBytes(),
-optionalEpoch(fetchPartition.currentLeaderEpoch()),
-optionalEpoch(fetchPartition.lastFetchedEpoch())
-));
+new PartitionData(
+fetchPartition.fetchOffset(),
+fetchPartition.logStartOffset(),
+fetchPartition.partitionMaxBytes(),
+optionalEpoch(fetchPartition.currentLeaderEpoch()),
+optionalEpoch(fetchPartition.lastFetchedEpoch())
+));
 }));
 return Collections.unmodifiableMap(result);
 }
 
-private List 
toForgottenTopicList(List forgottenTopics) {
-List result = new ArrayList<>();
-forgottenTopics.forEach(forgottenTopic ->
-forgottenTopic.partitions().forEach(partitionId ->
-result.add(new TopicPartition(forgottenTopic.topic(), 
partitionId))
-)
-);
-return result;
+/**
+ *  The following methods are new to version 13. They support sending 
Fetch requests using topic ID rather
+ *  than topic name. Since the sender and receiver of the fetch request 
may have different topic IDs in
+ *  their caches, there is a possibility for some topic IDs to be 
unresolved on the receiving end. These
+ *  methods and classes try to resolve the topic IDs and keep track of 
unresolved partitions and their errors.
+ */
+
+// Holds information on partitions whose topic IDs were unable to be 
resolved when the Fetch request
+// was received.
+public static final class UnresolvedPartitions {
+private final Uuid id;
+private final Map partitionData;
+
+public UnresolvedPartitions(Uuid id, Map 
partitionData) {
+this.id = id;
+this.partitionData = partitionData;
+}
+
+public Uuid id() {

Review comment:
   id => topicId ?

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -110,67 +111,164 @@ public String toString() {
 }
 }
 
-private Optional optionalEpoch(int rawEpochValue) {
+private static Optional optionalEpoch(int rawEpochValue) {
 if (rawEpochValue < 0) {
 return Optional.empty();
 } else {
 return Optional.of(rawEpochValue);
 }
 }
 
+// Only used when version is lower than 13.
 private Map 
toPartitionDataMap(List fetchableTopics) {
 Map result = new LinkedHashMap<>();
 fetchableTopics.forEach(fetchTopic -> 
fetchTopic.partitions().forEach(fetchPartition -> {
 result.put(new TopicPartition(fetchTopic.topic(), 
fetchPartition.partition()),
-new PartitionData(
-fetchPartition.fetchOffset(),
-fetchPartition.logStartOffset(),
-fetchPartit

[GitHub] [kafka] JoelWee commented on pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

2021-04-28 Thread GitBox


JoelWee commented on pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#issuecomment-828616051


   Thanks @ableegoldman! :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-04-28 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r622360387



##
File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
##
@@ -1267,6 +1266,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(100L, latests.get(t1p0))
   }
 
+  @Test
+  def testBeginningOffsetsOnCompactedTopic(): Unit = {
+val topic0 = "topicWithoutCompaction"
+val topic1 = "topicWithCompaction"
+val t0p0 = new TopicPartition(topic0, 0)
+val t1p0 = new TopicPartition(topic1, 0)
+val t1p1 = new TopicPartition(topic1, 1)
+val partitions = Set(t0p0, t1p0, t1p1).asJava
+
+val producerProps = new Properties()
+// Each batch will hold about 10 records
+producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "256")
+val producer = createProducer(configOverrides = producerProps)
+// First topic will not have compaction. Simply a sanity test.
+createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 1, 
recordsPerPartition = 100)
+
+
+// Second topic will have compaction. 
+// The first partition will have compaction occur at offset 0 so 
beginningOffsets should be nonzero.
+// The second partition will not have compaction occur at offset 0, so 
beginningOffsets will remain 0.
+val props = new Properties()
+props.setProperty(LogConfig.MaxCompactionLagMsProp, "1")
+props.setProperty(LogConfig.CleanupPolicyProp, "compact")
+props.setProperty(LogConfig.MinCleanableDirtyRatioProp, "0.01")
+props.setProperty(LogConfig.SegmentBytesProp, "512")
+
+// Send records to first partition -- all duplicates.
+createTopic(topic1, numPartitions = 2, replicationFactor = 1, props)
+TestUtils.sendRecordsWithKey(producer, 100, 0L, new TopicPartition(topic1, 
0), "key")
+
+// Send records fo second partition -- only last 50 records are duplicates.
+sendRecords(producer, 50, t1p1)
+TestUtils.sendRecordsWithKey(producer, 50, 50L, new TopicPartition(topic1, 
1), "key")
+
+// Sleep to allow compaction to take place.
+Thread.sleep(25000)

Review comment:
   I was able to get it to 15 seconds consistently (50 passed tests), but 
once we get to 13, it starts flaking. Not sure if this is good enough to keep 
the test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-04-28 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r622355520



##
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##
@@ -540,13 +540,34 @@ class LogCleanerManagerTest extends Logging {
 
 while(log.numberOfSegments < 8)
   log.appendAsLeader(records(log.logEndOffset.toInt, 
log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
+log.updateHighWatermark(log.activeSegment.baseOffset)
 
 val lastCleanOffset = Some(0L)
 val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, 
lastCleanOffset, time.milliseconds)
 assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable 
offset starts at the beginning of the log.")
 assertEquals(log.activeSegment.baseOffset, 
cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset 
begins with the active segment.")
   }
 
+  @Test
+  def testCleanableOffsetsForNoneWithLowerHighWatermark(): Unit = {

Review comment:
   I think the None may have been taken from the test above. In that test, 
it was referring to "no minimum compaction lag settings active". Changed name 
and added a comment to describe what the test is doing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10598: MINOR: rename wrong topic id variable name and description

2021-04-28 Thread GitBox


chia7712 commented on pull request #10598:
URL: https://github.com/apache/kafka/pull/10598#issuecomment-828584408


   @showuon Could you share your email with me? Your GitHub profile does not 
show your email. I can see an email from your public repo 
(https://github.com/showuon/js2ts/commit/d7711b632e681a10655f138f852bb75f1e902288.patch).
 Not sure whether it is correct :(


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12726) misbehaving Task.stop() can prevent other Tasks from stopping

2021-04-28 Thread Ryanne Dolan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryanne Dolan reassigned KAFKA-12726:


Assignee: Ryanne Dolan

> misbehaving Task.stop() can prevent other Tasks from stopping
> -
>
> Key: KAFKA-12726
> URL: https://issues.apache.org/jira/browse/KAFKA-12726
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.8.0
>Reporter: Ryanne Dolan
>Assignee: Ryanne Dolan
>Priority: Minor
>
> We've observed a misbehaving Task fail to stop in a timely manner (e.g. stuck 
> in a retry loop). Despite Connect supporting a property 
> task.shutdown.graceful.timeout.ms, this is currently not enforced -- tasks 
> can take as long as they want to stop, and the only consequence is an error 
> message.
> Unfortunately, Workers stop Tasks sequentially, meaning that a stuck Task can 
> prevent any further Tasks from stopping. Moreover, after a rebalance, these 
> lingering tasks can persist along with their replacements. For example, we've 
> seen a Worker's "task-count" metric double following a rebalance.
> While the Connector implementation is ultimately to blame here -- a Task 
> probably shouldn't loop forever in stop() -- we believe the Connect runtime 
> should handle this situation more gracefully.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12726) misbehaving Task.stop() can prevent other Tasks from stopping

2021-04-28 Thread Ryanne Dolan (Jira)
Ryanne Dolan created KAFKA-12726:


 Summary: misbehaving Task.stop() can prevent other Tasks from 
stopping
 Key: KAFKA-12726
 URL: https://issues.apache.org/jira/browse/KAFKA-12726
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.8.0
Reporter: Ryanne Dolan


We've observed a misbehaving Task fail to stop in a timely manner (e.g. stuck 
in a retry loop). Despite Connect supporting a property 
task.shutdown.graceful.timeout.ms, this is currently not enforced -- tasks can 
take as long as they want to stop, and the only consequence is an error message.

Unfortunately, Workers stop Tasks sequentially, meaning that a stuck Task can 
prevent any further Tasks from stopping. Moreover, after a rebalance, these 
lingering tasks can persist along with their replacements. For example, we've 
seen a Worker's "task-count" metric double following a rebalance.

While the Connector implementation is ultimately to blame here -- a Task 
probably shouldn't loop forever in stop() -- we believe the Connect runtime 
should handle this situation more gracefully.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


  1   2   >