[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
showuon commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r622780715 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -163,127 +159,181 @@ private boolean allSubscriptionsEqual(Set allTopics, */ private Map> constrainedAssign(Map partitionsPerTopic, Map> consumerToOwnedPartitions) { -SortedSet unassignedPartitions = getTopicPartitions(partitionsPerTopic); +if (log.isDebugEnabled()) { +log.debug("performing constrained assign. partitionsPerTopic: {}, consumerToOwnedPartitions: {}", +partitionsPerTopic, consumerToOwnedPartitions); +} Set allRevokedPartitions = new HashSet<>(); -// Each consumer should end up in exactly one of the below -// the consumers not yet at capacity +// the consumers not yet at expected capacity List unfilledMembers = new LinkedList<>(); -// the members with exactly maxQuota partitions assigned -Queue maxCapacityMembers = new LinkedList<>(); -// the members with exactly minQuota partitions assigned -Queue minCapacityMembers = new LinkedList<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); -int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers); -int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers); +int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum); + +int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers); +int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers); +// the expected number of members with maxQuota assignment +int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers; +// the number of members with exactly maxQuota partitions assigned +int numMaxCapacityMembers = 0; -// initialize the assignment map with an empty array of size minQuota for all members +// initialize the assignment map with an empty array of size 
maxQuota for all members Map> assignment = new HashMap<>( - consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota; + consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota; +List assignedPartitions = new ArrayList<>(); // Reassign as many previously owned partitions as possible for (Map.Entry> consumerEntry : consumerToOwnedPartitions.entrySet()) { String consumer = consumerEntry.getKey(); List ownedPartitions = consumerEntry.getValue(); List consumerAssignment = assignment.get(consumer); -int i = 0; -// assign the first N partitions up to the max quota, and mark the remaining as being revoked -for (TopicPartition tp : ownedPartitions) { -if (i < maxQuota) { -consumerAssignment.add(tp); -unassignedPartitions.remove(tp); -} else { -allRevokedPartitions.add(tp); -} -++i; -} if (ownedPartitions.size() < minQuota) { +// the expected assignment size is more than consumer have now, so keep all the owned partitions +// and put this member into unfilled member list +if (ownedPartitions.size() > 0) { +consumerAssignment.addAll(ownedPartitions); +assignedPartitions.addAll(ownedPartitions); +} unfilledMembers.add(consumer); +} else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers < numExpectedMaxCapacityMembers) { +// consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected max capacity members +// so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions +numMaxCapacityMembers++; +List maxQuotaPartitions = ownedPartitions.subList(0, maxQuota); +consumerAssignment.addAll(maxQuotaPartitions); +assignedPartitions.addAll(maxQuotaPartitions); +allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size())); } else { -// It's possible for a consumer to be at both min and max capacity if minQuota == maxQuota -if (consumerAssignment.size() == minQuota) 
-minCapacityMembers.add(consumer); -if (consumerAssi
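The quota arithmetic in the diff above can be checked on its own. This minimal sketch copies the three formulas (`minQuota`, `maxQuota`, `numExpectedMaxCapacityMembers`) into a standalone class. The class and method names are made up for illustration and are not part of `AbstractStickyAssignor`.

```java
import java.util.Arrays;

class QuotaSketch {
    // Returns {minQuota, maxQuota, numExpectedMaxCapacityMembers}, using the same
    // formulas as the constrainedAssign diff (hypothetical helper, not Kafka API).
    static int[] quotas(int totalPartitionsCount, int numberOfConsumers) {
        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
        // Only the remainder of the division needs one extra partition each
        int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers;
        return new int[] {minQuota, maxQuota, numExpectedMaxCapacityMembers};
    }

    public static void main(String[] args) {
        // 7 partitions, 3 consumers: one member gets 3 and two get 2 (3 + 2 + 2 = 7)
        System.out.println(Arrays.toString(quotas(7, 3))); // [2, 3, 1]
        // 6 partitions, 3 consumers: minQuota == maxQuota, so no member needs an extra partition
        System.out.println(Arrays.toString(quotas(6, 3))); // [2, 2, 0]
    }
}
```

Note the invariant `minQuota * (n - k) + maxQuota * k == totalPartitionsCount`, where `n` is the number of consumers and `k` is `numExpectedMaxCapacityMembers`. This appears to be why the assignor lets only `k` members keep `maxQuota` partitions.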
[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
showuon commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r622779994 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set allTopics, */ private Map> constrainedAssign(Map partitionsPerTopic, Map> consumerToOwnedPartitions) { -SortedSet unassignedPartitions = getTopicPartitions(partitionsPerTopic); +if (log.isDebugEnabled()) { +log.debug(String.format("performing constrained assign. partitionsPerTopic: %s, consumerToOwnedPartitions: %s", +partitionsPerTopic, consumerToOwnedPartitions)); +} + +List sortedAllPartitions = getTopicPartitions(partitionsPerTopic); Set allRevokedPartitions = new HashSet<>(); -// Each consumer should end up in exactly one of the below -// the consumers not yet at capacity +// the consumers not yet at expected capacity List unfilledMembers = new LinkedList<>(); -// the members with exactly maxQuota partitions assigned -Queue maxCapacityMembers = new LinkedList<>(); -// the members with exactly minQuota partitions assigned -Queue minCapacityMembers = new LinkedList<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); -int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers); -int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers); +int totalPartitionsCount = sortedAllPartitions.size(); + +int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers); +int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers); +// the expected number of members with maxQuota assignment +int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers; +// the number of members with exactly maxQuota partitions assigned +int numMaxCapacityMembers = 0; -// initialize the assignment map with an empty array of size minQuota for all members +// 
initialize the assignment map with an empty array of size maxQuota for all members Map> assignment = new HashMap<>( - consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota; + consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota; +List toBeRemovedPartitions = new ArrayList<>(); // Reassign as many previously owned partitions as possible for (Map.Entry> consumerEntry : consumerToOwnedPartitions.entrySet()) { String consumer = consumerEntry.getKey(); List ownedPartitions = consumerEntry.getValue(); List consumerAssignment = assignment.get(consumer); -int i = 0; -// assign the first N partitions up to the max quota, and mark the remaining as being revoked -for (TopicPartition tp : ownedPartitions) { -if (i < maxQuota) { -consumerAssignment.add(tp); -unassignedPartitions.remove(tp); -} else { -allRevokedPartitions.add(tp); -} -++i; -} if (ownedPartitions.size() < minQuota) { +// the expected assignment size is more than consumer have now, so keep all the owned partitions +// and put this member into unfilled member list +if (ownedPartitions.size() > 0) { +consumerAssignment.addAll(ownedPartitions); +toBeRemovedPartitions.addAll(ownedPartitions); +} unfilledMembers.add(consumer); +} else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) { +// consumer owned the "maxQuota" of partitions or more, and we still under the number of expected max capacity members +// so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions +consumerAssignment.addAll(ownedPartitions.subList(0, maxQuota)); +toBeRemovedPartitions.addAll(ownedPartitions.subList(0, maxQuota)); +allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size())); } else { -// It's possible for a consumer to be at both min and max capacity if minQuota == maxQuota -if (consumerAssignment.size() == minQuota) 
-minCapacityMembers.add(consumer); -if (consumerAssignment.size() == m
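The two quoted revisions guard the max-capacity branch differently. One uses `numMaxCapacityMembers < numExpectedMaxCapacityMembers` and increments inside the branch; the other uses `numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers`. The small sketch below counts only the consumers that already pass the `ownedPartitions.size() >= maxQuota` check, because the `&&` short-circuits for the others. It shows that the post-increment `<=` form admits one member more than the limit. The class and method names are hypothetical.

```java
class CounterGuardSketch {
    // Guard written as in the discussion_r622779994 revision: post-increment, then compare with <=
    static int admittedWithPostIncrementLe(int candidates, int limit) {
        int count = 0;
        int admitted = 0;
        for (int i = 0; i < candidates; i++) {
            if (count++ <= limit) {
                admitted++;
            }
        }
        return admitted;
    }

    // Guard written as in the discussion_r622780715 revision: compare with <, increment only when admitted
    static int admittedWithStrictLt(int candidates, int limit) {
        int count = 0;
        int admitted = 0;
        for (int i = 0; i < candidates; i++) {
            if (count < limit) {
                count++;
                admitted++;
            }
        }
        return admitted;
    }

    public static void main(String[] args) {
        // Five eligible consumers, at most one should keep maxQuota partitions
        System.out.println(admittedWithPostIncrementLe(5, 1)); // 2
        System.out.println(admittedWithStrictLt(5, 1));        // 1
    }
}
```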
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r622767920 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTransform { Review comment: `RemoteLogSegmentMetadata` is a public API and we do not want to add any specific serialization implementation details into that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
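A separate transform type can keep serialization out of the public `RemoteLogSegmentMetadata` API. The sketch below uses assumed names throughout: `SegmentInfo`, `Transform` and `SegmentInfoTransform` are hypothetical, and the byte layout (two big-endian longs) is chosen only for illustration. Judging by the imports, the real transforms in the PR convert to generated records wrapped in `ApiMessageAndVersion` rather than to raw bytes.

```java
import java.nio.ByteBuffer;

// Hypothetical public value class: exposes only domain fields, no serialization hooks.
final class SegmentInfo {
    final long startOffset;
    final long endOffset;

    SegmentInfo(long startOffset, long endOffset) {
        this.startOffset = startOffset;
        this.endOffset = endOffset;
    }
}

// Hypothetical transform contract that lives next to the serde, outside the public API.
interface Transform<T> {
    byte[] toBytes(T value);

    T fromBytes(byte[] bytes);
}

final class SegmentInfoTransform implements Transform<SegmentInfo> {
    @Override
    public byte[] toBytes(SegmentInfo value) {
        // Two 8-byte longs in ByteBuffer's default (big-endian) byte order
        return ByteBuffer.allocate(2 * Long.BYTES)
                .putLong(value.startOffset)
                .putLong(value.endOffset)
                .array();
    }

    @Override
    public SegmentInfo fromBytes(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        long start = buffer.getLong();
        long end = buffer.getLong();
        return new SegmentInfo(start, end);
    }
}
```

With this layout, the public class can change its wire format, or gain a second format, without any change to its own signature.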
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r622767150 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/BytesApiMessageSerde.java ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.raft.metadata.AbstractApiMessageSerde; +import java.nio.ByteBuffer; + +/** + * This class class provides conversion of {@code ApiMessageAndVersion} to bytes and vice versa.. This can be used as serialization protocol for any + * metadata records derived of {@code ApiMessage}s. It internally uses {@link AbstractApiMessageSerde} for serialization/deserialization + * mechanism. 
+ * Implementors need to extend this class and implement {@link #apiMessageFor(short)} method to return a respective + * {@code ApiMessage} for the given {@code apiKey}. This is required to deserialize the bytes to build the respective + * {@code ApiMessage} instance. + */ +public abstract class BytesApiMessageSerde { Review comment: `BytesApiMessageSerde` is a generic implementation that can be used by others. As I said in my earlier [comment](https://github.com/apache/kafka/pull/10271#issuecomment-826991751), it will be moved to the `clients` module.
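The javadoc above describes a template-method design. The base class owns the byte handling, and subclasses only supply an empty message for a given key. The stripped-down sketch below shows that shape. Its `Message`, `TextMessage`, `KeyedBytesSerde` and `SerdeDemo` types are hypothetical stand-ins for `ApiMessage` and `BytesApiMessageSerde`, and the key-then-payload layout is an assumption, not Kafka's wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for ApiMessage
interface Message {
    short key();

    byte[] payload();

    void readPayload(ByteBuffer buf);
}

final class TextMessage implements Message {
    static final short KEY = 7;
    String text = "";

    TextMessage() {
    }

    TextMessage(String text) {
        this.text = text;
    }

    public short key() {
        return KEY;
    }

    public byte[] payload() {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    public void readPayload(ByteBuffer buf) {
        byte[] rest = new byte[buf.remaining()];
        buf.get(rest);
        text = new String(rest, StandardCharsets.UTF_8);
    }
}

abstract class KeyedBytesSerde {
    // The only hook subclasses implement: an empty message for the given key
    protected abstract Message messageFor(short key);

    byte[] serialize(Message m) {
        byte[] payload = m.payload();
        return ByteBuffer.allocate(Short.BYTES + payload.length).putShort(m.key()).put(payload).array();
    }

    Message deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        Message m = messageFor(buf.getShort()); // read the key, then let the subclass pick the type
        m.readPayload(buf);
        return m;
    }
}

class SerdeDemo {
    // Anonymous subclass, similar in style to the one in RemoteLogMetadataSerde
    static KeyedBytesSerde textOnlySerde() {
        return new KeyedBytesSerde() {
            @Override
            protected Message messageFor(short key) {
                if (key == TextMessage.KEY) {
                    return new TextMessage();
                }
                throw new IllegalArgumentException("Unknown key: " + key);
            }
        };
    }
}
```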
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r622766875 ## File path: storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java ## @@ -270,9 +247,9 @@ public String toString() { "remoteLogSegmentId=" + remoteLogSegmentId + ", startOffset=" + startOffset + ", endOffset=" + endOffset + - ", brokerId=" + brokerId + + ", brokerId=" + brokerId() + Review comment: AFAIK, javac (JDK 8+) already converts this concatenation into `StringBuilder` calls, so there is no need to write `StringBuilder` code here by hand. In fact, we should avoid doing that, as javac can apply better optimizations such as [JEP-280](https://openjdk.java.net/jeps/280).
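The two styles produce the same string, which is why plain `+` is enough in a `toString()`. The sketch below compares them. The field list is a simplified, hypothetical subset of `RemoteLogSegmentMetadata`. When compiling for JDK 8, javac lowers `+` to `StringBuilder` appends. When targeting JDK 9+, it emits an invokedynamic call to `StringConcatFactory` instead (JEP 280).

```java
class ConcatSketch {
    // Plain concatenation: javac picks the lowering strategy for the target JDK
    static String withPlus(long startOffset, long endOffset, int brokerId) {
        return "Segment{startOffset=" + startOffset + ", endOffset=" + endOffset + ", brokerId=" + brokerId + "}";
    }

    // Hand-written equivalent that the review comment suggests avoiding
    static String withBuilder(long startOffset, long endOffset, int brokerId) {
        return new StringBuilder("Segment{startOffset=")
                .append(startOffset)
                .append(", endOffset=")
                .append(endOffset)
                .append(", brokerId=")
                .append(brokerId)
                .append('}')
                .toString();
    }

    public static void main(String[] args) {
        System.out.println(withPlus(0L, 99L, 1)); // Segment{startOffset=0, endOffset=99, brokerId=1}
        System.out.println(withPlus(0L, 99L, 1).equals(withBuilder(0L, 99L, 1))); // true
    }
}
```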
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r622765854 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.log.remote.metadata.storage.generated.MetadataRecordType; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; + +import java.util.HashMap; +import java.util.Map; + +/** + * This class provides serialization and deserialization for {@link RemoteLogMetadata}. This is the root serde + * for the messages that are stored in internal remote log metadata topic. 
+ */ +public class RemoteLogMetadataSerde { +private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new RemoteLogSegmentMetadataRecord().apiKey(); +private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = new RemoteLogSegmentMetadataUpdateRecord().apiKey(); +private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey(); + +private final Map remoteLogStorageClassToApiKey; +private final Map keyToTransform; +private final BytesApiMessageSerde bytesApiMessageSerde; + +public RemoteLogMetadataSerde() { +remoteLogStorageClassToApiKey = createRemoteLogStorageClassToApiKeyMap(); +keyToTransform = createRemoteLogMetadataTransforms(); +bytesApiMessageSerde = new BytesApiMessageSerde() { +@Override +public ApiMessage apiMessageFor(short apiKey) { +return newApiMessage(apiKey); +} +}; +} + +protected ApiMessage newApiMessage(short apiKey) { +return MetadataRecordType.fromId(apiKey).newMetadataRecord(); +} + +protected Map createRemoteLogMetadataTransforms() { +Map map = new HashMap<>(); +map.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new RemoteLogSegmentMetadataTransform()); +map.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new RemoteLogSegmentMetadataUpdateTransform()); +map.put(REMOTE_PARTITION_DELETE_API_KEY, new RemotePartitionDeleteMetadataTransform()); +return map; +} + +protected Map createRemoteLogStorageClassToApiKeyMap() { +Map map = new HashMap<>(); +map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY); +map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY); +map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY); +return map; +} + +public byte[] serialize(RemoteLogMetadata remoteLogMetadata) { +Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName()); Review comment: `apiKey` is an internal serialization detail of one of the implementations. 
We should not leak it into public APIs like `RemoteLogMetadata` and its subclasses.
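One way to follow this is to keep the class-to-key mapping entirely inside the serde, as `createRemoteLogStorageClassToApiKeyMap()` already does with class names. The minimal sketch below uses hypothetical types and key values. Keying the map by `Class` objects instead of `getName()` strings is a choice made here, not something the PR does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical public metadata types: they carry no apiKey or serialization detail.
abstract class Metadata {
}

final class SegmentMetadata extends Metadata {
}

final class PartitionDeleteMetadata extends Metadata {
}

// Hypothetical serde-internal registry; the keys never leave this class.
final class MetadataKeyRegistry {
    private final Map<Class<? extends Metadata>, Short> classToKey = new HashMap<>();

    MetadataKeyRegistry() {
        // Key values are arbitrary for this sketch
        classToKey.put(SegmentMetadata.class, (short) 0);
        classToKey.put(PartitionDeleteMetadata.class, (short) 2);
    }

    short keyFor(Metadata metadata) {
        Short key = classToKey.get(metadata.getClass());
        if (key == null) {
            throw new IllegalArgumentException("No key registered for " + metadata.getClass().getName());
        }
        return key;
    }
}
```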
[GitHub] [kafka] kamalcph commented on pull request #10562: MINOR: Update tests to include the 2.8.0 release
kamalcph commented on pull request #10562: URL: https://github.com/apache/kafka/pull/10562#issuecomment-828958926 I didn't notice this PR and opened #10602 for the same change. BTW, why was the Kafka distribution built with Scala 2.12 chosen? The recommended version on the docs page is 2.13.
[GitHub] [kafka] kamalcph commented on a change in pull request #10605: KAFKA-12726 prevent a stuck Task.stop() from blocking subsequent Task.stops()s
kamalcph commented on a change in pull request #10605: URL: https://github.com/apache/kafka/pull/10605#discussion_r622746344 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ## @@ -839,7 +840,22 @@ private void stopTask(ConnectorTaskId taskId) { ClassLoader savedLoader = plugins.currentThreadLoader(); try { savedLoader = Plugins.compareAndSwapLoaders(task.loader()); -task.stop(); +CountDownLatch latch = new CountDownLatch(1); +new Thread() { +@Override +public void run() { +task.stop(); +latch.countDown(); +} +}.start(); +// Wait for thread to terminate, but not longer than timeout. +if (timeout <= 0) { Review comment: nit: This `if` statement is not required. The `CountDownLatch` won't wait at all if the timeout is <= 0.
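The nit relies on the documented behaviour of `CountDownLatch.await(long, TimeUnit)`: if the timeout is less than or equal to zero, the method does not wait at all and returns whether the count has already reached zero. The sketch below shows the stop-with-timeout pattern without a special case for non-positive timeouts. `LatchTimeoutSketch` and `runWithTimeout` are hypothetical names. Two choices here are not taken from the PR: counting down in a `finally` block, so an exception in the action does not leave the caller waiting for the full timeout, and making the thread a daemon.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchTimeoutSketch {
    // Runs the action on its own thread and waits at most timeoutMs for it to finish.
    // Returns true if the action completed in time; a timeout <= 0 returns immediately.
    static boolean runWithTimeout(Runnable action, long timeoutMs) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                action.run();
            } finally {
                latch.countDown();
            }
        });
        worker.setDaemon(true); // a stuck action must not keep the JVM alive
        worker.start();
        return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```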
[GitHub] [kafka] kamalcph commented on a change in pull request #10589: MINOR: move topic configuration defaults
kamalcph commented on a change in pull request #10589: URL: https://github.com/apache/kafka/pull/10589#discussion_r622745172 ## File path: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java ## @@ -56,26 +66,30 @@ "be overridden on a per-topic basis (see the per-topic configuration section)."; public static final String FLUSH_MS_CONFIG = "flush.ms"; +public static final long FLUSH_MS_DEFAULT = Long.MAX_VALUE; public static final String FLUSH_MS_DOC = "This setting allows specifying a time interval at which we will " + "force an fsync of data written to the log. For example if this was set to 1000 " + "we would fsync after 1000 ms had passed. In general we recommend you not set " + "this and use replication for durability and allow the operating system's background " + "flush capabilities as it is more efficient."; public static final String RETENTION_BYTES_CONFIG = "retention.bytes"; +public static final long RETENTION_BYTES_DEFAULT = -1L; public static final String RETENTION_BYTES_DOC = "This configuration controls the maximum size a partition " + "(which consists of log segments) can grow to before we will discard old log segments to free up space if we " + "are using the \"delete\" retention policy. By default there is no size limit only a time limit. " + "Since this limit is enforced at the partition level, multiply it by the number of partitions to compute " + "the topic retention in bytes."; public static final String RETENTION_MS_CONFIG = "retention.ms"; +public static final long RETENTION_MS_DEFAULT = TimeUnit.DAYS.toMillis(7); public static final String RETENTION_MS_DOC = "This configuration controls the maximum time we will retain a " + "log before we will discard old log segments to free up space if we are using the " + "\"delete\" retention policy. This represents an SLA on how soon consumers must read " + "their data. 
If set to -1, no time limit is applied."; public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes"; +public static final int MAX_MESSAGE_BYTES_DEFAULT = 1024 * 1024 + 12; Review comment: Why not refer to `Records.LOG_OVERHEAD` instead of the literal 12? ## File path: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java ## @@ -166,19 +191,36 @@ "they will receive messages with a format that they don't understand."; public static final String MESSAGE_TIMESTAMP_TYPE_CONFIG = "message.timestamp.type"; +public static final String MESSAGE_TIMESTAMP_TYPE_DEFAULT = "CreateTime"; public static final String MESSAGE_TIMESTAMP_TYPE_DOC = "Define whether the timestamp in the message is " + "message create time or log append time. The value should be either `CreateTime` or `LogAppendTime`"; public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG = "message.timestamp.difference.max.ms"; +public static final long MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DEFAULT = Long.MAX_VALUE; public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC = "The maximum difference allowed between " + "the timestamp when a broker receives a message and the timestamp specified in the message. If " + "message.timestamp.type=CreateTime, a message will be rejected if the difference in timestamp " + "exceeds this threshold. This configuration is ignored if message.timestamp.type=LogAppendTime."; public static final String MESSAGE_DOWNCONVERSION_ENABLE_CONFIG = "message.downconversion.enable"; +public static final boolean MESSAGE_DOWNCONVERSION_ENABLE_DEFAULT = true; public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "This configuration controls whether " + "down-conversion of message formats is enabled to satisfy consume requests. When set to false, " + "broker will not perform down-conversion for consumers expecting an older message format. The broker responds " + "with UNSUPPORTED_VERSION error for consume requests from such older clients.
This configuration" + "does not apply to any message format conversion that might be required for replication to followers."; + +public static final String LEADER_REPLICATION_THROTTLED_REPLICAS = "leader.replication.throttled.replicas"; +public static final Collection LEADER_REPLICATION_THROTTLED_REPLICAS_DEFAULT = Collections.emptyList(); +public static final String LEADER_REPLICATION_THROTTLED_REPLICAS_DOC = "A list of replicas for which log " + +"replication should be throttled on the leader side. The list should describe a set of replicas in the form " + +"[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or alternatively the wildcard '*' can be used to throttle " + +"all replicas for this topic."; + +
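The literal 12 in `MAX_MESSAGE_BYTES_DEFAULT` is the per-entry log overhead: an 8-byte offset followed by a 4-byte size field, which Kafka exposes as `Records.LOG_OVERHEAD`. The sketch below spells that out with local constants (hypothetical names, duplicated only so the block compiles without Kafka on the classpath). It also evaluates the new `RETENTION_MS_DEFAULT` expression.

```java
import java.util.concurrent.TimeUnit;

class TopicDefaultsSketch {
    // Local stand-ins for the record log overhead: 8-byte offset + 4-byte size.
    // In Kafka code, Records.LOG_OVERHEAD should be referenced instead of these copies.
    static final int OFFSET_LENGTH = 8;
    static final int SIZE_LENGTH = 4;
    static final int LOG_OVERHEAD = OFFSET_LENGTH + SIZE_LENGTH;

    // Same value as the literal 1024 * 1024 + 12 in the diff, but self-describing
    static final int MAX_MESSAGE_BYTES_DEFAULT = 1024 * 1024 + LOG_OVERHEAD;
    static final long RETENTION_MS_DEFAULT = TimeUnit.DAYS.toMillis(7);

    public static void main(String[] args) {
        System.out.println(MAX_MESSAGE_BYTES_DEFAULT); // 1048588
        System.out.println(RETENTION_MS_DEFAULT);      // 604800000
    }
}
```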
[GitHub] [kafka] kamalcph commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
kamalcph commented on a change in pull request #10602: URL: https://github.com/apache/kafka/pull/10602#discussion_r622727068 ## File path: tests/docker/Dockerfile ## @@ -62,6 +62,7 @@ RUN mkdir -p "/opt/kafka-2.4.1" && chmod a+rw /opt/kafka-2.4.1 && curl -s "$KAFK RUN mkdir -p "/opt/kafka-2.5.1" && chmod a+rw /opt/kafka-2.5.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.5.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.5.1" RUN mkdir -p "/opt/kafka-2.6.2" && chmod a+rw /opt/kafka-2.6.2 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.6.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.6.2" RUN mkdir -p "/opt/kafka-2.7.0" && chmod a+rw /opt/kafka-2.7.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.7.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.7.0" +RUN mkdir -p "/opt/kafka-2.8.0" && chmod a+rw /opt/kafka-2.8.0 && curl -s "$KAFKA_MIRROR/kafka_2.13-2.8.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.8.0" Review comment: The distribution built with Scala 2.13 is the one recommended on the documentation page, which is why it was chosen.
[GitHub] [kafka] kamalcph commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
kamalcph commented on a change in pull request #10602: URL: https://github.com/apache/kafka/pull/10602#discussion_r622727068 ## File path: tests/docker/Dockerfile ## @@ -62,6 +62,7 @@ RUN mkdir -p "/opt/kafka-2.4.1" && chmod a+rw /opt/kafka-2.4.1 && curl -s "$KAFK RUN mkdir -p "/opt/kafka-2.5.1" && chmod a+rw /opt/kafka-2.5.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.5.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.5.1" RUN mkdir -p "/opt/kafka-2.6.2" && chmod a+rw /opt/kafka-2.6.2 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.6.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.6.2" RUN mkdir -p "/opt/kafka-2.7.0" && chmod a+rw /opt/kafka-2.7.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.7.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.7.0" +RUN mkdir -p "/opt/kafka-2.8.0" && chmod a+rw /opt/kafka-2.8.0 && curl -s "$KAFKA_MIRROR/kafka_2.13-2.8.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.8.0" Review comment: The distribution built with Scala 2.13 is recommended in the documentation, so it was chosen.
[GitHub] [kafka] chia7712 commented on pull request #10535: MINOR: Remove duplicate method in test classes
chia7712 commented on pull request #10535: URL: https://github.com/apache/kafka/pull/10535#issuecomment-828933051 @dengziming Could you merge trunk to trigger QA again? Trunk has some fixes for flaky tests that should help stabilize QA.
[GitHub] [kafka] kamalcph commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
kamalcph commented on a change in pull request #10602: URL: https://github.com/apache/kafka/pull/10602#discussion_r622726584 ## File path: streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +import java.util.Properties; + +public class StreamsUpgradeTest { + +@SuppressWarnings("unchecked") +public static void main(final String[] args) throws Exception { +if (args.length < 1) { +System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); +} +final String propFileName = args[0]; + +final Properties streamsProperties = Utils.loadProps(propFileName); + +System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.7)"); +System.out.println("props=" + streamsProperties); + +final StreamsBuilder builder = new StreamsBuilder(); +final KStream dataStream = builder.stream("data"); +dataStream.process(printProcessorSupplier()); +dataStream.to("echo"); + +final Properties config = new Properties(); +config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); +config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); +config.putAll(streamsProperties); + +final KafkaStreams streams = new KafkaStreams(builder.build(), config); +streams.start(); + +Runtime.getRuntime().addShutdownHook(new Thread(() -> { +streams.close(); +System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); +System.out.flush(); +})); +} + +private static ProcessorSupplier printProcessorSupplier() { +return () -> new AbstractProcessor() { +private int numRecordsProcessed = 0; + +@Override +public void init(final ProcessorContext context) { +System.out.println("[2.6] initializing processor: topic=data taskId=" + context.taskId()); Review comment: Updated the version to 
2.8. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kamalcph commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
kamalcph commented on a change in pull request #10602: URL: https://github.com/apache/kafka/pull/10602#discussion_r622726486 ## File path: streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +import java.util.Properties; + +public class StreamsUpgradeTest { + +@SuppressWarnings("unchecked") +public static void main(final String[] args) throws Exception { +if (args.length < 1) { +System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); +} +final String propFileName = args[0]; + +final Properties streamsProperties = Utils.loadProps(propFileName); + +System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.7)"); Review comment: Ack
[GitHub] [kafka] ableegoldman commented on pull request #10608: MINOR: clean up some remaining locking stuff in StateDirectory
ableegoldman commented on pull request #10608: URL: https://github.com/apache/kafka/pull/10608#issuecomment-828927542 Call for review from any of @wcarlson5 @lct45 @cadonna @guozhangwang @mjsax @vvcephei @rodesai
[GitHub] [kafka] ableegoldman commented on a change in pull request #10608: MINOR: clean up some remaining locking stuff in StateDirectory
ableegoldman commented on a change in pull request #10608: URL: https://github.com/apache/kafka/pull/10608#discussion_r622719786 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -431,7 +397,7 @@ private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { if (now > lastModifiedMs + cleanupDelayMs) { log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).", logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs); -Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME))); Review comment: This change seems innocuous, but it has a pretty sweet side effect: because we had to exclude the lock file during the deletion (due to some stupid Windows bug 🙄), this utility method was not actually deleting the task directory itself. AFAICT this means we were never deleting these empty directories at any point, unless the user manually called KafkaStreams#cleanUp. It's not a horrible bug, but it's just obnoxious, so I'm glad we can finally be rid of the Windows ball & chain that was its `DirectoryNotEmptyException`.
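The side effect described in the review comment above can be illustrated with a minimal sketch. It is not Kafka's `Utils.delete`; it assumes a recursive delete built on `java.nio.file.Files.walkFileTree` that skips a list of preserved files, and the class name `ExcludingDelete` and the `.lock` file name are illustrative only. Because a preserved lock file keeps the task directory non-empty, the directory itself cannot be removed (calling `Files.delete` on a non-empty directory throws `DirectoryNotEmptyException`, the exception seen in the KAFKA-6655 stack trace); with nothing preserved, the directory is deleted as well.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

public class ExcludingDelete {

    // Hypothetical helper (not Kafka's Utils.delete): recursively deletes `root`,
    // skipping every file listed in `excluded`.
    public static void delete(final Path root, final List<Path> excluded) throws IOException {
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException {
                if (!excluded.contains(file)) {
                    Files.delete(file);
                }
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) throws IOException {
                if (exc != null) {
                    throw exc;
                }
                // A preserved file keeps the directory non-empty. Calling Files.delete here
                // would throw DirectoryNotEmptyException, so the directory is left behind.
                if (isEmpty(dir)) {
                    Files.delete(dir);
                }
                return FileVisitResult.CONTINUE;
            }
        });
    }

    private static boolean isEmpty(final Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return !entries.findAny().isPresent();
        }
    }

    public static void main(final String[] args) throws IOException {
        final Path taskDir = Files.createTempDirectory("task-0_9-");
        final Path lockFile = taskDir.resolve(".lock");
        Files.createFile(lockFile);
        Files.createFile(taskDir.resolve("checkpoint"));

        // Old behavior: the lock file is excluded, so the task directory survives.
        delete(taskDir, Collections.singletonList(lockFile));
        System.out.println("lock excluded -> task dir exists: " + Files.exists(taskDir));

        // New behavior: nothing is excluded, so the task directory itself is deleted.
        delete(taskDir, Collections.emptyList());
        System.out.println("nothing excluded -> task dir exists: " + Files.exists(taskDir));
    }
}
```

Running `main` prints `true` for the first check and `false` for the second. The sketch checks emptiness before deleting so it never throws; an alternative design would catch `DirectoryNotEmptyException` instead.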
[GitHub] [kafka] ableegoldman commented on a change in pull request #10608: MINOR: clean up some remaining locking stuff in StateDirectory
ableegoldman commented on a change in pull request #10608: URL: https://github.com/apache/kafka/pull/10608#discussion_r622718411 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java ## @@ -315,7 +314,7 @@ public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusClean } @Test -public void shouldCleanupObsoleteStateDirectoriesOnlyOnce() { +public void shouldCleanupObsoleteTaskDirectoriesAndDeleteTheDirectoryItself() { Review comment: This test was originally added when we [fixed a bug](https://github.com/apache/kafka/pull/9373/files#diff-beec0c6a88443677858de59258cf144238b4cbb352fe02a7375c0bbba929696aR311) where Streams was continuously attempting to clean "empty" task directories and logging the `"Deleting obsolete state directory..."` message. The root cause was that we couldn't delete the task directory itself due to the lock file, so we just left empty directories behind. This PR fixes that, so I adapted the test to cover this new functionality, since the old test wouldn't really make sense anymore.
[GitHub] [kafka] ableegoldman commented on a change in pull request #10608: MINOR: clean up some remaining locking stuff in StateDirectory
ableegoldman commented on a change in pull request #10608: URL: https://github.com/apache/kafka/pull/10608#discussion_r622718411 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java ## @@ -315,7 +314,7 @@ public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusClean } @Test -public void shouldCleanupObsoleteStateDirectoriesOnlyOnce() { +public void shouldCleanupObsoleteTaskDirectoriesAndDeleteTheDirectoryItself() { Review comment: This test was originally added when we fixed a bug where Streams was continuously attempting to clean "empty" task directories and logging the `"Deleting obsolete state directory..."` message. The root cause was that we couldn't delete the task directory itself due to the lock file, so we just left empty directories behind. This PR fixes that, so I adapted the test to cover this new functionality, since the old test wouldn't really make sense anymore.
[jira] [Resolved] (KAFKA-6655) CleanupThread: Failed to lock the state directory due to an unexpected exception (Windows OS)
[ https://issues.apache.org/jira/browse/KAFKA-6655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-6655. --- Resolution: Fixed This should be fixed: a number of improvements were made around the locking a few versions ago, and we actually took it a step further by removing this kind of file-based locking for task directories entirely as of 3.0. > CleanupThread: Failed to lock the state directory due to an unexpected > exception (Windows OS) > - > > Key: KAFKA-6655 > URL: https://issues.apache.org/jira/browse/KAFKA-6655 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.1 > Environment: Windows Operating System >Reporter: Srini >Priority: Major > > This issue happens on Windows OS. The code works fine on Linux. This ticket is > related to KAFKA-6647, which also reports locking issues on Windows. However, > there, the issue occurs if a user calls KafkaStreams#cleanUp() explicitly, > while this ticket is related to the KafkaStreams background CleanupThread (note > that both use `StateDirectory.cleanRemovedTasks`, but the behavior is still > slightly different as different parameters are passed into the method).
> {quote}[CleanupThread] Failed to lock the state directory due to an > unexpected exception java.nio.file.DirectoryNotEmptyException: > \tmp\kafka-streams\srini-20171208\0_9 > at > sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266) > at > sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103) > at java.nio.file.Files.delete(Files.java:1126) > at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:636) > at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:619) > at java.nio.file.Files.walkFileTree(Files.java:2688) > at java.nio.file.Files.walkFileTree(Files.java:2742) > at org.apache.kafka.common.utils.Utils.delete(Utils.java:619) > at > org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:245) > at org.apache.kafka.streams.KafkaStreams$3.run(KafkaStreams.java:761) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on pull request #10598: MINOR: rename wrong topic id variable name and description
chia7712 commented on pull request #10598: URL: https://github.com/apache/kafka/pull/10598#issuecomment-828919194 @dengziming thanks for your contribution. @showuon thanks for updating profile.
[GitHub] [kafka] chia7712 merged pull request #10598: MINOR: rename wrong topic id variable name and description
chia7712 merged pull request #10598: URL: https://github.com/apache/kafka/pull/10598
[GitHub] [kafka] ableegoldman opened a new pull request #10608: MINOR: clean up some remaining locking stuff in StateDirectory
ableegoldman opened a new pull request #10608: URL: https://github.com/apache/kafka/pull/10608 Minor follow-up to https://github.com/apache/kafka/pull/10342 that I noticed while working on the NamedTopology stuff. Cleans up a few things:
1. We no longer need locking for the global state directory either, since it's contained within the top-level state directory lock.
2. Clears out misc. usages of the LOCK_FILE_NAME that no longer apply.
3. Lazily deletes old, now-unused lock files in the `StateDirectory#taskDirIsEmpty` method to clean up the state directory for applications that upgraded from an older version that still used task locking.
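Item 3 above can be sketched as follows, as a hedged illustration only: the method name mirrors `StateDirectory#taskDirIsEmpty`, but the body, the `TaskDirCleanup` class, and the `.lock` file name are assumptions, not the code in the PR. A task directory that contains only a leftover lock file is treated as empty, and the lock file is deleted the first time the directory is inspected.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class TaskDirCleanup {

    // Assumption: legacy lock files were named ".lock"; this constant is a
    // hypothetical stand-in, not a reference to Kafka's own field.
    static final String LEGACY_LOCK_FILE_NAME = ".lock";

    // Hypothetical sketch: treats a task directory as empty if it holds nothing
    // but a leftover lock file, and lazily deletes that file along the way.
    static boolean taskDirIsEmpty(final File taskDir) {
        final File[] children = taskDir.listFiles();
        if (children == null) {
            // listFiles returns null if taskDir is not a directory or cannot be read
            return false;
        }
        boolean empty = true;
        for (final File child : children) {
            if (child.getName().equals(LEGACY_LOCK_FILE_NAME)) {
                // Best effort: if deletion fails, the file is still present, so the
                // directory is reported as non-empty and the delete is retried next time.
                if (!child.delete()) {
                    empty = false;
                }
            } else {
                empty = false;
            }
        }
        return empty;
    }

    public static void main(final String[] args) throws IOException {
        final File taskDir = Files.createTempDirectory("task-1_0-").toFile();
        final File lockFile = new File(taskDir, LEGACY_LOCK_FILE_NAME);
        lockFile.createNewFile();

        System.out.println("only lock file -> empty: " + taskDirIsEmpty(taskDir));
        System.out.println("lock file still present: " + lockFile.exists());

        new File(taskDir, "rocksdb").mkdir();
        System.out.println("with store dir -> empty: " + taskDirIsEmpty(taskDir));
    }
}
```

Folding the cleanup into the emptiness check means upgraded applications shed their old lock files without a separate migration step; the trade-off is that a method that reads like a query now has a side effect.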
[jira] [Updated] (KAFKA-12729) controller leader keep disconnected and block the cluster
[ https://issues.apache.org/jira/browse/KAFKA-12729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luws updated KAFKA-12729: - Description: kafka version. 2.12-2.1.1 after zk flash disconnected then broker6 become the controller leader but the other brokers can't connect to the broker6 until we restart the broker6 then the cluster become recover. bandwidth is normal !image-2021-04-29-11-33-24-303.png! !image-2021-04-29-11-33-53-048.png! !image-2021-04-29-11-34-18-654.png! !image-2021-04-29-11-35-26-994.png! !image-2021-04-29-11-35-41-393.png! !image-2021-04-29-11-35-52-191.png! {code:java} //code placeholder Connection to 6 was disconnected before the response was read at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99) at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) at scala.Option.foreach(Option.scala:257) at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code} was: kafka version. 2.12-2.1.1 after zk flash disconnected then broker6 become the controller leader but the other broker can't connect to the broker6 until we restart the broker6 then the cluster become recover. bandwidth is normal !image-2021-04-29-11-33-24-303.png! !image-2021-04-29-11-33-53-048.png! !image-2021-04-29-11-34-18-654.png! !image-2021-04-29-11-35-26-994.png! !image-2021-04-29-11-35-41-393.png! !image-2021-04-29-11-35-52-191.png!
{code:java} //code placeholder Connection to 6 was disconnected before the response was read at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99) at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) at scala.Option.foreach(Option.scala:257) at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code} > controller leader keep disconnected and block the cluster > - > > Key: KAFKA-12729 > URL: https://issues.apache.org/jira/browse/KAFKA-12729 > Project: Kafka > Issue Type: Bug > Components: controller, KafkaConnect >Affects Versions: 2.1.1 >Reporter: luws >Priority: Critical > Attachments: image-2021-04-29-11-33-24-303.png, > image-2021-04-29-11-33-53-048.png, image-2021-04-29-11-34-18-654.png, > image-2021-04-29-11-35-26-994.png, image-2021-04-29-11-35-41-393.png, > image-2021-04-29-11-35-52-191.png > > > kafka version. 2.12-2.1.1 > after zk flash disconnected > then broker6 become the controller leader > but the other brokers can't connect to the broker6 until we restart the > broker6 then the cluster become recover. > > bandwidth is normal > !image-2021-04-29-11-33-24-303.png! > !image-2021-04-29-11-33-53-048.png! > !image-2021-04-29-11-34-18-654.png! > !image-2021-04-29-11-35-26-994.png! > !image-2021-04-29-11-35-41-393.png! > !image-2021-04-29-11-35-52-191.png!
> > > > {code:java} > //code placeholder > Connection to 6 was disconnected before the response was read at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) > at scala.Option.foreach(Option.scala:257) at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherTh
[jira] [Updated] (KAFKA-12729) controller leader keep disconnected and block the cluster
[ https://issues.apache.org/jira/browse/KAFKA-12729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luws updated KAFKA-12729: - Attachment: image-2021-04-29-11-35-26-994.png > controller leader keep disconnected and block the cluster > - > > Key: KAFKA-12729 > URL: https://issues.apache.org/jira/browse/KAFKA-12729 > Project: Kafka > Issue Type: Bug > Components: controller, KafkaConnect >Affects Versions: 2.1.1 >Reporter: luws >Priority: Critical > Attachments: image-2021-04-29-11-33-24-303.png, > image-2021-04-29-11-33-53-048.png, image-2021-04-29-11-34-18-654.png, > image-2021-04-29-11-35-26-994.png, image-2021-04-29-11-35-41-393.png, > image-2021-04-29-11-35-52-191.png > > > kafka version. 2.12-2.1.1 > after zk flash disconnected > then broker6 become the controller leader > but the other broker can't connect to the broker6 until we restart the > broker6 then the cluster become recover. > > bandwidth is normal > !image-2021-04-29-11-24-22-704.png! > !image-2021-04-29-11-27-12-924.png! > !image-2021-04-29-11-27-35-679.png! > !image-2021-04-29-11-25-41-208.png! > !image-2021-04-29-11-26-34-894.png! 
> > {code:java} > //code placeholder > Connection to 6 was disconnected before the response was read at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) > at scala.Option.foreach(Option.scala:257) at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code} >
[jira] [Updated] (KAFKA-12729) controller leader keep disconnected and block the cluster
[ https://issues.apache.org/jira/browse/KAFKA-12729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luws updated KAFKA-12729: - Attachment: image-2021-04-29-11-35-41-393.png > controller leader keep disconnected and block the cluster > - > > Key: KAFKA-12729 > URL: https://issues.apache.org/jira/browse/KAFKA-12729 > Project: Kafka > Issue Type: Bug > Components: controller, KafkaConnect >Affects Versions: 2.1.1 >Reporter: luws >Priority: Critical > Attachments: image-2021-04-29-11-33-24-303.png, > image-2021-04-29-11-33-53-048.png, image-2021-04-29-11-34-18-654.png, > image-2021-04-29-11-35-26-994.png, image-2021-04-29-11-35-41-393.png, > image-2021-04-29-11-35-52-191.png > > > kafka version. 2.12-2.1.1 > after zk flash disconnected > then broker6 become the controller leader > but the other broker can't connect to the broker6 until we restart the > broker6 then the cluster become recover. > > bandwidth is normal > !image-2021-04-29-11-24-22-704.png! > !image-2021-04-29-11-27-12-924.png! > !image-2021-04-29-11-27-35-679.png! > !image-2021-04-29-11-25-41-208.png! > !image-2021-04-29-11-26-34-894.png! 
> > {code:java} > //code placeholder > Connection to 6 was disconnected before the response was read at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) > at scala.Option.foreach(Option.scala:257) at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code} >
[jira] [Updated] (KAFKA-12729) controller leader keep disconnected and block the cluster
[ https://issues.apache.org/jira/browse/KAFKA-12729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luws updated KAFKA-12729: - Attachment: image-2021-04-29-11-35-52-191.png > controller leader keep disconnected and block the cluster > - > > Key: KAFKA-12729 > URL: https://issues.apache.org/jira/browse/KAFKA-12729 > Project: Kafka > Issue Type: Bug > Components: controller, KafkaConnect >Affects Versions: 2.1.1 >Reporter: luws >Priority: Critical > Attachments: image-2021-04-29-11-33-24-303.png, > image-2021-04-29-11-33-53-048.png, image-2021-04-29-11-34-18-654.png, > image-2021-04-29-11-35-26-994.png, image-2021-04-29-11-35-41-393.png, > image-2021-04-29-11-35-52-191.png > > > kafka version. 2.12-2.1.1 > after zk flash disconnected > then broker6 become the controller leader > but the other broker can't connect to the broker6 until we restart the > broker6 then the cluster become recover. > > bandwidth is normal > !image-2021-04-29-11-33-24-303.png! > !image-2021-04-29-11-33-53-048.png! > !image-2021-04-29-11-34-18-654.png! > !image-2021-04-29-11-35-26-994.png! > !image-2021-04-29-11-35-41-393.png! > !image-2021-04-29-11-35-52-191.png! 
> > > > {code:java} > //code placeholder > Connection to 6 was disconnected before the response was read at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) > at scala.Option.foreach(Option.scala:257) at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code} >
[jira] [Updated] (KAFKA-12729) controller leader keep disconnected and block the cluster
[ https://issues.apache.org/jira/browse/KAFKA-12729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luws updated KAFKA-12729: - Description: kafka version. 2.12-2.1.1 after zk flash disconnected then broker6 become the controller leader but the other broker can't connect to the broker6 until we restart the broker6 then the cluster become recover. bandwidth is normal !image-2021-04-29-11-33-24-303.png! !image-2021-04-29-11-33-53-048.png! !image-2021-04-29-11-34-18-654.png! !image-2021-04-29-11-35-26-994.png! !image-2021-04-29-11-35-41-393.png! !image-2021-04-29-11-35-52-191.png! {code:java} //code placeholder Connection to 6 was disconnected before the response was read at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99) at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) at scala.Option.foreach(Option.scala:257) at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code} was: kafka version. 2.12-2.1.1 after zk flash disconnected then broker6 become the controller leader but the other broker can't connect to the broker6 until we restart the broker6 then the cluster become recover. bandwidth is normal !image-2021-04-29-11-24-22-704.png! !image-2021-04-29-11-27-12-924.png! !image-2021-04-29-11-27-35-679.png! !image-2021-04-29-11-25-41-208.png! !image-2021-04-29-11-26-34-894.png!
{code:java} //code placeholder Connection to 6 was disconnected before the response was read at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99) at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) at scala.Option.foreach(Option.scala:257) at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code} > controller leader keep disconnected and block the cluster > - > > Key: KAFKA-12729 > URL: https://issues.apache.org/jira/browse/KAFKA-12729 > Project: Kafka > Issue Type: Bug > Components: controller, KafkaConnect >Affects Versions: 2.1.1 >Reporter: luws >Priority: Critical > Attachments: image-2021-04-29-11-33-24-303.png, > image-2021-04-29-11-33-53-048.png, image-2021-04-29-11-34-18-654.png, > image-2021-04-29-11-35-26-994.png, image-2021-04-29-11-35-41-393.png, > image-2021-04-29-11-35-52-191.png > > > kafka version. 2.12-2.1.1 > after zk flash disconnected > then broker6 become the controller leader > but the other broker can't connect to the broker6 until we restart the > broker6 then the cluster become recover. > > bandwidth is normal > !image-2021-04-29-11-33-24-303.png! > !image-2021-04-29-11-33-53-048.png! > !image-2021-04-29-11-34-18-654.png! > !image-2021-04-29-11-35-26-994.png! > !image-2021-04-29-11-35-41-393.png! > !image-2021-04-29-11-35-52-191.png!
> > > > {code:java} > //code placeholder > Connection to 6 was disconnected before the response was read at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) > at scala.Option.foreach(Option.scala:257) at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113
[jira] [Updated] (KAFKA-12729) controller leader keep disconnected and block the cluster
[ https://issues.apache.org/jira/browse/KAFKA-12729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luws updated KAFKA-12729: - Attachment: image-2021-04-29-11-34-18-654.png > controller leader keep disconnected and block the cluster > - > > Key: KAFKA-12729 > URL: https://issues.apache.org/jira/browse/KAFKA-12729 > Project: Kafka > Issue Type: Bug > Components: controller, KafkaConnect >Affects Versions: 2.1.1 >Reporter: luws >Priority: Critical > Attachments: image-2021-04-29-11-33-24-303.png, > image-2021-04-29-11-33-53-048.png, image-2021-04-29-11-34-18-654.png > > > kafka version. 2.12-2.1.1 > after zk flash disconnected > then broker6 become the controller leader > but the other broker can't connect to the broker6 until we restart the > broker6 then the cluster become recover. > > bandwidth is normal > !image-2021-04-29-11-24-22-704.png! > !image-2021-04-29-11-27-12-924.png! > !image-2021-04-29-11-27-35-679.png! > !image-2021-04-29-11-25-41-208.png! > !image-2021-04-29-11-26-34-894.png! 
> > {code:java} > //code placeholder > Connection to 6 was disconnected before the response was read at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) > at scala.Option.foreach(Option.scala:257) at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code} >
[jira] [Updated] (KAFKA-12729) controller leader keep disconnected and block the cluster
[ https://issues.apache.org/jira/browse/KAFKA-12729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luws updated KAFKA-12729: - Attachment: image-2021-04-29-11-33-53-048.png > controller leader keep disconnected and block the cluster > - > > Key: KAFKA-12729 > URL: https://issues.apache.org/jira/browse/KAFKA-12729 > Project: Kafka > Issue Type: Bug > Components: controller, KafkaConnect >Affects Versions: 2.1.1 >Reporter: luws >Priority: Critical > Attachments: image-2021-04-29-11-33-24-303.png, > image-2021-04-29-11-33-53-048.png > > > kafka version. 2.12-2.1.1 > after zk flash disconnected > then broker6 become the controller leader > but the other broker can't connect to the broker6 until we restart the > broker6 then the cluster become recover. > > bandwidth is normal > !image-2021-04-29-11-24-22-704.png! > !image-2021-04-29-11-27-12-924.png! > !image-2021-04-29-11-27-35-679.png! > !image-2021-04-29-11-25-41-208.png! > !image-2021-04-29-11-26-34-894.png! > > {code:java} > //code placeholder > Connection to 6 was disconnected before the response was read at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) > at scala.Option.foreach(Option.scala:257) at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code} >
[jira] [Updated] (KAFKA-12729) controller leader keep disconnected and block the cluster
[ https://issues.apache.org/jira/browse/KAFKA-12729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luws updated KAFKA-12729: - Attachment: image-2021-04-29-11-33-24-303.png > controller leader keep disconnected and block the cluster > - > > Key: KAFKA-12729 > URL: https://issues.apache.org/jira/browse/KAFKA-12729 > Project: Kafka > Issue Type: Bug > Components: controller, KafkaConnect >Affects Versions: 2.1.1 >Reporter: luws >Priority: Critical > Attachments: image-2021-04-29-11-33-24-303.png, > image-2021-04-29-11-33-53-048.png > > > Kafka version: 2.12-2.1.1 > After a momentary ZooKeeper disconnection, > broker6 became the controller leader, > but the other brokers could not connect to broker6 until we restarted > broker6; after that, the cluster recovered. > > Bandwidth is normal. > !image-2021-04-29-11-24-22-704.png! > !image-2021-04-29-11-27-12-924.png! > !image-2021-04-29-11-27-35-679.png! > !image-2021-04-29-11-25-41-208.png! > !image-2021-04-29-11-26-34-894.png! > > {code:java} > // code placeholder > Connection to 6 was disconnected before the response was read at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) > at scala.Option.foreach(Option.scala:257) at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code} > -- This message was sent by Atlassian Jira 
(v8.3.4#803005)
[jira] [Commented] (KAFKA-12718) SessionWindows are closed too early
[ https://issues.apache.org/jira/browse/KAFKA-12718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17335118#comment-17335118 ] Bernardo Yusti commented on KAFKA-12718: Hi [~mjsax], I would like to help out with this issue. This would be my first issue, so any pointers on where to start would be greatly appreciated! > SessionWindows are closed too early > --- > > Key: KAFKA-12718 > URL: https://issues.apache.org/jira/browse/KAFKA-12718 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Matthias J. Sax >Priority: Major > Labels: beginner, easy-fix, newbie > Fix For: 3.0.0 > > > SessionWindows are defined based on a {{gap}} parameter, and also support an > additional {{grace-period}} configuration to handle out-of-order data. > To incorporate the session-gap, a session window should only be closed at > {{window-end + gap}}, and to incorporate grace-period, the close time should > be pushed out further to {{window-end + gap + grace}}. > However, atm we compute the window close time as {{window-end + grace}}, > omitting the {{gap}} parameter. > Because the default grace-period is 24h, most users might not notice this issue. > Even if they set a grace period explicitly (eg, when using suppress()), they > would most likely set a grace-period larger than gap-time, not hitting the > issue (or maybe only realize it when inspecting the behavior closely). > However, if a user wants to disable the grace-period and sets it to zero (or > any other value smaller than gap-time), sessions might be closed too early and > users might notice. -- This message was sent by Atlassian Jira (v8.3.4#803005)
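For illustration, the close-time arithmetic in the issue description can be sketched in Java. This is a minimal sketch, not the Kafka Streams code: the class `SessionCloseTimeSketch` and its methods are hypothetical names, times are plain milliseconds, and it assumes (as the issue argues) that a session can still absorb a record arriving up to `gap` after its current end, which is why the gap belongs in the close time.

```java
// Hypothetical illustration of the close-time formulas from KAFKA-12718;
// not the actual Kafka Streams implementation.
final class SessionCloseTimeSketch {

    // Behavior as described in the issue: the gap is omitted.
    // (gapMs is unused on purpose, to keep both signatures parallel.)
    static long closeTimeWithoutGap(long windowEnd, long gapMs, long graceMs) {
        return windowEnd + graceMs;
    }

    // Proposed behavior: a record up to `gap` after window-end can still extend
    // the session, so grace is counted from window-end + gap.
    static long closeTimeWithGap(long windowEnd, long gapMs, long graceMs) {
        return windowEnd + gapMs + graceMs;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L; // session currently ends at t = 10 s
        long gap = 5_000L;        // inactivity gap of 5 s
        long grace = 0L;          // grace period disabled, the case the issue highlights

        // With grace = 0 the session would close at t = 10 s, although a record at
        // t = 14 s still falls within the gap and should extend the session.
        System.out.println("without gap: " + closeTimeWithoutGap(windowEnd, gap, grace)); // 10000
        System.out.println("with gap:    " + closeTimeWithGap(windowEnd, gap, grace));    // 15000
    }
}
```

The two results always differ by exactly `gap`; when grace is large relative to the gap (such as the 24h default), that difference is easy to miss, which matches the explanation in the issue.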
[jira] [Created] (KAFKA-12729) controller leader keep disconnected and block the cluster
luws created KAFKA-12729: Summary: controller leader keep disconnected and block the cluster Key: KAFKA-12729 URL: https://issues.apache.org/jira/browse/KAFKA-12729 Project: Kafka Issue Type: Bug Components: controller, KafkaConnect Affects Versions: 2.1.1 Reporter: luws Kafka version: 2.12-2.1.1. After a momentary ZooKeeper disconnection, broker6 became the controller leader, but the other brokers could not connect to broker6 until we restarted broker6; after that, the cluster recovered. Bandwidth is normal. !image-2021-04-29-11-24-22-704.png! !image-2021-04-29-11-27-12-924.png! !image-2021-04-29-11-27-35-679.png! !image-2021-04-29-11-25-41-208.png! !image-2021-04-29-11-26-34-894.png! {code:java} // code placeholder Connection to 6 was disconnected before the response was read at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99) at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:274) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) at scala.Option.foreach(Option.scala:257) at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82){code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-7757) Too many open files after java.io.IOException: Connection to n was disconnected before the response was read
[ https://issues.apache.org/jira/browse/KAFKA-7757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luws updated KAFKA-7757: Attachment: image-2021-04-29-11-27-12-924.png > Too many open files after java.io.IOException: Connection to n was > disconnected before the response was read > > > Key: KAFKA-7757 > URL: https://issues.apache.org/jira/browse/KAFKA-7757 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Pedro Gontijo >Priority: Major > Attachments: Screen Shot 2019-01-03 at 12.20.38 PM.png, dump.txt, > fd-spike-threads.txt, image-2021-04-29-11-24-22-704.png, > image-2021-04-29-11-25-41-208.png, image-2021-04-29-11-26-34-894.png, > image-2021-04-29-11-27-12-924.png, image-2021-04-29-11-27-35-679.png, > kafka-allocated-file-handles.png, server.properties, td1.txt, td2.txt, td3.txt > > > We upgraded from 0.10.2.2 to 2.1.0 (a cluster with 3 brokers) > After a while (hours) 2 brokers start to throw: > {code:java} > java.io.IOException: Connection to NN was disconnected before the response > was read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190) > at > kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241) > at > kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > {code} > File descriptors 
start to pile up and if I do not restart it throws "Too many > open files" and crashes. > {code:java} > ERROR Error while accepting connection (kafka.network.Acceptor) > java.io.IOException: Too many open files in system > at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) > at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) > at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) > at kafka.network.Acceptor.accept(SocketServer.scala:460) > at kafka.network.Acceptor.run(SocketServer.scala:403) > at java.lang.Thread.run(Thread.java:748) > {code} > > After some hours the issue happens again... It has happened with all > brokers, so it is not something specific to an instance. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-7757) Too many open files after java.io.IOException: Connection to n was disconnected before the response was read
[ https://issues.apache.org/jira/browse/KAFKA-7757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luws updated KAFKA-7757: Attachment: image-2021-04-29-11-27-35-679.png > Too many open files after java.io.IOException: Connection to n was > disconnected before the response was read > > > Key: KAFKA-7757 > URL: https://issues.apache.org/jira/browse/KAFKA-7757 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Pedro Gontijo >Priority: Major > Attachments: Screen Shot 2019-01-03 at 12.20.38 PM.png, dump.txt, > fd-spike-threads.txt, image-2021-04-29-11-24-22-704.png, > image-2021-04-29-11-25-41-208.png, image-2021-04-29-11-26-34-894.png, > image-2021-04-29-11-27-12-924.png, image-2021-04-29-11-27-35-679.png, > kafka-allocated-file-handles.png, server.properties, td1.txt, td2.txt, td3.txt > > > We upgraded from 0.10.2.2 to 2.1.0 (a cluster with 3 brokers) > After a while (hours) 2 brokers start to throw: > {code:java} > java.io.IOException: Connection to NN was disconnected before the response > was read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190) > at > kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241) > at > kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > {code} > File descriptors 
start to pile up and if I do not restart it throws "Too many > open files" and crashes. > {code:java} > ERROR Error while accepting connection (kafka.network.Acceptor) > java.io.IOException: Too many open files in system > at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) > at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) > at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) > at kafka.network.Acceptor.accept(SocketServer.scala:460) > at kafka.network.Acceptor.run(SocketServer.scala:403) > at java.lang.Thread.run(Thread.java:748) > {code} > > After some hours the issue happens again... It has happened with all > brokers, so it is not something specific to an instance. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-7757) Too many open files after java.io.IOException: Connection to n was disconnected before the response was read
[ https://issues.apache.org/jira/browse/KAFKA-7757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luws updated KAFKA-7757: Attachment: image-2021-04-29-11-26-34-894.png > Too many open files after java.io.IOException: Connection to n was > disconnected before the response was read > > > Key: KAFKA-7757 > URL: https://issues.apache.org/jira/browse/KAFKA-7757 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Pedro Gontijo >Priority: Major > Attachments: Screen Shot 2019-01-03 at 12.20.38 PM.png, dump.txt, > fd-spike-threads.txt, image-2021-04-29-11-24-22-704.png, > image-2021-04-29-11-25-41-208.png, image-2021-04-29-11-26-34-894.png, > kafka-allocated-file-handles.png, server.properties, td1.txt, td2.txt, td3.txt > > > We upgraded from 0.10.2.2 to 2.1.0 (a cluster with 3 brokers) > After a while (hours) 2 brokers start to throw: > {code:java} > java.io.IOException: Connection to NN was disconnected before the response > was read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190) > at > kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241) > at > kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > {code} > File descriptors start to pile up and if I do not restart it throws "Too many > open files" 
and crashes. > {code:java} > ERROR Error while accepting connection (kafka.network.Acceptor) > java.io.IOException: Too many open files in system > at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) > at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) > at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) > at kafka.network.Acceptor.accept(SocketServer.scala:460) > at kafka.network.Acceptor.run(SocketServer.scala:403) > at java.lang.Thread.run(Thread.java:748) > {code} > > After some hours the issue happens again... It has happened with all > brokers, so it is not something specific to an instance. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-7757) Too many open files after java.io.IOException: Connection to n was disconnected before the response was read
[ https://issues.apache.org/jira/browse/KAFKA-7757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luws updated KAFKA-7757: Attachment: image-2021-04-29-11-25-41-208.png > Too many open files after java.io.IOException: Connection to n was > disconnected before the response was read > > > Key: KAFKA-7757 > URL: https://issues.apache.org/jira/browse/KAFKA-7757 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Pedro Gontijo >Priority: Major > Attachments: Screen Shot 2019-01-03 at 12.20.38 PM.png, dump.txt, > fd-spike-threads.txt, image-2021-04-29-11-24-22-704.png, > image-2021-04-29-11-25-41-208.png, kafka-allocated-file-handles.png, > server.properties, td1.txt, td2.txt, td3.txt > > > We upgraded from 0.10.2.2 to 2.1.0 (a cluster with 3 brokers) > After a while (hours) 2 brokers start to throw: > {code:java} > java.io.IOException: Connection to NN was disconnected before the response > was read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190) > at > kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241) > at > kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > {code} > File descriptors start to pile up and if I do not restart it throws "Too many > open files" and crashes. 
> {code:java} > ERROR Error while accepting connection (kafka.network.Acceptor) > java.io.IOException: Too many open files in system > at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) > at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) > at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) > at kafka.network.Acceptor.accept(SocketServer.scala:460) > at kafka.network.Acceptor.run(SocketServer.scala:403) > at java.lang.Thread.run(Thread.java:748) > {code} > > After some hours the issue happens again... It has happened with all > brokers, so it is not something specific to an instance. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-7757) Too many open files after java.io.IOException: Connection to n was disconnected before the response was read
[ https://issues.apache.org/jira/browse/KAFKA-7757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luws updated KAFKA-7757: Attachment: image-2021-04-29-11-24-22-704.png > Too many open files after java.io.IOException: Connection to n was > disconnected before the response was read > > > Key: KAFKA-7757 > URL: https://issues.apache.org/jira/browse/KAFKA-7757 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Pedro Gontijo >Priority: Major > Attachments: Screen Shot 2019-01-03 at 12.20.38 PM.png, dump.txt, > fd-spike-threads.txt, image-2021-04-29-11-24-22-704.png, > kafka-allocated-file-handles.png, server.properties, td1.txt, td2.txt, td3.txt > > > We upgraded from 0.10.2.2 to 2.1.0 (a cluster with 3 brokers) > After a while (hours) 2 brokers start to throw: > {code:java} > java.io.IOException: Connection to NN was disconnected before the response > was read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190) > at > kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241) > at > kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > {code} > File descriptors start to pile up and if I do not restart it throws "Too many > open files" and crashes. 
> {code:java} > ERROR Error while accepting connection (kafka.network.Acceptor) > java.io.IOException: Too many open files in system > at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) > at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) > at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) > at kafka.network.Acceptor.accept(SocketServer.scala:460) > at kafka.network.Acceptor.run(SocketServer.scala:403) > at java.lang.Thread.run(Thread.java:748) > {code} > > After some hours the issue happens again... It has happened with all > brokers, so it is not something specific to an instance. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
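Since the report describes file descriptors piling up until the broker fails with "Too many open files", one way to watch for that pattern from inside a JVM is the HotSpot-specific `com.sun.management.UnixOperatingSystemMXBean`. This is a diagnostic sketch only, with a hypothetical class name `FdUsageSketch`. It reports on the JVM it runs in, so for a broker the same values would have to be read remotely over JMX, from the `OpenFileDescriptorCount` and `MaxFileDescriptorCount` attributes of `java.lang:type=OperatingSystem`, assuming a HotSpot-based JVM on a Unix-like OS.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

import com.sun.management.UnixOperatingSystemMXBean;

// Hypothetical diagnostic helper; not part of Kafka.
final class FdUsageSketch {

    // Number of file descriptors currently open by this JVM,
    // or -1 if the platform bean does not expose it (e.g. on Windows).
    static long openFdCount() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof UnixOperatingSystemMXBean) {
            return ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount();
        }
        return -1L;
    }

    // Per-process descriptor limit as seen by the JVM, or -1 if unavailable.
    static long maxFdCount() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof UnixOperatingSystemMXBean) {
            return ((UnixOperatingSystemMXBean) os).getMaxFileDescriptorCount();
        }
        return -1L;
    }

    public static void main(String[] args) throws InterruptedException {
        // Sample a few times; a count that keeps climbing toward the limit
        // is the symptom described in the report.
        for (int i = 0; i < 3; i++) {
            System.out.printf("open fds: %d / max: %d%n", openFdCount(), maxFdCount());
            Thread.sleep(1000);
        }
    }
}
```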
[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
showuon commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r622707404 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set allTopics, */ private Map> constrainedAssign(Map partitionsPerTopic, Map> consumerToOwnedPartitions) { -SortedSet unassignedPartitions = getTopicPartitions(partitionsPerTopic); +if (log.isDebugEnabled()) { +log.debug(String.format("performing constrained assign. partitionsPerTopic: %s, consumerToOwnedPartitions: %s", +partitionsPerTopic, consumerToOwnedPartitions)); +} + +List sortedAllPartitions = getTopicPartitions(partitionsPerTopic); Set allRevokedPartitions = new HashSet<>(); -// Each consumer should end up in exactly one of the below -// the consumers not yet at capacity +// the consumers not yet at expected capacity List unfilledMembers = new LinkedList<>(); -// the members with exactly maxQuota partitions assigned -Queue maxCapacityMembers = new LinkedList<>(); -// the members with exactly minQuota partitions assigned -Queue minCapacityMembers = new LinkedList<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); -int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers); -int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers); +int totalPartitionsCount = sortedAllPartitions.size(); + +int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers); +int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers); +// the expected number of members with maxQuota assignment +int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers; +// the number of members with exactly maxQuota partitions assigned +int numMaxCapacityMembers = 0; -// initialize the assignment map with an empty array of size minQuota for all members +// 
initialize the assignment map with an empty array of size maxQuota for all members Map> assignment = new HashMap<>( - consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota; + consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota; +List toBeRemovedPartitions = new ArrayList<>(); // Reassign as many previously owned partitions as possible for (Map.Entry> consumerEntry : consumerToOwnedPartitions.entrySet()) { String consumer = consumerEntry.getKey(); List ownedPartitions = consumerEntry.getValue(); List consumerAssignment = assignment.get(consumer); -int i = 0; -// assign the first N partitions up to the max quota, and mark the remaining as being revoked -for (TopicPartition tp : ownedPartitions) { -if (i < maxQuota) { -consumerAssignment.add(tp); -unassignedPartitions.remove(tp); -} else { -allRevokedPartitions.add(tp); -} -++i; -} if (ownedPartitions.size() < minQuota) { +// the expected assignment size is more than consumer have now, so keep all the owned partitions +// and put this member into unfilled member list +if (ownedPartitions.size() > 0) { +consumerAssignment.addAll(ownedPartitions); +toBeRemovedPartitions.addAll(ownedPartitions); +} unfilledMembers.add(consumer); +} else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) { +// consumer owned the "maxQuota" of partitions or more, and we still under the number of expected max capacity members +// so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions +consumerAssignment.addAll(ownedPartitions.subList(0, maxQuota)); +toBeRemovedPartitions.addAll(ownedPartitions.subList(0, maxQuota)); +allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size())); } else { -// It's possible for a consumer to be at both min and max capacity if minQuota == maxQuota -if (consumerAssignment.size() == minQuota) 
-minCapacityMembers.add(consumer); -if (consumerAssignment.size() == m
[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
showuon commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r622705781 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set allTopics, */ private Map> constrainedAssign(Map partitionsPerTopic, Map> consumerToOwnedPartitions) { -SortedSet unassignedPartitions = getTopicPartitions(partitionsPerTopic); +if (log.isDebugEnabled()) { +log.debug(String.format("performing constrained assign. partitionsPerTopic: %s, consumerToOwnedPartitions: %s", +partitionsPerTopic, consumerToOwnedPartitions)); +} + +List sortedAllPartitions = getTopicPartitions(partitionsPerTopic); Set allRevokedPartitions = new HashSet<>(); -// Each consumer should end up in exactly one of the below -// the consumers not yet at capacity +// the consumers not yet at expected capacity List unfilledMembers = new LinkedList<>(); -// the members with exactly maxQuota partitions assigned -Queue maxCapacityMembers = new LinkedList<>(); -// the members with exactly minQuota partitions assigned -Queue minCapacityMembers = new LinkedList<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); -int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers); -int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers); +int totalPartitionsCount = sortedAllPartitions.size(); + +int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers); +int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers); +// the expected number of members with maxQuota assignment +int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers; +// the number of members with exactly maxQuota partitions assigned +int numMaxCapacityMembers = 0; -// initialize the assignment map with an empty array of size minQuota for all members +// 
initialize the assignment map with an empty array of size maxQuota for all members Map> assignment = new HashMap<>( - consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota; + consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota; +List toBeRemovedPartitions = new ArrayList<>(); Review comment: > And thus are "to be removed" from the list of partitions to be assigned -- is that where the name comes from? Haha, you're right! :) Sorry, I should have made the variable naming better. `assignedPartitions` is great! Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
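The quota arithmetic in the diff under review (`minQuota`, `maxQuota`, `numExpectedMaxCapacityMembers`) can be checked with a small standalone sketch. It uses integer division, which for non-negative counts gives the same results as the `Math.floor`/`Math.ceil` on doubles in the diff; the class name `QuotaSketch` is hypothetical, and the sketch assumes at least one consumer.

```java
// Hypothetical standalone check of the quota formulas; not Kafka code.
final class QuotaSketch {

    // floor(total / consumers) for non-negative total and consumers > 0
    static int minQuota(int totalPartitions, int numConsumers) {
        return totalPartitions / numConsumers;
    }

    // ceil(total / consumers) for non-negative total and consumers > 0
    static int maxQuota(int totalPartitions, int numConsumers) {
        return (totalPartitions + numConsumers - 1) / numConsumers;
    }

    // Number of members expected to end up with maxQuota partitions.
    static int expectedMaxCapacityMembers(int totalPartitions, int numConsumers) {
        return totalPartitions % numConsumers;
    }

    public static void main(String[] args) {
        int total = 7;
        int consumers = 3;
        int min = minQuota(total, consumers);                      // 2
        int max = maxQuota(total, consumers);                      // 3
        int numMax = expectedMaxCapacityMembers(total, consumers); // 1
        // numMax members take maxQuota, the rest take minQuota: 1 * 3 + 2 * 2 = 7
        int covered = numMax * max + (consumers - numMax) * min;
        System.out.println(min + " " + max + " " + numMax + " " + covered);
    }
}
```

When the partition count divides evenly, the remainder is 0 and `minQuota == maxQuota`, so every member ends up with the same count; otherwise exactly `total % consumers` members carry one extra partition, and `numMax * maxQuota + (consumers - numMax) * minQuota` equals the total in both cases.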
[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
showuon commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r622704380 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set allTopics, */ private Map> constrainedAssign(Map partitionsPerTopic, Map> consumerToOwnedPartitions) { -SortedSet unassignedPartitions = getTopicPartitions(partitionsPerTopic); +if (log.isDebugEnabled()) { +log.debug(String.format("performing constrained assign. partitionsPerTopic: %s, consumerToOwnedPartitions: %s", +partitionsPerTopic, consumerToOwnedPartitions)); +} + +List sortedAllPartitions = getTopicPartitions(partitionsPerTopic); Set allRevokedPartitions = new HashSet<>(); -// Each consumer should end up in exactly one of the below -// the consumers not yet at capacity +// the consumers not yet at expected capacity List unfilledMembers = new LinkedList<>(); -// the members with exactly maxQuota partitions assigned -Queue maxCapacityMembers = new LinkedList<>(); -// the members with exactly minQuota partitions assigned -Queue minCapacityMembers = new LinkedList<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); -int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers); -int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers); +int totalPartitionsCount = sortedAllPartitions.size(); + +int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers); +int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers); +// the expected number of members with maxQuota assignment +int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers; +// the number of members with exactly maxQuota partitions assigned +int numMaxCapacityMembers = 0; -// initialize the assignment map with an empty array of size minQuota for all members +// 
initialize the assignment map with an empty array of size maxQuota for all members Map> assignment = new HashMap<>( - consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota; + consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota; +List toBeRemovedPartitions = new ArrayList<>(); // Reassign as many previously owned partitions as possible for (Map.Entry> consumerEntry : consumerToOwnedPartitions.entrySet()) { String consumer = consumerEntry.getKey(); List ownedPartitions = consumerEntry.getValue(); List consumerAssignment = assignment.get(consumer); -int i = 0; -// assign the first N partitions up to the max quota, and mark the remaining as being revoked -for (TopicPartition tp : ownedPartitions) { -if (i < maxQuota) { -consumerAssignment.add(tp); -unassignedPartitions.remove(tp); -} else { -allRevokedPartitions.add(tp); -} -++i; -} if (ownedPartitions.size() < minQuota) { +// the expected assignment size is more than consumer have now, so keep all the owned partitions +// and put this member into unfilled member list +if (ownedPartitions.size() > 0) { +consumerAssignment.addAll(ownedPartitions); +toBeRemovedPartitions.addAll(ownedPartitions); +} unfilledMembers.add(consumer); +} else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) { Review comment: Nice catch! Yes, you're right, the post-/pre-increment stuff is error-prone. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
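The off-by-one the reviewer caught in `numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers` can be reproduced in isolation. In this hypothetical sketch (`CapacityCounterSketch` is not part of Kafka), each loop iteration stands for one member that already owns at least `maxQuota` partitions. The post-increment compares the counter's old value, so combined with `<=` it admits one member more than expected. The second method shows one less error-prone shape, comparing with `<` and incrementing only when the member is admitted; it is not necessarily what the PR finally merged.

```java
// Hypothetical reproduction of the counter condition under review; not Kafka code.
final class CapacityCounterSketch {

    // Mirrors `counter++ <= expectedMax`: compares the old value, then increments.
    static int admittedWithPostIncrement(int candidates, int expectedMax) {
        int counter = 0;
        int admitted = 0;
        for (int i = 0; i < candidates; i++) {
            if (counter++ <= expectedMax) {
                admitted++;
            }
        }
        return admitted;
    }

    // Compare first, increment only when the member is admitted at maxQuota.
    static int admittedWithExplicitIncrement(int candidates, int expectedMax) {
        int counter = 0;
        for (int i = 0; i < candidates; i++) {
            if (counter < expectedMax) {
                counter++;
            }
        }
        return counter;
    }

    public static void main(String[] args) {
        // 5 members already own >= maxQuota partitions, but only 2 should keep maxQuota.
        System.out.println(admittedWithPostIncrement(5, 2));     // 3: one member too many
        System.out.println(admittedWithExplicitIncrement(5, 2)); // 2
    }
}
```

The same effect shows up when the expected count is 0: the post-increment form still lets the first qualifying member through.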
[jira] [Assigned] (KAFKA-7572) Producer should not send requests with negative partition id
[ https://issues.apache.org/jira/browse/KAFKA-7572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenhao Ji reassigned KAFKA-7572: Assignee: Wenhao Ji > Producer should not send requests with negative partition id > > > Key: KAFKA-7572 > URL: https://issues.apache.org/jira/browse/KAFKA-7572 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.1, 1.1.1 >Reporter: Yaodong Yang >Assignee: Wenhao Ji >Priority: Major > Labels: patch-available > > h3. Issue: > In one Kafka producer log from our users, we found the following weird one: > timestamp="2018-10-09T17:37:41,237-0700",level="ERROR", Message="Write to > Kafka failed with: ",exception="java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > topicName--2: 30042 ms has passed since batch creation plus linger time > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 > record(s) for topicName--2: 30042 ms has passed since batch creation plus > linger time" > After a few hours debugging, we finally understood the root cause of this > issue: > # The producer 
used a buggy custom Partitioner, which sometimes generates > negative partition ids for new records. > # The corresponding produce requests were rejected by brokers, because it's > illegal to have a partition with a negative id. > # The client kept refreshing its local cluster metadata, but could not send > produce requests successfully. > # From the above log, we found a suspicious string "topicName--2". > # According to the source code, the format of this string in the log is > TopicName+"-"+PartitionId. > # It's not easy to notice that there were two consecutive dashes in the above > log. > # Eventually, we found that the second dash was a negative sign. Therefore, > the partition id is -2, rather than 2. > # The bug was in the custom Partitioner. > h3. Proposal: > # Producer code should check the partitionId before sending requests to > brokers. > # If there is a negative partition id, just throw an {{IllegalStateException}}. > # Such a quick check can save lots of time for people debugging their > producer code. -- This message was sent by Atlassian Jira (v8.3.4#803005)
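The proposal amounts to a fail-fast check on the partition id before a record is sent. Below is a minimal Java sketch of that idea, using a hypothetical `PartitionIdCheck` class (not part of `KafkaProducer`); it also shows how the `topic + "-" + partition` format described in the issue turns partition -2 into the easily misread `topicName--2`.

```java
// Hypothetical guard following the KAFKA-7572 proposal; not KafkaProducer's actual code.
final class PartitionIdCheck {

    /** Same shape as the log string described in the issue: topic + "-" + partition. */
    static String describe(String topic, int partition) {
        return topic + "-" + partition;
    }

    /** Fails fast on a negative partition id instead of letting the batch expire later. */
    static int checkPartition(String topic, int partition) {
        if (partition < 0) {
            throw new IllegalStateException(
                "Partitioner returned negative partition id " + partition + " for topic " + topic);
        }
        return partition;
    }

    public static void main(String[] args) {
        // The double dash comes from the separator followed by the minus sign.
        System.out.println(describe("topicName", -2)); // prints topicName--2
        try {
            checkPartition("topicName", -2);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Throwing `IllegalStateException` follows the wording of the proposal; where such a check would live in the real producer, and which exception type it would use, is up to the actual patch.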
[jira] [Commented] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
[ https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17335095#comment-17335095 ] Luke Chen commented on KAFKA-9295: -- _a longer timeout should be used in this case._ Agree! > KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable > -- > > Key: KAFKA-9295 > URL: https://issues.apache.org/jira/browse/KAFKA-9295 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.4.0, 2.6.0 >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > Fix For: 3.0.0 > > > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/] > {quote}java.lang.AssertionError: Did not receive all 1 records from topic > output- within 6 ms Expected: is a value equal to or greater than <1> > but: <0> was less than <1> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489) > at > org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200) > at > 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote} >
[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
showuon commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r622670757 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set allTopics, if (log.isDebugEnabled()) { log.debug("final assignment: " + assignment); } - + return assignment; } -private SortedSet getTopicPartitions(Map partitionsPerTopic) { -SortedSet allPartitions = -new TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)); -for (Entry entry: partitionsPerTopic.entrySet()) { -String topic = entry.getKey(); -for (int i = 0; i < entry.getValue(); ++i) { +/** + * get the unassigned partition list by computing the difference set of the sortedPartitions(all partitions) + * and sortedToBeRemovedPartitions. We use two pointers technique here: + * + * We loop the sortedPartition, and compare the ith element in sorted toBeRemovedPartitions(i start from 0): + * - if not equal to the ith element, add to unassignedPartitions + * - if equal to the the ith element, get next element from sortedToBeRemovedPartitions + * + * @param sortedPartitions: sorted all partitions + * @param sortedToBeRemovedPartitions: sorted partitions, all are included in the sortedPartitions + * @return the partitions don't assign to any current consumers + */ +private List getUnassignedPartitions(List sortedPartitions, Review comment: Thank you very much, @ableegoldman ! I'll address your comments today, thank you :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
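The Javadoc above describes a two-pointer difference of two sorted lists. A minimal, generic Java sketch of that technique follows. The class name `UnassignedPartitionsSketch` is hypothetical; the sketch assumes, as the Javadoc states, that every to-be-removed element is contained in the full list, and additionally that neither list has duplicates or nulls. It walks each list once, so it runs in O(n + m) instead of doing a lookup per partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the two-pointer difference described in the Javadoc.
final class UnassignedPartitionsSketch {

    /**
     * Returns the elements of sortedAll that are not in sortedToRemove.
     * Assumed preconditions: both lists are sorted by cmp, contain no duplicates
     * or nulls, and sortedToRemove is a subset of sortedAll.
     */
    static <T> List<T> getUnassigned(List<T> sortedAll, List<T> sortedToRemove, Comparator<? super T> cmp) {
        List<T> unassigned = new ArrayList<>();
        // An iterator keeps the second pointer O(1) even for LinkedList inputs.
        Iterator<T> toRemove = sortedToRemove.iterator();
        T nextToRemove = toRemove.hasNext() ? toRemove.next() : null;
        for (T candidate : sortedAll) {
            if (nextToRemove != null && cmp.compare(candidate, nextToRemove) == 0) {
                // Owned element: skip it and advance the second pointer.
                nextToRemove = toRemove.hasNext() ? toRemove.next() : null;
            } else {
                unassigned.add(candidate);
            }
        }
        return unassigned;
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("a", "b", "c", "d", "e");
        List<String> owned = Arrays.asList("b", "e");
        System.out.println(getUnassigned(all, owned, Comparator.<String>naturalOrder())); // prints [a, c, d]
    }
}
```

With `TopicPartition`, the comparator would presumably be the one from the removed `getTopicPartitions` code, `Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)`.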
[GitHub] [kafka] showuon commented on pull request #10598: MINOR: rename wrong topic id variable name and description
showuon commented on pull request #10598: URL: https://github.com/apache/kafka/pull/10598#issuecomment-828879776 @chia7712, I've updated my GitHub profile. It's `show...@gmail.com`. Haha~
[GitHub] [kafka] guozhangwang merged pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
guozhangwang merged pull request #10462: URL: https://github.com/apache/kafka/pull/10462
[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
guozhangwang commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r622657322 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -93,23 +136,118 @@ public void process(final K key, final V1 value) { } boolean needOuterJoin = outer; +boolean joinFound = false; final long inputRecordTimestamp = context().timestamp(); final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); +maxObservedStreamTime.advance(inputRecordTimestamp); + try (final WindowStoreIterator iter = otherWindow.fetch(key, timeFrom, timeTo)) { while (iter.hasNext()) { needOuterJoin = false; +joinFound = true; final KeyValue otherRecord = iter.next(); +final long otherRecordTimestamp = otherRecord.key; + +// Emit expired records before the joined record to keep time ordering +emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp); + context().forward( key, joiner.apply(key, value, otherRecord.value), -To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key))); +To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); +} + +// Emit all expired records before adding a new non-joined record to the store. Otherwise, +// the put() call will advance the stream time, which causes records out of the retention +// period to be deleted, thus not being emitted later. +if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) { +emitExpiredNonJoinedOuterRecords(); } if (needOuterJoin) { -context().forward(key, joiner.apply(key, value, null)); +// The maxStreamTime contains the max time observed in both sides of the join. +// Having access to the time observed in the other join side fixes the following +// problem: +// +// Say we have a window size of 5 seconds +// 1. 
A non-joined record wth time T10 is seen in the left-topic (maxLeftStreamTime: 10) +// The record is not processed yet, and is added to the outer-join store +// 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2) +// The record is not processed yet, and is added to the outer-join store +// 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11) +// It is time to look at the expired records. T10 and T2 should be emitted, but +// because T2 was late, then it is not fetched by the window store, so it is not processed +// +// See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests +// +// the condition below allows us to process the late record without the need +// to hold it in the temporary outer store +if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) { Review comment: @spena, it seems there are a few different conditions we can consider here: 1) record time < stream time - window length - grace length: the record is too late; we should drop it up front and also record the `droppedRecordsSensorOrExpiredWindowRecordDropSensor`. 2) record time >= stream time - window length - grace length, but < stream time: the record is still late, but joinable. Since the stream time would not be advanced, we would not have to check for and emit non-joined records; we would just try to join this record with the other window. Note that, as @mjsax said, for the returned matching record we also need to check whether the other record time >= stream time - window length - grace length. 3) record time > stream time: we would first try to emit non-joined records, and then try to join this record.
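The three cases in the review comment can be expressed as a small classification function. This is a hypothetical Java sketch (`JoinLatenessSketch` and its enum are illustrative names, not `KStreamKStreamJoin` code). It uses the comment's bound `stream time - window length - grace length`; because the comment only covers "< stream time" and "> stream time", it assumes a record exactly at stream time is handled like case 3, similar to the `inputRecordTimestamp == maxObservedStreamTime.get()` check in the diff.

```java
// Hypothetical helper; names and structure are illustrative, not KStreamKStreamJoin code.
final class JoinLatenessSketch {

    enum Action {
        DROP_TOO_LATE,          // case 1: drop and record the dropped-records sensor
        JOIN_ONLY,              // case 2: late but joinable; stream time does not advance
        EMIT_EXPIRED_THEN_JOIN  // case 3: emit expired non-joined records, then join
    }

    static Action classify(long recordTime, long streamTime, long windowLengthMs, long graceMs) {
        long lowerBound = streamTime - windowLengthMs - graceMs;
        if (recordTime < lowerBound) {
            return Action.DROP_TOO_LATE;
        }
        if (recordTime < streamTime) {
            return Action.JOIN_ONLY;
        }
        // recordTime >= streamTime; grouping "==" with case 3 is an assumption.
        return Action.EMIT_EXPIRED_THEN_JOIN;
    }

    /** The extra check on a matched record from the other side, per @mjsax's point. */
    static boolean otherRecordStillJoinable(long otherRecordTime, long streamTime, long windowLengthMs, long graceMs) {
        return otherRecordTime >= streamTime - windowLengthMs - graceMs;
    }
}
```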
[GitHub] [kafka] kowshik commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
kowshik commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r622606093 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTransform { Review comment: Instead of introducing this separate class, why not have `RemoteLogSegmentMetadata` implement the `RemoteLogMetadataTransform<RemoteLogSegmentMetadata>` interface directly?
[GitHub] [kafka] ableegoldman merged pull request #10607: MINOR: bump version to 2.6.3-SNAPSHOT in missing files
ableegoldman merged pull request #10607: URL: https://github.com/apache/kafka/pull/10607
[GitHub] [kafka] ableegoldman opened a new pull request #10607: MINOR: bump version to 2.6.3-SNAPSHOT in missing files
ableegoldman opened a new pull request #10607: URL: https://github.com/apache/kafka/pull/10607 I was just editing something on the release process wiki and noticed that the "Cut Branch" section, which the wiki says to skip for bugfix releases, actually contains a list of files that it says to update for both feature and bugfix releases. Luckily, most of those files are covered later in the wiki, but two were not. I also updated the release process wiki to clear this up for future RMs.
[GitHub] [kafka] junrao commented on pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
junrao commented on pull request #9944: URL: https://github.com/apache/kafka/pull/9944#issuecomment-828795518 I think the upgrade case is similar: it's rare and transient, so we could choose a simpler and less optimized way of handling it. I am not sure whether we trigger a metadata refresh for top-level errors right now. If not, we could probably just add logic to refresh metadata for all topics on a top-level topicId error.
[GitHub] [kafka] kowshik commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
kowshik commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r621909054 ## File path: raft/src/main/java/org/apache/kafka/raft/metadata/AbstractApiMessageSerde.java ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.metadata; + +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.common.protocol.Writable; +import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.raft.RecordSerde; + +/** + * This is an implementation of {@code RecordSerde} with {@link ApiMessageAndVersion} but implementors need to implement + * {@link #apiMessageFor(short)} to return a {@code ApiMessage} instance for the given {@code apiKey}. + * + * This can be used as the underlying serialization mechanism for any metadata kind of log storage. Review comment: The grammar could be improved a bit here: `... 
mechanism for any metadata kind of log storage.`
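The Javadoc under review describes a serde that delegates to an abstract `apiMessageFor(short)` to obtain a message instance for a given `apiKey`. The Java sketch below illustrates that pattern under heavy simplification: `SimpleMessage`, `MessageAndVersion`, `AbstractSimpleMessageSerde`, `CounterMessage`, and `CounterSerde` are all hypothetical stand-ins, not Kafka's `ApiMessage`, `ApiMessageAndVersion`, or `RecordSerde`. The frame header here is two fixed-width shorts; the real class imports `ByteUtils`, so it may use a variable-length encoding instead.

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified stand-ins for illustration only; not Kafka's ApiMessage or RecordSerde.
interface SimpleMessage {
    short apiKey();
    int size();                    // payload size in bytes
    void write(ByteBuffer buffer); // writes the payload only
    void read(ByteBuffer buffer);  // populates this (empty) instance from the payload
}

final class MessageAndVersion {
    final SimpleMessage message;
    final short version;

    MessageAndVersion(SimpleMessage message, short version) {
        this.message = message;
        this.version = version;
    }
}

abstract class AbstractSimpleMessageSerde {
    /** Like apiMessageFor(short) in the diff: return an empty message instance for apiKey. */
    protected abstract SimpleMessage apiMessageFor(short apiKey);

    ByteBuffer serialize(MessageAndVersion record) {
        ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES * 2 + record.message.size());
        buffer.putShort(record.message.apiKey()); // header: apiKey
        buffer.putShort(record.version);          // header: version
        record.message.write(buffer);             // payload
        buffer.flip();
        return buffer;
    }

    MessageAndVersion deserialize(ByteBuffer buffer) {
        short apiKey = buffer.getShort();
        short version = buffer.getShort();
        SimpleMessage message = apiMessageFor(apiKey); // the subclass picks the concrete type
        message.read(buffer);
        return new MessageAndVersion(message, version);
    }
}

// Example message type and serde for a single apiKey (hypothetical).
final class CounterMessage implements SimpleMessage {
    long value;

    public short apiKey() { return 0; }
    public int size() { return Long.BYTES; }
    public void write(ByteBuffer buffer) { buffer.putLong(value); }
    public void read(ByteBuffer buffer) { value = buffer.getLong(); }
}

final class CounterSerde extends AbstractSimpleMessageSerde {
    @Override
    protected SimpleMessage apiMessageFor(short apiKey) {
        if (apiKey == 0) {
            return new CounterMessage();
        }
        throw new IllegalArgumentException("Unknown apiKey " + apiKey);
    }
}
```

In this design, adding a record type only needs a new message class and one more branch in `apiMessageFor`; the framing logic stays in the abstract base class.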
[GitHub] [kafka] hachikuji commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
hachikuji commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r622494008 ## File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java ## @@ -60,19 +61,21 @@ @Override void close(); -class Batch { +final class Batch implements Iterable { private final long baseOffset; private final int epoch; +private final long lastOffset; private final List records; -public Batch(long baseOffset, int epoch, List records) { +private Batch(long baseOffset, int epoch, long lastOffset, List records) { Review comment: Perhaps it is clear enough already, but maybe we should document that these offsets are inclusive. ## File path: core/src/main/scala/kafka/tools/TestRaftServer.scala ## @@ -226,7 +232,11 @@ class TestRaftServer( reader.close() } -case _ => +case HandleSnapshot(reader) => + // Ignore snapshots; only interested on records appended by this leader Review comment: nit: on -> in? ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -311,8 +311,18 @@ private void updateListenersProgress(long highWatermark) { private void updateListenersProgress(List listenerContexts, long highWatermark) { for (ListenerContext listenerContext : listenerContexts) { listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> { -if (nextExpectedOffset < log.startOffset()) { -listenerContext.fireHandleSnapshot(log.startOffset()); +if (nextExpectedOffset < log.startOffset() && nextExpectedOffset < highWatermark) { +SnapshotReader snapshot = latestSnapshot().orElseThrow(() -> { +return new IllegalStateException( +String.format( +"Snapshot expected when next offset is %s, log start offset is %s and high-watermark is %s", Review comment: Perhaps it is useful to mention the class of the listener since we are referring to its next expected offset? 
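On the first review comment above, about documenting that the `Batch` offsets are inclusive: the hypothetical Java sketch below (`InclusiveBatch`, not the real `BatchReader.Batch`) shows what that contract means in practice. It assumes one offset per record, so `lastOffset = baseOffset + records.size() - 1`, and the number of offsets covered is `lastOffset - baseOffset + 1`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified batch type; not the actual BatchReader.Batch class.
final class InclusiveBatch<T> implements Iterable<T> {
    private final long baseOffset; // offset of the first record, inclusive
    private final int epoch;
    private final long lastOffset; // offset of the last record, inclusive
    private final List<T> records;

    private InclusiveBatch(long baseOffset, int epoch, long lastOffset, List<T> records) {
        this.baseOffset = baseOffset;
        this.epoch = epoch;
        this.lastOffset = lastOffset;
        this.records = records;
    }

    /** Assumed factory: one offset per record, so lastOffset = baseOffset + records.size() - 1. */
    static <T> InclusiveBatch<T> of(long baseOffset, int epoch, List<T> records) {
        if (records.isEmpty()) {
            throw new IllegalArgumentException("Batch must contain at least one record");
        }
        return new InclusiveBatch<>(baseOffset, epoch, baseOffset + records.size() - 1,
            Collections.unmodifiableList(records));
    }

    long baseOffset() { return baseOffset; }
    int epoch() { return epoch; }
    long lastOffset() { return lastOffset; }

    /** Both ends are inclusive, hence the + 1. */
    long offsetCount() { return lastOffset - baseOffset + 1; }

    @Override
    public Iterator<T> iterator() { return records.iterator(); }

    public static void main(String[] args) {
        InclusiveBatch<String> batch = InclusiveBatch.of(10L, 2, Arrays.asList("a", "b", "c"));
        System.out.println(batch.baseOffset() + ".." + batch.lastOffset()); // prints 10..12
        for (String record : batch) {
            System.out.println(record);
        }
    }
}
```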
## File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java ## @@ -426,48 +422,55 @@ public RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) { public void onSnapshotFrozen(OffsetAndEpoch snapshotId) {} @Override -public boolean deleteBeforeSnapshot(OffsetAndEpoch logStartSnapshotId) { -if (logStartOffset() > logStartSnapshotId.offset || -highWatermark.offset < logStartSnapshotId.offset) { - +public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) { +if (logStartOffset() > snapshotId.offset) { +throw new OffsetOutOfRangeException( +String.format( +"New log start (%s) is less than the curent log start offset (%s)", +snapshotId, +logStartOffset() +) +); +} +if (highWatermark.offset < snapshotId.offset) { throw new OffsetOutOfRangeException( String.format( -"New log start (%s) is less than start offset (%s) or is greater than the high watermark (%s)", -logStartSnapshotId, -logStartOffset(), +"New log start (%s) is greater than the high watermark (%s)", +snapshotId, highWatermark.offset ) ); } boolean updated = false; -Optional snapshotIdOpt = latestSnapshotId(); -if (snapshotIdOpt.isPresent()) { -OffsetAndEpoch snapshotId = snapshotIdOpt.get(); -if (startOffset() < logStartSnapshotId.offset && -highWatermark.offset >= logStartSnapshotId.offset && -snapshotId.offset >= logStartSnapshotId.offset) { +if (snapshots.containsKey(snapshotId)) { +snapshots.headMap(snapshotId, false).clear(); -snapshots.headMap(logStartSnapshotId, false).clear(); +// Update the high watermark if it is less than the new log start offset +if (snapshotId.offset > highWatermark.offset) { Review comment: Maybe I am missing something, but how could this be possible given the check above? ## File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java ## @@ -184,6 +187,22 @@ Builder appendToLog(int epoch, List records) { return this; } +Builder withSnapshot(OffsetAndEpoch snapshotId) throws IOException { Review comment: Maybe `withEmptySnapshot`? 
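The question about the `snapshotId.offset > highWatermark.offset` branch can be checked by reducing the two guards at the top of `deleteBeforeSnapshot` to plain comparisons. In this hypothetical Java sketch, `IllegalArgumentException` stands in for `OffsetOutOfRangeException`: once both guards pass, `currentLogStart <= newLogStart <= highWatermark` holds, so a later branch on `newLogStart > highWatermark` cannot execute.

```java
// Hypothetical reduction of the two range checks in the MockLog diff.
final class LogStartCheckSketch {

    static void validateNewLogStart(long newLogStart, long currentLogStart, long highWatermark) {
        if (currentLogStart > newLogStart) {
            throw new IllegalArgumentException(
                "New log start " + newLogStart + " is less than the current log start " + currentLogStart);
        }
        if (highWatermark < newLogStart) {
            throw new IllegalArgumentException(
                "New log start " + newLogStart + " is greater than the high watermark " + highWatermark);
        }
        // Past this point: currentLogStart <= newLogStart <= highWatermark,
        // so any branch guarded by "newLogStart > highWatermark" is dead code.
    }
}
```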
## File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java ## @@ -363,33 +362,30 @@ public LogFetchInfo read(long startOffset, Isolation isolation)
[jira] [Commented] (KAFKA-8613) Set default grace period to 0
[ https://issues.apache.org/jira/browse/KAFKA-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17335015#comment-17335015 ] Israel Ekpo commented on KAFKA-8613: I have the initial changes (without the unit tests yet) here: [https://github.com/izzyacademy/kafka/commit/c7094fae666f407e413c2b9d40e980a68aebf81f] Please let me know what you think. I will be working on the unit tests in about a day and will submit a PR once they are completed. > Set default grace period to 0 > - > > Key: KAFKA-8613 > URL: https://issues.apache.org/jira/browse/KAFKA-8613 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Assignee: Israel Ekpo >Priority: Blocker > Labels: kip > Fix For: 3.0.0 > > > Currently, the grace period is set to retention time if the grace period is > not specified explicitly. The reason for setting the default grace period to > retention time was backward compatibility. Topologies that were implemented > before the introduction of the grace period added late-arriving records to a > window as long as the window existed, i.e., as long as its retention time had > not elapsed. > This unintuitive default grace period has already caused confusion among > users. > For the next major release, we should set the default grace period to > {{Duration.ZERO}}. > > KIP-633 > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams]
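To make the proposed default change concrete, here is a simplified Java model of when a windowed operation still accepts a late record: the window is treated as closed once stream time reaches the window end plus the grace period. `GracePeriodSketch` is a hypothetical helper, not Kafka Streams code, and it ignores retention entirely; the 24-hour value mirrors the KIP-633 title rather than any specific configuration.

```java
import java.time.Duration;

// Hypothetical model of grace-period semantics; not Kafka Streams' implementation.
final class GracePeriodSketch {

    /**
     * A window [start, end) is treated as closed once stream time reaches end + grace;
     * records that fall into a closed window are dropped.
     */
    static boolean acceptsRecord(long windowEndMs, long streamTimeMs, Duration grace) {
        return streamTimeMs < windowEndMs + grace.toMillis();
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L;  // window [0s, 60s)
        long streamTime = 65_000L; // stream time is already 5s past the window end
        System.out.println(acceptsRecord(windowEnd, streamTime, Duration.ZERO));        // prints false
        System.out.println(acceptsRecord(windowEnd, streamTime, Duration.ofHours(24))); // prints true
    }
}
```

With `Duration.ZERO`, a late record is dropped as soon as stream time passes the window end, which is the stricter default the issue proposes.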
[GitHub] [kafka] wcarlson5 removed a comment on pull request #10603: WIP: handle missing sink topics in a separate way
wcarlson5 removed a comment on pull request #10603: URL: https://github.com/apache/kafka/pull/10603#issuecomment-828525217 @ableegoldman @rodesai @cadonna As a rough improvement, what do you think? I don't think we need to get this perfect right now, but I think any differentiation between a missing sink topic and a task-corrupted exception would be good.
[GitHub] [kafka] wcarlson5 closed pull request #10603: WIP: handle missing sink topics in a separate way
wcarlson5 closed pull request #10603: URL: https://github.com/apache/kafka/pull/10603
[GitHub] [kafka] wcarlson5 commented on pull request #10603: WIP: handle missing sink topics in a separate way
wcarlson5 commented on pull request #10603: URL: https://github.com/apache/kafka/pull/10603#issuecomment-828769426 This will not work without significant other changes.
[jira] [Updated] (KAFKA-10800) Validate the snapshot id when the state machine creates a snapshot
[ https://issues.apache.org/jira/browse/KAFKA-10800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-10800: --- Description: When the state machine attempts to create a snapshot writer, we should validate that the following is true: # The end offset of the snapshot is less than or equal to the high-watermark. # The epoch of the snapshot is less than or equal to the quorum epoch. # The end offset and epoch of the snapshot are valid based on the leader epoch cache. Note that this validation should not be performed when the raft client creates the snapshot writer because in that case the local log is out of date and the follower should trust the snapshot id sent by the partition leader. was: When the state machine attempts to create a snapshot writer, we should validate that the following is true: # The end offset of the snapshot is less than or equal to the high-watermark. # The epoch of the snapshot is equal to the quorum epoch. # The end offset and epoch of the snapshot are valid based on the leader epoch cache. Note that this validation should not be performed when the raft client creates the snapshot writer because in that case the local log is out of date and the follower should trust the snapshot id sent by the partition leader. > Validate the snapshot id when the state machine creates a snapshot > -- > > Key: KAFKA-10800 > URL: https://issues.apache.org/jira/browse/KAFKA-10800 > Project: Kafka > Issue Type: Sub-task > Components: replication >Reporter: Jose Armando Garcia Sancio >Assignee: Haoran Xuan >Priority: Major > > When the state machine attempts to create a snapshot writer, we should > validate that the following is true: > # The end offset of the snapshot is less than or equal to the high-watermark. > # The epoch of the snapshot is less than or equal to the quorum epoch. > # The end offset and epoch of the snapshot are valid based on the leader > epoch cache.
> Note that this validation should not be performed when the raft client > creates the snapshot writer because in that case the local log is out of date > and the follower should trust the snapshot id sent by the partition leader.
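The three validation rules can be sketched as follows. In this hypothetical Java example, the leader epoch cache is modeled as a `TreeMap` from each epoch's start offset to the epoch. It assumes the snapshot end offset is exclusive and that the snapshot epoch should equal the epoch of the last record the snapshot covers (offset `endOffset - 1`); those assumptions, and the name `SnapshotIdValidatorSketch`, are illustrative rather than taken from the raft implementation. Per the note in the issue, a follower loading a snapshot sent by the leader would skip this check.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical validator; the "leader epoch cache" here is a plain TreeMap, not Kafka's epoch cache.
final class SnapshotIdValidatorSketch {
    // start offset of each epoch -> epoch
    private final TreeMap<Long, Integer> epochByStartOffset = new TreeMap<>();

    void recordEpochStart(int epoch, long startOffset) {
        epochByStartOffset.put(startOffset, epoch);
    }

    /**
     * endOffset is assumed exclusive; the snapshot epoch is assumed to be the epoch
     * of the last record it covers, i.e. the record at endOffset - 1.
     */
    boolean isValid(long endOffset, int epoch, long highWatermark, int quorumEpoch) {
        if (endOffset > highWatermark) {
            return false; // rule 1: end offset must not exceed the high-watermark
        }
        if (epoch > quorumEpoch) {
            return false; // rule 2: epoch must not exceed the quorum epoch
        }
        // Rule 3: find the epoch that contains offset endOffset - 1.
        // An unknown offset (including an empty snapshot with endOffset 0) is rejected here.
        Map.Entry<Long, Integer> lastRecordEpoch = epochByStartOffset.floorEntry(endOffset - 1);
        return lastRecordEpoch != null && lastRecordEpoch.getValue() == epoch;
    }
}
```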
[GitHub] [kafka] ableegoldman commented on pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2
ableegoldman commented on pull request #10573: URL: https://github.com/apache/kafka/pull/10573#issuecomment-828752351 Merged to trunk
[GitHub] [kafka] ableegoldman merged pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2
ableegoldman merged pull request #10573: URL: https://github.com/apache/kafka/pull/10573
[GitHub] [kafka] ableegoldman commented on pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2
ableegoldman commented on pull request #10573: URL: https://github.com/apache/kafka/pull/10573#issuecomment-828751235 Tons of flaky test failures, all unrelated: mostly Connect and Raft, plus a few Streams tests that are known to be flaky, on which I left some thoughts in the ticket. Going to merge.
[jira] [Commented] (KAFKA-8613) Set default grace period to 0
[ https://issues.apache.org/jira/browse/KAFKA-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17334984#comment-17334984 ] Israel Ekpo commented on KAFKA-8613: I noticed that the method signature in the KIP had the incorrect return type. https://cwiki.apache.org/confluence/display/KAFKA/KIP-633:+Drop+24+hour+default+of+grace+period+in+Streams Should I go ahead and update this in the KIP? I just noticed the error during my implementation. For the class org.apache.kafka.streams.kstream.SessionWindows, {{public static JoinWindows ofInactivityGapAndGrace(final Duration inactivityGap, final Duration afterWindowEnd);}} should have been: {{public static SessionWindows ofInactivityGapAndGrace(final Duration inactivityGap, final Duration afterWindowEnd);}} I will update the KIP to reflect what I think it needs to be. Please let me know your thoughts when you see this. Thanks. > Set default grace period to 0 > - > > Key: KAFKA-8613 > URL: https://issues.apache.org/jira/browse/KAFKA-8613 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Assignee: Israel Ekpo >Priority: Blocker > Labels: kip > Fix For: 3.0.0 > > > Currently, the grace period is set to retention time if the grace period is > not specified explicitly. The reason for setting the default grace period to > retention time was backward compatibility. Topologies that were implemented > before the introduction of the grace period added late-arriving records to a > window as long as the window existed, i.e., as long as its retention time had > not elapsed. > This unintuitive default grace period has already caused confusion among > users. > For the next major release, we should set the default grace period to > {{Duration.ZERO}}. > > KIP-633 > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams]
[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r622505823 ## File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ## @@ -1267,6 +1266,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(100L, latests.get(t1p0)) } + @Test + def testBeginningOffsetsOnCompactedTopic(): Unit = { +val topic0 = "topicWithoutCompaction" +val topic1 = "topicWithCompaction" +val t0p0 = new TopicPartition(topic0, 0) +val t1p0 = new TopicPartition(topic1, 0) +val t1p1 = new TopicPartition(topic1, 1) +val partitions = Set(t0p0, t1p0, t1p1).asJava + +val producerProps = new Properties() +// Each batch will hold about 10 records +producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "256") +val producer = createProducer(configOverrides = producerProps) +// First topic will not have compaction. Simply a sanity test. +createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 1, recordsPerPartition = 100) + + +// Second topic will have compaction. +// The first partition will have compaction occur at offset 0 so beginningOffsets should be nonzero. +// The second partition will not have compaction occur at offset 0, so beginningOffsets will remain 0. +val props = new Properties() +props.setProperty(LogConfig.MaxCompactionLagMsProp, "1") +props.setProperty(LogConfig.CleanupPolicyProp, "compact") +props.setProperty(LogConfig.MinCleanableDirtyRatioProp, "0.01") +props.setProperty(LogConfig.SegmentBytesProp, "512") + +// Send records to first partition -- all duplicates. +createTopic(topic1, numPartitions = 2, replicationFactor = 1, props) +TestUtils.sendRecordsWithKey(producer, 100, 0L, new TopicPartition(topic1, 0), "key") + +// Send records fo second partition -- only last 50 records are duplicates. +sendRecords(producer, 50, t1p1) +TestUtils.sendRecordsWithKey(producer, 50, 50L, new TopicPartition(topic1, 1), "key") + +// Sleep to allow compaction to take place. 
+Thread.sleep(25000) Review comment: Sounds good. On further inspection, it looks like `log.cleaner.backoff.ms` is 15 seconds, which is why this is happening. I could try to figure out how to change this property for this test, but it may be easier to remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
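For reference, a fixed `Thread.sleep` can be replaced by polling a condition until a deadline, which returns as soon as the condition holds. The sketch below is a generic, hypothetical helper (`PollingWait` is not a Kafka class; Kafka's Scala `TestUtils` has a comparable `waitUntilTrue`). Polling alone would not remove the dependency on `log.cleaner.backoff.ms`: the timeout would still need to exceed the cleaner's backoff.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper: poll a condition until it holds or a deadline passes,
// instead of sleeping for a fixed interval and hoping the work is done.
final class PollingWait {
    private PollingWait() {}

    /**
     * Returns true as soon as the condition is satisfied, or false if the
     * timeout expires first. The condition is always checked at least once.
     */
    static boolean waitUntil(final BooleanSupplier condition,
                             final Duration timeout,
                             final Duration pollInterval) throws InterruptedException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            final long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;
            }
            // Sleep for one poll interval, but never past the deadline.
            final long sleepNanos = Math.min(pollInterval.toNanos(), remaining);
            Thread.sleep(sleepNanos / 1_000_000, (int) (sleepNanos % 1_000_000));
        }
    }
}
```

In the compaction test, the condition would be something like "the beginning offset of `t1p0` is greater than zero", with a timeout comfortably above the 15-second cleaner backoff.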
[GitHub] [kafka] mjsax commented on pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
mjsax commented on pull request #10602: URL: https://github.com/apache/kafka/pull/10602#issuecomment-828736200 \cc @vvcephei, who was the release manager (RM) for 2.8, for tracking.
[GitHub] [kafka] mjsax commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
mjsax commented on a change in pull request #10602: URL: https://github.com/apache/kafka/pull/10602#discussion_r622492627 ## File path: streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +import java.util.Properties; + +public class StreamsUpgradeTest { + +@SuppressWarnings("unchecked") +public static void main(final String[] args) throws Exception { +if (args.length < 1) { +System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); +} +final String propFileName = args[0]; + +final Properties streamsProperties = Utils.loadProps(propFileName); + +System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.7)"); +System.out.println("props=" + streamsProperties); + +final StreamsBuilder builder = new StreamsBuilder(); +final KStream dataStream = builder.stream("data"); +dataStream.process(printProcessorSupplier()); +dataStream.to("echo"); + +final Properties config = new Properties(); +config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); +config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); +config.putAll(streamsProperties); + +final KafkaStreams streams = new KafkaStreams(builder.build(), config); +streams.start(); + +Runtime.getRuntime().addShutdownHook(new Thread(() -> { +streams.close(); +System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); +System.out.flush(); +})); +} + +private static ProcessorSupplier printProcessorSupplier() { +return () -> new AbstractProcessor() { +private int numRecordsProcessed = 0; + +@Override +public void init(final ProcessorContext context) { +System.out.println("[2.6] initializing processor: topic=data taskId=" + context.taskId()); Review comment: Should be updated to 2.8 
(Seems to be a copy-and-paste error.) Could you fix this in `2.7` and update it to `2.7`? Maybe it's worth introducing a variable that encodes the version?
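A minimal sketch of the version variable suggested above, assuming each upgrade-test module keeps its own copy of the class. `UpgradeTestMessages` and `VERSION` are hypothetical names, and the task id is passed as a `String` only to keep the example self-contained (the real code prints `context.taskId()`).

```java
// Hypothetical sketch: every version-tagged log line derives from one constant,
// so copying the file into a new upgrade-system-tests module means editing one line.
final class UpgradeTestMessages {
    // Assumption: the 2.8 module sets this to "2.8"; the 2.7 module would use "2.7".
    static final String VERSION = "2.8";

    private UpgradeTestMessages() {}

    static String startedMessage() {
        return "StreamsTest instance started (StreamsUpgradeTest v" + VERSION + ")";
    }

    static String initMessage(final String taskId) {
        return "[" + VERSION + "] initializing processor: topic=data taskId=" + taskId;
    }
}
```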
[GitHub] [kafka] mjsax commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
mjsax commented on a change in pull request #10602: URL: https://github.com/apache/kafka/pull/10602#discussion_r622492083 ## File path: streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +import java.util.Properties; + +public class StreamsUpgradeTest { + +@SuppressWarnings("unchecked") +public static void main(final String[] args) throws Exception { +if (args.length < 1) { +System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); +} +final String propFileName = args[0]; + +final Properties streamsProperties = Utils.loadProps(propFileName); + +System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.7)"); Review comment: Should be updated to `2.8`
[jira] [Updated] (KAFKA-12724) Add 2.8.0 to system tests and streams upgrade tests
[ https://issues.apache.org/jira/browse/KAFKA-12724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-12724: Component/s: system tests streams > Add 2.8.0 to system tests and streams upgrade tests > --- > > Key: KAFKA-12724 > URL: https://issues.apache.org/jira/browse/KAFKA-12724 > Project: Kafka > Issue Type: Task > Components: streams, system tests >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > > Kafka v2.8.0 is released. We should add this version to the system tests. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jsancio commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
jsancio commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r622489394 ## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java ## @@ -67,6 +74,30 @@ protected LeaderState( } this.grantingVoters.addAll(grantingVoters); this.log = logContext.logger(LeaderState.class); +this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); +} + +public BatchAccumulator accumulator() { +return this.accumulator; +} + +private static List convertToVoters(Set voterIds) { +return voterIds.stream() +.map(follower -> new Voter().setVoterId(follower)) +.collect(Collectors.toList()); +} + +public void appendLeaderChangeMessage(long currentTimeMs) { +List voters = convertToVoters(voterStates.keySet()); +List grantingVoters = convertToVoters(this.grantingVoters()); + +LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage() +.setLeaderId(this.election().leaderId()) +.setVoters(voters) +.setGrantingVoters(grantingVoters); + +accumulator.appendLeaderChangeMessage(leaderChangeMessage, currentTimeMs); +accumulator.forceDrain(); Review comment: Okay. We are attempting to keep the semantics of `appendLeaderChangeMessage` similar to those of the other `append` methods. In that case, I am okay with this API.
[jira] [Commented] (KAFKA-12718) SessionWindows are closed too early
[ https://issues.apache.org/jira/browse/KAFKA-12718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17334964#comment-17334964 ] Matthias J. Sax commented on KAFKA-12718: - I was just looking into the `suppress()` implementation, which obviously does not know anything about the semantics of upstream window definitions. It computes the "expiry-time" based on the window-end time. Thus, I believe that the right fix for session windows would be to enforce that gracePeriod > gap. \cc [~ableegoldman], as you proposed addressing this inside the session window processor (I think this approach won't work). > SessionWindows are closed too early > --- > > Key: KAFKA-12718 > URL: https://issues.apache.org/jira/browse/KAFKA-12718 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Matthias J. Sax >Priority: Major > Labels: beginner, easy-fix, newbie > Fix For: 3.0.0 > > > SessionWindows are defined based on a {{gap}} parameter, and also support an > additional {{grace-period}} configuration to handle out-of-order data. > To incorporate the session-gap, a session window should only be closed at > {{window-end + gap}}, and to incorporate the grace-period, the close time should > be pushed out further to {{window-end + gap + grace}}. > However, at the moment we compute the window close time as {{window-end + grace}}, > omitting the {{gap}} parameter. > Because the default grace-period is 24h, most users might not notice this issue. > Even if they set a grace period explicitly (e.g., when using suppress()), they > would most likely set a grace-period larger than the gap-time and not hit the > issue (or maybe only realize it when inspecting the behavior closely). > However, if a user wants to disable the grace-period and sets it to zero (or > any other value smaller than the gap-time), sessions might be closed too early and > the user might notice.
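The close-time arithmetic from the ticket can be sketched as follows. This is a simplified model, not the Streams implementation: the class and method names are hypothetical, `canMerge` only checks the upper side of the session gap, and `validateGraceExceedsGap` merely illustrates the `gracePeriod > gap` constraint proposed in the comment.

```java
import java.time.Duration;

// Simplified model of the KAFKA-12718 arithmetic; not Kafka Streams code.
final class SessionCloseTime {
    private SessionCloseTime() {}

    // Close time as the ticket describes the current behavior: window-end + grace.
    static long currentCloseTime(final long windowEndMs, final Duration grace) {
        return windowEndMs + grace.toMillis();
    }

    // Close time the ticket argues for: window-end + gap + grace.
    static long proposedCloseTime(final long windowEndMs, final Duration gap, final Duration grace) {
        return windowEndMs + gap.toMillis() + grace.toMillis();
    }

    // A later record can still extend the session if it falls within `gap` of the session end.
    static boolean canMerge(final long windowEndMs, final long recordTsMs, final Duration gap) {
        return recordTsMs <= windowEndMs + gap.toMillis();
    }

    // Illustrates the alternative from the comment: reject configs where grace <= gap.
    static void validateGraceExceedsGap(final Duration gap, final Duration grace) {
        if (grace.compareTo(gap) <= 0) {
            throw new IllegalArgumentException(
                "grace period (" + grace + ") must be larger than the session gap (" + gap + ")");
        }
    }
}
```

With `gap = 5 min` and `grace = 0`, a session ending at minute 10 is closed at minute 10 under the current formula, even though a record at minute 13 could still extend it; the formula from the ticket keeps the session open until minute 15.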
[GitHub] [kafka] jsancio commented on pull request #10278: KAFKA-10526: leader fsync deferral on write
jsancio commented on pull request #10278: URL: https://github.com/apache/kafka/pull/10278#issuecomment-828720442 @vamossagar12 Can you add a description to the PR? It is helpful when reviewing this PR to know what strategy you are using to manage the flush offset and log flush.
[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
ableegoldman commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r622474244 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set allTopics, if (log.isDebugEnabled()) { log.debug("final assignment: " + assignment); } - + return assignment; } -private SortedSet getTopicPartitions(Map partitionsPerTopic) { -SortedSet allPartitions = -new TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)); -for (Entry entry: partitionsPerTopic.entrySet()) { -String topic = entry.getKey(); -for (int i = 0; i < entry.getValue(); ++i) { +/** + * get the unassigned partition list by computing the difference set of the sortedPartitions(all partitions) + * and sortedToBeRemovedPartitions. We use two pointers technique here: + * + * We loop the sortedPartition, and compare the ith element in sorted toBeRemovedPartitions(i start from 0): + * - if not equal to the ith element, add to unassignedPartitions + * - if equal to the the ith element, get next element from sortedToBeRemovedPartitions + * + * @param sortedPartitions: sorted all partitions + * @param sortedToBeRemovedPartitions: sorted partitions, all are included in the sortedPartitions + * @return the partitions don't assign to any current consumers + */ +private List getUnassignedPartitions(List sortedPartitions, Review comment: Just let me know when you've responded to my other comments and this is ready for another pass. Nice to see such an improvement along with making this the default.
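The two-pointer difference described in the javadoc can be sketched generically as below. It assumes both inputs are random-access lists sorted by the same comparator and that every element to remove also appears in the full list; `SortedDifference` is a hypothetical name, and strings stand in for `TopicPartition` purely for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the two-pointer set difference; not the assignor's actual code.
final class SortedDifference {
    private SortedDifference() {}

    /**
     * Returns the elements of sortedAll that are not in sortedToRemove, in order.
     * Preconditions: both lists are sorted by cmp, sortedToRemove is a subset of
     * sortedAll, and both support fast get(i) (e.g. ArrayList). Runs in O(n + m).
     */
    static <T> List<T> difference(final List<T> sortedAll,
                                  final List<T> sortedToRemove,
                                  final Comparator<? super T> cmp) {
        final List<T> result = new ArrayList<>(Math.max(0, sortedAll.size() - sortedToRemove.size()));
        int j = 0; // second pointer: next element of sortedToRemove to match
        for (final T element : sortedAll) {
            if (j < sortedToRemove.size() && cmp.compare(element, sortedToRemove.get(j)) == 0) {
                j++; // equal: skip this element and advance the second pointer
            } else {
                result.add(element); // not equal: element is unassigned
            }
        }
        return result;
    }
}
```

Plain strings compare lexicographically, so `a-10` would sort before `a-2`; that is why the removed code ordered real partitions with a topic-then-partition comparator.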
[GitHub] [kafka] hachikuji commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
hachikuji commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r622473204 ## File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ## @@ -1267,6 +1266,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(100L, latests.get(t1p0)) } + @Test + def testBeginningOffsetsOnCompactedTopic(): Unit = { +val topic0 = "topicWithoutCompaction" +val topic1 = "topicWithCompaction" +val t0p0 = new TopicPartition(topic0, 0) +val t1p0 = new TopicPartition(topic1, 0) +val t1p1 = new TopicPartition(topic1, 1) +val partitions = Set(t0p0, t1p0, t1p1).asJava + +val producerProps = new Properties() +// Each batch will hold about 10 records +producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "256") +val producer = createProducer(configOverrides = producerProps) +// First topic will not have compaction. Simply a sanity test. +createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 1, recordsPerPartition = 100) + + +// Second topic will have compaction. +// The first partition will have compaction occur at offset 0 so beginningOffsets should be nonzero. +// The second partition will not have compaction occur at offset 0, so beginningOffsets will remain 0. +val props = new Properties() +props.setProperty(LogConfig.MaxCompactionLagMsProp, "1") +props.setProperty(LogConfig.CleanupPolicyProp, "compact") +props.setProperty(LogConfig.MinCleanableDirtyRatioProp, "0.01") +props.setProperty(LogConfig.SegmentBytesProp, "512") + +// Send records to first partition -- all duplicates. +createTopic(topic1, numPartitions = 2, replicationFactor = 1, props) +TestUtils.sendRecordsWithKey(producer, 100, 0L, new TopicPartition(topic1, 0), "key") + +// Send records fo second partition -- only last 50 records are duplicates. +sendRecords(producer, 50, t1p1) +TestUtils.sendRecordsWithKey(producer, 50, 50L, new TopicPartition(topic1, 1), "key") + +// Sleep to allow compaction to take place. 
+Thread.sleep(25000) Review comment: I'd rather just remove it if we don't have a good way to keep it from flaking.
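To see why the test expects a nonzero beginning offset for one partition and zero for the other, here is a toy model of key-based compaction that keeps only the latest offset for each key. It deliberately ignores segments, the active segment (which the cleaner never compacts), tombstones, and the dirty ratio, and it assumes the first 50 records of the second partition have distinct keys, as the test comment implies. `CompactionModel` is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of key-based compaction: only the latest offset for each key survives.
final class CompactionModel {
    private CompactionModel() {}

    /** keysByOffset.get(i) is the key of the record at offset i; returns retained offsets in order. */
    static List<Long> compact(final List<String> keysByOffset) {
        final Map<String, Long> latest = new HashMap<>();
        for (int i = 0; i < keysByOffset.size(); i++) {
            latest.put(keysByOffset.get(i), (long) i); // later offsets overwrite earlier ones
        }
        final List<Long> retained = new ArrayList<>();
        for (int i = 0; i < keysByOffset.size(); i++) {
            if (latest.get(keysByOffset.get(i)) == (long) i) {
                retained.add((long) i);
            }
        }
        return retained;
    }

    /** First retained offset, or the log end offset if nothing is retained. */
    static long beginningOffset(final List<String> keysByOffset) {
        final List<Long> retained = compact(keysByOffset);
        return retained.isEmpty() ? keysByOffset.size() : retained.get(0);
    }
}
```

In this model, 100 records with the same key leave only offset 99, so the beginning offset moves off zero; 50 distinct keys followed by 50 duplicates keep offset 0. In a real log the exact offsets differ because the active segment is not cleaned, which is one reason the test can only assert "nonzero".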
[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
hachikuji commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r622470077 ## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java ## @@ -67,6 +74,30 @@ protected LeaderState( } this.grantingVoters.addAll(grantingVoters); this.log = logContext.logger(LeaderState.class); +this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); +} + +public BatchAccumulator accumulator() { +return this.accumulator; +} + +private static List convertToVoters(Set voterIds) { +return voterIds.stream() +.map(follower -> new Voter().setVoterId(follower)) +.collect(Collectors.toList()); +} + +public void appendLeaderChangeMessage(long currentTimeMs) { +List voters = convertToVoters(voterStates.keySet()); +List grantingVoters = convertToVoters(this.grantingVoters()); + +LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage() +.setLeaderId(this.election().leaderId()) +.setVoters(voters) +.setGrantingVoters(grantingVoters); + +accumulator.appendLeaderChangeMessage(leaderChangeMessage, currentTimeMs); +accumulator.forceDrain(); Review comment: It was my suggestion in order to keep the contract of `BatchAccumulator` tight. We do "know" in this usage that the accumulator should be empty, but it costs us very little to protect the API without relying on external assumptions.
[GitHub] [kafka] ableegoldman commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm
ableegoldman commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r622472215 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set allTopics, if (log.isDebugEnabled()) { log.debug("final assignment: " + assignment); } - + return assignment; } -private SortedSet getTopicPartitions(Map partitionsPerTopic) { -SortedSet allPartitions = -new TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)); -for (Entry entry: partitionsPerTopic.entrySet()) { -String topic = entry.getKey(); -for (int i = 0; i < entry.getValue(); ++i) { +/** + * get the unassigned partition list by computing the difference set of the sortedPartitions(all partitions) + * and sortedToBeRemovedPartitions. We use two pointers technique here: + * + * We loop the sortedPartition, and compare the ith element in sorted toBeRemovedPartitions(i start from 0): + * - if not equal to the ith element, add to unassignedPartitions + * - if equal to the the ith element, get next element from sortedToBeRemovedPartitions + * + * @param sortedPartitions: sorted all partitions + * @param sortedToBeRemovedPartitions: sorted partitions, all are included in the sortedPartitions + * @return the partitions don't assign to any current consumers + */ +private List getUnassignedPartitions(List sortedPartitions, Review comment: Thanks for getting some concrete numbers to work with! I suspected the theory would not match the reality due to caching primarily, although I wasn't aware of the improved runtime of sort on a partially-ordered list. That's good to know 😄 And it does make sense in hindsight given the nature of the sorting algorithm. 
I've always found that the reality of array performance with any reasonable caching architecture, compared to theoretically better data structures/algorithms, is one of those things that people know and still subconsciously doubt. Probably because most people spent 4 years in college getting theoretical algorithmic runtimes drilled into their heads, and far less time looking into the underlying architecture that powers those algorithms and applies its own optimizations under the hood. It's an interesting psychological observation. There's a great talk on it somewhere, but I can't remember the name. Anyway, you just never know until you run the numbers. I'm sure this may vary somewhat with different input parameters, but I think I'm convinced: let's stick with this improvement. If someone starts complaining about the memory consumption, we can always go back and look for ways to cut down. Thanks for the enlightening discussion!
[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
hachikuji commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r622470077 ## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java ## @@ -67,6 +74,30 @@ protected LeaderState( } this.grantingVoters.addAll(grantingVoters); this.log = logContext.logger(LeaderState.class); +this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); +} + +public BatchAccumulator accumulator() { +return this.accumulator; +} + +private static List convertToVoters(Set voterIds) { +return voterIds.stream() +.map(follower -> new Voter().setVoterId(follower)) +.collect(Collectors.toList()); +} + +public void appendLeaderChangeMessage(long currentTimeMs) { +List voters = convertToVoters(voterStates.keySet()); +List grantingVoters = convertToVoters(this.grantingVoters()); + +LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage() +.setLeaderId(this.election().leaderId()) +.setVoters(voters) +.setGrantingVoters(grantingVoters); + +accumulator.appendLeaderChangeMessage(leaderChangeMessage, currentTimeMs); +accumulator.forceDrain(); Review comment: It was my suggestion in order to keep the contract of `BatchAccumulator` tight. We do "know" in this usage that the accumulator will be empty, but it costs us very little to protect the API without relying on external assumptions.
[GitHub] [kafka] dejan2609 opened a new pull request #10606: KAFKA-12728: [Work In Progress] gradle version upgrade: 6.8 -->> 7.0
dejan2609 opened a new pull request #10606: URL: https://github.com/apache/kafka/pull/10606 **_JIRA ticket:_** https://issues.apache.org/jira/browse/KAFKA-12728 **_Related pull requests:_** - #10203 - #10466 Note: we will wait for a Gradle 7.0.1 patch ◀️
[GitHub] [kafka] ableegoldman commented on a change in pull request #10597: KAFKA-5876: Apply StreamsNotStartedException for Interactive Queries
ableegoldman commented on a change in pull request #10597: URL: https://github.com/apache/kafka/pull/10597#discussion_r622464617 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -344,7 +345,7 @@ private boolean isRunningOrRebalancing() { private void validateIsRunningOrRebalancing() { if (!isRunningOrRebalancing()) { -throw new IllegalStateException("KafkaStreams is not running. State is " + state + "."); +throw new StreamsNotStartedException("KafkaStreams is not running. State is " + state + "."); Review comment: SGTM
[jira] [Commented] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
[ https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17334937#comment-17334937 ] A. Sophie Blee-Goldman commented on KAFKA-9295: --- It's failing on {noformat} startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(60)); {noformat} At this point a session timeout seems unlikely, since [~showuon] observed a Streams instance dropping out on the heartbeat interval only a couple of times even when it failed, and all it has to do here is get to RUNNING once. It doesn't require that all KafkaStreams in the list get to RUNNING and then stay there, so all the instance has to do is start up and go through at least one successful rebalance in that time. There's nothing to restore, so the transition to RUNNING should be immediate after the rebalance. Now, technically 60s is a typical timeout for startApplicationAndWaitUntilRunning in the Streams integration tests, but the difference between this and other tests is that most have only one or two KafkaStreams to start up, whereas this test has three. They're not started up and waited on sequentially, so that shouldn't _really_ matter that much, but it might still be that a longer timeout should be used in this case. I'm open to other theories, however. Also note that we should soon have a larger default session interval, so once Jason's KIP for that has been implemented we'll be able to get that improvement for free. Even if we think the session interval is the problem with this test, it probably makes sense to just wait for that KIP rather than hardcode some special value.
If it starts to fail very frequently, we can reconsider, but I haven't observed it doing so since the last fix was merged. > KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable > -- > > Key: KAFKA-9295 > URL: https://issues.apache.org/jira/browse/KAFKA-9295 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.4.0, 2.6.0 >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > Fix For: 3.0.0 > > > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/] > {quote}java.lang.AssertionError: Did not receive all 1 records from topic > output- within 6 ms Expected: is a value equal to or greater than <1> > but: <0> was less than <1> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489) > at > org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200) > at > org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote}
[GitHub] [kafka] jsancio commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
jsancio commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r622455380 ## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java ## @@ -67,6 +74,30 @@ protected LeaderState( } this.grantingVoters.addAll(grantingVoters); this.log = logContext.logger(LeaderState.class); +this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); +} + +public BatchAccumulator accumulator() { +return this.accumulator; +} + +private static List convertToVoters(Set voterIds) { +return voterIds.stream() +.map(follower -> new Voter().setVoterId(follower)) +.collect(Collectors.toList()); +} + +public void appendLeaderChangeMessage(long currentTimeMs) { +List voters = convertToVoters(voterStates.keySet()); +List grantingVoters = convertToVoters(this.grantingVoters()); + +LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage() +.setLeaderId(this.election().leaderId()) +.setVoters(voters) +.setGrantingVoters(grantingVoters); + +accumulator.appendLeaderChangeMessage(leaderChangeMessage, currentTimeMs); +accumulator.forceDrain(); Review comment: We `forceDrain` inside `appendLeaderChangeMessage` before adding the leader change message to `completed`. Is this `forceDrain()` a no-op? Should the current batch be empty at this point? If we don't need to call this method, I say we remove it altogether.
## File path: raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java ## @@ -65,6 +66,132 @@ ); } +@Test +public void testLeaderChangeMessageWritten() { +int leaderEpoch = 17; +long baseOffset = 0; +int lingerMs = 50; +int maxBatchSize = 512; + +ByteBuffer buffer = ByteBuffer.allocate(256); +Mockito.when(memoryPool.tryAllocate(256)) +.thenReturn(buffer); + +BatchAccumulator acc = buildAccumulator( +leaderEpoch, +baseOffset, +lingerMs, +maxBatchSize +); + +acc.appendLeaderChangeMessage(new LeaderChangeMessage(), time.milliseconds()); Review comment: Let's check that `needsDrain` returns true after this point.
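The contract discussed in this thread can be illustrated with a toy accumulator in which appending a control message first seals any in-progress batch, so the method stays correct even if a caller did not drain beforehand. `ToyAccumulator` is hypothetical and much simpler than Kafka's `BatchAccumulator` (no linger time, memory pool, offsets, or epochs); here `needsDrain` only reports whether completed batches are waiting. The test-style usage mirrors the suggested `needsDrain` assertion.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model; not Kafka's BatchAccumulator.
final class ToyAccumulator {
    private final List<String> currentBatch = new ArrayList<>();
    private final List<List<String>> completed = new ArrayList<>();

    void append(final String record) {
        currentBatch.add(record);
    }

    // Control messages always get their own batch: seal whatever is in progress
    // first instead of assuming the caller left the accumulator empty.
    void appendControlMessage(final String message) {
        forceDrain();
        final List<String> controlBatch = new ArrayList<>();
        controlBatch.add(message);
        completed.add(controlBatch);
    }

    // Move the in-progress batch (if any) to the completed list.
    void forceDrain() {
        if (!currentBatch.isEmpty()) {
            completed.add(new ArrayList<>(currentBatch));
            currentBatch.clear();
        }
    }

    boolean needsDrain() {
        return !completed.isEmpty();
    }

    // Hand all completed batches to the caller, sealing the current one first.
    List<List<String>> drain() {
        forceDrain();
        final List<List<String>> out = new ArrayList<>(completed);
        completed.clear();
        return out;
    }
}
```

Because `appendControlMessage` seals the in-progress batch itself, a trailing `forceDrain()` after it is a no-op in this model; the defensive call inside the method is what keeps the contract independent of caller assumptions.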
[jira] [Created] (KAFKA-12728) Update Gradle version: 6.8 -->> 7.0
Dejan Stojadinović created KAFKA-12728: -- Summary: Update Gradle version: 6.8 -->> 7.0 Key: KAFKA-12728 URL: https://issues.apache.org/jira/browse/KAFKA-12728 Project: Kafka Issue Type: Improvement Components: build Reporter: Dejan Stojadinović Assignee: Dejan Stojadinović ^_*Prologue/related tickets*:_ KAFKA-12415 _*and*_ KAFKA-12417^ *Gradle 7.0 release notes:* [https://docs.gradle.org/7.0/release-notes.html] *_+Note+_*: it makes sense to wait for a patch *_7.0.1_* to be released: [https://github.com/gradle/gradle/milestone/173]
[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r622456807 ## File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala ## @@ -984,19 +1003,26 @@ class LogCleanerTest { def distinctValuesBySegment = log.logSegments.map(s => s.log.records.asScala.map(record => TestUtils.readString(record.value)).toSet.size).toSeq -val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment +val distinctValuesBySegmentBeforeClean = distinctValuesBySegment assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N), "Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.") +log.updateHighWatermark(log.activeSegment.baseOffset) cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, firstUncleanableOffset)) val distinctValuesBySegmentAfterClean = distinctValuesBySegment - assertTrue(disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean) - .take(numCleanableSegments).forall { case (before, after) => after < before }, +// One segment should have been completely deleted, so there will be fewer segments. +assertTrue(distinctValuesBySegmentAfterClean.size < distinctValuesBySegmentBeforeClean.size) + +// Drop the first segment from before cleaning since it was removed. Also subtract 1 from numCleanableSegments +val normalizedDistinctValuesBySegmentBeforeClean = distinctValuesBySegmentBeforeClean.drop(1) Review comment: This test is a little tricky, but I've updated it. Now it only uses duplicate keys. It's a little confusing because the first uncleanable offset is not actually the point at which records below are cleaned. The segments cleaned are the full segments below the uncleanable offset (so if the segments before the uncleanable offset in this case). And even then, one record (the last record in the last segment) will be retained due to how cleaning works. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
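The point about whole segments and the single retained record can be illustrated with a simplified model of key-based compaction. This is a sketch only, not Kafka's `LogCleaner`: it assumes a record is just an (offset, key) pair, assumes the cleanable range ends at the base offset of the segment that contains the first uncleanable offset, and keeps only the latest record per key inside that range. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of key-based log compaction; illustrative only, not Kafka's LogCleaner.
final class CompactionSketch {
    // A log record reduced to its offset and key.
    static final class Rec {
        final long offset;
        final String key;
        Rec(long offset, String key) { this.offset = offset; this.key = key; }
    }

    // Assumption: only whole segments are cleaned, so the cleanable range ends at the
    // base offset of the segment that contains firstUncleanableOffset.
    static long alignToSegment(long[] sortedBaseOffsets, long firstUncleanableOffset) {
        long end = 0L;
        for (long base : sortedBaseOffsets) {
            if (base <= firstUncleanableOffset) {
                end = base;
            }
        }
        return end;
    }

    // Among records below cleanableEnd, keep only the latest record per key;
    // records at or above cleanableEnd are left untouched.
    static List<Rec> compact(List<Rec> log, long cleanableEnd) {
        Map<String, Long> latestOffsetByKey = new HashMap<>();
        for (Rec r : log) {
            if (r.offset < cleanableEnd) {
                latestOffsetByKey.merge(r.key, r.offset, Long::max);
            }
        }
        List<Rec> kept = new ArrayList<>();
        for (Rec r : log) {
            // For cleanable records the key is always present in the map, so unboxing is safe.
            if (r.offset >= cleanableEnd || latestOffsetByKey.get(r.key) == r.offset) {
                kept.add(r);
            }
        }
        return kept;
    }
}
```

With ten records that all share one key, segment base offsets 0, 4 and 8, and a first uncleanable offset of 6, only offsets 0 to 3 are cleanable; offset 3 survives as the single retained record, and offsets 4 to 9 are untouched.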
[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r622456807 ## File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala ## @@ -984,19 +1003,26 @@ class LogCleanerTest { def distinctValuesBySegment = log.logSegments.map(s => s.log.records.asScala.map(record => TestUtils.readString(record.value)).toSet.size).toSeq -val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment +val distinctValuesBySegmentBeforeClean = distinctValuesBySegment assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N), "Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.") +log.updateHighWatermark(log.activeSegment.baseOffset) cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, firstUncleanableOffset)) val distinctValuesBySegmentAfterClean = distinctValuesBySegment - assertTrue(disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean) - .take(numCleanableSegments).forall { case (before, after) => after < before }, +// One segment should have been completely deleted, so there will be fewer segments. +assertTrue(distinctValuesBySegmentAfterClean.size < distinctValuesBySegmentBeforeClean.size) + +// Drop the first segment from before cleaning since it was removed. Also subtract 1 from numCleanableSegments +val normalizedDistinctValuesBySegmentBeforeClean = distinctValuesBySegmentBeforeClean.drop(1) Review comment: This test is a little tricky, but I've updated it; it now uses only duplicate keys. It's a little confusing because the first uncleanable offset is not actually the point below which records are cleaned. The segments cleaned are the full segments below the uncleanable offset. And even then, one record (the last record in the last segment) will be retained due to how cleaning works.
[GitHub] [kafka] hachikuji commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order
hachikuji commented on pull request #9441: URL: https://github.com/apache/kafka/pull/9441#issuecomment-828703078 @tombentley Thanks for the updates. I made a few small changes in this patch: https://github.com/hachikuji/kafka/commit/27ba937cc7b7da230ccc4f0c8220f680c4a542fe. The main things are taking the loading itself out of `compute` (which is intended to be a cheap operation) and moving the `loadingPartitions` check. Feel free to pull in the changes if they make sense to you.
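For context on why the loading is moved out of `compute`: the `ConcurrentHashMap.compute` Javadoc warns that some update operations by other threads may be blocked while the computation is in progress, so the computation should be short and simple. A minimal sketch of the general pattern follows, assuming a map of per-partition epochs used to keep load requests in submission order. It is not the code from the linked commit; `LoadScheduler`, `submit`, `maybeLoad` and `loadPartition` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative pattern (not the code from the linked commit): keep the remapping function
// passed to ConcurrentHashMap.compute cheap, and run expensive work after compute returns.
final class LoadScheduler {
    // Latest submitted epoch per partition.
    private final ConcurrentHashMap<Integer, Integer> latestEpoch = new ConcurrentHashMap<>();
    // Number of times the (hypothetical) expensive load actually ran.
    final AtomicInteger loadsRun = new AtomicInteger();

    // Cheap atomic step: record the epoch unless a newer one is already present.
    // Returns true if this epoch is now the latest one recorded for the partition.
    boolean submit(int partition, int epoch) {
        int latest = latestEpoch.compute(partition,
            (p, current) -> (current == null || epoch > current) ? epoch : current);
        return latest == epoch;
    }

    // Expensive step, executed outside compute: skip the load if a newer epoch was submitted.
    void maybeLoad(int partition, int epoch) {
        if (latestEpoch.getOrDefault(partition, -1) == epoch) {
            loadPartition(partition);
        }
    }

    // Stand-in for slow work such as reading a partition's state from disk.
    private void loadPartition(int partition) {
        loadsRun.incrementAndGet();
    }
}
```

The map lock is held only for the integer comparison inside `compute`; a stale request (here epoch 1 after epoch 3 was submitted) simply skips the load.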
[jira] [Reopened] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
[ https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reopened KAFKA-9295: --- Still failing, saw this on both the Java 8 and Java 11 builds of a PR: Stacktrace java.lang.AssertionError: Application did not reach a RUNNING state for all streams instances. Non-running instances: {org.apache.kafka.streams.KafkaStreams@cce6a1d=REBALANCING, org.apache.kafka.streams.KafkaStreams@45af6187=REBALANCING, org.apache.kafka.streams.KafkaStreams@5d8ba53=REBALANCING} at org.junit.Assert.fail(Assert.java:89) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning(IntegrationTestUtils.java:904) at org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:197) at org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:185) https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10573/8/testReport/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldInnerJoinMultiPartitionQueryable/ > KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable > -- > > Key: KAFKA-9295 > URL: https://issues.apache.org/jira/browse/KAFKA-9295 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.4.0, 2.6.0 >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > Fix For: 3.0.0 > > > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/] > {quote}java.lang.AssertionError: Did not receive all 1 records from topic > output- within 6 ms Expected: is a value equal to or greater than <1> > but: <0> was less than <1> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489) > at > org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200) > at > org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12727) Flaky test ListConsumerGroupTest.testListConsumerGroupsWithStates
A. Sophie Blee-Goldman created KAFKA-12727: -- Summary: Flaky test ListConsumerGroupTest.testListConsumerGroupsWithStates Key: KAFKA-12727 URL: https://issues.apache.org/jira/browse/KAFKA-12727 Project: Kafka Issue Type: Bug Components: consumer Reporter: A. Sophie Blee-Goldman Stacktrace org.opentest4j.AssertionFailedError: Expected to show groups Set((groupId='simple-group', isSimpleConsumerGroup=true, state=Optional[Empty]), (groupId='test.group', isSimpleConsumerGroup=false, state=Optional[Stable])), but found Set((groupId='test.group', isSimpleConsumerGroup=false, state=Optional[Stable])) at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39) at org.junit.jupiter.api.Assertions.fail(Assertions.java:117) at kafka.admin.ListConsumerGroupTest.testListConsumerGroupsWithStates(ListConsumerGroupTest.scala:66) https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10573/8/testReport/kafka.admin/ListConsumerGroupTest/Build___JDK_11_and_Scala_2_13___testListConsumerGroupsWithStates__/
[jira] [Resolved] (KAFKA-12417) streams module should use `testRuntimeClasspath` instead of `testRuntime` configuration
[ https://issues.apache.org/jira/browse/KAFKA-12417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dejan Stojadinović resolved KAFKA-12417. Resolution: Done Resolving as done (commit is merged into trunk). > streams module should use `testRuntimeClasspath` instead of `testRuntime` > configuration > --- > > Key: KAFKA-12417 > URL: https://issues.apache.org/jira/browse/KAFKA-12417 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: Dejan Stojadinović >Priority: Major > > The streams module has the following code: > {code:java} > tasks.create(name: "copyDependantLibs", type: Copy) { > from (configurations.testRuntime) { > include('slf4j-log4j12*') > include('log4j*jar') > include('*hamcrest*') > } > from (configurations.runtimeClasspath) { > exclude('kafka-clients*') > } > into "$buildDir/dependant-libs-${versions.scala}" > duplicatesStrategy 'exclude' > } {code} > {{configurations.testRuntime}} should be changed to > {{configurations.testRuntimeClasspath}} as the former has been removed in > Gradle 7.0, but this causes a cyclic build error.
[GitHub] [kafka] dielhennr closed pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr closed pull request #10480: URL: https://github.com/apache/kafka/pull/10480
[GitHub] [kafka] jsancio commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
jsancio commented on a change in pull request #10593: URL: https://github.com/apache/kafka/pull/10593#discussion_r622425656 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java ## @@ -1335,6 +1313,57 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception { context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID); } +@Test +public void testCreateSnapshotWithInvalidSnapshotId() throws Exception { +int localId = 0; +int otherNodeId = localId + 1; +Set voters = Utils.mkSet(localId, otherNodeId); +int epoch = 2; + +List appendRecords = Arrays.asList("a", "b", "c"); +OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(3, epoch); + +RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) +.appendToLog(epoch, appendRecords) +.withAppendLingerMs(1) +.build(); + +context.becomeLeader(); +int currentEpoch = context.currentEpoch(); + +// When creating snapshot: +// 1.1 high watermark cannot be empty +assertEquals(OptionalLong.empty(), context.client.highWatermark()); +assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId1)); + +// 1.2 high watermark must larger than or equal to the snapshotId's endOffset +advanceHighWatermark(context, currentEpoch, currentEpoch, otherNodeId, localId); Review comment: I would append a couple of batches after advancing the high-watermark. At this point the HWM equals the LEO. 
## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2268,6 +2269,25 @@ private Long append(int epoch, List records, boolean isAtomic) { ); } +private void validateSnapshotId(OffsetAndEpoch snapshotId) { +Optional highWatermarkOpt = quorum().highWatermark(); +if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset < snapshotId.offset) { +throw new KafkaException("Trying to creating snapshot with invalid snapshotId: " + snapshotId + " whose offset is larger than the high-watermark: " + +highWatermarkOpt + ". This may necessarily mean a bug in the caller, since the there should be a minimum " + +"size of records between the latest snapshot and the high-watermark when creating snapshot"); Review comment: What do you mean by ", since the there should be a minimum size of records between the latest snapshot and the high-watermark when creating snapshot"? ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java ## @@ -1335,6 +1313,57 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception { context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID); } +@Test +public void testCreateSnapshotWithInvalidSnapshotId() throws Exception { +int localId = 0; +int otherNodeId = localId + 1; +Set voters = Utils.mkSet(localId, otherNodeId); +int epoch = 2; + +List appendRecords = Arrays.asList("a", "b", "c"); +OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(3, epoch); + +RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) +.appendToLog(epoch, appendRecords) +.withAppendLingerMs(1) +.build(); + +context.becomeLeader(); +int currentEpoch = context.currentEpoch(); + +// When creating snapshot: +// 1.1 high watermark cannot be empty +assertEquals(OptionalLong.empty(), context.client.highWatermark()); +assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId1)); + +// 1.2 high watermark must larger than or equal to the snapshotId's endOffset +advanceHighWatermark(context, currentEpoch, currentEpoch, otherNodeId, localId); +assertNotEquals(OptionalLong.empty(), context.client.highWatermark()); +OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() + 1, currentEpoch); +assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId2)); + +// 2 the quorum epoch must larger than or equal to the snapshotId's epoch +OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() - 1, currentEpoch + 1); +assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId3)); + +// 3 the snapshotId should be validated against endOffsetForEpoch +OffsetAndEpoch endOffsetForEpoch = context.log.endOffsetForEpoch(currentEpoch); Review comment: Let's use `epoch` instead of `currentEpoch`.
[GitHub] [kafka] ijuma merged pull request #10466: KAFKA-12417: streams `copyDependentLibs` should not copy testRuntime configuration jars
ijuma merged pull request #10466: URL: https://github.com/apache/kafka/pull/10466
[GitHub] [kafka] ijuma commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration `testRuntime`
ijuma commented on pull request #10466: URL: https://github.com/apache/kafka/pull/10466#issuecomment-828682155 Unrelated failures, merging to trunk.
[GitHub] [kafka] jsancio commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
jsancio commented on a change in pull request #10593: URL: https://github.com/apache/kafka/pull/10593#discussion_r620232416 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2268,6 +2269,20 @@ private Long append(int epoch, List records, boolean isAtomic) { ); } +private void validateSnapshotId(OffsetAndEpoch snapshotId) { +Optional highWatermarkOpt = quorum().highWatermark(); +if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) { +throw new KafkaException("Trying to creating snapshot with snapshotId: " + snapshotId + " whose offset is larger than the high-watermark: " + +highWatermarkOpt + ". This may necessarily mean a bug in the caller, since the there should be a minimum " + +"size of records between the latest snapshot and the high-watermark when creating snapshot"); +} +int leaderEpoch = quorum().epoch(); +if (snapshotId.epoch > leaderEpoch) { +throw new KafkaException("Trying to creating snapshot with snapshotId: " + snapshotId + " whose epoch is" + +" larger than the current leader epoch: " + leaderEpoch); +} Review comment: Yeah, this is not strictly required for correctness. Oh, I see: the check on line 2280 is checking that `epoch > current epoch`. I mistakenly read it as `epoch != current epoch`. If we perform that check, we are basically saying that the caller of `createSnapshot` needs to catch up to the current quorum epoch before it can generate a snapshot. Yes, I think `epoch <= quorum epoch` is fine. Let me think about it and I'll update the Jira.
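The two rules discussed in this thread (the snapshot offset must not exceed the high watermark, and the snapshot epoch must not exceed the quorum epoch) can be sketched as a standalone check. This is an illustration under stated assumptions, not the PR's code: `SnapshotId` is a hypothetical stand-in for Kafka's `OffsetAndEpoch`, and `IllegalArgumentException` stands in for `KafkaException`. The offset check uses `<`, so an offset equal to the high watermark is accepted, as in the later revision of `validateSnapshotId` quoted in the other review comment on this PR (the revision shown here uses `<=`).

```java
import java.util.OptionalLong;

// Illustrative sketch of the snapshot id rules discussed above; not the KafkaRaftClient code.
final class SnapshotIdValidation {
    // Hypothetical stand-in for Kafka's OffsetAndEpoch.
    static final class SnapshotId {
        final long offset;
        final int epoch;
        SnapshotId(long offset, int epoch) { this.offset = offset; this.epoch = epoch; }
    }

    static void validate(SnapshotId id, OptionalLong highWatermark, int quorumEpoch) {
        // Rule 1: a high watermark must exist, and the snapshot offset must not exceed it.
        if (!highWatermark.isPresent() || highWatermark.getAsLong() < id.offset) {
            throw new IllegalArgumentException("Snapshot offset " + id.offset
                + " is not at or below the high watermark " + highWatermark);
        }
        // Rule 2: the snapshot epoch must not be greater than the current quorum epoch.
        if (id.epoch > quorumEpoch) {
            throw new IllegalArgumentException("Snapshot epoch " + id.epoch
                + " is greater than the quorum epoch " + quorumEpoch);
        }
    }
}
```

Allowing `epoch <= quorum epoch` (rather than requiring equality) means the caller of `createSnapshot` does not have to catch up to the current quorum epoch first, which is the trade-off described in the comment.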
[GitHub] [kafka] jolshan commented on pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on pull request #9944: URL: https://github.com/apache/kafka/pull/9944#issuecomment-828663698 Thanks @junrao for taking another look. > A lot of the complexity is the additional logic for propagating unresolved partitions from FetchRequest to FetchSession and the maintenance of unresolved partitions within FetchSession. I agree. This has been in the back of my mind for a while, specifically whether all the changes in FetchSession are necessary for such cases, so thanks for bringing this up. The only case I was really concerned about was a rolling upgrade, or the new-topic case you mentioned. But even with the current approach, I wasn't sure the fetch session changes were really helping. My biggest confusion is about when the client will refresh metadata. Will returning a top-level error guarantee a refresh (vs. returning an unknown topic ID response)? I think the top-level approach will likely be better.
[GitHub] [kafka] ryannedolan opened a new pull request #10605: KAFKA-12726 prevent a stuck Task.stop() from blocking subsequent Task.stops()s
ryannedolan opened a new pull request #10605: URL: https://github.com/apache/kafka/pull/10605 A misbehaving Task.stop() can prevent other Tasks from stopping, even when a graceful shutdown timeout is configured. We improve the situation as follows:
- Before task.shutdown.graceful.timeout.ms expires, the existing behavior is retained, except that Task.stop() is called in a new Thread.
- After task.shutdown.graceful.timeout.ms expires, the Worker runs any remaining Task.stop()s concurrently.
Thus, the behavior doesn't change appreciably when Tasks behave normally; however, if any Task.stop() gets stuck (e.g. in a retry loop), we continue with a best-effort shutdown.
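The two-phase shutdown described above can be sketched with a plain `ExecutorService`. This is a minimal sketch, not the Connect `Worker` code in the PR: each task's `stop()` is modeled as an arbitrary `Runnable`, and `BestEffortStopper`/`stopAll` are hypothetical names. Stops run one at a time on their own daemon threads while the graceful timeout lasts; once the timeout is used up (for example because one stop is stuck), the remaining stops are started concurrently and not waited on.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative sketch of a best-effort stop with a graceful timeout; not the Connect Worker code.
final class BestEffortStopper {
    // Returns how many stop actions completed normally within the graceful timeout.
    static int stopAll(List<Runnable> stopActions, long gracefulTimeoutMs) {
        ExecutorService executor = Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r, "task-stopper");
            t.setDaemon(true); // a stuck stop() must not keep the JVM alive
            return t;
        });
        long deadline = System.currentTimeMillis() + gracefulTimeoutMs;
        int completed = 0;
        int next = 0;
        // Phase 1: sequential, each stop on its own thread, bounded by the remaining time.
        while (next < stopActions.size()) {
            long remainingMs = deadline - System.currentTimeMillis();
            if (remainingMs <= 0) {
                break;
            }
            Future<?> future = executor.submit(stopActions.get(next++));
            try {
                future.get(remainingMs, TimeUnit.MILLISECONDS);
                completed++;
            } catch (TimeoutException e) {
                break; // this stop is stuck; leave it running and move on
            } catch (ExecutionException e) {
                // stop threw an exception; not counted, continue with the next task
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        // Phase 2: past the deadline, start the remaining stops concurrently, best effort.
        while (next < stopActions.size()) {
            executor.submit(stopActions.get(next++));
        }
        executor.shutdown(); // no new work; already-running stops continue on daemon threads
        return completed;
    }
}
```

When every stop returns promptly, this behaves like the sequential loop; a stuck stop costs at most the remaining graceful timeout instead of blocking every task behind it.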
[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
hachikuji commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r622378516 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1859,15 +1819,14 @@ private void appendBatch( offsetAndEpoch.offset + 1, Integer.MAX_VALUE); future.whenComplete((commitTimeMs, exception) -> { -int numRecords = batch.records.size(); if (exception != null) { -logger.debug("Failed to commit {} records at {}", numRecords, offsetAndEpoch, exception); +logger.debug("Failed to commit {} records at {}", batch.numRecords, offsetAndEpoch, exception); } else { long elapsedTime = Math.max(0, commitTimeMs - appendTimeMs); -double elapsedTimePerRecord = (double) elapsedTime / numRecords; +double elapsedTimePerRecord = (double) elapsedTime / batch.numRecords; kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, appendTimeMs); -logger.debug("Completed commit of {} records at {}", numRecords, offsetAndEpoch); -maybeFireHandleCommit(batch.baseOffset, epoch, batch.records); +logger.debug("Completed commit of {} records at {}", batch.numRecords, offsetAndEpoch); +maybeFireHandleCommit(batch.baseOffset, epoch, batch.records.get()); Review comment: We still need to protect the call to `get()` here, right?

```java
batch.records.ifPresent(records -> maybeFireHandleCommit(batch.baseOffset, epoch, records));
```
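The reason the call needs a guard: `Optional.get()` throws `NoSuchElementException` when no value is present, whereas `Optional.ifPresent` simply skips the action. A minimal illustration, where `records` is a hypothetical stand-in for `batch.records` and the handler stands in for `maybeFireHandleCommit`:

```java
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Consumer;

// Illustration of guarded vs. unguarded access; `records` stands in for batch.records.
final class OptionalGuard {
    // Guarded: the handler runs only when records are present.
    static void fireIfPresent(Optional<List<String>> records, Consumer<List<String>> handler) {
        records.ifPresent(handler);
    }

    // Unguarded: Optional.get() throws NoSuchElementException when the Optional is empty.
    static boolean unguardedGetThrows(Optional<List<String>> records) {
        try {
            records.get();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }
}
```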
[GitHub] [kafka] junrao commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
junrao commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r617932245 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -662,11 +662,21 @@ class KafkaApis(val requestChannel: RequestChannel, val versionId = request.header.apiVersion val clientId = request.header.clientId val fetchRequest = request.body[FetchRequest] +val (topicIds, topicNames) = + if (fetchRequest.version() >= 13) +metadataCache.topicIdInfo() + else +(Collections.emptyMap[String, Uuid](), Collections.emptyMap[Uuid, String]()) + val fetchContext = fetchManager.newContext( + fetchRequest.version, fetchRequest.metadata, - fetchRequest.fetchData, - fetchRequest.toForget, - fetchRequest.isFromFollower) + fetchRequest.isFromFollower, + fetchRequest.fetchDataAndError(topicNames), + fetchRequest.forgottenTopics(topicNames), + topicNames, + topicIds) + Review comment: extra newline. ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ## @@ -110,67 +111,164 @@ public String toString() { } } -private Optional optionalEpoch(int rawEpochValue) { +private static Optional optionalEpoch(int rawEpochValue) { if (rawEpochValue < 0) { return Optional.empty(); } else { return Optional.of(rawEpochValue); } } +// Only used when version is lower than 13. 
private Map toPartitionDataMap(List fetchableTopics) { Map result = new LinkedHashMap<>(); fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> { result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()), -new PartitionData( -fetchPartition.fetchOffset(), -fetchPartition.logStartOffset(), -fetchPartition.partitionMaxBytes(), -optionalEpoch(fetchPartition.currentLeaderEpoch()), -optionalEpoch(fetchPartition.lastFetchedEpoch()) -)); +new PartitionData( +fetchPartition.fetchOffset(), +fetchPartition.logStartOffset(), +fetchPartition.partitionMaxBytes(), +optionalEpoch(fetchPartition.currentLeaderEpoch()), +optionalEpoch(fetchPartition.lastFetchedEpoch()) +)); })); return Collections.unmodifiableMap(result); } -private List toForgottenTopicList(List forgottenTopics) { -List result = new ArrayList<>(); -forgottenTopics.forEach(forgottenTopic -> -forgottenTopic.partitions().forEach(partitionId -> -result.add(new TopicPartition(forgottenTopic.topic(), partitionId)) -) -); -return result; +/** + * The following methods are new to version 13. They support sending Fetch requests using topic ID rather + * than topic name. Since the sender and receiver of the fetch request may have different topic IDs in + * their caches, there is a possibility for some topic IDs to be unresolved on the receiving end. These + * methods and classes try to resolve the topic IDs and keep track of unresolved partitions and their errors. + */ + +// Holds information on partitions whose topic IDs were unable to be resolved when the Fetch request +// was received. +public static final class UnresolvedPartitions { +private final Uuid id; +private final Map partitionData; + +public UnresolvedPartitions(Uuid id, Map partitionData) { +this.id = id; +this.partitionData = partitionData; +} + +public Uuid id() { Review comment: id => topicId ? 
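The resolution step described in the new `FetchRequest` comment (resolve topic IDs against the receiver's cache and keep track of partitions whose IDs cannot be resolved) can be sketched roughly as follows. The types are simplified assumptions: `java.util.UUID` stands in for Kafka's `Uuid`, partition data is reduced to a fetch offset per partition index, and `TopicIdResolution`/`resolve` are hypothetical names rather than the PR's API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Simplified sketch of resolving topic IDs on the receiving side; not the FetchRequest code.
final class TopicIdResolution {
    static final class Result {
        // Partitions whose topic ID was found in the local cache, keyed by topic name.
        final Map<String, Map<Integer, Long>> resolved = new LinkedHashMap<>();
        // Partitions whose topic ID is unknown locally, kept so an error can be returned for them.
        final Map<UUID, Map<Integer, Long>> unresolved = new LinkedHashMap<>();
    }

    static Result resolve(Map<UUID, Map<Integer, Long>> requested, Map<UUID, String> topicNames) {
        Result result = new Result();
        for (Map.Entry<UUID, Map<Integer, Long>> entry : requested.entrySet()) {
            String name = topicNames.get(entry.getKey());
            if (name != null) {
                result.resolved.put(name, entry.getValue());
            } else {
                result.unresolved.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }
}
```

Keeping the unresolved partitions, rather than dropping them, is what lets the receiver report a per-partition error back to the sender; that bookkeeping is the complexity discussed in the other comments on this PR.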
[GitHub] [kafka] JoelWee commented on pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter
JoelWee commented on pull request #8923: URL: https://github.com/apache/kafka/pull/8923#issuecomment-828616051 Thanks @ableegoldman! :)
[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r622360387 ## File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ## @@ -1267,6 +1266,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(100L, latests.get(t1p0)) } + @Test + def testBeginningOffsetsOnCompactedTopic(): Unit = { +val topic0 = "topicWithoutCompaction" +val topic1 = "topicWithCompaction" +val t0p0 = new TopicPartition(topic0, 0) +val t1p0 = new TopicPartition(topic1, 0) +val t1p1 = new TopicPartition(topic1, 1) +val partitions = Set(t0p0, t1p0, t1p1).asJava + +val producerProps = new Properties() +// Each batch will hold about 10 records +producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "256") +val producer = createProducer(configOverrides = producerProps) +// First topic will not have compaction. Simply a sanity test. +createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 1, recordsPerPartition = 100) + + +// Second topic will have compaction. +// The first partition will have compaction occur at offset 0 so beginningOffsets should be nonzero. +// The second partition will not have compaction occur at offset 0, so beginningOffsets will remain 0. +val props = new Properties() +props.setProperty(LogConfig.MaxCompactionLagMsProp, "1") +props.setProperty(LogConfig.CleanupPolicyProp, "compact") +props.setProperty(LogConfig.MinCleanableDirtyRatioProp, "0.01") +props.setProperty(LogConfig.SegmentBytesProp, "512") + +// Send records to first partition -- all duplicates. +createTopic(topic1, numPartitions = 2, replicationFactor = 1, props) +TestUtils.sendRecordsWithKey(producer, 100, 0L, new TopicPartition(topic1, 0), "key") + +// Send records fo second partition -- only last 50 records are duplicates. +sendRecords(producer, 50, t1p1) +TestUtils.sendRecordsWithKey(producer, 50, 50L, new TopicPartition(topic1, 1), "key") + +// Sleep to allow compaction to take place. 
+Thread.sleep(25000) Review comment: I was able to get it down to 15 seconds consistently (50 passing runs), but at 13 seconds it starts flaking. Not sure if this is good enough to keep the test.
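One common alternative to a fixed `Thread.sleep` in a test like this is to poll for the expected condition with an upper bound, so the test finishes as soon as compaction is observed and only the worst case waits the full time. Kafka's Scala test utilities include a similar helper (`TestUtils.waitUntilTrue`); the Java `waitUntil` below is a hypothetical sketch, and the condition passed to it (for example, checking that `beginningOffsets` for the compacted partition has become nonzero) is an assumption about how it might be used here.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper: poll a condition instead of sleeping for a fixed duration.
final class Polling {
    // Returns true as soon as `condition` holds, or false once `timeoutMs` has elapsed.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pauseMs) {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadlineNanos >= 0) {
                return false;
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

With a generous timeout (say 25 seconds) the test keeps its safety margin, but a run where compaction happens after a few seconds no longer pays for the full sleep.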
[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r622355520 ## File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala ## @@ -540,13 +540,34 @@ class LogCleanerManagerTest extends Logging { while(log.numberOfSegments < 8) log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0) +log.updateHighWatermark(log.activeSegment.baseOffset) val lastCleanOffset = Some(0L) val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds) assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.") assertEquals(log.activeSegment.baseOffset, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset begins with the active segment.") } + @Test + def testCleanableOffsetsForNoneWithLowerHighWatermark(): Unit = { Review comment: I think the None may have been taken from the test above. In that test, it was referring to "no minimum compaction lag settings active". Changed name and added a comment to describe what the test is doing.
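For readers following these test changes: the tests now call `log.updateHighWatermark(log.activeSegment.baseOffset)` before checking cleanable offsets, which suggests the cleanable range is also bounded by the high watermark. A deliberately simplified sketch of that bound, assuming the first uncleanable offset is the minimum of the active segment's base offset and the high watermark; the real `LogCleanerManager.cleanableOffsets` may consider other bounds as well (for example, unstable transactional data and compaction lag), which are omitted here, and `CleanableRange` is a hypothetical name.

```java
// Hypothetical simplification of the upper bound on the cleanable range.
final class CleanableRange {
    // Never clean into the active segment, and never past the high watermark.
    static long firstUncleanableOffset(long activeSegmentBaseOffset, long highWatermark) {
        return Math.min(activeSegmentBaseOffset, highWatermark);
    }
}
```

Under this assumption, a test with a high watermark below the active segment's base offset (the "lower high watermark" case) should see the first uncleanable offset move down to the high watermark.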
[GitHub] [kafka] chia7712 commented on pull request #10598: MINOR: rename wrong topic id variable name and description
chia7712 commented on pull request #10598: URL: https://github.com/apache/kafka/pull/10598#issuecomment-828584408 @showuon Could you share your email with me? Your GitHub profile does not show it. I can see an email in your public repo (https://github.com/showuon/js2ts/commit/d7711b632e681a10655f138f852bb75f1e902288.patch), but I'm not sure whether it is correct :(
[jira] [Assigned] (KAFKA-12726) misbehaving Task.stop() can prevent other Tasks from stopping
[ https://issues.apache.org/jira/browse/KAFKA-12726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryanne Dolan reassigned KAFKA-12726: Assignee: Ryanne Dolan > misbehaving Task.stop() can prevent other Tasks from stopping > - > > Key: KAFKA-12726 > URL: https://issues.apache.org/jira/browse/KAFKA-12726 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.8.0 >Reporter: Ryanne Dolan >Assignee: Ryanne Dolan >Priority: Minor > > We've observed a misbehaving Task fail to stop in a timely manner (e.g. stuck > in a retry loop). Despite Connect supporting a property > task.shutdown.graceful.timeout.ms, this is currently not enforced -- tasks > can take as long as they want to stop, and the only consequence is an error > message. > Unfortunately, Workers stop Tasks sequentially, meaning that a stuck Task can > prevent any further Tasks from stopping. Moreover, after a rebalance, these > lingering tasks can persist along with their replacements. For example, we've > seen a Worker's "task-count" metric double following a rebalance. > While the Connector implementation is ultimately to blame here -- a Task > probably shouldn't loop forever in stop() -- we believe the Connect runtime > should handle this situation more gracefully.
[jira] [Created] (KAFKA-12726) misbehaving Task.stop() can prevent other Tasks from stopping
Ryanne Dolan created KAFKA-12726: Summary: misbehaving Task.stop() can prevent other Tasks from stopping Key: KAFKA-12726 URL: https://issues.apache.org/jira/browse/KAFKA-12726 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.8.0 Reporter: Ryanne Dolan We've observed a misbehaving Task fail to stop in a timely manner (e.g. stuck in a retry loop). Despite Connect supporting a property task.shutdown.graceful.timeout.ms, this is currently not enforced -- tasks can take as long as they want to stop, and the only consequence is an error message. Unfortunately, Workers stop Tasks sequentially, meaning that a stuck Task can prevent any further Tasks from stopping. Moreover, after a rebalance, these lingering tasks can persist along with their replacements. For example, we've seen a Worker's "task-count" metric double following a rebalance. While the Connector implementation is ultimately to blame here -- a Task probably shouldn't loop forever in stop() -- we believe the Connect runtime should handle this situation more gracefully.