[GitHub] [kafka] dajac commented on a change in pull request #10438: KAFKA-12579: Remove various deprecated clients classes/methods for 3.0

2021-04-05 Thread GitBox


dajac commented on a change in pull request #10438:
URL: https://github.com/apache/kafka/pull/10438#discussion_r607567743



##
File path: docs/upgrade.html
##
@@ -27,18 +27,30 @@ Notable changes in 3
 or updating the application not to use internal classes.
 The Streams API removed all deprecated APIs that were deprecated in 
version 2.5.0 or earlier.
 For a complete list of removed APIs compare the detailed Kafka Streams 
upgrade notes.
-The deprecated Scala Authorizer, 
SimpleAclAuthorizer and related classes have been removed. Please 
use the Java Authorizer
-and AclAuthorizer instead.
-The deprecated Metric#value() method was removed (https://issues.apache.org/jira/browse/KAFKA-12573";>KAFKA-12573).
-Deprecated security classes were removed: 
PrincipalBuilder, DefaultPrincipalBuilder and 
ResourceFilter.
-Furthermore, deprecated constants and constructors were removed from 
SslConfigs, SaslConfigs,
-AclBinding and AclBindingFilter.
-The deprecated Admin.electPreferredLeaders() methods 
were removed. Please use Admin.electLeaders instead.
-The deprecated kafka-preferred-replica-election command 
line tool was removed. Please use kafka-leader-election 
instead.
-The deprecated ConfigEntry constructor was removed (https://issues.apache.org/jira/browse/KAFKA-12577";>KAFKA-12577).
-Please use the remaining public constructor instead.
-The deprecated config value default for the client config 
client.dns.lookup has been removed. In the unlikely
-event that you set this config explicitly, we recommend leaving the 
config unset (use_all_dns_ips is used by default).
+A number of deprecated classes and methods have been removed in the 
clients, core and tools modules:

Review comment:
   nit: Should we say `classes, methods and tools` (or command) because of 
`kafka-preferred-replica-election`?
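
   For readers migrating off the APIs listed in the diff above, a minimal sketch (assuming an already-configured `Admin` client and an illustrative topic partition) of moving from the removed `electPreferredLeaders()` call to `electLeaders()`:

   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.ElectLeadersResult;
   import org.apache.kafka.common.ElectionType;
   import org.apache.kafka.common.TopicPartition;

   import java.util.Set;

   public class ElectLeadersMigration {
       // Replaces the removed admin.electPreferredLeaders(partitions) call.
       public static void electPreferred(Admin admin) throws Exception {
           Set<TopicPartition> partitions = Set.of(new TopicPartition("my-topic", 0));
           ElectLeadersResult result = admin.electLeaders(ElectionType.PREFERRED, partitions);
           result.partitions().get();
       }
   }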

##
File path: clients/src/main/java/org/apache/kafka/common/MessageFormatter.java
##
@@ -34,33 +33,21 @@
  */
 public interface MessageFormatter extends Configurable, Closeable {
 
-/**
- * Initialises the MessageFormatter
- * @param props Properties to configure the formatter
- * @deprecated Use {@link #configure(Map)} instead, this method is for 
backward compatibility with the older Formatter interface
- */
-@Deprecated

Review comment:
   If we remove this one, we could also remove the 
`kafka.common.MessageFormatter` trait in the core module.
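
   For context, a minimal sketch of a formatter written against the remaining interface, where configuration arrives through `configure(Map)` instead of the removed `configure(Properties)`. The class name and the "print.prefix" property are illustrative, not Kafka options:

   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.common.MessageFormatter;

   import java.io.PrintStream;
   import java.nio.charset.StandardCharsets;
   import java.util.Map;

   public class PrefixedFormatter implements MessageFormatter {
       private String prefix = "";

       @Override
       public void configure(Map<String, ?> configs) {
           Object value = configs.get("print.prefix"); // illustrative property
           if (value != null) {
               prefix = value.toString();
           }
       }

       @Override
       public void writeTo(ConsumerRecord<byte[], byte[]> record, PrintStream output) {
           String value = record.value() == null ? "null" : new String(record.value(), StandardCharsets.UTF_8);
           output.println(prefix + value);
       }
   }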




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up

2021-04-05 Thread Ben Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315284#comment-17315284
 ] 

Ben Chen commented on KAFKA-12492:
--

Do we need to run [local tests|https://github.com/apache/kafka/blob/trunk/README.md] and trigger a build for this task, as described here: [https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes]?

> Formatting of example RocksDBConfigSetter is messed up
> --
>
> Key: KAFKA-12492
> URL: https://issues.apache.org/jira/browse/KAFKA-12492
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ben Chen
>Priority: Trivial
>  Labels: docs, newbie
>
> See the example implementation class CustomRocksDBConfig in the docs for the 
> rocksdb.config.setter
> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#rocksdb-config-setter
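
For reference, a sketch of the kind of RocksDBConfigSetter the docs example above describes (option values are illustrative, not recommendations); it would be registered through the rocksdb.config.setter config:

{code:java}
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

import java.util.Map;

public class CustomRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(16 * 1024 * 1024L); // 16 MB block cache (illustrative)
        tableConfig.setBlockSize(16 * 1024L);             // 16 KB blocks (illustrative)
        options.setTableFormatConfig(tableConfig);
        options.setMaxWriteBufferNumber(2);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Close any RocksDB objects created in setConfig here; nothing to release in this sketch.
    }
}
{code}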



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] satishd commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-05 Thread GitBox


satishd commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r607562925



##
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class provides an in-memory cache of remote log segment metadata. This 
maintains the lineage of segments
+ * with respect to leader epochs.
+ * 
+ * Remote log segment can go through the state transitions as mentioned in 
{@link RemoteLogSegmentState}.
+ * 
+ * This class will have all the segments which did not reach terminal state 
viz DELETE_SEGMENT_FINISHED. That means,any
+ * segment reaching the terminal state will get cleared from this instance.
+ * This class provides different methods to fetch segment metadata like {@link 
#remoteLogSegmentMetadata(int, long)},
+ * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, 
{@link #listAllRemoteLogSegments()}. Those
+ * methods have different semantics to fetch the segment based on its state.
+ * 
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}:
+ * 
+ * Segment in this state indicate it is not yet copied successfully. So, these 
segments will not be
+ * accessible for reads but these are considered for cleanups when a partition 
is deleted.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED}:
+ * 
+ * Segment in this state indicate it is successfully copied and it is 
available for reads. So, these segments
+ * will be accessible for reads. But this should be available for any cleanup 
activity like deleting segments by the
+ * caller of this class.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#DELETE_SEGMENT_STARTED}:
+ * Segment in this state indicate it is getting deleted. That means, it is not 
available for reads. But it should be
+ * available for any cleanup activity like deleting segments by the caller of 
this class.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#DELETE_SEGMENT_FINISHED}:
+ * Segment in this state indicate it is already deleted. That means, it is not 
available for any activity including
+ * reads or cleanup activity. This cache will clear entries containing this 
state.
+ * 
+ * 
+ *
+ * Method                                                    COPY_SEGMENT_STARTED   COPY_SEGMENT_FINISHED   DELETE_SEGMENT_STARTED   DELETE_SEGMENT_FINISHED
+ * remoteLogSegmentMetadata(int leaderEpoch, long offset)    No                     Yes                     No                       No
+ * listRemoteLogSegments(int leaderEpoch)                    Yes                    Yes                     Yes                      No
+ * listAllRemoteLogSegments()                                Yes                    Yes                     Yes                      No
+ */
+public class RemoteLogMetadataCache {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataCache.class);
+
+// It contains all the segment-id to metadata mappings which did not reach 
the terminal state viz DELETE_SEGMENT_FINISHED.
+private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata> idToSegmentMetadata
+= new ConcurrentHashMap<>();
+
+// It contains leader epoch to the respective entry containing the state.
+private final ConcurrentMap<Integer, RemoteLogLeaderEpochState> leaderEpochEntries = new ConcurrentHashMap<>();
+
+/**
+ * Returns {@link RemoteLogSegmentMetadata} if it exists for the given 
leader-epoch containing the offset and with
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED} state, else returns 
{@link Optional#empty()}.
+ *
+ * @param leaderEpoch leader epoch for the given offset
+ * @param offset  offset
+ * @return the requested remote log segment metadata if it exists.
+ */
+public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int leaderEpoch, long offset) {
+RemoteLogLeaderEpochState remoteLogLeaderEpochState = 
leaderEpochEntries.get(leaderEpoch);
+
+i

[GitHub] [kafka] satishd commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-05 Thread GitBox


satishd commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r607561788



##
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageException.java
##
@@ -28,9 +27,14 @@ public RemoteStorageException(final String message) {
 super(message);
 }
 
-public RemoteStorageException(final String message,
-  final Throwable cause) {
+public RemoteStorageException(final String message, final Throwable cause) 
{
 super(message, cause);
 }
 
+public RemoteStorageException() {

Review comment:
   Not really needed for now. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-05 Thread GitBox


satishd commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r607559905



##
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class provides an in-memory cache of remote log segment metadata. This 
maintains the lineage of segments
+ * with respect to leader epochs.
+ * 
+ * Remote log segment can go through the state transitions as mentioned in 
{@link RemoteLogSegmentState}.
+ * 
+ * This class will have all the segments which did not reach terminal state 
viz DELETE_SEGMENT_FINISHED. That means,any
+ * segment reaching the terminal state will get cleared from this instance.
+ * This class provides different methods to fetch segment metadata like {@link 
#remoteLogSegmentMetadata(int, long)},
+ * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, 
{@link #listAllRemoteLogSegments()}. Those
+ * methods have different semantics to fetch the segment based on its state.
+ * 
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}:
+ * 
+ * Segment in this state indicate it is not yet copied successfully. So, these 
segments will not be
+ * accessible for reads but these are considered for cleanups when a partition 
is deleted.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED}:
+ * 
+ * Segment in this state indicate it is successfully copied and it is 
available for reads. So, these segments
+ * will be accessible for reads. But this should be available for any cleanup 
activity like deleting segments by the
+ * caller of this class.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#DELETE_SEGMENT_STARTED}:
+ * Segment in this state indicate it is getting deleted. That means, it is not 
available for reads. But it should be
+ * available for any cleanup activity like deleting segments by the caller of 
this class.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#DELETE_SEGMENT_FINISHED}:
+ * Segment in this state indicate it is already deleted. That means, it is not 
available for any activity including
+ * reads or cleanup activity. This cache will clear entries containing this 
state.
+ * 
+ * 
+ *
+ * Method                                                    COPY_SEGMENT_STARTED   COPY_SEGMENT_FINISHED   DELETE_SEGMENT_STARTED   DELETE_SEGMENT_FINISHED
+ * remoteLogSegmentMetadata(int leaderEpoch, long offset)    No                     Yes                     No                       No
+ * listRemoteLogSegments(int leaderEpoch)                    Yes                    Yes                     Yes                      No
+ * listAllRemoteLogSegments()                                Yes                    Yes                     Yes                      No
+ */
+public class RemoteLogMetadataCache {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataCache.class);
+
+// It contains all the segment-id to metadata mappings which did not reach 
the terminal state viz DELETE_SEGMENT_FINISHED.
+private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata> idToSegmentMetadata
+= new ConcurrentHashMap<>();
+
+// It contains leader epoch to the respective entry containing the state.
+private final ConcurrentMap<Integer, RemoteLogLeaderEpochState> leaderEpochEntries = new ConcurrentHashMap<>();
+
+/**
+ * Returns {@link RemoteLogSegmentMetadata} if it exists for the given 
leader-epoch containing the offset and with
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED} state, else returns 
{@link Optional#empty()}.
+ *
+ * @param leaderEpoch leader epoch for the given offset
+ * @param offset  offset
+ * @return the requested remote log segment metadata if it exists.
+ */
+public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int leaderEpoch, long offset) {
+RemoteLogLeaderEpochState remoteLogLeaderEpochState = 
leaderEpochEntries.get(leaderEpoch);
+
+i

[GitHub] [kafka] satishd commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-05 Thread GitBox


satishd commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r607556778



##
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java
##
@@ -21,14 +21,16 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
- * It indicates the state of the remote log segment. This will be based on the 
action executed on this
+ * This enum indicates the state of the remote log segment. This will be based 
on the action executed on this
  * segment by the remote log service implementation.
  * 
- * It goes through the below state transitions.
+ * It goes through the below state transitions. Self transition is treated as 
valid. This allows updating with the
+ * same state in case of retries and failover.

Review comment:
   Looks like autoformatter changed it. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cc13ny opened a new pull request #10486: KAFKA-12492: Fix the formatting of example RocksDBConfigSetter

2021-04-05 Thread GitBox


cc13ny opened a new pull request #10486:
URL: https://github.com/apache/kafka/pull/10486


   Fix the formatting of the example RocksDBConfigSetter, which is broken by misaligned spaces inside the surrounding tag.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10338: KAFKA-10251: wait for consumer rebalance completed before consuming records

2021-04-05 Thread GitBox


showuon commented on pull request #10338:
URL: https://github.com/apache/kafka/pull/10338#issuecomment-813859513


   @apurvam @hachikuji , call for review. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-05 Thread GitBox


satishd commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r607551710



##
File path: 
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is an implementation of {@link RemoteLogMetadataManager} backed 
by in-memory store.
+ */
+public class InmemoryRemoteLogMetadataManager implements 
RemoteLogMetadataManager {
+private static final Logger log = 
LoggerFactory.getLogger(InmemoryRemoteLogMetadataManager.class);
+
+private Map<TopicIdPartition, RemotePartitionDeleteMetadata> idToPartitionDeleteMetadata =
+new ConcurrentHashMap<>();
+
+private Map<TopicIdPartition, RemoteLogMetadataCache> idToRemoteLogMetadataCache = new ConcurrentHashMap<>();
+
+@Override
+public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata)
+throws RemoteStorageException {
+log.debug("Adding remote log segment : [{}]", 
remoteLogSegmentMetadata);
+Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
+
+RemoteLogSegmentId remoteLogSegmentId = 
remoteLogSegmentMetadata.remoteLogSegmentId();
+
+idToRemoteLogMetadataCache
+.computeIfAbsent(remoteLogSegmentId.topicIdPartition(), id -> 
new RemoteLogMetadataCache())
+.addCopyInProgressSegment(remoteLogSegmentMetadata);
+}
+
+@Override
+public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate 
metadataUpdate)
+throws RemoteStorageException {
+log.debug("Updating remote log segment: [{}]", metadataUpdate);
+Objects.requireNonNull(metadataUpdate, "metadataUpdate can not be 
null");
+
+
getRemoteLogMetadataCache(metadataUpdate.remoteLogSegmentId().topicIdPartition())
+.updateRemoteLogSegmentMetadata(metadataUpdate);
+}
+
+private RemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition 
topicIdPartition)
+throws RemoteResourceNotFoundException {
+RemoteLogMetadataCache remoteLogMetadataCache = 
idToRemoteLogMetadataCache.get(topicIdPartition);
+if (remoteLogMetadataCache == null) {
+throw new RemoteResourceNotFoundException("No existing metadata 
found for partition: " + topicIdPartition);
+}
+
+return remoteLogMetadataCache;
+}
+
+@Override
+public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
+   int epochForOffset,
+   long offset)
+throws RemoteStorageException {
+Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be 
null");
+
+return 
getRemoteLogMetadataCache(topicIdPartition).remoteLogSegmentMetadata(epochForOffset,
 offset);
+}
+
+@Override
+public Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition,
+int leaderEpoch) throws RemoteStorageException {
+Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be 
null");
+
+return 
getRemoteLogMetadataCache(topicIdPartition).highestOffsetForEpoch(leaderEpoch);
+}
+
+@Override
+public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata 
remotePartitionDeleteMetadata)
+throws RemoteStorageException {
+log.debug("Adding delete state with: [{}]", 
remotePartitionDeleteMetadata);
+Objects.requireNonNull(remotePartitionDeleteMetadata, 
"remotePartitionDeleteMetadata can not be null");
+
+TopicIdPartition topicIdPartition = 
remotePartitionDeleteMetadata.topic

[jira] [Commented] (KAFKA-12598) Remove deprecated --zookeeper in ConfigCommand

2021-04-05 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315268#comment-17315268
 ] 

Luke Chen commented on KAFKA-12598:
---

[~rajinisiva...@gmail.com]  [~rsivaram] , could you help confirm that in ConfigCommand we cannot remove the _deprecated_ "--zookeeper" option yet in V3.0, because we still need to depend on the "--zookeeper" option to allow dynamic broker configs to be configured before starting the broker? (i.e. KAFKA-6805)

Thank you.

> Remove deprecated --zookeeper in ConfigCommand
> --
>
> Key: KAFKA-12598
> URL: https://issues.apache.org/jira/browse/KAFKA-12598
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] satishd commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-05 Thread GitBox


satishd commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r607546803



##
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class provides an in-memory cache of remote log segment metadata. This 
maintains the lineage of segments
+ * with respect to leader epochs.
+ * 
+ * Remote log segment can go through the state transitions as mentioned in 
{@link RemoteLogSegmentState}.
+ * 
+ * This class will have all the segments which did not reach terminal state 
viz DELETE_SEGMENT_FINISHED. That means,any
+ * segment reaching the terminal state will get cleared from this instance.
+ * This class provides different methods to fetch segment metadata like {@link 
#remoteLogSegmentMetadata(int, long)},
+ * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, 
{@link #listAllRemoteLogSegments()}. Those
+ * methods have different semantics to fetch the segment based on its state.
+ * 
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}:
+ * 
+ * Segment in this state indicate it is not yet copied successfully. So, these 
segments will not be
+ * accessible for reads but these are considered for cleanups when a partition 
is deleted.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED}:
+ * 
+ * Segment in this state indicate it is successfully copied and it is 
available for reads. So, these segments
+ * will be accessible for reads. But this should be available for any cleanup 
activity like deleting segments by the
+ * caller of this class.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#DELETE_SEGMENT_STARTED}:
+ * Segment in this state indicate it is getting deleted. That means, it is not 
available for reads. But it should be
+ * available for any cleanup activity like deleting segments by the caller of 
this class.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#DELETE_SEGMENT_FINISHED}:
+ * Segment in this state indicate it is already deleted. That means, it is not 
available for any activity including
+ * reads or cleanup activity. This cache will clear entries containing this 
state.
+ * 
+ * 
+ *
+ * Method                                                    COPY_SEGMENT_STARTED   COPY_SEGMENT_FINISHED   DELETE_SEGMENT_STARTED   DELETE_SEGMENT_FINISHED
+ * remoteLogSegmentMetadata(int leaderEpoch, long offset)    No                     Yes                     No                       No
+ * listRemoteLogSegments(int leaderEpoch)                    Yes                    Yes                     Yes                      No
+ * listAllRemoteLogSegments()                                Yes                    Yes                     Yes                      No
+ */
+public class RemoteLogMetadataCache {

Review comment:
   No. Javadoc is generated for the clients module with the package 
`/org/apache/kafka/server/log/remote/storage/`. But this class is in the 
`remote-storage` module.

##
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissi

[GitHub] [kafka] ijuma merged pull request #10479: MINOR: Jenkinsfile's `post` needs `agent` to be set

2021-04-05 Thread GitBox


ijuma merged pull request #10479:
URL: https://github.com/apache/kafka/pull/10479


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] highluck commented on pull request #10302: KAFKA-7785: move internal DefaultPartitionGrouper

2021-04-05 Thread GitBox


highluck commented on pull request #10302:
URL: https://github.com/apache/kafka/pull/10302#issuecomment-813824311


   @guozhangwang 
   May I ask for a merge?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] highluck commented on pull request #9861: MINOR: Modify unnecessary access specifiers

2021-04-05 Thread GitBox


highluck commented on pull request #9861:
URL: https://github.com/apache/kafka/pull/9861#issuecomment-813823981


   @chia7712 
   May I ask for a review of this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] highluck commented on a change in pull request #9851: KAFKA-10769 Remove JoinGroupRequest#containsValidPattern as it is dup…

2021-04-05 Thread GitBox


highluck commented on a change in pull request #9851:
URL: https://github.com/apache/kafka/pull/9851#discussion_r607505049



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
##
@@ -58,17 +57,7 @@ public void shouldThrowOnInvalidGroupInstanceIds() {
 }
 }
 }
-
-@Test
-public void shouldRecognizeInvalidCharactersInGroupInstanceIds() {
-char[] invalidChars = {'/', '\\', ',', '\u', ':', '"', '\'', ';', 
'*', '?', ' ', '\t', '\r', '\n', '='};
-
-for (char c : invalidChars) {
-String instanceId = "Is " + c + "illegal";
-assertFalse(JoinGroupRequest.containsValidPattern(instanceId));
-}
-}
-
+

Review comment:
   @chia7712 
   Thanks, I updated the code.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma opened a new pull request #10485: MINOR: Enable scala/java joint compilation consistently for `core` module

2021-04-05 Thread GitBox


ijuma opened a new pull request #10485:
URL: https://github.com/apache/kafka/pull/10485


   We were doing it only for test files previously.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9851: KAFKA-10769 Remove JoinGroupRequest#containsValidPattern as it is dup…

2021-04-05 Thread GitBox


chia7712 commented on a change in pull request #9851:
URL: https://github.com/apache/kafka/pull/9851#discussion_r607490193



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
##
@@ -58,17 +57,7 @@ public void shouldThrowOnInvalidGroupInstanceIds() {
 }
 }
 }
-
-@Test
-public void shouldRecognizeInvalidCharactersInGroupInstanceIds() {
-char[] invalidChars = {'/', '\\', ',', '\u', ':', '"', '\'', ';', 
'*', '?', ' ', '\t', '\r', '\n', '='};
-
-for (char c : invalidChars) {
-String instanceId = "Is " + c + "illegal";
-assertFalse(JoinGroupRequest.containsValidPattern(instanceId));
-}
-}
-
+

Review comment:
   please remove those indents.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10446: MINOR: [ConfigEntry.class] add 'type' to 'toString' and 'hashCode'

2021-04-05 Thread GitBox


chia7712 commented on a change in pull request #10446:
URL: https://github.com/apache/kafka/pull/10446#discussion_r607486103



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
##
@@ -167,6 +169,8 @@ public int hashCode() {
 result = prime * result + (isReadOnly ? 1 : 0);
 result = prime * result + source.hashCode();
 result = prime * result + synonyms.hashCode();
+result = prime * result + type.hashCode();
+result = prime * result + documentation.hashCode();

Review comment:
   nice point! will copy that!
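
   As a side note on the quoted hashCode lines, a field that may be absent (for instance if documentation is null on some entries) would need a null-tolerant fold; a generic sketch with a placeholder class, not the actual ConfigEntry code:

   import java.util.Objects;

   final class EntryLike {
       final String name;
       final String documentation; // may be null in this sketch

       EntryLike(String name, String documentation) {
           this.name = name;
           this.documentation = documentation;
       }

       @Override
       public int hashCode() {
           // Objects.hash tolerates null fields, unlike calling documentation.hashCode() directly.
           return Objects.hash(name, documentation);
       }
   }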




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-8924) Default grace period (-1) of TimeWindows causes suppress to emit events after 24h

2021-04-05 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-8924.
---
Resolution: Duplicate

> Default grace period (-1) of TimeWindows causes suppress to emit events after 
> 24h
> -
>
> Key: KAFKA-8924
> URL: https://issues.apache.org/jira/browse/KAFKA-8924
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Michał
>Assignee: Michał
>Priority: Major
>  Labels: needs-kip
>
> h2. Problem 
> The default creation of TimeWindows, like
> {code}
> TimeWindows.of(ofMillis(xxx))
> {code}
> calls an internal constructor
> {code}
> return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS);
> {code}
> And the *-1* parameter is the default grace period which I think is here for 
> backward compatibility
> {code}
> @SuppressWarnings("deprecation") // continuing to support 
> Windows#maintainMs/segmentInterval in fallback mode
> @Override
> public long gracePeriodMs() {
> // NOTE: in the future, when we remove maintainMs,
> // we should default the grace period to 24h to maintain the default 
> behavior,
> // or we can default to (24h - size) if you want to be super accurate.
> return graceMs != -1 ? graceMs : maintainMs() - size();
> }
> {code}
> The problem is that if you use a TimeWindows with gracePeriod of *-1* 
> together with suppress *untilWindowCloses*, it never emits an event.
> You can check the Suppress tests 
> (SuppressScenarioTest.shouldSupportFinalResultsForTimeWindows), where 
> [~vvcephei] was (maybe) aware of that and all the scenarios specify the 
> gracePeriod.
> I will add a test without it on my branch and it will fail.
> The test: 
> https://github.com/atais/kafka/commit/221a04dc40d636ffe93a0ad95dfc6bcad653f4db
>  
> h2. Now what can be done
> One easy fix would be to change the default value to 0, which works fine for 
> me in my project, however, I am not aware of the impact it would have done 
> due to the changes in the *gracePeriodMs* method mentioned before.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8924) Default grace period (-1) of TimeWindows causes suppress to emit events after 24h

2021-04-05 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315217#comment-17315217
 ] 

A. Sophie Blee-Goldman commented on KAFKA-8924:
---

Can we close this as a duplicate? We have KIP-633 currently under voting which 
should improve things, we can track the progress from here with 
https://issues.apache.org/jira/browse/KAFKA-8613

> Default grace period (-1) of TimeWindows causes suppress to emit events after 
> 24h
> -
>
> Key: KAFKA-8924
> URL: https://issues.apache.org/jira/browse/KAFKA-8924
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Michał
>Assignee: Michał
>Priority: Major
>  Labels: needs-kip
>
> h2. Problem 
> The default creation of TimeWindows, like
> {code}
> TimeWindows.of(ofMillis(xxx))
> {code}
> calls an internal constructor
> {code}
> return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS);
> {code}
> And the *-1* parameter is the default grace period which I think is here for 
> backward compatibility
> {code}
> @SuppressWarnings("deprecation") // continuing to support 
> Windows#maintainMs/segmentInterval in fallback mode
> @Override
> public long gracePeriodMs() {
> // NOTE: in the future, when we remove maintainMs,
> // we should default the grace period to 24h to maintain the default 
> behavior,
> // or we can default to (24h - size) if you want to be super accurate.
> return graceMs != -1 ? graceMs : maintainMs() - size();
> }
> {code}
> The problem is that if you use a TimeWindows with gracePeriod of *-1* 
> together with suppress *untilWindowCloses*, it never emits an event.
> You can check the Suppress tests 
> (SuppressScenarioTest.shouldSupportFinalResultsForTimeWindows), where 
> [~vvcephei] was (maybe) aware of that and all the scenarios specify the 
> gracePeriod.
> I will add a test without it on my branch and it will fail.
> The test: 
> https://github.com/atais/kafka/commit/221a04dc40d636ffe93a0ad95dfc6bcad653f4db
>  
> h2. Now what can be done
> One easy fix would be to change the default value to 0, which works fine for 
> me in my project, however, I am not aware of the impact it would have done 
> due to the changes in the *gracePeriodMs* method mentioned before.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-8613) Set default grace period to 0

2021-04-05 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-8613:
-

Assignee: A. Sophie Blee-Goldman

> Set default grace period to 0
> -
>
> Key: KAFKA-8613
> URL: https://issues.apache.org/jira/browse/KAFKA-8613
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0
>
>
> Currently, the grace period is set to retention time if the grace period is 
> not specified explicitly. The reason for setting the default grace period to 
> retention time was backward compatibility. Topologies that were implemented 
> before the introduction of the grace period, added late arriving records to a 
> window as long as the window existed, i.e., as long as its retention time was 
> not elapsed.  
> This unintuitive default grace period has already caused confusion among 
> users.
> For the next major release, we should set the default grace period to 
> {{Duration.ZERO}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-6603) Kafka streams off heap memory usage does not match expected values from configuration

2021-04-05 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-6603.
---
Resolution: Fixed

> Kafka streams off heap memory usage does not match expected values from 
> configuration
> -
>
> Key: KAFKA-6603
> URL: https://issues.apache.org/jira/browse/KAFKA-6603
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Igor Calabria
>Priority: Minor
>
> Hi, I have a simple aggregation pipeline that's backed by the default state 
> store(rocksdb). The pipeline works fine except that off heap the memory usage 
> is way higher than expected. Following the 
> [documention|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config]
>  has some effect(memory usage is reduced) but the values don't match at all. 
> The java process is set to run with just `-Xmx300m -Xms300m`  and rocksdb 
> config looks like this
> {code:java}
> tableConfig.setCacheIndexAndFilterBlocks(true);
> tableConfig.setBlockCacheSize(1048576); //1MB
> tableConfig.setBlockSize(16 * 1024); // 16KB
> options.setTableFormatConfig(tableConfig);
> options.setMaxWriteBufferNumber(2);
> options.setWriteBufferSize(8 * 1024); // 8KB{code}
> To estimate memory usage, I'm using this formula  
> {noformat}
> (block_cache_size + write_buffer_size * write_buffer_number) * segments * 
> partitions{noformat}
> Since my topic has 25 partitions with 3 segments each(it's a windowed store), 
> off heap memory usage should be about 76MB. What I'm seeing in production is 
> upwards of 300MB, even taking in consideration  extra overhead from rocksdb 
> compaction threads, this seems a bit high (especially when the disk usage for 
> all files is just 1GB) 
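
Plugging the values from this report into the quoted formula (1 MB block cache, two 8 KB write buffers, 3 segments, 25 partitions) reproduces the ~76 MB expectation:

{code:java}
public class RocksDbOffHeapEstimate {
    public static void main(final String[] args) {
        final long blockCacheSize = 1_048_576L;   // 1 MB
        final long writeBufferSize = 8 * 1024L;   // 8 KB
        final int writeBufferNumber = 2;
        final int segments = 3;
        final int partitions = 25;
        final long bytes = (blockCacheSize + writeBufferSize * writeBufferNumber) * segments * partitions;
        // Prints roughly 76.2 MB, far below the 300+ MB observed in production.
        System.out.printf("expected off-heap: %.1f MB%n", bytes / (1024.0 * 1024.0));
    }
}
{code}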



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8165) Streams task causes Out Of Memory after connection issues and store restoration

2021-04-05 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-8165.
---
Resolution: Fixed

> Streams task causes Out Of Memory after connection issues and store 
> restoration
> ---
>
> Key: KAFKA-8165
> URL: https://issues.apache.org/jira/browse/KAFKA-8165
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
> Environment: 3 nodes, 22 topics, 16 partitions per topic, 1 window 
> store, 4 KV stores. 
> Kafka Streams application cluster: 3 AWS t2.large instances (8GB mem). 1 
> application instance, 2 threads per instance.
> Kafka 2.1, Kafka Streams 2.1
> Amazon Linux.
> Scala application, on Docker based on openJdk9. 
>Reporter: Di Campo
>Priority: Major
>
> Having a Kafka Streams 2.1 application, when Kafka brokers are stable, the 
> (largely stateful) application has been consuming ~160 messages per second at 
> a sustained rate for several hours. 
> However it started having connection issues to the brokers. 
> {code:java}
> Connection to node 3 (/172.31.36.118:9092) could not be established. Broker 
> may not be available. (org.apache.kafka.clients.NetworkClient){code}
> Also it began showing a lot of these errors: 
> {code:java}
> WARN [Consumer 
> clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-2-consumer,
>  groupId=stream-processor] 1 partitions have leader brokers without a 
> matching listener, including [broker-2-health-check-0] 
> (org.apache.kafka.clients.NetworkClient){code}
> In fact, the _health-check_ topic is in the broker but not consumed by this 
> topology or used in any way by the Streams application (it is just broker 
> healthcheck). It does not complain about topics that are actually consumed by 
> the topology. 
> Some time after these errors (that appear at a rate of 24 appearances per 
> second during ~5 minutes), then the following logs appear: 
> {code:java}
> [2019-03-27 15:14:47,709] WARN [Consumer 
> clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-1-restore-consumer,
>  groupId=] Connection to node -3 (/ip3:9092) could not be established. Broker 
> may not be available. (org.apache.kafka.clients.NetworkClient){code}
> In between 6 and then 3 lines of "Connection could not be established" error 
> messages, 3 of these ones slipped in: 
> {code:java}
> [2019-03-27 15:14:47,723] WARN Started Restoration of visitorCustomerStore 
> partition 15 total records to be restored 17 
> (com.divvit.dp.streams.applications.monitors.ConsoleGlobalRestoreListener){code}
>  
>  ... one for each different KV store I have (I still have another KV that 
> does not appear, and a WindowedStore store that also does not appear). 
>  Then I finally see "Restoration Complete" (using a logging 
> ConsoleGlobalRestoreListener as in docs) messages for all of my stores. So it 
> seems it may be fine now to restart the processing.
> Three minutes later, some events get processed, and I see an OOM error:  
> {code:java}
> java.lang.OutOfMemoryError: GC overhead limit exceeded{code}
>  
> ... so given that it usually allows to process during hours under same 
> circumstances, I'm wondering whether there is some memory leak in the 
> connection resources or somewhere in the handling of this scenario.
> Kafka and KafkaStreams 2.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12419) Remove Deprecated APIs of Kafka Streams in 3.0

2021-04-05 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315209#comment-17315209
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12419:


Thanks -- wanted to make sure I understood why exactly the StreamsConfig was 
helpful to dependency injection. The KIP makes sense to me. Opened 
https://github.com/apache/kafka/pull/10484

> Remove Deprecated APIs of Kafka Streams in 3.0
> --
>
> Key: KAFKA-12419
> URL: https://issues.apache.org/jira/browse/KAFKA-12419
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, streams-test-utils
>Reporter: Guozhang Wang
>Assignee: Tomasz Nguyen
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> Here's a list of deprecated APIs that we have accumulated in the past, we can 
> consider removing them in 3.0:
> * KIP-198: "--zookeeper" flag from StreamsResetter (1.0)
> * KIP-171: "–execute" flag from StreamsResetter (1.1)
> * KIP-233: overloaded "StreamsBuilder#addGlobalStore" (1.1)
> * KIP-251: overloaded "ProcessorContext#forward" (2.0)
> * KIP-276: "StreamsConfig#getConsumerConfig" (2.0)
> * KIP-319: "WindowBytesStoreSupplier#segments" (2.1)
> * KIP-321: "TopologyDescription.Source#topics" (2.1)
> * KIP-328: "Windows#until/segmentInterval/maintainMS" (2.1)
> * KIP-358: "Windows/Materialized" overloaded functions with `long` (2.1)
> * KIP-365/366: Implicit Scala Apis (2.1)
> * KIP-372: overloaded "KStream#groupBy" (2.1)
> * KIP-307: "Joined#named" (2.3)
> * KIP-345: Broker config "group.initial.rebalance.delay.ms" (2.3)
> * KIP-429: "PartitionAssignor" interface (2.4)
> * KIP-470: "TopologyTestDriver#pipeInput" (2.4)
> * KIP-476: overloaded "KafkaClientSupplier#getAdminClient" (2.4)
> * KIP-479: overloaded "KStream#join" (2.4)
> * KIP-530: old "UsePreviousTimeOnInvalidTimeStamp" (2.5)
> * KIP-535 / 562: overloaded "KafkaStreams#metadataForKey" and 
> "KafkaStreams#store" (2.5)
> And here's a list of already filed JIRAs for removing deprecated APIs
> * KAFKA-10434
> * KAFKA-7785



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman opened a new pull request #10484: MINOR: un-deprecate StreamsConfig overloads to support dependency injection

2021-04-05 Thread GitBox


ableegoldman opened a new pull request #10484:
URL: https://github.com/apache/kafka/pull/10484


   In [#5344](https://github.com/apache/kafka/pull/5344#issuecomment-413350338) 
it came to our attention that the StreamsConfig overloads of the KafkaStreams 
constructors are actually quite useful for dependency injection, providing a 
cleaner way to configure dependencies and better type safety. 
   
   We considered removing these deprecated overloads in the upcoming 3.0 
release, but decided against it for the above reasons. Since we no longer intend to remove these APIs, it makes sense to drop the deprecation entirely, so users can start or continue to use them without worry.
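
   For illustration, a minimal sketch (the topology and properties are placeholders) of the StreamsConfig overload being kept, which lets a dependency-injection framework hand KafkaStreams a typed config object instead of raw Properties:

   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.StreamsConfig;
   import org.apache.kafka.streams.Topology;

   import java.util.Properties;

   public class InjectedConfigExample {
       // In a DI setup the StreamsConfig would typically be built and injected by the container.
       public static KafkaStreams create(final StreamsConfig config) {
           final Topology topology = new StreamsBuilder().build();
           return new KafkaStreams(topology, config);
       }

       public static StreamsConfig configFrom(final Properties props) {
           return new StreamsConfig(props);
       }
   }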


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

2021-04-05 Thread GitBox


hachikuji commented on pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#issuecomment-813766742


   @chia7712 @dajac No rush, but when you have time, this is a continuation of 
the previous work which added `AdminApiDriver`. This patch contains 
`CoordinatorStrategy`, which is needed to lookup transaction coordinators.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

2021-04-05 Thread GitBox


hachikuji opened a new pull request #10483:
URL: https://github.com/apache/kafka/pull/10483


   This patch contains the `Admin` implementation of the `DescribeTransactions` 
APIs described in KIP-664: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #10393: KAFKA-12539: Refactor KafkaRaftCllient handleVoteRequest to reduce cyclomatic complexity

2021-04-05 Thread GitBox


dengziming commented on a change in pull request #10393:
URL: https://github.com/apache/kafka/pull/10393#discussion_r607430517



##
File path: raft/src/main/java/org/apache/kafka/raft/CandidateState.java
##
@@ -235,6 +240,15 @@ public int epoch() {
 return highWatermark;
 }
 
+@Override
+public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
+// Still reject vote request even candidateId = localId, Although the 
candidate votes for
+// itself, this vote is implicit and not "granted".
+log.debug("Rejecting vote request from candidate {} since we are 
already candidate in epoch {}",
+candidateId, epoch);

Review comment:
   @ijuma , we logged both the vote result and the rejection reason in `KafkaRaftClient` before this PR, but different `EpochState` implementations have different rejection reasons; to reduce the cyclomatic complexity we moved the if-else out of `KafkaRaftClient`.
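
   A simplified illustration (placeholder interface and classes, not the actual raft code) of the refactoring described above, where each epoch state answers canGrantVote itself so the if-else can move out of KafkaRaftClient:

   interface EpochState {
       boolean canGrantVote(int candidateId, boolean isLogUpToDate);
   }

   final class CandidateState implements EpochState {
       @Override
       public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
           // Mirrors the quoted behaviour: a candidate rejects every request, including one from
           // itself, because its own vote is implicit rather than "granted" through this path.
           return false;
       }
   }

   final class UnattachedState implements EpochState {
       @Override
       public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
           // A state with no leader or vote yet grants the vote if the candidate's log is current.
           return isLogUpToDate;
       }
   }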




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10482: KAFKA-12499: add transaction timeout verification

2021-04-05 Thread GitBox


ableegoldman commented on a change in pull request #10482:
URL: https://github.com/apache/kafka/pull/10482#discussion_r607423468



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##
@@ -106,7 +107,7 @@ public StreamsProducer(final StreamsConfig config,
 producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
applicationId + "-" + taskId);
 
 eosBetaProducerConfigs = null;
-
+verifyTransactionTimeoutCompatibility(producerConfigs, config);

Review comment:
   +1, we should fail-fast at the level of the KafkaStreams. Otherwise in 
eos-alpha we actually have to wait for all the individual StreamThreads to be 
created, started, complete a rebalance to get a task assignment, and _then_ 
start throwing IllegalArgumentException on each thread when it goes to create a 
task producer. (Can we also log an error in addition to the 
IllegalArgumentException?) 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lamberken commented on pull request #10469: KAFKA-12611: Fix using random payload in ProducerPerformance incorrectly

2021-04-05 Thread GitBox


lamberken commented on pull request #10469:
URL: https://github.com/apache/kafka/pull/10469#issuecomment-813750487


   hi @mageshn @C0urante @gharris1727 , please review when you get a chance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up

2021-04-05 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315186#comment-17315186
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12492:


Yeah, you need to check out the "*Kafka Code Repository*" and not just the 
"*Kafka Website Repository*". The whole setup and process with the docs is kind 
of confusing, I'll see if I can improve the docs/wiki to clear things up a bit. 
Thanks :)

> Formatting of example RocksDBConfigSetter is messed up
> --
>
> Key: KAFKA-12492
> URL: https://issues.apache.org/jira/browse/KAFKA-12492
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ben Chen
>Priority: Trivial
>  Labels: docs, newbie
>
> See the example implementation class CustomRocksDBConfig in the docs for the 
> rocksdb.config.setter
> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#rocksdb-config-setter



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up

2021-04-05 Thread Ben Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315185#comment-17315185
 ] 

Ben Chen commented on KAFKA-12492:
--

Got it. Thanks. I guess I missed something mentioned here: 
https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes

> Formatting of example RocksDBConfigSetter is messed up
> --
>
> Key: KAFKA-12492
> URL: https://issues.apache.org/jira/browse/KAFKA-12492
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ben Chen
>Priority: Trivial
>  Labels: docs, newbie
>
> See the example implementation class CustomRocksDBConfig in the docs for the 
> rocksdb.config.setter
> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#rocksdb-config-setter



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #10482: KAFKA-12499: add transaction timeout verification

2021-04-05 Thread GitBox


mjsax commented on a change in pull request #10482:
URL: https://github.com/apache/kafka/pull/10482#discussion_r607414956



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##
@@ -106,7 +107,7 @@ public StreamsProducer(final StreamsConfig config,
 producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
applicationId + "-" + taskId);
 
 eosBetaProducerConfigs = null;
-
+verifyTransactionTimeoutCompatibility(producerConfigs, config);

Review comment:
   It might be better to do this check in `StreamsConfig`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up

2021-04-05 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315183#comment-17315183
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12492:


Thanks, I'll take a look. But note that you always need to submit a docs PR 
against the kafka repo, while the kafka-site repo is optional. The reason is 
what I said above, i.e. during a release the docs from kafka get copied over to 
kafka-site. So if you only fix them in kafka-site but not in kafka, the fix 
will just get wiped out when the next release comes up. 

In general we usually do the kafka PR before the kafka-site PR, for that reason 
among others. But that's not a hard rule or anything -- my point is just that 
you need a PR for the kafka repo as well. 

> Formatting of example RocksDBConfigSetter is messed up
> --
>
> Key: KAFKA-12492
> URL: https://issues.apache.org/jira/browse/KAFKA-12492
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ben Chen
>Priority: Trivial
>  Labels: docs, newbie
>
> See the example implementation class CustomRocksDBConfig in the docs for the 
> rocksdb.config.setter
> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#rocksdb-config-setter



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda closed pull request #9560: KAFKA-10345: Add ZK-notification based update for trust/key store paths

2021-04-05 Thread GitBox


abbccdda closed pull request #9560:
URL: https://github.com/apache/kafka/pull/9560


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda opened a new pull request #10482: KAFKA-12499: add transaction timeout verification

2021-04-05 Thread GitBox


abbccdda opened a new pull request #10482:
URL: https://github.com/apache/kafka/pull/10482


   This PR adds a check comparing the transaction timeout against the Streams 
commit interval. If the transaction timeout is smaller than the commit interval, 
Streams should crash and inform the user to update their commit interval to match 
the given transaction timeout, or vice versa.
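   
   As a rough illustration only (not the code in this PR), the intended check could look roughly like the following, assuming the standard config keys `transaction.timeout.ms` and `commit.interval.ms`:
   
   ```java
   import java.util.Map;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.streams.StreamsConfig;
   
   final class TransactionTimeoutCheck {
       // Hedged sketch, not the actual patch: fail fast when the producer's transaction
       // timeout is shorter than the Streams commit interval.
       static void verify(final Map<String, Object> producerConfigs, final StreamsConfig config) {
           final Object timeout = producerConfigs.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
           if (timeout == null) {
               return; // producer default applies; nothing to verify in this sketch
           }
           final long transactionTimeoutMs = Long.parseLong(String.valueOf(timeout));
           final long commitIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
           if (transactionTimeoutMs < commitIntervalMs) {
               throw new IllegalArgumentException(
                   "Transaction timeout " + transactionTimeoutMs + " ms is smaller than commit interval "
                       + commitIntervalMs + " ms; increase transaction.timeout.ms or decrease commit.interval.ms");
           }
       }
   }
   ```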
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Issue Comment Deleted] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up

2021-04-05 Thread Ben Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Chen updated KAFKA-12492:
-
Comment: was deleted

(was: https://github.com/apache/kafka-site/pull/345/)

> Formatting of example RocksDBConfigSetter is messed up
> --
>
> Key: KAFKA-12492
> URL: https://issues.apache.org/jira/browse/KAFKA-12492
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ben Chen
>Priority: Trivial
>  Labels: docs, newbie
>
> See the example implementation class CustomRocksDBConfig in the docs for the 
> rocksdb.config.setter
> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#rocksdb-config-setter



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up

2021-04-05 Thread Ben Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315173#comment-17315173
 ] 

Ben Chen commented on KAFKA-12492:
--

https://github.com/apache/kafka-site/pull/345/

> Formatting of example RocksDBConfigSetter is messed up
> --
>
> Key: KAFKA-12492
> URL: https://issues.apache.org/jira/browse/KAFKA-12492
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ben Chen
>Priority: Trivial
>  Labels: docs, newbie
>
> See the example implementation class CustomRocksDBConfig in the docs for the 
> rocksdb.config.setter
> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#rocksdb-config-setter



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up

2021-04-05 Thread Ben Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315172#comment-17315172
 ] 

Ben Chen commented on KAFKA-12492:
--

[~ableegoldman] I appreciate the detailed explanation!

Here is the PR: https://github.com/apache/kafka-site/pull/345/

> Formatting of example RocksDBConfigSetter is messed up
> --
>
> Key: KAFKA-12492
> URL: https://issues.apache.org/jira/browse/KAFKA-12492
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ben Chen
>Priority: Trivial
>  Labels: docs, newbie
>
> See the example implementation class CustomRocksDBConfig in the docs for the 
> rocksdb.config.setter
> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#rocksdb-config-setter



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12574) Deprecate eos-alpha

2021-04-05 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315156#comment-17315156
 ] 

Ismael Juma commented on KAFKA-12574:
-

For 4.0, we can make eos-v2 the default and make it a non-issue for most 
people. Then this config will only be there for people who want the weaker 
semantics. :)
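
For readers following along, this is the knob being discussed; a minimal example with the config names as they exist today (the "v2" name itself is still only a proposal in this thread):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Config names as of Kafka 2.6+; the "v2" rename discussed here is still only a proposal.
Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA); // eos "beta"
// props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);   // eos "alpha", proposed for deprecation
{code}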

> Deprecate eos-alpha
> ---
>
> Key: KAFKA-12574
> URL: https://issues.apache.org/jira/browse/KAFKA-12574
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> In KIP-447 we introduced a new thread-producer which is capable of 
> exactly-once semantics across multiple tasks. The new mode of EOS, called 
> eos-beta, is intended to eventually be the preferred processing mode for EOS 
> as it improves the performance and scaling of partitions/tasks. The only 
> downside is that it requires brokers to be on version 2.5+ in order to 
> understand the latest APIs that are necessary for this thread-producer.
> We should consider deprecating the eos-alpha config, i.e. 
> StreamsConfig.EXACTLY_ONCE, to encourage new & existing EOS users to migrate 
> to the new-and-improved processing mode, and upgrade their brokers if 
> necessary.
> Eventually we would like to be able to remove the eos-alpha code paths from 
> Streams as this will help to simplify the logic and reduce the processing 
> mode branching. But since this will break client-broker compatibility, and 
> 2.5 is still a relatively recent version, we probably can't actually remove 
> eos-alpha in the near future



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12574) Deprecate eos-alpha

2021-04-05 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315153#comment-17315153
 ] 

Guozhang Wang commented on KAFKA-12574:
---

I'm fine with option 2) as well, to stay with `eos-v2` eventually. Honestly I 
think we should try to avoid such an upgrade path ever again in the future -- i.e. a 
rolling bounce with a config change -- but I know that one should never say never 
:) So although we may never have an `eos-v3`, this is not a bad state to end 
up in with `eos-v2`.

> Deprecate eos-alpha
> ---
>
> Key: KAFKA-12574
> URL: https://issues.apache.org/jira/browse/KAFKA-12574
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> In KIP-447 we introduced a new thread-producer which is capable of 
> exactly-once semantics across multiple tasks. The new mode of EOS, called 
> eos-beta, is intended to eventually be the preferred processing mode for EOS 
> as it improves the performance and scaling of partitions/tasks. The only 
> downside is that it requires brokers to be on version 2.5+ in order to 
> understand the latest APIs that are necessary for this thread-producer.
> We should consider deprecating the eos-alpha config, i.e. 
> StreamsConfig.EXACTLY_ONCE, to encourage new & existing EOS users to migrate 
> to the new-and-improved processing mode, and upgrade their brokers if 
> necessary.
> Eventually we would like to be able to remove the eos-alpha code paths from 
> Streams as this will help to simplify the logic and reduce the processing 
> mode branching. But since this will break client-broker compatibility, and 
> 2.5 is still a relatively recent version, we probably can't actually remove 
> eos-alpha in the near future



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-05 Thread GitBox


dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r607386832



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -319,6 +328,10 @@ public String name() {
 }
 
 @Override
-public void close() {}
+public void close() {
+if (accumulator != null) {

Review comment:
   These are the QuorumState tests where I was passing in null for the accumulator 
to get the code to compile. I'll see about using a mock.
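   
   For reference, a minimal sketch of what swapping the `null` for a mock might look like in the existing `newLeaderState` helper (Mockito assumed, as used elsewhere in Kafka tests; constructor shape taken from the diff in this PR):
   
   ```java
   // Illustrative only; mirrors the newLeaderState helper shown in LeaderStateTest,
   // with the null replaced by a Mockito mock (import org.mockito.Mockito).
   @SuppressWarnings("unchecked")
   private LeaderState<Object> newLeaderState(Set<Integer> voters, long epochStartOffset) {
       BatchAccumulator<Object> accumulator = Mockito.mock(BatchAccumulator.class);
       return new LeaderState<>(localId, epoch, epochStartOffset, voters, voters, logContext, accumulator);
   }
   ```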




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-05 Thread GitBox


dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r607383662



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2252,9 +2247,12 @@ public Long scheduleAtomicAppend(int epoch, List 
records) {
 return append(epoch, records, true);
 }
 
+@SuppressWarnings("unchecked")
 private Long append(int epoch, List records, boolean isAtomic) {
-BatchAccumulator accumulator = this.accumulator;
-if (accumulator == null) {
+BatchAccumulator accumulator;
+try {
+accumulator =  (BatchAccumulator) 
quorum.leaderStateOrThrow().accumulator();

Review comment:
   Removing the cast causes this...
   `Type Mismatch: Cannot convert from BatchAccumulator to 
BatchAccumulator `
   
   even though this is the signature of accumulator()
   `public BatchAccumulator accumulator()`
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-05 Thread GitBox


hachikuji commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r607381875



##
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##
@@ -36,30 +36,31 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class LeaderStateTest {
+public class LeaderStateTest {

Review comment:
   I confess it's a little annoying to see the generic type leak down to 
here, but I guess that's the price we have to pay. There's a big part of me 
that wants to remove the generic and let `ApiMessageAndVersion` be the only 
supported type. Anyway, that is fuel for a separate issue/discussion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-05 Thread GitBox


hachikuji commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r607380961



##
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##
@@ -36,30 +36,31 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class LeaderStateTest {
+public class LeaderStateTest {
 private final int localId = 0;
 private final int epoch = 5;
 private final LogContext logContext = new LogContext();
 
-private LeaderState newLeaderState(
+private LeaderState newLeaderState(
 Set voters,
 long epochStartOffset
 ) {
-return new LeaderState(
+return new LeaderState<>(
 localId,
 epoch,
 epochStartOffset,
 voters,
 voters,
-logContext
+logContext,
+null

Review comment:
   Yeah, let's add an explicit `requireNonNull` in the constructor. Here we 
could potentially use a mock.
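   
   A minimal sketch of that guard (illustrative only):
   
   ```java
   // In the LeaderState constructor (sketch; java.util.Objects):
   this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
   ```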




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12294) Consider using the forwarding mechanism for metadata auto topic creation

2021-04-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12294.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

> Consider using the forwarding mechanism for metadata auto topic creation
> 
>
> Key: KAFKA-12294
> URL: https://issues.apache.org/jira/browse/KAFKA-12294
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Boyang Chen
>Priority: Major
> Fix For: 3.0.0
>
>
> Once [https://github.com/apache/kafka/pull/9579] is merged, there is a way to 
> improve the topic creation auditing by forwarding the CreateTopicsRequest 
> inside Envelope for the given client. Details in 
> [here|https://github.com/apache/kafka/pull/9579#issuecomment-772283780]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

2021-04-05 Thread GitBox


hachikuji merged pull request #10142:
URL: https://github.com/apache/kafka/pull/10142


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12574) Deprecate eos-alpha

2021-04-05 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315144#comment-17315144
 ] 

Ismael Juma commented on KAFKA-12574:
-

{quote}Regarding the proposal, I think users would find it very odd if we moved 
on to eos-v2 and then suddenly deprecated it and went back to just "eos" – 
makes it seem like there was a problem with eos-v2. I would be fine with just 
staying on eos-v2 though. For one thing it leaves the door open to further 
developments in eos that need to be gated by a config, eg eos-v3, if we ever 
have need for that again.
{quote}
+1

> Deprecate eos-alpha
> ---
>
> Key: KAFKA-12574
> URL: https://issues.apache.org/jira/browse/KAFKA-12574
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> In KIP-447 we introduced a new thread-producer which is capable of 
> exactly-once semantics across multiple tasks. The new mode of EOS, called 
> eos-beta, is intended to eventually be the preferred processing mode for EOS 
> as it improves the performance and scaling of partitions/tasks. The only 
> downside is that it requires brokers to be on version 2.5+ in order to 
> understand the latest APIs that are necessary for this thread-producer.
> We should consider deprecating the eos-alpha config, i.e. 
> StreamsConfig.EXACTLY_ONCE, to encourage new & existing EOS users to migrate 
> to the new-and-improved processing mode, and upgrade their brokers if 
> necessary.
> Eventually we would like to be able to remove the eos-alpha code paths from 
> Streams as this will help to simplify the logic and reduce the processing 
> mode branching. But since this will break client-broker compatibility, and 
> 2.5 is still a relatively recent version, we probably can't actually remove 
> eos-alpha in the near future



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12574) Deprecate eos-alpha

2021-04-05 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315142#comment-17315142
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12574:


> This is well documented in 
> https://kafka.apache.org/27/documentation/streams/upgrade-guide

Ah, you're right. I must have been on an older version of the docs -- I find 
Google often takes me to 2.4 docs for some reason. We need better SEO :/ (I 
filed a ticket for this already a while back)

Also not sure how I forgot about EosBetaUpgradeIntegrationTest after spending 
so much time trying to help fix it -- sorry for doubting our test coverage and 
docs.

Regarding the proposal, I think users would find it very odd if we moved on to 
eos-v2 and then suddenly deprecated it and went back to just "eos" -- makes it 
seem like there was a problem with eos-v2. I would be fine with just staying on 
eos-v2 though. For one thing it leaves the door open to further developments in 
eos that need to be gated by a config, eg eos-v3, if we ever have need for that 
again. 


> Deprecate eos-alpha
> ---
>
> Key: KAFKA-12574
> URL: https://issues.apache.org/jira/browse/KAFKA-12574
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> In KIP-447 we introduced a new thread-producer which is capable of 
> exactly-once semantics across multiple tasks. The new mode of EOS, called 
> eos-beta, is intended to eventually be the preferred processing mode for EOS 
> as it improves the performance and scaling of partitions/tasks. The only 
> downside is that it requires brokers to be on version 2.5+ in order to 
> understand the latest APIs that are necessary for this thread-producer.
> We should consider deprecating the eos-alpha config, i.e. 
> StreamsConfig.EXACTLY_ONCE, to encourage new & existing EOS users to migrate 
> to the new-and-improved processing mode, and upgrade their brokers if 
> necessary.
> Eventually we would like to be able to remove the eos-alpha code paths from 
> Streams as this will help to simplify the logic and reduce the processing 
> mode branching. But since this will break client-broker compatibility, and 
> 2.5 is still a relatively recent version, we probably can't actually remove 
> eos-alpha in the near future



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

2021-04-05 Thread GitBox


hachikuji commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r607374397



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -905,19 +907,33 @@ class GroupCoordinator(val brokerId: Int,
*
* @param offsetTopicPartitionId The partition we are now leading
*/
-  def onElection(offsetTopicPartitionId: Int): Unit = {
-info(s"Elected as the group coordinator for partition 
$offsetTopicPartitionId")
-groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, 
onGroupLoaded)
+  def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+val currentEpoch = epochForPartitionId.get(offsetTopicPartitionId)
+if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {
+  info(s"Elected as the group coordinator for partition 
$offsetTopicPartitionId in epoch $coordinatorEpoch")
+  groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, 
onGroupLoaded)
+  epochForPartitionId.put(offsetTopicPartitionId, coordinatorEpoch)
+} else {
+  warn(s"Ignored election as group coordinator for partition 
$offsetTopicPartitionId " +
+s"in epoch $coordinatorEpoch since current epoch is $currentEpoch")
+}
   }
 
   /**
* Unload cached state for the given partition and stop handling requests 
for groups which map to it.
*
* @param offsetTopicPartitionId The partition we are no longer leading
*/
-  def onResignation(offsetTopicPartitionId: Int): Unit = {
-info(s"Resigned as the group coordinator for partition 
$offsetTopicPartitionId")
-groupManager.removeGroupsForPartition(offsetTopicPartitionId, 
onGroupUnloaded)
+  def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: 
Option[Int]): Unit = {
+val currentEpoch = epochForPartitionId.get(offsetTopicPartitionId)
+if (currentEpoch.forall(currentEpoch => currentEpoch <= 
coordinatorEpoch.getOrElse(Int.MaxValue))) {
+  info(s"Resigned as the group coordinator for partition 
$offsetTopicPartitionId in epoch $coordinatorEpoch")
+  groupManager.removeGroupsForPartition(offsetTopicPartitionId, 
onGroupUnloaded)
+  epochForPartitionId.remove(offsetTopicPartitionId)

Review comment:
   Hmm.. Why remove the epoch after resignation? It seems like it would be 
useful to keep tracking it. Maybe it's useful to distinguish the case where the 
replica is to be deleted?

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -87,6 +87,8 @@ class GroupCoordinator(val brokerId: Int,
 
   private val isActive = new AtomicBoolean(false)
 
+  val epochForPartitionId = mutable.Map[Int, Int]()

Review comment:
   Does this need to be a concurrent collection? It does not look like we 
can count on a lock protecting `onElection` and `onResignation`.
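   
   To illustrate the concern, a hypothetical Java sketch of the idea (the actual field here is Scala): a concurrent map lets the per-partition epoch be checked and bumped atomically even if `onElection`/`onResignation` race on different threads:
   
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   
   class EpochFence {
       private final ConcurrentHashMap<Integer, Integer> epochForPartitionId = new ConcurrentHashMap<>();
   
       // Returns true only if this election carries a strictly newer epoch for the partition.
       boolean maybeAcceptElection(int partition, int coordinatorEpoch) {
           final boolean[] accepted = {false};
           epochForPartitionId.compute(partition, (p, current) -> {
               if (current == null || coordinatorEpoch > current) {
                   accepted[0] = true;
                   return coordinatorEpoch;
               }
               return current; // stale election; keep the newer epoch
           });
           return accepted[0];
       }
   }
   ```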




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12607) Allow votes to be granted in resigned state

2021-04-05 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12607:
--
Labels: newbie++  (was: )

> Allow votes to be granted in resigned state
> ---
>
> Key: KAFKA-12607
> URL: https://issues.apache.org/jira/browse/KAFKA-12607
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: dengziming
>Priority: Major
>  Labels: newbie++
>
> When the leader is shutting down, it transitions to a resigned state. 
> Currently all votes are rejected in this state, but we should allow the 
> resigned leader to help a candidate get elected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up

2021-04-05 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315140#comment-17315140
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12492:


The kafka-site repo is what the actual, live docs are built from. That's why 
there are separate folders like 27, 26, etc -- these correspond to the docs for 
versions 2.7 and 2.6, and so on. You only need to submit a PR to the kafka-site 
repo if you want your change to show up immediately -- if you don't mind 
waiting for the next release, you can just open a PR to fix the docs in the 
kafka repo directly. Then, these will be copied over to the kafka-site repo and 
made live when the next version is released.

Obviously it would be ideal if we could fix this in all versions, but it's 
probably sufficient to just fix it going forward. The 2.8 release is actually 
going on at the moment, so I would recommend submitting a PR to the kafka repo 
for now. If we can get it merged before 2.8 is released, then we're good -- 
otherwise you can open a followup PR with the same fix in just the 28 
subdirectory of the kafka-site repo.

I'm not sure why you're getting a 403; I was able to set up a local Apache 
server to test some docs, but that was a while ago. Since it's just a fix of an 
existing formatting error, I wouldn't worry about testing it too much. As long 
as you can figure out why the formatting was messed up to begin with, and feel 
reasonably confident in your fix, then that's good enough. Remember, once the 
fix is in kafka-site it'll be live so you can just see what it looks like then. 
If something is still off, you can always submit a followup PR to fix it right 
away in kafka-site

> Formatting of example RocksDBConfigSetter is messed up
> --
>
> Key: KAFKA-12492
> URL: https://issues.apache.org/jira/browse/KAFKA-12492
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ben Chen
>Priority: Trivial
>  Labels: docs, newbie
>
> See the example implementation class CustomRocksDBConfig in the docs for the 
> rocksdb.config.setter
> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#rocksdb-config-setter



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on pull request #10481: KAFKA-12619; Raft leader should expose hw only after committing LeaderChange

2021-04-05 Thread GitBox


hachikuji commented on pull request #10481:
URL: https://github.com/apache/kafka/pull/10481#issuecomment-813686724


   @guozhangwang Thanks for the quick comment. I did consider that. I can't say 
I had a particularly strong reason to reject it, but ultimately I convinced 
myself that modifying the existing check was good enough and probably simpler. 
The only invariant that we need to protect is that all records appended in the 
leader's epoch actually carry the right epoch tag. The refactor proposed here 
may even give us a stronger way to enforce the invariant: 
https://github.com/apache/kafka/pull/10480/files. What do you think?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ewencp commented on a change in pull request #10474: KAFKA-12602: Fix LICENSE file

2021-04-05 Thread GitBox


ewencp commented on a change in pull request #10474:
URL: https://github.com/apache/kafka/pull/10474#discussion_r607362884



##
File path: LICENSE-binary
##
@@ -0,0 +1,602 @@
+
+ Apache License
+   Version 2.0, January 2004
+http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+  "License" shall mean the terms and conditions for use, reproduction,
+  and distribution as defined by Sections 1 through 9 of this document.
+
+  "Licensor" shall mean the copyright owner or entity authorized by
+  the copyright owner that is granting the License.
+
+  "Legal Entity" shall mean the union of the acting entity and all
+  other entities that control, are controlled by, or are under common
+  control with that entity. For the purposes of this definition,
+  "control" means (i) the power, direct or indirect, to cause the
+  direction or management of such entity, whether by contract or
+  otherwise, or (ii) ownership of fifty percent (50%) or more of the
+  outstanding shares, or (iii) beneficial ownership of such entity.
+
+  "You" (or "Your") shall mean an individual or Legal Entity
+  exercising permissions granted by this License.
+
+  "Source" form shall mean the preferred form for making modifications,
+  including but not limited to software source code, documentation
+  source, and configuration files.
+
+  "Object" form shall mean any form resulting from mechanical
+  transformation or translation of a Source form, including but
+  not limited to compiled object code, generated documentation,
+  and conversions to other media types.
+
+  "Work" shall mean the work of authorship, whether in Source or
+  Object form, made available under the License, as indicated by a
+  copyright notice that is included in or attached to the work
+  (an example is provided in the Appendix below).
+
+  "Derivative Works" shall mean any work, whether in Source or Object
+  form, that is based on (or derived from) the Work and for which the
+  editorial revisions, annotations, elaborations, or other modifications
+  represent, as a whole, an original work of authorship. For the purposes
+  of this License, Derivative Works shall not include works that remain
+  separable from, or merely link (or bind by name) to the interfaces of,
+  the Work and Derivative Works thereof.
+
+  "Contribution" shall mean any work of authorship, including
+  the original version of the Work and any modifications or additions
+  to that Work or Derivative Works thereof, that is intentionally
+  submitted to Licensor for inclusion in the Work by the copyright owner
+  or by an individual or Legal Entity authorized to submit on behalf of
+  the copyright owner. For the purposes of this definition, "submitted"
+  means any form of electronic, verbal, or written communication sent
+  to the Licensor or its representatives, including but not limited to
+  communication on electronic mailing lists, source code control systems,
+  and issue tracking systems that are managed by, or on behalf of, the
+  Licensor for the purpose of discussing and improving the Work, but
+  excluding communication that is conspicuously marked or otherwise
+  designated in writing by the copyright owner as "Not a Contribution."
+
+  "Contributor" shall mean Licensor and any individual or Legal Entity
+  on behalf of whom a Contribution has been received by Licensor and
+  subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+  this License, each Contributor hereby grants to You a perpetual,
+  worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+  copyright license to reproduce, prepare Derivative Works of,
+  publicly display, publicly perform, sublicense, and distribute the
+  Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+  this License, each Contributor hereby grants to You a perpetual,
+  worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+  (except as stated in this section) patent license to make, have made,
+  use, offer to sell, sell, import, and otherwise transfer the Work,
+  where such license applies only to those patent claims licensable
+  by such Contributor that are necessarily infringed by their
+  Contribution(s) alone or by combination of their Contribution(s)
+  with the Work to which such Contribution(s) was submitted. If You
+  institute patent litigation against any entity (including a
+  cross-claim or counterclaim in a lawsuit) alleging that the Work
+  or a Contribution

[GitHub] [kafka] guozhangwang commented on pull request #10481: KAFKA-12619; Raft leader should expose hw only after committing LeaderChange

2021-04-05 Thread GitBox


guozhangwang commented on pull request #10481:
URL: https://github.com/apache/kafka/pull/10481#issuecomment-813683249


   Nice catch! Regarding the fix, WDYT about just remembering the `LeaderChange` 
record's offset and comparing against it instead of the epoch start offset? I'm 
thinking that if any future scenario fills in more records between the two, we 
would still be immune to future bugs. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #10481: KAFKA-12619; Raft leader should expose hw only after committing LeaderChange

2021-04-05 Thread GitBox


hachikuji opened a new pull request #10481:
URL: https://github.com/apache/kafka/pull/10481


   KIP-595 describes an extra condition on commitment here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch.
 In order to ensure that a newly elected leader's committed entries cannot get 
lost, it must commit one record from its own epoch. This guarantees that its 
latest entry is larger (in terms of epoch/offset) than any previously written 
record, which ensures that any future leader must also include it. This is the 
purpose of the `LeaderChange` record, which is written to the log as soon as the 
leader gets elected.
   
   Although we had this check implemented, it was off by one. We only ensured 
that replication reached the epoch start offset, which does not reflect the 
appended `LeaderChange` record. This patch fixes the check and clarifies its 
purpose. The rest of the patch just fixes up test cases.
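   
   As a rough paraphrase of the corrected condition (illustrative only, not the exact patch; assuming the high watermark is an exclusive end offset, so committing the `LeaderChange` record written at the epoch start offset requires replication strictly beyond that offset):
   
   ```java
   // The leader may only expose/advance the high watermark once a majority has
   // replicated past the LeaderChange record at epochStartOffset.
   boolean epochStartCommitted(long candidateHighWatermark, long epochStartOffset) {
       return candidateHighWatermark > epochStartOffset;
   }
   ```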
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-05 Thread GitBox


jsancio commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r607353807



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -48,13 +49,16 @@
 private final Set grantingVoters = new HashSet<>();
 private final Logger log;
 
+private final BatchAccumulator accumulator;
+
 protected LeaderState(
 int localId,
 int epoch,
 long epochStartOffset,
 Set voters,
 Set grantingVoters,
-LogContext logContext
+LogContext logContext,
+BatchAccumulator accumulator

Review comment:
   I would keep `LogContext` as the last argument.

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2252,9 +2247,12 @@ public Long scheduleAtomicAppend(int epoch, List 
records) {
 return append(epoch, records, true);
 }
 
+@SuppressWarnings("unchecked")
 private Long append(int epoch, List records, boolean isAtomic) {
-BatchAccumulator accumulator = this.accumulator;
-if (accumulator == null) {
+BatchAccumulator accumulator;
+try {
+accumulator =  (BatchAccumulator) 
quorum.leaderStateOrThrow().accumulator();

Review comment:
   I think you should be able to remove this cast.

##
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##
@@ -36,30 +36,31 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class LeaderStateTest {
+public class LeaderStateTest {
 private final int localId = 0;
 private final int epoch = 5;
 private final LogContext logContext = new LogContext();
 
-private LeaderState newLeaderState(
+private LeaderState newLeaderState(
 Set voters,
 long epochStartOffset
 ) {
-return new LeaderState(
+return new LeaderState<>(
 localId,
 epoch,
 epochStartOffset,
 voters,
 voters,
-logContext
+logContext,
+null

Review comment:
   I would not pass a `null`, and I would add a test checking that this field is 
returned correctly.

##
File path: raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
##
@@ -269,7 +269,7 @@ public void testCandidateToLeader() throws IOException {
 assertTrue(state.isCandidate());
 assertEquals(1, state.epoch());
 
-state.transitionToLeader(0L);
+state.transitionToLeader(0L, null);

Review comment:
   Again, the code in `KafkaRaftClient` assumes that this field cannot be 
`null`.

##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -319,6 +328,10 @@ public String name() {
 }
 
 @Override
-public void close() {}
+public void close() {
+if (accumulator != null) {

Review comment:
   When would accumulator be null?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10474: KAFKA-12602: Fix LICENSE file

2021-04-05 Thread GitBox


ableegoldman commented on a change in pull request #10474:
URL: https://github.com/apache/kafka/pull/10474#discussion_r607355497



##
File path: licenses/DWTFYWTPL
##
@@ -0,0 +1,14 @@
+DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE

Review comment:
   😂 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rsomu commented on pull request #4040: KAFKA-6324: Change LogSegment.delete to deleteIfExists and harden log recovery

2021-04-05 Thread GitBox


rsomu commented on pull request #4040:
URL: https://github.com/apache/kafka/pull/4040#issuecomment-813671433


   Is this fix supposed to avoid the [NFS silly 
renames](https://sbg.technology/2018/07/10/kafka-nfs/) issue? I am currently 
testing Kafka 2.7 on an NFS filesystem and encountered the same error message when 
deleting a topic.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr opened a new pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-05 Thread GitBox


dielhennr opened a new pull request #10480:
URL: https://github.com/apache/kafka/pull/10480


   The KafkaRaftClient has a field for the BatchAccumulator that is only used 
and set when it is the leader. In other cases, leader-specific information was 
stored in LeaderState. In a recent change, EpochState, which LeaderState 
implements, was made Closeable, and QuorumState makes sure to always 
close the previous state before transitioning to the next state. This redesign 
is used here to move the BatchAccumulator into LeaderState and simplify some of 
the handling in KafkaRaftClient.
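   
   A minimal sketch of the close-on-transition idea described above (method and field names are illustrative, not the actual QuorumState code):
   
   ```java
   // Hypothetical sketch: the previous epoch state is always closed before moving on,
   // so LeaderState can own and release its BatchAccumulator in close().
   private void transitionTo(EpochState newState) {
       if (state != null) {
           state.close(); // assumes close() declares no checked exception, as in the LeaderState override
       }
       state = newState;
   }
   ```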
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-6985) Error connection between cluster node

2021-04-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-6985:
-
Component/s: (was: KafkaConnect)
 core

> Error connection between cluster node
> -
>
> Key: KAFKA-6985
> URL: https://issues.apache.org/jira/browse/KAFKA-6985
> Project: Kafka
>  Issue Type: Bug
>  Components: core
> Environment: Centos-7
>Reporter: Ranjeet Ranjan
>Priority: Major
>
> Hi, I have set up a multi-node Kafka cluster but am getting an error while connecting 
> one node to another, as though there is an issue with the firewall or port; however, I am 
> able to telnet. 
> WARN [ReplicaFetcherThread-0-1], Error in fetch 
> Kafka.server.ReplicaFetcherThread$FetchRequest@8395951 
> (Kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to Kafka-1:9092 (id: 1 rack: null) failed
>  
> {code:java}
>  
> at 
> kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
> at 
> kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
> at 
> kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
> at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234)
> at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> {code}
> Here you go server.properties
> Node:1
>  
> {code:java}
> # Server Basics #
> # The id of the broker. This must be set to a unique integer for each broker.
> broker.id=1
> # Switch to enable topic deletion or not, default value is false
> delete.topic.enable=true
> # Socket Server Settings 
> #
> listeners=PLAINTEXT://kafka-1:9092
> advertised.listeners=PLAINTEXT://kafka-1:9092
> #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> # The number of threads handling network requests
> num.network.threads=3
> # The number of threads doing disk I/O
> num.io.threads=8
> # The send buffer (SO_SNDBUF) used by the socket server
> socket.send.buffer.bytes=102400
> # The receive buffer (SO_RCVBUF) used by the socket server
> socket.receive.buffer.bytes=102400
> # The maximum size of a request that the socket server will accept 
> (protection against OOM)
> socket.request.max.bytes=104857600
> # Log Basics #
> # A comma seperated list of directories under which to store log files
> log.dirs=/var/log/kafka
> # The default number of log partitions per topic. More partitions allow 
> greater
> # parallelism for consumption, but this will also result in more files across
> # the brokers.
> num.partitions=1
> # The number of threads per data directory to be used for log recovery at 
> startup and flushing at shutdown.
> # This value is recommended to be increased for installations with data dirs 
> located in RAID array.
> num.recovery.threads.per.data.dir=1
> # Log Retention Policy 
> #
> # The minimum age of a log file to be eligible for deletion due to age
> log.retention.hours=48
> # A size-based retention policy for logs. Segments are pruned from the log as 
> long as the remaining
> # segments don't drop below log.retention.bytes. Functions independently of 
> log.retention.hours.
> log.retention.bytes=1073741824
> # The maximum size of a log segment file. When this size is reached a new log 
> segment will be created.
> log.segment.bytes=1073741824
> # The interval at which log segments are checked to see if they can be 
> deleted according
> # to the retention policies
> log.retention.check.interval.ms=30
> # Zookeeper #
> # root directory for all kafka znodes.
> zookeeper.connect=10.130.82.28:2181
> # Timeout in ms for connecting to zookeeper
> zookeeper.connection.timeout.ms=6000
> {code}
>  
>  
> Node-2
> {code:java}
> # Server Basics #
> # The id of the broker. This must be set to a unique integer for each broker.
> broker.id=2
> # Switch to enable topic deletion or not, default value is false
> delete.topic.enable=true
> # Socket Server Settings 
> #
> listeners=PLAINTEXT://kafka-2:9092
> advertised.listeners=PLAINTEXT://kafka-2:9092
> #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> # Th

[jira] [Resolved] (KAFKA-8551) Comments for connectors() in Herder interface

2021-04-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8551.
--
Resolution: Won't Fix

Marking as won't fix, since the details are insufficient to try to address.

> Comments for connectors() in Herder interface 
> --
>
> Key: KAFKA-8551
> URL: https://issues.apache.org/jira/browse/KAFKA-8551
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.1
>Reporter: Luying Liu
>Priority: Major
>
> There are mistakes in the comments for connectors() in the Herder interface. The 
> mistakes are in the file 
> [kafka|https://github.com/apache/kafka]/[connect|https://github.com/apache/kafka/tree/trunk/connect]/[runtime|https://github.com/apache/kafka/tree/trunk/connect/runtime]/[src|https://github.com/apache/kafka/tree/trunk/connect/runtime/src]/[main|https://github.com/apache/kafka/tree/trunk/connect/runtime/src/main]/[java|https://github.com/apache/kafka/tree/trunk/connect/runtime/src/main/java]/[org|https://github.com/apache/kafka/tree/trunk/connect/runtime/src/main/java/org]/[apache|https://github.com/apache/kafka/tree/trunk/connect/runtime/src/main/java/org/apache]/[kafka|https://github.com/apache/kafka/tree/trunk/connect/runtime/src/main/java/org/apache/kafka]/[connect|https://github.com/apache/kafka/tree/trunk/connect/runtime/src/main/java/org/apache/kafka/connect]/[runtime|https://github.com/apache/kafka/tree/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime]/*Herder.java.*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8664) non-JSON format messages when streaming data from Kafka to Mongo

2021-04-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8664.
--
Resolution: Won't Fix

The reported problem is for a connector implementation that is not owned by the 
Apache Kafka project. Please report the issue to the provider of the 
connector.

> non-JSON format messages when streaming data from Kafka to Mongo
> 
>
> Key: KAFKA-8664
> URL: https://issues.apache.org/jira/browse/KAFKA-8664
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.0
>Reporter: Vu Le
>Priority: Major
> Attachments: MongoSinkConnector.properties, 
> log_error_when_stream_data_not_a_json_format.txt
>
>
> Hi team,
> I can stream data from Kafka to MongoDB with JSON messages. I use MongoDB 
> Kafka Connector 
> ([https://github.com/mongodb/mongo-kafka/blob/master/docs/install.md])
> However, if I send a message that is not in JSON format, the connector dies. Please see 
> the log file for details.
> My config file:
> {code:java}
> name=mongo-sink
> topics=testconnector.class=com.mongodb.kafka.connect.MongoSinkConnector
> tasks.max=1
> key.ignore=true
> # Specific global MongoDB Sink Connector configuration
> connection.uri=mongodb://localhost:27017
> database=test_kafka
> collection=transaction
> max.num.retries=3
> retries.defer.timeout=5000
> type.name=kafka-connect
> key.converter=org.apache.kafka.connect.json.JsonConverter
> key.converter.schemas.enable=false
> value.converter=org.apache.kafka.connect.json.JsonConverter
> value.converter.schemas.enable=false
> {code}
> I have 2 separate questions:  
>  # how can I ignore messages that are not in JSON format?
>  # how can I define a default key for this kind of message (for example: abc -> 
> \{ "non-json": "abc" } )
> Thanks



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8867) Kafka Connect JDBC fails to create PostgreSQL table with default boolean value in schema

2021-04-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8867.
--
Resolution: Won't Fix

The reported problem is for the Confluent JDBC source/sink connector, and 
should be reported via that connector's GitHub repository issues.

> Kafka Connect JDBC fails to create PostgreSQL table with default boolean 
> value in schema
> 
>
> Key: KAFKA-8867
> URL: https://issues.apache.org/jira/browse/KAFKA-8867
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Tudor
>Priority: Major
>
> The `CREATE TABLE ..` statement generated for JDBC sink connectors configured 
> with `auto.create: true` contains field declarations that do not conform to 
> allowed PostgreSQL syntax for fields of type boolean with default values.
> Example record value Avro schema:
> {code:java}
> {
>   "namespace": "com.test.avro.schema.v1",
>   "type": "record",
>   "name": "SomeEvent",
>   "fields": [
> {
>   "name": "boolean_field",
>   "type": "boolean",
>   "default": false
> }
>   ]
> }
> {code}
> The connector task fails with:  
> {code:java}
> ERROR WorkerSinkTask{id=test-events-sink-0} RetriableException from SinkTask: 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:551)
> org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: 
> org.postgresql.util.PSQLException: ERROR: column "boolean_field" is of type 
> boolean but default expression is of type integer
>   Hint: You will need to rewrite or cast the expression.
>   at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748){code}
>  
> The generated SQL statement is: 
> {code:java}
> CREATE TABLE "test_data" ("boolean_field" BOOLEAN DEFAULT 0){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma opened a new pull request #10479: MINOR: Jenkinsfile's `post` needs `agent` to be set

2021-04-05 Thread GitBox


ijuma opened a new pull request #10479:
URL: https://github.com/apache/kafka/pull/10479


   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-8961) Unable to create secure JDBC connection through Kafka Connect

2021-04-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8961.
--
Resolution: Won't Fix

This is not a problem with the Connect framework; it is instead an issue with 
the connector implementation, or more likely the _installation_ of the 
connector in the user's environment.

> Unable to create secure JDBC connection through Kafka Connect
> -
>
> Key: KAFKA-8961
> URL: https://issues.apache.org/jira/browse/KAFKA-8961
> Project: Kafka
>  Issue Type: Bug
>  Components: build, clients, KafkaConnect, network
>Affects Versions: 2.2.1
>Reporter: Monika Bainsala
>Priority: Major
>
> As per the article below for enabling a secure JDBC connection, we can use an updated 
> URL parameter while calling the create connector REST API.
> Example:
> jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(LOAD_BALANCE=YES)(FAILOVER=YES)(ADDRESS=(PROTOCOL=tcp)(HOST=X)(PORT=1520)))(CONNECT_DATA=(SERVICE_NAME=XXAP)));EncryptionLevel=requested;EncryptionTypes=RC4_256;DataIntegrityLevel=requested;DataIntegrityTypes=MD5"
>  
> But this approach is currently not working; kindly help in resolving this 
> issue.
>  
> Reference :
> [https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9017) We see timeout in kafka in production cluster

2021-04-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9017:
-
Component/s: (was: KafkaConnect)
 core

> We see timeout in kafka in production cluster
> -
>
> Key: KAFKA-9017
> URL: https://issues.apache.org/jira/browse/KAFKA-9017
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
> Environment: Production
>Reporter: Suhas
>Priority: Critical
> Attachments: stderr (7), stdout (12)
>
>
> We see timeouts in Kafka in our production cluster. Kafka is running on 
> DC/OS (Mesos), 
> and below are the errors: 
> *+Exception 1: This from application logs+*
> 2019-10-07 10:01:59 Error: java.lang.RuntimeException: 
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> ie-lrx-audit-evt-3: 30030 ms has passed since batch creation plus linger time
> *+Exception 2:This from application logs+*
>  {"eventTime":"2019-10-07 08:20:43.265", "logType":"ERROR", "stackMessage" : 
> "java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> ie-lrx-audit-evt-3: 30028 ms has passed since batch creation plus linger 
> time", "stackTrace" : 
> *+Exception (from log) We see this logs on broker logs+*
> [2019-10-10 06:32:10,844] INFO [ReplicaFetcher replicaId=4, leaderId=2, 
> fetcherId=0] Error sending fetch request (sessionId=919177392, epoch=INITIAL) 
> to node 2: java.io.IOException: Connection to 2 was disconnected before the 
> response was read. (org.apache.kafka.clients.FetchSessionHandler)[2019-10-10 
> 06:32:10,844] INFO [ReplicaFetcher replicaId=4, leaderId=2, fetcherId=0] 
> Error sending fetch request (sessionId=919177392, epoch=INITIAL) to node 2: 
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read. (org.apache.kafka.clients.FetchSessionHandler)[2019-10-10 06:32:10,849] 
> WARN [ReplicaFetcher replicaId=4, leaderId=2, fetcherId=0] Error in response 
> for fetch request (type=FetchRequest, replicaId=4, maxWait=500, minBytes=1, 
> maxBytes=10485760, fetchData=\{ie-lrx-rxer-audit-evt-0=(offset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104]), 
> mft-hdfs-landing-evt-1=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[108]), dca-audit-evt-2=(offset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[108]), 
> it-sou-audit-evt-7=(offset=94819, logStartOffset=94819, maxBytes=1048576, 
> currentLeaderEpoch=Optional[100]), intg-ie-lrx-rxer-audit-evt-2=(offset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[78]), 
> prod-pipelines-errors-evt-0=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[117]), __consumer_offsets-36=(offset=3, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104]), 
> panel-data-change-evt-4=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[108]), gdcp-notification-evt-2=(offset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104]), 
> data-transfer-change-evt-0=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[108]), __consumer_offsets-11=(offset=15, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[108]), 
> dca-heartbeat-evt-2=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[105]), ukwhs-error-topic-1=(offset=8, 
> logStartOffset=8, maxBytes=1048576, currentLeaderEpoch=Optional[105]), 
> intg-ie-lrx-audit-evt-4=(offset=21, logStartOffset=21, maxBytes=1048576, 
> currentLeaderEpoch=Optional[74]), __consumer_offsets-16=(offset=11329814, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104]), 
> __consumer_offsets-31=(offset=3472033, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[107]), ukpai-hdfs-evt-1=(offset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[107]), 
> mft-pflow-evt-1=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[108]), ukwhs-hdfs-landing-evt-01-2=(offset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[105]), 
> it-sou-audit-evt-2=(offset=490084, logStartOffset=490084, maxBytes=1048576, 
> currentLeaderEpoch=Optional[105]), ie-lrx-pat-audit-evt-4=(offset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104])}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=919177392, 
> epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)java.io.IOException: 
> Connection to 2 was disconnected before the response was read at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java

[jira] [Commented] (KAFKA-10715) Support Kafka connect converter for AVRO

2021-04-05 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315110#comment-17315110
 ] 

Randall Hauch commented on KAFKA-10715:
---

There are multiple Converter implementations outside of Kafka, and IMO there is 
no need for Kafka to own and maintain its own version of these when those other 
existing implementations can easily be used by simply installing them.

This is similar to how the Kafka project provides only example Connector 
implementations. See the [rejected alternatives of 
KIP-26|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767#KIP26AddKafkaConnectframeworkfordataimport/export-Maintainconnectorsintheprojectalongwithframework],
 which introduced the Connect framework (with Converters).

Therefore, I'm going to close this.

> Support Kafka connect converter for AVRO
> 
>
> Key: KAFKA-10715
> URL: https://issues.apache.org/jira/browse/KAFKA-10715
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ravindranath Kakarla
>Priority: Minor
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> I want to add support for an Avro data format converter to Kafka Connect. Right 
> now, Kafka Connect supports a [JSON 
> converter|https://github.com/apache/kafka/tree/trunk/connect]. Since Avro 
> is a commonly used data format with Kafka, it would be great to have support 
> for it. 
>  
> Confluent Schema Registry libraries have 
> [support|https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java]
>  for it. The code seems to be pretty generic and can be used directly with 
> Kafka connect without schema registry. They are also licensed under Apache 
> 2.0.
>  
> Can they be copied to this repository and made available for all users of 
> Kafka Connect?
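
For reference, "simply installing them" usually amounts to dropping the external
converter's jars onto the worker's plugin path and naming the converter class in
the worker configuration. A minimal sketch, assuming the Confluent AvroConverter
linked above is installed (the property names below come from that external
project, not from Apache Kafka itself):

{code}
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
{code}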



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10715) Support Kafka connect converter for AVRO

2021-04-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-10715.
---
Resolution: Won't Do

> Support Kafka connect converter for AVRO
> 
>
> Key: KAFKA-10715
> URL: https://issues.apache.org/jira/browse/KAFKA-10715
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ravindranath Kakarla
>Priority: Minor
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> I want to add support for an Avro data format converter to Kafka Connect. Right 
> now, Kafka Connect supports a [JSON 
> converter|https://github.com/apache/kafka/tree/trunk/connect]. Since Avro 
> is a commonly used data format with Kafka, it would be great to have support 
> for it. 
>  
> Confluent Schema Registry libraries have 
> [support|https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java]
>  for it. The code seems to be pretty generic and can be used directly with 
> Kafka connect without schema registry. They are also licensed under Apache 
> 2.0.
>  
> Can they be copied to this repository and made available for all users of 
> Kafka Connect?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9988) Connect incorrectly logs that task has failed when one takes too long to shutdown

2021-04-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9988:
-
Labels: newbie  (was: )

> Connect incorrectly logs that task has failed when one takes too long to 
> shutdown
> -
>
> Key: KAFKA-9988
> URL: https://issues.apache.org/jira/browse/KAFKA-9988
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.2.3, 2.5.0, 2.3.2, 2.4.1, 2.4.2, 
> 2.5.1
>Reporter: Sanjana Kaundinya
>Priority: Major
>  Labels: newbie
>
> If the OffsetStorageReader is closed while the task is trying to shutdown, 
> and the task is trying to access the offsets from the OffsetStorageReader, 
> then we see the following in the logs.
> {code:java}
> [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task threw 
> an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets.
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114)
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
> at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:205)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader 
> closed while attempting to read offsets. This is likely because the task was 
> been scheduled to stop but has taken longer than the graceful shutdown period 
> to do so.
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
> ... 14 more
> [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task is 
> being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask)
> {code}
> This is a bit misleading, because the task is already on its way of being 
> shutdown, and doesn't actually need manual intervention to be restarted. We 
> can see that as later on in the logs we see that it throws another 
> unrecoverable exception.
> {code:java}
> [2020-05-05 05:40:39,361] ERROR WorkerSourceTask{id=connector-18} Task threw 
> an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> {code}
> If we know a task is on its way of shutting down, we should not throw a 
> ConnectException and instead log a warning so that we don't log false 
> negatives.
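
A minimal sketch of the suggested behaviour, using hypothetical field and method
names (the real WorkerSourceTask code is more involved): if the task is already
stopping, the closed offset reader is expected and should only be logged as a
warning rather than escalated as a task failure.

{code:java}
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: downgrade the error when the task is already shutting down.
public class OffsetFetchDuringShutdownSketch {
    private static final Logger log = LoggerFactory.getLogger(OffsetFetchDuringShutdownSketch.class);
    private volatile boolean stopping = false;   // hypothetical flag set when stop() is called

    void onOffsetReaderClosed(Exception cause) {
        if (stopping) {
            // Expected during graceful shutdown; do not mark the task as failed.
            log.warn("Offset reader closed while the task was stopping; ignoring.", cause);
            return;
        }
        throw new ConnectException("Failed to fetch offsets.", cause);
    }
}
{code}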



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10455: MINOR: Support ExponentialBackoff without jitter

2021-04-05 Thread GitBox


cmccabe commented on a change in pull request #10455:
URL: https://github.com/apache/kafka/pull/10455#discussion_r607330220



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##
@@ -47,7 +47,8 @@ public long backoff(long attempts) {
 }
 double exp = Math.min(attempts, this.expMax);
 double term = initialInterval * Math.pow(multiplier, exp);
-double randomFactor = ThreadLocalRandom.current().nextDouble(1 - 
jitter, 1 + jitter);
+double randomFactor = jitter < Double.MIN_NORMAL ? 1.0 :

Review comment:
   `MIN_NORMAL` is 2^-1022, though.  So it certainly wouldn't affect 
someone setting jitter = 0.5.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12619) Ensure LeaderChange message is committed before initializing high watermark

2021-04-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12619:

Description: 
KIP-595 describes an extra condition on commitment here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch.
 In order to ensure that a newly elected leader's committed entries cannot get 
lost, it must commit one record from its own epoch. This guarantees that its 
latest entry is larger (in terms of epoch/offset) than any previously written 
record which ensures that any future leader must also include it. This is the 
purpose of the LeaderChange record which is written to the log as soon as the 
leader gets elected.

We have this check implemented here: 
https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/LeaderState.java#L122.
 However, the check needs to be a strict inequality since the epoch start 
offset does not reflect the LeaderChange record itself. In other words, the 
check is off by one.

  was:
KIP-595 describes an extra condition on commitment here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch.
 In order to ensure that the leader's committed entries cannot get lost, it 
must commit one record from its own epoch. This guarantees that its latest 
entry is larger (in terms of epoch/offset) than any previously written record 
which ensures that any future leader must also include it. This is the purpose 
of the LeaderChange record which is written to the log as soon as the leader 
gets elected.

We have this check implemented here: 
https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/LeaderState.java#L122.
 However, the check needs to be a strict inequality since the epoch start 
offset does not reflect the LeaderChange record itself. In other words, the 
check is off by one.


> Ensure LeaderChange message is committed before initializing high watermark
> ---
>
> Key: KAFKA-12619
> URL: https://issues.apache.org/jira/browse/KAFKA-12619
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> KIP-595 describes an extra condition on commitment here: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch.
>  In order to ensure that a newly elected leader's committed entries cannot 
> get lost, it must commit one record from its own epoch. This guarantees that 
> its latest entry is larger (in terms of epoch/offset) than any previously 
> written record which ensures that any future leader must also include it. 
> This is the purpose of the LeaderChange record which is written to the log as 
> soon as the leader gets elected.
> We have this check implemented here: 
> https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/LeaderState.java#L122.
>  However, the check needs to be a strict inequality since the epoch start 
> offset does not reflect the LeaderChange record itself. In other words, the 
> check is off by one.
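
In code form, the off-by-one boils down to a strict versus non-strict comparison
against the epoch start offset (a simplified sketch, not the actual LeaderState
code):

{code:java}
// Simplified sketch of the commitment condition described above.
// epochStartOffset is the log end offset at election time, which is also the
// offset at which the LeaderChange record is appended.
final class EpochStartCommitSketch {
    static boolean epochStartCommitted(long highWatermark, long epochStartOffset) {
        // ">=" would be satisfied before the LeaderChange record itself is replicated;
        // ">" requires the record written at epochStartOffset to be committed first.
        return highWatermark > epochStartOffset;
    }
}
{code}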



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12619) Ensure LeaderChange message is committed before initializing high watermark

2021-04-05 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12619:
---

 Summary: Ensure LeaderChange message is committed before 
initializing high watermark
 Key: KAFKA-12619
 URL: https://issues.apache.org/jira/browse/KAFKA-12619
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


KIP-595 describes an extra condition on commitment here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch.
 In order to ensure that the leader's committed entries cannot get lost, it 
must commit one record from its own epoch. This guarantees that its latest 
entry is larger (in terms of epoch/offset) than any previously written record 
which ensures that any future leader must also include it. This is the purpose 
of the LeaderChange record which is written to the log as soon as the leader 
gets elected.

We have this check implemented here: 
https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/LeaderState.java#L122.
 However, the check needs to be a strict inequality since the epoch start 
offset does not reflect the LeaderChange record itself. In other words, the 
check is off by one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-7438) Replace EasyMock and PowerMock with Mockito

2021-04-05 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7438:
---
Summary: Replace EasyMock and PowerMock with Mockito  (was: Replace 
EasyMock and PowerMock with Mockito in the clients module)

> Replace EasyMock and PowerMock with Mockito
> ---
>
> Key: KAFKA-7438
> URL: https://issues.apache.org/jira/browse/KAFKA-7438
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Priority: Major
>
> Development of EasyMock and PowerMock has stagnated while Mockito continues 
> to be actively developed. With the new Java cadence, it's a problem to depend 
> on libraries that do bytecode generation and are not actively maintained. In 
> addition, Mockito is also easier to use.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12618) Convert LogManager (and other EasyMocks) in ReplicaManagerTest to Mockito

2021-04-05 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315087#comment-17315087
 ] 

Ismael Juma commented on KAFKA-12618:
-

Also see https://issues.apache.org/jira/browse/KAFKA-7438

> Convert LogManager (and other EasyMocks) in ReplicaManagerTest to Mockito
> -
>
> Key: KAFKA-12618
> URL: https://issues.apache.org/jira/browse/KAFKA-12618
> Project: Kafka
>  Issue Type: Task
>Reporter: Justine Olshan
>Priority: Minor
>
> [This 
> commit|https://github.com/apache/kafka/commit/40f001cc537d6ff2efa71e609c2f84c6b934994d]
>  introduced changes that have Partition calling getLog when there is no topic 
> ID associated with the Partition. In this case, getLog will use a default 
> argument. EasyMock (a Java framework) does not play well with Scala's default 
> arguments. For now, we are manually creating a partition and associating it 
> in the initializeLogAndTopicId method. But a better long-term solution is to 
> use Mockito, which better supports default arguments.
> It would be good to convert all the EasyMocks over to Mockito as well. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-7438) Replace EasyMock and PowerMock with Mockito in the clients module

2021-04-05 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7438:
---
Summary: Replace EasyMock and PowerMock with Mockito in the clients module  
(was: Replace EasyMock and PowerMock with Mockito)

> Replace EasyMock and PowerMock with Mockito in the clients module
> -
>
> Key: KAFKA-7438
> URL: https://issues.apache.org/jira/browse/KAFKA-7438
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Priority: Major
>
> Development of EasyMock and PowerMock has stagnated while Mockito continues 
> to be actively developed. With the new Java cadence, it's a problem to depend 
> on libraries that do bytecode generation and are not actively maintained. In 
> addition, Mockito is also easier to use.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12618) Convert LogManager (and other EasyMocks) in ReplicaManagerTest to Mockito

2021-04-05 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-12618:
--

 Summary: Convert LogManager (and other EasyMocks) in 
ReplicaManagerTest to Mockito
 Key: KAFKA-12618
 URL: https://issues.apache.org/jira/browse/KAFKA-12618
 Project: Kafka
  Issue Type: Task
Reporter: Justine Olshan


[This 
commit|https://github.com/apache/kafka/commit/40f001cc537d6ff2efa71e609c2f84c6b934994d]
 introduced changes that have Partition calling getLog when there is no topic 
ID associated with the Partition. In this case, getLog will use a default 
argument. EasyMock (a Java framework) does not play well with Scala's default 
arguments. For now, we are manually creating a partition and associating it in 
the initializeLogAndTopicId method. But a better long-term solution is to use 
Mockito, which better supports default arguments.

It would be good to convert all the EasyMocks over to Mockito as well. 
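
The difference can be sketched in plain Java: a Scala call site that relies on a
default argument compiles down to an extra parameter filled in by a synthetic
`...$default$N()` method, so the mock has to tolerate a call that was never
explicitly stubbed. Mockito returns type defaults for unstubbed calls, which is
what makes this work (illustrative names only; this is not the ReplicaManagerTest
code):

{code:java}
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

// Sketch: LogLookup stands in for the Scala LogManager API; getLogDefaultIsFuture()
// plays the role of the synthetic getLog$default$2() method generated by scalac.
interface LogLookup {
    boolean getLogDefaultIsFuture();
    Object getLog(Object topicPartition, boolean isFuture);
}

class MockitoDefaultArgSketch {
    public static void main(String[] args) {
        LogLookup logManager = mock(LogLookup.class);
        when(logManager.getLog(any(), anyBoolean())).thenReturn(null);

        // The unstubbed call returns the type default (false) instead of failing,
        // which mirrors how a Scala default argument behaves under Mockito.
        boolean isFuture = logManager.getLogDefaultIsFuture();
        System.out.println(logManager.getLog("tp-0", isFuture));
    }
}
{code}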



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah commented on a change in pull request #10455: MINOR: Support ExponentialBackoff without jitter

2021-04-05 Thread GitBox


mumrah commented on a change in pull request #10455:
URL: https://github.com/apache/kafka/pull/10455#discussion_r607313601



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##
@@ -47,7 +47,8 @@ public long backoff(long attempts) {
 }
 double exp = Math.min(attempts, this.expMax);
 double term = initialInterval * Math.pow(multiplier, exp);
-double randomFactor = ThreadLocalRandom.current().nextDouble(1 - 
jitter, 1 + jitter);
+double randomFactor = jitter < Double.MIN_NORMAL ? 1.0 :

Review comment:
   I had to look this constant up :) 
   
   Can we just make it check if the jitter is equal to zero (or maybe `<=` 
zero)? A caller of this method setting jitter to something like 0.5 might be 
surprised that there is no jitter added.
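
For context, the special case exists because `ThreadLocalRandom.nextDouble(origin, bound)`
requires `origin < bound`, so a jitter of exactly zero cannot be passed through the
old expression. A standalone sketch of the behaviour under discussion (not the
actual patch):

{code:java}
import java.util.concurrent.ThreadLocalRandom;

// Sketch of an exponential backoff term with optional jitter, mirroring the shape
// of the code under review (this is not the real ExponentialBackoff class).
final class BackoffSketch {
    static long backoff(long attempts, long initialInterval, int multiplier,
                        double expMax, double jitter) {
        double exp = Math.min(attempts, expMax);
        double term = initialInterval * Math.pow(multiplier, exp);
        // jitter == 0 must be special-cased because nextDouble(1, 1) would throw;
        // the review discussion is about how exactly that check should be expressed.
        double randomFactor = jitter <= 0 ? 1.0
            : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) (randomFactor * term);
    }

    public static void main(String[] args) {
        System.out.println(BackoffSketch.backoff(3, 100, 2, 10, 0.0));  // deterministic: 800
        System.out.println(BackoffSketch.backoff(3, 100, 2, 10, 0.2));  // randomized around 800
    }
}
{code}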




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


mumrah commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607290168



##
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##
@@ -212,4 +214,103 @@ class RaftClusterTest {
   cluster.close()
 }
   }
+
+  @Test
+  def testClientQuotas(): Unit = {
+val cluster = new KafkaClusterTestKit.Builder(
+  new TestKitNodes.Builder().
+setNumBrokerNodes(1).
+setNumControllerNodes(1).build()).build()
+try {
+  cluster.format()
+  cluster.startup()
+  TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() == 
BrokerState.RUNNING,
+"Broker never made it to RUNNING state.")
+  val admin = Admin.create(cluster.clientProperties())
+  try {
+val entity = new ClientQuotaEntity(Map("user" -> "testkit").asJava)
+var filter = ClientQuotaFilter.containsOnly(
+  List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava)
+
+def alterThenDescribe(entity: ClientQuotaEntity,
+  quotas: Seq[ClientQuotaAlteration.Op],
+  filter: ClientQuotaFilter,
+  expectCount: Int): 
java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = {
+  val alterResult = admin.alterClientQuotas(Seq(new 
ClientQuotaAlteration(entity, quotas.asJava)).asJava)
+  try {
+alterResult.all().get()
+  } catch {
+case t: Throwable => fail("AlterClientQuotas request failed", t)
+  }
+
+  def describeOrFail(filter: ClientQuotaFilter): 
java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = {
+try {
+  admin.describeClientQuotas(filter).entities().get()
+} catch {
+  case t: Throwable => fail("DescribeClientQuotas request failed", 
t)
+}
+  }
+
+  val (describeResult, ok) = 
TestUtils.computeUntilTrue(describeOrFail(filter)) {
+results => results.getOrDefault(entity, 
java.util.Collections.emptyMap[String, java.lang.Double]()).size() == 
expectCount
+  }
+  assertTrue(ok, "Broker never saw new client quotas")
+  describeResult
+}
+
+var describeResult = alterThenDescribe(entity,
+  Seq(new ClientQuotaAlteration.Op("request_percentage", 0.99)), 
filter, 1)
+assertEquals(0.99, 
describeResult.get(entity).get("request_percentage"), 1e-6)
+
+describeResult = alterThenDescribe(entity, Seq(
+  new ClientQuotaAlteration.Op("request_percentage", 0.97),
+  new ClientQuotaAlteration.Op("producer_byte_rate", 1),
+  new ClientQuotaAlteration.Op("consumer_byte_rate", 10001)
+), filter, 3)
+assertEquals(0.97, 
describeResult.get(entity).get("request_percentage"), 1e-6)
+assertEquals(1.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6)
+assertEquals(10001.0, 
describeResult.get(entity).get("consumer_byte_rate"), 1e-6)
+
+describeResult = alterThenDescribe(entity, Seq(
+  new ClientQuotaAlteration.Op("request_percentage", 0.95),
+  new ClientQuotaAlteration.Op("producer_byte_rate", null),
+  new ClientQuotaAlteration.Op("consumer_byte_rate", null)
+), filter, 1)
+assertEquals(0.95, 
describeResult.get(entity).get("request_percentage"), 1e-6)
+
+describeResult = alterThenDescribe(entity, Seq(
+  new ClientQuotaAlteration.Op("request_percentage", null)), filter, 0)
+
+describeResult = alterThenDescribe(entity,
+  Seq(new ClientQuotaAlteration.Op("producer_byte_rate", )), 
filter, 1)
+assertEquals(.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6)
+
+// Add another quota for a different entity with same user part
+val entity2 = new ClientQuotaEntity(Map("user" -> "testkit", 
"client-id" -> "some-client").asJava)
+filter = ClientQuotaFilter.containsOnly(
+  List(
+ClientQuotaFilterComponent.ofEntity("user", "testkit"),
+ClientQuotaFilterComponent.ofEntity("client-id", "some-client"),
+  ).asJava)
+describeResult = alterThenDescribe(entity2,
+  Seq(new ClientQuotaAlteration.Op("producer_byte_rate", 9998)), 
filter, 1)
+assertEquals(9998.0, 
describeResult.get(entity2).get("producer_byte_rate"), 1e-6)
+
+// non-strict match
+filter = ClientQuotaFilter.contains(
+  List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava)
+
+val (describeResult2, ok) = 
TestUtils.computeUntilTrue(admin.describeClientQuotas(filter).entities().get()) 
{
+  results => results.size() == 2
+}
+assertTrue(ok, "Broker never saw two client quotas")

Review comment:
   Yea let me clean this up




-- 
This is an automated message from the Apache Git Service.

[GitHub] [kafka] mageshn commented on a change in pull request #10475: KAFKA-12610: Implement PluginClassLoader::getResource

2021-04-05 Thread GitBox


mageshn commented on a change in pull request #10475:
URL: https://github.com/apache/kafka/pull/10475#discussion_r607285210



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
##
@@ -61,12 +61,24 @@ public Plugins(Map props) {
 delegatingLoader.initLoaders();
 }
 
+public Plugins(Map props, ClassLoader parent) {

Review comment:
   Is this primarily added for testing purpose?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


mumrah commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607281769



##
File path: 
core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
##
@@ -121,16 +121,16 @@ class ClientQuotaMetadataManager(private[metadata] val 
quotaManagers: QuotaManag
   return
 }
 
-// Update the cache
-quotaCache.updateQuotaCache(ipEntity, quotaRecord.key, quotaRecord.value, 
quotaRecord.remove)
-
 // Convert the value to an appropriate Option for the quota manager
 val newValue = if (quotaRecord.remove()) {
   None
 } else {
   Some(quotaRecord.value).map(_.toInt)
 }
 connectionQuotas.updateIpConnectionRateQuota(inetAddress, newValue)
+
+// Update the cache

Review comment:
   Good question. I could swear there was a review comment suggesting this, 
but I can't seem to find it. I believe I moved this to align it with 
handleUserClientQuota where we update the underlying quota manager and then 
update the cache. I don't have a strong opinion on which thing happens first, 
but they should both probably be the same.
   
Thinking about it more, we might want to be more defensive when calling into 
the quota managers. If the quota manager cannot be updated, I think we should 
still update the cache since that reflects the true state of the quota 
according to the metadata log. Thoughts?
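
A rough sketch of that more defensive ordering, with hypothetical interfaces
standing in for the Scala manager and cache:

{code:java}
// Sketch only: the metadata log remains the source of truth, so the cache is
// updated even if applying the value to the quota manager fails.
final class QuotaRecordHandlingSketch {
    interface QuotaCache { void update(String key, Double value); }
    interface QuotaManager { void apply(String key, Double value); }

    static void handleQuotaRecord(QuotaCache cache, QuotaManager manager,
                                  String key, Double valueOrNull) {
        try {
            manager.apply(key, valueOrNull);
        } catch (RuntimeException e) {
            // Swallow (or log) the manager failure; the record still came from the
            // committed metadata log, so the cache update below must happen anyway.
        }
        cache.update(key, valueOrNull);
    }
}
{code}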




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


mumrah commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607278548



##
File path: core/src/main/scala/kafka/server/metadata/ClientQuotaCache.scala
##
@@ -122,6 +122,14 @@ class ClientQuotaCache {
   entityFilters.put(entityType, entityMatch)
 }
 
+// Special case for non-strict empty filter, match everything

Review comment:
   It should short-circuit and return an empty map on 
[L134](https://github.com/apache/kafka/pull/10254/files#diff-99bc678b86ad25d99b32bc192428120d2ff3f3d478159e2c353f4f5346b43718R133-R135)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


mumrah commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607274751



##
File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
##
@@ -77,7 +77,7 @@ public String getDisplayName(int invocationIndex) {
 (BeforeTestExecutionCallback) context -> {
 KafkaClusterTestKit.Builder builder = new 
KafkaClusterTestKit.Builder(
 new TestKitNodes.Builder().
-setNumKip500BrokerNodes(clusterConfig.numBrokers()).
+setNumBrokerNodes(clusterConfig.numBrokers()).

Review comment:
   Not sure, it must have auto-formatted when i renamed to 
`setNumBrokerNodes`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10468: Kafka 12373:Improve KafkaRaftClient handling of graceful shutdown

2021-04-05 Thread GitBox


jsancio commented on a change in pull request #10468:
URL: https://github.com/apache/kafka/pull/10468#discussion_r607271981



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -1673,6 +1673,69 @@ public void testLeaderGracefulShutdownTimeout() throws 
Exception {
 assertFutureThrows(shutdownFuture, TimeoutException.class);
 }
 
+@Test
+public void testLeaderGracefulShutdownOnClose() throws Exception {
+int localId = 0;
+int otherNodeId = 1;
+int lingerMs = 50;
+Set voters = Utils.mkSet(localId, otherNodeId);
+
+RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+.withAppendLingerMs(lingerMs)
+.build();
+
+context.becomeLeader();
+assertEquals(OptionalInt.of(localId), context.currentLeader());
+assertEquals(1L, context.log.endOffset().offset);
+
+int epoch = context.currentEpoch();
+assertEquals(1L, context.client.scheduleAppend(epoch, 
singletonList("a")));
+
+context.client.poll();
+assertEquals(OptionalLong.of(lingerMs), 
context.messageQueue.lastPollTimeoutMs());
+
+context.time.sleep(20);
+
+// client closed now.
+context.client.close();
+
+// Flag for accepting appends should be toggled to false.
+assertFalse(context.client.canAcceptAppends());
+
+// acceptAppends flag set to false so no writes should be accepted by 
the Leader now.
+assertNull(context.client.scheduleAppend(epoch, singletonList("b")));
+
+// The leader should trigger a flush for whatever batches are present 
in the BatchAccumulator
+assertEquals(2L, context.log.endOffset().offset);
+
+// Now shutdown
+
+// We should still be running until we have had a chance to send 
EndQuorumEpoch
+assertTrue(context.client.isShuttingDown());
+assertTrue(context.client.isRunning());
+
+// Send EndQuorumEpoch request to the other voter
+context.pollUntilRequest();
+assertTrue(context.client.isShuttingDown());
+assertTrue(context.client.isRunning());
+context.assertSentEndQuorumEpochRequest(1, otherNodeId);
+
+// We should still be able to handle vote requests during graceful 
shutdown
+// in order to help the new leader get elected
+context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, 
epoch, 1L));
+context.client.poll();
+context.assertSentVoteResponse(Errors.NONE, epoch + 1, 
OptionalInt.empty(), true);
+

Review comment:
   Okay, thanks! I have limited time at the moment. I'll try to look at it 
this week.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


cmccabe commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607270847



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
##
@@ -170,8 +170,10 @@ private void alterClientQuotaEntity(
 }
 });
 
-outputRecords.addAll(newRecords);
-outputResults.put(entity, ApiError.NONE);
+// Only add the records to outputRecords if there were no errors

Review comment:
   This is a good fix.
   
   However, I think it would be better just to return immediately after setting 
the error, rather than waiting until the end of the function.  That's 
consistent with how we handle errors at the top of this function, and in other 
manager classes.  It makes it clear that only one error can be set for each 
entity.
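
In sketch form, the suggestion is to fail fast per entity rather than collecting
records first and checking for errors afterwards (hypothetical types, not the
actual ClientQuotaControlManager code):

{code:java}
import java.util.List;
import java.util.Map;

// Sketch: set at most one error per entity and return immediately; records are
// only appended to the output when validation fully succeeds.
final class AlterQuotaSketch {
    static void alterEntity(String entity, List<String> newRecords, boolean valid,
                            List<String> outputRecords, Map<String, String> outputResults) {
        if (!valid) {
            outputResults.put(entity, "INVALID_REQUEST");
            return;                        // fail fast; nothing is emitted for this entity
        }
        outputRecords.addAll(newRecords);  // only reached when there were no errors
        outputResults.put(entity, "NONE");
    }
}
{code}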




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #10445: KAFKA-12548; Propagate record error messages to application

2021-04-05 Thread GitBox


hachikuji merged pull request #10445:
URL: https://github.com/apache/kafka/pull/10445


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


cmccabe commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607267726



##
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##
@@ -212,4 +214,103 @@ class RaftClusterTest {
   cluster.close()
 }
   }
+
+  @Test
+  def testClientQuotas(): Unit = {
+val cluster = new KafkaClusterTestKit.Builder(
+  new TestKitNodes.Builder().
+setNumBrokerNodes(1).
+setNumControllerNodes(1).build()).build()
+try {
+  cluster.format()
+  cluster.startup()
+  TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() == 
BrokerState.RUNNING,
+"Broker never made it to RUNNING state.")
+  val admin = Admin.create(cluster.clientProperties())
+  try {
+val entity = new ClientQuotaEntity(Map("user" -> "testkit").asJava)
+var filter = ClientQuotaFilter.containsOnly(
+  List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava)
+
+def alterThenDescribe(entity: ClientQuotaEntity,
+  quotas: Seq[ClientQuotaAlteration.Op],
+  filter: ClientQuotaFilter,
+  expectCount: Int): 
java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = {
+  val alterResult = admin.alterClientQuotas(Seq(new 
ClientQuotaAlteration(entity, quotas.asJava)).asJava)
+  try {
+alterResult.all().get()
+  } catch {
+case t: Throwable => fail("AlterClientQuotas request failed", t)
+  }
+
+  def describeOrFail(filter: ClientQuotaFilter): 
java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = {
+try {
+  admin.describeClientQuotas(filter).entities().get()
+} catch {
+  case t: Throwable => fail("DescribeClientQuotas request failed", 
t)
+}
+  }
+
+  val (describeResult, ok) = 
TestUtils.computeUntilTrue(describeOrFail(filter)) {
+results => results.getOrDefault(entity, 
java.util.Collections.emptyMap[String, java.lang.Double]()).size() == 
expectCount
+  }
+  assertTrue(ok, "Broker never saw new client quotas")
+  describeResult
+}
+
+var describeResult = alterThenDescribe(entity,
+  Seq(new ClientQuotaAlteration.Op("request_percentage", 0.99)), 
filter, 1)
+assertEquals(0.99, 
describeResult.get(entity).get("request_percentage"), 1e-6)
+
+describeResult = alterThenDescribe(entity, Seq(
+  new ClientQuotaAlteration.Op("request_percentage", 0.97),
+  new ClientQuotaAlteration.Op("producer_byte_rate", 1),
+  new ClientQuotaAlteration.Op("consumer_byte_rate", 10001)
+), filter, 3)
+assertEquals(0.97, 
describeResult.get(entity).get("request_percentage"), 1e-6)
+assertEquals(1.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6)
+assertEquals(10001.0, 
describeResult.get(entity).get("consumer_byte_rate"), 1e-6)
+
+describeResult = alterThenDescribe(entity, Seq(
+  new ClientQuotaAlteration.Op("request_percentage", 0.95),
+  new ClientQuotaAlteration.Op("producer_byte_rate", null),
+  new ClientQuotaAlteration.Op("consumer_byte_rate", null)
+), filter, 1)
+assertEquals(0.95, 
describeResult.get(entity).get("request_percentage"), 1e-6)
+
+describeResult = alterThenDescribe(entity, Seq(
+  new ClientQuotaAlteration.Op("request_percentage", null)), filter, 0)
+
+describeResult = alterThenDescribe(entity,
+  Seq(new ClientQuotaAlteration.Op("producer_byte_rate", )), 
filter, 1)
+assertEquals(.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6)
+
+// Add another quota for a different entity with same user part
+val entity2 = new ClientQuotaEntity(Map("user" -> "testkit", 
"client-id" -> "some-client").asJava)
+filter = ClientQuotaFilter.containsOnly(
+  List(
+ClientQuotaFilterComponent.ofEntity("user", "testkit"),
+ClientQuotaFilterComponent.ofEntity("client-id", "some-client"),
+  ).asJava)
+describeResult = alterThenDescribe(entity2,
+  Seq(new ClientQuotaAlteration.Op("producer_byte_rate", 9998)), 
filter, 1)
+assertEquals(9998.0, 
describeResult.get(entity2).get("producer_byte_rate"), 1e-6)
+
+// non-strict match
+filter = ClientQuotaFilter.contains(
+  List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava)
+
+val (describeResult2, ok) = 
TestUtils.computeUntilTrue(admin.describeClientQuotas(filter).entities().get()) 
{
+  results => results.size() == 2
+}
+assertTrue(ok, "Broker never saw two client quotas")

Review comment:
   The "ok" check feels a little clunky here.  Plus if this fa

[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


cmccabe commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607264162



##
File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
##
@@ -77,7 +77,7 @@ public String getDisplayName(int invocationIndex) {
 (BeforeTestExecutionCallback) context -> {
 KafkaClusterTestKit.Builder builder = new 
KafkaClusterTestKit.Builder(
 new TestKitNodes.Builder().
-setNumKip500BrokerNodes(clusterConfig.numBrokers()).
+setNumBrokerNodes(clusterConfig.numBrokers()).

Review comment:
   indentation seems a little weird here.  why would this be indented more 
than the following line?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


cmccabe commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607264162



##
File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
##
@@ -77,7 +77,7 @@ public String getDisplayName(int invocationIndex) {
 (BeforeTestExecutionCallback) context -> {
 KafkaClusterTestKit.Builder builder = new 
KafkaClusterTestKit.Builder(
 new TestKitNodes.Builder().
-setNumKip500BrokerNodes(clusterConfig.numBrokers()).
+setNumBrokerNodes(clusterConfig.numBrokers()).

Review comment:
   indentation seems a little weird here




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


cmccabe commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607263869



##
File path: 
core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
##
@@ -121,16 +121,16 @@ class ClientQuotaMetadataManager(private[metadata] val 
quotaManagers: QuotaManag
   return
 }
 
-// Update the cache
-quotaCache.updateQuotaCache(ipEntity, quotaRecord.key, quotaRecord.value, 
quotaRecord.remove)
-
 // Convert the value to an appropriate Option for the quota manager
 val newValue = if (quotaRecord.remove()) {
   None
 } else {
   Some(quotaRecord.value).map(_.toInt)
 }
 connectionQuotas.updateIpConnectionRateQuota(inetAddress, newValue)
+
+// Update the cache

Review comment:
   What's the purpose of moving this code?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12548) Invalid record error message is not getting sent to application

2021-04-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12548.
-
Resolution: Fixed

> Invalid record error message is not getting sent to application
> ---
>
> Key: KAFKA-12548
> URL: https://issues.apache.org/jira/browse/KAFKA-12548
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.0.0
>
>
> The ProduceResponse includes a nice record error message when we return 
> INVALID_RECORD_ERROR. Sadly this is getting discarded by the producer, so the 
> user never gets a chance to see it.
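
Once propagated, the message surfaces through the ordinary produce path, e.g. on
the exception passed to the send callback (a plain producer-API sketch, not part
of the fix itself):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RecordErrorMessageSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("some-topic", "key", "value"),
                (metadata, exception) -> {
                    if (exception != null) {
                        // With the fix, the broker-provided record error message is no
                        // longer discarded and shows up in the exception message here.
                        System.err.println("Send failed: " + exception.getMessage());
                    }
                });
        }
    }
}
{code}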



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12548) Invalid record error message is not getting sent to application

2021-04-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12548:

Fix Version/s: 3.0.0

> Invalid record error message is not getting sent to application
> ---
>
> Key: KAFKA-12548
> URL: https://issues.apache.org/jira/browse/KAFKA-12548
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.0.0
>
>
> The ProduceResponse includes a nice record error message when we return 
> INVALID_RECORD_ERROR. Sadly this is getting discarded by the producer, so the 
> user never gets a chance to see it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


cmccabe commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607263189



##
File path: core/src/main/scala/kafka/server/metadata/ClientQuotaCache.scala
##
@@ -122,6 +122,14 @@ class ClientQuotaCache {
   entityFilters.put(entityType, entityMatch)
 }
 
+// Special case for non-strict empty filter, match everything

Review comment:
   What happens if we have an empty filter list and strict mode set?  It 
looks like an exception is thrown-- that doesn't seem correct?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-05 Thread GitBox


junrao commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r606343187



##
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class provides an in-memory cache of remote log segment metadata. This 
maintains the lineage of segments
+ * with respect to leader epochs.
+ * 
+ * Remote log segment can go through the state transitions as mentioned in 
{@link RemoteLogSegmentState}.
+ * 
+ * This class will have all the segments which did not reach terminal state 
viz DELETE_SEGMENT_FINISHED. That means,any
+ * segment reaching the terminal state will get cleared from this instance.
+ * This class provides different methods to fetch segment metadata like {@link 
#remoteLogSegmentMetadata(int, long)},
+ * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, 
{@link #listAllRemoteLogSegments()}. Those
+ * methods have different semantics to fetch the segment based on its state.
+ * 
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}:
+ * 
+ * Segment in this state indicate it is not yet copied successfully. So, these 
segments will not be

Review comment:
   indicate => indicates. Ditto in a few other places.

##
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class provides an in-memory cache of remote log segment metadata. This 
maintains the lineage of segments
+ * with respect to leader epochs.
+ * 
+ * Remote log segment can go through the state transitions as mentioned in 
{@link RemoteLogSegmentState}.
+ * 
+ * This class will have all the segments which did not reach terminal state 
viz DELETE_SEGMENT_FINISHED. That means,any
+ * segment reaching the terminal state will get cleared from this instance.
+ * This class provides different methods to fetch segment metadata like {@link 
#remoteLogSegmentMetadata(int, long)},
+ * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, 
{@link #listAllRemoteLogSegments()}. Those
+ * methods have different semantics to fetch the segment based on its state.
+ * 
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}:
+ * 
+ * Segment in this state indicate it is not yet copied successfully. So, these 
segments will not be
+ * accessible for reads but these are considered for cleanups when a partition 
is deleted.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED}:
+ *

[GitHub] [kafka] cmccabe commented on pull request #10366: KAFKA-12467: Add controller-side snapshot generation

2021-04-05 Thread GitBox


cmccabe commented on pull request #10366:
URL: https://github.com/apache/kafka/pull/10366#issuecomment-813567129


   Fix spotbugs


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-05 Thread GitBox


hachikuji commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r607232953



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -355,24 +373,29 @@ private void fireHandleResign(int epoch) {
 }
 
 @Override
-public void initialize() throws IOException {
-quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, 
log.lastFetchedEpoch()));
+public void initialize() {
+try {
+quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, 
log.lastFetchedEpoch()));
 
-long currentTimeMs = time.milliseconds();
-if (quorum.isLeader()) {
-throw new IllegalStateException("Voter cannot initialize as a 
Leader");
-} else if (quorum.isCandidate()) {
-onBecomeCandidate(currentTimeMs);
-} else if (quorum.isFollower()) {
-onBecomeFollower(currentTimeMs);
-}
+long currentTimeMs = time.milliseconds();
+if (quorum.isLeader()) {
+throw new IllegalStateException("Voter cannot initialize as a 
Leader");
+} else if (quorum.isCandidate()) {
+onBecomeCandidate(currentTimeMs);
+} else if (quorum.isFollower()) {
+onBecomeFollower(currentTimeMs);
+}
 
-// When there is only a single voter, become candidate immediately
-if (quorum.isVoter()
-&& quorum.remoteVoters().isEmpty()
-&& !quorum.isLeader()
-&& !quorum.isCandidate()) {
-transitionToCandidate(currentTimeMs);
+// When there is only a single voter, become candidate immediately
+if (quorum.isVoter()
+&& quorum.remoteVoters().isEmpty()
+&& !quorum.isLeader()

Review comment:
   Since we're in here already, this check is not needed since we already 
ruled it out above.

##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -68,20 +70,65 @@ public synchronized void increment() {
 @Override
 public synchronized void handleCommit(BatchReader reader) {
 try {
-int initialValue = this.committed;
+int initialValue = committed;
 while (reader.hasNext()) {
 BatchReader.Batch batch = reader.next();
 log.debug("Handle commit of batch with records {} at base 
offset {}",
 batch.records(), batch.baseOffset());
 for (Integer value : batch.records()) {
-if (value != this.committed + 1) {
-throw new AssertionError("Expected next committed 
value to be " +
-(this.committed + 1) + ", but instead found " + 
value + " on node " + nodeId);
+if (value != committed + 1) {
+throw new AssertionError(
+String.format(
+"Expected next committed value to be %s, but 
instead found %s on node %s",
+committed + 1,
+value,
+nodeId
+)
+);
 }
-this.committed = value;
+committed = value;
 }
+
+nextReadOffset = batch.lastOffset() + 1;
+readEpoch = batch.epoch();
 }
 log.debug("Counter incremented from {} to {}", initialValue, 
committed);
+
+if (lastSnapshotEndOffset + 10 < nextReadOffset) {

Review comment:
   Looks like we are trying to do snapshots every 10 records. We could 
probably get rid of `lastSnapshotEndOffset` and use `committed % 10` or 
something like that. It may also be useful to be able to control the frequency 
of snapshots with a parameter.
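
A sketch of that simplification, with a hypothetical takeSnapshot() hook (the
snapshot API in the PR may differ):

{code:java}
// Sketch only: snapshot every N committed records without a lastSnapshotEndOffset field.
final class SnapshotEveryNSketch {
    private final int snapshotInterval;   // could be a constructor parameter, as suggested
    private int committed = 0;

    SnapshotEveryNSketch(int snapshotInterval) {
        this.snapshotInterval = snapshotInterval;
    }

    void onCommitted(int value) {
        committed = value;
        if (committed % snapshotInterval == 0) {
            takeSnapshot(committed);
        }
    }

    private void takeSnapshot(int upToValue) {
        // hypothetical hook: persist the counter state up to 'upToValue'
    }
}
{code}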

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -326,6 +336,14 @@ private void updateListenersProgress(List 
listenerContexts, lon
 }
 }
 
+private Optional> latestSnapshot() {
+return log.latestSnapshotId().flatMap(snapshoId -> {

Review comment:
   nit: missing t in snapshotId

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2154,8 +2182,14 @@ private boolean maybeCompleteShutdown(long 
currentTimeMs) {
 return false;
 }
 
-private void maybeUpdateOldestSnapshotId() {
-log.latestSnapshotId().ifPresent(log::deleteBeforeSnapshot);
+private void maybeUpdateEarliestSnapshotId() {

Review comment:
   Hmm, I guess I still see this the other way around. Why would the raft 
client care about updating the log start offset if not to delete old snapshots? 
What would that even mean outside the context of snapshot deletion?

##
File p

[GitHub] [kafka] ijuma commented on a change in pull request #10393: KAFKA-12539: Refactor KafkaRaftCllient handleVoteRequest to reduce cyclomatic complexity

2021-04-05 Thread GitBox


ijuma commented on a change in pull request #10393:
URL: https://github.com/apache/kafka/pull/10393#discussion_r607233215



##
File path: raft/src/main/java/org/apache/kafka/raft/CandidateState.java
##
@@ -235,6 +240,15 @@ public int epoch() {
 return highWatermark;
 }
 
+@Override
+public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
+// Still reject vote request even candidateId = localId, Although the 
candidate votes for
+// itself, this vote is implicit and not "granted".
+log.debug("Rejecting vote request from candidate {} since we are 
already candidate in epoch {}",
+candidateId, epoch);

Review comment:
   As a general rule, methods like this should not log IMO. The calling 
method should log instead. That is, good to avoid the side effect from the 
"check" operation. It seems like it was like that before this PR. What was the 
motivation for changing it?
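
The pattern being asked for, as a sketch (illustrative names, not the actual
CandidateState/KafkaRaftClient code): keep the check side-effect free and log the
rejection at the call site, where the epoch and request context are available.

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch: canGrantVote() only answers the question; the caller decides what to log.
final class VoteCheckSketch {
    private static final Logger log = LoggerFactory.getLogger(VoteCheckSketch.class);
    private final int epoch;

    VoteCheckSketch(int epoch) {
        this.epoch = epoch;
    }

    boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
        return false;   // a candidate rejects vote requests, even from itself
    }

    void handleVoteRequest(int candidateId, boolean isLogUpToDate) {
        if (!canGrantVote(candidateId, isLogUpToDate)) {
            log.debug("Rejecting vote request from candidate {} since we are already a candidate in epoch {}",
                candidateId, epoch);
        }
    }
}
{code}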




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ncliang commented on a change in pull request #10475: KAFKA-12610: Implement PluginClassLoader::getResource

2021-04-05 Thread GitBox


ncliang commented on a change in pull request #10475:
URL: https://github.com/apache/kafka/pull/10475#discussion_r607232615



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
##
@@ -342,6 +343,47 @@ public void 
newPluginsShouldConfigureWithPluginClassLoader() {
 assertPluginClassLoaderAlwaysActive(samples);
 }
 
+@Test
+public void pluginClassLoaderReadVersionFromResource() {
+TestPlugins.assertAvailable();
+
+Map<String, String> pluginProps = new HashMap<>();
+pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG,
+TestPlugins.pluginPath().stream()
+.filter(s -> s.contains("read-version-from-resource-v1"))
+.collect(Collectors.joining()));
+plugins = new Plugins(pluginProps);
+
+Converter converter = plugins.newPlugin(
+TestPlugins.READ_VERSION_FROM_RESOURCE,
+new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
+Converter.class
+);
+assertEquals("1.0.0",
+new String(converter.fromConnectData(null, null, null)));
+PluginClassLoader pluginClassLoader = plugins.delegatingLoader()
+.pluginClassLoader(TestPlugins.READ_VERSION_FROM_RESOURCE);
+assertNotNull(pluginClassLoader);
+
+
+// Re-initialize Plugins object with plugin class loader in the class loader tree. This is
+// to simulate the situation where jars exist on both system classpath and plugin path.
+pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG,
+TestPlugins.pluginPath().stream()
+.filter(s -> s.contains("read-version-from-resource-v2"))
+.collect(Collectors.joining()));
+plugins = new Plugins(pluginProps, pluginClassLoader);

Review comment:
   I agree that having the additional testcases would be valuable. However, 
I do think that adding the resource directly to the app classloader makes the 
test much less readable. I like the idea of constructing and using a plain 
`URLClassLoader` as the parent. While it still requires the additional 
constructor to be exposed on `Plugins`, we do already expose the parent 
parameter on `DelegatingClassLoader` for specifically the flexibility of 
controlling the parent loader.
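   A sketch of that variant as an extra case in PluginsTest, assuming the additional Plugins constructor takes a parent ClassLoader and that the v2 plugin reports "2.0.0" (both are assumptions for illustration, not the PR's code; java.io.File, java.net.MalformedURLException, java.net.URL and java.net.URLClassLoader imports are also assumed):

    // Illustrative only: the "v1" jar is visible solely through a plain URLClassLoader
    // parent (simulating a copy on the system classpath), while the plugin path holds "v2".
    @Test
    public void pluginClassLoaderPrefersLocalResourceOverParent() throws Exception {
        URL[] parentUrls = TestPlugins.pluginPath().stream()
            .filter(s -> s.contains("read-version-from-resource-v1"))
            .map(PluginsTest::toUrl)
            .toArray(URL[]::new);
        ClassLoader parent = new URLClassLoader(parentUrls, PluginsTest.class.getClassLoader());

        Map<String, String> pluginProps = new HashMap<>();
        pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG,
            TestPlugins.pluginPath().stream()
                .filter(s -> s.contains("read-version-from-resource-v2"))
                .collect(Collectors.joining()));
        Plugins plugins = new Plugins(pluginProps, parent);

        Converter converter = plugins.newPlugin(
            TestPlugins.READ_VERSION_FROM_RESOURCE,
            new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
            Converter.class);
        // The plugin's own "v2" resource should win over the parent's "v1" copy.
        assertEquals("2.0.0", new String(converter.fromConnectData(null, null, null)));
    }

    private static URL toUrl(String path) {
        try {
            return new File(path).toURI().toURL();
        } catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
    }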




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12602) The LICENSE and NOTICE files don't list everything they should

2021-04-05 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315000#comment-17315000
 ] 

John Roesler commented on KAFKA-12602:
--

Thank you, [~jmclean] ,

 

There weren't too many BSD or MIT licenses, so I've gone ahead and just copied 
the exact license files from each one of those dependencies. Thanks for the 
feedback.

I also added notices regarding the provenance and copyright of PureJavaCrc32C 
and Murmur3 to our NOTICE file.

I will create two follow-up tickets, one to revisit the NOTICE file, and 
another to add an automated check to the release script to make sure that our
license file doesn't start to rot again in the future.

> The LICENSE and NOTICE files don't list everything they should
> --
>
> Key: KAFKA-12602
> URL: https://issues.apache.org/jira/browse/KAFKA-12602
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> [~jmclean] raised this on the mailing list: 
> [https://lists.apache.org/thread.html/r2df54c11c10d3d38443054998bc7dd92d34362641733c2fb7c579b50%40%3Cdev.kafka.apache.org%3E]
>  
> We need to make  the license file match what we are actually shipping in 
> source and binary distributions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

2021-04-05 Thread GitBox


hachikuji commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r607224516



##
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##
@@ -156,17 +169,44 @@ class DefaultAutoTopicCreationManager(
 .setTopics(topicsToCreate)
 )
 
-channelManager.get.sendRequest(createTopicsRequest, new ControllerRequestCompletionHandler {
+val requestCompletionHandler = new ControllerRequestCompletionHandler {
   override def onTimeout(): Unit = {
 debug(s"Auto topic creation timed out for ${creatableTopics.keys}.")
 clearInflightRequests(creatableTopics)
   }
 
   override def onComplete(response: ClientResponse): Unit = {
-debug(s"Auto topic creation completed for ${creatableTopics.keys}.")
+debug(s"Auto topic creation completed for ${creatableTopics.keys} with 
response ${response.responseBody.toString}.")
 clearInflightRequests(creatableTopics)
   }
-})
+}
+
+val channelManager = this.channelManager.getOrElse {
+  throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests.")
+}
+
+val request = metadataRequestContext.map { context =>
+  val requestVersion =
+channelManager.controllerApiVersions() match {
+  case None =>
+// We will rely on the Metadata request to be retried in the case
+// that the latest version is not usable by the controller.
+ApiKeys.CREATE_TOPICS.latestVersion()
+  case Some(nodeApiVersions) =>
+nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
+}
+
+  // Borrow client information such as client id and correlation id from the original request,
+  // in order to correlate the create request with the original metadata request.
+  val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+requestVersion,
+context.clientId,
+context.correlationId)
+  ForwardingManager.buildEnvelopeRequest(context,
+createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))

Review comment:
   @dengziming It's not a bad idea. We could even simplify it a little 
since the api key and version can be obtained from the request. I tend to agree 
that this is kind of a niche usage though, so I'm not sure it calls for the 
generality. Perhaps you could submit a follow-up once this is merged and we can 
see what it looks like.
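   A rough sketch of that possible follow-up, written as a Java helper for brevity (the method name, return type, and the apiKey()/version()/clientId()/correlationId() accessors are assumptions, not code from this PR): derive the header fields from the request itself so callers no longer pass the api key and version explicitly.

    // Illustrative only: wrap any forwarded request with a header borrowed from the
    // original request context, inferring the api key and version from the request.
    static ByteBuffer serializeForEnvelope(RequestContext context, AbstractRequest request) {
        RequestHeader header = new RequestHeader(
            request.apiKey(),         // assumed accessor
            request.version(),        // assumed accessor
            context.clientId(),       // borrowed from the original request, as in the diff above
            context.correlationId());
        return request.serializeWithHeader(header);
    }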




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



