[GitHub] [kafka] dajac commented on a change in pull request #10438: KAFKA-12579: Remove various deprecated clients classes/methods for 3.0
dajac commented on a change in pull request #10438: URL: https://github.com/apache/kafka/pull/10438#discussion_r607567743 ## File path: docs/upgrade.html ## @@ -27,18 +27,30 @@ Notable changes in 3 or updating the application not to use internal classes. The Streams API removed all deprecated APIs that were deprecated in version 2.5.0 or earlier. For a complete list of removed APIs compare the detailed Kafka Streams upgrade notes. -The deprecated Scala Authorizer, SimpleAclAuthorizer and related classes have been removed. Please use the Java Authorizer -and AclAuthorizer instead. -The deprecated Metric#value() method was removed (KAFKA-12573: https://issues.apache.org/jira/browse/KAFKA-12573). -Deprecated security classes were removed: PrincipalBuilder, DefaultPrincipalBuilder and ResourceFilter. -Furthermore, deprecated constants and constructors were removed from SslConfigs, SaslConfigs, -AclBinding and AclBindingFilter. -The deprecated Admin.electedPreferredLeaders() methods were removed. Please use Admin.electLeaders instead. -The deprecated kafka-preferred-replica-election command line tool was removed. Please use kafka-leader-election instead. -The deprecated ConfigEntry constructor was removed (KAFKA-12577: https://issues.apache.org/jira/browse/KAFKA-12577). -Please use the remaining public constructor instead. -The deprecated config value default for the client config client.dns.lookup has been removed. In the unlikely -event that you set this config explicitly, we recommend leaving the config unset (use_all_dns_ips is used by default). +A number of deprecated classes and methods have been removed in the clients, core and tools modules: Review comment: nit: Should we say `classes, methods and tools` (or command) because of `kafka-preferred-replica-election`?
## File path: clients/src/main/java/org/apache/kafka/common/MessageFormatter.java ## @@ -34,33 +33,21 @@ */ public interface MessageFormatter extends Configurable, Closeable { -/** - * Initialises the MessageFormatter - * @param props Properties to configure the formatter - * @deprecated Use {@link #configure(Map)} instead, this method is for backward compatibility with the older Formatter interface - */ -@Deprecated Review comment: If we remove this one, we could also remove the `kafka.common.MessageFormatter` trait in the core module. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
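The comment above concerns removing the deprecated `init(Properties)` method in favor of `configure(Map)`. For formatter code that still builds a `Properties` object, a minimal migration sketch follows. It deliberately does not use Kafka's `MessageFormatter` interface so that it runs on its own. `SimpleFormatter`, `KeyValueFormatter` and the `print.key`/`key.separator` property names are hypothetical stand-ins.

```java
import java.io.PrintStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class FormatterMigrationSketch {

    // Hypothetical stand-in for a formatter configured through configure(Map)
    // instead of the removed init(Properties); this is not Kafka's interface.
    interface SimpleFormatter {
        void configure(Map<String, ?> configs);

        void writeTo(String key, String value, PrintStream output);
    }

    // Example formatter that optionally prints the key before the value.
    // The "print.key" and "key.separator" names are illustrative assumptions.
    static class KeyValueFormatter implements SimpleFormatter {
        private boolean printKey = false;
        private String separator = "\t";

        @Override
        public void configure(Map<String, ?> configs) {
            Object printKeyValue = configs.get("print.key");
            if (printKeyValue != null) {
                printKey = Boolean.parseBoolean(printKeyValue.toString());
            }
            Object separatorValue = configs.get("key.separator");
            if (separatorValue != null) {
                separator = separatorValue.toString();
            }
        }

        @Override
        public void writeTo(String key, String value, PrintStream output) {
            if (printKey) {
                output.print(key + separator);
            }
            output.println(value);
        }
    }

    // Copies the Properties formerly passed to init(Properties) into the
    // Map<String, ?> shape that configure(Map) expects.
    static Map<String, Object> toConfigMap(Properties props) {
        Map<String, Object> configs = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            configs.put(name, props.getProperty(name));
        }
        return configs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("print.key", "true");

        SimpleFormatter formatter = new KeyValueFormatter();
        // Previously: formatter.init(props);
        formatter.configure(toConfigMap(props));
        formatter.writeTo("k1", "v1", System.out); // prints k1, a tab, then v1
    }
}
```

The conversion uses `stringPropertyNames()`, so defaults chained into the `Properties` object are carried over as well.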
[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up
[ https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315284#comment-17315284 ] Ben Chen commented on KAFKA-12492: -- Do we need to run [local tests|https://github.com/apache/kafka/blob/trunk/README.md] and trigger a build for this task, as described here: [https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes]? > Formatting of example RocksDBConfigSetter is messed up > -- > > Key: KAFKA-12492 > URL: https://issues.apache.org/jira/browse/KAFKA-12492 > Project: Kafka > Issue Type: Bug > Components: documentation, streams >Reporter: A. Sophie Blee-Goldman >Assignee: Ben Chen >Priority: Trivial > Labels: docs, newbie > > See the example implementation class CustomRocksDBConfig in the docs for the > rocksdb.config.setter > https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#rocksdb-config-setter -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] satishd commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.
satishd commented on a change in pull request #10218: URL: https://github.com/apache/kafka/pull/10218#discussion_r607562925 ## File path: remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java ## @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * This class provides an in-memory cache of remote log segment metadata. This maintains the lineage of segments + * with respect to leader epochs. + * + * Remote log segment can go through the state transitions as mentioned in {@link RemoteLogSegmentState}. + * + * This class will have all the segments which did not reach terminal state viz DELETE_SEGMENT_FINISHED. That means,any + * segment reaching the terminal state will get cleared from this instance. 
+ * This class provides different methods to fetch segment metadata like {@link #remoteLogSegmentMetadata(int, long)}, + * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, {@link #listAllRemoteLogSegments()}. Those + * methods have different semantics to fetch the segment based on its state. + * + * + * + * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}: + * + * Segment in this state indicate it is not yet copied successfully. So, these segments will not be + * accessible for reads but these are considered for cleanups when a partition is deleted. + * + * + * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED}: + * + * Segment in this state indicate it is successfully copied and it is available for reads. So, these segments + * will be accessible for reads. But this should be available for any cleanup activity like deleting segments by the + * caller of this class. + * + * + * {@link RemoteLogSegmentState#DELETE_SEGMENT_STARTED}: + * Segment in this state indicate it is getting deleted. That means, it is not available for reads. But it should be + * available for any cleanup activity like deleting segments by the caller of this class. + * + * + * {@link RemoteLogSegmentState#DELETE_SEGMENT_FINISHED}: + * Segment in this state indicate it is already deleted. That means, it is not available for any activity including + * reads or cleanup activity. This cache will clear entries containing this state. 
+ * Method | COPY_SEGMENT_STARTED | COPY_SEGMENT_FINISHED | DELETE_SEGMENT_STARTED | DELETE_SEGMENT_FINISHED
+ * remoteLogSegmentMetadata(int leaderEpoch, long offset) | No | Yes | No | No
+ * listRemoteLogSegments(int leaderEpoch) | Yes | Yes | Yes | No
+ * listAllRemoteLogSegments() | Yes | Yes | Yes | No
+ */ +public class RemoteLogMetadataCache { + +private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataCache.class); + +// It contains all the segment-id to metadata mappings which did not reach the terminal state viz DELETE_SEGMENT_FINISHED. +private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata> idToSegmentMetadata += new ConcurrentHashMap<>(); + +// It contains leader epoch to the respective entry containing the state. +private final ConcurrentMap<Integer, RemoteLogLeaderEpochState> leaderEpochEntries = new ConcurrentHashMap<>(); + +/** + * Returns {@link RemoteLogSegmentMetadata} if it exists for the given leader-epoch containing the offset and with + * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED} state, else returns {@link Optional#empty()}. + * + * @param leaderEpoch leader epoch for the given offset + * @param offset offset + * @return the requested remote log segment metadata if it exists. + */ +public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int leaderEpoch, long offset) { +RemoteLogLeaderEpochState remoteLogLeaderEpochState = leaderEpochEntries.get(leaderEpoch); + +i
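The visibility table in the javadoc can be read as two sets of states. Below is a minimal, self-contained sketch of that reading. It uses a local `SegmentState` enum that mirrors the four `RemoteLogSegmentState` values but is not the Kafka enum, and `visibleForRead`/`visibleForListing` are hypothetical helper names.

```java
import java.util.EnumSet;
import java.util.Set;

public class SegmentVisibilitySketch {

    // Local mirror of the four states named in the javadoc; not Kafka's RemoteLogSegmentState.
    enum SegmentState {
        COPY_SEGMENT_STARTED,
        COPY_SEGMENT_FINISHED,
        DELETE_SEGMENT_STARTED,
        DELETE_SEGMENT_FINISHED
    }

    // remoteLogSegmentMetadata(leaderEpoch, offset): only fully copied segments serve reads.
    static final Set<SegmentState> READABLE = EnumSet.of(SegmentState.COPY_SEGMENT_FINISHED);

    // listRemoteLogSegments(...) and listAllRemoteLogSegments(): every state except the terminal one,
    // so callers can still find segments that need cleanup.
    static final Set<SegmentState> LISTABLE =
            EnumSet.complementOf(EnumSet.of(SegmentState.DELETE_SEGMENT_FINISHED));

    static boolean visibleForRead(SegmentState state) {
        return READABLE.contains(state);
    }

    static boolean visibleForListing(SegmentState state) {
        return LISTABLE.contains(state);
    }

    public static void main(String[] args) {
        // Prints one row per state, matching the Yes/No columns of the table.
        for (SegmentState state : SegmentState.values()) {
            System.out.printf("%-24s read=%-5s list=%s%n", state, visibleForRead(state), visibleForListing(state));
        }
    }
}
```

Under this reading, `DELETE_SEGMENT_FINISHED` never needs a column at all, because the cache drops such entries.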
[GitHub] [kafka] satishd commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.
satishd commented on a change in pull request #10218: URL: https://github.com/apache/kafka/pull/10218#discussion_r607561788 ## File path: clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageException.java ## @@ -28,9 +27,14 @@ public RemoteStorageException(final String message) { super(message); } -public RemoteStorageException(final String message, - final Throwable cause) { +public RemoteStorageException(final String message, final Throwable cause) { super(message, cause); } +public RemoteStorageException() { Review comment: Not really needed for now.
[GitHub] [kafka] satishd commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.
satishd commented on a change in pull request #10218: URL: https://github.com/apache/kafka/pull/10218#discussion_r607559905 ## File path: remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java ## @@ -0,0 +1,331 @@
[GitHub] [kafka] satishd commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.
satishd commented on a change in pull request #10218: URL: https://github.com/apache/kafka/pull/10218#discussion_r607556778 ## File path: clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java ## @@ -21,14 +21,16 @@ import java.util.Arrays; import java.util.Collections; import java.util.Map; +import java.util.Objects; import java.util.function.Function; import java.util.stream.Collectors; /** - * It indicates the state of the remote log segment. This will be based on the action executed on this + * This enum indicates the state of the remote log segment. This will be based on the action executed on this * segment by the remote log service implementation. * - * It goes through the below state transitions. + * It goes through the below state transitions. Self transition is treated as valid. This allows updating with the + * same state in case of retries and failover. Review comment: Looks like autoformatter changed it.
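The javadoc change says a self transition is valid, so that an update can be re-sent with the same state after a retry or failover. Below is a minimal sketch of such a check. It assumes the copy/delete lifecycle described in the RemoteLogMetadataCache javadoc: COPY_SEGMENT_STARTED can move to COPY_SEGMENT_FINISHED or DELETE_SEGMENT_STARTED, COPY_SEGMENT_FINISHED can move to DELETE_SEGMENT_STARTED, and DELETE_SEGMENT_STARTED can move to DELETE_SEGMENT_FINISHED. `SegmentTransitionSketch` and `isValidTransition` are hypothetical names. The real enum may expose its check differently.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class SegmentTransitionSketch {

    enum SegmentState {
        COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED
    }

    // Assumed forward edges of the copy -> delete lifecycle; self transitions are handled separately.
    private static final Map<SegmentState, Set<SegmentState>> NEXT_STATES = new EnumMap<>(SegmentState.class);

    static {
        NEXT_STATES.put(SegmentState.COPY_SEGMENT_STARTED,
                EnumSet.of(SegmentState.COPY_SEGMENT_FINISHED, SegmentState.DELETE_SEGMENT_STARTED));
        NEXT_STATES.put(SegmentState.COPY_SEGMENT_FINISHED, EnumSet.of(SegmentState.DELETE_SEGMENT_STARTED));
        NEXT_STATES.put(SegmentState.DELETE_SEGMENT_STARTED, EnumSet.of(SegmentState.DELETE_SEGMENT_FINISHED));
        NEXT_STATES.put(SegmentState.DELETE_SEGMENT_FINISHED, EnumSet.noneOf(SegmentState.class));
    }

    static boolean isValidTransition(SegmentState from, SegmentState to) {
        Objects.requireNonNull(from, "from can not be null");
        Objects.requireNonNull(to, "to can not be null");
        // A self transition is accepted so that an update repeated after a retry or failover succeeds.
        if (from == to) {
            return true;
        }
        return NEXT_STATES.get(from).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(isValidTransition(SegmentState.COPY_SEGMENT_FINISHED, SegmentState.COPY_SEGMENT_FINISHED));
        System.out.println(isValidTransition(SegmentState.COPY_SEGMENT_FINISHED, SegmentState.COPY_SEGMENT_STARTED));
    }
}
```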
[GitHub] [kafka] cc13ny opened a new pull request #10486: KAFKA-12492: Fix the formatting of example RocksDBConfigSetter
cc13ny opened a new pull request #10486: URL: https://github.com/apache/kafka/pull/10486 Fix the formatting of the example RocksDBConfigSetter, which was broken by misaligned whitespace within the tag.
[GitHub] [kafka] showuon commented on pull request #10338: KAFKA-10251: wait for consumer rebalance completed before consuming records
showuon commented on pull request #10338: URL: https://github.com/apache/kafka/pull/10338#issuecomment-813859513 @apurvam @hachikuji, calling for review. Thank you.
[GitHub] [kafka] satishd commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.
satishd commented on a change in pull request #10218: URL: https://github.com/apache/kafka/pull/10218#discussion_r607551710 ## File path: remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java ## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class is an implementation of {@link RemoteLogMetadataManager} backed by in-memory store. 
+ */ +public class InmemoryRemoteLogMetadataManager implements RemoteLogMetadataManager { +private static final Logger log = LoggerFactory.getLogger(InmemoryRemoteLogMetadataManager.class); + +private Map<TopicIdPartition, RemotePartitionDeleteMetadata> idToPartitionDeleteMetadata = +new ConcurrentHashMap<>(); + +private Map<TopicIdPartition, RemoteLogMetadataCache> idToRemoteLogMetadataCache = new ConcurrentHashMap<>(); + +@Override +public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) +throws RemoteStorageException { +log.debug("Adding remote log segment : [{}]", remoteLogSegmentMetadata); +Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null"); + +RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId(); + +idToRemoteLogMetadataCache +.computeIfAbsent(remoteLogSegmentId.topicIdPartition(), id -> new RemoteLogMetadataCache()) +.addCopyInProgressSegment(remoteLogSegmentMetadata); +} + +@Override +public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metadataUpdate) +throws RemoteStorageException { +log.debug("Updating remote log segment: [{}]", metadataUpdate); +Objects.requireNonNull(metadataUpdate, "metadataUpdate can not be null"); + + getRemoteLogMetadataCache(metadataUpdate.remoteLogSegmentId().topicIdPartition()) +.updateRemoteLogSegmentMetadata(metadataUpdate); +} + +private RemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition topicIdPartition) +throws RemoteResourceNotFoundException { +RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition); +if (remoteLogMetadataCache == null) { +throw new RemoteResourceNotFoundException("No existing metadata found for partition: " + topicIdPartition); +} + +return remoteLogMetadataCache; +} + +@Override +public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition, + int epochForOffset, + long offset) +throws RemoteStorageException { +Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null"); + +return 
getRemoteLogMetadataCache(topicIdPartition).remoteLogSegmentMetadata(epochForOffset, offset); +} + +@Override +public Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition, +int leaderEpoch) throws RemoteStorageException { +Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null"); + +return getRemoteLogMetadataCache(topicIdPartition).highestOffsetForEpoch(leaderEpoch); +} + +@Override +public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) +throws RemoteStorageException { +log.debug("Adding delete state with: [{}]", remotePartitionDeleteMetadata); +Objects.requireNonNull(remotePartitionDeleteMetadata, "remotePartitionDeleteMetadata can not be null"); + +TopicIdPartition topicIdPartition = remotePartitionDeleteMetadata.topic
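The in-memory manager keeps one `RemoteLogMetadataCache` per `TopicIdPartition`. `addRemoteLogSegmentMetadata` creates that cache lazily with `computeIfAbsent`. Lookups and updates go through `getRemoteLogMetadataCache`, which throws `RemoteResourceNotFoundException` for an unknown partition. Below is a stripped-down sketch of that pattern. It uses plain `String` keys, and the hypothetical `SegmentCache` and `ResourceNotFoundException` classes stand in for the Kafka types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class PerPartitionCacheSketch {

    // Hypothetical stand-in for RemoteResourceNotFoundException.
    static class ResourceNotFoundException extends Exception {
        ResourceNotFoundException(String message) {
            super(message);
        }
    }

    // Hypothetical per-partition cache; it only records segment ids.
    static class SegmentCache {
        private final List<String> segmentIds = new CopyOnWriteArrayList<>();

        void add(String segmentId) {
            segmentIds.add(segmentId);
        }

        List<String> segments() {
            return new ArrayList<>(segmentIds);
        }
    }

    private final Map<String, SegmentCache> cacheByPartition = new ConcurrentHashMap<>();

    // Writes create the partition's cache on first use.
    public void addSegment(String partition, String segmentId) {
        Objects.requireNonNull(partition, "partition can not be null");
        Objects.requireNonNull(segmentId, "segmentId can not be null");
        cacheByPartition.computeIfAbsent(partition, p -> new SegmentCache()).add(segmentId);
    }

    // Reads never create a cache: an unknown partition is reported as an error.
    public List<String> segments(String partition) throws ResourceNotFoundException {
        Objects.requireNonNull(partition, "partition can not be null");
        SegmentCache cache = cacheByPartition.get(partition);
        if (cache == null) {
            throw new ResourceNotFoundException("No existing metadata found for partition: " + partition);
        }
        return cache.segments();
    }

    public static void main(String[] args) throws ResourceNotFoundException {
        PerPartitionCacheSketch manager = new PerPartitionCacheSketch();
        manager.addSegment("topic-0", "segment-1");
        System.out.println(manager.segments("topic-0")); // [segment-1]
    }
}
```

`ConcurrentHashMap.computeIfAbsent` performs the create-on-first-write step atomically. As a result, two concurrent writers for the same partition share a single cache instead of overwriting each other's.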
[jira] [Commented] (KAFKA-12598) Remove deprecated --zookeeper in ConfigCommand
[ https://issues.apache.org/jira/browse/KAFKA-12598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315268#comment-17315268 ] Luke Chen commented on KAFKA-12598: --- [~rajinisiva...@gmail.com] [~rsivaram], could you help confirm that we cannot remove the _deprecated_ "--zookeeper" option from ConfigCommand in v3.0 yet, because we still depend on the "--zookeeper" option to configure dynamic broker configs before starting the broker (see KAFKA-6805)? Thank you. > Remove deprecated --zookeeper in ConfigCommand > -- > > Key: KAFKA-12598 > URL: https://issues.apache.org/jira/browse/KAFKA-12598 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Fix For: 3.0.0
[GitHub] [kafka] satishd commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.
satishd commented on a change in pull request #10218: URL: https://github.com/apache/kafka/pull/10218#discussion_r607546803 ## File path: remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java ## @@ -0,0 +1,331 @@ +public class RemoteLogMetadataCache { Review comment: No.
javadoc is generated for clients module with the package `/org/apache/kafka/server/log/remote/storage/ `. But this class is in `remote-storage` module.
[GitHub] [kafka] ijuma merged pull request #10479: MINOR: Jenkinsfile's `post` needs `agent` to be set
ijuma merged pull request #10479: URL: https://github.com/apache/kafka/pull/10479
[GitHub] [kafka] highluck commented on pull request #10302: KAFKA-7785: move internal DefaultPartitionGrouper
highluck commented on pull request #10302: URL: https://github.com/apache/kafka/pull/10302#issuecomment-813824311 @guozhangwang May I ask for a merge?
[GitHub] [kafka] highluck commented on pull request #9861: MINOR: Modify unnecessary access specifiers
highluck commented on pull request #9861: URL: https://github.com/apache/kafka/pull/9861#issuecomment-813823981 @chia7712 May I ask for a review of this PR?
[GitHub] [kafka] highluck commented on a change in pull request #9851: KAFKA-10769 Remove JoinGroupRequest#containsValidPattern as it is dup…
highluck commented on a change in pull request #9851: URL: https://github.com/apache/kafka/pull/9851#discussion_r607505049 ## File path: clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java ## @@ -58,17 +57,7 @@ public void shouldThrowOnInvalidGroupInstanceIds() { } } } - -@Test -public void shouldRecognizeInvalidCharactersInGroupInstanceIds() { -char[] invalidChars = {'/', '\\', ',', '\u0000', ':', '"', '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '='}; - -for (char c : invalidChars) { -String instanceId = "Is " + c + "illegal"; -assertFalse(JoinGroupRequest.containsValidPattern(instanceId)); -} -} - + Review comment: @chia7712 Thanks, I updated the code.
[GitHub] [kafka] ijuma opened a new pull request #10485: MINOR: Enable scala/java joint compilation consistently for `core` module
ijuma opened a new pull request #10485: URL: https://github.com/apache/kafka/pull/10485 We were doing it only for test files previously. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] chia7712 commented on a change in pull request #9851: KAFKA-10769 Remove JoinGroupRequest#containsValidPattern as it is dup…
chia7712 commented on a change in pull request #9851: URL: https://github.com/apache/kafka/pull/9851#discussion_r607490193 ## File path: clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java ## @@ -58,17 +57,7 @@ public void shouldThrowOnInvalidGroupInstanceIds() { } } } - -@Test -public void shouldRecognizeInvalidCharactersInGroupInstanceIds() { -char[] invalidChars = {'/', '\\', ',', '\u0000', ':', '"', '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '='}; - -for (char c : invalidChars) { -String instanceId = "Is " + c + "illegal"; -assertFalse(JoinGroupRequest.containsValidPattern(instanceId)); -} -} - + Review comment: please remove those indents.
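The removed test checked that instance ids containing characters such as '/', ',', ':' or whitespace are rejected. Below is a minimal sketch of such a character whitelist check. It assumes the legal alphabet is ASCII letters, digits, '.', '_' and '-', the same alphabet Kafka allows in topic names. `InstanceIdValidationSketch.containsValidPattern` is a hypothetical stand-in, not the method the PR keeps.

```java
public class InstanceIdValidationSketch {

    // Returns true only if every character is an ASCII letter, digit, '.', '_' or '-'.
    // Assumption: emptiness and length limits are validated elsewhere.
    static boolean containsValidPattern(String id) {
        for (int i = 0; i < id.length(); i++) {
            char c = id.charAt(i);
            boolean valid = (c >= 'a' && c <= 'z')
                    || (c >= 'A' && c <= 'Z')
                    || (c >= '0' && c <= '9')
                    || c == '.' || c == '_' || c == '-';
            if (!valid) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // The characters exercised by the removed test.
        char[] invalidChars = {'/', '\\', ',', '\u0000', ':', '"', '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '='};
        for (char c : invalidChars) {
            // No space in the prefix, so only c can make the id invalid.
            String instanceId = "Is" + c + "illegal";
            if (containsValidPattern(instanceId)) {
                throw new AssertionError("accepted invalid character code " + (int) c);
            }
        }
        System.out.println(containsValidPattern("consumer-1.instance_A")); // true
    }
}
```

The removed test built ids as `"Is " + c + "illegal"`, which include a space. A space is itself illegal under this alphabet, so each case would fail validation regardless of `c`. The sketch drops the space so that every character is actually tested.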
[GitHub] [kafka] chia7712 commented on a change in pull request #10446: MINOR: [ConfigEntry.class] add 'type' to 'toString' and 'hashCode'
chia7712 commented on a change in pull request #10446: URL: https://github.com/apache/kafka/pull/10446#discussion_r607486103 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java ## @@ -167,6 +169,8 @@ public int hashCode() { result = prime * result + (isReadOnly ? 1 : 0); result = prime * result + source.hashCode(); result = prime * result + synonyms.hashCode(); +result = prime * result + type.hashCode(); +result = prime * result + documentation.hashCode(); Review comment: nice point! will copy that!
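The added lines call `documentation.hashCode()` directly. If `documentation` can be null, that line throws a `NullPointerException`. Nullability is an assumption in this sketch, because the hunk does not show how the field is initialized. Any field added to `hashCode()` should also be compared in `equals()`, so that equal entries keep equal hash codes. Below is a minimal null-safe sketch using `java.util.Objects`. It uses a hypothetical, reduced class (`ConfigEntrySketch`) rather than the real `ConfigEntry`.

```java
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical, reduced stand-in for ConfigEntry (not the Kafka class).
// Assumption: type and documentation may be null, so they are compared null-safely.
final class ConfigEntrySketch {
    private final String name;
    private final String value;
    private final boolean isReadOnly;
    private final List<String> synonyms;
    private final String type;          // nullable in this sketch
    private final String documentation; // nullable in this sketch

    ConfigEntrySketch(String name, String value, boolean isReadOnly,
                      List<String> synonyms, String type, String documentation) {
        this.name = name;
        this.value = value;
        this.isReadOnly = isReadOnly;
        this.synonyms = synonyms;
        this.type = type;
        this.documentation = documentation;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ConfigEntrySketch)) return false;
        ConfigEntrySketch that = (ConfigEntrySketch) o;
        // Every field used in hashCode() is also compared here.
        return isReadOnly == that.isReadOnly
                && Objects.equals(name, that.name)
                && Objects.equals(value, that.value)
                && Objects.equals(synonyms, that.synonyms)
                && Objects.equals(type, that.type)
                && Objects.equals(documentation, that.documentation);
    }

    @Override
    public int hashCode() {
        // Objects.hash hashes a null field as 0 instead of throwing.
        return Objects.hash(name, value, isReadOnly, synonyms, type, documentation);
    }

    @Override
    public String toString() {
        return "ConfigEntrySketch(name=" + name + ", value=" + value
                + ", isReadOnly=" + isReadOnly + ", synonyms=" + synonyms
                + ", type=" + type + ", documentation=" + documentation + ")";
    }

    public static void main(String[] args) {
        ConfigEntrySketch a = new ConfigEntrySketch("retention.ms", "1000", false, Collections.emptyList(), "LONG", null);
        ConfigEntrySketch b = new ConfigEntrySketch("retention.ms", "1000", false, Collections.emptyList(), "LONG", null);
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // true
    }
}
```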
[jira] [Resolved] (KAFKA-8924) Default grace period (-1) of TimeWindows causes suppress to emit events after 24h
[ https://issues.apache.org/jira/browse/KAFKA-8924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-8924. --- Resolution: Duplicate > Default grace period (-1) of TimeWindows causes suppress to emit events after > 24h > - > > Key: KAFKA-8924 > URL: https://issues.apache.org/jira/browse/KAFKA-8924 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.3.0 >Reporter: Michał >Assignee: Michał >Priority: Major > Labels: needs-kip > > h2. Problem > The default creation of TimeWindows, like > {code} > TimeWindows.of(ofMillis(xxx)) > {code} > calls an internal constructor > {code} > return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS); > {code} > And the *-1* parameter is the default grace period which I think is here for > backward compatibility > {code} > @SuppressWarnings("deprecation") // continuing to support > Windows#maintainMs/segmentInterval in fallback mode > @Override > public long gracePeriodMs() { > // NOTE: in the future, when we remove maintainMs, > // we should default the grace period to 24h to maintain the default > behavior, > // or we can default to (24h - size) if you want to be super accurate. > return graceMs != -1 ? graceMs : maintainMs() - size(); > } > {code} > The problem is that if you use a TimeWindows with gracePeriod of *-1* > together with suppress *untilWindowCloses*, it never emits an event. > You can check the Suppress tests > (SuppressScenarioTest.shouldSupportFinalResultsForTimeWindows), where > [~vvcephei] was (maybe) aware of that and all the scenarios specify the > gracePeriod. > I will add a test without it on my branch and it will fail. > The test: > https://github.com/atais/kafka/commit/221a04dc40d636ffe93a0ad95dfc6bcad653f4db > > h2. 
Now what can be done > One easy fix would be to change the default value to 0, which works fine for > me in my project; however, I am not aware of the impact it would have > due to the changes in the *gracePeriodMs* method mentioned before. -- This message was sent by Atlassian Jira (v8.3.4#803005)
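A minimal model of the quoted fallback shows what happens when the grace period is left at -1. A 5-minute window then closes only after about 24h of stream time. With the reporter's suggested grace of 0, it closes at the window end. If stream time never advances that far, `suppress(untilWindowCloses)` never emits. The model makes two assumptions. First, `maintainMs()` returns the 24h `DEFAULT_RETENTION_MS` passed by the constructor. Second, a window closes once stream time reaches window end plus grace. `GracePeriodSketch` is a hypothetical name, not Kafka's `TimeWindows`.

```java
import java.time.Duration;

// Hypothetical model of the fallback quoted in KAFKA-8924; not the Kafka TimeWindows class.
// Assumption: maintainMs() returns the 24h DEFAULT_RETENTION_MS that TimeWindows.of() passes in.
final class GracePeriodSketch {
    static final long DEFAULT_RETENTION_MS = Duration.ofDays(1).toMillis();

    // Mirrors: graceMs != -1 ? graceMs : maintainMs() - size()
    static long effectiveGraceMs(long graceMs, long sizeMs, long maintainMs) {
        return graceMs != -1 ? graceMs : maintainMs - sizeMs;
    }

    // In this model a window [start, start + size) closes, and suppress(untilWindowCloses)
    // may emit its final result, once stream time reaches windowEnd + grace.
    static long windowCloseTimeMs(long windowStartMs, long sizeMs, long graceMs) {
        return windowStartMs + sizeMs + graceMs;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(5).toMillis();
        long legacyGrace = effectiveGraceMs(-1, size, DEFAULT_RETENTION_MS);
        long zeroGrace = effectiveGraceMs(0, size, DEFAULT_RETENTION_MS);
        System.out.println(Duration.ofMillis(windowCloseTimeMs(0, size, legacyGrace))); // PT24H
        System.out.println(Duration.ofMillis(windowCloseTimeMs(0, size, zeroGrace)));   // PT5M
    }
}
```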
[jira] [Commented] (KAFKA-8924) Default grace period (-1) of TimeWindows causes suppress to emit events after 24h
[ https://issues.apache.org/jira/browse/KAFKA-8924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315217#comment-17315217 ] A. Sophie Blee-Goldman commented on KAFKA-8924: --- Can we close this as a duplicate? We have KIP-633 currently under voting, which should improve things; we can track the progress in https://issues.apache.org/jira/browse/KAFKA-8613
[jira] [Assigned] (KAFKA-8613) Set default grace period to 0
[ https://issues.apache.org/jira/browse/KAFKA-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-8613: - Assignee: A. Sophie Blee-Goldman > Set default grace period to 0 > - > > Key: KAFKA-8613 > URL: https://issues.apache.org/jira/browse/KAFKA-8613 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Assignee: A. Sophie Blee-Goldman >Priority: Blocker > Fix For: 3.0.0 > > > Currently, the grace period is set to retention time if the grace period is > not specified explicitly. The reason for setting the default grace period to > retention time was backward compatibility. Topologies that were implemented > before the introduction of the grace period added late-arriving records to a > window as long as the window existed, i.e., as long as its retention time had > not elapsed. > This unintuitive default grace period has already caused confusion among > users. > For the next major release, we should set the default grace period to > {{Duration.ZERO}}.
[jira] [Resolved] (KAFKA-6603) Kafka streams off heap memory usage does not match expected values from configuration
[ https://issues.apache.org/jira/browse/KAFKA-6603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-6603. --- Resolution: Fixed > Kafka streams off heap memory usage does not match expected values from > configuration > - > > Key: KAFKA-6603 > URL: https://issues.apache.org/jira/browse/KAFKA-6603 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Igor Calabria >Priority: Minor > > Hi, I have a simple aggregation pipeline that's backed by the default state > store (RocksDB). The pipeline works fine except that the off-heap memory usage > is way higher than expected. Following the > [documentation|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config] > has some effect (memory usage is reduced) but the values don't match at all. > The Java process is set to run with just `-Xmx300m -Xms300m` and the RocksDB > config looks like this: > {code:java} > tableConfig.setCacheIndexAndFilterBlocks(true); > tableConfig.setBlockCacheSize(1048576); //1MB > tableConfig.setBlockSize(16 * 1024); // 16KB > options.setTableFormatConfig(tableConfig); > options.setMaxWriteBufferNumber(2); > options.setWriteBufferSize(8 * 1024); // 8KB{code} > To estimate memory usage, I'm using this formula: > {noformat} > (block_cache_size + write_buffer_size * write_buffer_number) * segments * > partitions{noformat} > Since my topic has 25 partitions with 3 segments each (it's a windowed store), > off-heap memory usage should be about 76MB. What I'm seeing in production is > upwards of 300MB; even taking into consideration extra overhead from RocksDB > compaction threads, this seems a bit high (especially when the disk usage for > all files is just 1GB).
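Plugging the reported settings into the reporter's formula gives (1,048,576 + 8,192 × 2) × 3 × 25 = 79,872,000 bytes, or about 76.2 MiB. That is where the "about 76MB" figure comes from. Below is a small sketch of that arithmetic. The formula is the reporter's own estimate, not an official RocksDB sizing rule, and, as the reporter notes, it leaves out overhead such as compaction threads. `RocksDbMemoryEstimate` is a hypothetical name.

```java
import java.util.Locale;

// Reproduces the reporter's estimate; the formula is theirs, not an official RocksDB sizing rule.
final class RocksDbMemoryEstimate {
    // (block_cache_size + write_buffer_size * write_buffer_number) * segments * partitions
    static long estimateBytes(long blockCacheSize, long writeBufferSize, int writeBufferNumber,
                              int segments, int partitions) {
        return (blockCacheSize + writeBufferSize * writeBufferNumber) * segments * partitions;
    }

    public static void main(String[] args) {
        // Values from the report: 1MB block cache, two 8KB write buffers, 3 segments, 25 partitions.
        long bytes = estimateBytes(1_048_576L, 8 * 1024L, 2, 3, 25);
        // prints: 79872000 bytes = 76.2 MiB
        System.out.printf(Locale.ROOT, "%d bytes = %.1f MiB%n", bytes, bytes / (1024.0 * 1024.0));
    }
}
```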
[jira] [Resolved] (KAFKA-8165) Streams task causes Out Of Memory after connection issues and store restoration
[ https://issues.apache.org/jira/browse/KAFKA-8165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-8165. --- Resolution: Fixed > Streams task causes Out Of Memory after connection issues and store > restoration > --- > > Key: KAFKA-8165 > URL: https://issues.apache.org/jira/browse/KAFKA-8165 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 > Environment: 3 nodes, 22 topics, 16 partitions per topic, 1 window > store, 4 KV stores. > Kafka Streams application cluster: 3 AWS t2.large instances (8GB mem). 1 > application instance, 2 threads per instance. > Kafka 2.1, Kafka Streams 2.1 > Amazon Linux. > Scala application, on Docker based on openJdk9. >Reporter: Di Campo >Priority: Major > > With a Kafka Streams 2.1 application, while the Kafka brokers were stable, the > (largely stateful) application consumed ~160 messages per second at > a sustained rate for several hours. > However, it then started having connection issues to the brokers. > {code:java} > Connection to node 3 (/172.31.36.118:9092) could not be established. Broker > may not be available. (org.apache.kafka.clients.NetworkClient){code} > It also began showing a lot of these errors: > {code:java} > WARN [Consumer > clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-2-consumer, > groupId=stream-processor] 1 partitions have leader brokers without a > matching listener, including [broker-2-health-check-0] > (org.apache.kafka.clients.NetworkClient){code} > In fact, the _health-check_ topic is in the broker but not consumed by this > topology or used in any way by the Streams application (it is just a broker > health check). It does not complain about topics that are actually consumed by > the topology.
> Some time after these errors (which appear at a rate of 24 per > second for ~5 minutes), the following logs appear: > {code:java} > [2019-03-27 15:14:47,709] WARN [Consumer > clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-1-restore-consumer, > groupId=] Connection to node -3 (/ip3:9092) could not be established. Broker > may not be available. (org.apache.kafka.clients.NetworkClient){code} > In between 6 and then 3 lines of "Connection could not be established" error > messages, 3 of these slipped in: > {code:java} > [2019-03-27 15:14:47,723] WARN Started Restoration of visitorCustomerStore > partition 15 total records to be restored 17 > (com.divvit.dp.streams.applications.monitors.ConsoleGlobalRestoreListener){code} > > ... one for each different KV store I have (I still have another KV store that > does not appear, and a WindowedStore store that also does not appear). > Then I finally see "Restoration Complete" (using a logging > ConsoleGlobalRestoreListener as in the docs) messages for all of my stores. So it > seems it may be fine now to restart the processing. > Three minutes later, some events get processed, and I see an OOM error: > {code:java} > java.lang.OutOfMemoryError: GC overhead limit exceeded{code} > > ... so given that it usually keeps processing for hours under the same > circumstances, I'm wondering whether there is some memory leak in the > connection resources or somewhere in the handling of this scenario. > Kafka and KafkaStreams 2.1
[jira] [Commented] (KAFKA-12419) Remove Deprecated APIs of Kafka Streams in 3.0
[ https://issues.apache.org/jira/browse/KAFKA-12419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315209#comment-17315209 ] A. Sophie Blee-Goldman commented on KAFKA-12419: Thanks -- wanted to make sure I understood why exactly the StreamsConfig was helpful for dependency injection. The KIP makes sense to me. Opened https://github.com/apache/kafka/pull/10484 > Remove Deprecated APIs of Kafka Streams in 3.0 > -- > > Key: KAFKA-12419 > URL: https://issues.apache.org/jira/browse/KAFKA-12419 > Project: Kafka > Issue Type: Improvement > Components: streams, streams-test-utils >Reporter: Guozhang Wang >Assignee: Tomasz Nguyen >Priority: Blocker > Labels: needs-kip > Fix For: 3.0.0 > > > Here's a list of deprecated APIs that we have accumulated in the past; we can > consider removing them in 3.0: > * KIP-198: "--zookeeper" flag from StreamsResetter (1.0) > * KIP-171: "--execute" flag from StreamsResetter (1.1) > * KIP-233: overloaded "StreamsBuilder#addGlobalStore" (1.1) > * KIP-251: overloaded "ProcessorContext#forward" (2.0) > * KIP-276: "StreamsConfig#getConsumerConfig" (2.0) > * KIP-319: "WindowBytesStoreSupplier#segments" (2.1) > * KIP-321: "TopologyDescription.Source#topics" (2.1) > * KIP-328: "Windows#until/segmentInterval/maintainMS" (2.1) > * KIP-358: "Windows/Materialized" overloaded functions with `long` (2.1) > * KIP-365/366: Implicit Scala APIs (2.1) > * KIP-372: overloaded "KStream#groupBy" (2.1) > * KIP-307: "Joined#named" (2.3) > * KIP-345: Broker config "group.initial.rebalance.delay.ms" (2.3) > * KIP-429: "PartitionAssignor" interface (2.4) > * KIP-470: "TopologyTestDriver#pipeInput" (2.4) > * KIP-476: overloaded "KafkaClientSupplier#getAdminClient" (2.4) > * KIP-479: overloaded "KStream#join" (2.4) > * KIP-530: old "UsePreviousTimeOnInvalidTimeStamp" (2.5) > * KIP-535 / 562: overloaded "KafkaStreams#metadataForKey" and > "KafkaStreams#store" (2.5) > And here's a list of already filed JIRAs for removing deprecated
APIs > * KAFKA-10434 > * KAFKA-7785
[GitHub] [kafka] ableegoldman opened a new pull request #10484: MINOR: un-deprecate StreamsConfig overloads to support dependency injection
ableegoldman opened a new pull request #10484: URL: https://github.com/apache/kafka/pull/10484 In [#5344](https://github.com/apache/kafka/pull/5344#issuecomment-413350338) it came to our attention that the StreamsConfig overloads of the KafkaStreams constructors are actually quite useful for dependency injection, providing a cleaner way to configure dependencies and better type safety. We considered removing these deprecated overloads in the upcoming 3.0 release, but decided against it for the above reasons. Since we no longer intend to remove these APIs, it makes sense to drop the deprecation entirely, so users can start or continue to use them without worry.
[GitHub] [kafka] hachikuji commented on pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API
hachikuji commented on pull request #10483: URL: https://github.com/apache/kafka/pull/10483#issuecomment-813766742 @chia7712 @dajac No rush, but when you have time, this is a continuation of the previous work which added `AdminApiDriver`. This patch contains `CoordinatorStrategy`, which is needed to look up transaction coordinators.
[GitHub] [kafka] hachikuji opened a new pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API
hachikuji opened a new pull request #10483: URL: https://github.com/apache/kafka/pull/10483 This patch contains the `Admin` implementation of the `DescribeTransactions` APIs described in KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dengziming commented on a change in pull request #10393: KAFKA-12539: Refactor KafkaRaftClient handleVoteRequest to reduce cyclomatic complexity
dengziming commented on a change in pull request #10393: URL: https://github.com/apache/kafka/pull/10393#discussion_r607430517 ## File path: raft/src/main/java/org/apache/kafka/raft/CandidateState.java ## @@ -235,6 +240,15 @@ public int epoch() { return highWatermark; } +@Override +public boolean canGrantVote(int candidateId, boolean isLogUpToDate) { +// Still reject the vote request even if candidateId == localId. Although the candidate votes for +// itself, this vote is implicit and not "granted". +log.debug("Rejecting vote request from candidate {} since we are already candidate in epoch {}", +candidateId, epoch); Review comment: @ijuma, before this PR we logged both the vote result and the rejection reason in `KafkaRaftClient`, but each `EpochState` has a different rejection reason, so to reduce the cyclomatic complexity we moved the if-else out of `KafkaRaftClient`.
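The refactor described above replaces a per-state if-else chain in `KafkaRaftClient` with a `canGrantVote` method on each epoch state. Below is a hypothetical, simplified sketch of that shape. The class names are made up, and the granting rules are a simplified reading of Raft voting, not a copy of Kafka's implementation. Only the candidate rule (reject everyone, including itself) mirrors the hunk above.

```java
// Hypothetical, simplified model; these are not Kafka's EpochState classes.
interface EpochStateSketch {
    boolean canGrantVote(int candidateId, boolean isLogUpToDate);
}

final class UnattachedSketch implements EpochStateSketch {
    @Override
    public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
        // No vote cast yet in this epoch: grant only if the candidate's log is up to date.
        return isLogUpToDate;
    }
}

final class VotedSketch implements EpochStateSketch {
    private final int votedId;

    VotedSketch(int votedId) {
        this.votedId = votedId;
    }

    @Override
    public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
        // Already voted this epoch: only repeat the vote for the same candidate.
        return candidateId == votedId;
    }
}

final class CandidateSketch implements EpochStateSketch {
    @Override
    public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
        // A candidate's vote for itself is implicit, so every request is rejected,
        // even one carrying its own id.
        return false;
    }
}

final class VoteHandlerSketch {
    // The caller delegates instead of branching on the concrete state type;
    // each state could log its own rejection reason (omitted here).
    static boolean handleVoteRequest(EpochStateSketch state, int candidateId, boolean isLogUpToDate) {
        return state.canGrantVote(candidateId, isLogUpToDate);
    }

    public static void main(String[] args) {
        System.out.println(handleVoteRequest(new UnattachedSketch(), 2, true)); // true
        System.out.println(handleVoteRequest(new VotedSketch(3), 2, true));     // false
        System.out.println(handleVoteRequest(new CandidateSketch(), 1, true));  // false
    }
}
```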
[GitHub] [kafka] ableegoldman commented on a change in pull request #10482: KAFKA-12499: add transaction timeout verification
ableegoldman commented on a change in pull request #10482: URL: https://github.com/apache/kafka/pull/10482#discussion_r607423468 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java ## @@ -106,7 +107,7 @@ public StreamsProducer(final StreamsConfig config, producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + taskId); eosBetaProducerConfigs = null; - +verifyTransactionTimeoutCompatibility(producerConfigs, config); Review comment: +1, we should fail fast at the level of KafkaStreams. Otherwise, in eos-alpha we actually have to wait for all the individual StreamThreads to be created, started, and complete a rebalance to get a task assignment, and _then_ start throwing IllegalArgumentException on each thread when it goes to create a task producer. (Can we also log an error in addition to the IllegalArgumentException?)
[GitHub] [kafka] lamberken commented on pull request #10469: KAFKA-12611: Fix using random payload in ProducerPerformance incorrectly
lamberken commented on pull request #10469: URL: https://github.com/apache/kafka/pull/10469#issuecomment-813750487 Hi @mageshn @C0urante @gharris1727, please review when you get a chance.
[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up
[ https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315186#comment-17315186 ] A. Sophie Blee-Goldman commented on KAFKA-12492: Yeah, you need to check out the "*Kafka Code Repository*" and not just the "*Kafka Website Repository*". The whole setup and process with the docs is kind of confusing; I'll see if I can improve the docs/wiki to clear things up a bit. Thanks :) > Formatting of example RocksDBConfigSetter is messed up > -- > > Key: KAFKA-12492 > URL: https://issues.apache.org/jira/browse/KAFKA-12492 > Project: Kafka > Issue Type: Bug > Components: documentation, streams >Reporter: A. Sophie Blee-Goldman >Assignee: Ben Chen >Priority: Trivial > Labels: docs, newbie > > See the example implementation class CustomRocksDBConfig in the docs for the > rocksdb.config.setter > https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#rocksdb-config-setter
[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up
[ https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315185#comment-17315185 ] Ben Chen commented on KAFKA-12492: -- Got it. Thanks. I guess I missed something mentioned here: https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes
[GitHub] [kafka] mjsax commented on a change in pull request #10482: KAFKA-12499: add transaction timeout verification
mjsax commented on a change in pull request #10482: URL: https://github.com/apache/kafka/pull/10482#discussion_r607414956 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java ## @@ -106,7 +107,7 @@ public StreamsProducer(final StreamsConfig config, producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + taskId); eosBetaProducerConfigs = null; - +verifyTransactionTimeoutCompatibility(producerConfigs, config); Review comment: It might be better to do this check in `StreamsConfig`?
[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up
[ https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315183#comment-17315183 ] A. Sophie Blee-Goldman commented on KAFKA-12492: Thanks, I'll take a look. But note that you always need to submit a docs PR against the kafka repo, while a kafka-site PR is optional. The reason is what I said above, i.e. during a release the docs from kafka get copied over to kafka-site. So if you only fix them in kafka-site but not in kafka, the fix will just get wiped out when the next release comes up. In general we usually do the kafka PR before the kafka-site PR, for that reason among others. But that's not a hard rule or anything -- my point is just that you need a PR for the kafka repo as well.
[GitHub] [kafka] abbccdda closed pull request #9560: KAFKA-10345: Add ZK-notification based update for trust/key store paths
abbccdda closed pull request #9560: URL: https://github.com/apache/kafka/pull/9560
[GitHub] [kafka] abbccdda opened a new pull request #10482: KAFKA-12499: add transaction timeout verification
abbccdda opened a new pull request #10482: URL: https://github.com/apache/kafka/pull/10482 This PR adds a check that compares the producer transaction timeout against the Streams commit interval. If the transaction timeout is smaller than the commit interval, Streams should crash and tell the user to update their commit interval to match the given transaction timeout, or vice versa. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
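Below is a minimal sketch of the comparison described in this PR. It is written as a standalone check over a config map, so it could run before any StreamThread starts, which is the fail-fast point raised in review. The keys `transaction.timeout.ms` and `commit.interval.ms` are real Kafka config names. The class, method and error text are hypothetical, and `System.err` stands in for a logger. Where the check should live (`StreamsConfig`, `KafkaStreams` or `StreamsProducer`) is still the open question in the review thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical fail-fast check, not the code from the PR; only the two config keys are real.
final class TransactionTimeoutCheck {
    static final String TRANSACTION_TIMEOUT_MS = "transaction.timeout.ms";
    static final String COMMIT_INTERVAL_MS = "commit.interval.ms";

    // Throws if the transaction timeout is smaller than the commit interval.
    static void verify(Map<String, ?> configs) {
        long transactionTimeoutMs = requireLong(configs, TRANSACTION_TIMEOUT_MS);
        long commitIntervalMs = requireLong(configs, COMMIT_INTERVAL_MS);
        if (transactionTimeoutMs < commitIntervalMs) {
            String msg = String.format(
                "%s (%d ms) must not be smaller than %s (%d ms); increase the former or decrease the latter",
                TRANSACTION_TIMEOUT_MS, transactionTimeoutMs, COMMIT_INTERVAL_MS, commitIntervalMs);
            // Stand-in for logging an error before throwing, as suggested in review.
            System.err.println("ERROR " + msg);
            throw new IllegalArgumentException(msg);
        }
    }

    // Accepts Integer, Long or String values, as a config map might contain.
    private static long requireLong(Map<String, ?> configs, String key) {
        Object value = configs.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing config " + key);
        }
        return Long.parseLong(value.toString().trim());
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(TRANSACTION_TIMEOUT_MS, 10_000);
        configs.put(COMMIT_INTERVAL_MS, 100L);
        verify(configs); // passes: 10000 >= 100
        configs.put(COMMIT_INTERVAL_MS, 30_000L);
        try {
            verify(configs);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```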
[jira] [Issue Comment Deleted] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up
[ https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Chen updated KAFKA-12492: - Comment: was deleted (was: https://github.com/apache/kafka-site/pull/345/)
[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up
[ https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315173#comment-17315173 ] Ben Chen commented on KAFKA-12492: -- https://github.com/apache/kafka-site/pull/345/
[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up
[ https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315172#comment-17315172 ] Ben Chen commented on KAFKA-12492: -- [~ableegoldman] I appreciate the detailed explanation! Here is the PR: https://github.com/apache/kafka-site/pull/345/
[jira] [Commented] (KAFKA-12574) Deprecate eos-alpha
[ https://issues.apache.org/jira/browse/KAFKA-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315156#comment-17315156 ] Ismael Juma commented on KAFKA-12574: - For 4.0, we can make eos-v2 the default and make it a non-issue for most people. Then this config will only be there for people who want the weaker semantics. :) > Deprecate eos-alpha > --- > > Key: KAFKA-12574 > URL: https://issues.apache.org/jira/browse/KAFKA-12574 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Blocker > Labels: needs-kip > Fix For: 3.0.0 > > > In KIP-447 we introduced a new thread-producer which is capable of > exactly-once semantics across multiple tasks. The new mode of EOS, called > eos-beta, is intended to eventually be the preferred processing mode for EOS > as it improves the performance and scaling of partitions/tasks. The only > downside is that it requires brokers to be on version 2.5+ in order to > understand the latest APIs that are necessary for this thread-producer. > We should consider deprecating the eos-alpha config, i.e. > StreamsConfig.EXACTLY_ONCE, to encourage new and existing EOS users to migrate > to the new-and-improved processing mode, and upgrade their brokers if > necessary. > Eventually we would like to be able to remove the eos-alpha code paths from > Streams as this will help to simplify the logic and reduce the processing > mode branching. But since this will break client-broker compatibility, and > 2.5 is still a relatively recent version, we probably can't actually remove > eos-alpha in the near future.
[jira] [Commented] (KAFKA-12574) Deprecate eos-alpha
[ https://issues.apache.org/jira/browse/KAFKA-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315153#comment-17315153 ] Guozhang Wang commented on KAFKA-12574: --- I'm fine with option 2) as well, to stay with `eos-v2` eventually. Honestly, I think we should try to avoid such an upgrade path in the future -- i.e. a rolling bounce with a config change -- but I know that one should never say never :) So although we may never have an `eos-v3`, ending with `eos-v2` is not a bad state.
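This sketch shows what the deprecation discussed here means in configuration terms. It picks eos-beta only when the oldest broker is on 2.5+, and stays on eos-alpha otherwise. `processing.guarantee` and its values `exactly_once` (eos-alpha) and `exactly_once_beta` (eos-beta, Streams 2.6+) are real config strings. The helper class and the version gate are hypothetical. Switching an existing application between the modes also involves the rolling bounce with a config change mentioned in the comment above, and that step is outside this sketch.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka Streams; only the config keys and values are real.
final class EosModeChooser {
    // eos-beta needs brokers on 2.5+, per the issue description.
    static String chooseGuarantee(int brokerMajor, int brokerMinor) {
        boolean brokersSupportEosBeta = brokerMajor > 2 || (brokerMajor == 2 && brokerMinor >= 5);
        return brokersSupportEosBeta ? "exactly_once_beta" : "exactly_once";
    }

    static Properties streamsProps(String applicationId, String bootstrapServers,
                                   int oldestBrokerMajor, int oldestBrokerMinor) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        props.put("processing.guarantee", chooseGuarantee(oldestBrokerMajor, oldestBrokerMinor));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps("my-app", "localhost:9092", 2, 4).getProperty("processing.guarantee")); // exactly_once
        System.out.println(streamsProps("my-app", "localhost:9092", 2, 8).getProperty("processing.guarantee")); // exactly_once_beta
    }
}
```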
[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r607386832 ## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java ## @@ -319,6 +328,10 @@ public String name() { } @Override -public void close() {} +public void close() { +if (accumulator != null) { Review comment: Only in the QuorumState tests, where I was passing in null for the accumulator to get the code to compile. I'll look into using a mock instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r607383662 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2252,9 +2247,12 @@ public Long scheduleAtomicAppend(int epoch, List records) { return append(epoch, records, true); } +@SuppressWarnings("unchecked") private Long append(int epoch, List records, boolean isAtomic) { -BatchAccumulator accumulator = this.accumulator; -if (accumulator == null) { +BatchAccumulator accumulator; +try { +accumulator = (BatchAccumulator) quorum.leaderStateOrThrow().accumulator(); Review comment: Removing the cast causes this... `Type Mismatch: Cannot convert from BatchAccumulator to BatchAccumulator ` even though this is the signature of accumulator() `public BatchAccumulator accumulator()` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
hachikuji commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r607381875 ## File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java ## @@ -36,30 +36,31 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class LeaderStateTest { +public class LeaderStateTest { Review comment: I confess it's a little annoying to see the generic type leak down to here, but I guess that's the price we have to pay. There's a big part of me that wants to remove the generic and let `ApiMessageAndVersion` be the only supported type. Anyway, that is fuel for a separate issue/discussion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
hachikuji commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r607380961 ## File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java ## @@ -36,30 +36,31 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class LeaderStateTest { +public class LeaderStateTest { private final int localId = 0; private final int epoch = 5; private final LogContext logContext = new LogContext(); -private LeaderState newLeaderState( +private LeaderState newLeaderState( Set voters, long epochStartOffset ) { -return new LeaderState( +return new LeaderState<>( localId, epoch, epochStartOffset, voters, voters, -logContext +logContext, +null Review comment: Yeah, let's add an explicit `requireNonNull` in the constructor. Here we could potentially use a mock. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
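Below is a minimal sketch of the fail-fast constructor check suggested above, using `java.util.Objects.requireNonNull`. `LeaderStateSketch` and `StubAccumulator` are hypothetical stand-ins, not the real `org.apache.kafka.raft` classes. With the check in place, a test has to pass a stub or mock accumulator instead of `null`, and `close()` no longer needs a null guard.

```java
import java.util.Objects;

// Hypothetical stand-in for BatchAccumulator.
class StubAccumulator implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical, trimmed-down LeaderState: only the parts relevant to this comment.
class LeaderStateSketch {
    private final int epoch;
    private final StubAccumulator accumulator;

    LeaderStateSketch(int epoch, StubAccumulator accumulator) {
        this.epoch = epoch;
        // Fail fast at construction time rather than with an NPE later in close() or append().
        this.accumulator = Objects.requireNonNull(accumulator, "accumulator must not be null");
    }

    int epoch() {
        return epoch;
    }

    StubAccumulator accumulator() {
        return accumulator;
    }

    void close() {
        accumulator.close(); // no null check needed once the constructor guarantees it
    }
}
```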
[jira] [Resolved] (KAFKA-12294) Consider using the forwarding mechanism for metadata auto topic creation
[ https://issues.apache.org/jira/browse/KAFKA-12294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-12294. - Fix Version/s: 3.0.0 Resolution: Fixed > Consider using the forwarding mechanism for metadata auto topic creation > > > Key: KAFKA-12294 > URL: https://issues.apache.org/jira/browse/KAFKA-12294 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Boyang Chen >Priority: Major > Fix For: 3.0.0 > > > Once [https://github.com/apache/kafka/pull/9579] is merged, there is a way to > improve the topic creation auditing by forwarding the CreateTopicsRequest > inside Envelope for the given client. Details in > [here|https://github.com/apache/kafka/pull/9579#issuecomment-772283780] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji merged pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients
hachikuji merged pull request #10142: URL: https://github.com/apache/kafka/pull/10142 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12574) Deprecate eos-alpha
[ https://issues.apache.org/jira/browse/KAFKA-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315144#comment-17315144 ] Ismael Juma commented on KAFKA-12574: - {quote}Regarding the proposal, I think users would find it very odd if we moved on to eos-v2 and then suddenly deprecated it and went back to just "eos" – makes it seem like there was a problem with eos-v2. I would be fine with just staying on eos-v2 though. For one thing, it leaves the door open to further developments in eos that need to be gated by a config, e.g. eos-v3, if we ever need that again. {quote} +1 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12574) Deprecate eos-alpha
[ https://issues.apache.org/jira/browse/KAFKA-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315142#comment-17315142 ] A. Sophie Blee-Goldman commented on KAFKA-12574: > This is well documented in > https://kafka.apache.org/27/documentation/streams/upgrade-guide Ah, you're right. I must have been on an older version of the docs -- I find Google often takes me to the 2.4 docs for some reason. We need better SEO :/ (I filed a ticket for this a while back.) Also, I'm not sure how I forgot about EosBetaUpgradeIntegrationTest after spending so much time trying to help fix it -- sorry for doubting our test coverage and docs. Regarding the proposal, I think users would find it very odd if we moved on to eos-v2 and then suddenly deprecated it and went back to just "eos" -- makes it seem like there was a problem with eos-v2. I would be fine with just staying on eos-v2 though. For one thing, it leaves the door open to further developments in eos that need to be gated by a config, e.g. eos-v3, if we ever need that again. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order
hachikuji commented on a change in pull request #9441: URL: https://github.com/apache/kafka/pull/9441#discussion_r607374397 ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -905,19 +907,33 @@ class GroupCoordinator(val brokerId: Int, * * @param offsetTopicPartitionId The partition we are now leading */ - def onElection(offsetTopicPartitionId: Int): Unit = { -info(s"Elected as the group coordinator for partition $offsetTopicPartitionId") -groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded) + def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = { +val currentEpoch = epochForPartitionId.get(offsetTopicPartitionId) +if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) { + info(s"Elected as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch") + groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded) + epochForPartitionId.put(offsetTopicPartitionId, coordinatorEpoch) +} else { + warn(s"Ignored election as group coordinator for partition $offsetTopicPartitionId " + +s"in epoch $coordinatorEpoch since current epoch is $currentEpoch") +} } /** * Unload cached state for the given partition and stop handling requests for groups which map to it. 
* * @param offsetTopicPartitionId The partition we are no longer leading */ - def onResignation(offsetTopicPartitionId: Int): Unit = { -info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId") -groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded) + def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: Option[Int]): Unit = { +val currentEpoch = epochForPartitionId.get(offsetTopicPartitionId) +if (currentEpoch.forall(currentEpoch => currentEpoch <= coordinatorEpoch.getOrElse(Int.MaxValue))) { + info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch") + groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded) + epochForPartitionId.remove(offsetTopicPartitionId) Review comment: Hmm.. Why remove the epoch after resignation? It seems like it would be useful to keep tracking it. Maybe it's useful to distinguish the case where the replica is to be deleted? ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -87,6 +87,8 @@ class GroupCoordinator(val brokerId: Int, private val isActive = new AtomicBoolean(false) + val epochForPartitionId = mutable.Map[Int, Int]() Review comment: Does this need to be a concurrent collection? It does not look like we can count on a lock protecting `onElection` and `onResignation`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
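On the concurrency question, one option is a `ConcurrentHashMap`, whose `compute` makes each check-and-update of the epoch atomic. This is a sketch of that option, not the patch's approach. `CoordinatorEpochTracker` is hypothetical. It mirrors the comparisons in the diff: `>` on election, `<=` on resignation, and an unknown epoch treated as `Int.MaxValue`. Following the review question, it keeps the epoch after resignation instead of removing it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: tracks the latest coordinator epoch per offsets-topic partition.
class CoordinatorEpochTracker {
    private final ConcurrentMap<Integer, Integer> epochForPartitionId = new ConcurrentHashMap<>();

    // Returns true if the election should trigger a load (epoch strictly newer).
    boolean onElection(int partitionId, int coordinatorEpoch) {
        AtomicBoolean accepted = new AtomicBoolean(false);
        epochForPartitionId.compute(partitionId, (id, current) -> {
            if (current == null || coordinatorEpoch > current) {
                accepted.set(true);
                return coordinatorEpoch;
            }
            return current; // stale or duplicate election: keep the existing epoch
        });
        return accepted.get();
    }

    // coordinatorEpoch == null means "epoch unknown", treated as Integer.MAX_VALUE as in the diff.
    boolean onResignation(int partitionId, Integer coordinatorEpoch) {
        int epoch = (coordinatorEpoch == null) ? Integer.MAX_VALUE : coordinatorEpoch;
        AtomicBoolean accepted = new AtomicBoolean(false);
        epochForPartitionId.compute(partitionId, (id, current) -> {
            if (current == null || current <= epoch) {
                accepted.set(true);
                // Keep tracking the epoch instead of removing it, so a late,
                // stale election for the same partition is still rejected.
                return (coordinatorEpoch == null) ? current : coordinatorEpoch;
            }
            return current;
        });
        return accepted.get();
    }

    Integer currentEpoch(int partitionId) {
        return epochForPartitionId.get(partitionId);
    }
}
```

This only makes the epoch check and update atomic. Whether the resulting load and unload tasks run in submission order still depends on how the scheduler executes them.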
[jira] [Updated] (KAFKA-12607) Allow votes to be granted in resigned state
[ https://issues.apache.org/jira/browse/KAFKA-12607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-12607: -- Labels: newbie++ (was: ) > Allow votes to be granted in resigned state > --- > > Key: KAFKA-12607 > URL: https://issues.apache.org/jira/browse/KAFKA-12607 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: dengziming >Priority: Major > Labels: newbie++ > > When the leader is shutting down, it transitions to a resigned state. > Currently all votes are rejected in this state, but we should allow the > resigned leader to help a candidate get elected. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up
[ https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315140#comment-17315140 ] A. Sophie Blee-Goldman commented on KAFKA-12492: The kafka-site repo is what the actual, live docs are built from. That's why there are separate folders like 27, 26, etc. -- these correspond to the docs for versions 2.7 and 2.6, and so on. You only need to submit a PR to the kafka-site repo if you want your change to show up immediately -- if you don't mind waiting for the next release, you can just open a PR to fix the docs in the kafka repo directly. Then, these will be copied over to the kafka-site repo and made live when the next version is released. Obviously it would be ideal if we could fix this in all versions, but it's probably sufficient to just fix it going forward. The 2.8 release is actually going on at the moment, so I would recommend submitting a PR to the kafka repo for now. If we can get it merged before 2.8 is released, then we're good -- otherwise you can open a follow-up PR with the same fix in just the 28 subdirectory of the kafka-site repo. I'm not sure why you're getting a 403; I was able to set up a local Apache server to test some docs, but that was a while ago. Since it's just a fix of an existing formatting error, I wouldn't worry about testing it too much. As long as you can figure out why the formatting was messed up to begin with, and feel reasonably confident in your fix, then that's good enough. Remember, once the fix is in kafka-site it'll be live, so you can just see what it looks like then. If something is still off, you can always submit a follow-up PR to fix it right away in kafka-site. > Formatting of example RocksDBConfigSetter is messed up > -- > > Key: KAFKA-12492 > URL: https://issues.apache.org/jira/browse/KAFKA-12492 > Project: Kafka > Issue Type: Bug > Components: documentation, streams >Reporter: A.
Sophie Blee-Goldman >Assignee: Ben Chen >Priority: Trivial > Labels: docs, newbie > > See the example implementation class CustomRocksDBConfig in the docs for the > rocksdb.config.setter > https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#rocksdb-config-setter -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on pull request #10481: KAFKA-12619; Raft leader should expose hw only after committing LeaderChange
hachikuji commented on pull request #10481: URL: https://github.com/apache/kafka/pull/10481#issuecomment-813686724 @guozhangwang Thanks for the quick comment. I did consider that. I can't say I had a particularly strong reason to reject it, but ultimately I convinced myself that modifying the existing check was good enough and probably simpler. The only invariant that we need to protect is that all records appended in the leader's epoch actually carry the right epoch tag. The refactor proposed here may even give us a stronger way to enforce the invariant: https://github.com/apache/kafka/pull/10480/files. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ewencp commented on a change in pull request #10474: KAFKA-12602: Fix LICENSE file
ewencp commented on a change in pull request #10474: URL: https://github.com/apache/kafka/pull/10474#discussion_r607362884 ## File path: LICENSE-binary ## @@ -0,0 +1,602 @@ + + Apache License + Version 2.0, January 2004 +http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. 
Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution
[GitHub] [kafka] guozhangwang commented on pull request #10481: KAFKA-12619; Raft leader should expose hw only after committing LeaderChange
guozhangwang commented on pull request #10481: URL: https://github.com/apache/kafka/pull/10481#issuecomment-813683249 Nice catch! Regarding the fix, WDYT about just remembering the `LeaderChange` record's offset and comparing against it instead of the epoch start offset? I'm thinking that if, in the future, there are scenarios that insert more records between these two, we would still be immune to such bugs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji opened a new pull request #10481: KAFKA-12619; Raft leader should expose hw only after committing LeaderChange
hachikuji opened a new pull request #10481: URL: https://github.com/apache/kafka/pull/10481 KIP-595 describes an extra condition on commitment here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch. In order to ensure that a newly elected leader's committed entries cannot get lost, it must commit one record from its own epoch. This guarantees that its latest entry is larger (in terms of epoch/offset) than any previously written record, which ensures that any future leader must also include it. This is the purpose of the `LeaderChange` record, which is written to the log as soon as the leader gets elected. Although we had this check implemented, it was off by one. We only ensured that replication reached the epoch start offset, which does not reflect the appended `LeaderChange` record. This patch fixes the check and clarifies the point of the check. The rest of the patch is just fixing up test cases. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
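The corrected condition can be sketched as follows. This assumes the `LeaderChange` record is the first record appended at `epochStartOffset`, and that end offsets are exclusive (the next offset to be written). The class and method names are hypothetical. What matters is the strict `>` comparison, which exposes a high watermark only once a majority of voters has replicated the `LeaderChange` record itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified leader-side high-watermark tracking.
class LeaderHighWatermarkSketch {
    private final long epochStartOffset; // offset of this leader's LeaderChange record
    private long highWatermark = -1L;    // -1 = not yet exposed (sketch convention)

    LeaderHighWatermarkSketch(long epochStartOffset) {
        this.epochStartOffset = epochStartOffset;
    }

    // voterEndOffsets: voter id -> log end offset (exclusive) known to the leader.
    boolean maybeUpdateHighWatermark(Map<Integer, Long> voterEndOffsets) {
        if (voterEndOffsets.isEmpty()) {
            return false;
        }
        List<Long> offsets = new ArrayList<>(voterEndOffsets.values());
        offsets.sort(Collections.reverseOrder());
        // The (n/2 + 1)-th largest end offset is replicated on a majority of voters.
        long majorityEndOffset = offsets.get(offsets.size() / 2);
        // ">=" would expose an HW before the LeaderChange record at epochStartOffset
        // is committed; ">" requires a majority to have replicated it.
        if (majorityEndOffset > epochStartOffset && majorityEndOffset > highWatermark) {
            highWatermark = majorityEndOffset;
            return true;
        }
        return false;
    }

    long highWatermark() {
        return highWatermark;
    }
}
```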
[GitHub] [kafka] jsancio commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
jsancio commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r607353807 ## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java ## @@ -48,13 +49,16 @@ private final Set grantingVoters = new HashSet<>(); private final Logger log; +private final BatchAccumulator accumulator; + protected LeaderState( int localId, int epoch, long epochStartOffset, Set voters, Set grantingVoters, -LogContext logContext +LogContext logContext, +BatchAccumulator accumulator Review comment: I would keep `LogContext` as the last argument. ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2252,9 +2247,12 @@ public Long scheduleAtomicAppend(int epoch, List records) { return append(epoch, records, true); } +@SuppressWarnings("unchecked") private Long append(int epoch, List records, boolean isAtomic) { -BatchAccumulator accumulator = this.accumulator; -if (accumulator == null) { +BatchAccumulator accumulator; +try { +accumulator = (BatchAccumulator) quorum.leaderStateOrThrow().accumulator(); Review comment: I think you should be able to remove this cast. ## File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java ## @@ -36,30 +36,31 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class LeaderStateTest { +public class LeaderStateTest { private final int localId = 0; private final int epoch = 5; private final LogContext logContext = new LogContext(); -private LeaderState newLeaderState( +private LeaderState newLeaderState( Set voters, long epochStartOffset ) { -return new LeaderState( +return new LeaderState<>( localId, epoch, epochStartOffset, voters, voters, -logContext +logContext, +null Review comment: I would not pass `null`; instead, I would add a test checking that this field is returned correctly.
## File path: raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java ## @@ -269,7 +269,7 @@ public void testCandidateToLeader() throws IOException { assertTrue(state.isCandidate()); assertEquals(1, state.epoch()); -state.transitionToLeader(0L); +state.transitionToLeader(0L, null); Review comment: Again, the code in `KafkaRaftClient` assumes that this field cannot be `null`. ## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java ## @@ -319,6 +328,10 @@ public String name() { } @Override -public void close() {} +public void close() { +if (accumulator != null) { Review comment: When would accumulator be null? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10474: KAFKA-12602: Fix LICENSE file
ableegoldman commented on a change in pull request #10474: URL: https://github.com/apache/kafka/pull/10474#discussion_r607355497 ## File path: licenses/DWTFYWTPL ## @@ -0,0 +1,14 @@ +DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE Review comment: 😂 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rsomu commented on pull request #4040: KAFKA-6324: Change LogSegment.delete to deleteIfExists and harden log recovery
rsomu commented on pull request #4040: URL: https://github.com/apache/kafka/pull/4040#issuecomment-813671433 Is this fix supposed to avoid the [NFS silly renames](https://sbg.technology/2018/07/10/kafka-nfs/) issue? I am currently testing Kafka 2.7 on an NFS filesystem and encountered the same error message when deleting a topic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dielhennr opened a new pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr opened a new pull request #10480: URL: https://github.com/apache/kafka/pull/10480 The KafkaRaftClient has a field for the BatchAccumulator that is only used and set when it is the leader. Other leader-specific information is already stored in LeaderState. In a recent change, EpochState, which LeaderState implements, was changed to be a Closeable. QuorumState makes sure to always close the previous state before transitioning to the next state. This patch uses that redesign to move the BatchAccumulator to the LeaderState and simplify some of the handling in KafkaRaftClient. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
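Here is a rough sketch of the ownership pattern described above. It uses hypothetical simplified classes, not the actual `QuorumState`, `LeaderState` or `BatchAccumulator` APIs. The leader state owns the accumulator and releases it in `close()`. The quorum state closes the previous state before every transition, so the client never has to null out a separate accumulator field. The sub-interface redeclares `close()` without `throws IOException` so callers need not handle a checked exception.

```java
import java.io.Closeable;

// Hypothetical simplified epoch state; narrows close() to drop the checked IOException.
interface EpochStateSketch extends Closeable {
    int epoch();

    @Override
    void close();
}

// Hypothetical stand-in for BatchAccumulator.
class AccumulatorStub {
    boolean closed = false;

    void close() {
        closed = true;
    }
}

class FollowerStateSketch implements EpochStateSketch {
    private final int epoch;

    FollowerStateSketch(int epoch) {
        this.epoch = epoch;
    }

    @Override public int epoch() { return epoch; }

    @Override public void close() { } // nothing leader-specific to release
}

class LeaderStateWithAccumulator implements EpochStateSketch {
    private final int epoch;
    private final AccumulatorStub accumulator; // owned by the leader state

    LeaderStateWithAccumulator(int epoch, AccumulatorStub accumulator) {
        this.epoch = epoch;
        this.accumulator = accumulator;
    }

    @Override public int epoch() { return epoch; }

    AccumulatorStub accumulator() { return accumulator; }

    @Override public void close() { accumulator.close(); }
}

class QuorumStateSketch {
    private EpochStateSketch state;

    QuorumStateSketch(EpochStateSketch initial) {
        this.state = initial;
    }

    // Always close the previous state before installing the next one.
    void transitionTo(EpochStateSketch next) {
        state.close();
        state = next;
    }

    AccumulatorStub leaderAccumulatorOrThrow() {
        if (state instanceof LeaderStateWithAccumulator) {
            return ((LeaderStateWithAccumulator) state).accumulator();
        }
        throw new IllegalStateException("Not the leader in epoch " + state.epoch());
    }
}
```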
[jira] [Updated] (KAFKA-6985) Error connection between cluster node
[ https://issues.apache.org/jira/browse/KAFKA-6985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-6985: - Component/s: (was: KafkaConnect) core > Error connection between cluster node > - > > Key: KAFKA-6985 > URL: https://issues.apache.org/jira/browse/KAFKA-6985 > Project: Kafka > Issue Type: Bug > Components: core > Environment: Centos-7 >Reporter: Ranjeet Ranjan >Priority: Major > > Hi, I have set up a multi-node Kafka cluster but get an error when connecting > one node to another, although there is no issue with the firewall or port: I am > able to telnet. > WARN [ReplicaFetcherThread-0-1], Error in fetch > Kafka.server.ReplicaFetcherThread$FetchRequest@8395951 > (Kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to Kafka-1:9092 (id: 1 rack: null) failed > > {code:java} > > at > kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84) > at > kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94) > at > kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244) > at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234) > at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) > {code} > Here you go server.properties > Node:1 > > {code:java} > # Server Basics # > # The id of the broker. This must be set to a unique integer for each broker.
> broker.id=1 > # Switch to enable topic deletion or not, default value is false > delete.topic.enable=true > # Socket Server Settings > # > listeners=PLAINTEXT://kafka-1:9092 > advertised.listeners=PLAINTEXT://kafka-1:9092 > #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL > # The number of threads handling network requests > num.network.threads=3 > # The number of threads doing disk I/O > num.io.threads=8 > # The send buffer (SO_SNDBUF) used by the socket server > socket.send.buffer.bytes=102400 > # The receive buffer (SO_RCVBUF) used by the socket server > socket.receive.buffer.bytes=102400 > # The maximum size of a request that the socket server will accept > (protection against OOM) > socket.request.max.bytes=104857600 > # Log Basics # > # A comma seperated list of directories under which to store log files > log.dirs=/var/log/kafka > # The default number of log partitions per topic. More partitions allow > greater > # parallelism for consumption, but this will also result in more files across > # the brokers. > num.partitions=1 > # The number of threads per data directory to be used for log recovery at > startup and flushing at shutdown. > # This value is recommended to be increased for installations with data dirs > located in RAID array. > num.recovery.threads.per.data.dir=1 > # Log Retention Policy > # > # The minimum age of a log file to be eligible for deletion due to age > log.retention.hours=48 > # A size-based retention policy for logs. Segments are pruned from the log as > long as the remaining > # segments don't drop below log.retention.bytes. Functions independently of > log.retention.hours. > log.retention.bytes=1073741824 > # The maximum size of a log segment file. When this size is reached a new log > segment will be created. 
> log.segment.bytes=1073741824 > # The interval at which log segments are checked to see if they can be > deleted according > # to the retention policies > log.retention.check.interval.ms=30 > # Zookeeper # > # root directory for all kafka znodes. > zookeeper.connect=10.130.82.28:2181 > # Timeout in ms for connecting to zookeeper > zookeeper.connection.timeout.ms=6000 > {code} > > > Node-2 > {code:java} > # Server Basics # > # The id of the broker. This must be set to a unique integer for each broker. > broker.id=2 > # Switch to enable topic deletion or not, default value is false > delete.topic.enable=true > # Socket Server Settings > # > listeners=PLAINTEXT://kafka-2:9092 > advertised.listeners=PLAINTEXT://kafka-2:9092 > #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL > # Th
[jira] [Resolved] (KAFKA-8551) Comments for connectors() in Herder interface
[ https://issues.apache.org/jira/browse/KAFKA-8551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-8551. -- Resolution: Won't Fix Marking as won't fix, since the details are insufficient to try to address. > Comments for connectors() in Herder interface > -- > > Key: KAFKA-8551 > URL: https://issues.apache.org/jira/browse/KAFKA-8551 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.2.1 >Reporter: Luying Liu >Priority: Major > > There are mistakes in the comments for connectors() in Herder interface. The > mistakes are in the file *Herder.java* under > [connect/runtime/src/main/java/org/apache/kafka/connect/runtime|https://github.com/apache/kafka/tree/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime] in [kafka|https://github.com/apache/kafka]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-8664) non-JSON format messages when streaming data from Kafka to Mongo
[ https://issues.apache.org/jira/browse/KAFKA-8664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-8664. -- Resolution: Won't Fix The reported problem is for a connector implementation that is not owned by the Apache Kafka project. Please report the issue to the provider of the connector. > non-JSON format messages when streaming data from Kafka to Mongo > > > Key: KAFKA-8664 > URL: https://issues.apache.org/jira/browse/KAFKA-8664 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.2.0 >Reporter: Vu Le >Priority: Major > Attachments: MongoSinkConnector.properties, > log_error_when_stream_data_not_a_json_format.txt > > > Hi team, > I can stream data from Kafka to MongoDB with JSON messages. I use MongoDB > Kafka Connector > ([https://github.com/mongodb/mongo-kafka/blob/master/docs/install.md]) > However, if I send a message that is not in JSON format, the connector dies. Please see > the log file for details. > My config file: > {code:java} > name=mongo-sink > topics=test > connector.class=com.mongodb.kafka.connect.MongoSinkConnector > tasks.max=1 > key.ignore=true > # Specific global MongoDB Sink Connector configuration > connection.uri=mongodb://localhost:27017 > database=test_kafka > collection=transaction > max.num.retries=3 > retries.defer.timeout=5000 > type.name=kafka-connect > key.converter=org.apache.kafka.connect.json.JsonConverter > key.converter.schemas.enable=false > value.converter=org.apache.kafka.connect.json.JsonConverter > value.converter.schemas.enable=false > {code} > I have two separate questions: > # How can I ignore messages that are not in JSON format? > # How can I define a default key for this kind of message (for example: abc -> > \{ "non-json": "abc" } )? > Thanks
[jira] [Resolved] (KAFKA-8867) Kafka Connect JDBC fails to create PostgreSQL table with default boolean value in schema
[ https://issues.apache.org/jira/browse/KAFKA-8867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-8867. -- Resolution: Won't Fix The reported problem is for the Confluent JDBC source/sink connector, and should be reported via that connector's GitHub repository issues. > Kafka Connect JDBC fails to create PostgreSQL table with default boolean > value in schema > > > Key: KAFKA-8867 > URL: https://issues.apache.org/jira/browse/KAFKA-8867 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Tudor >Priority: Major > > The `CREATE TABLE ..` statement generated for JDBC sink connectors when > configured with `auto.create: true` generates field declarations that do not > conform to allowed PostgreSQL syntax when considering fields of type boolean > with default values. > Example record value Avro schema: > {code:java} > { > "namespace": "com.test.avro.schema.v1", > "type": "record", > "name": "SomeEvent", > "fields": [ > { > "name": "boolean_field", > "type": "boolean", > "default": false > } > ] > } > {code} > The connector task fails with: > {code:java} > ERROR WorkerSinkTask{id=test-events-sink-0} RetriableException from SinkTask: > (org.apache.kafka.connect.runtime.WorkerSinkTask:551) > org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: > org.postgresql.util.PSQLException: ERROR: column "boolean_field" is of type > boolean but default expression is of type integer > Hint: You will need to rewrite or cast the expression. 
> at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748){code} > > The generated SQL statement is: > {code:java} > CREATE TABLE "test_data" ("boolean_field" BOOLEAN DEFAULT 0){code}
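The error text above pins down the cause: PostgreSQL reports that the column "is of type boolean but default expression is of type integer", i.e. the generated `DEFAULT 0` should have been a boolean literal such as `DEFAULT FALSE`. The sketch below illustrates type-aware rendering of a DEFAULT clause. `DefaultLiteral`, `toSqlLiteral` and `columnDefinition` are hypothetical names chosen for illustration; this is not the Confluent JDBC connector's code, and its string quoting is deliberately simplified.

```java
import java.util.Locale;

// Hypothetical helper (not the Confluent JDBC connector's code): renders a
// schema default value as a SQL literal for a column DEFAULT clause.
final class DefaultLiteral {
    private DefaultLiteral() {}

    static String toSqlLiteral(Object value) {
        if (value == null) {
            return "NULL";
        }
        if (value instanceof Boolean) {
            // PostgreSQL rejects an integer default on a BOOLEAN column,
            // so emit TRUE/FALSE instead of 1/0.
            return ((Boolean) value) ? "TRUE" : "FALSE";
        }
        if (value instanceof Number) {
            return value.toString();
        }
        // Simplified string quoting: wrap in single quotes, doubling embedded quotes.
        return "'" + value.toString().replace("'", "''") + "'";
    }

    static String columnDefinition(String name, String sqlType, Object defaultValue) {
        return "\"" + name + "\" " + sqlType.toUpperCase(Locale.ROOT)
                + " DEFAULT " + toSqlLiteral(defaultValue);
    }

    public static void main(String[] args) {
        // Prints: CREATE TABLE "test_data" ("boolean_field" BOOLEAN DEFAULT FALSE)
        System.out.println("CREATE TABLE \"test_data\" ("
                + columnDefinition("boolean_field", "boolean", false) + ")");
    }
}
```

Dispatching on the runtime type of the default value keeps the rendering decision in one place; a real dialect would more likely dispatch on the declared schema type instead.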
[GitHub] [kafka] ijuma opened a new pull request #10479: MINOR: Jenkinsfile's `post` needs `agent` to be set
ijuma opened a new pull request #10479: URL: https://github.com/apache/kafka/pull/10479 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-8961) Unable to create secure JDBC connection through Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-8961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-8961. -- Resolution: Won't Fix This is not a problem with the Connect framework, and is instead an issue with the connector implementation – or more likely the _installation_ of the connector in the user's environment. > Unable to create secure JDBC connection through Kafka Connect > - > > Key: KAFKA-8961 > URL: https://issues.apache.org/jira/browse/KAFKA-8961 > Project: Kafka > Issue Type: Bug > Components: build, clients, KafkaConnect, network >Affects Versions: 2.2.1 >Reporter: Monika Bainsala >Priority: Major > > According to the article below, to enable a secure JDBC connection we can use an updated > URL parameter when calling the create connector REST API. > Example: > jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(LOAD_BALANCE=YES)(FAILOVER=YES)(ADDRESS=(PROTOCOL=tcp)(HOST=X)(PORT=1520)))(CONNECT_DATA=(SERVICE_NAME=XXAP)));EncryptionLevel=requested;EncryptionTypes=RC4_256;DataIntegrityLevel=requested;DataIntegrityTypes=MD5" > > But this approach does not currently work; kindly help in resolving this > issue. > > Reference: > [https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html] >
[jira] [Updated] (KAFKA-9017) We see timeout in kafka in production cluster
[ https://issues.apache.org/jira/browse/KAFKA-9017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-9017: - Component/s: (was: KafkaConnect) core > We see timeout in kafka in production cluster > - > > Key: KAFKA-9017 > URL: https://issues.apache.org/jira/browse/KAFKA-9017 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 > Environment: Production >Reporter: Suhas >Priority: Critical > Attachments: stderr (7), stdout (12) > > > We see timeouts in Kafka in our production cluster. Kafka is running on > DC/OS (MESOS), > and the errors are below. > *+Exception 1: This is from the application logs+* > 2019-10-07 10:01:59 Error: java.lang.RuntimeException: > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > ie-lrx-audit-evt-3: 30030 ms has passed since batch creation plus linger time > *+Exception 2: This is from the application logs+* > {"eventTime":"2019-10-07 08:20:43.265", "logType":"ERROR", "stackMessage" : > "java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > ie-lrx-audit-evt-3: 30028 ms has passed since batch creation plus linger > time", "stackTrace" : > *+Exception (from log): We see these entries in the broker logs+* > [2019-10-10 06:32:10,844] INFO [ReplicaFetcher replicaId=4, leaderId=2, > fetcherId=0] Error sending fetch request (sessionId=919177392, epoch=INITIAL) > to node 2: java.io.IOException: Connection to 2 was disconnected before the > response was read. (org.apache.kafka.clients.FetchSessionHandler)[2019-10-10 > 06:32:10,844] INFO [ReplicaFetcher replicaId=4, leaderId=2, fetcherId=0] > Error sending fetch request (sessionId=919177392, epoch=INITIAL) to node 2: > java.io.IOException: Connection to 2 was disconnected before the response was > read.
(org.apache.kafka.clients.FetchSessionHandler)[2019-10-10 06:32:10,849] > WARN [ReplicaFetcher replicaId=4, leaderId=2, fetcherId=0] Error in response > for fetch request (type=FetchRequest, replicaId=4, maxWait=500, minBytes=1, > maxBytes=10485760, fetchData=\{ie-lrx-rxer-audit-evt-0=(offset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104]), > mft-hdfs-landing-evt-1=(offset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[108]), dca-audit-evt-2=(offset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[108]), > it-sou-audit-evt-7=(offset=94819, logStartOffset=94819, maxBytes=1048576, > currentLeaderEpoch=Optional[100]), intg-ie-lrx-rxer-audit-evt-2=(offset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[78]), > prod-pipelines-errors-evt-0=(offset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[117]), __consumer_offsets-36=(offset=3, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104]), > panel-data-change-evt-4=(offset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[108]), gdcp-notification-evt-2=(offset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104]), > data-transfer-change-evt-0=(offset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[108]), __consumer_offsets-11=(offset=15, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[108]), > dca-heartbeat-evt-2=(offset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[105]), ukwhs-error-topic-1=(offset=8, > logStartOffset=8, maxBytes=1048576, currentLeaderEpoch=Optional[105]), > intg-ie-lrx-audit-evt-4=(offset=21, logStartOffset=21, maxBytes=1048576, > currentLeaderEpoch=Optional[74]), __consumer_offsets-16=(offset=11329814, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104]), > __consumer_offsets-31=(offset=3472033, logStartOffset=0, maxBytes=1048576, > 
currentLeaderEpoch=Optional[107]), ukpai-hdfs-evt-1=(offset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[107]), > mft-pflow-evt-1=(offset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[108]), ukwhs-hdfs-landing-evt-01-2=(offset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[105]), > it-sou-audit-evt-2=(offset=490084, logStartOffset=490084, maxBytes=1048576, > currentLeaderEpoch=Optional[105]), ie-lrx-pat-audit-evt-4=(offset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104])}, > isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=919177392, > epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)java.io.IOException: > Connection to 2 was disconnected before the response was read at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java
[jira] [Commented] (KAFKA-10715) Support Kafka connect converter for AVRO
[ https://issues.apache.org/jira/browse/KAFKA-10715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315110#comment-17315110 ] Randall Hauch commented on KAFKA-10715: --- There are multiple Converter implementations outside of Kafka, and IMO there is no need for Kafka to own and maintain its own version of these when those other existing implementations can easily be used by simply installing them. This is similar to how the Kafka project provides only example Connector implementations. See the [rejected alternatives of KIP-26|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767#KIP26AddKafkaConnectframeworkfordataimport/export-Maintainconnectorsintheprojectalongwithframework], which introduced the Connect framework (with Converters). Therefore, I'm going to close this. > Support Kafka connect converter for AVRO > > > Key: KAFKA-10715 > URL: https://issues.apache.org/jira/browse/KAFKA-10715 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Ravindranath Kakarla >Priority: Minor > Original Estimate: 72h > Remaining Estimate: 72h > > I want to add support for an Avro data format converter to Kafka Connect. Right > now, Kafka Connect supports a [JSON > converter|https://github.com/apache/kafka/tree/trunk/connect]. Since Avro > is a commonly used data format with Kafka, it would be great to have support > for it. > > Confluent Schema Registry libraries have > [support|https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java] > for it. The code seems to be pretty generic and can be used directly with > Kafka Connect without Schema Registry. They are also licensed under Apache > 2.0. > > Can they be copied to this repository and made available for all users of > Kafka Connect?
[jira] [Resolved] (KAFKA-10715) Support Kafka connect converter for AVRO
[ https://issues.apache.org/jira/browse/KAFKA-10715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-10715. --- Resolution: Won't Do
[jira] [Updated] (KAFKA-9988) Connect incorrectly logs that task has failed when one takes too long to shutdown
[ https://issues.apache.org/jira/browse/KAFKA-9988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-9988: - Labels: newbie (was: ) > Connect incorrectly logs that task has failed when one takes too long to > shutdown > - > > Key: KAFKA-9988 > URL: https://issues.apache.org/jira/browse/KAFKA-9988 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.2.3, 2.5.0, 2.3.2, 2.4.1, 2.4.2, > 2.5.1 >Reporter: Sanjana Kaundinya >Priority: Major > Labels: newbie > > If the OffsetStorageReader is closed while the task is trying to shut down, > and the task is trying to access the offsets from the OffsetStorageReader, > then we see the following in the logs. > {code:java} > [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task threw > an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask) > org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets. > at > org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114) > at > org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:205) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) > at > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader > closed while attempting to read offsets.
This is likely because the task was > been scheduled to stop but has taken longer than the graceful shutdown period > to do so. > at > org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103) > ... 14 more > [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task is > being killed and will not recover until manually restarted > (org.apache.kafka.connect.runtime.WorkerTask) > {code} > This is a bit misleading, because the task is already in the process of being > shut down, and doesn't actually need manual intervention to be restarted. We > can see this later in the logs, where it throws another > unrecoverable exception. > {code:java} > [2020-05-05 05:40:39,361] ERROR WorkerSourceTask{id=connector-18} Task threw > an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask) > {code} > If we know a task is in the process of shutting down, we should not throw a > ConnectException but should instead log a warning so that we don't log false > negatives.
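The proposal in the last paragraph (check whether the task is already stopping before reporting an exception as an unrecoverable failure, and warn instead) can be sketched as below. `StoppingAwareTask`, its `Outcome` enum and the `stop()`/`onUncaughtException()` methods are hypothetical names that do not correspond to Connect's `WorkerTask` API. The demo uses an `IllegalStateException` as a stand-in for the `ConnectException` in the log, so that the sketch needs only the JDK.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the KAFKA-9988 proposal; not Connect's WorkerTask.
final class StoppingAwareTask {
    enum Outcome { FAILED, STOPPED }

    // Set by the thread requesting shutdown, read by the task thread.
    private final AtomicBoolean stopping = new AtomicBoolean(false);

    void stop() {
        stopping.set(true);
    }

    // Classifies an exception escaping the task body.
    Outcome onUncaughtException(Exception e) {
        if (stopping.get()) {
            // The task was already asked to stop: warn instead of reporting
            // a failure that would suggest a manual restart is needed.
            System.err.println("WARN Task threw while stopping: " + e.getMessage());
            return Outcome.STOPPED;
        }
        System.err.println("ERROR Task threw an uncaught and unrecoverable exception: " + e.getMessage());
        return Outcome.FAILED;
    }

    public static void main(String[] args) {
        StoppingAwareTask task = new StoppingAwareTask();
        Exception e = new IllegalStateException("Offset reader closed while attempting to read offsets.");
        System.out.println(task.onUncaughtException(e)); // FAILED
        task.stop();
        System.out.println(task.onUncaughtException(e)); // STOPPED
    }
}
```

An `AtomicBoolean` is used because the stop request and the exception handling would normally happen on different threads.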
[GitHub] [kafka] cmccabe commented on a change in pull request #10455: MINOR: Support ExponentialBackoff without jitter
cmccabe commented on a change in pull request #10455: URL: https://github.com/apache/kafka/pull/10455#discussion_r607330220 ## File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java ## @@ -47,7 +47,8 @@ public long backoff(long attempts) { } double exp = Math.min(attempts, this.expMax); double term = initialInterval * Math.pow(multiplier, exp); -double randomFactor = ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter); +double randomFactor = jitter < Double.MIN_NORMAL ? 1.0 : Review comment: `MIN_NORMAL` is 2^-1022, though. So it certainly wouldn't affect someone setting jitter = 0.5.
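Some context on why a guard is needed at all: `ThreadLocalRandom.nextDouble(origin, bound)` throws `IllegalArgumentException` when `origin >= bound`, so with `jitter = 0` the original line would call `nextDouble(1.0, 1.0)` and fail. Because `Double.MIN_NORMAL` is about 2.2e-308, `jitter < Double.MIN_NORMAL` is true for a non-negative jitter only when it is zero or subnormal, so a value such as 0.5 still randomizes. The sketch below reproduces the backoff computation from the diff with that guard. The class name, the constructor and the `expMax` derivation are assumptions made to keep it self-contained; this is not the actual `ExponentialBackoff` source.

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch modeled on the lines shown in the diff; the constructor and the
// expMax derivation are assumptions, not Kafka's ExponentialBackoff source.
final class ExponentialBackoffSketch {
    private final long initialInterval;
    private final double multiplier;
    private final double jitter;
    private final double expMax;

    ExponentialBackoffSketch(long initialInterval, double multiplier, long maxInterval, double jitter) {
        this.initialInterval = initialInterval;
        this.multiplier = multiplier;
        this.jitter = jitter;
        // Largest exponent before initialInterval * multiplier^exp reaches maxInterval.
        this.expMax = maxInterval > initialInterval
                ? Math.log(maxInterval / (double) Math.max(initialInterval, 1)) / Math.log(multiplier)
                : 0;
    }

    long backoff(long attempts) {
        if (expMax == 0) {
            return initialInterval;
        }
        double exp = Math.min(attempts, expMax);
        double term = initialInterval * Math.pow(multiplier, exp);
        // nextDouble(origin, bound) rejects origin >= bound, so a zero (or
        // subnormal) jitter skips the random draw and uses a factor of 1.0.
        double randomFactor = jitter < Double.MIN_NORMAL
                ? 1.0
                : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) (randomFactor * term);
    }

    public static void main(String[] args) {
        ExponentialBackoffSketch noJitter = new ExponentialBackoffSketch(100, 2, 10_000, 0.0);
        ExponentialBackoffSketch withJitter = new ExponentialBackoffSketch(100, 2, 10_000, 0.5);
        for (long attempt = 0; attempt < 5; attempt++) {
            // Without jitter: 100, 200, 400, 800, 1600; with jitter: within +/-50% of those.
            System.out.println(attempt + ": " + noJitter.backoff(attempt) + " vs " + withJitter.backoff(attempt));
        }
    }
}
```

Checking `jitter <= 0.0` instead, as suggested in the earlier review comment, would behave the same for every non-negative jitter except subnormal values.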
[jira] [Updated] (KAFKA-12619) Ensure LeaderChange message is committed before initializing high watermark
[ https://issues.apache.org/jira/browse/KAFKA-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12619: Description: KIP-595 describes an extra condition on commitment here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch. In order to ensure that a newly elected leader's committed entries cannot get lost, it must commit one record from its own epoch. This guarantees that its latest entry is larger (in terms of epoch/offset) than any previously written record which ensures that any future leader must also include it. This is the purpose of the LeaderChange record which is written to the log as soon as the leader gets elected. We have this check implemented here: https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/LeaderState.java#L122. However, the check needs to be a strict inequality since the epoch start offset does not reflect the LeaderChange record itself. In other words, the check is off by one. was: KIP-595 describes an extra condition on commitment here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch. In order to ensure that the leader's committed entries cannot get lost, it must commit one record from its own epoch. This guarantees that its latest entry is larger (in terms of epoch/offset) than any previously written record which ensures that any future leader must also include it. This is the purpose of the LeaderChange record which is written to the log as soon as the leader gets elected. We have this check implemented here: https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/LeaderState.java#L122. However, the check needs to be a strict inequality since the epoch start offset does not reflect the LeaderChange record itself. 
In other words, the check is off by one. > Ensure LeaderChange message is committed before initializing high watermark > --- > > Key: KAFKA-12619 > URL: https://issues.apache.org/jira/browse/KAFKA-12619 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > KIP-595 describes an extra condition on commitment here: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch. > In order to ensure that a newly elected leader's committed entries cannot > get lost, it must commit one record from its own epoch. This guarantees that > its latest entry is larger (in terms of epoch/offset) than any previously > written record which ensures that any future leader must also include it. > This is the purpose of the LeaderChange record which is written to the log as > soon as the leader gets elected. > We have this check implemented here: > https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/LeaderState.java#L122. > However, the check needs to be a strict inequality since the epoch start > offset does not reflect the LeaderChange record itself. In other words, the > check is off by one.
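The off-by-one can be made concrete with a small sketch. It assumes, consistent with the description, that the epoch start offset is the offset at which the new leader's LeaderChange record is appended, and that the high watermark is exclusive (offsets strictly below it are committed). Under those assumptions the LeaderChange record is committed only once the high watermark is strictly greater than the epoch start offset. `LeaderCommitCheck` and `canInitializeHighWatermark` are hypothetical names, not the code in `LeaderState.java`.

```java
// Hypothetical, simplified illustration of the KAFKA-12619 check; not LeaderState.java.
final class LeaderCommitCheck {
    // Offset at which this leader's LeaderChange record sits.
    private final long epochStartOffset;

    LeaderCommitCheck(long epochStartOffset) {
        this.epochStartOffset = epochStartOffset;
    }

    // The high watermark is exclusive: offsets strictly below it are committed.
    // The LeaderChange record at epochStartOffset is therefore committed only when
    // highWatermark > epochStartOffset; a ">=" comparison accepts it one offset early.
    boolean canInitializeHighWatermark(long candidateHighWatermark) {
        return candidateHighWatermark > epochStartOffset;
    }

    public static void main(String[] args) {
        LeaderCommitCheck check = new LeaderCommitCheck(10);
        System.out.println(check.canInitializeHighWatermark(10)); // false: LeaderChange not yet committed
        System.out.println(check.canInitializeHighWatermark(11)); // true
    }
}
```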
[jira] [Created] (KAFKA-12619) Ensure LeaderChange message is committed before initializing high watermark
Jason Gustafson created KAFKA-12619: --- Summary: Ensure LeaderChange message is committed before initializing high watermark Key: KAFKA-12619 URL: https://issues.apache.org/jira/browse/KAFKA-12619 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson KIP-595 describes an extra condition on commitment here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch. In order to ensure that the leader's committed entries cannot get lost, it must commit one record from its own epoch. This guarantees that its latest entry is larger (in terms of epoch/offset) than any previously written record which ensures that any future leader must also include it. This is the purpose of the LeaderChange record which is written to the log as soon as the leader gets elected. We have this check implemented here: https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/LeaderState.java#L122. However, the check needs to be a strict inequality since the epoch start offset does not reflect the LeaderChange record itself. In other words, the check is off by one.
[jira] [Updated] (KAFKA-7438) Replace EasyMock and PowerMock with Mockito
[ https://issues.apache.org/jira/browse/KAFKA-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-7438: --- Summary: Replace EasyMock and PowerMock with Mockito (was: Replace EasyMock and PowerMock with Mockito in the clients module) > Replace EasyMock and PowerMock with Mockito > --- > > Key: KAFKA-7438 > URL: https://issues.apache.org/jira/browse/KAFKA-7438 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Priority: Major > > Development of EasyMock and PowerMock has stagnated while Mockito continues > to be actively developed. With the new Java cadence, it's a problem to depend > on libraries that do bytecode generation and are not actively maintained. In > addition, Mockito is also easier to use.
[jira] [Commented] (KAFKA-12618) Convert LogManager (and other EasyMocks) in ReplicaManagerTest to Mockito
[ https://issues.apache.org/jira/browse/KAFKA-12618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315087#comment-17315087 ] Ismael Juma commented on KAFKA-12618: - Also see https://issues.apache.org/jira/browse/KAFKA-7438 > Convert LogManager (and other EasyMocks) in ReplicaManagerTest to Mockito > - > > Key: KAFKA-12618 > URL: https://issues.apache.org/jira/browse/KAFKA-12618 > Project: Kafka > Issue Type: Task >Reporter: Justine Olshan >Priority: Minor > > [This > commit|https://github.com/apache/kafka/commit/40f001cc537d6ff2efa71e609c2f84c6b934994d] > introduced changes that have Partition calling getLog when there is no topic > ID associated with the Partition. In this case, getLog will use a default > argument. EasyMock (a Java framework) does not play well with Scala's default > arguments. For now, we are manually creating a partition and associating it > in the initializeLogAndTopicId method. But a better long-term solution is to > use Mockito, which better supports default arguments. > It would be good to convert all EasyMocks over to Mockito as well.
[jira] [Updated] (KAFKA-7438) Replace EasyMock and PowerMock with Mockito in the clients module
[ https://issues.apache.org/jira/browse/KAFKA-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-7438: --- Summary: Replace EasyMock and PowerMock with Mockito in the clients module (was: Replace EasyMock and PowerMock with Mockito) > Replace EasyMock and PowerMock with Mockito in the clients module > - > > Key: KAFKA-7438 > URL: https://issues.apache.org/jira/browse/KAFKA-7438 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Priority: Major > > Development of EasyMock and PowerMock has stagnated while Mockito continues > to be actively developed. With the new Java cadence, it's a problem to depend > on libraries that do bytecode generation and are not actively maintained. In > addition, Mockito is also easier to use.
[jira] [Created] (KAFKA-12618) Convert LogManager (and other EasyMocks) in ReplicaManagerTest to Mockito
Justine Olshan created KAFKA-12618: -- Summary: Convert LogManager (and other EasyMocks) in ReplicaManagerTest to Mockito Key: KAFKA-12618 URL: https://issues.apache.org/jira/browse/KAFKA-12618 Project: Kafka Issue Type: Task Reporter: Justine Olshan [This commit|https://github.com/apache/kafka/commit/40f001cc537d6ff2efa71e609c2f84c6b934994d] introduced changes that have Partition calling getLog when there is no topic ID associated with the Partition. In this case, getLog will use a default argument. EasyMock (a Java framework) does not play well with Scala's default arguments. For now, we are manually creating a partition and associating it in the initializeLogAndTopicId method. But a better long-term solution is to use Mockito, which better supports default arguments. It would be good to convert all EasyMocks over to Mockito as well.
[GitHub] [kafka] mumrah commented on a change in pull request #10455: MINOR: Support ExponentialBackoff without jitter
mumrah commented on a change in pull request #10455: URL: https://github.com/apache/kafka/pull/10455#discussion_r607313601 ## File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java ## @@ -47,7 +47,8 @@ public long backoff(long attempts) { } double exp = Math.min(attempts, this.expMax); double term = initialInterval * Math.pow(multiplier, exp); -double randomFactor = ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter); +double randomFactor = jitter < Double.MIN_NORMAL ? 1.0 : Review comment: I had to look this constant up :) Can we just make it check if the jitter is equal to zero (or maybe `<=` zero)? A caller of this method setting jitter to something like 0.5 might be surprised that there is no jitter added.
[GitHub] [kafka] mumrah commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
mumrah commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607290168 ## File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala ## @@ -212,4 +214,103 @@ class RaftClusterTest { cluster.close() } } + + @Test + def testClientQuotas(): Unit = { +val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). +setNumBrokerNodes(1). +setNumControllerNodes(1).build()).build() +try { + cluster.format() + cluster.startup() + TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() == BrokerState.RUNNING, +"Broker never made it to RUNNING state.") + val admin = Admin.create(cluster.clientProperties()) + try { +val entity = new ClientQuotaEntity(Map("user" -> "testkit").asJava) +var filter = ClientQuotaFilter.containsOnly( + List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava) + +def alterThenDescribe(entity: ClientQuotaEntity, + quotas: Seq[ClientQuotaAlteration.Op], + filter: ClientQuotaFilter, + expectCount: Int): java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = { + val alterResult = admin.alterClientQuotas(Seq(new ClientQuotaAlteration(entity, quotas.asJava)).asJava) + try { +alterResult.all().get() + } catch { +case t: Throwable => fail("AlterClientQuotas request failed", t) + } + + def describeOrFail(filter: ClientQuotaFilter): java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = { +try { + admin.describeClientQuotas(filter).entities().get() +} catch { + case t: Throwable => fail("DescribeClientQuotas request failed", t) +} + } + + val (describeResult, ok) = TestUtils.computeUntilTrue(describeOrFail(filter)) { +results => results.getOrDefault(entity, java.util.Collections.emptyMap[String, java.lang.Double]()).size() == expectCount + } + assertTrue(ok, "Broker never saw new client quotas") + describeResult +} + +var describeResult = alterThenDescribe(entity, + Seq(new 
ClientQuotaAlteration.Op("request_percentage", 0.99)), filter, 1) +assertEquals(0.99, describeResult.get(entity).get("request_percentage"), 1e-6) + +describeResult = alterThenDescribe(entity, Seq( + new ClientQuotaAlteration.Op("request_percentage", 0.97), + new ClientQuotaAlteration.Op("producer_byte_rate", 1), + new ClientQuotaAlteration.Op("consumer_byte_rate", 10001) +), filter, 3) +assertEquals(0.97, describeResult.get(entity).get("request_percentage"), 1e-6) +assertEquals(1.0, describeResult.get(entity).get("producer_byte_rate"), 1e-6) +assertEquals(10001.0, describeResult.get(entity).get("consumer_byte_rate"), 1e-6) + +describeResult = alterThenDescribe(entity, Seq( + new ClientQuotaAlteration.Op("request_percentage", 0.95), + new ClientQuotaAlteration.Op("producer_byte_rate", null), + new ClientQuotaAlteration.Op("consumer_byte_rate", null) +), filter, 1) +assertEquals(0.95, describeResult.get(entity).get("request_percentage"), 1e-6) + +describeResult = alterThenDescribe(entity, Seq( + new ClientQuotaAlteration.Op("request_percentage", null)), filter, 0) + +describeResult = alterThenDescribe(entity, + Seq(new ClientQuotaAlteration.Op("producer_byte_rate", )), filter, 1) +assertEquals(.0, describeResult.get(entity).get("producer_byte_rate"), 1e-6) + +// Add another quota for a different entity with same user part +val entity2 = new ClientQuotaEntity(Map("user" -> "testkit", "client-id" -> "some-client").asJava) +filter = ClientQuotaFilter.containsOnly( + List( +ClientQuotaFilterComponent.ofEntity("user", "testkit"), +ClientQuotaFilterComponent.ofEntity("client-id", "some-client"), + ).asJava) +describeResult = alterThenDescribe(entity2, + Seq(new ClientQuotaAlteration.Op("producer_byte_rate", 9998)), filter, 1) +assertEquals(9998.0, describeResult.get(entity2).get("producer_byte_rate"), 1e-6) + +// non-strict match +filter = ClientQuotaFilter.contains( + List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava) + +val (describeResult2, ok) = 
TestUtils.computeUntilTrue(admin.describeClientQuotas(filter).entities().get()) { + results => results.size() == 2 +} +assertTrue(ok, "Broker never saw two client quotas") Review comment: Yea let me clean this up
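In the test above, `TestUtils.computeUntilTrue` returns a `(value, ok)` pair, so each call has to be followed by a separate `assertTrue(ok, ...)`. One way to remove that step is a polling helper that returns the value on success and throws on timeout, including the last observed value in the message. The sketch below is a hypothetical Java helper (`retryUntil` is an illustrative name, not an existing method in Kafka's `TestUtils`).

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

final class RetryUntil {
    // Polls `compute` until `condition` holds or the deadline passes. On timeout it
    // throws with the last observed value instead of returning a boolean flag.
    static <T> T retryUntil(Supplier<T> compute, Predicate<T> condition,
                            long timeoutMs, long pauseMs, String message) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        T last = compute.get();
        while (!condition.test(last)) {
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError(message + " (last value: " + last + ")");
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting: " + message, e);
            }
            last = compute.get();
        }
        return last;
    }
}
```

With such a helper, the non-strict describe step would become a single call that either yields the two-entity result map or fails the test with the last map it saw.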
[GitHub] [kafka] mageshn commented on a change in pull request #10475: KAFKA-12610: Implement PluginClassLoader::getResource
mageshn commented on a change in pull request #10475: URL: https://github.com/apache/kafka/pull/10475#discussion_r607285210 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java ## @@ -61,12 +61,24 @@ public Plugins(Map props) { delegatingLoader.initLoaders(); } +public Plugins(Map props, ClassLoader parent) { Review comment: Is this primarily added for testing purposes? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
mumrah commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607281769 ## File path: core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala ## @@ -121,16 +121,16 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag return } -// Update the cache -quotaCache.updateQuotaCache(ipEntity, quotaRecord.key, quotaRecord.value, quotaRecord.remove) - // Convert the value to an appropriate Option for the quota manager val newValue = if (quotaRecord.remove()) { None } else { Some(quotaRecord.value).map(_.toInt) } connectionQuotas.updateIpConnectionRateQuota(inetAddress, newValue) + +// Update the cache Review comment: Good question. I could swear there was a review comment suggesting this, but I can't seem to find it. I believe I moved this to align it with handleUserClientQuota, where we update the underlying quota manager first and then update the cache. I don't have a strong opinion on which happens first, but both handlers should probably use the same order. Thinking about it more, we might want to be more defensive when calling into the quota managers. If a quota manager cannot be updated, I think we should still update the cache, since the cache reflects the true state of the quota according to the metadata log. Thoughts?
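The defensive ordering proposed above (update the quota manager first, but update the cache even if the manager call fails) can be sketched with `try`/`finally`. This is a hypothetical Java stand-in for the Scala code: `QuotaCache`, `IpQuotaApplier`, and the `BiConsumer` standing in for `connectionQuotas.updateIpConnectionRateQuota` are illustrative names only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;

// Hypothetical, simplified stand-in for the metadata-driven quota cache.
final class QuotaCache {
    private final Map<String, Double> quotas = new HashMap<>();

    void update(String key, double value, boolean remove) {
        if (remove) {
            quotas.remove(key);
        } else {
            quotas.put(key, value);
        }
    }

    Optional<Double> get(String key) {
        return Optional.ofNullable(quotas.get(key));
    }
}

final class IpQuotaApplier {
    private final QuotaCache cache;
    // Stand-in for the connection quota manager; it may throw.
    private final BiConsumer<String, Optional<Integer>> quotaManager;

    IpQuotaApplier(QuotaCache cache, BiConsumer<String, Optional<Integer>> quotaManager) {
        this.cache = cache;
        this.quotaManager = quotaManager;
    }

    void apply(String ip, double value, boolean remove) {
        // Convert the value to an Optional for the quota manager (empty means "remove").
        Optional<Integer> newValue = remove ? Optional.empty() : Optional.of((int) value);
        try {
            // Update the underlying quota manager first, matching the user/client-id path.
            quotaManager.accept(ip, newValue);
        } finally {
            // The cache mirrors the metadata log, so it is updated even if the manager threw.
            cache.update(ip, value, remove);
        }
    }
}
```

Here the manager's exception still propagates after the cache is updated; whether it should instead be caught and logged was left open in the review.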
[GitHub] [kafka] mumrah commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
mumrah commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607278548 ## File path: core/src/main/scala/kafka/server/metadata/ClientQuotaCache.scala ## @@ -122,6 +122,14 @@ class ClientQuotaCache { entityFilters.put(entityType, entityMatch) } +// Special case for non-strict empty filter, match everything Review comment: It should short-circuit and return an empty map on [L134](https://github.com/apache/kafka/pull/10254/files#diff-99bc678b86ad25d99b32bc192428120d2ff3f3d478159e2c353f4f5346b43718R133-R135)
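The empty-filter semantics discussed here (a non-strict empty filter matches everything, while a strict empty filter short-circuits to an empty map rather than throwing) can be sketched as follows. This is a hypothetical simplification, not `ClientQuotaCache`: an entity is modeled as a map of entity type to name, and a filter as a map of entity type to a required name (exact matches only).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class QuotaFilterSketch {
    static Map<Map<String, String>, Map<String, Double>> describe(
            Map<Map<String, String>, Map<String, Double>> cache,
            Map<String, String> filter,
            boolean strict) {
        if (filter.isEmpty()) {
            // Non-strict empty filter: match everything.
            // Strict empty filter: return an empty result instead of throwing.
            return strict ? Collections.emptyMap() : new HashMap<>(cache);
        }
        Map<Map<String, String>, Map<String, Double>> result = new HashMap<>();
        for (Map.Entry<Map<String, String>, Map<String, Double>> e : cache.entrySet()) {
            Map<String, String> entity = e.getKey();
            boolean containsAll = filter.entrySet().stream()
                    .allMatch(f -> f.getValue().equals(entity.get(f.getKey())));
            // Strict mode also rejects entities that carry extra entity types.
            boolean matches = containsAll && (!strict || entity.size() == filter.size());
            if (matches) {
                result.put(entity, e.getValue());
            }
        }
        return result;
    }
}
```

This mirrors the integration test earlier in the thread: a non-strict filter on `user=testkit` sees both the user entity and the user/client-id entity, while a strict filter on the same component sees only one.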
[GitHub] [kafka] mumrah commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
mumrah commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607274751 ## File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java ## @@ -77,7 +77,7 @@ public String getDisplayName(int invocationIndex) { (BeforeTestExecutionCallback) context -> { KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). -setNumKip500BrokerNodes(clusterConfig.numBrokers()). +setNumBrokerNodes(clusterConfig.numBrokers()). Review comment: Not sure, it must have auto-formatted when I renamed to `setNumBrokerNodes`
[GitHub] [kafka] jsancio commented on a change in pull request #10468: Kafka 12373:Improve KafkaRaftClient handling of graceful shutdown
jsancio commented on a change in pull request #10468: URL: https://github.com/apache/kafka/pull/10468#discussion_r607271981 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -1673,6 +1673,69 @@ public void testLeaderGracefulShutdownTimeout() throws Exception { assertFutureThrows(shutdownFuture, TimeoutException.class); } +@Test +public void testLeaderGracefulShutdownOnClose() throws Exception { +int localId = 0; +int otherNodeId = 1; +int lingerMs = 50; +Set voters = Utils.mkSet(localId, otherNodeId); + +RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) +.withAppendLingerMs(lingerMs) +.build(); + +context.becomeLeader(); +assertEquals(OptionalInt.of(localId), context.currentLeader()); +assertEquals(1L, context.log.endOffset().offset); + +int epoch = context.currentEpoch(); +assertEquals(1L, context.client.scheduleAppend(epoch, singletonList("a"))); + +context.client.poll(); +assertEquals(OptionalLong.of(lingerMs), context.messageQueue.lastPollTimeoutMs()); + +context.time.sleep(20); + +// client closed now. +context.client.close(); + +// Flag for accepting appends should be toggled to false. +assertFalse(context.client.canAcceptAppends()); + +// acceptAppends flag set to false so no writes should be accepted by the Leader now. 
+assertNull(context.client.scheduleAppend(epoch, singletonList("b"))); + +// The leader should trigger a flush for whatever batches are present in the BatchAccumulator +assertEquals(2L, context.log.endOffset().offset); + +// Now shutdown + +// We should still be running until we have had a chance to send EndQuorumEpoch +assertTrue(context.client.isShuttingDown()); +assertTrue(context.client.isRunning()); + +// Send EndQuorumEpoch request to the other voter +context.pollUntilRequest(); +assertTrue(context.client.isShuttingDown()); +assertTrue(context.client.isRunning()); +context.assertSentEndQuorumEpochRequest(1, otherNodeId); + +// We should still be able to handle vote requests during graceful shutdown +// in order to help the new leader get elected +context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, epoch, 1L)); +context.client.poll(); +context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.empty(), true); + Review comment: Okay, thanks! I have limited time at the moment. I'll try to look at it this week. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
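The test above expects that, after `close()`, the leader stops accepting appends (`scheduleAppend` returns `null`) but still flushes what is already buffered, moving the log end offset from 1 to 2. A minimal single-node model of that gate, assuming a simple buffered list in place of the `BatchAccumulator` (all names here are hypothetical, not `KafkaRaftClient` internals):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class AppendGate {
    private final AtomicBoolean acceptAppends = new AtomicBoolean(true);
    private final List<String> pending = new ArrayList<>(); // stand-in for the batch accumulator
    private long endOffset;

    AppendGate(long initialEndOffset) {
        this.endOffset = initialEndOffset;
    }

    // Returns the offset of the last record appended, or null once the gate is closed.
    synchronized Long scheduleAppend(List<String> records) {
        if (!acceptAppends.get()) {
            return null;
        }
        pending.addAll(records);
        return endOffset + pending.size() - 1;
    }

    synchronized void close() {
        acceptAppends.set(false);
        // Flush whatever was buffered before the close.
        endOffset += pending.size();
        pending.clear();
    }

    boolean canAcceptAppends() {
        return acceptAppends.get();
    }

    synchronized long endOffset() {
        return endOffset;
    }
}
```

The `AtomicBoolean` lets `canAcceptAppends()` be read without taking the lock; the graceful-shutdown steps that follow (sending `EndQuorumEpoch`, still answering votes) are outside this sketch.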
[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
cmccabe commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607270847 ## File path: metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java ## @@ -170,8 +170,10 @@ private void alterClientQuotaEntity( } }); -outputRecords.addAll(newRecords); -outputResults.put(entity, ApiError.NONE); +// Only add the records to outputRecords if there were no errors Review comment: This is a good fix. However, I think it would be better just to return immediately after setting the error, rather than waiting until the end of the function. That's consistent with how we handle errors at the top of this function, and in other manager classes. It makes it clear that only one error can be set for each entity.
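The early-return style suggested here can be sketched as below. This is a hypothetical simplification of `alterClientQuotaEntity`: strings stand in for `ApiError` and metadata records, and the "negative value is invalid" rule is an assumed validation used only to trigger an error.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class QuotaAlterSketch {
    // A null value means "remove this quota key".
    static void alterEntity(String entity,
                            Map<String, Double> ops,
                            List<String> outputRecords,
                            Map<String, String> outputResults) {
        List<String> newRecords = new ArrayList<>();
        for (Map.Entry<String, Double> op : ops.entrySet()) {
            Double value = op.getValue();
            if (value != null && value < 0) {
                outputResults.put(entity, "INVALID_REQUEST: negative value for " + op.getKey());
                // Return immediately: one error per entity, and none of its records are emitted.
                return;
            }
            newRecords.add(entity + ":" + op.getKey() + "=" + value);
        }
        outputRecords.addAll(newRecords);
        outputResults.put(entity, "NONE");
    }
}
```

Because the function exits at the first error, there is no need for a flag checked at the end, and it is impossible to record two errors for the same entity.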
[GitHub] [kafka] hachikuji merged pull request #10445: KAFKA-12548; Propagate record error messages to application
hachikuji merged pull request #10445: URL: https://github.com/apache/kafka/pull/10445
[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
cmccabe commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607267726 ## File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala ## @@ -212,4 +214,103 @@ class RaftClusterTest { cluster.close() } } + + @Test + def testClientQuotas(): Unit = { +val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). +setNumBrokerNodes(1). +setNumControllerNodes(1).build()).build() +try { + cluster.format() + cluster.startup() + TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() == BrokerState.RUNNING, +"Broker never made it to RUNNING state.") + val admin = Admin.create(cluster.clientProperties()) + try { +val entity = new ClientQuotaEntity(Map("user" -> "testkit").asJava) +var filter = ClientQuotaFilter.containsOnly( + List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava) + +def alterThenDescribe(entity: ClientQuotaEntity, + quotas: Seq[ClientQuotaAlteration.Op], + filter: ClientQuotaFilter, + expectCount: Int): java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = { + val alterResult = admin.alterClientQuotas(Seq(new ClientQuotaAlteration(entity, quotas.asJava)).asJava) + try { +alterResult.all().get() + } catch { +case t: Throwable => fail("AlterClientQuotas request failed", t) + } + + def describeOrFail(filter: ClientQuotaFilter): java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = { +try { + admin.describeClientQuotas(filter).entities().get() +} catch { + case t: Throwable => fail("DescribeClientQuotas request failed", t) +} + } + + val (describeResult, ok) = TestUtils.computeUntilTrue(describeOrFail(filter)) { +results => results.getOrDefault(entity, java.util.Collections.emptyMap[String, java.lang.Double]()).size() == expectCount + } + assertTrue(ok, "Broker never saw new client quotas") + describeResult +} + +var describeResult = alterThenDescribe(entity, + Seq(new 
ClientQuotaAlteration.Op("request_percentage", 0.99)), filter, 1) +assertEquals(0.99, describeResult.get(entity).get("request_percentage"), 1e-6) + +describeResult = alterThenDescribe(entity, Seq( + new ClientQuotaAlteration.Op("request_percentage", 0.97), + new ClientQuotaAlteration.Op("producer_byte_rate", 1), + new ClientQuotaAlteration.Op("consumer_byte_rate", 10001) +), filter, 3) +assertEquals(0.97, describeResult.get(entity).get("request_percentage"), 1e-6) +assertEquals(1.0, describeResult.get(entity).get("producer_byte_rate"), 1e-6) +assertEquals(10001.0, describeResult.get(entity).get("consumer_byte_rate"), 1e-6) + +describeResult = alterThenDescribe(entity, Seq( + new ClientQuotaAlteration.Op("request_percentage", 0.95), + new ClientQuotaAlteration.Op("producer_byte_rate", null), + new ClientQuotaAlteration.Op("consumer_byte_rate", null) +), filter, 1) +assertEquals(0.95, describeResult.get(entity).get("request_percentage"), 1e-6) + +describeResult = alterThenDescribe(entity, Seq( + new ClientQuotaAlteration.Op("request_percentage", null)), filter, 0) + +describeResult = alterThenDescribe(entity, + Seq(new ClientQuotaAlteration.Op("producer_byte_rate", )), filter, 1) +assertEquals(.0, describeResult.get(entity).get("producer_byte_rate"), 1e-6) + +// Add another quota for a different entity with same user part +val entity2 = new ClientQuotaEntity(Map("user" -> "testkit", "client-id" -> "some-client").asJava) +filter = ClientQuotaFilter.containsOnly( + List( +ClientQuotaFilterComponent.ofEntity("user", "testkit"), +ClientQuotaFilterComponent.ofEntity("client-id", "some-client"), + ).asJava) +describeResult = alterThenDescribe(entity2, + Seq(new ClientQuotaAlteration.Op("producer_byte_rate", 9998)), filter, 1) +assertEquals(9998.0, describeResult.get(entity2).get("producer_byte_rate"), 1e-6) + +// non-strict match +filter = ClientQuotaFilter.contains( + List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava) + +val (describeResult2, ok) = 
TestUtils.computeUntilTrue(admin.describeClientQuotas(filter).entities().get()) { + results => results.size() == 2 +} +assertTrue(ok, "Broker never saw two client quotas") Review comment: The "ok" check feels a little clunky here. Plus if this fa
[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
cmccabe commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607264162 ## File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java ## @@ -77,7 +77,7 @@ public String getDisplayName(int invocationIndex) { (BeforeTestExecutionCallback) context -> { KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). -setNumKip500BrokerNodes(clusterConfig.numBrokers()). +setNumBrokerNodes(clusterConfig.numBrokers()). Review comment: Indentation seems a little weird here. Why would this be indented more than the following line?
[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
cmccabe commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607264162 ## File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java ## @@ -77,7 +77,7 @@ public String getDisplayName(int invocationIndex) { (BeforeTestExecutionCallback) context -> { KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). -setNumKip500BrokerNodes(clusterConfig.numBrokers()). +setNumBrokerNodes(clusterConfig.numBrokers()). Review comment: Indentation seems a little weird here.
[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
cmccabe commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607263869 ## File path: core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala ## @@ -121,16 +121,16 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag return } -// Update the cache -quotaCache.updateQuotaCache(ipEntity, quotaRecord.key, quotaRecord.value, quotaRecord.remove) - // Convert the value to an appropriate Option for the quota manager val newValue = if (quotaRecord.remove()) { None } else { Some(quotaRecord.value).map(_.toInt) } connectionQuotas.updateIpConnectionRateQuota(inetAddress, newValue) + +// Update the cache Review comment: What's the purpose of moving this code?
[jira] [Resolved] (KAFKA-12548) Invalid record error message is not getting sent to application
[ https://issues.apache.org/jira/browse/KAFKA-12548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-12548. - Resolution: Fixed > Invalid record error message is not getting sent to application > --- > > Key: KAFKA-12548 > URL: https://issues.apache.org/jira/browse/KAFKA-12548 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 3.0.0 > > > The ProduceResponse includes a nice record error message when we return > INVALID_RECORD_ERROR. Sadly this is getting discarded by the producer, so the > user never gets a chance to see it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12548) Invalid record error message is not getting sent to application
[ https://issues.apache.org/jira/browse/KAFKA-12548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12548: Fix Version/s: 3.0.0 > Invalid record error message is not getting sent to application > --- > > Key: KAFKA-12548 > URL: https://issues.apache.org/jira/browse/KAFKA-12548 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 3.0.0 > > > The ProduceResponse includes a nice record error message when we return > INVALID_RECORD_ERROR. Sadly this is getting discarded by the producer, so the > user never gets a chance to see it.
[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
cmccabe commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607263189 ## File path: core/src/main/scala/kafka/server/metadata/ClientQuotaCache.scala ## @@ -122,6 +122,14 @@ class ClientQuotaCache { entityFilters.put(entityType, entityMatch) } +// Special case for non-strict empty filter, match everything Review comment: What happens if we have an empty filter list and strict mode set? It looks like an exception is thrown, which doesn't seem correct.
[GitHub] [kafka] junrao commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.
junrao commented on a change in pull request #10218: URL: https://github.com/apache/kafka/pull/10218#discussion_r606343187 ## File path: remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java ## @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * This class provides an in-memory cache of remote log segment metadata. This maintains the lineage of segments + * with respect to leader epochs. + * + * Remote log segment can go through the state transitions as mentioned in {@link RemoteLogSegmentState}. + * + * This class will have all the segments which did not reach terminal state viz DELETE_SEGMENT_FINISHED. That means,any + * segment reaching the terminal state will get cleared from this instance. 
+ * This class provides different methods to fetch segment metadata like {@link #remoteLogSegmentMetadata(int, long)}, + * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, {@link #listAllRemoteLogSegments()}. Those + * methods have different semantics to fetch the segment based on its state. + * + * + * + * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}: + * + * Segment in this state indicate it is not yet copied successfully. So, these segments will not be Review comment: indicate => indicates Ditto in a few other places. ## File path: remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java ## @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * This class provides an in-memory cache of remote log segment metadata. 
This maintains the lineage of segments + * with respect to leader epochs. + * + * Remote log segment can go through the state transitions as mentioned in {@link RemoteLogSegmentState}. + * + * This class will have all the segments which did not reach terminal state viz DELETE_SEGMENT_FINISHED. That means,any + * segment reaching the terminal state will get cleared from this instance. + * This class provides different methods to fetch segment metadata like {@link #remoteLogSegmentMetadata(int, long)}, + * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, {@link #listAllRemoteLogSegments()}. Those + * methods have different semantics to fetch the segment based on its state. + * + * + * + * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}: + * + * Segment in this state indicate it is not yet copied successfully. So, these segments will not be + * accessible for reads but these are considered for cleanups when a partition is deleted. + * + * + * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED}: + *
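The Javadoc above describes state-dependent visibility: segments in `COPY_SEGMENT_STARTED` are not readable but are still considered for cleanup, and segments reaching `DELETE_SEGMENT_FINISHED` are dropped from the cache. A minimal sketch of that behavior follows. It is a hypothetical simplification that ignores leader epochs and offsets, uses its own `SegmentState` enum in place of `RemoteLogSegmentState`, and assumes (as the truncated Javadoc suggests but does not show) that only `COPY_SEGMENT_FINISHED` segments are readable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for RemoteLogSegmentState (state names taken from the Javadoc).
enum SegmentState { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED }

final class SegmentCacheSketch {
    private final Map<String, SegmentState> states = new ConcurrentHashMap<>();

    void update(String segmentId, SegmentState state) {
        if (state == SegmentState.DELETE_SEGMENT_FINISHED) {
            states.remove(segmentId); // terminal state: clear the segment from the cache
        } else {
            states.put(segmentId, state);
        }
    }

    // Assumption: only fully copied segments are visible to reads.
    List<String> readableSegments() {
        List<String> result = new ArrayList<>();
        states.forEach((id, s) -> {
            if (s == SegmentState.COPY_SEGMENT_FINISHED) {
                result.add(id);
            }
        });
        return result;
    }

    // Every segment still cached, including in-progress copies, is a cleanup candidate.
    List<String> segmentsForCleanup() {
        return new ArrayList<>(states.keySet());
    }
}
```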
[GitHub] [kafka] cmccabe commented on pull request #10366: KAFKA-12467: Add controller-side snapshot generation
cmccabe commented on pull request #10366: URL: https://github.com/apache/kafka/pull/10366#issuecomment-813567129 Fix spotbugs
[GitHub] [kafka] hachikuji commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
hachikuji commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r607232953 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -355,24 +373,29 @@ private void fireHandleResign(int epoch) { } @Override -public void initialize() throws IOException { -quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch())); +public void initialize() { +try { +quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch())); -long currentTimeMs = time.milliseconds(); -if (quorum.isLeader()) { -throw new IllegalStateException("Voter cannot initialize as a Leader"); -} else if (quorum.isCandidate()) { -onBecomeCandidate(currentTimeMs); -} else if (quorum.isFollower()) { -onBecomeFollower(currentTimeMs); -} +long currentTimeMs = time.milliseconds(); +if (quorum.isLeader()) { +throw new IllegalStateException("Voter cannot initialize as a Leader"); +} else if (quorum.isCandidate()) { +onBecomeCandidate(currentTimeMs); +} else if (quorum.isFollower()) { +onBecomeFollower(currentTimeMs); +} -// When there is only a single voter, become candidate immediately -if (quorum.isVoter() -&& quorum.remoteVoters().isEmpty() -&& !quorum.isLeader() -&& !quorum.isCandidate()) { -transitionToCandidate(currentTimeMs); +// When there is only a single voter, become candidate immediately +if (quorum.isVoter() +&& quorum.remoteVoters().isEmpty() +&& !quorum.isLeader() Review comment: Since we're in here already, this check is not needed since we already ruled it out above. 
## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -68,20 +70,65 @@ public synchronized void increment() { @Override public synchronized void handleCommit(BatchReader reader) { try { -int initialValue = this.committed; +int initialValue = committed; while (reader.hasNext()) { BatchReader.Batch batch = reader.next(); log.debug("Handle commit of batch with records {} at base offset {}", batch.records(), batch.baseOffset()); for (Integer value : batch.records()) { -if (value != this.committed + 1) { -throw new AssertionError("Expected next committed value to be " + -(this.committed + 1) + ", but instead found " + value + " on node " + nodeId); +if (value != committed + 1) { +throw new AssertionError( +String.format( +"Expected next committed value to be %s, but instead found %s on node %s", +committed + 1, +value, +nodeId +) +); } -this.committed = value; +committed = value; } + +nextReadOffset = batch.lastOffset() + 1; +readEpoch = batch.epoch(); } log.debug("Counter incremented from {} to {}", initialValue, committed); + +if (lastSnapshotEndOffset + 10 < nextReadOffset) { Review comment: Looks like we are trying to do snapshots every 10 records. We could probably get rid of `lastSnapshotEndOffset` and use `committed % 10` or something like that. It may also be useful to be able to control the frequency of snapshots with a parameter. 
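The suggestion above (drop `lastSnapshotEndOffset`, key snapshots off `committed % N`, and make `N` a parameter) can be sketched as follows. This is a hypothetical simplification of `ReplicatedCounter.handleCommit`: batches are plain lists of integers, and "taking a snapshot" just records the counter value.

```java
import java.util.ArrayList;
import java.util.List;

final class CounterSketch {
    private final int snapshotInterval;
    private int committed;
    private final List<Integer> snapshots = new ArrayList<>(); // counter values at which a snapshot was taken

    CounterSketch(int snapshotInterval) {
        if (snapshotInterval <= 0) {
            throw new IllegalArgumentException("snapshotInterval must be positive");
        }
        this.snapshotInterval = snapshotInterval;
    }

    void handleCommit(List<List<Integer>> batches) {
        for (List<Integer> batch : batches) {
            for (int value : batch) {
                if (value != committed + 1) {
                    throw new AssertionError(String.format(
                        "Expected next committed value to be %s, but instead found %s", committed + 1, value));
                }
                committed = value;
                // Check per record, so a batch that crosses a multiple of the interval is not missed.
                if (committed % snapshotInterval == 0) {
                    snapshots.add(committed); // stand-in for writing a snapshot
                }
            }
        }
    }

    List<Integer> snapshots() {
        return snapshots;
    }

    int committed() {
        return committed;
    }
}
```

Checking the modulus only once per batch could skip a snapshot whenever a batch spans a multiple of the interval, which is why the check sits inside the record loop here.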
## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -326,6 +336,14 @@ private void updateListenersProgress(List listenerContexts, lon } } +private Optional> latestSnapshot() { +return log.latestSnapshotId().flatMap(snapshoId -> { Review comment: nit: missing t in snapshotId ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2154,8 +2182,14 @@ private boolean maybeCompleteShutdown(long currentTimeMs) { return false; } -private void maybeUpdateOldestSnapshotId() { -log.latestSnapshotId().ifPresent(log::deleteBeforeSnapshot); +private void maybeUpdateEarliestSnapshotId() { Review comment: Hmm, I guess I still see this the other way around. Why would the raft client care about updating the log start offset if not to delete old snapshots? What would that even mean outside the context of snapshot deletion? ## File p
[GitHub] [kafka] ijuma commented on a change in pull request #10393: KAFKA-12539: Refactor KafkaRaftCllient handleVoteRequest to reduce cyclomatic complexity
ijuma commented on a change in pull request #10393: URL: https://github.com/apache/kafka/pull/10393#discussion_r607233215 ## File path: raft/src/main/java/org/apache/kafka/raft/CandidateState.java ## @@ -235,6 +240,15 @@ public int epoch() { return highWatermark; } +@Override +public boolean canGrantVote(int candidateId, boolean isLogUpToDate) { +// Still reject vote request even candidateId = localId, Although the candidate votes for +// itself, this vote is implicit and not "granted". +log.debug("Rejecting vote request from candidate {} since we are already candidate in epoch {}", +candidateId, epoch); Review comment: As a general rule, methods like this should not log IMO. The calling method should log instead. That is, it's good to avoid side effects in a "check" operation. It seems like it was like that before this PR. What was the motivation for changing it?
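The principle in this comment (keep the check a pure predicate and let the caller log) can be sketched as follows. The types are hypothetical stand-ins, not the actual `EpochState`/`CandidateState` classes, and a `List<String>` stands in for the logger so the side effect is observable.

```java
import java.util.ArrayList;
import java.util.List;

interface VoterState {
    boolean canGrantVote(int candidateId, boolean isLogUpToDate);
}

final class CandidateStateSketch implements VoterState {
    @Override
    public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
        // A candidate has implicitly voted for itself in this epoch, so it never grants a vote.
        // No logging here: the check has no side effects.
        return false;
    }
}

final class VoteHandlerSketch {
    final List<String> log = new ArrayList<>(); // stand-in for a real logger

    boolean handleVoteRequest(VoterState state, int candidateId, boolean isLogUpToDate, int epoch) {
        boolean granted = state.canGrantVote(candidateId, isLogUpToDate);
        if (!granted) {
            // The caller owns the side effect and has the context to describe it.
            log.add(String.format("Rejecting vote request from candidate %d in epoch %d", candidateId, epoch));
        }
        return granted;
    }
}
```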
[GitHub] [kafka] ncliang commented on a change in pull request #10475: KAFKA-12610: Implement PluginClassLoader::getResource
ncliang commented on a change in pull request #10475:
URL: https://github.com/apache/kafka/pull/10475#discussion_r607232615

## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
##
@@ -342,6 +343,47 @@ public void newPluginsShouldConfigureWithPluginClassLoader() {
         assertPluginClassLoaderAlwaysActive(samples);
     }
+    @Test
+    public void pluginClassLoaderReadVersionFromResource() {
+        TestPlugins.assertAvailable();
+
+        Map<String, String> pluginProps = new HashMap<>();
+        pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG,
+                TestPlugins.pluginPath().stream()
+                        .filter(s -> s.contains("read-version-from-resource-v1"))
+                        .collect(Collectors.joining()));
+        plugins = new Plugins(pluginProps);
+
+        Converter converter = plugins.newPlugin(
+                TestPlugins.READ_VERSION_FROM_RESOURCE,
+                new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
+                Converter.class
+        );
+        assertEquals("1.0.0",
+                new String(converter.fromConnectData(null, null, null)));
+        PluginClassLoader pluginClassLoader = plugins.delegatingLoader()
+                .pluginClassLoader(TestPlugins.READ_VERSION_FROM_RESOURCE);
+        assertNotNull(pluginClassLoader);
+
+
+        // Re-initialize Plugins object with plugin class loader in the class loader tree. This is
+        // to simulate the situation where jars exist on both system classpath and plugin path.
+        pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG,
+                TestPlugins.pluginPath().stream()
+                        .filter(s -> s.contains("read-version-from-resource-v2"))
+                        .collect(Collectors.joining()));
+        plugins = new Plugins(pluginProps, pluginClassLoader);

Review comment:
I agree that having the additional test cases would be valuable. However, I do think that adding the resource directly to the app classloader makes the test much less readable. I like the idea of constructing and using a plain `URLClassLoader` as the parent. While it still requires the additional constructor to be exposed on `Plugins`, we do already expose the parent parameter on `DelegatingClassLoader` specifically for the flexibility of controlling the parent loader.
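The setup ncliang prefers, a plain `URLClassLoader` as the parent so that the same resource is visible both to the parent and to the plugin's own loader, can be sketched in isolation. This assumes the goal of KAFKA-12610 is child-first resource lookup (the plugin's own copy wins over the parent's); `ChildFirstLoader`, `resolveBoth` and the `version.txt` resource are hypothetical and are not Kafka's `PluginClassLoader` or test plugins.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class ChildFirstResourceSketch {

    /** Hypothetical child-first loader: consults its own URLs before delegating to the parent. */
    static final class ChildFirstLoader extends URLClassLoader {
        ChildFirstLoader(URL[] urls, ClassLoader parent) {
            super(urls, parent);
        }

        @Override
        public URL getResource(String name) {
            URL local = findResource(name); // searches only this loader's own URLs
            return local != null ? local : super.getResource(name); // fall back to parent-first lookup
        }
    }

    /** Creates a temporary directory holding a version.txt resource (left behind for simplicity). */
    static Path dirWithVersion(String version) throws IOException {
        Path dir = Files.createTempDirectory("plugin-" + version + "-");
        Files.write(dir.resolve("version.txt"), version.getBytes(StandardCharsets.UTF_8));
        return dir;
    }

    static String read(URL url) throws IOException {
        try (InputStream in = url.openStream()) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

    /** Returns {parent-first result, child-first result} for the same resource name. */
    static String[] resolveBoth() throws IOException {
        URL v1 = dirWithVersion("1.0.0").toUri().toURL();
        URL v2 = dirWithVersion("2.0.0").toUri().toURL();
        // A plain URLClassLoader as the parent stands in for the jar on the system classpath;
        // passing null as its parent leaves only the bootstrap loader above it.
        try (URLClassLoader parent = new URLClassLoader(new URL[]{v1}, null);
             URLClassLoader parentFirst = new URLClassLoader(new URL[]{v2}, parent);
             ChildFirstLoader childFirst = new ChildFirstLoader(new URL[]{v2}, parent)) {
            return new String[]{
                read(parentFirst.getResource("version.txt")),
                read(childFirst.getResource("version.txt"))
            };
        }
    }

    public static void main(String[] args) throws IOException {
        String[] results = resolveBoth();
        System.out.println("parent-first: " + results[0]);
        System.out.println("child-first:  " + results[1]);
    }
}
```

A standard `URLClassLoader` delegates to its parent first and so returns the parent's `1.0.0` copy, while the child-first override returns the plugin's `2.0.0` copy. That is the kind of distinction the v1/v2 test plugins in the diff are meant to exercise, without touching the application classloader.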
[jira] [Commented] (KAFKA-12602) The LICENSE and NOTICE files don't list everything they should
[ https://issues.apache.org/jira/browse/KAFKA-12602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315000#comment-17315000 ]

John Roesler commented on KAFKA-12602:
--

Thank you, [~jmclean]. There weren't too many BSD or MIT licenses, so I've gone ahead and just copied the exact license files from each one of those dependencies. Thanks for the feedback.

I also added notices regarding the provenance and copyright of PureJavaCrc32C and Murmur3 to our NOTICE file.

I will create two follow-up tickets: one to revisit the NOTICE file, and another to add an automated check to the release script to make sure that our license file doesn't start to rot again in the future.

> The LICENSE and NOTICE files don't list everything they should
> --
>
> Key: KAFKA-12602
> URL: https://issues.apache.org/jira/browse/KAFKA-12602
> Project: Kafka
> Issue Type: Bug
> Reporter: John Roesler
> Assignee: John Roesler
> Priority: Blocker
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> [~jmclean] raised this on the mailing list:
> [https://lists.apache.org/thread.html/r2df54c11c10d3d38443054998bc7dd92d34362641733c2fb7c579b50%40%3Cdev.kafka.apache.org%3E]
>
> We need to make the license file match what we are actually shipping in
> source and binary distributions.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
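The automated release check John mentions could take many forms. One minimal sketch, assuming the list of bundled dependency names is available (for example, derived from the jar file names shipped in the binary distribution), is to flag every dependency that the LICENSE text never mentions. `LicenseCheckSketch` and the dependency names are hypothetical, and the case-insensitive substring match is a deliberately crude stand-in for whatever matching a real check would need.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public class LicenseCheckSketch {

    /** Returns every dependency whose name never appears in the LICENSE text (case-insensitive). */
    static List<String> missingFromLicense(List<String> dependencies, String licenseText) {
        String haystack = licenseText.toLowerCase(Locale.ROOT);
        return dependencies.stream()
            .filter(dep -> !haystack.contains(dep.toLowerCase(Locale.ROOT)))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical inputs; a real check would read the shipped jars and the actual LICENSE file.
        List<String> bundled = Arrays.asList("libfoo-1.0", "libbar-2.3");
        String license = "This product bundles libfoo-1.0, which is available under the MIT license.";
        List<String> missing = missingFromLicense(bundled, license);
        if (!missing.isEmpty()) {
            System.out.println("Not listed in LICENSE: " + missing);
            System.exit(1); // non-zero exit so a release script can stop here
        }
    }
}
```

Failing with a non-zero exit code is what would let such a check guard the release script rather than just print a warning.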
[GitHub] [kafka] hachikuji commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients
hachikuji commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r607224516

## File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##
@@ -156,17 +169,44 @@ class DefaultAutoTopicCreationManager(
         .setTopics(topicsToCreate)
     )
-    channelManager.get.sendRequest(createTopicsRequest, new ControllerRequestCompletionHandler {
+    val requestCompletionHandler = new ControllerRequestCompletionHandler {
       override def onTimeout(): Unit = {
         debug(s"Auto topic creation timed out for ${creatableTopics.keys}.")
         clearInflightRequests(creatableTopics)
       }
       override def onComplete(response: ClientResponse): Unit = {
-        debug(s"Auto topic creation completed for ${creatableTopics.keys}.")
+        debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody.toString}.")
         clearInflightRequests(creatableTopics)
       }
-    })
+    }
+
+    val channelManager = this.channelManager.getOrElse {
+      throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests.")
+    }
+
+    val request = metadataRequestContext.map { context =>
+      val requestVersion =
+        channelManager.controllerApiVersions() match {
+          case None =>
+            // We will rely on the Metadata request to be retried in the case
+            // that the latest version is not usable by the controller.
+            ApiKeys.CREATE_TOPICS.latestVersion()
+          case Some(nodeApiVersions) =>
+            nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
+        }
+
+      // Borrow client information such as client id and correlation id from the original request,
+      // in order to correlate the create request with the original metadata request.
+      val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+        requestVersion,
+        context.clientId,
+        context.correlationId)
+      ForwardingManager.buildEnvelopeRequest(context,
+        createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))

Review comment:
@dengziming It's not a bad idea. We could even simplify it a little, since the API key and version can be obtained from the request. I tend to agree that this is kind of a niche usage, though, so I'm not sure it calls for the generality. Perhaps you could submit a follow-up once this is merged and we can see what it looks like.
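The version selection in the `AutoTopicCreationManager` diff (use the latest `CREATE_TOPICS` version when the controller's API versions are unknown, otherwise the latest version both sides support) can be sketched without Kafka's classes. `VersionRange` and `chooseVersion` are hypothetical stand-ins for `NodeApiVersions.latestUsableVersion`, the version numbers in `main` are made up, and throwing `IllegalStateException` when the ranges do not overlap is an assumption of this sketch, not Kafka's documented behavior.

```java
import java.util.Optional;

public class VersionSelectionSketch {

    /** Hypothetical stand-in for the [min, max] version range advertised for one API key. */
    static final class VersionRange {
        final int min;
        final int max;

        VersionRange(int min, int max) {
            this.min = min;
            this.max = max;
        }
    }

    /**
     * Returns the local latest version when the controller's versions are unknown (relying on a
     * later retry if that turns out to be unusable), otherwise the highest version in both ranges.
     */
    static int chooseVersion(VersionRange local, Optional<VersionRange> controller) {
        if (!controller.isPresent()) {
            return local.max;
        }
        VersionRange node = controller.get();
        int highest = Math.min(local.max, node.max);
        int lowest = Math.max(local.min, node.min);
        if (highest < lowest) {
            throw new IllegalStateException("No usable version: local [" + local.min + ", " + local.max
                + "], controller [" + node.min + ", " + node.max + "]");
        }
        return highest;
    }

    public static void main(String[] args) {
        VersionRange local = new VersionRange(0, 7); // made-up range for illustration
        System.out.println(chooseVersion(local, Optional.empty()));
        System.out.println(chooseVersion(local, Optional.of(new VersionRange(0, 5))));
    }
}
```

Choosing the version before building the header matters because the same `requestVersion` is used both for the embedded `RequestHeader` and for serializing the `CreateTopics` body inside the envelope.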