[GitHub] [kafka] dajac commented on a change in pull request #9903: KAFKA-12204; Implement DescribeCluster API in the broker

2021-01-21 Thread GitBox


dajac commented on a change in pull request #9903:
URL: https://github.com/apache/kafka/pull/9903#discussion_r562449399



##
File path: 
clients/src/main/resources/common/message/DescribeClusterResponse.json
##
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 60,
+  "type": "response",
+  "name": "DescribeClusterResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+  "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+{ "name": "ErrorCode", "type": "int16", "versions": "0+",
+  "about": "The top-level error code, or 0 if there was no error" },
+{ "name": "ErrorMessage", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+  "about": "The top-level error message, or null if there was no error." },
+{ "name": "ClusterId", "type": "string", "nullableVersions": "0+", 
"versions": "0+", "default": "null",

Review comment:
   `ClusterId` is always filled in the `DescribeClusterResponse`, so it does not have to be `nullable`. In the `MetadataResponse` case, `ClusterId` is not supported by all versions, so having `null` as the default helps differentiate the cases where it is provided from those where it is not.
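   For illustration, a minimal consumer-side sketch of the distinction (assuming the standard `MetadataResponse#clusterId` accessor; this is not code from the PR):

```java
import org.apache.kafka.common.requests.MetadataResponse;

final class ClusterIdSketch {
    // With MetadataResponse, a null cluster id can mean "this response version
    // predates the ClusterId field", so the nullable default carries information.
    // DescribeClusterResponse always carries a cluster id, so nullability there
    // would distinguish nothing.
    static boolean clusterIdSupported(final MetadataResponse response) {
        return response.clusterId() != null; // null <=> version too old to report it
    }
}
```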





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9925: MINOR: Tag `RaftEventSimulationTest` as `integration` and tweak it

2021-01-21 Thread GitBox


chia7712 commented on a change in pull request #9925:
URL: https://github.com/apache/kafka/pull/9925#discussion_r562445257



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##
@@ -59,6 +60,7 @@
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.junit.jupiter.api.Assumptions.assumeTrue;

Review comment:
   Is `assumeTrue` used in this test? I ran `RaftEventSimulationTest`, but there are no ignored test cases.
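   For context, a minimal JUnit 5 sketch (not from the PR) of what an effective `assumeTrue` looks like; a failed assumption aborts (skips) the test rather than failing it, which is why a run with no ignored test cases suggests the import is unused:

```java
import static org.junit.jupiter.api.Assumptions.assumeTrue;

import org.junit.jupiter.api.Test;

class AssumptionSketch {
    @Test
    void runsOnlyOnLinux() {
        // If this assumption fails, JUnit 5 reports the test as skipped, not failed.
        assumeTrue(System.getProperty("os.name").toLowerCase().contains("linux"));
        // the test body executes only when the assumption holds
    }
}
```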





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] inponomarev commented on pull request #9946: KAFKA-12230: do not interrupt execution on failure to `setPosixFilePermissions` on Windows machines

2021-01-21 Thread GitBox


inponomarev commented on pull request #9946:
URL: https://github.com/apache/kafka/pull/9946#issuecomment-765170729


   Duplicate of #9947



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] inponomarev closed pull request #9946: KAFKA-12230: do not interrupt execution on failure to `setPosixFilePermissions` on Windows machines

2021-01-21 Thread GitBox


inponomarev closed pull request #9946:
URL: https://github.com/apache/kafka/pull/9946


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords

2021-01-21 Thread GitBox


vvcephei commented on pull request #9836:
URL: https://github.com/apache/kafka/pull/9836#issuecomment-764973767


   Huh, I can't get the PlaintextConsumerTest to fail locally...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9934: MINOR: Drop enable.metadata.quorum config

2021-01-21 Thread GitBox


abbccdda commented on a change in pull request #9934:
URL: https://github.com/apache/kafka/pull/9934#discussion_r562048902



##
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##
@@ -116,8 +116,8 @@
 /** indicates the minimum required inter broker magic required to support the API */
 public final byte minRequiredInterBrokerMagic;
 
-/** indicates whether the API is enabled and should be exposed in ApiVersions **/
-public final boolean isEnabled;
+/** indicates whether this is an API which is only exposed by the KIP-500 controller **/
+public final boolean isControllerOnlyApi;

Review comment:
   Could we get a JIRA to track the work of refactoring these boolean flags into static collections of API keys? To me, it is not easy to use the constructor correctly when there are multiple boolean flags.

##
File path: raft/README.md
##
@@ -12,17 +12,14 @@ Below we describe the details to set this up.
 bin/test-raft-server-start.sh config/raft.properties
 
 ### Run Multi Node Quorum ###
-Create 3 separate raft quorum properties as the following

Review comment:
   Do we need to define `process.roles` here?

##
File path: core/src/main/scala/kafka/Kafka.scala
##
@@ -65,11 +65,12 @@ object Kafka extends Logging {
 
   private def buildServer(props: Properties): Server = {
 val config = KafkaConfig.fromProps(props, false)
-if (config.processRoles.isEmpty) {
+if (config.requiresZookeeper) {

Review comment:
   What if we call `processRoles.isDefined` here?

##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1876,5 +1874,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
 
       s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs should always be less than" +
       s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" +
       s" authentication responses from timing out")
+
+    if (requiresZookeeper && zkConnect == null) {

Review comment:
   Do we have test coverage for this check?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9945: KAFKA-12212; Bump Metadata API version to remove `ClusterAuthorizedOperations` fields

2021-01-21 Thread GitBox


chia7712 commented on a change in pull request #9945:
URL: https://github.com/apache/kafka/pull/9945#discussion_r561947369



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1246,13 +1246,15 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
 var clusterAuthorizedOperations = Int.MinValue

Review comment:
   The default value of ```clusterAuthorizedOperations``` in the auto-generated protocol is ```-2147483648``` (i.e. ```Int.MinValue```). If the request version is greater than 10, does setting ```Int.MinValue``` on ```clusterAuthorizedOperations``` of ```MetadataResponseData``` cause an error?

##
File path: clients/src/main/resources/common/message/MetadataRequest.json
##
@@ -31,7 +31,10 @@
 // Starting in version 8, authorized operations can be requested for cluster and topic resource.
 //
 // Version 9 is the first flexible version.
-// Version 10 add topicId
+//
+// Version 10 adds topicId.
+//
+// Version 11 deprecates IncludeClusterAuthorizedOperations field (KIP-700).

Review comment:
   How about saying this functionality has been migrated to ```DescribeClusterRequest```?

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1246,13 +1246,15 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
 var clusterAuthorizedOperations = Int.MinValue

Review comment:
   Could you add a comment for that value (```Int.MinValue```) for dumb readers like me? :)
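   A sketch of what such a comment could look like, using a hypothetical named constant (not from the PR):

```java
/**
 * Sentinel meaning "authorized operations were not requested or are unknown".
 * It equals the auto-generated protocol default of -2147483648 (Integer.MIN_VALUE),
 * so setting it explicitly and leaving the field untouched are equivalent on the wire.
 */
static final int UNKNOWN_AUTHORIZED_OPERATIONS = Integer.MIN_VALUE;
```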





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

2021-01-21 Thread GitBox


guozhangwang commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-764063387


   > Weird -- these changes seem to be causing the `SaslXConsumerTest` family 
of tests to hang. I'm not very (or at all) familiar with these tests so I 
haven't found anything yet but I'm actively looking into it
   
   Hmm... I'm not familiar with SaslXConsumerTest either...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #9938: MINOR: Refactor DescribeAuthorizedOperationsTest

2021-01-21 Thread GitBox


dajac commented on pull request #9938:
URL: https://github.com/apache/kafka/pull/9938#issuecomment-764468637







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9939: MINOR: fix @link tag in javadoc

2021-01-21 Thread GitBox


chia7712 commented on pull request #9939:
URL: https://github.com/apache/kafka/pull/9939#issuecomment-764247507


   @tang7526 thanks for your patch. Could you fix the other doc errors as well?
   
   ```
   /home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala:42: warning: Could not find any member to link for "Serde".
    /**
    ^
   /home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala:24: warning: Could not find any member to link for "org.apache.kafka.streams.kstream.Joined".
    /**
    ^
   /home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala:24: warning: Could not find any member to link for "Serde".
    /**
    ^
   /home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala:37: warning: Could not find any member to link for "Serde".
    /**
    ^
   /home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala:24: warning: Could not find any member to link for "Serde".
    /**
    ^
   /home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala:68: warning: Could not find any member to link for "Topology.AutoOffsetReset".
    /**
    ^
   /home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala:55: warning: Could not find any member to link for "TimestampExtractor".
    /**
    ^
   /home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala:45: warning: Could not find any member to link for "Serde".
    /**
    ^
   /home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala:157: warning: Could not find any member to link for "org.apache.kafka.streams.errors.TopologyException".
    /**
    ^
   44 warnings
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2021-01-21 Thread GitBox


mjsax commented on pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#issuecomment-764961250


   @wcarlson5 Seems we forgot to update the docs for this KIP. Can you do a follow-up PR? That follow-up PR should also cover the changes of https://github.com/apache/kafka/pull/9487, which is part of the same KIP.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9947: KAFKA-12190: Fix setting of file permissions on non-POSIX filesystems

2021-01-21 Thread GitBox


showuon commented on a change in pull request #9947:
URL: https://github.com/apache/kafka/pull/9947#discussion_r562312538



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
##
@@ -115,22 +115,29 @@ public void shouldCreateBaseDirectory() {
 
     @Test
     public void shouldHaveSecurePermissions() {
-        final Set<PosixFilePermission> expectedPermissions = EnumSet.of(
-            PosixFilePermission.OWNER_EXECUTE,
-            PosixFilePermission.GROUP_READ,
-            PosixFilePermission.OWNER_WRITE,
-            PosixFilePermission.GROUP_EXECUTE,
-            PosixFilePermission.OWNER_READ);
-
-        final Path statePath = Paths.get(stateDir.getPath());
-        final Path basePath = Paths.get(appDir.getPath());
-        try {
-            final Set<PosixFilePermission> baseFilePermissions = Files.getPosixFilePermissions(statePath);
-            final Set<PosixFilePermission> appFilePermissions = Files.getPosixFilePermissions(basePath);
-            assertThat(expectedPermissions, equalTo(baseFilePermissions));
-            assertThat(expectedPermissions, equalTo(appFilePermissions));
-        } catch (final IOException e) {
-            fail("Should create correct files and set correct permissions");
+        assertPermissions(Paths.get(stateDir.getPath()));
+        assertPermissions(Paths.get(appDir.getPath()));
+    }
+
+    private void assertPermissions(final Path path) {
+        if (path.getFileSystem().supportedFileAttributeViews().contains("posix")) {
+            final Set<PosixFilePermission> expectedPermissions = EnumSet.of(
+                PosixFilePermission.OWNER_EXECUTE,
+                PosixFilePermission.GROUP_READ,
+                PosixFilePermission.OWNER_WRITE,
+                PosixFilePermission.GROUP_EXECUTE,
+                PosixFilePermission.OWNER_READ);
+            try {
+                final Set<PosixFilePermission> baseFilePermissions = Files.getPosixFilePermissions(path);

Review comment:
   This path could be `baseFile` and also `appFile`. I think we'd better rename it, e.g. to `filePermissions`, or is there any better idea?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -109,16 +109,27 @@ public StateDirectory(final StreamsConfig config, final Time time, final boolean
             log.warn("Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file" +
                 " due to the fact that this directory can be cleared by the OS");
         }
-
         // change the dir permission to "rwxr-x---" to avoid world readable
-        final Path basePath = Paths.get(baseDir.getPath());
-        final Path statePath = Paths.get(stateDir.getPath());
+        configurePermissions(Paths.get(baseDir.getPath()));
+        configurePermissions(Paths.get(stateDir.getPath()));
+    }
+}
+
+    private void configurePermissions(final Path path) {
+        if (path.getFileSystem().supportedFileAttributeViews().contains("posix")) {
             final Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-x---");
             try {
-                Files.setPosixFilePermissions(basePath, perms);
-                Files.setPosixFilePermissions(statePath, perms);
+                Files.setPosixFilePermissions(path, perms);
             } catch (final IOException e) {
-                log.error("Error changing permissions for the state or base directory {} ", stateDir.getPath(), e);
+                log.error("Error changing permissions for the directory {} ", path, e);
+            }
+        } else {
+            final File file = path.toFile();

Review comment:
   We pass in the `path` parameter, which was converted from a `File` instance (i.e. `Paths.get(stateDir.getPath())`), and then we convert it back to a `File` instance inside the method. That's redundant. Could we just pass the `File` instance into the method directly?
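   A self-contained sketch of that suggestion (hypothetical helper class and fallback logic, not the PR's code), taking the `File` directly and deriving the `Path` inside the method:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

final class PermissionsSketch {

    // Accept the File directly instead of a pre-converted Path.
    static void configurePermissions(final File file) {
        final Path path = file.toPath();
        if (path.getFileSystem().supportedFileAttributeViews().contains("posix")) {
            final Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-x---");
            try {
                Files.setPosixFilePermissions(path, perms);
            } catch (final IOException e) {
                System.err.println("Error changing permissions for " + path + ": " + e);
            }
        } else {
            // Non-POSIX fallback: restrict access to the owner via the File API.
            file.setReadable(true, true);
            file.setWritable(true, true);
            file.setExecutable(true, true);
        }
    }
}
```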

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
##
@@ -115,22 +115,29 @@ public void shouldCreateBaseDirectory() {
 
     @Test
     public void shouldHaveSecurePermissions() {
-        final Set<PosixFilePermission> expectedPermissions = EnumSet.of(
-            PosixFilePermission.OWNER_EXECUTE,
-            PosixFilePermission.GROUP_READ,
-            PosixFilePermission.OWNER_WRITE,
-            PosixFilePermission.GROUP_EXECUTE,
-            PosixFilePermission.OWNER_READ);
-
-        final Path statePath = Paths.get(stateDir.getPath());
-        final Path basePath = Paths.get(appDir.getPath());
-        try {
-            final Set<PosixFilePermission> baseFilePermissions = Files.getPosixFilePermissions(statePath);
-            final Set<PosixFilePermission> appFilePermissions = Files.getPosixFilePermissions(basePath);
-            assertThat(expectedPermissions, equalTo(baseFilePermissions));
-            assertThat(expectedPermissions, equalTo(appFilePermissions));
-        } catch

[GitHub] [kafka] wcarlson5 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-21 Thread GitBox


wcarlson5 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r562150019



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
##
@@ -235,25 +235,27 @@ public void shouldReduceWindowed() throws Exception {
 .thenComparing(KeyValueTimestamp::value);
 
 windowedOutput.sort(comparator);
-final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
-final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
+final long firstBatchWindowStart = firstBatchTimestamp / 500 * 500;
+final long firstBatchWindowEnd = firstBatchWindowStart + 500;

Review comment:
   Why don't you use `timeDifference` like you did below? 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
##
@@ -33,22 +33,22 @@
  */
 public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
 
-    private final Long windowSize;
+    private Long windowSize;
     private boolean isChangelogTopic;
 
     private Deserializer<T> inner;
-
+
     // Default constructor needed by Kafka
     public TimeWindowedDeserializer() {
-        this(null, Long.MAX_VALUE);
+        this(null, null);
    }
 
-    // TODO: fix this part as last bits of KAFKA-4468
+    @Deprecated
    public TimeWindowedDeserializer(final Deserializer<T> inner) {
        this(inner, Long.MAX_VALUE);

Review comment:
   Do you want to change this from `Long.MAX_VALUE` as well?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
##
@@ -327,7 +329,7 @@ public void shouldAggregateWindowed() throws Exception {
 startStreams();
 
         final List<KeyValueTimestamp<Windowed<String>, Integer>> windowedMessages = receiveMessagesWithTimestamp(
-            new TimeWindowedDeserializer<>(),

Review comment:
   I don't think we should get rid of the generics unless we have to.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
##
@@ -56,4 +57,23 @@ public void testWindowedValueDeserializerNoArgConstructors() {
     assertNotNull("Inner deserializer should be not null", inner);
     assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner instanceof ByteArrayDeserializer);
 }
+
+    @Test
+    public void setWindowSizeThroughConfigs() {
+        props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500");
+        final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>();
+        deserializer.configure(props, false);

Review comment:
   Is there some sort of check you can add here to verify the configured value?
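   One possible assertion, assuming the deserializer exposes its configured window size (e.g. via a hypothetical `getWindowSize()` accessor):

```java
// Verify that configure() actually picked up the window size from the config.
assertEquals(500L, (long) deserializer.getWindowSize());
```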





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #9832: KAFKA-12152; Idempotent Producer does not reset the sequence number of partitions without in-flight batches

2021-01-21 Thread GitBox


dajac commented on pull request #9832:
URL: https://github.com/apache/kafka/pull/9832#issuecomment-764469562


   Failed tests in the last build are not related to the changes in this PR:
   * Build / JDK 8 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplication()
   * Build / JDK 8 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplication()
   * Build / JDK 8 / 
kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete()
   * Build / JDK 11 / 
kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback()
   
   I will go ahead and merge the PR in trunk and 2.7.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison merged pull request #9810: MINOR: Tweak IBM i platform support in "stop" scripts

2021-01-21 Thread GitBox


mimaison merged pull request #9810:
URL: https://github.com/apache/kafka/pull/9810


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #9819: KAFKA-10694: Implement zero copy for FetchSnapshot

2021-01-21 Thread GitBox


dengziming commented on a change in pull request #9819:
URL: https://github.com/apache/kafka/pull/9819#discussion_r561742346



##
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##
@@ -1093,9 +1093,11 @@ public static final void readFully(InputStream inputStream, ByteBuffer destinati
         destinationBuffer.position(destinationBuffer.position() + totalBytesRead);
     }
 
-    public static void writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException {
+    public static int writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException {
+        int size = 0;

Review comment:
   In fact, this change was introduced when using `BaseRegion`.

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1220,34 +1216,35 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest(
         if (!snapshotOpt.isPresent()) {
             return FetchSnapshotResponse.singleton(
                 log.topicPartition(),
-                responsePartitionSnapshot -> {
-                    return addQuorumLeader(responsePartitionSnapshot)
-                        .setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code());
-                }
+                responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot)
+                    .setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code())
             );
         }
 
         try (RawSnapshotReader snapshot = snapshotOpt.get()) {
             if (partitionSnapshot.position() < 0 || partitionSnapshot.position() >= snapshot.sizeInBytes()) {
                 return FetchSnapshotResponse.singleton(
                     log.topicPartition(),
-                    responsePartitionSnapshot -> {
-                        return addQuorumLeader(responsePartitionSnapshot)
-                            .setErrorCode(Errors.POSITION_OUT_OF_RANGE.code());
-                    }
+                    responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot)
+                        .setErrorCode(Errors.POSITION_OUT_OF_RANGE.code())
                 );
             }
 
             int maxSnapshotSize;
+            int maxSnapshotPosition;
             try {
                 maxSnapshotSize = Math.toIntExact(snapshot.sizeInBytes());
             } catch (ArithmeticException e) {
                 maxSnapshotSize = Integer.MAX_VALUE;
             }
 
-            ByteBuffer buffer = ByteBuffer.allocate(Math.min(data.maxBytes(), maxSnapshotSize));
-            snapshot.read(buffer, partitionSnapshot.position());
-            buffer.flip();
+            try {
+                maxSnapshotPosition = Math.toIntExact(partitionSnapshot.position());
+            } catch (ArithmeticException e) {
+                maxSnapshotPosition = Integer.MAX_VALUE;

Review comment:
   Thank you, done!

##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##
@@ -53,13 +56,20 @@ public long sizeInBytes() throws IOException {
     }
 
     @Override
-    public void append(ByteBuffer buffer) throws IOException {
+    public void append(BaseRecords records) throws IOException {
         if (frozen) {
             throw new IllegalStateException(
-            String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
+                String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
             );
         }
-
+        ByteBuffer buffer;
+        if (records instanceof MemoryRecords) {
+            buffer = ((MemoryRecords) records).buffer();
+        } else {
+            buffer = ByteBuffer.allocate(records.sizeInBytes());
+            ((FileRecords) records).channel().read(buffer);
+            buffer.flip();
+        }

Review comment:
   Done, changed the parameter type to `MemoryRecords`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords

2021-01-21 Thread GitBox


guozhangwang commented on a change in pull request #9836:
URL: https://github.com/apache/kafka/pull/9836#discussion_r561449314



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -2041,6 +2052,118 @@ public void testInvalidGroupMetadata() throws InterruptedException {
         assertThrows(IllegalStateException.class, consumer::groupMetadata);
     }
 
+    @Test
+    public void testPollMetadata() {
+        final Time time = new MockTime();
+        final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        final ConsumerMetadata metadata = createMetadata(subscription);
+        final MockClient client = new MockClient(time, metadata);
+
+        initMetadata(client, singletonMap(topic, 1));
+        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer =
+            newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+
+        consumer.assign(singleton(tp0));
+        consumer.seek(tp0, 50L);
+
+        final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
+        client.prepareResponse(fetchResponse(singletonMap(tp0, fetchInfo)));
+
+        final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
+        assertEquals(5, records.count());
+        assertEquals(55L, consumer.position(tp0));
+
+        // verify that the consumer computes the correct metadata based on the fetch response
+        final ConsumerRecords.Metadata actualMetadata = records.metadata().get(tp0);
+        assertEquals(100L, (long) actualMetadata.endOffset());
+        assertEquals(55L, (long) actualMetadata.position());
+        assertEquals(45L, (long) actualMetadata.lag());
+        consumer.close(Duration.ZERO);
+    }
+
+
+    @Test
+    public void testPollMetadataWithExtraPartitions() {

Review comment:
   Does the test cover the cases of 1) a stale epoch and 2) no previous value?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -637,20 +636,32 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
             } else {
                 List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);
 
-                if (!records.isEmpty()) {
-                    TopicPartition partition = nextInLineFetch.partition;
-                    List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
-                    if (currentRecords == null) {
-                        fetched.put(partition, records);
-                    } else {
-                        // this case shouldn't usually happen because we only send one fetch at a time per partition,
-                        // but it might conceivably happen in some rare cases (such as partition leader changes).
-                        // we have to copy to a new list because the old one may be immutable
-                        List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
-                        newRecords.addAll(currentRecords);
-                        newRecords.addAll(records);
-                        fetched.put(partition, newRecords);
+                TopicPartition partition = nextInLineFetch.partition;
+
+                if (subscriptions.isAssigned(partition)) {
+                    // initializeCompletedFetch, above, has already persisted the metadata from the fetch in the
+                    // SubscriptionState, so we can just read it out, which in particular lets us re-use the logic
+                    // for determining the end offset
+                    final long receivedTimestamp = nextInLineFetch.receivedTimestamp;
+                    final Long beginningOffset = subscriptions.logStartOffset(partition);
+                    final Long endOffset = subscriptions.logEndOffset(partition, isolationLevel);
+                    final FetchPosition fetchPosition = subscriptions.position(partition);
+
+                    final FetchedRecords.FetchMetadata fetchMetadata = fetched.metadata().get(partition);
+                    if (fetchMetadata == null
+                        || !fetchMetadata.position().offsetEpoch.isPresent()
+                        || fetchPosition.offsetEpoch.isPresent()
+                            && fetchMetadata.position().offsetEpoch.get() <= fetchPosition.offsetEpoch.get()) {

Review comment:
   Interesting, why do we not want to update the metadata if the epoch is stale?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -637,20 +636,32 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
             } else {
                 List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);
 
-

[GitHub] [kafka] hachikuji merged pull request #9934: MINOR: Drop enable.metadata.quorum config

2021-01-21 Thread GitBox


hachikuji merged pull request #9934:
URL: https://github.com/apache/kafka/pull/9934


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-01-21 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r561594743



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
 !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, 
findCoordinatorRequest.data.key))
   requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
 else {
-  // get metadata (and create the topic if necessary)
-  val (partition, topicMetadata) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+  val (partition, internalTopicName) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
 case CoordinatorType.GROUP =>
-  val partition = 
groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), 
GROUP_METADATA_TOPIC_NAME)
 
 case CoordinatorType.TRANSACTION =>
-  val partition = 
txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = 
getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), 
TRANSACTION_STATE_TOPIC_NAME)
+  }
 
-case _ =>
-  throw new InvalidRequestException("Unknown coordinator type in 
FindCoordinator request")
+  val topicMetadata = 
metadataCache.getTopicMetadata(Set(internalTopicName), 
request.context.listenerName)
+  def createFindCoordinatorResponse(error: Errors,
+node: Node,
+requestThrottleMs: Int,
+errorMessage: Option[String] = None): 
FindCoordinatorResponse = {
+new FindCoordinatorResponse(
+  new FindCoordinatorResponseData()
+.setErrorCode(error.code)
+.setErrorMessage(errorMessage.getOrElse(error.message))
+.setNodeId(node.id)
+.setHost(node.host)
+.setPort(node.port)
+.setThrottleTimeMs(requestThrottleMs))
   }
 
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-def createFindCoordinatorResponse(error: Errors, node: Node): 
FindCoordinatorResponse = {
-  new FindCoordinatorResponse(
-  new FindCoordinatorResponseData()
-.setErrorCode(error.code)
-.setErrorMessage(error.message)
-.setNodeId(node.id)
-.setHost(node.host)
-.setPort(node.port)
-.setThrottleTimeMs(requestThrottleMs))
+  val topicCreationNeeded = topicMetadata.headOption.isEmpty
+  if (topicCreationNeeded) {
+if (hasEnoughAliveBrokers(internalTopicName)) {
+  if (shouldForwardRequest(request)) {
+forwardingManager.sendInterBrokerRequest(
+  getCreateTopicsRequest(Seq(internalTopicName)),
+  _ => ())
+  } else {
+val controllerMutationQuota = 
quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+
+val topicConfigs = Map(internalTopicName -> 
getTopicConfigs(internalTopicName))
+adminManager.createTopics(
+  config.requestTimeoutMs,
+  validateOnly = false,
+  topicConfigs,
+  Map.empty,
+  controllerMutationQuota,
+  _ => ())
+  }
 }
-val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
-  createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, 
Node.noNode)
-} else {
-  val coordinatorEndpoint = topicMetadata.partitions.asScala
-.find(_.partitionIndex == partition)
-.filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-.flatMap(metadata => 
metadataCache.getAliveBroker(metadata.leaderId))
-.flatMap(_.getNode(request.context.listenerName))
-.filterNot(_.isEmpty)
-
-  coordinatorEndpoint match {
-case Some(endpoint) =>
-  createFindCoordinatorResponse(Errors.NONE, endpoint)
-case _ =>
-  createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, 
Node.noNode)
+
+requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => 
createFindCoordinatorResponse(
+  Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
+  } else {
+def createResponse(requestThrottleMs: Int): AbstractResponse = {
+  val responseBody = if (topicMetadata.head.errorCode != 
Errors.NONE.code) {
+   

[GitHub] [kafka] wcarlson5 commented on a change in pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary

2021-01-21 Thread GitBox


wcarlson5 commented on a change in pull request #9941:
URL: https://github.com/apache/kafka/pull/9941#discussion_r561409907



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -407,7 +413,7 @@ public static StreamThread create(final InternalTopologyBuilder builder,
             referenceContainer.nextScheduledRebalanceMs,
             shutdownErrorHook,
             streamsUncaughtExceptionHandler,
-            cacheSize -> cache.resize(cacheSize)

Review comment:
   My IDE tried to optimize this as well. At the time, not passing in cacheSize caused some exceptions. I would be careful about making this change without need.
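   For reference, a small sketch (illustrative names, not the PR's code) of why the lambda and the IDE-suggested method reference are not always interchangeable in Java: the receiver of a bound method reference is evaluated once, when the reference is created, while a lambda re-evaluates it on every invocation.

```java
import java.util.function.LongConsumer;

final class CaptureSketch {
    interface Cache {
        void resize(long size);
    }

    static LongConsumer viaLambda(final Cache cache) {
        return size -> cache.resize(size); // `cache` is dereferenced on each call
    }

    static LongConsumer viaMethodRef(final Cache cache) {
        return cache::resize; // `cache` is dereferenced here, when the reference is created
    }

    public static void main(String[] args) {
        viaLambda(null);      // fine until the returned consumer is invoked
        viaMethodRef(null);   // throws NullPointerException immediately
    }
}
```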

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -407,7 +413,7 @@ public static StreamThread create(final InternalTopologyBuilder builder,
             referenceContainer.nextScheduledRebalanceMs,
             shutdownErrorHook,
             streamsUncaughtExceptionHandler,
-            cacheSize -> cache.resize(cacheSize)

Review comment:
   I think it caused a test to fail, but not every time. It also could have been fixed since then, as changes have been made. If all the tests pass, it's probably fine.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hongshaoyang edited a comment on pull request #9943: MINOR: Fix typo in Utils.java

2021-01-21 Thread GitBox


hongshaoyang edited a comment on pull request #9943:
URL: https://github.com/apache/kafka/pull/9943#issuecomment-764209143


   Ping @guozhangwang 
   
   Original typo was added in 
https://github.com/apache/kafka/commit/3a9f4b833bb7e86dc759361c33f4321ab043db05 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12190) Failure on Windows due to an UnsupportedOperationException when StateDirectory sets file permissions

2021-01-21 Thread Ivan Ponomarev (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269923#comment-17269923
 ] 

Ivan Ponomarev commented on KAFKA-12190:


Hi [~mjsax], sure! I searched for the issue on Jira but didn't find it; that's why the duplicate Jira ticket and PR. Looking forward to [~awilkinson]'s fix!

> Failure on Windows due to an UnsupportedOperationException when 
> StateDirectory sets file permissions
> 
>
> Key: KAFKA-12190
> URL: https://issues.apache.org/jira/browse/KAFKA-12190
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1, 2.7.1
>Reporter: Andy Wilkinson
>Assignee: Andy Wilkinson
>Priority: Critical
>  Labels: bug
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> There appears to be a regression in Kafka 2.6.1 due to [the 
> changes|https://github.com/apache/kafka/pull/9583] made for KAFKA-10705 that 
> causes a failure on Windows. After upgrading to 2.6.1 from 2.6.0, we're 
> seeing failures in Spring Boot's CI on Windows such as the following:
> {noformat}
> Caused by: java.lang.UnsupportedOperationException: (No message provided)
> at java.nio.file.Files.setPosixFilePermissions(Files.java:2044)
> at 
> org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:115)
> 
> at 
> org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:745)
> at 
> org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:585)
> at 
> org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:316)
> 
> at 
> org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178)
> 
> at 
> org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54)
> 
> at 
> org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356)
> 
> at java.lang.Iterable.forEach(Iterable.java:75)
> at 
> org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155)
> 
> at 
> org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123)
> 
> at 
> org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:940)
> 
> at 
> org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:591)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.configureContext(AbstractApplicationContextRunner.java:447)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.createAndLoadContext(AbstractApplicationContextRunner.java:423)
> 
> at 
> org.springframework.boot.test.context.assertj.AssertProviderApplicationContextInvocationHandler.getContextOrStartupFailure(AssertProviderApplicationContextInvocationHandler.java:61)
> 
> at 
> org.springframework.boot.test.context.assertj.AssertProviderApplicationContextInvocationHandler.<init>(AssertProviderApplicationContextInvocationHandler.java:48)
> 
> at 
> org.springframework.boot.test.context.assertj.ApplicationContextAssertProvider.get(ApplicationContextAssertProvider.java:112)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.createAssertableContext(AbstractApplicationContextRunner.java:412)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.lambda$null$0(AbstractApplicationContextRunner.java:382)
> 
> at 
> org.springframework.boot.test.util.TestPropertyValues.applyToSystemProperties(TestPropertyValues.java:175)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.lambda$run$1(AbstractApplicationContextRunner.java:381)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.withContextClassLoader(AbstractApplicationContextRunner.java:392)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.run(AbstractApplicationContextRunner.java:381)
> 
> at 
> org.springframework.boot.actuate.autoconfigure.metrics.KafkaMetricsAutoConfigurationTests.whenKafkaStreamsIsEnabledAndThereIsNoMeterRegistryThenListenerCustomizationBacksOff(KafkaMetricsAutoConfigurationTests.java:92)
> {noformat}
> The same code worked without changes using Kafka 2.6.0.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

[GitHub] [kafka] dajac commented on pull request #9945: KAFKA-12212; Bump Metadata API version to remove `ClusterAuthorizedOperations` fields

2021-01-21 Thread GitBox


dajac commented on pull request #9945:
URL: https://github.com/apache/kafka/pull/9945#issuecomment-764726538


   @chia7712 Thanks for your comments. I have updated the PR to address them.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9938: MINOR: Refactor DescribeAuthorizedOperationsTest

2021-01-21 Thread GitBox


chia7712 commented on a change in pull request #9938:
URL: https://github.com/apache/kafka/pull/9938#discussion_r561689904



##
File path: 
core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
##
@@ -73,11 +114,6 @@ class DescribeAuthorizedOperationsTest extends 
IntegrationTestHarness with SaslS
 TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
   }
 
-  private def accessControlEntry(userName: String, permissionType: 
AclPermissionType, operation: AclOperation): AccessControlEntry = {
-new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
userName).toString,
-  AclEntry.WildcardHost, operation, permissionType)
-  }
-
   @AfterEach
   override def tearDown(): Unit = {
 if (client != null)

Review comment:
   Sorry for the unclear comment.
   
   My point was that ```Utils.closeQuietly``` includes a null check, so we don't need to add the same check again.
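   A sketch of the pattern being referenced (not Kafka's exact implementation): a closeQuietly-style helper performs the null check itself, so the caller's `if (client != null)` guard is redundant.

```java
final class CloseSketch {
    // The null check lives inside the helper, so callers need no guard of their own.
    static void closeQuietly(final AutoCloseable closeable, final String name) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (final Throwable t) {
                System.err.println("Failed to close " + name + ": " + t);
            }
        }
    }
}
```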





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-6223) Please delete old releases from mirroring system

2021-01-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269917#comment-17269917
 ] 

ASF GitHub Bot commented on KAFKA-6223:
---

mimaison commented on pull request #322:
URL: https://github.com/apache/kafka-site/pull/322#issuecomment-764846591


   @mjsax can you take a look? Thanks



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Please delete old releases from mirroring system
> 
>
> Key: KAFKA-6223
> URL: https://issues.apache.org/jira/browse/KAFKA-6223
> Project: Kafka
>  Issue Type: Bug
> Environment: https://dist.apache.org/repos/dist/release/kafka/
>Reporter: Sebb
>Assignee: Rajini Sivaram
>Priority: Major
>
> To reduce the load on the ASF mirrors, projects are required to delete old 
> releases [1]
> Please can you remove all non-current releases?
> It's unfair to expect the 3rd party mirrors to carry old releases.
> Note that older releases can still be linked from the download page, but such 
> links should use the archive server at:
> https://archive.apache.org/dist/kafka/
> A suggested process is:
> + Change the download page to use archive.a.o for old releases
> + Delete the corresponding directories from 
> {{https://dist.apache.org/repos/dist/release/kafka/}}
> e.g. {{svn delete https://dist.apache.org/repos/dist/release/kafka/0.8.0}}
> Thanks!
> [1] http://www.apache.org/dev/release.html#when-to-archive



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] g1geordie commented on pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…

2021-01-21 Thread GitBox


g1geordie commented on pull request #9906:
URL: https://github.com/apache/kafka/pull/9906#issuecomment-764776835


   @chia7712 Sorry for the late reply. I reverted to the `NotZstd` format. Can you help me take a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary

2021-01-21 Thread GitBox


ableegoldman commented on pull request #9941:
URL: https://github.com/apache/kafka/pull/9941#issuecomment-764036403







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class

2021-01-21 Thread GitBox


mjsax merged pull request #9940:
URL: https://github.com/apache/kafka/pull/9940


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] brianwyka removed a comment on pull request #9584: [KAFKA-10708]: Add "group-id" Tag to Kafka Consumer Metrics

2021-01-21 Thread GitBox


brianwyka removed a comment on pull request #9584:
URL: https://github.com/apache/kafka/pull/9584#issuecomment-725016649


   I couldn't find any tests that are currently testing out the `client-id` tag 
for reference.  Would appreciate a point in the right direction.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9840: KAFKA-10867: Improved task idling

2021-01-21 Thread GitBox


vvcephei commented on a change in pull request #9840:
URL: https://github.com/apache/kafka/pull/9840#discussion_r562089223



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -52,15 +58,20 @@
  * (i.e., it increases or stays the same over time).
  */
 public class PartitionGroup {
+    private static final Logger LOG = LoggerFactory.getLogger(PartitionGroup.class);

Review comment:
   I can pass in the log context. I wouldn't pass the actual logger, 
though, because it would mess up common log4j usage patterns.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -78,15 +89,149 @@ RecordQueue queue() {
         }
     }
 
-    PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
+    PartitionGroup(final TaskId id,
+                   final Map<TopicPartition, RecordQueue> partitionQueues,
+                   final Sensor recordLatenessSensor,
+                   final Sensor enforcedProcessingSensor,
+                   final long maxTaskIdleMs) {
+        this.id = id;
         nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
         this.partitionQueues = partitionQueues;
+        this.enforcedProcessingSensor = enforcedProcessingSensor;
+        this.maxTaskIdleMs = maxTaskIdleMs;
         this.recordLatenessSensor = recordLatenessSensor;
         totalBuffered = 0;
         allBuffered = false;
         streamTime = RecordQueue.UNKNOWN;
     }
 
+    public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
+        final Long lag = metadata.lag();
+        if (lag != null) {
+            LOG.debug("[{}] added fetched lag {}: {}", id, partition, lag);
+            fetchedLags.put(partition, lag);
+        }
+    }
+
+    public boolean readyToProcess(final long wallClockTime) {
+        if (LOG.isTraceEnabled()) {
+            for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
+                LOG.trace(
+                    "[{}] buffered/lag {}: {}/{}",
+                    id,
+                    entry.getKey(),
+                    entry.getValue().size(),
+                    fetchedLags.get(entry.getKey())
+                );
+            }
+        }
+        // Log-level strategy:
+        //  TRACE for messages that don't wait for fetches, since these may be logged at extremely high frequency
+        //  DEBUG when we waited for a fetch and decided to wait some more, as configured
+        //  DEBUG when we are ready for processing and didn't have to enforce processing
+        //  INFO  when we enforce processing, since this has to wait for fetches AND may result in disorder
+
+        if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) {
+            if (LOG.isTraceEnabled() && !allBuffered && totalBuffered > 0) {
+                final Set<TopicPartition> bufferedPartitions = new HashSet<>();
+                final Set<TopicPartition> emptyPartitions = new HashSet<>();
+                for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
+                    if (entry.getValue().isEmpty()) {
+                        emptyPartitions.add(entry.getKey());
+                    } else {
+                        bufferedPartitions.add(entry.getKey());
+                    }
+                }
+                LOG.trace("[{}] Ready for processing because max.task.idle.ms is disabled." +
+                          "\n\tThere may be out-of-order processing for this task as a result." +
+                          "\n\tBuffered partitions: {}" +
+                          "\n\tNon-buffered partitions: {}",
+                          id,
+                          bufferedPartitions,
+                          emptyPartitions);
+            }
+            return true;

Review comment:
   Glad we agree ;) 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -910,6 +900,11 @@ public void addRecords(final TopicPartition partition, 
final Iterable

[GitHub] [kafka] dajac commented on a change in pull request #9938: MINOR: Refactor DescribeAuthorizedOperationsTest

2021-01-21 Thread GitBox


dajac commented on a change in pull request #9938:
URL: https://github.com/apache/kafka/pull/9938#discussion_r561680234



##
File path: 
core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
##
@@ -32,17 +32,55 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
 import scala.jdk.CollectionConverters._
 
+object DescribeAuthorizedOperationsTest {
+  val Group1 = "group1"
+  val Group2 = "group2"
+  val Group3 = "group3"
+  val Topic1 = "topic1"
+  val Topic2 = "topic2"
+
+  val Group1Acl = new AclBinding(
+new ResourcePattern(ResourceType.GROUP, Group1, PatternType.LITERAL),
+accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, 
ALL, ALLOW))
+
+  val Group2Acl = new AclBinding(
+new ResourcePattern(ResourceType.GROUP, Group2, PatternType.LITERAL),
+accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, 
DESCRIBE, ALLOW))
+
+  val Group3Acl = new AclBinding(
+new ResourcePattern(ResourceType.GROUP, Group3, PatternType.LITERAL),
+accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, 
DELETE, ALLOW))
+
+  val ClusterAllAcl = new AclBinding(
+new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, 
PatternType.LITERAL),
+accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, 
ALL, ALLOW))
+
+  val Topic1Acl = new AclBinding(
+new ResourcePattern(ResourceType.TOPIC, Topic1, PatternType.LITERAL),
+accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, 
ALL, ALLOW))
+
+  val Topic2All = new AclBinding(
+new ResourcePattern(ResourceType.TOPIC, Topic2, PatternType.LITERAL),
+accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, 
DELETE, ALLOW))
+
+  private def accessControlEntry(
+userName: String,
+operation: AclOperation,
+permissionType: AclPermissionType

Review comment:
   Indeed, let me remove that field.

##
File path: 
core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
##
@@ -32,17 +32,55 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
 import scala.jdk.CollectionConverters._
 
+object DescribeAuthorizedOperationsTest {
+  val Group1 = "group1"

Review comment:
   In Scala, we tend to capitalize only the first letter of constants. At least, this is how I have been doing it so far. I prefer to keep it as is.

##
File path: 
core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
##
@@ -73,11 +114,6 @@ class DescribeAuthorizedOperationsTest extends 
IntegrationTestHarness with SaslS
 TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
   }
 
-  private def accessControlEntry(userName: String, permissionType: 
AclPermissionType, operation: AclOperation): AccessControlEntry = {
-new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
userName).toString,
-  AclEntry.WildcardHost, operation, permissionType)
-  }
-
   @AfterEach
   override def tearDown(): Unit = {
 if (client != null)

Review comment:
   I think that it is better to keep it, as `client` might not be initialized. For instance, let's say that an exception is raised when the client is constructed. In this case, `client` would remain `null`.

##
File path: 
core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
##
@@ -73,11 +114,6 @@ class DescribeAuthorizedOperationsTest extends 
IntegrationTestHarness with SaslS
 TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
   }
 
-  private def accessControlEntry(userName: String, permissionType: 
AclPermissionType, operation: AclOperation): AccessControlEntry = {
-new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
userName).toString,
-  AclEntry.WildcardHost, operation, permissionType)
-  }
-
   @AfterEach
   override def tearDown(): Unit = {
 if (client != null)

Review comment:
   Got it, thanks for the clarification. I have removed the null check.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] g1geordie commented on a change in pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…

2021-01-21 Thread GitBox


g1geordie commented on a change in pull request #9906:
URL: https://github.com/apache/kafka/pull/9906#discussion_r562087677



##
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
##
@@ -423,30 +455,32 @@ public void buildUsingCreateTime(Args args) {
 }
 
 @ParameterizedTest
-@RecordBuilderSource
+@ArgumentsSource(NotZstd.class)
 public void testAppendedChecksumConsistency(Args args) {

Review comment:
   also test it 

##
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
##
@@ -27,112 +27,81 @@
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.ArgumentsProvider;
 import org.junit.jupiter.params.provider.ArgumentsSource;
-import org.junit.jupiter.params.provider.EnumSource;
-import org.junit.jupiter.params.support.AnnotationConsumer;
 
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
-import java.util.function.BiPredicate;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static java.util.Arrays.asList;
-import static org.apache.kafka.common.record.RecordBatch.CURRENT_MAGIC_VALUE;
-import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0;
-import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V1;
-import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
 import static org.apache.kafka.common.utils.Utils.utf8;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class MemoryRecordsBuilderTest {
 
 private static class Args {
 final int bufferOffset;
-final byte magic;
 final CompressionType compressionType;
 
-public Args(int bufferOffset, byte magic, CompressionType 
compressionType) {
+public Args(int bufferOffset, CompressionType compressionType) {
 this.bufferOffset = bufferOffset;
-this.magic = magic;
 this.compressionType = compressionType;
 }
 
 @Override
 public String toString() {
-return "Args{" +
-"bufferOffset=" + bufferOffset +
-", magic=" + magic +
-", compressionType=" + compressionType +
-'}';
+return "bufferOffset=" + bufferOffset +
+", compressionType=" + compressionType;
 }
 }
 
-    private static Stream<Arguments> allArguments(BiPredicate<Byte, CompressionType> accept) {
+    private static Stream<Arguments> allArguments(Predicate<CompressionType> accept) {
         List<Arguments> values = new ArrayList<>();
         for (int bufferOffset : Arrays.asList(0, 15))
-            for (byte magic : asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2))
-                for (CompressionType compressionType : CompressionType.values())
-                    if (accept.test(magic, compressionType))
-                        values.add(Arguments.of(new Args(bufferOffset, magic, compressionType)));
+            for (CompressionType compressionType : CompressionType.values())
+                if (accept.test(compressionType))
+                    values.add(Arguments.of(new Args(bufferOffset, compressionType)));
         return values.stream();
     }
 
-    private static class MemoryRecordsBuilderArgumentsProvider implements ArgumentsProvider, AnnotationConsumer<RecordBuilderSource> {
-
-        private RecordBuilderSource recordSource;
-        private BiPredicate<Byte, CompressionType> validCompress = (magic, type) -> type != CompressionType.ZSTD || magic >= MAGIC_VALUE_V2;
-
+    private static class MemoryRecordsBuilderArgumentsProvider implements ArgumentsProvider {
         @Override
         public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
-            Predicate<Byte> predicate = magic -> recordSource.minMagic() <= magic && magic <= recordSource.maxMagic();
-            return allArguments((magic, type) -> (recordSource.haveInvalidCompress() || validCompress.test(magic, type)) && predicate.test(magic));
+            return allArguments(type -> true);
         }
+    }
 
+    private static class NotZstd implements ArgumentsProvider {

Review comment:
   have added

##
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
##
@@ -72,19 

[jira] [Commented] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent

2021-01-21 Thread Mehran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269901#comment-17269901
 ] 

Mehran commented on KAFKA-7077:
---

Any update?

> KIP-318: Make Kafka Connect Source idempotent
> -
>
> Key: KAFKA-7077
> URL: https://issues.apache.org/jira/browse/KAFKA-7077
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Stephane Maarek
>Assignee: Stephane Maarek
>Priority: Major
>
> KIP Link: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

2021-01-21 Thread GitBox


mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r562200891



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##
@@ -978,7 +971,7 @@ private void waitForRunning(final List<KeyValue<State, State>> observed) throws Exception {
             () -> !observed.isEmpty() && observed.get(observed.size() - 1).value.equals(State.RUNNING),
             MAX_WAIT_TIME_MS,
-            () -> "Client did not startup on time. Observers transitions: " + observed
+            () -> "Client did not have the expected state transition on time. Observers transitions: " + observed

Review comment:
   Why this change? We do wait for `RUNNING`?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##
@@ -993,6 +986,17 @@ private void waitForStateTransition(final List<KeyValue<State, State>> observed,
+                                          final List<KeyValue<State, State>> expected)
+        throws Exception {
+
+        waitForCondition(
+            () -> observed.containsAll(expected),
+            MAX_WAIT_TIME_MS,
+            () -> "Client did not have the expected state transition on time. Observers transitions: " + observed

Review comment:
   Can we add the expected transitions, too? Easier to debug if the test 
fails.
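
   For illustration, a hedged sketch (not the committed code) of a failure message that includes both lists:
   ```java
   waitForCondition(
       () -> observed.containsAll(expected),
       MAX_WAIT_TIME_MS,
       () -> "Client did not have the expected state transition on time." +
           " Observed transitions: " + observed +
           ", expected transitions: " + expected
   );
   ```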

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##
@@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final 
String stateDirPath,
 }
 
 @Test
-@Deprecated
 public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() 
throws Exception {
+final long time = System.currentTimeMillis();

Review comment:
   The PR you liked seems to be unrelated to this test.
   
   Still wondering if we should extract this change to a dedicated PR and 
cherry-pick to older branches? -- Or do we have a good explanation why older 
branches would not be affected?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##
@@ -993,6 +986,17 @@ private void waitForStateTransition(final List<KeyValue<State, State>> observed,
+                                          final List<KeyValue<State, State>> expected)

Review comment:
   nit: fix indentation





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9942: KAFKA-12229: reset to original class loader after connector stop

2021-01-21 Thread GitBox


showuon commented on pull request #9942:
URL: https://github.com/apache/kafka/pull/9942#issuecomment-764189012







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on pull request #9708: KAFKA-9126: KIP-689: StreamJoined changelog configuration

2021-01-21 Thread GitBox


lct45 commented on pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#issuecomment-764993225


   @mjsax Ahh yeah I'll submit a PR, thanks for catching that



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #9902: KAFKA-12193: Re-resolve IPs after a client disconnects

2021-01-21 Thread GitBox


mimaison commented on a change in pull request #9902:
URL: https://github.com/apache/kafka/pull/9902#discussion_r562011024



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -183,6 +186,10 @@ public void disconnected(String id, long now) {
 connectingNodes.remove(id);
 } else {
 resetConnectionSetupTimeout(nodeState);
+if (nodeState.state.isConnected()) {
+// If a connection had previously been established, re-resolve 
DNS because the IPs may have changed
+nodeState.addresses = Collections.emptyList();

Review comment:
   Slightly weird that we're updating a "private" field here.
   
   Also the comment is a bit misleading. We're not re-resolving DNS here but 
instead clearing state so if we reconnect later, the client will be forced to 
re-resolve then.
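
   A rough sketch (using the names from the quoted diff) of a comment that states the lazy behavior explicitly:
   ```java
   if (nodeState.state.isConnected()) {
       // Clear the cached addresses; DNS is not re-resolved here. The next
       // connection attempt will be forced to resolve the hostname again,
       // picking up any IP changes.
       nodeState.addresses = Collections.emptyList();
   }
   ```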

##
File path: clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
##
@@ -907,6 +924,140 @@ public void testCorrelationId() {
 ids.forEach(id -> assertTrue(id < 
SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID));
 }
 
+@Test
+public void testReconnectAfterAddressChange() {
+AddressChangeHostResolver mockHostResolver = new 
AddressChangeHostResolver();
+AtomicInteger initialAddressConns = new AtomicInteger();
+AtomicInteger newAddressConns = new AtomicInteger();
+MockSelector selector = new MockSelector(this.time, inetSocketAddress 
-> {
+InetAddress inetAddress = inetSocketAddress.getAddress();
+if (initialAddresses.contains(inetAddress)) {
+initialAddressConns.incrementAndGet();
+} else if (newAddresses.contains(inetAddress)) {
+newAddressConns.incrementAndGet();
+}
+return (mockHostResolver.getUseNewAddresses() && 
newAddresses.contains(inetAddress)) ||

Review comment:
   I was slightly confused until I realized `getUseNewAddresses()` is a 
boolean. Maybe `useNewAddresses()` would be a better name?

##
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##
@@ -239,24 +241,63 @@ public NetworkClient(Selectable selector,
  logContext);
 }
 
-private NetworkClient(MetadataUpdater metadataUpdater,
-  Metadata metadata,
-  Selectable selector,
-  String clientId,
-  int maxInFlightRequestsPerConnection,
-  long reconnectBackoffMs,
-  long reconnectBackoffMax,
-  int socketSendBuffer,
-  int socketReceiveBuffer,
-  int defaultRequestTimeoutMs,
-  long connectionSetupTimeoutMs,
-  long connectionSetupTimeoutMaxMs,
-  ClientDnsLookup clientDnsLookup,
-  Time time,
-  boolean discoverBrokerVersions,
-  ApiVersions apiVersions,
-  Sensor throttleTimeSensor,
-  LogContext logContext) {
+public NetworkClient(MetadataUpdater metadataUpdater,

Review comment:
   Do we really need this constructor? As far as I can tell, it's only 
called by the other 3 above. These could directly call the real one below 
instead of going through this new one. WDYT?

##
File path: clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
##
@@ -907,6 +924,140 @@ public void testCorrelationId() {
 ids.forEach(id -> assertTrue(id < 
SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID));
 }
 
+@Test
+public void testReconnectAfterAddressChange() {
+AddressChangeHostResolver mockHostResolver = new 
AddressChangeHostResolver();
+AtomicInteger initialAddressConns = new AtomicInteger();
+AtomicInteger newAddressConns = new AtomicInteger();
+MockSelector selector = new MockSelector(this.time, inetSocketAddress 
-> {
+InetAddress inetAddress = inetSocketAddress.getAddress();
+if (initialAddresses.contains(inetAddress)) {
+initialAddressConns.incrementAndGet();
+} else if (newAddresses.contains(inetAddress)) {
+newAddressConns.incrementAndGet();
+}
+return (mockHostResolver.getUseNewAddresses() && 
newAddresses.contains(inetAddress)) ||
+   (!mockHostResolver.getUseNewAddresses() && 
initialAddresses.contains(inetAddress));
+});
+NetworkClient client = new NetworkClient(metadataUpdater, null, 
selector, "mock", Integer.MAX_VALUE,
+reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 
64 * 1024,
+defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, 

[GitHub] [kafka] ijuma commented on pull request #9878: KAFKA-6987: Reimplement KafkaFuture using CompletableFuture

2021-01-21 Thread GitBox


ijuma commented on pull request #9878:
URL: https://github.com/apache/kafka/pull/9878#issuecomment-764654281







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

2021-01-21 Thread GitBox


cadonna commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r561734967



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1165,11 +1151,21 @@ private Thread shutdownHelper(final boolean error) {
             metrics.close();
             if (!error) {
                 setState(State.NOT_RUNNING);
+            } else {
+                setState(State.ERROR);
             }
         }, "kafka-streams-close-thread");
     }
 
     private boolean close(final long timeoutMs) {
+        if (state == State.ERROR) {
+            log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+            return false;

Review comment:
   As far as I can see, where the return value is used the javadoc says
   
   ```
   true if all threads were successfully stopped, false if the timeout was 
reached.
   ```
   
   Since all threads were successfully stopped, I would return `true`. We 
clearly document that `ERROR` is a terminal state, so I do not see why somebody 
should wait for `NOT_RUNNING` when the client is in `ERROR` and `close()` 
returns `true`.
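
   A minimal sketch of that suggestion (not the committed code; `waitForShutdown` stands in for the existing shutdown path):
   ```java
   private boolean close(final long timeoutMs) {
       if (state == State.ERROR) {
           log.info("Streams client is already in the terminal state ERROR, " +
               "all resources are closed and the client has stopped.");
           return true; // the threads are already stopped, so this is not a timeout
       }
       return waitForShutdown(timeoutMs); // placeholder for the existing shutdown logic
   }
   ```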





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9819: KAFKA-10694: Implement zero copy for FetchSnapshot

2021-01-21 Thread GitBox


jsancio commented on a change in pull request #9819:
URL: https://github.com/apache/kafka/pull/9819#discussion_r561427699



##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##
@@ -53,13 +56,20 @@ public long sizeInBytes() throws IOException {
 }
 
 @Override
-    public void append(ByteBuffer buffer) throws IOException {
+    public void append(BaseRecords records) throws IOException {
         if (frozen) {
             throw new IllegalStateException(
-                String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
+                    String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
             );
         }
-
+        ByteBuffer buffer;
+        if (records instanceof MemoryRecords) {
+            buffer = ((MemoryRecords) records).buffer();
+        } else {
+            buffer = ByteBuffer.allocate(records.sizeInBytes());
+            ((FileRecords) records).channel().read(buffer);
+            buffer.flip();
+        }

Review comment:
   > I change the signature to keep consistent with FileRawSnapshotReader
   
   Okay. I think this is something that I struggled with when creating the 
original APIs. I am okay with "inconsistent" APIs since 
`RawSnapshot{Reader,Writer}` are internal interfaces to the raft client and are 
not exposed to the state machine (controller).
   
   I think this "inconsistency" will go away when we implement the long term 
solution.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-21 Thread GitBox


lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r562056000



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
##
@@ -61,6 +61,21 @@ public Long getWindowSize() {
     @SuppressWarnings("unchecked")
     @Override
     public void configure(final Map<String, ?> configs, final boolean isKey) {
+        // check if the config is set and the window size is already set from the constructor
+        final Long configWindowSize;
+        if (configs.get(StreamsConfig.WINDOW_SIZE_MS_CONFIG) instanceof String) {

Review comment:
   The console consumer made this check necessary - if there's a simpler 
way to do this lmk
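
   One possible simplification, as an untested sketch (not the PR's code): normalize the raw config value once, accepting either a `String` (as the console consumer supplies) or a `Long`:
   ```java
   final Object rawWindowSize = configs.get(StreamsConfig.WINDOW_SIZE_MS_CONFIG);
   final Long configWindowSize;
   if (rawWindowSize instanceof String) {
       configWindowSize = Long.parseLong((String) rawWindowSize); // console consumer passes strings
   } else {
       configWindowSize = (Long) rawWindowSize; // null if the config is not set
   }
   ```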

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
##
@@ -33,22 +33,22 @@
  */
 public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
 
-    private final Long windowSize;
+    private Long windowSize;
     private boolean isChangelogTopic;
 
     private Deserializer<T> inner;
-
+
     // Default constructor needed by Kafka
     public TimeWindowedDeserializer() {
-        this(null, Long.MAX_VALUE);
+        this(null, null);
     }
 
-    // TODO: fix this part as last bits of KAFKA-4468
+    @Deprecated
     public TimeWindowedDeserializer(final Deserializer<T> inner) {
         this(inner, Long.MAX_VALUE);

Review comment:
   I thought about it, but figured that since it's deprecated anyway and we 
want to keep backwards compatibility, I would leave it. WDYT?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class

2021-01-21 Thread GitBox


mjsax commented on a change in pull request #9940:
URL: https://github.com/apache/kafka/pull/9940#discussion_r561429380



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##
@@ -2779,6 +2779,62 @@ public void suspend() {
 assertThat(task01.state(), is(Task.State.SUSPENDED));
 }
 
+    @Test
+    public void shouldConvertActiveTaskToStandbyTask() {
+        final StreamTask activeTask = mock(StreamTask.class);
+        expect(activeTask.id()).andReturn(taskId00).anyTimes();
+        expect(activeTask.inputPartitions()).andReturn(taskId00Partitions).anyTimes();

Review comment:
   Will fix right away -- Jenkins failed anyway and we need to rerun it.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##
@@ -190,7 +190,7 @@ public void shouldIdempotentlyUpdateSubscriptionFromActiveAssignment() {
         final TopicPartition newTopicPartition = new TopicPartition("topic2", 1);
         final Map<TaskId, Set<TopicPartition>> assignment = mkMap(mkEntry(taskId01, mkSet(t1p1, newTopicPartition)));
 
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andReturn(emptyList()).anyTimes();
+        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(emptyList());

Review comment:
   @ableegoldman I just updated the whole test class...

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##
@@ -2647,8 +2647,7 @@ public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() {
         task01.setCommittableOffsetsAndMetadata(offsetsT01);
         final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true);
 
-        consumer.groupMetadata();
-        expectLastCall().andReturn(null).anyTimes();
+        expect(consumer.groupMetadata()).andStubReturn(null);

Review comment:
   Another simplification; make it a one-liner. Same below.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-01-21 Thread GitBox


dengziming commented on pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#issuecomment-764219186







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9708: KAFKA-9126: KIP-689: StreamJoined changelog configuration

2021-01-21 Thread GitBox


mjsax commented on pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#issuecomment-764957937


   @lct45 -- Seems we missed updating the docs, i.e., 
`streams/upgrade_guide.html`, for this KIP. Can you do a follow-up PR for it?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9912: MINOR: Let TestRaftRequestHandler use RequestHandlerHelper

2021-01-21 Thread GitBox


hachikuji commented on a change in pull request #9912:
URL: https://github.com/apache/kafka/pull/9912#discussion_r561415958



##
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##
@@ -371,9 +372,44 @@ class RequestChannel(val queueSize: Int,
 requestQueue.put(request)
   }
 
-  /** Send a response back to the socket server to be sent over the network */
-  def sendResponse(response: RequestChannel.Response): Unit = {
+  def closeConnection(
+    request: RequestChannel.Request,
+    errorCounts: java.util.Map[Errors, Integer]
+  ): Unit = {
+    // This case is used when the request handler has encountered an error, but the client
+    // does not expect a response (e.g. when produce request has acks set to 0)
+    updateErrorMetrics(request.header.apiKey, errorCounts.asScala)
+    sendResponse(new RequestChannel.CloseConnectionResponse(request))
+  }
+
+  def sendResponse(
+    request: RequestChannel.Request,
+    response: AbstractResponse,
+    onComplete: Option[Send => Unit]

Review comment:
   I decided not to do this here. I didn't like replacing `None` with `_ => 
{}` in uses, and neither did I like making the argument optional. The 
alternative is to introduce a constant "no-op" function, but I found this also 
a little awkward. If you think of a nice way to do it, I can review. I do think 
it is better having a simpler type.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9835: MINOR: Add 'task container' class to KafkaStreams TaskManager

2021-01-21 Thread GitBox


mjsax commented on a change in pull request #9835:
URL: https://github.com/apache/kafka/pull/9835#discussion_r561342858



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+class Tasks {
+    private final Logger log;
+    private final InternalTopologyBuilder builder;
+    private final StreamsMetricsImpl streamsMetrics;
+
+    private final Map<TaskId, Task> allTasksPerId = new TreeMap<>();
+    private final Map<TaskId, Task> readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId);
+    private final Collection<Task> readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values());
+
+    // TODO: change type to `StreamTask`
+    private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
+    // TODO: change type to `StreamTask`
+    private final Map<TopicPartition, Task> activeTasksPerPartition = new HashMap<>();
+    // TODO: change type to `StreamTask`
+    private final Map<TaskId, Task> readOnlyActiveTasksPerId = Collections.unmodifiableMap(activeTasksPerId);
+    private final Set<TaskId> readOnlyActiveTaskIds = Collections.unmodifiableSet(activeTasksPerId.keySet());
+    // TODO: change type to `StreamTask`
+    private final Collection<Task> readOnlyActiveTasks = Collections.unmodifiableCollection(activeTasksPerId.values());
+
+    // TODO: change type to `StandbyTask`
+    private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();
+    // TODO: change type to `StandbyTask`
+    private final Map<TaskId, Task> readOnlyStandbyTasksPerId = Collections.unmodifiableMap(standbyTasksPerId);
+    private final Set<TaskId> readOnlyStandbyTaskIds = Collections.unmodifiableSet(standbyTasksPerId.keySet());
+
+    private final ActiveTaskCreator activeTaskCreator;
+    private final StandbyTaskCreator standbyTaskCreator;
+
+    private Consumer<byte[], byte[]> mainConsumer;
+
+    Tasks(final String logPrefix,
+          final InternalTopologyBuilder builder,
+          final StreamsMetricsImpl streamsMetrics,
+          final ActiveTaskCreator activeTaskCreator,
+          final StandbyTaskCreator standbyTaskCreator) {
+
+        final LogContext logContext = new LogContext(logPrefix);
+        log = logContext.logger(getClass());
+
+        this.builder = builder;
+        this.streamsMetrics = streamsMetrics;
+        this.activeTaskCreator = activeTaskCreator;
+        this.standbyTaskCreator = standbyTaskCreator;
+    }
+
+    void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
+        this.mainConsumer = mainConsumer;
+    }
+
+    void createTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                     final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate) {
+        for (final Map.Entry<TaskId, Set<TopicPartition>> taskToBeCreated : activeTasksToCreate.entrySet()) {
+            final TaskId taskId = taskToBeCreated.getKey();
+
+            if (activeTasksPerId.containsKey(taskId)) {
+                throw new IllegalStateException("Attempted to create an active task that we already own: " + taskId);
+            }
+        }
+
+        for (final Map.Entry<TaskId, Set<TopicPartition>> taskToBeCreated : standbyTasksToCreate.entrySet()) {
+            final TaskId taskId = taskToBeCreated.getKey();
+
+            if (standbyTasksPerId.containsKey(taskId)) {
+                throw new IllegalStateException("Attempted to create a standby task that we already own: " + taskId);
+            }
+        }
+
+        // keep this check to simplify testing (ie, no need to mock `activeTaskCreator`)
+        if (!activeTasksToCreate.isEmpty()) {
+            // TODO: change type to `StreamTask`
+            for (final Task activeTask :

[GitHub] [kafka] dajac merged pull request #9938: MINOR: Refactor DescribeAuthorizedOperationsTest

2021-01-21 Thread GitBox


dajac merged pull request #9938:
URL: https://github.com/apache/kafka/pull/9938


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9945: KAFKA-12212; Bump Metadata API version to remove `ClusterAuthorizedOperations` fields

2021-01-21 Thread GitBox


dajac commented on a change in pull request #9945:
URL: https://github.com/apache/kafka/pull/9945#discussion_r561957390



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1246,13 +1246,15 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
 var clusterAuthorizedOperations = Int.MinValue

Review comment:
   `-2147483648` is actually `Int.MinValue`. It would cause an error if we 
wrote something different from the default value for versions < 8 or > 10. 
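
   For reference, on the Java side:
   ```java
   // Integer.MIN_VALUE is the wire default for ClusterAuthorizedOperations.
   System.out.println(Integer.MIN_VALUE); // prints -2147483648
   ```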

##
File path: clients/src/main/resources/common/message/MetadataRequest.json
##
@@ -31,7 +31,10 @@
 // Starting in version 8, authorized operations can be requested for 
cluster and topic resource.
 //
 // Version 9 is the first flexible version.
-// Version 10 add topicId
+//
+// Version 10 adds topicId.
+//
+// Version 11 deprecates IncludeClusterAuthorizedOperations field 
(KIP-700).

Review comment:
   Yeah, why not. I will add it.

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1246,13 +1246,15 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
 var clusterAuthorizedOperations = Int.MinValue

Review comment:
   Will do.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax edited a comment on pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2021-01-21 Thread GitBox


mjsax edited a comment on pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#issuecomment-764961250


   @wcarlson5 Seems we forgot to update the docs for this KIP. Can you do a 
follow-up PR? This follow-up PR should also cover the changes of 
https://issues.apache.org/jira/browse/KAFKA-10810 that are part of the same KIP.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on pull request #9948: Minor: fix record time

2021-01-21 Thread GitBox


wcarlson5 commented on pull request #9948:
URL: https://github.com/apache/kafka/pull/9948#issuecomment-764984984


   @mjsax  I extracted the fix



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on pull request #9720: KAFKA-10555: Improve client state machine

2021-01-21 Thread GitBox


wcarlson5 commented on pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#issuecomment-764986776


   I extracted the https://github.com/apache/kafka/pull/9720/files#r562203226 
fix to https://github.com/apache/kafka/pull/9948



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] twmb commented on a change in pull request #9903: KAFKA-12204; Implement DescribeCluster API in the broker

2021-01-21 Thread GitBox


twmb commented on a change in pull request #9903:
URL: https://github.com/apache/kafka/pull/9903#discussion_r562394942



##
File path: 
clients/src/main/resources/common/message/DescribeClusterResponse.json
##
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 60,
+  "type": "response",
+  "name": "DescribeClusterResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+  "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+{ "name": "ErrorCode", "type": "int16", "versions": "0+",
+  "about": "The top-level error code, or 0 if there was no error" },
+{ "name": "ErrorMessage", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+  "about": "The top-level error message, or null if there was no error." },
+{ "name": "ClusterId", "type": "string", "nullableVersions": "0+", 
"versions": "0+", "default": "null",

Review comment:
   Does it? Does MetadataResponse? Is there a reason Metadata does and this 
doesn't?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac merged pull request #9832: KAFKA-12152; Idempotent Producer does not reset the sequence number of partitions without in-flight batches

2021-01-21 Thread GitBox


dajac merged pull request #9832:
URL: https://github.com/apache/kafka/pull/9832


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary

2021-01-21 Thread GitBox


ableegoldman merged pull request #9941:
URL: https://github.com/apache/kafka/pull/9941


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class

2021-01-21 Thread GitBox


mjsax commented on pull request #9940:
URL: https://github.com/apache/kafka/pull/9940#issuecomment-764070908


   All three runs failed with different errors:
   
   JDK8:
   ```
   
kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
   java.util.concurrent.ExecutionException: 
org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 
37.320395596193315 (600 connections / 16.077 sec) ==> expected: <30.0> but was: 
<37.320395596193315>
   ```
   
   JDK11:
   ```
   kafka.admin.FeatureCommandTest.testDescribeFeaturesSuccess()
   org.opentest4j.AssertionFailedError: expected:  but was: 
   ```
   
   JDK15:
   ```
   kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback()
   org.opentest4j.AssertionFailedError: Partition [group1_largeTopic,5] 
metadata not propagated after 15000 ms
   
   // and
   
   kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback()
   org.opentest4j.AssertionFailedError: Partition [group1_largeTopic,5] 
metadata not propagated after 15000 ms
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9819: KAFKA-10694: Implement zero copy for FetchSnapshot

2021-01-21 Thread GitBox


hachikuji commented on a change in pull request #9819:
URL: https://github.com/apache/kafka/pull/9819#discussion_r561625246



##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##
@@ -53,13 +56,20 @@ public long sizeInBytes() throws IOException {
 }
 
     @Override
-    public void append(ByteBuffer buffer) throws IOException {
+    public void append(BaseRecords records) throws IOException {
         if (frozen) {
             throw new IllegalStateException(
-                String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
+                    String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
             );
         }
-
+        ByteBuffer buffer;
+        if (records instanceof MemoryRecords) {
+            buffer = ((MemoryRecords) records).buffer();
+        } else {

Review comment:
   I feel more inclined to raise an exception if we get a `BaseRecords` 
type that is not `MemoryRecords`. If we really get an unexpected file in here, 
then we need to reconsider the IO model instead of hiding a big copy. We could 
even make the expectation explicit in the parameter type even if it is not 100% 
symmetric with `RawSnapshotReader`.

##
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##
@@ -1093,9 +1093,11 @@ public static final void readFully(InputStream inputStream, ByteBuffer destinationBuffer) throws IOException {
         destinationBuffer.position(destinationBuffer.position() + totalBytesRead);
     }
 
-    public static void writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException {
+    public static int writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException {
+        int size = 0;

Review comment:
   Hmm.. Not sure we need to compute this. Wouldn't it be the same as 
`sourceBuffer.remaining()`?
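
   A minimal sketch of that suggestion (assuming the simple write loop in `Utils.writeFully`): capture the size up front instead of summing per write:
   ```java
   public static int writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException {
       int size = sourceBuffer.remaining(); // everything that will be written
       while (sourceBuffer.hasRemaining())
           channel.write(sourceBuffer);
       return size;
   }
   ```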

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1220,34 +1216,35 @@ private FetchSnapshotResponseData 
handleFetchSnapshotRequest(
 if (!snapshotOpt.isPresent()) {
 return FetchSnapshotResponse.singleton(
 log.topicPartition(),
-responsePartitionSnapshot -> {
-return addQuorumLeader(responsePartitionSnapshot)
-.setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code());
-}
+responsePartitionSnapshot -> 
addQuorumLeader(responsePartitionSnapshot)
+.setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code())
 );
 }
 
 try (RawSnapshotReader snapshot = snapshotOpt.get()) {
 if (partitionSnapshot.position() < 0 || 
partitionSnapshot.position() >= snapshot.sizeInBytes()) {
 return FetchSnapshotResponse.singleton(
 log.topicPartition(),
-responsePartitionSnapshot -> {
-return addQuorumLeader(responsePartitionSnapshot)
-.setErrorCode(Errors.POSITION_OUT_OF_RANGE.code());
-}
+responsePartitionSnapshot -> 
addQuorumLeader(responsePartitionSnapshot)
+.setErrorCode(Errors.POSITION_OUT_OF_RANGE.code())
 );
 }
 
 int maxSnapshotSize;
+int maxSnapshotPosition;
 try {
 maxSnapshotSize = Math.toIntExact(snapshot.sizeInBytes());
 } catch (ArithmeticException e) {
 maxSnapshotSize = Integer.MAX_VALUE;
 }
 
-ByteBuffer buffer = ByteBuffer.allocate(Math.min(data.maxBytes(), 
maxSnapshotSize));
-snapshot.read(buffer, partitionSnapshot.position());
-buffer.flip();
+try {
+maxSnapshotPosition = 
Math.toIntExact(partitionSnapshot.position());
+} catch (ArithmeticException e) {
+maxSnapshotPosition = Integer.MAX_VALUE;

Review comment:
   I agree we should probably throw this. Snapshot size limits are an 
interesting point which I hadn't thought about. Currently `FileRecords` does 
not support files which are larger than Int.MaxValue. That gives us a 2GB 
limit. My feeling is that is probably good enough initially, but perhaps that 
adds some fuel for the effort to generalize the zero-copy support.

##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
##
@@ -51,7 +51,7 @@ public long sizeInBytes() {
 }
 
 public BaseRecords read(long position, int size) throws IOException {
-return fileRecords.slice(Math.toIntExact(position), size);
+return fileRecords.slice((int) position, size);

Review comment:
   Using `Math.toIntExact` seemed better.
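
   The difference in a nutshell (a self-contained illustration):
   ```java
   long position = 1L << 32;                // 4294967296, too large for an int
   int truncated = (int) position;          // silently truncates to 0
   int checked = Math.toIntExact(position); // throws ArithmeticException: integer overflow
   ```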

##
File path: 

[GitHub] [kafka] vvcephei commented on pull request #9107: KAFKA-5488: Add type-safe split() operator

2021-01-21 Thread GitBox


vvcephei commented on pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#issuecomment-764879234


   Thanks, @inponomarev !
   
   Yes, we sometimes have to update the KIP as the PR gets finalized. I like to 
send a quick reply to the vote thread on the mailing list to summarize the 
design changes, in case anyone wants to object (I don't think I've ever had 
someone object).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary

2021-01-21 Thread GitBox


guozhangwang commented on pull request #9941:
URL: https://github.com/apache/kafka/pull/9941#issuecomment-764408620


   LGTM.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig

2021-01-21 Thread GitBox


hachikuji commented on a change in pull request #9916:
URL: https://github.com/apache/kafka/pull/9916#discussion_r561283768



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftConfigTest.java
##
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.raft;
-
-import org.apache.kafka.common.config.ConfigException;
-import org.junit.jupiter.api.Test;
-
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Properties;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-public class RaftConfigTest {
-
-@Test
-public void testSingleQuorumVoterConnections() {
-Properties properties = new Properties();
-properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, "1@127.0.0.1:9092");
-RaftConfig config = new RaftConfig(properties);
-assertEquals(Collections.singletonMap(1, new 
InetSocketAddress("127.0.0.1", 9092)),
-config.quorumVoterConnections());
-}
-
-@Test
-public void testMultiQuorumVoterConnections() {
-Properties properties = new Properties();
-properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, 
"1@kafka1:9092,2@kafka2:9092,3@kafka3:9092");
-RaftConfig config = new RaftConfig(properties);
-
-        HashMap<Integer, InetSocketAddress> expected = new HashMap<>();
-        expected.put(1, new InetSocketAddress("kafka1", 9092));
-        expected.put(2, new InetSocketAddress("kafka2", 9092));
-        expected.put(3, new InetSocketAddress("kafka3", 9092));
-
-        assertEquals(expected, config.quorumVoterConnections());

Review comment:
   This test case seems stronger than the one that was ported to 
`KafkaConfigTest`. It is validating the endpoints in addition to the number of 
voters. Is there any way we can recover this?

##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -236,4 +176,31 @@ private static Integer parseVoterId(String idString) {
 return voterMap;
 }
 
+public static class ControllerQuorumVotersValidator implements 
ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            if (value == null) {
+                throw new ConfigException(name, null);
+            }
+
+            @SuppressWarnings("unchecked")
+            List<String> voterStrings = (List<String>) value;
+
+            if (voterStrings.size() == 0) {
+                // TODO: Add a flag to skip validation for an empty voter string, conditionally.
+                //       For now, skip anyway. See https://github.com/apache/kafka/pull/9916#discussion_r560611932

Review comment:
   We typically do not leave TODOs in the code. We can file a jira if we 
think it's important to remember. I'd suggest we just leave this check out and 
skip the empty check below.
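
   A sketch of the validator with the TODO dropped, per this suggestion (names from the quoted diff; the empty list simply passes through):
   ```java
   @Override
   public void ensureValid(String name, Object value) {
       if (value == null) {
           throw new ConfigException(name, null);
       }

       @SuppressWarnings("unchecked")
       List<String> voterStrings = (List<String>) value;

       // An empty list is allowed; otherwise validate each "id@host:port" entry.
       for (String voterString : voterStrings) {
           if (!voterString.contains("@")) {
               throw new ConfigException(name, value, "Expected an entry of the form id@host:port");
           }
       }
   }
   ```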

##
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##
@@ -941,4 +950,38 @@ class KafkaConfigTest {
 })
   }
 
+  @Test
+  def testInvalidQuorumVotersConfig(): Unit = {
+assertInvalidQuorumVoters("1")
+assertInvalidQuorumVoters("1@")
+assertInvalidQuorumVoters("1:")
+assertInvalidQuorumVoters("blah@")
+assertInvalidQuorumVoters("1@kafka1")
+assertInvalidQuorumVoters("1@kafka1:9092,")
+assertInvalidQuorumVoters("1@kafka1:9092,")
+assertInvalidQuorumVoters("1@kafka1:9092,2")
+assertInvalidQuorumVoters("1@kafka1:9092,2@")
+assertInvalidQuorumVoters("1@kafka1:9092,2@blah")
+assertInvalidQuorumVoters("1@kafka1:9092,2@blah,")
+  }
+
+  private def assertInvalidQuorumVoters(value: String): Unit = {
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value)
+assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
+  }
+
+  @Test
+  def testValidQuorumVotersConfig(): Unit = {
+assertValidQuorumVoters("", 0)
+assertValidQuorumVoters("1@127.0.0.1:9092", 1)
+assertValidQuorumVoters("1@kafka1:9092,2@kafka2:9092,3@kafka3:9092", 3)
+  }
+
+  private def assertValidQuorumVoters(value: String, 

[GitHub] [kafka] chia7712 merged pull request #9926: KAFKA-12175 Migrate generator module to junit5

2021-01-21 Thread GitBox


chia7712 merged pull request #9926:
URL: https://github.com/apache/kafka/pull/9926


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9946: KAFKA-12230: do not interrupt execution on failure to `setPosixFilePermissions` on Windows machines

2021-01-21 Thread GitBox


mjsax commented on pull request #9946:
URL: https://github.com/apache/kafka/pull/9946#issuecomment-764913632


   @inponomarev Thanks for the PR.
   
   Seems there is already #9947 for this issue, which looks more sophisticated 
than just swallowing the exception. I think we should close this PR in favor 
of #9947?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…

2021-01-21 Thread GitBox


chia7712 commented on a change in pull request #9906:
URL: https://github.com/apache/kafka/pull/9906#discussion_r562029718



##
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
##
@@ -27,112 +27,81 @@
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.ArgumentsProvider;
 import org.junit.jupiter.params.provider.ArgumentsSource;
-import org.junit.jupiter.params.provider.EnumSource;
-import org.junit.jupiter.params.support.AnnotationConsumer;
 
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
-import java.util.function.BiPredicate;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static java.util.Arrays.asList;
-import static org.apache.kafka.common.record.RecordBatch.CURRENT_MAGIC_VALUE;
-import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0;
-import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V1;
-import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
 import static org.apache.kafka.common.utils.Utils.utf8;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class MemoryRecordsBuilderTest {
 
 private static class Args {
 final int bufferOffset;
-final byte magic;
 final CompressionType compressionType;
 
-public Args(int bufferOffset, byte magic, CompressionType 
compressionType) {
+public Args(int bufferOffset, CompressionType compressionType) {
 this.bufferOffset = bufferOffset;
-this.magic = magic;
 this.compressionType = compressionType;
 }
 
 @Override
 public String toString() {
-return "Args{" +
-"bufferOffset=" + bufferOffset +
-", magic=" + magic +
-", compressionType=" + compressionType +
-'}';
+return "bufferOffset=" + bufferOffset +
+", compressionType=" + compressionType;
 }
 }
 
-    private static Stream<Arguments> allArguments(BiPredicate<Byte, CompressionType> accept) {
+    private static Stream<Arguments> allArguments(Predicate<CompressionType> accept) {
         List<Arguments> values = new ArrayList<>();
         for (int bufferOffset : Arrays.asList(0, 15))
-            for (byte magic : asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2))
-                for (CompressionType compressionType : CompressionType.values())
-                    if (accept.test(magic, compressionType))
-                        values.add(Arguments.of(new Args(bufferOffset, magic, compressionType)));
+            for (CompressionType compressionType : CompressionType.values())
+                if (accept.test(compressionType))
+                    values.add(Arguments.of(new Args(bufferOffset, compressionType)));
         return values.stream();
     }
 
-    private static class MemoryRecordsBuilderArgumentsProvider implements ArgumentsProvider, AnnotationConsumer<RecordBuilderSource> {
-
-        private RecordBuilderSource recordSource;
-        private BiPredicate<Byte, CompressionType> validCompress = (magic, type) -> type != CompressionType.ZSTD || magic >= MAGIC_VALUE_V2;
-
+    private static class MemoryRecordsBuilderArgumentsProvider implements ArgumentsProvider {
         @Override
         public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
-            Predicate<Byte> predicate = magic -> recordSource.minMagic() <= magic && magic <= recordSource.maxMagic();
-            return allArguments((magic, type) -> (recordSource.haveInvalidCompress() || validCompress.test(magic, type)) && predicate.test(magic));
+            return allArguments(type -> true);
         }
+    }
 
+    private static class NotZstd implements ArgumentsProvider {

Review comment:
   Please add a comment for this suite.

##
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
##
@@ -423,30 +455,32 @@ public void buildUsingCreateTime(Args args) {
 }
 
 @ParameterizedTest
-@RecordBuilderSource
+@ArgumentsSource(NotZstd.class)
 public void testAppendedChecksumConsistency(Args args) {

Review comment:
   This test case should test zstd + magic_2. We can change the magic code 
according to the compression type.





[GitHub] [kafka] chia7712 commented on pull request #9926: KAFKA-12175 Migrate generator module to junit5

2021-01-21 Thread GitBox


chia7712 commented on pull request #9926:
URL: https://github.com/apache/kafka/pull/9926#issuecomment-764212202


   > What have you seen that implies that? 
   
   The following error implies that recent fixes are not included.
   
   ```
   java.lang.AssertionError: Expected all streams instances in 
[org.apache.kafka.streams.KafkaStreams@2d6a0fff] to be REBALANCING within 3 
ms, but the following were not: 
{org.apache.kafka.streams.KafkaStreams@2d6a0fff=RUNNING}
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitForApplicationState$12(IntegrationTestUtils.java:936)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:350)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:318)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState(IntegrationTestUtils.java:919)
at 
org.apache.kafka.streams.integration.AdjustStreamThreadCountTest.shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect(AdjustStreamThreadCountTest.java:229)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   ```
   
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9926/8/testReport/junit/org.apache.kafka.streams.integration/AdjustStreamThreadCountTest/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon closed pull request #9936: [WIP] reset to default class loader

2021-01-21 Thread GitBox


showuon closed pull request #9936:
URL: https://github.com/apache/kafka/pull/9936


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords

2021-01-21 Thread GitBox


guozhangwang commented on pull request #9836:
URL: https://github.com/apache/kafka/pull/9836#issuecomment-765031712


   Made another pass on the latest commit and it LGTM. Also triggered the test 
again.
   
   Once it passes we can merge as-is.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary

2021-01-21 Thread GitBox


ableegoldman commented on a change in pull request #9941:
URL: https://github.com/apache/kafka/pull/9941#discussion_r561394151



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -1053,7 +1054,7 @@ int maybeCommitActiveTasksPerUserRequested() {
 }
 
     private void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, OffsetAndMetadata>> offsetsPerTask) {
-        log.debug("Committing task offsets {}", offsetsPerTask);
+        log.debug("Committing task offsets {}", offsetsPerTask.entrySet().stream().collect(Collectors.toMap(t -> t.getKey().id(), Entry::getValue))); // avoid logging actual Task objects

Review comment:
   An unrelated but equally annoying thing I noticed in the logs: we should never log a full `Task` object, because it prints literally everything about the task, including for example the topology description, which is not that useful but sometimes VERY long.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -407,7 +413,7 @@ public static StreamThread create(final InternalTopologyBuilder builder,
 referenceContainer.nextScheduledRebalanceMs,
 shutdownErrorHook,
 streamsUncaughtExceptionHandler,
-cacheSize -> cache.resize(cacheSize)

Review comment:
   Interesting. It *should* be exactly the same, but of course who knows 
with Java. Did it cause a test to fail or was it something more subtle?
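
For context, the one corner where the two forms are known to differ in Java: a bound method reference evaluates its receiver once, eagerly, while a lambda re-reads it on every invocation. The sketch below is generic (the `sb` field is made up for illustration, and this is not a claim about what broke in this PR); since `cache` is an effectively final local in `StreamThread.create`, the two forms should behave identically there unless `cache` were null at creation time.

```java
import java.util.function.Consumer;

public class MethodRefVsLambda {
    // Hypothetical mutable field, initially null, purely for illustration.
    static StringBuilder sb = null;

    public static void main(String[] args) {
        // Lambda: 'sb' is re-read on every invocation, so creating the lambda
        // while sb is null is fine; it would only NPE when actually called.
        Consumer<String> lambda = s -> sb.append(s);

        sb = new StringBuilder();
        lambda.accept("ok"); // works: sb is non-null by the time it is called

        sb = null;
        // Bound method reference: the receiver is evaluated eagerly, so this
        // very line throws NullPointerException (JLS 15.13.3).
        Consumer<String> methodRef = sb::append;
        methodRef.accept("never reached");
    }
}
```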





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig

2021-01-21 Thread GitBox


hachikuji merged pull request #9916:
URL: https://github.com/apache/kafka/pull/9916


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class

2021-01-21 Thread GitBox


ableegoldman commented on a change in pull request #9940:
URL: https://github.com/apache/kafka/pull/9940#discussion_r561368977



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##
@@ -2779,6 +2779,62 @@ public void suspend() {
 assertThat(task01.state(), is(Task.State.SUSPENDED));
 }
 
+@Test
+public void shouldConvertActiveTaskToStandbyTask() {
+final StreamTask activeTask = mock(StreamTask.class);
+expect(activeTask.id()).andReturn(taskId00).anyTimes();
+expect(activeTask.inputPartitions()).andReturn(taskId00Partitions).anyTimes();

Review comment:
   nit: use `andStubReturn` instead of `andReturn().anyTimes()`. No need to change this now (I don't want to block the fix); just FYI for future PRs.

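For future readers, a minimal self-contained sketch of the two equivalent stubbings (the `Task` interface here is made up; EasyMock's `andStubReturn` also allows any number of calls, and stub expectations are additionally ignored by `verify()`):

```java
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;

public class StubReturnExample {
    // Made-up interface standing in for StreamTask in the test above.
    interface Task {
        int id();
        String name();
    }

    public static void main(String[] args) {
        Task task = createMock(Task.class);

        // Verbose form: a regular expectation relaxed to any number of calls.
        expect(task.id()).andReturn(0).anyTimes();

        // Shorthand: a stub return, which reads better for plain fixtures.
        expect(task.name()).andStubReturn("task-0");

        replay(task);
        System.out.println(task.id() + " " + task.name());
        verify(task);
    }
}
```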




This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9660: Kafka 10629 - TopologyTestDriver should not require a Properties argument

2021-01-21 Thread GitBox


mjsax commented on pull request #9660:
URL: https://github.com/apache/kafka/pull/9660#issuecomment-764958643


   @rohitrmd Seems we forgot to update the docs with this PR, in particular `streams/upgrade_guide.html` and maybe also the section about testing. Would you like to do a follow-up PR to close this gap?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #9891: MINOR: Remind user index file is empty when dumping LogSegment index file

2021-01-21 Thread GitBox


chia7712 merged pull request #9891:
URL: https://github.com/apache/kafka/pull/9891


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9777: KAFKA-7940: wait until we've got the expected partition size

2021-01-21 Thread GitBox


showuon commented on pull request #9777:
URL: https://github.com/apache/kafka/pull/9777#issuecomment-764474867


   @rajinisivaram  @junrao , please help review this PR. Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-01-21 Thread GitBox


rajinisivaram commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r562233235



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
##
@@ -65,6 +65,19 @@ public TopicDescription(String name, boolean internal, List
 this(name, internal, partitions, Collections.emptySet());
 }
 
+/**
+ * Create an instance with the specified parameters.
+ *
+ * @param name The topic name
+ * @param internal Whether the topic is internal to Kafka
+ * @param partitions A list of partitions where the index represents the partition id and the element contains
+ *   leadership and replica information for that partition.
+ * @param topicId the topic id
+ */
+public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions, Uuid topicId) {

Review comment:
   Do we really need this constructor in the public class? We could just 
use the one below?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1822,6 +1829,88 @@ void handleFailure(Throwable throwable) {
 return new DescribeTopicsResult(new HashMap<>(topicFutures));
 }
 
+@Override
+public DescribeTopicsResultWithIds describeTopicsWithIds(Collection<Uuid> topicIds, DescribeTopicsOptions options) {
+
+    final Map<Uuid, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicIds.size());
+    final List<Uuid> topicIdsList = new ArrayList<>();
+    for (Uuid topicId : topicIds) {
+        if (topicIdIsUnrepresentable(topicId)) {
+            KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+            future.completeExceptionally(new InvalidTopicException("The given topic id '" +
+                topicId + "' cannot be represented in a request."));
+            topicFutures.put(topicId, future);
+        } else if (!topicFutures.containsKey(topicId)) {
+            topicFutures.put(topicId, new KafkaFutureImpl<>());
+            topicIdsList.add(topicId);
+        }
+    }
+    final long now = time.milliseconds();
+    Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now, options.timeoutMs()),
+        new LeastLoadedNodeProvider()) {
+
+        @Override
+        MetadataRequest.Builder createRequest(int timeoutMs) {
+            return new MetadataRequest.Builder(new MetadataRequestData()
+                .setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList))
+                .setAllowAutoTopicCreation(false)
+                .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
+        }
+
+        @Override
+        void handleResponse(AbstractResponse abstractResponse) {
+            MetadataResponse response = (MetadataResponse) abstractResponse;
+            // Handle server responses for particular topics.
+            Cluster cluster = response.cluster();
+            Map<Uuid, Errors> errors = response.errorsByTopicId();
+            for (Map.Entry<Uuid, KafkaFutureImpl<TopicDescription>> entry : topicFutures.entrySet()) {
+                Uuid topicId = entry.getKey();
+                KafkaFutureImpl<TopicDescription> future = entry.getValue();
+
+                String topicName = cluster.topicName(topicId);
+                if (topicName == null) {
+                    future.completeExceptionally(new UnknownTopicIdException("TopicId " + topicId + " not found."));
+                    continue;
+                }
+                Errors topicError = errors.get(topicId);
+                if (topicError != null) {
+                    future.completeExceptionally(topicError.exception());
+                    continue;
+                }
+
+                boolean isInternal = cluster.internalTopics().contains(topicName);
+                List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topicName);
+                List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size());
+                for (PartitionInfo partitionInfo : partitionInfos) {
+                    TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
+                        partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()),
+                        Arrays.asList(partitionInfo.inSyncReplicas()));
+                    partitions.add(topicPartitionInfo);
+                }
+
+                partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
+                TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions,
+                    validAclOperations(response.topicAuthorizedOperations(topicName).get()), topicId);
+                future.complete(topicDescription);
+            }
+        }
+
+        private Node

[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log

2021-01-21 Thread GitBox


feyman2016 commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r561587550



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -180,7 +179,7 @@ private void startNewBatch() {
 nextOffset,
 time.milliseconds(),
 false,
-RecordBatch.NO_PARTITION_LEADER_EPOCH,
+epoch,

Review comment:
   Let me check

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1057,7 +1057,8 @@ class Log(@volatile private var _dir: File,
  leaderEpoch: Int,
  origin: AppendOrigin = AppendOrigin.Client,
  interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
-append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch, ignoreRecordSize = false)
+val validateAndAssignOffsets = origin != AppendOrigin.RaftLeader
+append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, ignoreRecordSize = false)

Review comment:
   Makes sense to me; it also felt a little odd to me, but I put it here because `assignOffsets` is true for `appendAsLeader` and false for `appendAsFollower`, which means `assignOffsets` is normally determined by the caller, and `RaftLeader` is just a special case of `appendAsLeader`. If we move the logic into `analyzeAndValidateRecords`, it would need to decide whether to `assignOffsets` without caller info -- is that doable?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #9934: MINOR: Drop enable.metadata.quorum config

2021-01-21 Thread GitBox


hachikuji commented on pull request #9934:
URL: https://github.com/apache/kafka/pull/9934#issuecomment-763923618


   I decided to leave the controller-only flag as it is. I think there are 
further improvements here to make the scope of the API clearer, but the 
implications for compatibility are subtle enough that we should consider it 
separately.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9926: KAFKA-12175 Migrate generator module to junit5

2021-01-21 Thread GitBox


ijuma commented on pull request #9926:
URL: https://github.com/apache/kafka/pull/9926#issuecomment-763893680


   @chia7712 What have you seen that implies that? This was certainly the case 
before, not sure if something changed at some point (I know because I've seen 
compiler errors that would not be possible without the merge).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

2021-01-21 Thread GitBox


ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-764053539


   Weird -- these changes seem to be causing the `SaslXConsumerTest` family of tests to hang. I'm not very (or at all) familiar with these tests, so I haven't found anything yet, but I'm actively looking into it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #9878: KAFKA-6987: Reimplement KafkaFuture using CompletableFuture

2021-01-21 Thread GitBox


dajac commented on pull request #9878:
URL: https://github.com/apache/kafka/pull/9878#issuecomment-764677084


   @ijuma I was actually thinking about the same thing. The new or the old type could likely be a thin facade over the other. Another approach would be to duplicate all the methods in the *Result classes, but that does not sound good to me.
   
   I think it would be nice if we could eventually remove `KafkaFuture` and use `CompletionStage` instead. That would be nice from the perspective of a user of the admin API, at least, and it would also be in line with the producer API if KIP-706 lands in the future.
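
To make the facade idea concrete, here is a minimal sketch (the class and method names are hypothetical, not the KAFKA-6987 implementation, and `minimalCompletionStage` assumes Java 9+): the public future type delegates to a `CompletableFuture` internally, so blocking callers keep working while composing callers get a read-only `CompletionStage` view.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;

public class KafkaFutureFacade<T> {
    private final CompletableFuture<T> delegate = new CompletableFuture<>();

    /** Read-only composable view; callers cannot complete it. */
    public CompletionStage<T> toCompletionStage() {
        return delegate.minimalCompletionStage();
    }

    /** Blocking accessor, mirroring KafkaFuture#get. */
    public T get() throws InterruptedException, ExecutionException {
        return delegate.get();
    }

    /** Completion stays package-private, reserved for client internals. */
    boolean complete(T value) {
        return delegate.complete(value);
    }

    boolean completeExceptionally(Throwable t) {
        return delegate.completeExceptionally(t);
    }
}
```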



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9947: KAFKA-12190: Fix setting of file permissions on non-POSIX filesystems

2021-01-21 Thread GitBox


ableegoldman commented on a change in pull request #9947:
URL: https://github.com/apache/kafka/pull/9947#discussion_r562317985



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -109,16 +109,27 @@ public StateDirectory(final StreamsConfig config, final Time time, final boolean
 log.warn("Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file" +
     " due to the fact that this directory can be cleared by the OS");
 }
-
 // change the dir permission to "rwxr-x---" to avoid world readable
-final Path basePath = Paths.get(baseDir.getPath());
-final Path statePath = Paths.get(stateDir.getPath());
+configurePermissions(Paths.get(baseDir.getPath()));
+configurePermissions(Paths.get(stateDir.getPath()));
+}
+}
+
+private void configurePermissions(final Path path) {
+    if (path.getFileSystem().supportedFileAttributeViews().contains("posix")) {
        final Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-x---");
        try {
-           Files.setPosixFilePermissions(basePath, perms);
-           Files.setPosixFilePermissions(statePath, perms);
+           Files.setPosixFilePermissions(path, perms);
        } catch (final IOException e) {
-           log.error("Error changing permissions for the state or base directory {} ", stateDir.getPath(), e);
+           log.error("Error changing permissions for the directory {} ", path, e);
+       }
+   } else {
+       final File file = path.toFile();
+       boolean set = file.setReadable(true, false);
+       set &= file.setWritable(true, true);

Review comment:
   If you only have `file.setWritable(true, true)` then the directory will still be writable by users other than the owner, I assume? I don't actually know the details of the `File#setXXX` methods -- but we don't want it to be writable by just anyone. Should we instead do something like
   
   ```suggestion
   set &= file.setWritable(false) && file.setWritable(true, true);
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -109,16 +109,27 @@ public StateDirectory(final StreamsConfig config, final Time time, final boolean
 log.warn("Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file" +
     " due to the fact that this directory can be cleared by the OS");
 }
-
 // change the dir permission to "rwxr-x---" to avoid world readable
-final Path basePath = Paths.get(baseDir.getPath());
-final Path statePath = Paths.get(stateDir.getPath());
+configurePermissions(Paths.get(baseDir.getPath()));
+configurePermissions(Paths.get(stateDir.getPath()));
+}
+}
+
+private void configurePermissions(final Path path) {
+    if (path.getFileSystem().supportedFileAttributeViews().contains("posix")) {
        final Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-x---");
        try {
-           Files.setPosixFilePermissions(basePath, perms);
-           Files.setPosixFilePermissions(statePath, perms);
+           Files.setPosixFilePermissions(path, perms);
        } catch (final IOException e) {
-           log.error("Error changing permissions for the state or base directory {} ", stateDir.getPath(), e);
+           log.error("Error changing permissions for the directory {} ", path, e);
+       }
+   } else {
+       final File file = path.toFile();
+       boolean set = file.setReadable(true, false);

Review comment:
   I think we actually want it to be readable _only_ by the user, and to explicitly restrict permissions for all other users. The patch that originally broke things for Windows users was trying to tighten up the security in exactly this way.
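
A sketch of what an owner-only fallback could look like on non-POSIX filesystems, combining both suggestions above (the helper name is made up; each `setX(false, false)` call clears the permission for everyone before `setX(true, true)` grants it back to the owner alone):

```java
import java.io.File;

final class OwnerOnlyPermissions {
    // Hypothetical fallback for filesystems without a "posix" attribute view.
    static boolean restrictToOwner(final File dir) {
        boolean set = dir.setReadable(false, false) && dir.setReadable(true, true);
        set &= dir.setWritable(false, false) && dir.setWritable(true, true);
        set &= dir.setExecutable(false, false) && dir.setExecutable(true, true);
        return set; // false if the filesystem ignored any of the requests
    }
}
```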





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

2021-01-21 Thread GitBox


wcarlson5 commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r562043444



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1165,11 +1151,21 @@ private Thread shutdownHelper(final boolean error) {
 metrics.close();
 if (!error) {
 setState(State.NOT_RUNNING);
+} else {
+setState(State.ERROR);
 }
 }, "kafka-streams-close-thread");
 }
 
 private boolean close(final long timeoutMs) {
+if (state == State.ERROR) {
+    log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+    return false;

Review comment:
   Alright I adjusted the close response to align with this.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##
@@ -978,7 +971,7 @@ private void waitForRunning(final List !observed.isEmpty() && observed.get(observed.size() - 1).value.equals(State.RUNNING),
 MAX_WAIT_TIME_MS,
-() -> "Client did not startup on time. Observers transitions: " + observed
+() -> "Client did not have the expected state transition on time. Observers transitions: " + observed

Review comment:
   We do wait for running. I was thinking of bringing it in line with the other methods below, but that doesn't make it any more useful, so I will revert it.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##
@@ -993,6 +986,17 @@ private void waitForStateTransition(final List> observed,
+final List> expected)
+throws Exception {
+
+waitForCondition(
+    () -> observed.containsAll(expected),
+    MAX_WAIT_TIME_MS,
+    () -> "Client did not have the expected state transition on time. Observers transitions: " + observed

Review comment:
   Sure, that is fine.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9791: KAFKA-10873: ignore warning messages if connector/task start failed

2021-01-21 Thread GitBox


showuon commented on pull request #9791:
URL: https://github.com/apache/kafka/pull/9791#issuecomment-764474682


   @rhauch @kkonstantine , please help review this simple PR. Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yeralin commented on pull request #6592: KAFKA-8326: Introduce List Serde

2021-01-21 Thread GitBox


yeralin commented on pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#issuecomment-763952940


   @mjsax Hey, I just rebased my branch on trunk and updated my tests to use JUnit 5.
   Let me know when you have time to review it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-01-21 Thread GitBox


dengziming commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r562333723



##
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##
@@ -313,42 +313,57 @@ object TopicCommand extends Logging {
 }
 
 override def describeTopic(opts: TopicCommandOptions): Unit = {
-  val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-  ensureTopicExists(topics, opts.topic, !opts.ifExists)
+  val topicId = opts.topicId.map(Uuid.fromString).filter(_ != Uuid.ZERO_UUID)
+  // if topicId is provided and not zero, will use topicId regardless of topic name

Review comment:
   That's what I thought at the beginning; now I think it's better to use the topicId and print a warning message if both are provided.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1822,6 +1829,88 @@ void handleFailure(Throwable throwable) {
 return new DescribeTopicsResult(new HashMap<>(topicFutures));
 }
 
+@Override
+public DescribeTopicsResultWithIds describeTopicsWithIds(Collection<Uuid> topicIds, DescribeTopicsOptions options) {
+
+    final Map<Uuid, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicIds.size());
+    final List<Uuid> topicIdsList = new ArrayList<>();
+    for (Uuid topicId : topicIds) {
+        if (topicIdIsUnrepresentable(topicId)) {
+            KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+            future.completeExceptionally(new InvalidTopicException("The given topic id '" +
+                topicId + "' cannot be represented in a request."));
+            topicFutures.put(topicId, future);
+        } else if (!topicFutures.containsKey(topicId)) {
+            topicFutures.put(topicId, new KafkaFutureImpl<>());
+            topicIdsList.add(topicId);
+        }
+    }
+    final long now = time.milliseconds();
+    Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now, options.timeoutMs()),
+        new LeastLoadedNodeProvider()) {
+
+        @Override
+        MetadataRequest.Builder createRequest(int timeoutMs) {
+            return new MetadataRequest.Builder(new MetadataRequestData()
+                .setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList))
+                .setAllowAutoTopicCreation(false)
+                .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
+        }
+
+        @Override
+        void handleResponse(AbstractResponse abstractResponse) {
+            MetadataResponse response = (MetadataResponse) abstractResponse;
+            // Handle server responses for particular topics.
+            Cluster cluster = response.cluster();
+            Map<Uuid, Errors> errors = response.errorsByTopicId();
+            for (Map.Entry<Uuid, KafkaFutureImpl<TopicDescription>> entry : topicFutures.entrySet()) {
+                Uuid topicId = entry.getKey();
+                KafkaFutureImpl<TopicDescription> future = entry.getValue();
+
+                String topicName = cluster.topicName(topicId);
+                if (topicName == null) {
+                    future.completeExceptionally(new UnknownTopicIdException("TopicId " + topicId + " not found."));
+                    continue;
+                }
+                Errors topicError = errors.get(topicId);
+                if (topicError != null) {
+                    future.completeExceptionally(topicError.exception());
+                    continue;
+                }
+
+                boolean isInternal = cluster.internalTopics().contains(topicName);
+                List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topicName);
+                List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size());
+                for (PartitionInfo partitionInfo : partitionInfos) {
+                    TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
+                        partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()),
+                        Arrays.asList(partitionInfo.inSyncReplicas()));
+                    partitions.add(topicPartitionInfo);
+                }
+
+                partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
+                TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions,
+                    validAclOperations(response.topicAuthorizedOperations(topicName).get()), topicId);
+                future.complete(topicDescription);

Review comment:
   Thank you, I added a `getTopicDescriptionFromCluster` method to do this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] chia7712 commented on pull request #9891: MINOR: Remind user index file is empty when dumping LogSegment index file

2021-01-21 Thread GitBox


chia7712 commented on pull request #9891:
URL: https://github.com/apache/kafka/pull/9891#issuecomment-764233284


   @17hao Thanks for your patch!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9840: KAFKA-10867: Improved task idling

2021-01-21 Thread GitBox


guozhangwang commented on pull request #9840:
URL: https://github.com/apache/kafka/pull/9840#issuecomment-765055878


   LGTM! Once we have perf numbers quantifying its impact I think we can merge.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hongshaoyang commented on pull request #9943: MINOR: Fix typo in Utils.java

2021-01-21 Thread GitBox


hongshaoyang commented on pull request #9943:
URL: https://github.com/apache/kafka/pull/9943#issuecomment-764209143


   Ping @guozhangwang 
   
   Original typo was added in https://github.com/apache/kafka/commit/3a9f4b833bb7e86dc759361c33f4321ab043db05



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9878: KAFKA-6987: Reimplement KafkaFuture using CompletableFuture

2021-01-21 Thread GitBox


tombentley commented on pull request #9878:
URL: https://github.com/apache/kafka/pull/9878#issuecomment-764649903







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9835: MINOR: Add 'task container' class to KafkaStreams TaskManager

2021-01-21 Thread GitBox


ableegoldman commented on a change in pull request #9835:
URL: https://github.com/apache/kafka/pull/9835#discussion_r561340839



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+class Tasks {
+    private final Logger log;
+    private final InternalTopologyBuilder builder;
+    private final StreamsMetricsImpl streamsMetrics;
+
+    private final Map<TaskId, Task> allTasksPerId = new TreeMap<>();
+    private final Map<TaskId, Task> readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId);
+    private final Collection<Task> readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values());
+
+    // TODO: change type to `StreamTask`
+    private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
+    // TODO: change type to `StreamTask`
+    private final Map<TopicPartition, Task> activeTasksPerPartition = new HashMap<>();
+    // TODO: change type to `StreamTask`
+    private final Map<TaskId, Task> readOnlyActiveTasksPerId = Collections.unmodifiableMap(activeTasksPerId);
+    private final Set<TaskId> readOnlyActiveTaskIds = Collections.unmodifiableSet(activeTasksPerId.keySet());
+    // TODO: change type to `StreamTask`
+    private final Collection<Task> readOnlyActiveTasks = Collections.unmodifiableCollection(activeTasksPerId.values());
+
+    // TODO: change type to `StandbyTask`
+    private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();
+    // TODO: change type to `StandbyTask`
+    private final Map<TaskId, Task> readOnlyStandbyTasksPerId = Collections.unmodifiableMap(standbyTasksPerId);
+    private final Set<TaskId> readOnlyStandbyTaskIds = Collections.unmodifiableSet(standbyTasksPerId.keySet());
+
+    private final ActiveTaskCreator activeTaskCreator;
+    private final StandbyTaskCreator standbyTaskCreator;
+
+    private Consumer<byte[], byte[]> mainConsumer;
+
+    Tasks(final String logPrefix,
+          final InternalTopologyBuilder builder,
+          final StreamsMetricsImpl streamsMetrics,
+          final ActiveTaskCreator activeTaskCreator,
+          final StandbyTaskCreator standbyTaskCreator) {
+
+        final LogContext logContext = new LogContext(logPrefix);
+        log = logContext.logger(getClass());
+
+        this.builder = builder;
+        this.streamsMetrics = streamsMetrics;
+        this.activeTaskCreator = activeTaskCreator;
+        this.standbyTaskCreator = standbyTaskCreator;
+    }
+
+    void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
+        this.mainConsumer = mainConsumer;
+    }
+
+    void createTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                     final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate) {
+        for (final Map.Entry<TaskId, Set<TopicPartition>> taskToBeCreated : activeTasksToCreate.entrySet()) {
+            final TaskId taskId = taskToBeCreated.getKey();
+
+            if (activeTasksPerId.containsKey(taskId)) {
+                throw new IllegalStateException("Attempted to create an active task that we already own: " + taskId);
+            }
+        }
+
+        for (final Map.Entry<TaskId, Set<TopicPartition>> taskToBeCreated : standbyTasksToCreate.entrySet()) {
+            final TaskId taskId = taskToBeCreated.getKey();
+
+            if (standbyTasksPerId.containsKey(taskId)) {
+                throw new IllegalStateException("Attempted to create a standby task that we already own: " + taskId);
+            }
+        }
+
+        // keep this check to simplify testing (ie, no need to mock `activeTaskCreator`)
+        if (!activeTasksToCreate.isEmpty()) {
+            // TODO: change type to `StreamTask`
+            for (final Task activeTask :

[GitHub] [kafka] hachikuji commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-01-21 Thread GitBox


hachikuji commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r561451602



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
     !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
   requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
 else {
-  // get metadata (and create the topic if necessary)
-  val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+  val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
     case CoordinatorType.GROUP =>
-      val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-      val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-      (partition, metadata)
+      (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
     case CoordinatorType.TRANSACTION =>
-      val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-      val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-      (partition, metadata)
+      (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+  }
 
-    case _ =>
-      throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+  val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+  def createFindCoordinatorResponse(error: Errors,
+                                    node: Node,
+                                    requestThrottleMs: Int,
+                                    errorMessage: Option[String] = None): FindCoordinatorResponse = {
+    new FindCoordinatorResponse(
+      new FindCoordinatorResponseData()
+        .setErrorCode(error.code)
+        .setErrorMessage(errorMessage.getOrElse(error.message))
+        .setNodeId(node.id)
+        .setHost(node.host)
+        .setPort(node.port)
+        .setThrottleTimeMs(requestThrottleMs))
   }
 
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-    def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-      new FindCoordinatorResponse(
-          new FindCoordinatorResponseData()
-            .setErrorCode(error.code)
-            .setErrorMessage(error.message)
-            .setNodeId(node.id)
-            .setHost(node.host)
-            .setPort(node.port)
-            .setThrottleTimeMs(requestThrottleMs))
+  val topicCreationNeeded = topicMetadata.headOption.isEmpty
+  if (topicCreationNeeded) {
+    if (hasEnoughAliveBrokers(internalTopicName)) {

Review comment:
   In the case of forwarding, maybe we can let the controller decide if 
there are enough alive brokers.

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
     !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
   requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
 else {
-  // get metadata (and create the topic if necessary)
-  val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+  val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
     case CoordinatorType.GROUP =>
-      val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-      val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-      (partition, metadata)
+      (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
     case CoordinatorType.TRANSACTION =>
-      val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-      val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-      (partition, metadata)
+      (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+  }
 
-    case _ =>
-      throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+  val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+  def createFindCoordinatorResponse(error: Errors,

[GitHub] [kafka] vvcephei commented on a change in pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords

2021-01-21 Thread GitBox


vvcephei commented on a change in pull request #9836:
URL: https://github.com/apache/kafka/pull/9836#discussion_r561967937



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
##
@@ -218,7 +218,21 @@ public synchronized void unsubscribe() {
 }
 
 toClear.forEach(p -> this.records.remove(p));
-return new ConsumerRecords<>(results);
+
+final Map<TopicPartition, ConsumerRecords.Metadata> metadata = new HashMap<>();
+for (final TopicPartition partition : subscriptions.assignedPartitions()) {
+    if (subscriptions.hasValidPosition(partition) && beginningOffsets.containsKey(partition) && endOffsets.containsKey(partition)) {

Review comment:
   Ah, this is from before I removed it.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -637,20 +636,32 @@ private ListOffsetResult fetchOffsetsByTimes(Map timestamp
 } else {
     List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);
 
-    if (!records.isEmpty()) {
-        TopicPartition partition = nextInLineFetch.partition;
-        List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
-        if (currentRecords == null) {
-            fetched.put(partition, records);
-        } else {
-            // this case shouldn't usually happen because we only send one fetch at a time per partition,
-            // but it might conceivably happen in some rare cases (such as partition leader changes).
-            // we have to copy to a new list because the old one may be immutable
-            List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
-            newRecords.addAll(currentRecords);
-            newRecords.addAll(records);
-            fetched.put(partition, newRecords);
+    TopicPartition partition = nextInLineFetch.partition;
+
+    if (subscriptions.isAssigned(partition)) {

Review comment:
   I copied this check from fetchRecords, which says "this can happen when 
a rebalance happened before fetched records are returned to the consumer's poll 
call". I.e., it seems like it can actually happen, but a comment is called for, 
at least. I'll add it.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -637,20 +636,32 @@ private ListOffsetResult fetchOffsetsByTimes(Map timestamp
 } else {
     List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);
 
-    if (!records.isEmpty()) {
-        TopicPartition partition = nextInLineFetch.partition;
-        List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
-        if (currentRecords == null) {
-            fetched.put(partition, records);
-        } else {
-            // this case shouldn't usually happen because we only send one fetch at a time per partition,
-            // but it might conceivably happen in some rare cases (such as partition leader changes).
-            // we have to copy to a new list because the old one may be immutable
-            List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
-            newRecords.addAll(currentRecords);
-            newRecords.addAll(records);
-            fetched.put(partition, newRecords);
+    TopicPartition partition = nextInLineFetch.partition;
+
+    if (subscriptions.isAssigned(partition)) {
+        // initializeCompletedFetch, above, has already persisted the metadata from the fetch in the
+        // SubscriptionState, so we can just read it out, which in particular lets us re-use the logic
+        // for determining the end offset
+        final long receivedTimestamp = nextInLineFetch.receivedTimestamp;
+        final Long beginningOffset = subscriptions.logStartOffset(partition);
+        final Long endOffset = subscriptions.logEndOffset(partition, isolationLevel);
+        final FetchPosition fetchPosition = subscriptions.position(partition);
+
+        final FetchedRecords.FetchMetadata fetchMetadata = fetched.metadata().get(partition);
+        if (fetchMetadata == null
+            || !fetchMetadata.position().offsetEpoch.isPresent()
+            || fetchPosition.offsetEpoch.isPresent()
+            && fetchMetadata.position().offsetEpoch.get() <=

[GitHub] [kafka] aloknnikhil commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig

2021-01-21 Thread GitBox


aloknnikhil commented on a change in pull request #9916:
URL: https://github.com/apache/kafka/pull/9916#discussion_r561315012



##
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##
@@ -941,4 +950,38 @@ class KafkaConfigTest {
 })
   }
 
+  @Test
+  def testInvalidQuorumVotersConfig(): Unit = {
+assertInvalidQuorumVoters("1")
+assertInvalidQuorumVoters("1@")
+assertInvalidQuorumVoters("1:")
+assertInvalidQuorumVoters("blah@")
+assertInvalidQuorumVoters("1@kafka1")
+assertInvalidQuorumVoters("1@kafka1:9092,")
+assertInvalidQuorumVoters("1@kafka1:9092,")
+assertInvalidQuorumVoters("1@kafka1:9092,2")
+assertInvalidQuorumVoters("1@kafka1:9092,2@")
+assertInvalidQuorumVoters("1@kafka1:9092,2@blah")
+assertInvalidQuorumVoters("1@kafka1:9092,2@blah,")
+  }
+
+  private def assertInvalidQuorumVoters(value: String): Unit = {
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value)
+assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
+  }
+
+  @Test
+  def testValidQuorumVotersConfig(): Unit = {
+assertValidQuorumVoters("", 0)
+assertValidQuorumVoters("1@127.0.0.1:9092", 1)
+assertValidQuorumVoters("1@kafka1:9092,2@kafka2:9092,3@kafka3:9092", 3)
+  }
+
+  private def assertValidQuorumVoters(value: String, expectedVoterCount: Int): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value)
+    assertDoesNotThrow(() => KafkaConfig.fromProps(props))

Review comment:
   Makes sense. Removed.

##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -236,4 +176,31 @@ private static Integer parseVoterId(String idString) {
     return voterMap;
 }
 
+public static class ControllerQuorumVotersValidator implements ConfigDef.Validator {
+    @Override
+    public void ensureValid(String name, Object value) {
+        if (value == null) {
+            throw new ConfigException(name, null);
+        }
+
+        @SuppressWarnings("unchecked")
+        List<String> voterStrings = (List<String>) value;
+
+        if (voterStrings.size() == 0) {
+            // TODO: Add a flag to skip validation for an empty voter string, conditionally.
+            //       For now, skip anyway. See https://github.com/apache/kafka/pull/9916#discussion_r560611932

Review comment:
   Fair enough. Removed.

##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1258,6 +1267,15 @@ object KafkaConfig {
   .define(PasswordEncoderCipherAlgorithmProp, STRING, 
Defaults.PasswordEncoderCipherAlgorithm, LOW, PasswordEncoderCipherAlgorithmDoc)
   .define(PasswordEncoderKeyLengthProp, INT, 
Defaults.PasswordEncoderKeyLength, atLeast(8), LOW, PasswordEncoderKeyLengthDoc)
   .define(PasswordEncoderIterationsProp, INT, 
Defaults.PasswordEncoderIterations, atLeast(1024), LOW, 
PasswordEncoderIterationsDoc)
+
+  /** * Raft Quorum Configuration */
+  .defineInternal(RaftConfig.QUORUM_VOTERS_CONFIG, LIST, 
Defaults.QuorumVoters, new RaftConfig.ControllerQuorumVotersValidator(), HIGH)

Review comment:
   Ack. Fixed.

##
File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java
##
@@ -35,24 +35,10 @@ public static RaftConfig buildRaftConfig(
     int appendLingerMs,
     List<Node> voterNodes
 ) {
-    Properties properties = new Properties();
-    properties.put(RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
-    properties.put(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
-    properties.put(RaftConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, electionTimeoutMs);
-    properties.put(RaftConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, electionBackoffMs);
-    properties.put(RaftConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, fetchTimeoutMs);
-    properties.put(RaftConfig.QUORUM_LINGER_MS_CONFIG, appendLingerMs);
-
-    StringBuilder votersString = new StringBuilder();
-    String prefix = "";
-    for (Node voter : voterNodes) {
-        votersString.append(prefix);
-        votersString.append(voter.id()).append('@').append(voter.host()).append(':').append(voter.port());
-        prefix = ",";
-    }
-    properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, votersString.toString());
-
-    return new RaftConfig(properties);
+    Map<Integer, InetSocketAddress> voterConnections = voterNodes.stream()
+        .collect(Collectors.toMap(Node::id, node -> new InetSocketAddress(node.host(), node.port())));
+    return new RaftConfig(voterConnections, requestTimeoutMs, retryBackoffMs, electionTimeoutMs, electionBackoffMs,

Review comment:
   You're right. This is an artifact from the previous constructor usage. 
Fixed.

##

[GitHub] [kafka] rajinisivaram merged pull request #9814: KAFKA-10869: Gate topic IDs behind IBP 2.8

2021-01-21 Thread GitBox


rajinisivaram merged pull request #9814:
URL: https://github.com/apache/kafka/pull/9814


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-21 Thread GitBox


lct45 commented on pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#issuecomment-764806843


   call for review @ableegoldman @cadonna @wcarlson5 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9720: KAFKA-10555: Improve client state machine

2021-01-21 Thread GitBox


mjsax commented on pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#issuecomment-764959796


   Btw: to what extent do we need to update the docs? We should at least add a section to `streams/upgrade_guide.html` to mention the change.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9840: KAFKA-10867: Improved task idling

2021-01-21 Thread GitBox


guozhangwang commented on a change in pull request #9840:
URL: https://github.com/apache/kafka/pull/9840#discussion_r561459260



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -78,15 +89,149 @@ RecordQueue queue() {
     }
 }
 
-PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
+PartitionGroup(final TaskId id,
+               final Map<TopicPartition, RecordQueue> partitionQueues,
+               final Sensor recordLatenessSensor,
+               final Sensor enforcedProcessingSensor,
+               final long maxTaskIdleMs) {
+    this.id = id;
     nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
     this.partitionQueues = partitionQueues;
+    this.enforcedProcessingSensor = enforcedProcessingSensor;
+    this.maxTaskIdleMs = maxTaskIdleMs;
     this.recordLatenessSensor = recordLatenessSensor;
     totalBuffered = 0;
     allBuffered = false;
     streamTime = RecordQueue.UNKNOWN;
 }
 
+public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
+    final Long lag = metadata.lag();
+    if (lag != null) {
+        LOG.debug("[{}] added fetched lag {}: {}", id, partition, lag);
+        fetchedLags.put(partition, lag);
+    }
+}
+
+public boolean readyToProcess(final long wallClockTime) {
+    if (LOG.isTraceEnabled()) {
+        for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
+            LOG.trace(
+                "[{}] buffered/lag {}: {}/{}",
+                id,
+                entry.getKey(),
+                entry.getValue().size(),
+                fetchedLags.get(entry.getKey())
+            );
+        }
+    }
+    // Log-level strategy:
+    //  TRACE for messages that don't wait for fetches, since these may be logged at extremely high frequency
+    //  DEBUG when we waited for a fetch and decided to wait some more, as configured
+    //  DEBUG when we are ready for processing and didn't have to enforce processing
+    //  INFO  when we enforce processing, since this has to wait for fetches AND may result in disorder
+
+    if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) {
+        if (LOG.isTraceEnabled() && !allBuffered && totalBuffered > 0) {
+            final Set<TopicPartition> bufferedPartitions = new HashSet<>();
+            final Set<TopicPartition> emptyPartitions = new HashSet<>();
+            for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
+                if (entry.getValue().isEmpty()) {
+                    emptyPartitions.add(entry.getKey());
+                } else {
+                    bufferedPartitions.add(entry.getKey());
+                }
+            }
+            LOG.trace("[{}] Ready for processing because max.task.idle.ms is disabled." +
+                      "\n\tThere may be out-of-order processing for this task as a result." +
+                      "\n\tBuffered partitions: {}" +
+                      "\n\tNon-buffered partitions: {}",
+                      id,
+                      bufferedPartitions,
+                      emptyPartitions);
+        }
+        return true;
Review comment:
   Should we log INFO if we are indeed enforcing processing? I.e. there are 
some empty partitions.

##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -134,6 +134,8 @@
 @SuppressWarnings("deprecation")
 public class StreamsConfig extends AbstractConfig {
 
+public static final long MAX_TASK_IDLE_MS_DISABLED = -1;

Review comment:
   nit: move this down below to 147?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
##
@@ -73,28 +82,22 @@
 private final byte[] recordKey = intSerializer.serialize(null, 1);
 
 private final Metrics metrics = new Metrics();
+private final Sensor enforcedProcessingSensor = metrics.sensor(UUID.randomUUID().toString());
 private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", mkMap());
 
-private PartitionGroup group;
 
 private static Sensor getValueSensor(final Metrics metrics, final MetricName metricName) {
     final Sensor lastRecordedValue = metrics.sensor(metricName.name());
     lastRecordedValue.add(metricName, new Value());
     return lastRecordedValue;
 }
 
-@Before

Review comment:
   Good refactoring!

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -52,15 +58,20 @@
  * (i.e., it increases or stays the same over time).
  */
 public class 

[GitHub] [kafka] wcarlson5 commented on pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary

2021-01-21 Thread GitBox


wcarlson5 commented on pull request #9941:
URL: https://github.com/apache/kafka/pull/9941#issuecomment-764054886


   Overall LGTM. I am not sure about the cache change, but the changes to the logging make a lot of sense.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-01-21 Thread GitBox


jolshan commented on pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#issuecomment-764014519


   @dengziming I realized that for this, it really depends on the IBP of the 
controller. (That is, we need UpdateMetadata to send topic IDs to all the 
brokers). So maybe instead of checking IBP it would make sense to check if the 
MetadataCache does not have any topic IDs. What do you think?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9948: MINOR: fix record time in shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing

2021-01-21 Thread GitBox


showuon commented on pull request #9948:
URL: https://github.com/apache/kafka/pull/9948#issuecomment-765053527


   LGTM! Thanks for the fix, @wcarlson5. And all tests passed!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-01-21 Thread GitBox


rajinisivaram commented on pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#issuecomment-763996884


   @dengziming https://github.com/apache/kafka/pull/9814 has been merged, so 
this needs rebasing and the check for IBP. Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9934: MINOR: Drop enable.metadata.quorum config

2021-01-21 Thread GitBox


hachikuji commented on a change in pull request #9934:
URL: https://github.com/apache/kafka/pull/9934#discussion_r562114271



##
File path: raft/README.md
##
@@ -12,17 +12,14 @@ Below we describe the details to set this up.
 bin/test-raft-server-start.sh config/raft.properties
 
 ### Run Multi Node Quorum ###
-Create 3 separate raft quorum properties as the following

Review comment:
   Yeah, you're right. That's pretty annoying. Let me see if I can do 
anything about it.

##
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##
@@ -116,8 +116,8 @@
 /** indicates the minimum required inter broker magic required to support the API */
 public final byte minRequiredInterBrokerMagic;
 
-/** indicates whether the API is enabled and should be exposed in ApiVersions **/
-public final boolean isEnabled;
+/** indicates whether this is an API which is only exposed by the KIP-500 controller **/
+public final boolean isControllerOnlyApi;

Review comment:
   Filed this JIRA: https://issues.apache.org/jira/browse/KAFKA-12232.

##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1876,5 +1874,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs should always be less than" +
     s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" +
     s" authentication responses from timing out")
+
+if (requiresZookeeper && zkConnect == null) {

Review comment:
   Ack. Will add a test case.

##
File path: core/src/main/scala/kafka/Kafka.scala
##
@@ -65,11 +65,12 @@ object Kafka extends Logging {
 
   private def buildServer(props: Properties): Server = {
 val config = KafkaConfig.fromProps(props, false)
-if (config.processRoles.isEmpty) {
+if (config.requiresZookeeper) {

Review comment:
   Hmm `requiresZookeeper` seemed more explicit. Using `processRoles` 
seemed a little more obscure.

##
File path: core/src/main/scala/kafka/Kafka.scala
##
@@ -65,11 +65,12 @@ object Kafka extends Logging {
 
   private def buildServer(props: Properties): Server = {
 val config = KafkaConfig.fromProps(props, false)
-if (config.processRoles.isEmpty) {
+if (config.requiresZookeeper) {

Review comment:
   Hmm `requiresZookeeper` seemed more explicit. I thought using 
`processRoles` was a little obscure.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-01-21 Thread GitBox


dengziming commented on pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#issuecomment-765133359


   Hello @rajinisivaram, I resolved the comments/questions and fixed the NullPointerExceptions; calling for a third review.
   1. Added topicId to `TopicListing` and added `TopicCommand.getTopicIds`.
   2. Moved the code shared between describeTopics and describeTopicsWithIds into a common private method, `getTopicDescriptionFromCluster` (a rough sketch of its shape follows below).
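
For context, a rough sketch of the shape such a shared helper could take (the name, signature, and the five-argument `TopicDescription` constructor are assumed from the discussion in this PR, not copied from the merged code), reusing the per-topic logic from the quoted `handleResponse` above:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.Uuid;

final class TopicDescriptions {
    // Assumed helper: build one TopicDescription from the Cluster view of a
    // MetadataResponse, shared by the name-based and id-based describe paths.
    static TopicDescription fromCluster(final Cluster cluster, final String topicName, final Uuid topicId) {
        final boolean isInternal = cluster.internalTopics().contains(topicName);
        final List<TopicPartitionInfo> partitions = new ArrayList<>();
        for (final PartitionInfo info : cluster.partitionsForTopic(topicName)) {
            partitions.add(new TopicPartitionInfo(
                info.partition(),
                cluster.leaderFor(new TopicPartition(topicName, info.partition())),
                Arrays.asList(info.replicas()),
                Arrays.asList(info.inSyncReplicas())));
        }
        partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
        // Authorized operations omitted here for brevity.
        return new TopicDescription(topicName, isInternal, partitions, Collections.emptySet(), topicId);
    }
}
```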



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



