[GitHub] [kafka] dajac commented on a change in pull request #9903: KAFKA-12204; Implement DescribeCluster API in the broker
dajac commented on a change in pull request #9903: URL: https://github.com/apache/kafka/pull/9903#discussion_r562449399 ## File path: clients/src/main/resources/common/message/DescribeClusterResponse.json ## @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 60, + "type": "response", + "name": "DescribeClusterResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, +{ "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code, or 0 if there was no error" }, +{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." }, +{ "name": "ClusterId", "type": "string", "nullableVersions": "0+", "versions": "0+", "default": "null", Review comment: `ClusterId` is always filled in the `DescribeClusterResponse` so it does not have to be `nullable`. 
In the `MetadataResponse` case, `ClusterId` is not supported by all the versions so having `null` as default helps to differentiate the cases where it is provided from the cases where it is not. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #9925: MINOR: Tag `RaftEventSimulationTest` as `integration` and tweak it
chia7712 commented on a change in pull request #9925: URL: https://github.com/apache/kafka/pull/9925#discussion_r562445257 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java ## @@ -59,6 +60,7 @@ import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assumptions.assumeTrue; Review comment: Is `assumeTrue` used in this test? I ran `RaftEventSimulationTest` but there are no ignored test cases.
[GitHub] [kafka] inponomarev commented on pull request #9946: KAFKA-12230: do not interrupt execution on failure to `setPosixFilePermissions` on Windows machines
inponomarev commented on pull request #9946: URL: https://github.com/apache/kafka/pull/9946#issuecomment-765170729 Duplicate of #9947
[GitHub] [kafka] inponomarev closed pull request #9946: KAFKA-12230: do not interrupt execution on failure to `setPosixFilePermissions` on Windows machines
inponomarev closed pull request #9946: URL: https://github.com/apache/kafka/pull/9946
[GitHub] [kafka] vvcephei commented on pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords
vvcephei commented on pull request #9836: URL: https://github.com/apache/kafka/pull/9836#issuecomment-764973767 Huh, I can't get the PlaintextConsumerTest to fail locally...
[GitHub] [kafka] abbccdda commented on a change in pull request #9934: MINOR: Drop enable.metadata.quorum config
abbccdda commented on a change in pull request #9934: URL: https://github.com/apache/kafka/pull/9934#discussion_r562048902 ## File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java ## @@ -116,8 +116,8 @@ /** indicates the minimum required inter broker magic required to support the API */ public final byte minRequiredInterBrokerMagic; -/** indicates whether the API is enabled and should be exposed in ApiVersions **/ -public final boolean isEnabled; +/** indicates whether this is an API which is only exposed by the KIP-500 controller **/ +public final boolean isControllerOnlyApi; Review comment: Could we get a JIRA to track the work of refactoring these boolean flags into static collections of API keys? To me, it is not easy to use the constructor correctly when there are several such flags. ## File path: raft/README.md ## @@ -12,17 +12,14 @@ Below we describe the details to set this up. bin/test-raft-server-start.sh config/raft.properties ### Run Multi Node Quorum ### -Create 3 separate raft quorum properties as the following Review comment: Do we need to define `process.roles` here? ## File path: core/src/main/scala/kafka/Kafka.scala ## @@ -65,11 +65,12 @@ object Kafka extends Logging { private def buildServer(props: Properties): Server = { val config = KafkaConfig.fromProps(props, false) -if (config.processRoles.isEmpty) { +if (config.requiresZookeeper) { Review comment: What if we call `processRoles.isDefined` here? ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1876,5 +1874,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs should always be less than" + s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" + s" authentication responses from timing out") + +if (requiresZookeeper && zkConnect == null) { Review comment: Do we have test coverage for this check?
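The first review comment above asks for the per-API boolean flags to be replaced by static collections of API keys. A minimal sketch of that idea follows, assuming a trimmed-down stand-in enum; the names `Api`, `CONTROLLER_ONLY_APIS` and `isControllerOnly`, and the choice of which entries are controller-only, are hypothetical and not taken from `ApiKeys.java`.

```java
import java.util.EnumSet;
import java.util.Set;

public class ApiFlagsSketch {
    // Hypothetical stand-in for ApiKeys; the real enum has many more entries and fields.
    enum Api { PRODUCE, FETCH, METADATA, VOTE, BEGIN_QUORUM_EPOCH }

    // Membership lives in one named set instead of a boolean constructor argument per API.
    // Which APIs belong here is an assumption made for illustration only.
    static final Set<Api> CONTROLLER_ONLY_APIS = EnumSet.of(Api.VOTE, Api.BEGIN_QUORUM_EPOCH);

    static boolean isControllerOnly(Api api) {
        return CONTROLLER_ONLY_APIS.contains(api);
    }

    public static void main(String[] args) {
        for (Api api : Api.values()) {
            System.out.println(api + " controllerOnly=" + isControllerOnly(api));
        }
    }
}
```

With this shape, adding another category means adding another named set rather than another positional boolean in every constructor call, which is the readability concern the reviewer raises.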
[GitHub] [kafka] chia7712 commented on a change in pull request #9945: KAFKA-12212; Bump Metadata API version to remove `ClusterAuthorizedOperations` fields
chia7712 commented on a change in pull request #9945: URL: https://github.com/apache/kafka/pull/9945#discussion_r561947369 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1246,13 +1246,15 @@ class KafkaApis(val requestChannel: RequestChannel, } var clusterAuthorizedOperations = Int.MinValue Review comment: The default value of `clusterAuthorizedOperations` in the auto-generated protocol is `-2147483648`. If the request version is greater than 10, does it cause an error if we set `clusterAuthorizedOperations` of `MetadataResponseData` to `Int.MinValue`? ## File path: clients/src/main/resources/common/message/MetadataRequest.json ## @@ -31,7 +31,10 @@ // Starting in version 8, authorized operations can be requested for cluster and topic resource. // // Version 9 is the first flexible version. -// Version 10 add topicId +// +// Version 10 adds topicId. +// +// Version 11 deprecates IncludeClusterAuthorizedOperations field (KIP-700). Review comment: How about saying that this functionality has been migrated to `DescribeClusterRequest`? ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1246,13 +1246,15 @@ class KafkaApis(val requestChannel: RequestChannel, } var clusterAuthorizedOperations = Int.MinValue Review comment: Could you add a comment explaining that value (`Int.MinValue`) for dumb readers like me :)
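The comments above concern `Int.MinValue` (`-2147483648`) being used as the default for `clusterAuthorizedOperations`. A minimal sketch of the sentinel pattern follows, assuming the value marks "not requested"; the class, the constant name `OMITTED` and both helpers are hypothetical, and only the numeric value comes from the review comment.

```java
public class AuthorizedOperationsSentinel {
    // Value quoted in the review comment; treating it as "omitted" is an assumption.
    static final int OMITTED = Integer.MIN_VALUE; // -2147483648

    // Hypothetical helper: hand back the computed bitfield only when the caller asked for it.
    static int clusterAuthorizedOperations(boolean requested, int computedBitfield) {
        return requested ? computedBitfield : OMITTED;
    }

    static boolean isOmitted(int value) {
        return value == OMITTED;
    }

    public static void main(String[] args) {
        System.out.println(isOmitted(clusterAuthorizedOperations(false, 0b101)));
        System.out.println(isOmitted(clusterAuthorizedOperations(true, 0b101)));
    }
}
```

A named constant with a one-line comment is probably enough to answer the reviewer's request for an explanation of the bare `Int.MinValue`.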
[GitHub] [kafka] guozhangwang commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
guozhangwang commented on pull request #9671: URL: https://github.com/apache/kafka/pull/9671#issuecomment-764063387
> Weird -- these changes seem to be causing the `SaslXConsumerTest` family of tests to hang. I'm not very (or at all) familiar with these tests so I haven't found anything yet but I'm actively looking into it

Hmm... I'm not familiar with SaslXConsumerTest either...
[GitHub] [kafka] dajac commented on pull request #9938: MINOR: Refactor DescribeAuthorizedOperationsTest
dajac commented on pull request #9938: URL: https://github.com/apache/kafka/pull/9938#issuecomment-764468637
[GitHub] [kafka] chia7712 commented on pull request #9939: MINOR: fix @link tag in javadoc
chia7712 commented on pull request #9939: URL: https://github.com/apache/kafka/pull/9939#issuecomment-764247507 @tang7526 thanks for your patch. Could you fix the other doc errors as well?
```
/home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala:42: warning: Could not find any member to link for "Serde".
  /**
  ^
/home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala:24: warning: Could not find any member to link for "org.apache.kafka.streams.kstream.Joined".
  /**
  ^
/home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala:24: warning: Could not find any member to link for "Serde".
  /**
  ^
/home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala:37: warning: Could not find any member to link for "Serde".
  /**
  ^
/home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala:24: warning: Could not find any member to link for "Serde".
  /**
  ^
/home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala:68: warning: Could not find any member to link for "Topology.AutoOffsetReset".
  /**
  ^
/home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala:55: warning: Could not find any member to link for "TimestampExtractor".
  /**
  ^
/home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala:45: warning: Could not find any member to link for "Serde".
  /**
  ^
/home/chia7712/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala:157: warning: Could not find any member to link for "org.apache.kafka.streams.errors.TopologyException".
  /**
  ^
44 warnings
```
[GitHub] [kafka] mjsax commented on pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
mjsax commented on pull request #9487: URL: https://github.com/apache/kafka/pull/9487#issuecomment-764961250 @wcarlson5 It seems we forgot to update the docs for this KIP. Can you do a follow-up PR? This follow-up PR should also cover the changes of https://github.com/apache/kafka/pull/9487 that are part of the same KIP.
[GitHub] [kafka] showuon commented on a change in pull request #9947: KAFKA-12190: Fix setting of file permissions on non-POSIX filesystems
showuon commented on a change in pull request #9947: URL: https://github.com/apache/kafka/pull/9947#discussion_r562312538 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java ## @@ -115,22 +115,29 @@ public void shouldCreateBaseDirectory() { @Test public void shouldHaveSecurePermissions() { -final Set expectedPermissions = EnumSet.of( -PosixFilePermission.OWNER_EXECUTE, -PosixFilePermission.GROUP_READ, -PosixFilePermission.OWNER_WRITE, -PosixFilePermission.GROUP_EXECUTE, -PosixFilePermission.OWNER_READ); - -final Path statePath = Paths.get(stateDir.getPath()); -final Path basePath = Paths.get(appDir.getPath()); -try { -final Set baseFilePermissions = Files.getPosixFilePermissions(statePath); -final Set appFilePermissions = Files.getPosixFilePermissions(basePath); -assertThat(expectedPermissions, equalTo(baseFilePermissions)); -assertThat(expectedPermissions, equalTo(appFilePermissions)); -} catch (final IOException e) { -fail("Should create correct files and set correct permissions"); +assertPermissions(Paths.get(stateDir.getPath())); +assertPermissions(Paths.get(appDir.getPath())); +} + +private void assertPermissions(final Path path) { +if (path.getFileSystem().supportedFileAttributeViews().contains("posix")) { +final Set expectedPermissions = EnumSet.of( +PosixFilePermission.OWNER_EXECUTE, +PosixFilePermission.GROUP_READ, +PosixFilePermission.OWNER_WRITE, +PosixFilePermission.GROUP_EXECUTE, +PosixFilePermission.OWNER_READ); +try { +final Set baseFilePermissions = Files.getPosixFilePermissions(path); Review comment: This path could be `baseFile` and also `appFile`. I think we'd better to rename it, ex: `filePermissions` or any better idea? 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -109,16 +109,27 @@ public StateDirectory(final StreamsConfig config, final Time time, final boolean log.warn("Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file" + " due to the fact that this directory can be cleared by the OS"); } - // change the dir permission to "rwxr-x---" to avoid world readable -final Path basePath = Paths.get(baseDir.getPath()); -final Path statePath = Paths.get(stateDir.getPath()); +configurePermissions(Paths.get(baseDir.getPath())); +configurePermissions(Paths.get(stateDir.getPath())); +} +} + +private void configurePermissions(final Path path) { +if (path.getFileSystem().supportedFileAttributeViews().contains("posix")) { final Set perms = PosixFilePermissions.fromString("rwxr-x---"); try { -Files.setPosixFilePermissions(basePath, perms); -Files.setPosixFilePermissions(statePath, perms); +Files.setPosixFilePermissions(path, perms); } catch (final IOException e) { -log.error("Error changing permissions for the state or base directory {} ", stateDir.getPath(), e); +log.error("Error changing permissions for the directory {} ", path, e); +} +} else { +final File file = path.toFile(); Review comment: We pass the `path` parameter which is converted from `File` instance (i.e. `Paths.get(stateDir.getPath()`), and then we converted it back to `File` instance in the method. It's redundant. Could we just pass the `File` instance into the method directly? 
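The review comment above suggests passing the `File` into the helper directly instead of converting `File` to `Path` and back. A minimal sketch of that shape follows, assuming the `"posix"` attribute-view check and the `rwxr-x---` mode from the quoted diff. The non-POSIX branch is an assumption, because the quoted diff is cut off before showing it, and the class name and boolean return value are hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

public class PermissionSketch {
    // Takes the File directly and derives the Path once, so no round trip is needed.
    static boolean configurePermissions(File dir) {
        Path path = dir.toPath();
        if (path.getFileSystem().supportedFileAttributeViews().contains("posix")) {
            Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-x---");
            try {
                Files.setPosixFilePermissions(path, perms);
                return true;
            } catch (IOException e) {
                // Log and continue rather than fail, matching the intent of the quoted diff.
                System.err.println("Error changing permissions for the directory " + path + ": " + e);
                return false;
            }
        }
        // Assumed fallback for non-POSIX file systems: restrict access to the owner via java.io.File.
        boolean ok = dir.setReadable(true, true);
        ok &= dir.setWritable(true, true);
        ok &= dir.setExecutable(true, true);
        return ok;
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("perm-sketch").toFile();
        System.out.println("configured=" + configurePermissions(dir));
    }
}
```

Returning a boolean is only a convenience for the sketch; the quoted code logs the failure and returns nothing.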
## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java ## @@ -115,22 +115,29 @@ public void shouldCreateBaseDirectory() { @Test public void shouldHaveSecurePermissions() { -final Set expectedPermissions = EnumSet.of( -PosixFilePermission.OWNER_EXECUTE, -PosixFilePermission.GROUP_READ, -PosixFilePermission.OWNER_WRITE, -PosixFilePermission.GROUP_EXECUTE, -PosixFilePermission.OWNER_READ); - -final Path statePath = Paths.get(stateDir.getPath()); -final Path basePath = Paths.get(appDir.getPath()); -try { -final Set baseFilePermissions = Files.getPosixFilePermissions(statePath); -final Set appFilePermissions = Files.getPosixFilePermissions(basePath); -assertThat(expectedPermissions, equalTo(baseFilePermissions)); -assertThat(expectedPermissions, equalTo(appFilePermissions)); -} catch
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size
wcarlson5 commented on a change in pull request #9253: URL: https://github.com/apache/kafka/pull/9253#discussion_r562150019 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java ## @@ -235,25 +235,27 @@ public void shouldReduceWindowed() throws Exception { .thenComparing(KeyValueTimestamp::value); windowedOutput.sort(comparator); -final long firstBatchWindow = firstBatchTimestamp / 500 * 500; -final long secondBatchWindow = secondBatchTimestamp / 500 * 500; +final long firstBatchWindowStart = firstBatchTimestamp / 500 * 500; +final long firstBatchWindowEnd = firstBatchWindowStart + 500; Review comment: Why don't you use `timeDifference` like you did below? ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java ## @@ -33,22 +33,22 @@ */ public class TimeWindowedDeserializer implements Deserializer> { -private final Long windowSize; +private Long windowSize; private boolean isChangelogTopic; private Deserializer inner; - + // Default constructor needed by Kafka public TimeWindowedDeserializer() { -this(null, Long.MAX_VALUE); +this(null, null); } -// TODO: fix this part as last bits of KAFKA-4468 +@Deprecated public TimeWindowedDeserializer(final Deserializer inner) { this(inner, Long.MAX_VALUE); Review comment: Do you want to change this from `Long.MAX_VALUE` as well? 
## File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java ## @@ -327,7 +329,7 @@ public void shouldAggregateWindowed() throws Exception { startStreams(); final List, Integer>> windowedMessages = receiveMessagesWithTimestamp( -new TimeWindowedDeserializer<>(), Review comment: I don't think we should get rid of the generics unless we have to ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java ## @@ -56,4 +57,23 @@ public void testWindowedValueDeserializerNoArgConstructors() { assertNotNull("Inner deserializer should be not null", inner); assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner instanceof ByteArrayDeserializer); } + +@Test +public void setWindowSizeThroughConfigs() { +props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500"); +final TimeWindowedDeserializer deserializer = new TimeWindowedDeserializer<>(); +deserializer.configure(props, false); Review comment: Is there some sort of check you can verify here?
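The test diff in this thread computes window bounds as `firstBatchTimestamp / 500 * 500` and `firstBatchWindowStart + 500`. A small sketch of that arithmetic follows; the class and method names are hypothetical, and the rounding-down behaviour shown holds for non-negative timestamps.

```java
public class WindowBoundsSketch {
    // Integer division truncates, so dividing and multiplying by the window size
    // rounds a non-negative timestamp down to the start of its window.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs / windowSizeMs * windowSizeMs;
    }

    // The window end is the start plus the window size, as in the diff.
    static long windowEnd(long timestampMs, long windowSizeMs) {
        return windowStart(timestampMs, windowSizeMs) + windowSizeMs;
    }

    public static void main(String[] args) {
        long ts = 1234L;
        System.out.println(windowStart(ts, 500L)); // 1000
        System.out.println(windowEnd(ts, 500L));   // 1500
    }
}
```

Passing the window size as a parameter, rather than repeating the literal `500`, is in the spirit of the reviewer's question about reusing `timeDifference`.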
[GitHub] [kafka] dajac commented on pull request #9832: KAFKA-12152; Idempotent Producer does not reset the sequence number of partitions without in-flight batches
dajac commented on pull request #9832: URL: https://github.com/apache/kafka/pull/9832#issuecomment-764469562 Failed tests in the last build are not related to the changes in this PR:
* Build / JDK 8 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplication()
* Build / JDK 8 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplication()
* Build / JDK 8 / kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete()
* Build / JDK 11 / kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback()

I will go ahead and merge the PR in trunk and 2.7.
[GitHub] [kafka] mimaison merged pull request #9810: MINOR: Tweak IBM i platform support in "stop" scripts
mimaison merged pull request #9810: URL: https://github.com/apache/kafka/pull/9810
[GitHub] [kafka] dengziming commented on a change in pull request #9819: KAFKA-10694: Implement zero copy for FetchSnapshot
dengziming commented on a change in pull request #9819: URL: https://github.com/apache/kafka/pull/9819#discussion_r561742346 ## File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java ## @@ -1093,9 +1093,11 @@ public static final void readFully(InputStream inputStream, ByteBuffer destinati destinationBuffer.position(destinationBuffer.position() + totalBytesRead); } -public static void writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException { +public static int writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException { +int size = 0; Review comment: In fact, this change was brought when using `BaseRegion`. ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1220,34 +1216,35 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest( if (!snapshotOpt.isPresent()) { return FetchSnapshotResponse.singleton( log.topicPartition(), -responsePartitionSnapshot -> { -return addQuorumLeader(responsePartitionSnapshot) -.setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code()); -} +responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot) +.setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code()) ); } try (RawSnapshotReader snapshot = snapshotOpt.get()) { if (partitionSnapshot.position() < 0 || partitionSnapshot.position() >= snapshot.sizeInBytes()) { return FetchSnapshotResponse.singleton( log.topicPartition(), -responsePartitionSnapshot -> { -return addQuorumLeader(responsePartitionSnapshot) -.setErrorCode(Errors.POSITION_OUT_OF_RANGE.code()); -} +responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot) +.setErrorCode(Errors.POSITION_OUT_OF_RANGE.code()) ); } int maxSnapshotSize; +int maxSnapshotPosition; try { maxSnapshotSize = Math.toIntExact(snapshot.sizeInBytes()); } catch (ArithmeticException e) { maxSnapshotSize = Integer.MAX_VALUE; } -ByteBuffer buffer = ByteBuffer.allocate(Math.min(data.maxBytes(), maxSnapshotSize)); -snapshot.read(buffer, 
partitionSnapshot.position()); -buffer.flip(); +try { +maxSnapshotPosition = Math.toIntExact(partitionSnapshot.position()); +} catch (ArithmeticException e) { +maxSnapshotPosition = Integer.MAX_VALUE; Review comment: Thank you, done! ## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java ## @@ -53,13 +56,20 @@ public long sizeInBytes() throws IOException { } @Override -public void append(ByteBuffer buffer) throws IOException { +public void append(BaseRecords records) throws IOException { if (frozen) { throw new IllegalStateException( -String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath) +String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath) ); } - +ByteBuffer buffer; +if (records instanceof MemoryRecords) { +buffer = ((MemoryRecords) records).buffer(); +} else { +buffer = ByteBuffer.allocate(records.sizeInBytes()); +((FileRecords) records).channel().read(buffer); +buffer.flip(); +} Review comment: Done, changed the parameter type to MemoryRecords.
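The `KafkaRaftClient` diff above narrows `long` sizes and positions to `int` with `Math.toIntExact`, falling back to `Integer.MAX_VALUE` on overflow. A minimal sketch of that clamp follows, with the removed `Math.min(data.maxBytes(), maxSnapshotSize)` sizing folded into a hypothetical helper; the class and method names are not from the PR.

```java
public class ClampSketch {
    // Narrow a long to int, saturating at Integer.MAX_VALUE when it does not fit.
    // Math.toIntExact also throws for values below Integer.MIN_VALUE; the diff rejects
    // negative positions earlier, so only the upper bound matters there.
    static int clampToInt(long value) {
        try {
            return Math.toIntExact(value);
        } catch (ArithmeticException e) {
            return Integer.MAX_VALUE;
        }
    }

    // Hypothetical helper: the response buffer never exceeds the requested maximum
    // nor the (clamped) snapshot size.
    static int bufferSize(int maxBytes, long snapshotSizeInBytes) {
        return Math.min(maxBytes, clampToInt(snapshotSizeInBytes));
    }

    public static void main(String[] args) {
        System.out.println(clampToInt(5L));
        System.out.println(clampToInt(1L << 40));
        System.out.println(bufferSize(1024, 1L << 40));
    }
}
```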
[GitHub] [kafka] guozhangwang commented on a change in pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords
guozhangwang commented on a change in pull request #9836: URL: https://github.com/apache/kafka/pull/9836#discussion_r561449314 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ## @@ -2041,6 +2052,118 @@ public void testInvalidGroupMetadata() throws InterruptedException { assertThrows(IllegalStateException.class, consumer::groupMetadata); } +@Test +public void testPollMetadata() { +final Time time = new MockTime(); +final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); +final ConsumerMetadata metadata = createMetadata(subscription); +final MockClient client = new MockClient(time, metadata); + +initMetadata(client, singletonMap(topic, 1)); +final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); + +final KafkaConsumer consumer = +newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); + +consumer.assign(singleton(tp0)); +consumer.seek(tp0, 50L); + +final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5); +client.prepareResponse(fetchResponse(singletonMap(tp0, fetchInfo))); + +final ConsumerRecords records = consumer.poll(Duration.ofMillis(1)); +assertEquals(5, records.count()); +assertEquals(55L, consumer.position(tp0)); + +// verify that the consumer computes the correct metadata based on the fetch response +final ConsumerRecords.Metadata actualMetadata = records.metadata().get(tp0); +assertEquals(100L, (long) actualMetadata.endOffset()); +assertEquals(55L, (long) actualMetadata.position()); +assertEquals(45L, (long) actualMetadata.lag()); +consumer.close(Duration.ZERO); +} + + +@Test +public void testPollMetadataWithExtraPartitions() { Review comment: Does the test cover 1) stale epoch, 2) no prev value, cases? 
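The assertions in `testPollMetadata` above expect a position of 55, an end offset of 100 and a lag of 45 after seeking to offset 50 and receiving 5 records. A small sketch of that arithmetic follows; the class and method names are hypothetical, and the formula lag = end offset minus position is inferred from the asserted values rather than quoted from the implementation.

```java
public class LagSketch {
    // Position advances by the number of records consumed from the starting offset.
    static long positionAfter(long startOffset, int recordCount) {
        return startOffset + recordCount;
    }

    // Lag is how far the consumer position trails the end offset (inferred from the test).
    static long lag(long endOffset, long position) {
        return endOffset - position;
    }

    public static void main(String[] args) {
        long position = positionAfter(50L, 5);
        System.out.println(position);            // 55
        System.out.println(lag(100L, position)); // 45
    }
}
```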
## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -637,20 +636,32 @@ private ListOffsetResult fetchOffsetsByTimes(Map timestamp } else { List> records = fetchRecords(nextInLineFetch, recordsRemaining); -if (!records.isEmpty()) { -TopicPartition partition = nextInLineFetch.partition; -List> currentRecords = fetched.get(partition); -if (currentRecords == null) { -fetched.put(partition, records); -} else { -// this case shouldn't usually happen because we only send one fetch at a time per partition, -// but it might conceivably happen in some rare cases (such as partition leader changes). -// we have to copy to a new list because the old one may be immutable -List> newRecords = new ArrayList<>(records.size() + currentRecords.size()); -newRecords.addAll(currentRecords); -newRecords.addAll(records); -fetched.put(partition, newRecords); +TopicPartition partition = nextInLineFetch.partition; + +if (subscriptions.isAssigned(partition)) { +// initializeCompletedFetch, above, has already persisted the metadata from the fetch in the +// SubscriptionState, so we can just read it out, which in particular lets us re-use the logic +// for determining the end offset +final long receivedTimestamp = nextInLineFetch.receivedTimestamp; +final Long beginningOffset = subscriptions.logStartOffset(partition); +final Long endOffset = subscriptions.logEndOffset(partition, isolationLevel); +final FetchPosition fetchPosition = subscriptions.position(partition); + +final FetchedRecords.FetchMetadata fetchMetadata = fetched.metadata().get(partition); +if (fetchMetadata == null +|| !fetchMetadata.position().offsetEpoch.isPresent() +|| fetchPosition.offsetEpoch.isPresent() +&& fetchMetadata.position().offsetEpoch.get() <= fetchPosition.offsetEpoch.get()) { Review comment: Interesting, why we do not want to update the metadata if epoch is stale? 
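The condition the reviewer asks about mixes `||` and `&&` without parentheses, so it may help to spell out how it groups. A minimal sketch follows, assuming the guarded block is what updates the stored metadata (the quoted diff is cut off before the body); the method and parameter names are hypothetical stand-ins for `fetchMetadata` and `fetchPosition.offsetEpoch`.

```java
import java.util.Optional;

public class EpochCheckSketch {
    // Same shape as the condition in the diff: a || b || (c && d), because && binds tighter than ||.
    static boolean shouldUpdate(boolean hasStoredMetadata,
                                Optional<Integer> storedEpoch,
                                Optional<Integer> fetchedEpoch) {
        return !hasStoredMetadata
            || !storedEpoch.isPresent()
            || fetchedEpoch.isPresent() && storedEpoch.get() <= fetchedEpoch.get();
    }

    public static void main(String[] args) {
        // No previous value: update.
        System.out.println(shouldUpdate(false, Optional.empty(), Optional.empty()));
        // Stored epoch 5, fetched epoch 3 (stale): skip.
        System.out.println(shouldUpdate(true, Optional.of(5), Optional.of(3)));
        // Stored epoch 3, fetched epoch 5: update.
        System.out.println(shouldUpdate(true, Optional.of(3), Optional.of(5)));
    }
}
```

The two cases from the earlier review question, stale epoch and no previous value, correspond to the second and first calls in `main`.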
## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -637,20 +636,32 @@ private ListOffsetResult fetchOffsetsByTimes(Map timestamp } else { List> records = fetchRecords(nextInLineFetch, recordsRemaining); -
[GitHub] [kafka] hachikuji merged pull request #9934: MINOR: Drop enable.metadata.quorum config
hachikuji merged pull request #9934: URL: https://github.com/apache/kafka/pull/9934
[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed
abbccdda commented on a change in pull request #9579: URL: https://github.com/apache/kafka/pull/9579#discussion_r561594743 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel, !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key)) requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) else { - // get metadata (and create the topic if necessary) - val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { + val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { case CoordinatorType.GROUP => - val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME) case CoordinatorType.TRANSACTION => - val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME) + } -case _ => - throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request") + val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName) + def createFindCoordinatorResponse(error: Errors, +node: Node, +requestThrottleMs: Int, +errorMessage: Option[String] = None): FindCoordinatorResponse = { +new FindCoordinatorResponse( + new FindCoordinatorResponseData() +.setErrorCode(error.code) +.setErrorMessage(errorMessage.getOrElse(error.message)) +.setNodeId(node.id) +.setHost(node.host) +.setPort(node.port) 
+.setThrottleTimeMs(requestThrottleMs)) } - def createResponse(requestThrottleMs: Int): AbstractResponse = { -def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = { - new FindCoordinatorResponse( - new FindCoordinatorResponseData() -.setErrorCode(error.code) -.setErrorMessage(error.message) -.setNodeId(node.id) -.setHost(node.host) -.setPort(node.port) -.setThrottleTimeMs(requestThrottleMs)) + val topicCreationNeeded = topicMetadata.headOption.isEmpty + if (topicCreationNeeded) { +if (hasEnoughAliveBrokers(internalTopicName)) { + if (shouldForwardRequest(request)) { +forwardingManager.sendInterBrokerRequest( + getCreateTopicsRequest(Seq(internalTopicName)), + _ => ()) + } else { +val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6) + +val topicConfigs = Map(internalTopicName -> getTopicConfigs(internalTopicName)) +adminManager.createTopics( + config.requestTimeoutMs, + validateOnly = false, + topicConfigs, + Map.empty, + controllerMutationQuota, + _ => ()) + } } -val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) { - createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) -} else { - val coordinatorEndpoint = topicMetadata.partitions.asScala -.find(_.partitionIndex == partition) -.filter(_.leaderId != MetadataResponse.NO_LEADER_ID) -.flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId)) -.flatMap(_.getNode(request.context.listenerName)) -.filterNot(_.isEmpty) - - coordinatorEndpoint match { -case Some(endpoint) => - createFindCoordinatorResponse(Errors.NONE, endpoint) -case _ => - createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) + +requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse( + Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)) + } else { +def createResponse(requestThrottleMs: Int): AbstractResponse = { + val responseBody = if 
(topicMetadata.head.errorCode != Errors.NONE.code) { +
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary
wcarlson5 commented on a change in pull request #9941: URL: https://github.com/apache/kafka/pull/9941#discussion_r561409907 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -407,7 +413,7 @@ public static StreamThread create(final InternalTopologyBuilder builder, referenceContainer.nextScheduledRebalanceMs, shutdownErrorHook, streamsUncaughtExceptionHandler, -cacheSize -> cache.resize(cacheSize) Review comment: My IDE tried to optimize this as well. At the time, not passing in `cacheSize` caused some exceptions. I would be careful about making this change unless it is needed. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -407,7 +413,7 @@ public static StreamThread create(final InternalTopologyBuilder builder, referenceContainer.nextScheduledRebalanceMs, shutdownErrorHook, streamsUncaughtExceptionHandler, -cacheSize -> cache.resize(cacheSize) Review comment: I think it caused a test to fail, but not every time. It may also have been fixed since then, as changes have been made. If all the tests pass, it's probably fine. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hongshaoyang edited a comment on pull request #9943: MINOR: Fix typo in Utils.java
hongshaoyang edited a comment on pull request #9943: URL: https://github.com/apache/kafka/pull/9943#issuecomment-764209143 Ping @guozhangwang. The original typo was added in https://github.com/apache/kafka/commit/3a9f4b833bb7e86dc759361c33f4321ab043db05
[jira] [Commented] (KAFKA-12190) Failure on Windows due to an UnsupportedOperationException when StateDirectory sets file permissions
[ https://issues.apache.org/jira/browse/KAFKA-12190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17269923#comment-17269923 ] Ivan Ponomarev commented on KAFKA-12190: Hi [~mjsax], sure! I searched for the issue on Jira but didn't find it; that's why there is a duplicate Jira ticket and PR. Looking forward to [~awilkinson]'s fix! > Failure on Windows due to an UnsupportedOperationException when > StateDirectory sets file permissions > > > Key: KAFKA-12190 > URL: https://issues.apache.org/jira/browse/KAFKA-12190 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.1, 2.7.1 >Reporter: Andy Wilkinson >Assignee: Andy Wilkinson >Priority: Critical > Labels: bug > Fix For: 2.8.0, 2.7.1, 2.6.2 > > > There appears to be a regression in Kafka 2.6.1 due to [the > changes|https://github.com/apache/kafka/pull/9583] made for KAFKA-10705 that > causes a failure on Windows. After upgrading to 2.6.1 from 2.6.0, we're > seeing failures in Spring Boot's CI on Windows such as the following: > {noformat} > Caused by: java.lang.UnsupportedOperationException: (No message provided) > at java.nio.file.Files.setPosixFilePermissions(Files.java:2044) > at > org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:115) > > at > org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:745) > at > org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:585) > at > org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:316) > > at > org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) > > at > org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) > > at > org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) > > at java.lang.Iterable.forEach(Iterable.java:75) > at > 
org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) > > at > org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) > > at > org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:940) > > at > org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:591) > > at > org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.configureContext(AbstractApplicationContextRunner.java:447) > > at > org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.createAndLoadContext(AbstractApplicationContextRunner.java:423) > > at > org.springframework.boot.test.context.assertj.AssertProviderApplicationContextInvocationHandler.getContextOrStartupFailure(AssertProviderApplicationContextInvocationHandler.java:61) > > at > org.springframework.boot.test.context.assertj.AssertProviderApplicationContextInvocationHandler.<init>(AssertProviderApplicationContextInvocationHandler.java:48) > > at > org.springframework.boot.test.context.assertj.ApplicationContextAssertProvider.get(ApplicationContextAssertProvider.java:112) > > at > org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.createAssertableContext(AbstractApplicationContextRunner.java:412) > > at > org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.lambda$null$0(AbstractApplicationContextRunner.java:382) > > at > org.springframework.boot.test.util.TestPropertyValues.applyToSystemProperties(TestPropertyValues.java:175) > > at > org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.lambda$run$1(AbstractApplicationContextRunner.java:381) > > at > org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.withContextClassLoader(AbstractApplicationContextRunner.java:392) > > at > 
org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.run(AbstractApplicationContextRunner.java:381) > > at > org.springframework.boot.actuate.autoconfigure.metrics.KafkaMetricsAutoConfigurationTests.whenKafkaStreamsIsEnabledAndThereIsNoMeterRegistryThenListenerCustomizationBacksOff(KafkaMetricsAutoConfigurationTests.java:92) > {noformat} > The same code worked without changes using Kafka 2.6.0.
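Editor's note on KAFKA-12190: the trace above shows `Files.setPosixFilePermissions` throwing `UnsupportedOperationException`, which is what the JDK does on file systems without a POSIX attribute view (for example the default Windows file system). The following is only a minimal sketch of one way to guard such a call; it is not the fix from the linked PR. The class name `StateDirPermissionsSketch`, the method `restrictToOwnerIfSupported`, and the permission string `rwxr-x---` are assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

// Hypothetical helper, not part of Kafka.
class StateDirPermissionsSketch {

    // Applies POSIX permissions only when the file system supports them.
    // Returns true if permissions were set, false if the call was skipped.
    public static boolean restrictToOwnerIfSupported(Path dir) throws IOException {
        boolean posixSupported =
            dir.getFileSystem().supportedFileAttributeViews().contains("posix");
        if (!posixSupported) {
            // e.g. Windows: skip instead of hitting UnsupportedOperationException
            return false;
        }
        // Assumed permission set, chosen for the example only.
        Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-x---");
        Files.setPosixFilePermissions(dir, perms);
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("state-dir-sketch");
        System.out.println("POSIX permissions applied: " + restrictToOwnerIfSupported(dir));
    }
}
```

Whether skipping silently is acceptable, or whether a non-POSIX fallback is needed, is a design decision the sketch does not settle.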
[GitHub] [kafka] dajac commented on pull request #9945: KAFKA-12212; Bump Metadata API version to remove `ClusterAuthorizedOperations` fields
dajac commented on pull request #9945: URL: https://github.com/apache/kafka/pull/9945#issuecomment-764726538 @chia7712 Thanks for your comments. I have updated the PR to address them.
[GitHub] [kafka] chia7712 commented on a change in pull request #9938: MINOR: Refactor DescribeAuthorizedOperationsTest
chia7712 commented on a change in pull request #9938: URL: https://github.com/apache/kafka/pull/9938#discussion_r561689904 ## File path: core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala ## @@ -73,11 +114,6 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS TestUtils.waitUntilBrokerMetadataIsPropagated(servers) } - private def accessControlEntry(userName: String, permissionType: AclPermissionType, operation: AclOperation): AccessControlEntry = { -new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, userName).toString, - AclEntry.WildcardHost, operation, permissionType) - } - @AfterEach override def tearDown(): Unit = { if (client != null) Review comment: Sorry for the unclear comment. My point was that `Utils.closeQuietly` includes a null check, so we don't need to add the same check again.
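Editor's note: the point of the review comment is that a "close quietly" helper which already tolerates `null` makes a caller-side `if (client != null)` redundant. Below is a minimal standalone sketch of that pattern. It is not Kafka's `Utils.closeQuietly` itself; the class `CloseQuietlySketch` and its logging to `System.err` are assumptions for illustration.

```java
// Hypothetical stand-in for a null-tolerant close helper.
class CloseQuietlySketch {

    // Closes the resource if it is non-null and swallows any failure,
    // so callers in tearDown() need no null check of their own.
    public static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable == null) {
            return; // never initialized, e.g. construction threw earlier
        }
        try {
            closeable.close();
        } catch (Exception e) {
            System.err.println("Failed to close " + name + ": " + e);
        }
    }

    public static void main(String[] args) {
        closeQuietly(null, "client");                              // no-op
        closeQuietly(() -> System.out.println("closed"), "client"); // closes normally
    }
}
```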
[jira] [Commented] (KAFKA-6223) Please delete old releases from mirroring system
[ https://issues.apache.org/jira/browse/KAFKA-6223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17269917#comment-17269917 ] ASF GitHub Bot commented on KAFKA-6223: --- mimaison commented on pull request #322: URL: https://github.com/apache/kafka-site/pull/322#issuecomment-764846591 @mjsax can you take a look? Thanks > Please delete old releases from mirroring system > > > Key: KAFKA-6223 > URL: https://issues.apache.org/jira/browse/KAFKA-6223 > Project: Kafka > Issue Type: Bug > Environment: https://dist.apache.org/repos/dist/release/kafka/ >Reporter: Sebb >Assignee: Rajini Sivaram >Priority: Major > > To reduce the load on the ASF mirrors, projects are required to delete old > releases [1] > Please can you remove all non-current releases? > It's unfair to expect the 3rd party mirrors to carry old releases. > Note that older releases can still be linked from the download page, but such > links should use the archive server at: > https://archive.apache.org/dist/kafka/ > A suggested process is: > + Change the download page to use archive.a.o for old releases > + Delete the corresponding directories from > {{https://dist.apache.org/repos/dist/release/kafka/}} > e.g. {{svn delete https://dist.apache.org/repos/dist/release/kafka/0.8.0}} > Thanks! > [1] http://www.apache.org/dev/release.html#when-to-archive -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] g1geordie commented on pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…
g1geordie commented on pull request #9906: URL: https://github.com/apache/kafka/pull/9906#issuecomment-764776835 @chia7712 Sorry for the late reply. I reverted to the `NotZstd` format. Can you help me take a look?
[GitHub] [kafka] ableegoldman commented on pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary
ableegoldman commented on pull request #9941: URL: https://github.com/apache/kafka/pull/9941#issuecomment-764036403
[GitHub] [kafka] mjsax merged pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class
mjsax merged pull request #9940: URL: https://github.com/apache/kafka/pull/9940
[GitHub] [kafka] brianwyka removed a comment on pull request #9584: [KAFKA-10708]: Add "group-id" Tag to Kafka Consumer Metrics
brianwyka removed a comment on pull request #9584: URL: https://github.com/apache/kafka/pull/9584#issuecomment-725016649 I couldn't find any tests that currently exercise the `client-id` tag for reference. I would appreciate a pointer in the right direction.
[GitHub] [kafka] vvcephei commented on a change in pull request #9840: KAFKA-10867: Improved task idling
vvcephei commented on a change in pull request #9840: URL: https://github.com/apache/kafka/pull/9840#discussion_r562089223 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ## @@ -52,15 +58,20 @@ * (i.e., it increases or stays the same over time). */ public class PartitionGroup { +private static final Logger LOG = LoggerFactory.getLogger(PartitionGroup.class); Review comment: I can pass in the log context. I wouldn't pass the actual logger, though, because it would mess up common log4j usage patterns. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ## @@ -78,15 +89,149 @@ RecordQueue queue() { } } -PartitionGroup(final Map partitionQueues, final Sensor recordLatenessSensor) { +PartitionGroup(final TaskId id, + final Map partitionQueues, + final Sensor recordLatenessSensor, + final Sensor enforcedProcessingSensor, + final long maxTaskIdleMs) { +this.id = id; nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp)); this.partitionQueues = partitionQueues; +this.enforcedProcessingSensor = enforcedProcessingSensor; +this.maxTaskIdleMs = maxTaskIdleMs; this.recordLatenessSensor = recordLatenessSensor; totalBuffered = 0; allBuffered = false; streamTime = RecordQueue.UNKNOWN; } +public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) { +final Long lag = metadata.lag(); +if (lag != null) { +LOG.debug("[{}] added fetched lag {}: {}", id, partition, lag); +fetchedLags.put(partition, lag); +} +} + +public boolean readyToProcess(final long wallClockTime) { +if (LOG.isTraceEnabled()) { +for (final Map.Entry entry : partitionQueues.entrySet()) { +LOG.trace( +"[{}] buffered/lag {}: {}/{}", +id, +entry.getKey(), +entry.getValue().size(), +fetchedLags.get(entry.getKey()) +); +} +} +// Log-level strategy: +// TRACE for messages that don't wait for fetches, since 
these may be logged at extremely high frequency +// DEBUG when we waited for a fetch and decided to wait some more, as configured +// DEBUG when we are ready for processing and didn't have to enforce processing +// INFO when we enforce processing, since this has to wait for fetches AND may result in disorder + +if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) { +if (LOG.isTraceEnabled() && !allBuffered && totalBuffered > 0) { +final Set bufferedPartitions = new HashSet<>(); +final Set emptyPartitions = new HashSet<>(); +for (final Map.Entry entry : partitionQueues.entrySet()) { +if (entry.getValue().isEmpty()) { +emptyPartitions.add(entry.getKey()); +} else { +bufferedPartitions.add(entry.getKey()); +} +} +LOG.trace("[{}] Ready for processing because max.task.idle.ms is disabled." + + "\n\tThere may be out-of-order processing for this task as a result." + + "\n\tBuffered partitions: {}" + + "\n\tNon-buffered partitions: {}", + id, + bufferedPartitions, + emptyPartitions); +} +return true; Review comment: Glad we agree ;) ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -910,6 +900,11 @@ public void addRecords(final TopicPartition partition, final Iterable
[GitHub] [kafka] dajac commented on a change in pull request #9938: MINOR: Refactor DescribeAuthorizedOperationsTest
dajac commented on a change in pull request #9938: URL: https://github.com/apache/kafka/pull/9938#discussion_r561680234 ## File path: core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala ## @@ -32,17 +32,55 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import scala.jdk.CollectionConverters._ +object DescribeAuthorizedOperationsTest { + val Group1 = "group1" + val Group2 = "group2" + val Group3 = "group3" + val Topic1 = "topic1" + val Topic2 = "topic2" + + val Group1Acl = new AclBinding( +new ResourcePattern(ResourceType.GROUP, Group1, PatternType.LITERAL), +accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALL, ALLOW)) + + val Group2Acl = new AclBinding( +new ResourcePattern(ResourceType.GROUP, Group2, PatternType.LITERAL), +accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DESCRIBE, ALLOW)) + + val Group3Acl = new AclBinding( +new ResourcePattern(ResourceType.GROUP, Group3, PatternType.LITERAL), +accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DELETE, ALLOW)) + + val ClusterAllAcl = new AclBinding( +new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL), +accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALL, ALLOW)) + + val Topic1Acl = new AclBinding( +new ResourcePattern(ResourceType.TOPIC, Topic1, PatternType.LITERAL), +accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, ALL, ALLOW)) + + val Topic2All = new AclBinding( +new ResourcePattern(ResourceType.TOPIC, Topic2, PatternType.LITERAL), +accessControlEntry(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, DELETE, ALLOW)) + + private def accessControlEntry( +userName: String, +operation: AclOperation, +permissionType: AclPermissionType Review comment: Indeed, let me remove that field. 
## File path: core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala ## @@ -32,17 +32,55 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import scala.jdk.CollectionConverters._ +object DescribeAuthorizedOperationsTest { + val Group1 = "group1" Review comment: In Scala, we tend to only capitalize the first letter of constants. At least, this is how I have been doing it so far. I prefer to keep it as is. ## File path: core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala ## @@ -73,11 +114,6 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS TestUtils.waitUntilBrokerMetadataIsPropagated(servers) } - private def accessControlEntry(userName: String, permissionType: AclPermissionType, operation: AclOperation): AccessControlEntry = { -new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, userName).toString, - AclEntry.WildcardHost, operation, permissionType) - } - @AfterEach override def tearDown(): Unit = { if (client != null) Review comment: I think that it is better to keep it, as `client` might not have been initialized. For instance, let's say that there is an exception raised when the client is constructed. In this case, `client` would remain `null`. ## File path: core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala ## @@ -73,11 +114,6 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS TestUtils.waitUntilBrokerMetadataIsPropagated(servers) } - private def accessControlEntry(userName: String, permissionType: AclPermissionType, operation: AclOperation): AccessControlEntry = { -new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, userName).toString, - AclEntry.WildcardHost, operation, permissionType) - } - @AfterEach override def tearDown(): Unit = { if (client != null) Review comment: Got it, thanks for the clarification. I have removed the null check.
[GitHub] [kafka] g1geordie commented on a change in pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…
g1geordie commented on a change in pull request #9906: URL: https://github.com/apache/kafka/pull/9906#discussion_r562087677 ## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ## @@ -423,30 +455,32 @@ public void buildUsingCreateTime(Args args) { } @ParameterizedTest -@RecordBuilderSource +@ArgumentsSource(NotZstd.class) public void testAppendedChecksumConsistency(Args args) { Review comment: also test it ## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ## @@ -27,112 +27,81 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.ArgumentsProvider; import org.junit.jupiter.params.provider.ArgumentsSource; -import org.junit.jupiter.params.provider.EnumSource; -import org.junit.jupiter.params.support.AnnotationConsumer; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Random; -import java.util.function.BiPredicate; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; -import static java.util.Arrays.asList; -import static org.apache.kafka.common.record.RecordBatch.CURRENT_MAGIC_VALUE; -import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0; -import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V1; -import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2; import static org.apache.kafka.common.utils.Utils.utf8; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static 
org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public class MemoryRecordsBuilderTest { private static class Args { final int bufferOffset; -final byte magic; final CompressionType compressionType; -public Args(int bufferOffset, byte magic, CompressionType compressionType) { +public Args(int bufferOffset, CompressionType compressionType) { this.bufferOffset = bufferOffset; -this.magic = magic; this.compressionType = compressionType; } @Override public String toString() { -return "Args{" + -"bufferOffset=" + bufferOffset + -", magic=" + magic + -", compressionType=" + compressionType + -'}'; +return "bufferOffset=" + bufferOffset + +", compressionType=" + compressionType; } } -private static Stream allArguments(BiPredicate accept) { +private static Stream allArguments(Predicate accept) { List values = new ArrayList<>(); for (int bufferOffset : Arrays.asList(0, 15)) -for (byte magic : asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) -for (CompressionType compressionType : CompressionType.values()) -if (accept.test(magic, compressionType)) -values.add(Arguments.of(new Args(bufferOffset, magic, compressionType))); +for (CompressionType compressionType : CompressionType.values()) +if (accept.test(compressionType)) +values.add(Arguments.of(new Args(bufferOffset, compressionType))); return values.stream(); } -private static class MemoryRecordsBuilderArgumentsProvider implements ArgumentsProvider, AnnotationConsumer { - -private RecordBuilderSource recordSource; -private BiPredicate validCompress = (magic, type) -> type != CompressionType.ZSTD || magic >= MAGIC_VALUE_V2; - +private static class MemoryRecordsBuilderArgumentsProvider implements ArgumentsProvider { @Override public Stream provideArguments(ExtensionContext context) { -Predicate predicate = magic -> recordSource.minMagic() <= magic && magic <= 
recordSource.maxMagic(); -return allArguments((magic, type) -> (recordSource.haveInvalidCompress() || validCompress.test(magic, type)) && predicate.test(magic)); +return allArguments(type -> true); } +} +private static class NotZstd implements ArgumentsProvider { Review comment: have added ## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ## @@ -72,19 +70
[jira] [Commented] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent
[ https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17269901#comment-17269901 ] Mehran commented on KAFKA-7077: --- Any update? > KIP-318: Make Kafka Connect Source idempotent > - > > Key: KAFKA-7077 > URL: https://issues.apache.org/jira/browse/KAFKA-7077 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Stephane Maarek >Assignee: Stephane Maarek >Priority: Major > > KIP Link: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine
mjsax commented on a change in pull request #9720: URL: https://github.com/apache/kafka/pull/9720#discussion_r562200891 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java ## @@ -978,7 +971,7 @@ private void waitForRunning(final List !observed.isEmpty() && observed.get(observed.size() - 1).value.equals(State.RUNNING), MAX_WAIT_TIME_MS, -() -> "Client did not startup on time. Observers transitions: " + observed +() -> "Client did not have the expected state transition on time. Observers transitions: " + observed Review comment: Why this change? We do wait for `RUNNING`? ## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java ## @@ -993,6 +986,17 @@ private void waitForStateTransition(final List> observed, +final List> expected) +throws Exception { + +waitForCondition( +() -> observed.containsAll(expected), +MAX_WAIT_TIME_MS, +() -> "Client did not have the expected state transition on time. Observers transitions: " + observed Review comment: Can we add the expected transitions, too? It would be easier to debug if the test fails. ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java ## @@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath, } @Test -@Deprecated public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception { +final long time = System.currentTimeMillis(); Review comment: The PR you linked seems to be unrelated to this test. Still wondering if we should extract this change to a dedicated PR and cherry-pick to older branches? -- Or do we have a good explanation why older branches would not be affected? 
## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java ## @@ -993,6 +986,17 @@ private void waitForStateTransition(final List> observed, +final List> expected) Review comment: nit: fix indentation
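Editor's note: one of the review comments in this message asks for the expected transitions to be included in the failure message. A minimal sketch of such a message supplier follows. The class `TransitionMessageSketch` and the method `failureMessage` are hypothetical names, and the transitions are plain strings here rather than the test's real types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper illustrating the reviewer's suggestion.
class TransitionMessageSketch {

    // Builds the message lazily, so the lists are rendered only if the wait fails.
    public static <T> Supplier<String> failureMessage(List<T> observed, List<T> expected) {
        return () -> "Client did not have the expected state transition on time. "
            + "Observed transitions: " + observed
            + ". Expected transitions: " + expected;
    }

    public static void main(String[] args) {
        List<String> observed = Arrays.asList("CREATED->REBALANCING");
        List<String> expected = Arrays.asList("CREATED->REBALANCING", "REBALANCING->RUNNING");
        System.out.println(failureMessage(observed, expected).get());
    }
}
```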
[GitHub] [kafka] showuon commented on pull request #9942: KAFKA-12229: reset to original class loader after connector stop
showuon commented on pull request #9942: URL: https://github.com/apache/kafka/pull/9942#issuecomment-764189012
[GitHub] [kafka] lct45 commented on pull request #9708: KAFKA-9126: KIP-689: StreamJoined changelog configuration
lct45 commented on pull request #9708: URL: https://github.com/apache/kafka/pull/9708#issuecomment-764993225 @mjsax Ahh yeah I'll submit a PR, thanks for catching that
[GitHub] [kafka] mimaison commented on a change in pull request #9902: KAFKA-12193: Re-resolve IPs after a client disconnects
mimaison commented on a change in pull request #9902: URL: https://github.com/apache/kafka/pull/9902#discussion_r562011024 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -183,6 +186,10 @@ public void disconnected(String id, long now) { connectingNodes.remove(id); } else { resetConnectionSetupTimeout(nodeState); +if (nodeState.state.isConnected()) { +// If a connection had previously been established, re-resolve DNS because the IPs may have changed +nodeState.addresses = Collections.emptyList(); Review comment: Slightly weird that we're updating a "private" field here. Also the comment is a bit misleading. We're not re-resolving DNS here but instead clearing state so if we reconnect later, the client will be forced to re-resolve then. ## File path: clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java ## @@ -907,6 +924,140 @@ public void testCorrelationId() { ids.forEach(id -> assertTrue(id < SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID)); } +@Test +public void testReconnectAfterAddressChange() { +AddressChangeHostResolver mockHostResolver = new AddressChangeHostResolver(); +AtomicInteger initialAddressConns = new AtomicInteger(); +AtomicInteger newAddressConns = new AtomicInteger(); +MockSelector selector = new MockSelector(this.time, inetSocketAddress -> { +InetAddress inetAddress = inetSocketAddress.getAddress(); +if (initialAddresses.contains(inetAddress)) { +initialAddressConns.incrementAndGet(); +} else if (newAddresses.contains(inetAddress)) { +newAddressConns.incrementAndGet(); +} +return (mockHostResolver.getUseNewAddresses() && newAddresses.contains(inetAddress)) || Review comment: I was slightly confused until I realized `getUseNewAddresses()` is a boolean. Maybe `useNewAddresses()` would be a better name? 
## File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ## @@ -239,24 +241,63 @@ public NetworkClient(Selectable selector, logContext); } -private NetworkClient(MetadataUpdater metadataUpdater, - Metadata metadata, - Selectable selector, - String clientId, - int maxInFlightRequestsPerConnection, - long reconnectBackoffMs, - long reconnectBackoffMax, - int socketSendBuffer, - int socketReceiveBuffer, - int defaultRequestTimeoutMs, - long connectionSetupTimeoutMs, - long connectionSetupTimeoutMaxMs, - ClientDnsLookup clientDnsLookup, - Time time, - boolean discoverBrokerVersions, - ApiVersions apiVersions, - Sensor throttleTimeSensor, - LogContext logContext) { +public NetworkClient(MetadataUpdater metadataUpdater, Review comment: Do we really need this constructor? As far as I can tell, it's only called by the other 3 above. These could directly call the real one below instead of going through this new one. WDYT? ## File path: clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java ## @@ -907,6 +924,140 @@ public void testCorrelationId() { ids.forEach(id -> assertTrue(id < SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID)); } +@Test +public void testReconnectAfterAddressChange() { +AddressChangeHostResolver mockHostResolver = new AddressChangeHostResolver(); +AtomicInteger initialAddressConns = new AtomicInteger(); +AtomicInteger newAddressConns = new AtomicInteger(); +MockSelector selector = new MockSelector(this.time, inetSocketAddress -> { +InetAddress inetAddress = inetSocketAddress.getAddress(); +if (initialAddresses.contains(inetAddress)) { +initialAddressConns.incrementAndGet(); +} else if (newAddresses.contains(inetAddress)) { +newAddressConns.incrementAndGet(); +} +return (mockHostResolver.getUseNewAddresses() && newAddresses.contains(inetAddress)) || + (!mockHostResolver.getUseNewAddresses() && initialAddresses.contains(inetAddress)); +}); +NetworkClient client = new NetworkClient(metadataUpdater, null, selector, 
"mock", Integer.MAX_VALUE, +reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, +defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connection
[GitHub] [kafka] ijuma commented on pull request #9878: KAFKA-6987: Reimplement KafkaFuture using CompletableFuture
ijuma commented on pull request #9878: URL: https://github.com/apache/kafka/pull/9878#issuecomment-764654281 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #9720: KAFKA-10555: Improve client state machine
cadonna commented on a change in pull request #9720: URL: https://github.com/apache/kafka/pull/9720#discussion_r561734967 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1165,11 +1151,21 @@ private Thread shutdownHelper(final boolean error) { metrics.close(); if (!error) { setState(State.NOT_RUNNING); +} else { +setState(State.ERROR); } }, "kafka-streams-close-thread"); } private boolean close(final long timeoutMs) { +if (state == State.ERROR) { +log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped."); +return false; Review comment: As far as I can see, where the return value is used the javadoc says ``` true if all threads were successfully stopped, false if the timeout was reached. ``` Since all threads were successfully stopped, I would return `true`. We clearly document that `ERROR` is a terminal state, so I do not see why somebody should wait for `NOT_RUNNING` when the client is in `ERROR` and `close()` returns `true`.
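To make the return-value argument concrete, here is a minimal sketch of a `close()` that reports success whenever the client is already in a terminal state. `ClientSketch`, its `State` enum, and `stopThreads` are hypothetical stand-ins for illustration; this is not the real `KafkaStreams` implementation.

```java
// Hypothetical stand-in for a Streams-like client; not the real KafkaStreams class.
class ClientSketch {
    enum State { RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    private State state;

    ClientSketch(State initial) {
        this.state = initial;
    }

    // Returns true if all threads are stopped when the call returns,
    // false if the timeout was reached first.
    synchronized boolean close(long timeoutMs) {
        if (state == State.ERROR || state == State.NOT_RUNNING) {
            // Already terminal: every thread has stopped, so report success.
            return true;
        }
        state = State.PENDING_SHUTDOWN;
        boolean stopped = stopThreads(timeoutMs);
        if (stopped) {
            state = State.NOT_RUNNING;
        }
        return stopped;
    }

    // Assumption for illustration only: threads stop iff a positive timeout is given.
    private boolean stopThreads(long timeoutMs) {
        return timeoutMs > 0;
    }

    synchronized State state() {
        return state;
    }

    public static void main(String[] args) {
        ClientSketch failed = new ClientSketch(State.ERROR);
        // The client stays in ERROR, but close() still reports that nothing is left running.
        System.out.println(failed.close(0) + " " + failed.state());
    }
}
```

With this shape, a caller never has to wait for `NOT_RUNNING` after an `ERROR`: the boolean only answers "are the threads stopped?", and the state itself says why.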
[GitHub] [kafka] jsancio commented on a change in pull request #9819: KAFKA-10694: Implement zero copy for FetchSnapshot
jsancio commented on a change in pull request #9819: URL: https://github.com/apache/kafka/pull/9819#discussion_r561427699 ## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java ## @@ -53,13 +56,20 @@ public long sizeInBytes() throws IOException { } @Override -public void append(ByteBuffer buffer) throws IOException { +public void append(BaseRecords records) throws IOException { if (frozen) { throw new IllegalStateException( -String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath) +String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath) ); } - +ByteBuffer buffer; +if (records instanceof MemoryRecords) { +buffer = ((MemoryRecords) records).buffer(); +} else { +buffer = ByteBuffer.allocate(records.sizeInBytes()); +((FileRecords) records).channel().read(buffer); +buffer.flip(); +} Review comment: > I change the signature to keep consistent with FileRawSnapshotReader Okay. I think this is something that I struggled with when creating the original APIs. I am okay with "inconsistent" APIs since `RawSnapshot{Reader,Writer}` are internal interfaces to the raft client and are not exposed to the state machine (controller). I think this "inconsistency" will go away when we implement the long term solution.
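The `else` branch in the diff above copies file-backed records into a heap buffer before writing them. A minimal sketch of that copy step using only `java.nio` might look like the following. `CopyToBufferSketch` and its method names are hypothetical, and the loop is an addition of this sketch: a single `FileChannel.read` is not guaranteed to fill the buffer, so the sketch keeps reading until the buffer is full or the file ends.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration; not part of the Kafka code base.
class CopyToBufferSketch {
    // Copies up to `size` bytes from the start of the channel into a new heap buffer.
    static ByteBuffer readIntoBuffer(FileChannel channel, int size) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(size);
        long position = 0;
        while (buffer.hasRemaining()) {
            int read = channel.read(buffer, position);
            if (read < 0) {
                break; // end of file reached before the buffer was filled
            }
            position += read;
        }
        buffer.flip(); // switch the buffer from writing to reading
        return buffer;
    }

    // Writes a small temp file, copies it into a buffer, and returns the decoded text.
    static String demo() {
        try {
            Path tmp = Files.createTempFile("snapshot-sketch", ".bin");
            try {
                Files.write(tmp, "hello".getBytes(StandardCharsets.UTF_8));
                try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.READ)) {
                    ByteBuffer buffer = readIntoBuffer(channel, 5);
                    return StandardCharsets.UTF_8.decode(buffer).toString();
                }
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Whatever the exact shape, this path allocates and fills a buffer as large as the records being appended, which is the hidden cost of accepting file-backed records here.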
[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size
lct45 commented on a change in pull request #9253: URL: https://github.com/apache/kafka/pull/9253#discussion_r562056000 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java ## @@ -61,6 +61,21 @@ public Long getWindowSize() { @SuppressWarnings("unchecked") @Override public void configure(final Map<String, ?> configs, final boolean isKey) { +//check if the config is set and the window size is already set from the constructor +final Long configWindowSize; +if (configs.get(StreamsConfig.WINDOW_SIZE_MS_CONFIG) instanceof String) { Review comment: The console consumer made this check necessary - if there's a simpler way to do this lmk ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java ## @@ -33,22 +33,22 @@ */ public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>> { -private final Long windowSize; +private Long windowSize; private boolean isChangelogTopic; private Deserializer<T> inner; - + // Default constructor needed by Kafka public TimeWindowedDeserializer() { -this(null, Long.MAX_VALUE); +this(null, null); } -// TODO: fix this part as last bits of KAFKA-4468 +@Deprecated public TimeWindowedDeserializer(final Deserializer<T> inner) { this(inner, Long.MAX_VALUE); Review comment: I thought about it but figured since it's deprecated anyway and we want to keep backwards compatibility I would leave it. WDYT?
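The `instanceof String` check exists because the same setting can arrive as a `String` (for example from command-line properties) or as a number (when set programmatically). A minimal sketch of normalizing both forms is below. `WindowSizeConfigSketch`, `windowSizeFrom`, and the literal key string are assumptions for illustration; the real code reads `StreamsConfig.WINDOW_SIZE_MS_CONFIG`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not the actual TimeWindowedDeserializer logic.
class WindowSizeConfigSketch {
    // Placeholder key for this sketch; the real constant is StreamsConfig.WINDOW_SIZE_MS_CONFIG.
    static final String WINDOW_SIZE_KEY = "window.size.ms";

    // Returns the configured window size, or null if the key is not set.
    static Long windowSizeFrom(Map<String, ?> configs) {
        Object raw = configs.get(WINDOW_SIZE_KEY);
        if (raw == null) {
            return null;
        }
        if (raw instanceof String) {
            // Values passed as text must be parsed.
            return Long.parseLong((String) raw);
        }
        if (raw instanceof Number) {
            // Values set programmatically may already be numeric.
            return ((Number) raw).longValue();
        }
        throw new IllegalArgumentException(
            "Unsupported type for " + WINDOW_SIZE_KEY + ": " + raw.getClass().getName());
    }

    public static void main(String[] args) {
        Map<String, Object> fromCli = new HashMap<>();
        fromCli.put(WINDOW_SIZE_KEY, "500");
        Map<String, Object> fromCode = new HashMap<>();
        fromCode.put(WINDOW_SIZE_KEY, 500L);
        System.out.println(windowSizeFrom(fromCli) + " " + windowSizeFrom(fromCode));
    }
}
```

Handling `Number` rather than only `Long` is a design choice of the sketch; it keeps an `Integer` value from being rejected.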
[GitHub] [kafka] mjsax commented on a change in pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class
mjsax commented on a change in pull request #9940: URL: https://github.com/apache/kafka/pull/9940#discussion_r561429380 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java ## @@ -2779,6 +2779,62 @@ public void suspend() { assertThat(task01.state(), is(Task.State.SUSPENDED)); } +@Test +public void shouldConvertActiveTaskToStandbyTask() { +final StreamTask activeTask = mock(StreamTask.class); +expect(activeTask.id()).andReturn(taskId00).anyTimes(); + expect(activeTask.inputPartitions()).andReturn(taskId00Partitions).anyTimes(); Review comment: Will fix right away -- Jenkins failed anyway and we need to rerun it. ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java ## @@ -190,7 +190,7 @@ public void shouldIdempotentlyUpdateSubscriptionFromActiveAssignment() { final TopicPartition newTopicPartition = new TopicPartition("topic2", 1); final Map<TaskId, Set<TopicPartition>> assignment = mkMap(mkEntry(taskId01, mkSet(t1p1, newTopicPartition))); -expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andReturn(emptyList()).anyTimes(); +expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(emptyList()); Review comment: @ableegoldman I just updated the whole test class... ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java ## @@ -2647,8 +2647,7 @@ public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() { task01.setCommittableOffsetsAndMetadata(offsetsT01); final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true); -consumer.groupMetadata(); -expectLastCall().andReturn(null).anyTimes(); +expect(consumer.groupMetadata()).andStubReturn(null); Review comment: Another simplification; make it a one-liner. Same below.
[GitHub] [kafka] dengziming commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
dengziming commented on pull request #9769: URL: https://github.com/apache/kafka/pull/9769#issuecomment-764219186
[GitHub] [kafka] mjsax commented on pull request #9708: KAFKA-9126: KIP-689: StreamJoined changelog configuration
mjsax commented on pull request #9708: URL: https://github.com/apache/kafka/pull/9708#issuecomment-764957937 @lct45 -- It seems we missed updating the docs, i.e., `streams/upgrade_guide.html`, for this KIP. Can you do a follow-up PR for it?
[GitHub] [kafka] hachikuji commented on a change in pull request #9912: MINOR: Let TestRaftRequestHandler use RequestHandlerHelper
hachikuji commented on a change in pull request #9912: URL: https://github.com/apache/kafka/pull/9912#discussion_r561415958 ## File path: core/src/main/scala/kafka/network/RequestChannel.scala ## @@ -371,9 +372,44 @@ class RequestChannel(val queueSize: Int, requestQueue.put(request) } - /** Send a response back to the socket server to be sent over the network */ - def sendResponse(response: RequestChannel.Response): Unit = { + def closeConnection( +request: RequestChannel.Request, +errorCounts: java.util.Map[Errors, Integer] + ): Unit = { +// This case is used when the request handler has encountered an error, but the client +// does not expect a response (e.g. when produce request has acks set to 0) +updateErrorMetrics(request.header.apiKey, errorCounts.asScala) +sendResponse(new RequestChannel.CloseConnectionResponse(request)) + } + + def sendResponse( +request: RequestChannel.Request, +response: AbstractResponse, +onComplete: Option[Send => Unit] Review comment: I decided not to do this here. I didn't like replacing `None` with `_ => {}` in uses, and neither did I like making the argument optional. The alternative is to introduce a constant "no-op" function, but I found this also a little awkward. If you think of a nice way to do it, I can review. I do think it is better having a simpler type.
[GitHub] [kafka] mjsax commented on a change in pull request #9835: MINOR: Add 'task container' class to KafkaStreams TaskManager
mjsax commented on a change in pull request #9835: URL: https://github.com/apache/kafka/pull/9835#discussion_r561342858 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java ## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; + +class Tasks { +private final Logger log; +private final InternalTopologyBuilder builder; +private final StreamsMetricsImpl streamsMetrics; + +private final Map allTasksPerId = new TreeMap<>(); +private final Map readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId); +private final Collection readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values()); + +// TODO: change type to `StreamTask` +private final Map activeTasksPerId = new TreeMap<>(); +// TODO: change type to `StreamTask` +private final Map activeTasksPerPartition = new HashMap<>(); +// TODO: change type to `StreamTask` +private final Map readOnlyActiveTasksPerId = Collections.unmodifiableMap(activeTasksPerId); +private final Set readOnlyActiveTaskIds = Collections.unmodifiableSet(activeTasksPerId.keySet()); +// TODO: change type to `StreamTask` +private final Collection readOnlyActiveTasks = Collections.unmodifiableCollection(activeTasksPerId.values()); + +// TODO: change type to `StandbyTask` +private final Map standbyTasksPerId = new TreeMap<>(); +// TODO: change type to `StandbyTask` +private final Map readOnlyStandbyTasksPerId = Collections.unmodifiableMap(standbyTasksPerId); +private final Set readOnlyStandbyTaskIds = Collections.unmodifiableSet(standbyTasksPerId.keySet()); + +private final ActiveTaskCreator activeTaskCreator; 
+private final StandbyTaskCreator standbyTaskCreator; + +private Consumer mainConsumer; + +Tasks(final String logPrefix, + final InternalTopologyBuilder builder, + final StreamsMetricsImpl streamsMetrics, + final ActiveTaskCreator activeTaskCreator, + final StandbyTaskCreator standbyTaskCreator) { + +final LogContext logContext = new LogContext(logPrefix); +log = logContext.logger(getClass()); + +this.builder = builder; +this.streamsMetrics = streamsMetrics; +this.activeTaskCreator = activeTaskCreator; +this.standbyTaskCreator = standbyTaskCreator; +} + +void setMainConsumer(final Consumer mainConsumer) { +this.mainConsumer = mainConsumer; +} + +void createTasks(final Map> activeTasksToCreate, + final Map> standbyTasksToCreate) { +for (final Map.Entry> taskToBeCreated : activeTasksToCreate.entrySet()) { +final TaskId taskId = taskToBeCreated.getKey(); + +if (activeTasksPerId.containsKey(taskId)) { +throw new IllegalStateException("Attempted to create an active task that we already own: " + taskId); +} +} + +for (final Map.Entry> taskToBeCreated : standbyTasksToCreate.entrySet()) { +final TaskId taskId = taskToBeCreated.getKey(); + +if (standbyTasksPerId.containsKey(taskId)) { +throw new IllegalStateException("Attempted to create a standby task that we already own: " + taskId); +} +} + +// keep this check to simplify testing (ie, no need to mock `activeTaskCreator`) +if (!activeTasksToCreate.isEmpty()) { +// TODO: change type to `StreamTask` +for (final Task activeTask : activeTaskCreator.createTas
[GitHub] [kafka] dajac merged pull request #9938: MINOR: Refactor DescribeAuthorizedOperationsTest
dajac merged pull request #9938: URL: https://github.com/apache/kafka/pull/9938
[GitHub] [kafka] dajac commented on a change in pull request #9945: KAFKA-12212; Bump Metadata API version to remove `ClusterAuthorizedOperations` fields
dajac commented on a change in pull request #9945: URL: https://github.com/apache/kafka/pull/9945#discussion_r561957390 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1246,13 +1246,15 @@ class KafkaApis(val requestChannel: RequestChannel, } var clusterAuthorizedOperations = Int.MinValue Review comment: `-2147483648` is actually `Int.MinValue`. It would cause an error if we wrote anything other than the default value for versions < 8 or > 10. ## File path: clients/src/main/resources/common/message/MetadataRequest.json ## @@ -31,7 +31,10 @@ // Starting in version 8, authorized operations can be requested for cluster and topic resource. // // Version 9 is the first flexible version. -// Version 10 add topicId +// +// Version 10 adds topicId. +// +// Version 11 deprecates IncludeClusterAuthorizedOperations field (KIP-700). Review comment: Yeah, why not. I will add it. ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1246,13 +1246,15 @@ class KafkaApis(val requestChannel: RequestChannel, } var clusterAuthorizedOperations = Int.MinValue Review comment: Will do.
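The point about `Int.MinValue` can be sketched as a version-gated field: outside the versions that carry the field, only the default value may be written. The class and method names below are hypothetical and the exception type is a stand-in; this is an illustration of the rule described in the comment, not Kafka's generated message code.

```java
// Hypothetical sketch of a version-gated field; not Kafka's generated message code.
class VersionedFieldSketch {
    // Default used when the field is absent; Integer.MIN_VALUE is -2147483648.
    static final int DEFAULT_AUTHORIZED_OPERATIONS = Integer.MIN_VALUE;

    // Per the review comment, the field is only carried in versions 8 through 10.
    static boolean fieldPresent(int version) {
        return version >= 8 && version <= 10;
    }

    // A value is writable if the version carries the field, or if it equals the default.
    static boolean canWrite(int version, int value) {
        return fieldPresent(version) || value == DEFAULT_AUTHORIZED_OPERATIONS;
    }

    static void write(int version, int value) {
        if (!canWrite(version, value)) {
            // Stand-in exception type for this sketch.
            throw new IllegalStateException(
                "Attempted to write a non-default value at version " + version);
        }
        // Serialization itself is omitted in this sketch.
    }

    public static void main(String[] args) {
        System.out.println(DEFAULT_AUTHORIZED_OPERATIONS == -2147483648);
        System.out.println(canWrite(11, DEFAULT_AUTHORIZED_OPERATIONS));
        System.out.println(canWrite(11, 0));
    }
}
```

This is why the handler initializes the variable to the default and only overwrites it when the request version actually supports the field.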
[GitHub] [kafka] mjsax edited a comment on pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
mjsax edited a comment on pull request #9487: URL: https://github.com/apache/kafka/pull/9487#issuecomment-764961250 @wcarlson5 It seems we forgot to update the docs for this KIP. Can you do a follow-up PR? This follow-up PR should also cover the changes of https://issues.apache.org/jira/browse/KAFKA-10810, which is part of the same KIP.
[GitHub] [kafka] wcarlson5 commented on pull request #9948: Minor: fix record time
wcarlson5 commented on pull request #9948: URL: https://github.com/apache/kafka/pull/9948#issuecomment-764984984 @mjsax I extracted the fix
[GitHub] [kafka] wcarlson5 commented on pull request #9720: KAFKA-10555: Improve client state machine
wcarlson5 commented on pull request #9720: URL: https://github.com/apache/kafka/pull/9720#issuecomment-764986776 I extracted the https://github.com/apache/kafka/pull/9720/files#r562203226 fix to https://github.com/apache/kafka/pull/9948
[GitHub] [kafka] twmb commented on a change in pull request #9903: KAFKA-12204; Implement DescribeCluster API in the broker
twmb commented on a change in pull request #9903: URL: https://github.com/apache/kafka/pull/9903#discussion_r562394942 ## File path: clients/src/main/resources/common/message/DescribeClusterResponse.json ## @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 60, + "type": "response", + "name": "DescribeClusterResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, +{ "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code, or 0 if there was no error" }, +{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." }, +{ "name": "ClusterId", "type": "string", "nullableVersions": "0+", "versions": "0+", "default": "null", Review comment: Does it? Does MetadataResponse? Is there a reason Metadata does and this doesn't?
[GitHub] [kafka] dajac merged pull request #9832: KAFKA-12152; Idempotent Producer does not reset the sequence number of partitions without in-flight batches
dajac merged pull request #9832: URL: https://github.com/apache/kafka/pull/9832
[GitHub] [kafka] ableegoldman merged pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary
ableegoldman merged pull request #9941: URL: https://github.com/apache/kafka/pull/9941
[GitHub] [kafka] mjsax commented on pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class
mjsax commented on pull request #9940: URL: https://github.com/apache/kafka/pull/9940#issuecomment-764070908 All three runs failed with different errors: JDK8: ``` kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 37.320395596193315 (600 connections / 16.077 sec) ==> expected: <30.0> but was: <37.320395596193315> ``` JDK11: ``` kafka.admin.FeatureCommandTest.testDescribeFeaturesSuccess() org.opentest4j.AssertionFailedError: expected: but was: ``` JDK15: ``` kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback() org.opentest4j.AssertionFailedError: Partition [group1_largeTopic,5] metadata not propagated after 15000 ms // and kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback() org.opentest4j.AssertionFailedError: Partition [group1_largeTopic,5] metadata not propagated after 15000 ms ```
[GitHub] [kafka] hachikuji commented on a change in pull request #9819: KAFKA-10694: Implement zero copy for FetchSnapshot
hachikuji commented on a change in pull request #9819: URL: https://github.com/apache/kafka/pull/9819#discussion_r561625246 ## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java ## @@ -53,13 +56,20 @@ public long sizeInBytes() throws IOException { } @Override -public void append(ByteBuffer buffer) throws IOException { +public void append(BaseRecords records) throws IOException { if (frozen) { throw new IllegalStateException( -String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath) +String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath) ); } - +ByteBuffer buffer; +if (records instanceof MemoryRecords) { +buffer = ((MemoryRecords) records).buffer(); +} else { Review comment: I feel more inclined to raise an exception if we get a `BaseRecords` type that is not `MemoryRecords`. If we really get an unexpected file in here, then we need to reconsider the IO model instead of hiding a big copy. We could even make the expectation explicit in the parameter type even if it is not 100% symmetric with `RawSnapshotReader`. ## File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java ## @@ -1093,9 +1093,11 @@ public static final void readFully(InputStream inputStream, ByteBuffer destinati destinationBuffer.position(destinationBuffer.position() + totalBytesRead); } -public static void writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException { +public static int writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException { +int size = 0; Review comment: Hmm.. Not sure we need to compute this. Wouldn't it be the same as `sourceBuffer.remaining()`? 
## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1220,34 +1216,35 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest( if (!snapshotOpt.isPresent()) { return FetchSnapshotResponse.singleton( log.topicPartition(), -responsePartitionSnapshot -> { -return addQuorumLeader(responsePartitionSnapshot) -.setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code()); -} +responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot) +.setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code()) ); } try (RawSnapshotReader snapshot = snapshotOpt.get()) { if (partitionSnapshot.position() < 0 || partitionSnapshot.position() >= snapshot.sizeInBytes()) { return FetchSnapshotResponse.singleton( log.topicPartition(), -responsePartitionSnapshot -> { -return addQuorumLeader(responsePartitionSnapshot) -.setErrorCode(Errors.POSITION_OUT_OF_RANGE.code()); -} +responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot) +.setErrorCode(Errors.POSITION_OUT_OF_RANGE.code()) ); } int maxSnapshotSize; +int maxSnapshotPosition; try { maxSnapshotSize = Math.toIntExact(snapshot.sizeInBytes()); } catch (ArithmeticException e) { maxSnapshotSize = Integer.MAX_VALUE; } -ByteBuffer buffer = ByteBuffer.allocate(Math.min(data.maxBytes(), maxSnapshotSize)); -snapshot.read(buffer, partitionSnapshot.position()); -buffer.flip(); +try { +maxSnapshotPosition = Math.toIntExact(partitionSnapshot.position()); +} catch (ArithmeticException e) { +maxSnapshotPosition = Integer.MAX_VALUE; Review comment: I agree we should probably throw this. Snapshot size limits are an interesting point which I hadn't thought about. Currently `FileRecords` does not support files which are larger than Int.MaxValue. That gives us a 2GB limit. My feeling is that is probably good enough initially, but perhaps that adds some fuel for the effort to generalize the zero-copy support. 
## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java ## @@ -51,7 +51,7 @@ public long sizeInBytes() { } public BaseRecords read(long position, int size) throws IOException { -return fileRecords.slice(Math.toIntExact(position), size); +return fileRecords.slice((int) position, size); Review comment: Using `Math.toIntExact` seemed better. ## File path: raft/src/main/java/org/apache/
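The preference for `Math.toIntExact` over a plain `(int)` cast comes down to what happens past the 2GB boundary: the cast silently wraps, while `toIntExact` throws `ArithmeticException`. A small sketch of the difference follows; `IntNarrowingSketch` and its method names are hypothetical and exist only to show the behaviour.

```java
// Hypothetical demo class; shows the difference between a cast and Math.toIntExact.
class IntNarrowingSketch {
    // Plain narrowing cast: keeps the low 32 bits, so large values wrap silently.
    static int castPosition(long position) {
        return (int) position;
    }

    // Checked narrowing: throws ArithmeticException if the value does not fit in an int.
    static int exactPosition(long position) {
        return Math.toIntExact(position);
    }

    // Convenience wrapper so callers can test the range without handling the exception.
    static boolean fitsInInt(long position) {
        try {
            exactPosition(position);
            return true;
        } catch (ArithmeticException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        long justPastLimit = Integer.MAX_VALUE + 1L; // 2147483648
        System.out.println(castPosition(justPastLimit)); // wraps to a negative number
        System.out.println(fitsInInt(justPastLimit));    // false: toIntExact would throw
    }
}
```

With the cast, an out-of-range position would turn into a negative offset and fail somewhere far from the cause; with `toIntExact`, the failure surfaces at the conversion.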
[GitHub] [kafka] vvcephei commented on pull request #9107: KAFKA-5488: Add type-safe split() operator
vvcephei commented on pull request #9107: URL: https://github.com/apache/kafka/pull/9107#issuecomment-764879234 Thanks, @inponomarev ! Yes, we sometimes have to update the KIP as the PR gets finalized. I like to send a quick reply to the vote thread on the mailing list to summarize the design changes, in case anyone wants to object (I don't think I've ever had someone object).
[GitHub] [kafka] guozhangwang commented on pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary
guozhangwang commented on pull request #9941: URL: https://github.com/apache/kafka/pull/9941#issuecomment-764408620 LGTM.
[GitHub] [kafka] rajinisivaram commented on pull request #9814: KAFKA-10869: Gate topic IDs behind IBP 2.8
rajinisivaram commented on pull request #9814: URL: https://github.com/apache/kafka/pull/9814#issuecomment-763993768 @jolshan Thanks for running the system tests, merging to trunk.
[GitHub] [kafka] hachikuji commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig
hachikuji commented on a change in pull request #9916: URL: https://github.com/apache/kafka/pull/9916#discussion_r561283768 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftConfigTest.java ## @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.raft; - -import org.apache.kafka.common.config.ConfigException; -import org.junit.jupiter.api.Test; - -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.HashMap; -import java.util.Properties; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -public class RaftConfigTest { - -@Test -public void testSingleQuorumVoterConnections() { -Properties properties = new Properties(); -properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, "1@127.0.0.1:9092"); -RaftConfig config = new RaftConfig(properties); -assertEquals(Collections.singletonMap(1, new InetSocketAddress("127.0.0.1", 9092)), -config.quorumVoterConnections()); -} - -@Test -public void testMultiQuorumVoterConnections() { -Properties properties = new Properties(); -properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, "1@kafka1:9092,2@kafka2:9092,3@kafka3:9092"); -RaftConfig config = new RaftConfig(properties); - -HashMap expected = new HashMap<>(); -expected.put(1, new InetSocketAddress("kafka1", 9092)); -expected.put(2, new InetSocketAddress("kafka2", 9092)); -expected.put(3, new InetSocketAddress("kafka3", 9092)); - -assertEquals(expected, config.quorumVoterConnections()); Review comment: This test case seems stronger than the one that was ported to `KafkaConfigTest`. It is validating the endpoints in addition to the number of voters. Is there any way we can recover this? 
## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -236,4 +176,31 @@ private static Integer parseVoterId(String idString) { return voterMap; } +public static class ControllerQuorumVotersValidator implements ConfigDef.Validator { +@Override +public void ensureValid(String name, Object value) { +if (value == null) { +throw new ConfigException(name, null); +} + +@SuppressWarnings("unchecked") +List voterStrings = (List) value; + +if (voterStrings.size() == 0) { +// TODO: Add a flag to skip validation for an empty voter string, conditionally. +// For now, skip anyway. See https://github.com/apache/kafka/pull/9916#discussion_r560611932 Review comment: We typically do not leave TODOs in the code. We can file a jira if we think it's important to remember. I'd suggest we just leave this check out and skip the empty check below. ## File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ## @@ -941,4 +950,38 @@ class KafkaConfigTest { }) } + @Test + def testInvalidQuorumVotersConfig(): Unit = { +assertInvalidQuorumVoters("1") +assertInvalidQuorumVoters("1@") +assertInvalidQuorumVoters("1:") +assertInvalidQuorumVoters("blah@") +assertInvalidQuorumVoters("1@kafka1") +assertInvalidQuorumVoters("1@kafka1:9092,") +assertInvalidQuorumVoters("1@kafka1:9092,") +assertInvalidQuorumVoters("1@kafka1:9092,2") +assertInvalidQuorumVoters("1@kafka1:9092,2@") +assertInvalidQuorumVoters("1@kafka1:9092,2@blah") +assertInvalidQuorumVoters("1@kafka1:9092,2@blah,") + } + + private def assertInvalidQuorumVoters(value: String): Unit = { +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) +props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value) +assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) + } + + @Test + def testValidQuorumVotersConfig(): Unit = { +assertValidQuorumVoters("", 0) +assertValidQuorumVoters("1@127.0.0.1:9092", 1) +assertValidQuorumVoters("1@kafka1:9092,2@kafka2:9092,3@kafka3:9092", 3) + } + + private 
def assertValidQuorumVoters(value: String, expecte
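The review comment above asks for the endpoints to be validated, not just the number of voters. A minimal sketch of that kind of check, assuming the `id@host:port` format used in the tests; `VoterParser` and `parse` are hypothetical names for illustration and are not the `RaftConfig` API:

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka: parses comma-separated "id@host:port" entries.
public class VoterParser {

    public static Map<Integer, InetSocketAddress> parse(String value) {
        Map<Integer, InetSocketAddress> voters = new HashMap<>();
        if (value.isEmpty()) {
            return voters; // an empty voter string is accepted, as in the valid-config test
        }
        // A limit of -1 keeps trailing empty entries, so "1@kafka1:9092," is rejected.
        for (String entry : value.split(",", -1)) {
            String[] idAndAddress = entry.split("@", -1);
            if (idAndAddress.length != 2) {
                throw new IllegalArgumentException("Expected id@host:port but got '" + entry + "'");
            }
            // NumberFormatException is a subclass of IllegalArgumentException.
            int id = Integer.parseInt(idAndAddress[0]);
            // Simplification: splitting on ':' does not handle IPv6 literals.
            String[] hostAndPort = idAndAddress[1].split(":", -1);
            if (hostAndPort.length != 2 || hostAndPort[0].isEmpty()) {
                throw new IllegalArgumentException("Expected host:port but got '" + idAndAddress[1] + "'");
            }
            int port = Integer.parseInt(hostAndPort[1]);
            // createUnresolved avoids a DNS lookup and rejects out-of-range ports.
            voters.put(id, InetSocketAddress.createUnresolved(hostAndPort[0], port));
        }
        return voters;
    }
}
```

A test built on a helper like this can compare the full `Map<Integer, InetSocketAddress>` against the expected endpoints, which is what the removed `RaftConfigTest` cases did.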
[GitHub] [kafka] chia7712 merged pull request #9926: KAFKA-12175 Migrate generator module to junit5
chia7712 merged pull request #9926: URL: https://github.com/apache/kafka/pull/9926 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #9946: KAFKA-12230: do not interrupt execution on failure to `setPosixFilePermissions` on Windows machines
mjsax commented on pull request #9946: URL: https://github.com/apache/kafka/pull/9946#issuecomment-764913632 @inponomarev Thanks for the PR. Seems there is already #9947 for this issue that seems to be more sophisticated than just swallowing the exception. I think we should close this PR in favor of #9947?
[GitHub] [kafka] chia7712 commented on a change in pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…
chia7712 commented on a change in pull request #9906: URL: https://github.com/apache/kafka/pull/9906#discussion_r562029718 ## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ## @@ -27,112 +27,81 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.ArgumentsProvider; import org.junit.jupiter.params.provider.ArgumentsSource; -import org.junit.jupiter.params.provider.EnumSource; -import org.junit.jupiter.params.support.AnnotationConsumer; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Random; -import java.util.function.BiPredicate; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; -import static java.util.Arrays.asList; -import static org.apache.kafka.common.record.RecordBatch.CURRENT_MAGIC_VALUE; -import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0; -import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V1; -import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2; import static org.apache.kafka.common.utils.Utils.utf8; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public class MemoryRecordsBuilderTest { private static class Args { final int bufferOffset; -final byte magic; final CompressionType compressionType; -public Args(int bufferOffset, byte magic, CompressionType 
compressionType) { +public Args(int bufferOffset, CompressionType compressionType) { this.bufferOffset = bufferOffset; -this.magic = magic; this.compressionType = compressionType; } @Override public String toString() { -return "Args{" + -"bufferOffset=" + bufferOffset + -", magic=" + magic + -", compressionType=" + compressionType + -'}'; +return "bufferOffset=" + bufferOffset + +", compressionType=" + compressionType; } } -private static Stream allArguments(BiPredicate accept) { +private static Stream allArguments(Predicate accept) { List values = new ArrayList<>(); for (int bufferOffset : Arrays.asList(0, 15)) -for (byte magic : asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) -for (CompressionType compressionType : CompressionType.values()) -if (accept.test(magic, compressionType)) -values.add(Arguments.of(new Args(bufferOffset, magic, compressionType))); +for (CompressionType compressionType : CompressionType.values()) +if (accept.test(compressionType)) +values.add(Arguments.of(new Args(bufferOffset, compressionType))); return values.stream(); } -private static class MemoryRecordsBuilderArgumentsProvider implements ArgumentsProvider, AnnotationConsumer { - -private RecordBuilderSource recordSource; -private BiPredicate validCompress = (magic, type) -> type != CompressionType.ZSTD || magic >= MAGIC_VALUE_V2; - +private static class MemoryRecordsBuilderArgumentsProvider implements ArgumentsProvider { @Override public Stream provideArguments(ExtensionContext context) { -Predicate predicate = magic -> recordSource.minMagic() <= magic && magic <= recordSource.maxMagic(); -return allArguments((magic, type) -> (recordSource.haveInvalidCompress() || validCompress.test(magic, type)) && predicate.test(magic)); +return allArguments(type -> true); } +} +private static class NotZstd implements ArgumentsProvider { Review comment: Please add comment for this suite. 
## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ## @@ -423,30 +455,32 @@ public void buildUsingCreateTime(Args args) { } @ParameterizedTest -@RecordBuilderSource +@ArgumentsSource(NotZstd.class) public void testAppendedChecksumConsistency(Args args) { Review comment: This test case should also cover zstd with magic v2. We can choose the magic value according to the compression type.
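One way to read the suggestion above is to derive the allowed magic values from the compression type instead of excluding zstd from the test. A rough sketch under that assumption; the enum and constants below are local stand-ins for illustration, not Kafka's `CompressionType` or `RecordBatch`:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch: Compression and the MAGIC_* constants are local stand-ins.
public class MagicSelection {

    public enum Compression { NONE, GZIP, SNAPPY, LZ4, ZSTD }

    public static final byte MAGIC_V0 = 0;
    public static final byte MAGIC_V1 = 1;
    public static final byte MAGIC_V2 = 2;

    // Magic values a parameterized test could run with for the given compression type.
    public static List<Byte> magicsFor(Compression type) {
        List<Byte> magics = new ArrayList<>();
        for (byte magic : new byte[] {MAGIC_V0, MAGIC_V1, MAGIC_V2}) {
            // Same rule as the validCompress predicate removed in the diff above:
            // zstd is only valid with magic v2 or newer.
            if (type != Compression.ZSTD || magic >= MAGIC_V2) {
                magics.add(magic);
            }
        }
        return magics;
    }
}
```

With this shape, zstd still gets exercised (with v2 only) while the other compression types keep running against all three magic values.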
[GitHub] [kafka] chia7712 commented on pull request #9926: KAFKA-12175 Migrate generator module to junit5
chia7712 commented on pull request #9926: URL: https://github.com/apache/kafka/pull/9926#issuecomment-764212202

> What have you seen that implies that?

The following error implies that recent fixes are not included.

```
java.lang.AssertionError: Expected all streams instances in [org.apache.kafka.streams.KafkaStreams@2d6a0fff] to be REBALANCING within 3 ms, but the following were not: {org.apache.kafka.streams.KafkaStreams@2d6a0fff=RUNNING}
    at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
    at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitForApplicationState$12(IntegrationTestUtils.java:936)
    at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:350)
    at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:318)
    at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState(IntegrationTestUtils.java:919)
    at org.apache.kafka.streams.integration.AdjustStreamThreadCountTest.shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect(AdjustStreamThreadCountTest.java:229)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
```

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9926/8/testReport/junit/org.apache.kafka.streams.integration/AdjustStreamThreadCountTest/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed
[GitHub] [kafka] showuon closed pull request #9936: [WIP] reset to default class loader
showuon closed pull request #9936: URL: https://github.com/apache/kafka/pull/9936
[GitHub] [kafka] guozhangwang commented on pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords
guozhangwang commented on pull request #9836: URL: https://github.com/apache/kafka/pull/9836#issuecomment-765031712 Made another pass on the latest commit and it LGTM. Also triggered the tests again. Once they pass we can merge as-is.
[GitHub] [kafka] ableegoldman commented on a change in pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary
ableegoldman commented on a change in pull request #9941: URL: https://github.com/apache/kafka/pull/9941#discussion_r561394151 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -1053,7 +1054,7 @@ int maybeCommitActiveTasksPerUserRequested() { } private void commitOffsetsOrTransaction(final Map> offsetsPerTask) { -log.debug("Committing task offsets {}", offsetsPerTask); +log.debug("Committing task offsets {}", offsetsPerTask.entrySet().stream().collect(Collectors.toMap(t -> t.getKey().id(), Entry::getValue))); // avoid logging actual Task objects Review comment: An unrelated but equally annoying thing I noticed in the logs: we should never log a full `Task` object because it prints literally everything about the task, including for example the topology description which is not that useful but sometimes VERY long ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -407,7 +413,7 @@ public static StreamThread create(final InternalTopologyBuilder builder, referenceContainer.nextScheduledRebalanceMs, shutdownErrorHook, streamsUncaughtExceptionHandler, -cacheSize -> cache.resize(cacheSize) Review comment: Interesting. It *should* be exactly the same, but of course who knows with Java. Did it cause a test to fail or was it something more subtle?
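The first comment above is about logging task ids instead of whole `Task` objects. A self-contained sketch of that re-keying step; the `Task` class here is a hypothetical stand-in with a deliberately verbose `toString()`, not the Streams interface:

```java
import java.util.Map;
import java.util.stream.Collectors;

public class TaskLogging {

    // Hypothetical stand-in for a Streams task whose toString() prints far too much.
    public static class Task {
        private final String id;

        public Task(String id) {
            this.id = id;
        }

        public String id() {
            return id;
        }

        @Override
        public String toString() {
            return "Task{id=" + id + ", topology=<very long description>}";
        }
    }

    // Re-key the map by task id so that a log line only shows ids and values.
    // Note: Collectors.toMap throws NullPointerException on null values.
    public static <V> Map<String, V> byTaskId(Map<Task, V> offsetsPerTask) {
        return offsetsPerTask.entrySet().stream()
            .collect(Collectors.toMap(entry -> entry.getKey().id(), Map.Entry::getValue));
    }
}
```

One design point worth keeping in mind: arguments to a logger call are evaluated before the call, so the stream above runs even when debug logging is off. Wrapping the call in an `isDebugEnabled()` check avoids that cost on a hot path.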
[GitHub] [kafka] hachikuji merged pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig
hachikuji merged pull request #9916: URL: https://github.com/apache/kafka/pull/9916
[GitHub] [kafka] ableegoldman commented on a change in pull request #9940: KAFKA-12185: fix ConcurrentModificationException in newly added Tasks container class
ableegoldman commented on a change in pull request #9940: URL: https://github.com/apache/kafka/pull/9940#discussion_r561368977 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java ## @@ -2779,6 +2779,62 @@ public void suspend() { assertThat(task01.state(), is(Task.State.SUSPENDED)); } +@Test +public void shouldConvertActiveTaskToStandbyTask() { +final StreamTask activeTask = mock(StreamTask.class); +expect(activeTask.id()).andReturn(taskId00).anyTimes(); + expect(activeTask.inputPartitions()).andReturn(taskId00Partitions).anyTimes(); Review comment: nit: use `andStubReturn` instead of `andReturn().anyTimes`. No need to change this now, don't want to block the fix, just fyi for future PRs
[GitHub] [kafka] mjsax commented on pull request #9660: Kafka 10629 - TopologyTestDriver should not require a Properties argument
mjsax commented on pull request #9660: URL: https://github.com/apache/kafka/pull/9660#issuecomment-764958643 @rohitrmd Seems we forgot to update the docs with this PR, in particular `streams/upgrade-guide.html` and maybe also the section about testing? Would you like to do a follow-up PR to close this gap?
[GitHub] [kafka] chia7712 merged pull request #9891: MINOR: Remind user index file is empty when dumping LogSegment index file
chia7712 merged pull request #9891: URL: https://github.com/apache/kafka/pull/9891
[GitHub] [kafka] showuon commented on pull request #9777: KAFKA-7940: wait until we've got the expected partition size
showuon commented on pull request #9777: URL: https://github.com/apache/kafka/pull/9777#issuecomment-764474867 @rajinisivaram @junrao , please help review this PR. Thanks.
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
rajinisivaram commented on a change in pull request #9769: URL: https://github.com/apache/kafka/pull/9769#discussion_r562233235 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java ## @@ -65,6 +65,19 @@ public TopicDescription(String name, boolean internal, List this(name, internal, partitions, Collections.emptySet()); } +/** + * Create an instance with the specified parameters. + * + * @param name The topic name + * @param internal Whether the topic is internal to Kafka + * @param partitions A list of partitions where the index represents the partition id and the element contains + * leadership and replica information for that partition. + * @param topicId the topic id + */ +public TopicDescription(String name, boolean internal, List partitions, Uuid topicId) { Review comment: Do we really need this constructor in the public class? We could just use the one below? ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -1822,6 +1829,88 @@ void handleFailure(Throwable throwable) { return new DescribeTopicsResult(new HashMap<>(topicFutures)); } +@Override +public DescribeTopicsResultWithIds describeTopicsWithIds(Collection topicIds, DescribeTopicsOptions options) { + +final Map> topicFutures = new HashMap<>(topicIds.size()); +final List topicIdsList = new ArrayList<>(); +for (Uuid topicId : topicIds) { +if (topicIdIsUnrepresentable(topicId)) { +KafkaFutureImpl future = new KafkaFutureImpl<>(); +future.completeExceptionally(new InvalidTopicException("The given topic id '" + +topicId + "' cannot be represented in a request.")); +topicFutures.put(topicId, future); +} else if (!topicFutures.containsKey(topicId)) { +topicFutures.put(topicId, new KafkaFutureImpl<>()); +topicIdsList.add(topicId); +} +} +final long now = time.milliseconds(); +Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now, options.timeoutMs()), +new LeastLoadedNodeProvider()) { + +@Override +MetadataRequest.Builder 
createRequest(int timeoutMs) { +return new MetadataRequest.Builder(new MetadataRequestData() + .setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList)) +.setAllowAutoTopicCreation(false) + .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations())); +} + +@Override +void handleResponse(AbstractResponse abstractResponse) { +MetadataResponse response = (MetadataResponse) abstractResponse; +// Handle server responses for particular topics. +Cluster cluster = response.cluster(); +Map errors = response.errorsByTopicId(); +for (Map.Entry> entry : topicFutures.entrySet()) { +Uuid topicId = entry.getKey(); +KafkaFutureImpl future = entry.getValue(); + +String topicName = cluster.topicName(topicId); +if (topicName == null) { +future.completeExceptionally(new UnknownTopicIdException("TopicId " + topicId + " not found.")); +continue; +} +Errors topicError = errors.get(topicId); +if (topicError != null) { +future.completeExceptionally(topicError.exception()); +continue; +} + +boolean isInternal = cluster.internalTopics().contains(topicName); +List partitionInfos = cluster.partitionsForTopic(topicName); +List partitions = new ArrayList<>(partitionInfos.size()); +for (PartitionInfo partitionInfo : partitionInfos) { +TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo( +partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()), +Arrays.asList(partitionInfo.inSyncReplicas())); +partitions.add(topicPartitionInfo); +} + partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition)); +TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions, + validAclOperations(response.topicAuthorizedOperations(topicName).get()), topicId); +future.complete(topicDescription); +} +} + +private Node lead
[GitHub] [kafka] feyman2016 commented on a change in pull request #9739: KAFKA-10636 Bypass log validation for writes to raft log
feyman2016 commented on a change in pull request #9739: URL: https://github.com/apache/kafka/pull/9739#discussion_r561587550 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -180,7 +179,7 @@ private void startNewBatch() { nextOffset, time.milliseconds(), false, -RecordBatch.NO_PARTITION_LEADER_EPOCH, +epoch, Review comment: Let me check ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -1057,7 +1057,8 @@ class Log(@volatile private var _dir: File, leaderEpoch: Int, origin: AppendOrigin = AppendOrigin.Client, interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = { -append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch, ignoreRecordSize = false) +val validateAndAssignOffsets = origin != AppendOrigin.RaftLeader +append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, ignoreRecordSize = false) Review comment: Makes sense to me; it looks a little odd to me too. I put it here because `assignOffsets` is true for `appendAsLeader` and false for `appendAsFollower`, so normally `assignOffsets` is determined by the caller, and `RaftLeader` is just a special case of `appendAsLeader`. If we move the logic into `analyzeAndValidateRecords`, it would need to decide whether to assign offsets without any caller info. Is that doable?
[GitHub] [kafka] hachikuji commented on pull request #9934: MINOR: Drop enable.metadata.quorum config
hachikuji commented on pull request #9934: URL: https://github.com/apache/kafka/pull/9934#issuecomment-763923618 I decided to leave the controller-only flag as it is. I think there are further improvements here to make the scope of the API clearer, but the implications for compatibility are subtle enough that we should consider it separately.
[GitHub] [kafka] ijuma commented on pull request #9926: KAFKA-12175 Migrate generator module to junit5
ijuma commented on pull request #9926: URL: https://github.com/apache/kafka/pull/9926#issuecomment-763893680 @chia7712 What have you seen that implies that? This was certainly the case before, not sure if something changed at some point (I know because I've seen compiler errors that would not be possible without the merge).
[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
ableegoldman commented on pull request #9671: URL: https://github.com/apache/kafka/pull/9671#issuecomment-764053539 Weird -- these changes seem to be causing the `SaslXConsumerTest` family of tests to hang. I'm not very (or at all) familiar with these tests so I haven't found anything yet but I'm actively looking into it
[GitHub] [kafka] dajac commented on pull request #9878: KAFKA-6987: Reimplement KafkaFuture using CompletableFuture
dajac commented on pull request #9878: URL: https://github.com/apache/kafka/pull/9878#issuecomment-764677084 @ijuma I was actually thinking about the same. The new or the old could likely be a thin facade of the other. Another approach would be to duplicate all the methods in the *Result classes but that does not sound good to me. I think that it would be nice if we could eventually remove `KafkaFuture` and use `CompletionStage` instead. That would be nice from the perspective of a user of the admin API at least. It would also be in line with the producer API if KIP-706 lands in the future.
[GitHub] [kafka] ableegoldman commented on a change in pull request #9947: KAFKA-12190: Fix setting of file permissions on non-POSIX filesystems
ableegoldman commented on a change in pull request #9947: URL: https://github.com/apache/kafka/pull/9947#discussion_r562317985 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -109,16 +109,27 @@ public StateDirectory(final StreamsConfig config, final Time time, final boolean log.warn("Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file" + " due to the fact that this directory can be cleared by the OS"); } - // change the dir permission to "rwxr-x---" to avoid world readable -final Path basePath = Paths.get(baseDir.getPath()); -final Path statePath = Paths.get(stateDir.getPath()); +configurePermissions(Paths.get(baseDir.getPath())); +configurePermissions(Paths.get(stateDir.getPath())); +} +} + +private void configurePermissions(final Path path) { +if (path.getFileSystem().supportedFileAttributeViews().contains("posix")) { final Set perms = PosixFilePermissions.fromString("rwxr-x---"); try { -Files.setPosixFilePermissions(basePath, perms); -Files.setPosixFilePermissions(statePath, perms); +Files.setPosixFilePermissions(path, perms); } catch (final IOException e) { -log.error("Error changing permissions for the state or base directory {} ", stateDir.getPath(), e); +log.error("Error changing permissions for the directory {} ", path, e); +} +} else { +final File file = path.toFile(); +boolean set = file.setReadable(true, false); +set &= file.setWritable(true, true); Review comment: If you only have `file.setWritable(true, true)` then the directory will still be writeable by non-users, I assume? I actually don't know the details of the `File#setXXX` methods -- but we don't want it to be writeable by just anyone. 
Should we instead do something like ```suggestion set &= file.setWritable(false) && file.setWritable(true, true); ``` ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -109,16 +109,27 @@ public StateDirectory(final StreamsConfig config, final Time time, final boolean log.warn("Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file" + " due to the fact that this directory can be cleared by the OS"); } - // change the dir permission to "rwxr-x---" to avoid world readable -final Path basePath = Paths.get(baseDir.getPath()); -final Path statePath = Paths.get(stateDir.getPath()); +configurePermissions(Paths.get(baseDir.getPath())); +configurePermissions(Paths.get(stateDir.getPath())); +} +} + +private void configurePermissions(final Path path) { +if (path.getFileSystem().supportedFileAttributeViews().contains("posix")) { final Set perms = PosixFilePermissions.fromString("rwxr-x---"); try { -Files.setPosixFilePermissions(basePath, perms); -Files.setPosixFilePermissions(statePath, perms); +Files.setPosixFilePermissions(path, perms); } catch (final IOException e) { -log.error("Error changing permissions for the state or base directory {} ", stateDir.getPath(), e); +log.error("Error changing permissions for the directory {} ", path, e); +} +} else { +final File file = path.toFile(); +boolean set = file.setReadable(true, false); Review comment: I think we actually want it to be readable _only_ by the user, and explicitly restrict permissions for all other users. The patch which originally broke things for Windows users was trying to tighten up the security in exactly this way This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
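The two comments above both point at owner-only access on filesystems without POSIX attributes. A hedged sketch of the "clear for everyone, then grant to the owner" pattern from the suggestion; `OwnerOnlyPermissions` and `restrictToOwner` are hypothetical names, and the `rwx------` mode is an assumption drawn from the review comment rather than what the PR finally merged:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermissions;

// Hypothetical helper, not the StateDirectory implementation.
public class OwnerOnlyPermissions {

    // Returns true only if every permission change was applied.
    public static boolean restrictToOwner(Path path) {
        if (path.getFileSystem().supportedFileAttributeViews().contains("posix")) {
            try {
                // Assumed owner-only mode; the diff above uses "rwxr-x---".
                Files.setPosixFilePermissions(path, PosixFilePermissions.fromString("rwx------"));
                return true;
            } catch (IOException e) {
                return false;
            }
        }
        File file = path.toFile();
        // Clear each permission for everyone first, then grant it to the owner only.
        boolean set = file.setReadable(false, false) && file.setReadable(true, true);
        set &= file.setWritable(false, false) && file.setWritable(true, true);
        set &= file.setExecutable(false, false) && file.setExecutable(true, true);
        return set;
    }
}
```

Per the `java.io.File` Javadoc, `setReadable(false, ...)` and `setExecutable(false, ...)` fail on filesystems that do not implement those permissions, so on such platforms a `false` result is expected and is better treated as a best-effort warning than as a fatal error.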
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9720: KAFKA-10555: Improve client state machine
wcarlson5 commented on a change in pull request #9720: URL: https://github.com/apache/kafka/pull/9720#discussion_r562043444 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1165,11 +1151,21 @@ private Thread shutdownHelper(final boolean error) { metrics.close(); if (!error) { setState(State.NOT_RUNNING); +} else { +setState(State.ERROR); } }, "kafka-streams-close-thread"); } private boolean close(final long timeoutMs) { +if (state == State.ERROR) { +log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped."); +return false; Review comment: Alright, I adjusted the close response to align with this. ## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java ## @@ -978,7 +971,7 @@ private void waitForRunning(final List !observed.isEmpty() && observed.get(observed.size() - 1).value.equals(State.RUNNING), MAX_WAIT_TIME_MS, -() -> "Client did not startup on time. Observers transitions: " + observed +() -> "Client did not have the expected state transition on time. Observers transitions: " + observed Review comment: We do wait for running. I was thinking of making it match the other methods below, but that doesn't make it any more useful, so I will revert it. ## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java ## @@ -993,6 +986,17 @@ private void waitForStateTransition(final List> observed, +final List> expected) +throws Exception { + +waitForCondition( +() -> observed.containsAll(expected), +MAX_WAIT_TIME_MS, +() -> "Client did not have the expected state transition on time. Observers transitions: " + observed Review comment: Sure, that is fine.
[GitHub] [kafka] showuon commented on pull request #9791: KAFKA-10873: ignore warning messages if connector/task start failed
showuon commented on pull request #9791: URL: https://github.com/apache/kafka/pull/9791#issuecomment-764474682 @rhauch @kkonstantine , please help review this simple PR. Thanks.
[GitHub] [kafka] yeralin commented on pull request #6592: KAFKA-8326: Introduce List Serde
yeralin commented on pull request #6592: URL: https://github.com/apache/kafka/pull/6592#issuecomment-763952940 @mjsax Hey, I just rebased my branch onto trunk and updated my tests to use JUnit 5. Let me know when you guys have time to review it.
[GitHub] [kafka] dengziming commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
dengziming commented on a change in pull request #9769: URL: https://github.com/apache/kafka/pull/9769#discussion_r562333723 ## File path: core/src/main/scala/kafka/admin/TopicCommand.scala ## @@ -313,42 +313,57 @@ object TopicCommand extends Logging { } override def describeTopic(opts: TopicCommandOptions): Unit = { - val topics = getTopics(opts.topic, opts.excludeInternalTopics) - ensureTopicExists(topics, opts.topic, !opts.ifExists) + val topicId = opts.topicId.map(Uuid.fromString).filter(_ != Uuid.ZERO_UUID) + // if topicId is provided and not zero, will use topicId regardless of topic name Review comment: that's what I thought at the beginning, now I think it's better to use topicId and print a warning message if both are provided. ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -1822,6 +1829,88 @@ void handleFailure(Throwable throwable) { return new DescribeTopicsResult(new HashMap<>(topicFutures)); } +@Override +public DescribeTopicsResultWithIds describeTopicsWithIds(Collection topicIds, DescribeTopicsOptions options) { + +final Map> topicFutures = new HashMap<>(topicIds.size()); +final List topicIdsList = new ArrayList<>(); +for (Uuid topicId : topicIds) { +if (topicIdIsUnrepresentable(topicId)) { +KafkaFutureImpl future = new KafkaFutureImpl<>(); +future.completeExceptionally(new InvalidTopicException("The given topic id '" + +topicId + "' cannot be represented in a request.")); +topicFutures.put(topicId, future); +} else if (!topicFutures.containsKey(topicId)) { +topicFutures.put(topicId, new KafkaFutureImpl<>()); +topicIdsList.add(topicId); +} +} +final long now = time.milliseconds(); +Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now, options.timeoutMs()), +new LeastLoadedNodeProvider()) { + +@Override +MetadataRequest.Builder createRequest(int timeoutMs) { +return new MetadataRequest.Builder(new MetadataRequestData() + .setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList)) 
+.setAllowAutoTopicCreation(false) + .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations())); +} + +@Override +void handleResponse(AbstractResponse abstractResponse) { +MetadataResponse response = (MetadataResponse) abstractResponse; +// Handle server responses for particular topics. +Cluster cluster = response.cluster(); +Map errors = response.errorsByTopicId(); +for (Map.Entry> entry : topicFutures.entrySet()) { +Uuid topicId = entry.getKey(); +KafkaFutureImpl future = entry.getValue(); + +String topicName = cluster.topicName(topicId); +if (topicName == null) { +future.completeExceptionally(new UnknownTopicIdException("TopicId " + topicId + " not found.")); +continue; +} +Errors topicError = errors.get(topicId); +if (topicError != null) { +future.completeExceptionally(topicError.exception()); +continue; +} + +boolean isInternal = cluster.internalTopics().contains(topicName); +List partitionInfos = cluster.partitionsForTopic(topicName); +List partitions = new ArrayList<>(partitionInfos.size()); +for (PartitionInfo partitionInfo : partitionInfos) { +TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo( +partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()), +Arrays.asList(partitionInfo.inSyncReplicas())); +partitions.add(topicPartitionInfo); +} + partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition)); +TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions, + validAclOperations(response.topicAuthorizedOperations(topicName).get()), topicId); +future.complete(topicDescription); Review comment: Thank you, I added a getTopicDescriptionFromCluster to do this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the
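The bookkeeping at the top of `describeTopicsWithIds` (fail unrepresentable IDs immediately, create one pending future per distinct ID, and request each ID only once) can be sketched roughly as follows. This is an illustration under assumptions, not the PR's code: `java.util.UUID` and `CompletableFuture` stand in for Kafka's `Uuid` and `KafkaFutureImpl`, the class name `TopicIdFutures` is hypothetical, and the rule that an ID is "unrepresentable" when it is null or all-zero is assumed rather than quoted from the diff.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper for illustration only; not part of Kafka.
final class TopicIdFutures {
    // Stand-in for Uuid.ZERO_UUID (assumption).
    static final UUID ZERO = new UUID(0L, 0L);

    // One future per distinct topic ID, as in the quoted topicFutures map.
    final Map<UUID, CompletableFuture<String>> futures = new HashMap<>();
    // IDs that actually need to go into the metadata request, without duplicates.
    final List<UUID> toRequest = new ArrayList<>();

    // Assumed rule: null and the all-zero ID cannot be sent in a request.
    static boolean isUnrepresentable(UUID id) {
        return id == null || ZERO.equals(id);
    }

    static TopicIdFutures from(Collection<UUID> topicIds) {
        TopicIdFutures result = new TopicIdFutures();
        for (UUID id : topicIds) {
            if (isUnrepresentable(id)) {
                // Fail fast on the client side; nothing is sent for this ID.
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalArgumentException(
                    "The given topic id '" + id + "' cannot be represented in a request."));
                result.futures.put(id, failed);
            } else if (!result.futures.containsKey(id)) {
                // First time this ID is seen: register a pending future and request it once.
                result.futures.put(id, new CompletableFuture<>());
                result.toRequest.add(id);
            }
        }
        return result;
    }
}
```

Passing the same ID twice yields a single entry in `toRequest`, which is why the response handler in the diff can iterate over `topicFutures` instead of the caller's original collection.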
[GitHub] [kafka] chia7712 commented on pull request #9891: MINOR: Remind user index file is empty when dumping LogSegment index file
chia7712 commented on pull request #9891: URL: https://github.com/apache/kafka/pull/9891#issuecomment-764233284 @17hao Thanks for your patch! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #9840: KAFKA-10867: Improved task idling
guozhangwang commented on pull request #9840: URL: https://github.com/apache/kafka/pull/9840#issuecomment-765055878 LGTM! Once we have perf numbers quantifying its impact I think we can merge.
[GitHub] [kafka] hongshaoyang commented on pull request #9943: MINOR: Fix typo in Utils.java
hongshaoyang commented on pull request #9943: URL: https://github.com/apache/kafka/pull/9943#issuecomment-764209143 Ping @guozhangwang Original typo was added in https://github.com/apache/kafka/commit/3a9f4b833bb7e86dc759361c33f4321ab043db05
[GitHub] [kafka] tombentley commented on pull request #9878: KAFKA-6987: Reimplement KafkaFuture using CompletableFuture
tombentley commented on pull request #9878: URL: https://github.com/apache/kafka/pull/9878#issuecomment-764649903
[GitHub] [kafka] ableegoldman commented on a change in pull request #9835: MINOR: Add 'task container' class to KafkaStreams TaskManager
ableegoldman commented on a change in pull request #9835: URL: https://github.com/apache/kafka/pull/9835#discussion_r561340839 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java ## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; + +class Tasks { +private final Logger log; +private final InternalTopologyBuilder builder; +private final StreamsMetricsImpl streamsMetrics; + +private final Map allTasksPerId = new TreeMap<>(); +private final Map readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId); +private final Collection readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values()); + +// TODO: change type to `StreamTask` +private final Map activeTasksPerId = new TreeMap<>(); +// TODO: change type to `StreamTask` +private final Map activeTasksPerPartition = new HashMap<>(); +// TODO: change type to `StreamTask` +private final Map readOnlyActiveTasksPerId = Collections.unmodifiableMap(activeTasksPerId); +private final Set readOnlyActiveTaskIds = Collections.unmodifiableSet(activeTasksPerId.keySet()); +// TODO: change type to `StreamTask` +private final Collection readOnlyActiveTasks = Collections.unmodifiableCollection(activeTasksPerId.values()); + +// TODO: change type to `StandbyTask` +private final Map standbyTasksPerId = new TreeMap<>(); +// TODO: change type to `StandbyTask` +private final Map readOnlyStandbyTasksPerId = Collections.unmodifiableMap(standbyTasksPerId); +private final Set readOnlyStandbyTaskIds = Collections.unmodifiableSet(standbyTasksPerId.keySet()); + +private final ActiveTaskCreator activeTaskCreator; 
+private final StandbyTaskCreator standbyTaskCreator; + +private Consumer mainConsumer; + +Tasks(final String logPrefix, + final InternalTopologyBuilder builder, + final StreamsMetricsImpl streamsMetrics, + final ActiveTaskCreator activeTaskCreator, + final StandbyTaskCreator standbyTaskCreator) { + +final LogContext logContext = new LogContext(logPrefix); +log = logContext.logger(getClass()); + +this.builder = builder; +this.streamsMetrics = streamsMetrics; +this.activeTaskCreator = activeTaskCreator; +this.standbyTaskCreator = standbyTaskCreator; +} + +void setMainConsumer(final Consumer mainConsumer) { +this.mainConsumer = mainConsumer; +} + +void createTasks(final Map> activeTasksToCreate, + final Map> standbyTasksToCreate) { +for (final Map.Entry> taskToBeCreated : activeTasksToCreate.entrySet()) { +final TaskId taskId = taskToBeCreated.getKey(); + +if (activeTasksPerId.containsKey(taskId)) { +throw new IllegalStateException("Attempted to create an active task that we already own: " + taskId); +} +} + +for (final Map.Entry> taskToBeCreated : standbyTasksToCreate.entrySet()) { +final TaskId taskId = taskToBeCreated.getKey(); + +if (standbyTasksPerId.containsKey(taskId)) { +throw new IllegalStateException("Attempted to create a standby task that we already own: " + taskId); +} +} + +// keep this check to simplify testing (ie, no need to mock `activeTaskCreator`) +if (!activeTasksToCreate.isEmpty()) { +// TODO: change type to `StreamTask` +for (final Task activeTask : activeTaskCreator.cr
[GitHub] [kafka] hachikuji commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed
hachikuji commented on a change in pull request #9579: URL: https://github.com/apache/kafka/pull/9579#discussion_r561451602 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel, !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key)) requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) else { - // get metadata (and create the topic if necessary) - val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { + val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { case CoordinatorType.GROUP => - val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME) case CoordinatorType.TRANSACTION => - val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME) + } -case _ => - throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request") + val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName) + def createFindCoordinatorResponse(error: Errors, +node: Node, +requestThrottleMs: Int, +errorMessage: Option[String] = None): FindCoordinatorResponse = { +new FindCoordinatorResponse( + new FindCoordinatorResponseData() +.setErrorCode(error.code) +.setErrorMessage(errorMessage.getOrElse(error.message)) +.setNodeId(node.id) +.setHost(node.host) +.setPort(node.port) 
+.setThrottleTimeMs(requestThrottleMs)) } - def createResponse(requestThrottleMs: Int): AbstractResponse = { -def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = { - new FindCoordinatorResponse( - new FindCoordinatorResponseData() -.setErrorCode(error.code) -.setErrorMessage(error.message) -.setNodeId(node.id) -.setHost(node.host) -.setPort(node.port) -.setThrottleTimeMs(requestThrottleMs)) + val topicCreationNeeded = topicMetadata.headOption.isEmpty + if (topicCreationNeeded) { +if (hasEnoughAliveBrokers(internalTopicName)) { Review comment: In the case of forwarding, maybe we can let the controller decide if there are enough alive brokers. ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel, !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key)) requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) else { - // get metadata (and create the topic if necessary) - val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { + val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { case CoordinatorType.GROUP => - val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME) case CoordinatorType.TRANSACTION => - val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME) + } -case _ => - throw new 
InvalidRequestException("Unknown coordinator type in FindCoordinator request") + val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName) + def createFindCoordinatorResponse(error: Errors, +
[GitHub] [kafka] vvcephei commented on a change in pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords
vvcephei commented on a change in pull request #9836: URL: https://github.com/apache/kafka/pull/9836#discussion_r561967937 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java ## @@ -218,7 +218,21 @@ public synchronized void unsubscribe() { } toClear.forEach(p -> this.records.remove(p)); -return new ConsumerRecords<>(results); + +final Map metadata = new HashMap<>(); +for (final TopicPartition partition : subscriptions.assignedPartitions()) { +if (subscriptions.hasValidPosition(partition) && beginningOffsets.containsKey(partition) && endOffsets.containsKey(partition)) { Review comment: Ah, this is from before I removed it. ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -637,20 +636,32 @@ private ListOffsetResult fetchOffsetsByTimes(Map timestamp } else { List> records = fetchRecords(nextInLineFetch, recordsRemaining); -if (!records.isEmpty()) { -TopicPartition partition = nextInLineFetch.partition; -List> currentRecords = fetched.get(partition); -if (currentRecords == null) { -fetched.put(partition, records); -} else { -// this case shouldn't usually happen because we only send one fetch at a time per partition, -// but it might conceivably happen in some rare cases (such as partition leader changes). -// we have to copy to a new list because the old one may be immutable -List> newRecords = new ArrayList<>(records.size() + currentRecords.size()); -newRecords.addAll(currentRecords); -newRecords.addAll(records); -fetched.put(partition, newRecords); +TopicPartition partition = nextInLineFetch.partition; + +if (subscriptions.isAssigned(partition)) { Review comment: I copied this check from fetchRecords, which says "this can happen when a rebalance happened before fetched records are returned to the consumer's poll call". I.e., it seems like it can actually happen, but a comment is called for, at least. I'll add it. 
## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -637,20 +636,32 @@ private ListOffsetResult fetchOffsetsByTimes(Map timestamp } else { List> records = fetchRecords(nextInLineFetch, recordsRemaining); -if (!records.isEmpty()) { -TopicPartition partition = nextInLineFetch.partition; -List> currentRecords = fetched.get(partition); -if (currentRecords == null) { -fetched.put(partition, records); -} else { -// this case shouldn't usually happen because we only send one fetch at a time per partition, -// but it might conceivably happen in some rare cases (such as partition leader changes). -// we have to copy to a new list because the old one may be immutable -List> newRecords = new ArrayList<>(records.size() + currentRecords.size()); -newRecords.addAll(currentRecords); -newRecords.addAll(records); -fetched.put(partition, newRecords); +TopicPartition partition = nextInLineFetch.partition; + +if (subscriptions.isAssigned(partition)) { +// initializeCompletedFetch, above, has already persisted the metadata from the fetch in the +// SubscriptionState, so we can just read it out, which in particular lets us re-use the logic +// for determining the end offset +final long receivedTimestamp = nextInLineFetch.receivedTimestamp; +final Long beginningOffset = subscriptions.logStartOffset(partition); +final Long endOffset = subscriptions.logEndOffset(partition, isolationLevel); +final FetchPosition fetchPosition = subscriptions.position(partition); + +final FetchedRecords.FetchMetadata fetchMetadata = fetched.metadata().get(partition); +if (fetchMetadata == null +|| !fetchMetadata.position().offsetEpoch.isPresent() +|| fetchPosition.offsetEpoch.isPresent() +&& fetchMetadata.position().offsetEpoch.get() <= f
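For reference, the accumulation logic shown as removed lines in the `Fetcher` diff above (append newly fetched records to those already collected for the same partition, copying because the existing list may be immutable) can be sketched in isolation. The class and method names here are hypothetical, and a generic key type stands in for `TopicPartition`; this is a simplified illustration, not the code that replaced it in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Kafka.
final class FetchedRecordsMerge {
    // Adds `records` to whatever has already been collected for `partition`.
    static <P, R> void merge(Map<P, List<R>> fetched, P partition, List<R> records) {
        if (records.isEmpty()) {
            return; // nothing to add
        }
        List<R> current = fetched.get(partition);
        if (current == null) {
            // Common case: only one fetch per partition is in flight at a time.
            fetched.put(partition, records);
        } else {
            // Rare case (e.g. after a leader change): copy into a new list,
            // because the previously stored list may be immutable.
            List<R> merged = new ArrayList<>(current.size() + records.size());
            merged.addAll(current);
            merged.addAll(records);
            fetched.put(partition, merged);
        }
    }
}
```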
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9916: MINOR: Import RaftConfig parameters into KafkaConfig
aloknnikhil commented on a change in pull request #9916: URL: https://github.com/apache/kafka/pull/9916#discussion_r561315012 ## File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ## @@ -941,4 +950,38 @@ class KafkaConfigTest { }) } + @Test + def testInvalidQuorumVotersConfig(): Unit = { +assertInvalidQuorumVoters("1") +assertInvalidQuorumVoters("1@") +assertInvalidQuorumVoters("1:") +assertInvalidQuorumVoters("blah@") +assertInvalidQuorumVoters("1@kafka1") +assertInvalidQuorumVoters("1@kafka1:9092,") +assertInvalidQuorumVoters("1@kafka1:9092,") +assertInvalidQuorumVoters("1@kafka1:9092,2") +assertInvalidQuorumVoters("1@kafka1:9092,2@") +assertInvalidQuorumVoters("1@kafka1:9092,2@blah") +assertInvalidQuorumVoters("1@kafka1:9092,2@blah,") + } + + private def assertInvalidQuorumVoters(value: String): Unit = { +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) +props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value) +assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) + } + + @Test + def testValidQuorumVotersConfig(): Unit = { +assertValidQuorumVoters("", 0) +assertValidQuorumVoters("1@127.0.0.1:9092", 1) +assertValidQuorumVoters("1@kafka1:9092,2@kafka2:9092,3@kafka3:9092", 3) + } + + private def assertValidQuorumVoters(value: String, expectedVoterCount: Int): Unit = { +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) +props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value) +assertDoesNotThrow(() => KafkaConfig.fromProps(props)) Review comment: Makes sense. Removed. 
## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -236,4 +176,31 @@ private static Integer parseVoterId(String idString) { return voterMap; } +public static class ControllerQuorumVotersValidator implements ConfigDef.Validator { +@Override +public void ensureValid(String name, Object value) { +if (value == null) { +throw new ConfigException(name, null); +} + +@SuppressWarnings("unchecked") +List voterStrings = (List) value; + +if (voterStrings.size() == 0) { +// TODO: Add a flag to skip validation for an empty voter string, conditionally. +// For now, skip anyway. See https://github.com/apache/kafka/pull/9916#discussion_r560611932 Review comment: Fair enough. Removed. ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1258,6 +1267,15 @@ object KafkaConfig { .define(PasswordEncoderCipherAlgorithmProp, STRING, Defaults.PasswordEncoderCipherAlgorithm, LOW, PasswordEncoderCipherAlgorithmDoc) .define(PasswordEncoderKeyLengthProp, INT, Defaults.PasswordEncoderKeyLength, atLeast(8), LOW, PasswordEncoderKeyLengthDoc) .define(PasswordEncoderIterationsProp, INT, Defaults.PasswordEncoderIterations, atLeast(1024), LOW, PasswordEncoderIterationsDoc) + + /** * Raft Quorum Configuration */ + .defineInternal(RaftConfig.QUORUM_VOTERS_CONFIG, LIST, Defaults.QuorumVoters, new RaftConfig.ControllerQuorumVotersValidator(), HIGH) Review comment: Ack. Fixed. 
## File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java ## @@ -35,24 +35,10 @@ public static RaftConfig buildRaftConfig( int appendLingerMs, List voterNodes ) { -Properties properties = new Properties(); -properties.put(RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); -properties.put(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, retryBackoffMs); -properties.put(RaftConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, electionTimeoutMs); -properties.put(RaftConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, electionBackoffMs); -properties.put(RaftConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, fetchTimeoutMs); -properties.put(RaftConfig.QUORUM_LINGER_MS_CONFIG, appendLingerMs); - -StringBuilder votersString = new StringBuilder(); -String prefix = ""; -for (Node voter : voterNodes) { -votersString.append(prefix); - votersString.append(voter.id()).append('@').append(voter.host()).append(':').append(voter.port()); -prefix = ","; -} -properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, votersString.toString()); - -return new RaftConfig(properties); +Map voterConnections = voterNodes.stream() +.collect(Collectors.toMap(Node::id, node -> new InetSocketAddress(node.host(), node.port(; +return new RaftConfig(voterConnections, requestTimeoutMs, retryBackoffMs, electionTimeoutMs, electionBackoffMs, Review comment: You're right. This is an artifact from the previous constructor usage. Fixed. ## F
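The voter-string format exercised by `testInvalidQuorumVotersConfig` and `testValidQuorumVotersConfig` above is a comma-separated list of `id@host:port` entries, with the empty string accepted as zero voters. A rough standalone parser that accepts and rejects the same inputs as those tests might look like this. It is a sketch under assumptions: the class name `QuorumVotersSketch` is hypothetical, it throws `IllegalArgumentException` rather than Kafka's `ConfigException`, it uses unresolved socket addresses to avoid DNS lookups, and it does not handle IPv6 literals.

```java
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for illustration only; not the RaftConfig implementation.
final class QuorumVotersSketch {

    static Map<Integer, InetSocketAddress> parse(String value) {
        Map<Integer, InetSocketAddress> voters = new LinkedHashMap<>();
        if (value.trim().isEmpty()) {
            return voters; // empty string means no voters configured
        }
        // Limit -1 keeps trailing empty entries, so "1@kafka1:9092," is rejected.
        for (String entry : value.split(",", -1)) {
            String[] idAndAddress = entry.trim().split("@", -1);
            if (idAndAddress.length != 2) {
                throw invalid(entry);
            }
            String[] hostAndPort = idAndAddress[1].split(":", -1);
            if (hostAndPort.length != 2 || hostAndPort[0].isEmpty()) {
                throw invalid(entry);
            }
            try {
                int id = Integer.parseInt(idAndAddress[0]);
                int port = Integer.parseInt(hostAndPort[1]);
                // createUnresolved also rejects out-of-range ports.
                voters.put(id, InetSocketAddress.createUnresolved(hostAndPort[0], port));
            } catch (NumberFormatException e) {
                throw invalid(entry);
            }
        }
        return voters;
    }

    private static IllegalArgumentException invalid(String entry) {
        return new IllegalArgumentException(
            "Invalid voter entry '" + entry + "', expected {id}@{host}:{port}");
    }
}
```

Under these assumptions, `parse("1@kafka1:9092,2@kafka2:9092,3@kafka3:9092")` yields three voters keyed by ID, while inputs such as `"1@kafka1"` or `"1@kafka1:9092,"` are rejected.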
[GitHub] [kafka] rajinisivaram merged pull request #9814: KAFKA-10869: Gate topic IDs behind IBP 2.8
rajinisivaram merged pull request #9814: URL: https://github.com/apache/kafka/pull/9814
[GitHub] [kafka] lct45 commented on pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size
lct45 commented on pull request #9253: URL: https://github.com/apache/kafka/pull/9253#issuecomment-764806843 call for review @ableegoldman @cadonna @wcarlson5
[GitHub] [kafka] mjsax commented on pull request #9720: KAFKA-10555: Improve client state machine
mjsax commented on pull request #9720: URL: https://github.com/apache/kafka/pull/9720#issuecomment-764959796 Btw: to what extent do we need to update the docs? We should at least add a section to `streams/upgrade_guide.html` to mention the change.
[GitHub] [kafka] guozhangwang commented on a change in pull request #9840: KAFKA-10867: Improved task idling
guozhangwang commented on a change in pull request #9840: URL: https://github.com/apache/kafka/pull/9840#discussion_r561459260 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ## @@ -78,15 +89,149 @@ RecordQueue queue() { } } -PartitionGroup(final Map partitionQueues, final Sensor recordLatenessSensor) { +PartitionGroup(final TaskId id, + final Map partitionQueues, + final Sensor recordLatenessSensor, + final Sensor enforcedProcessingSensor, + final long maxTaskIdleMs) { +this.id = id; nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp)); this.partitionQueues = partitionQueues; +this.enforcedProcessingSensor = enforcedProcessingSensor; +this.maxTaskIdleMs = maxTaskIdleMs; this.recordLatenessSensor = recordLatenessSensor; totalBuffered = 0; allBuffered = false; streamTime = RecordQueue.UNKNOWN; } +public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) { +final Long lag = metadata.lag(); +if (lag != null) { +LOG.debug("[{}] added fetched lag {}: {}", id, partition, lag); +fetchedLags.put(partition, lag); +} +} + +public boolean readyToProcess(final long wallClockTime) { +if (LOG.isTraceEnabled()) { +for (final Map.Entry entry : partitionQueues.entrySet()) { +LOG.trace( +"[{}] buffered/lag {}: {}/{}", +id, +entry.getKey(), +entry.getValue().size(), +fetchedLags.get(entry.getKey()) +); +} +} +// Log-level strategy: +// TRACE for messages that don't wait for fetches, since these may be logged at extremely high frequency +// DEBUG when we waited for a fetch and decided to wait some more, as configured +// DEBUG when we are ready for processing and didn't have to enforce processing +// INFO when we enforce processing, since this has to wait for fetches AND may result in disorder + +if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) { +if (LOG.isTraceEnabled() && !allBuffered && 
totalBuffered > 0) { +final Set bufferedPartitions = new HashSet<>(); +final Set emptyPartitions = new HashSet<>(); +for (final Map.Entry entry : partitionQueues.entrySet()) { +if (entry.getValue().isEmpty()) { +emptyPartitions.add(entry.getKey()); +} else { +bufferedPartitions.add(entry.getKey()); +} +} +LOG.trace("[{}] Ready for processing because max.task.idle.ms is disabled." + + "\n\tThere may be out-of-order processing for this task as a result." + + "\n\tBuffered partitions: {}" + + "\n\tNon-buffered partitions: {}", + id, + bufferedPartitions, + emptyPartitions); +} +return true; Review comment: Should we log INFO if we are indeed enforcing processing? I.e. there are some empty partitions. ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ## @@ -134,6 +134,8 @@ @SuppressWarnings("deprecation") public class StreamsConfig extends AbstractConfig { +public static final long MAX_TASK_IDLE_MS_DISABLED = -1; Review comment: nit: move this down below to 147? ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java ## @@ -73,28 +82,22 @@ private final byte[] recordKey = intSerializer.serialize(null, 1); private final Metrics metrics = new Metrics(); +private final Sensor enforcedProcessingSensor = metrics.sensor(UUID.randomUUID().toString()); private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", mkMap()); -private PartitionGroup group; private static Sensor getValueSensor(final Metrics metrics, final MetricName metricName) { final Sensor lastRecordedValue = metrics.sensor(metricName.name()); lastRecordedValue.add(metricName, new Value()); return lastRecordedValue; } -@Before Review comment: Good refactoring! ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ## @@ -52,15 +58,20 @@ * (i.e., it increases or stays the same over time). */ public class PartitionGr
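The review question on the disabled-idling branch of `readyToProcess` (whether to log at INFO when processing is actually being enforced, i.e. some partitions are empty while others have data) comes down to the buffered/empty split that the quoted TRACE block already computes. A simplified sketch of that classification follows. It is hypothetical: partition names are plain strings, queue sizes are passed in as a map, and the class and method names are invented for illustration. Only the `-1` sentinel for `MAX_TASK_IDLE_MS_DISABLED` is taken from the diff.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class IdleDisabledSketch {
    // Sentinel value quoted from the StreamsConfig diff.
    static final long MAX_TASK_IDLE_MS_DISABLED = -1L;

    final Set<String> bufferedPartitions = new TreeSet<>();
    final Set<String> emptyPartitions = new TreeSet<>();

    // Splits partitions into those with buffered records and those without.
    static IdleDisabledSketch classify(Map<String, Integer> queueSizes) {
        IdleDisabledSketch result = new IdleDisabledSketch();
        for (Map.Entry<String, Integer> entry : queueSizes.entrySet()) {
            if (entry.getValue() == 0) {
                result.emptyPartitions.add(entry.getKey());
            } else {
                result.bufferedPartitions.add(entry.getKey());
            }
        }
        return result;
    }

    // With idling disabled the task is always ready, whatever the queues hold.
    static boolean readyWhenDisabled(long maxTaskIdleMs) {
        return maxTaskIdleMs == MAX_TASK_IDLE_MS_DISABLED;
    }

    // True when some data is buffered but at least one partition is empty,
    // which is the case where out-of-order processing becomes possible.
    boolean enforcesProcessing() {
        return !bufferedPartitions.isEmpty() && !emptyPartitions.isEmpty();
    }
}
```

A caller could use `enforcesProcessing()` to choose the log level, which is one way to read the reviewer's suggestion; the PR itself may resolve it differently.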
[GitHub] [kafka] wcarlson5 commented on pull request #9941: MINOR: further reduce StreamThread loop INFO logging to 2min summary
wcarlson5 commented on pull request #9941: URL: https://github.com/apache/kafka/pull/9941#issuecomment-764054886 Overall LGTM. I am not sure about the cache change, but the changes to the log make a lot of sense.
[GitHub] [kafka] jolshan commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
jolshan commented on pull request #9769: URL: https://github.com/apache/kafka/pull/9769#issuecomment-764014519 @dengziming I realized that for this, it really depends on the IBP of the controller. (That is, we need UpdateMetadata to send topic IDs to all the brokers). So maybe instead of checking IBP it would make sense to check if the MetadataCache does not have any topic IDs. What do you think?
[GitHub] [kafka] showuon commented on pull request #9948: MINOR: fix record time in shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing
showuon commented on pull request #9948: URL: https://github.com/apache/kafka/pull/9948#issuecomment-765053527 LGTM! Thanks for the fix, @wcarlson5. And all tests passed!
[GitHub] [kafka] rajinisivaram commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
rajinisivaram commented on pull request #9769: URL: https://github.com/apache/kafka/pull/9769#issuecomment-763996884 @dengziming https://github.com/apache/kafka/pull/9814 has been merged, so this needs rebasing and the check for IBP. Thanks.
[GitHub] [kafka] hachikuji commented on a change in pull request #9934: MINOR: Drop enable.metadata.quorum config
hachikuji commented on a change in pull request #9934: URL: https://github.com/apache/kafka/pull/9934#discussion_r562114271 ## File path: raft/README.md ## @@ -12,17 +12,14 @@ Below we describe the details to set this up. bin/test-raft-server-start.sh config/raft.properties ### Run Multi Node Quorum ### -Create 3 separate raft quorum properties as the following Review comment: Yeah, you're right. That's pretty annoying. Let me see if I can do anything about it. ## File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java ## @@ -116,8 +116,8 @@ /** indicates the minimum required inter broker magic required to support the API */ public final byte minRequiredInterBrokerMagic; -/** indicates whether the API is enabled and should be exposed in ApiVersions **/ -public final boolean isEnabled; +/** indicates whether this is an API which is only exposed by the KIP-500 controller **/ +public final boolean isControllerOnlyApi; Review comment: Filed this JIRA: https://issues.apache.org/jira/browse/KAFKA-12232. ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1876,5 +1874,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs should always be less than" + s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" + s" authentication responses from timing out") + +if (requiresZookeeper && zkConnect == null) { Review comment: Ack. Will add a test case. ## File path: core/src/main/scala/kafka/Kafka.scala ## @@ -65,11 +65,12 @@ object Kafka extends Logging { private def buildServer(props: Properties): Server = { val config = KafkaConfig.fromProps(props, false) -if (config.processRoles.isEmpty) { +if (config.requiresZookeeper) { Review comment: Hmm `requiresZookeeper` seemed more explicit. Using `processRoles` seemed a little more obscure. 
## File path: core/src/main/scala/kafka/Kafka.scala ## @@ -65,11 +65,12 @@ object Kafka extends Logging { private def buildServer(props: Properties): Server = { val config = KafkaConfig.fromProps(props, false) -if (config.processRoles.isEmpty) { +if (config.requiresZookeeper) { Review comment: Hmm `requiresZookeeper` seemed more explicit. I thought using `processRoles` was a little obscure.
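The naming point in this thread, together with the `requiresZookeeper && zkConnect == null` check quoted from `KafkaConfig.scala`, can be illustrated with a small sketch. It assumes, based only on the one-line replacement in the `Kafka.scala` diff, that `requiresZookeeper` is equivalent to `processRoles.isEmpty`. The class name, the error message, and the use of `IllegalArgumentException` are hypothetical, and the sketch is written in Java rather than the Scala of the original files.

```java
import java.util.Set;

// Hypothetical config holder for illustration only; not KafkaConfig.
final class ServerModeSketch {
    private final Set<String> processRoles;
    private final String zkConnect; // may be null when not configured

    ServerModeSketch(Set<String> processRoles, String zkConnect) {
        this.processRoles = processRoles;
        this.zkConnect = zkConnect;
    }

    // Assumed definition: no process roles means a ZooKeeper-based server.
    boolean requiresZookeeper() {
        return processRoles.isEmpty();
    }

    // Mirrors the quoted check: ZooKeeper mode needs a connect string.
    void validate() {
        if (requiresZookeeper() && zkConnect == null) {
            // Illustrative message; the real wording is not shown in the thread.
            throw new IllegalArgumentException("Missing required configuration `zookeeper.connect`");
        }
    }
}
```

Call sites then read `if (config.requiresZookeeper())`, which states the intent directly instead of asking the reader to know what an empty role set implies.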