[jira] [Commented] (KAFKA-12308) ConfigDef.parseType deadlock
[ https://issues.apache.org/jira/browse/KAFKA-12308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290756#comment-17290756 ] Konstantine Karantasis commented on KAFKA-12308: [~tombentley] I actually think that the initial suggestion in https://issues.apache.org/jira/browse/KAFKA-7421 regarding the removal of the method lock is correct. The `DelegatingClassLoader` doesn't seem to need to be parallel because it delegates loading either to `PluginClassLoader` instances, which are parallel capable, or to the parent, which is normally the system classloader and should also be parallel. Note that the loading sequence you mention above is inverted on purpose, to implement classloading isolation. First we attempt to load the class from the "child" `PluginClassLoader` of the designated plugin and, if it is not found there, the parent classloader of the `DelegatingClassLoader` is consulted. I have updated the PR that added a test for this type of deadlock, originally submitted by [~gharris1727] in: [https://github.com/apache/kafka/pull/8259] cc [~rhauch] > ConfigDef.parseType deadlock > > > Key: KAFKA-12308 > URL: https://issues.apache.org/jira/browse/KAFKA-12308 > Project: Kafka > Issue Type: Bug > Components: config >Affects Versions: 2.5.0 > Environment: kafka 2.5.0 > centos7 > java version "1.8.0_231" >Reporter: cosmozhu >Priority: Major > Attachments: deadlock.log > > > hi, > the problem was found, when I restarted *ConnectDistributed* > I restart ConnectDistributed in the single node for the test, with not delete > connectors. > sometimes the process stopped when creating connectors. > I add some logger and found it had a deadlock in `ConfigDef.parseType`.My > connectors always have the same transforms. I guess when connector startup > (in startAndStopExecutor which default 8 threads) and load the same class > file it has something wrong. > I attached the jstack log file. > thanks for any help. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
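The inverted ("child first") delegation order described in the comment above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not Kafka's actual `PluginClassLoader` or `DelegatingClassLoader`: the class name `ChildFirstClassLoader` and the helper `fallsBackToParent` are hypothetical, and a production loader would typically also refuse to shadow `java.*` and framework classes.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical child-first loader; an illustration only, not Kafka's PluginClassLoader.
class ChildFirstClassLoader extends URLClassLoader {

    static {
        // Must run in the loader's own static initializer to take effect.
        ClassLoader.registerAsParallelCapable();
    }

    ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        // Lock per class name rather than on the whole loader.
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    // Inverted order: search this loader's own URLs first.
                    c = findClass(name);
                } catch (ClassNotFoundException e) {
                    // Not found locally, so fall back to the usual parent-first path.
                    c = super.loadClass(name, false);
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    // With no URLs of its own, the loader should resolve everything through its parent.
    static boolean fallsBackToParent() {
        ClassLoader parent = ChildFirstClassLoader.class.getClassLoader();
        try (ChildFirstClassLoader loader = new ChildFirstClassLoader(new URL[0], parent)) {
            return loader.loadClass("java.lang.String") == String.class;
        } catch (ClassNotFoundException | IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("falls back to parent: " + fallsBackToParent());
    }
}
```

Note that the sketch synchronizes on `getClassLoadingLock(name)` inside the method body instead of declaring `loadClass` as `synchronized`; a method-level lock would serialize all loads on the loader instance, which is the kind of lock the comment proposes removing.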
[GitHub] [kafka] kkonstantine commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences
kkonstantine commented on a change in pull request #8259: URL: https://github.com/apache/kafka/pull/8259#discussion_r582614443 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java ## @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.connect.runtime.isolation; + +import static org.junit.Assert.fail; + +import java.lang.management.LockInfo; +import java.lang.management.ManagementFactory; +import java.lang.management.MonitorInfo; +import java.lang.management.ThreadInfo; +import java.net.URL; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SynchronizationTest { + +public static final Logger log = LoggerFactory.getLogger(SynchronizationTest.class); + +@Rule +public final TestName testName = new TestName(); + +private String threadPrefix; +private Plugins plugins; +private ThreadPoolExecutor exec; +private Breakpoint dclBreakpoint; +private Breakpoint pclBreakpoint; + +@Before +public void setup() { +TestPlugins.assertAvailable(); +Map pluginProps = Collections.singletonMap( +WorkerConfig.PLUGIN_PATH_CONFIG, +String.join(",", TestPlugins.pluginPath()) +); +threadPrefix = 
SynchronizationTest.class.getSimpleName() ++ "." + testName.getMethodName() + "-"; +dclBreakpoint = new Breakpoint<>(); +pclBreakpoint = new Breakpoint<>(); +plugins = new Plugins(pluginProps) { +@Override +protected DelegatingClassLoader newDelegatingClassLoader(List paths) { +return AccessController.doPrivileged( +(PrivilegedAction) () -> +new SynchronizedDelegatingClassLoader(paths) +); +} +}; +exec = new ThreadPoolExecutor( +2, +2, +1000L, +TimeUnit.MILLISECONDS, +new LinkedBlockingDeque<>(), +threadFactoryWithNamedThreads(threadPrefix) +); + +} + +@After +public void tearDown() throws InterruptedException { +dclBreakpoint.clear(); +pclBreakpoint.clear(); +exec.shutdown(); +exec.awaitTermination(1L, TimeUnit.SECONDS); +} + +private static class Breakpoint { + +private Predicate predicate; +private CyclicBarrier barrier; + +public synchronized void clear() { +if (barrier != null) { +barrier.reset(); +} +predicate = null; +barrier = null; +} + +public synchronized void set(Predicate predicate) { +clear(); +this.predicate = predicate; +// As soon as the barrier is tripped, the barrier will be reset for the next round. +barrier = new CyclicBarrier(2); +} + +/** + * From a thread under test, await for the test orchestrator to continue execution + * @param obj Object to test with the breakpoint's current predicate + */
[GitHub] [kafka] kkonstantine commented on pull request #8259: KAFKA-7421: Reproduce Plugin/Delegating ClassLoader deadlock
kkonstantine commented on pull request #8259: URL: https://github.com/apache/kafka/pull/8259#issuecomment-785695890 I posted a change that hopefully fixes the issue and that is also mentioned in https://issues.apache.org/jira/browse/KAFKA-7421 The method lock does indeed seem to be an oversight. A change in the test was also required to make the test classloader capable of parallel loading. Without this registration the test fails. It also fails if the method lock is retained. @gharris1727 please take another look. Also, cc @rhauch This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
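To make the "registration" mentioned above concrete, here is a small sketch of what `ClassLoader.registerAsParallelCapable()` changes in the JDK. The two loader classes and the `lockFor` accessor are hypothetical demo names, not classes from the PR; the only behavior relied on is the documented contract of `getClassLoadingLock`.

```java
// Hypothetical demo classes; they are not part of Kafka or of PR #8259.
class ParallelCapableDemo {

    // Registered as parallel capable: the JDK hands out one lock object per class name.
    static class ParallelLoader extends ClassLoader {
        static {
            ClassLoader.registerAsParallelCapable();
        }

        Object lockFor(String className) {
            return getClassLoadingLock(className);
        }
    }

    // Not registered: the loader instance itself is the single class-loading lock.
    static class SerialLoader extends ClassLoader {
        Object lockFor(String className) {
            return getClassLoadingLock(className);
        }
    }

    public static void main(String[] args) {
        ParallelLoader parallel = new ParallelLoader();
        SerialLoader serial = new SerialLoader();

        // Different class names get different locks on the registered loader.
        System.out.println(parallel.lockFor("a.B") != parallel.lockFor("a.C"));
        // The unregistered loader always locks on itself.
        System.out.println(serial.lockFor("a.B") == serial);
    }
}
```

A loader that skips the registration, or that keeps a `synchronized` `loadClass` method, ends up holding one coarse lock while delegating, which is the situation in which two loaders can wait on each other.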
[GitHub] [kafka] dpoldrugo commented on pull request #10059: KAFKA-8562: SaslChannelBuilder - avoid (reverse) DNS lookup while building underlying SslTransportLayer
dpoldrugo commented on pull request #10059: URL: https://github.com/apache/kafka/pull/10059#issuecomment-785690250 Thanks @hachikuji and @omkreddy for your comments. Changed the tests as you proposed: https://github.com/apache/kafka/pull/10059/commits/c605ea41fb0b342e9bb17595941c0e658bb63360
[jira] [Commented] (KAFKA-9203) kafka-client 2.3.1 fails to consume lz4 compressed topic
[ https://issues.apache.org/jira/browse/KAFKA-9203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290739#comment-17290739 ] Xavier Léauté commented on KAFKA-9203: -- I submitted a PR that will help detect if we have an older buggy lz4 version on the classpath. This should help prevent these situations and provide a more helpful error message. https://github.com/apache/kafka/pull/10196 > kafka-client 2.3.1 fails to consume lz4 compressed topic > > > Key: KAFKA-9203 > URL: https://issues.apache.org/jira/browse/KAFKA-9203 > Project: Kafka > Issue Type: Bug > Components: compression, consumer >Affects Versions: 2.3.0, 2.3.1 >Reporter: David Watzke >Assignee: Ismael Juma >Priority: Blocker > Fix For: 2.4.0, 2.3.2 > > Attachments: kafka-clients-2.3.2-SNAPSHOT.jar > > > I run kafka cluster 2.1.1 > when I upgraded the consumer app to use kafka-client 2.3.0 (or 2.3.1) instead > of 2.2.0, I immediately started getting the following exceptions in a loop > when consuming a topic with LZ4-compressed messages: > {noformat} > 2019-11-18 11:59:16.888 ERROR [afka-consumer-thread] > com.avast.utils2.kafka.consumer.ConsumerThread: Unexpected exception occurred > while polling and processing messages: org.apache.kafka.common.KafkaExce > ption: Received exception when fetching the next record from > FILEREP_LONER_REQUEST-4. If needed, please seek past the record to continue > consumption. > org.apache.kafka.common.KafkaException: Received exception when fetching the > next record from FILEREP_LONER_REQUEST-4. If needed, please seek past the > record to continue consumption. 
> at > org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1473) > > at > org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332) > > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645) > > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606) > > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263) > > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) > at > com.avast.utils2.kafka.consumer.ConsumerThread.pollAndProcessMessagesFromKafka(ConsumerThread.java:180) > > at > com.avast.utils2.kafka.consumer.ConsumerThread.run(ConsumerThread.java:149) > at > com.avast.filerep.saver.RequestSaver$.$anonfun$main$8(RequestSaver.scala:28) > at > com.avast.filerep.saver.RequestSaver$.$anonfun$main$8$adapted(RequestSaver.scala:19) > > at > resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88) > > at > scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252) > at scala.util.control.Exception$Catch.apply(Exception.scala:228) > at scala.util.control.Exception$Catch.either(Exception.scala:252) > at > resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88) > at > resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26) > at > resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26) > at > resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50) > at > resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25) > > at > resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25) > > at > resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50) > > at > 
resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53) > > at > resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53) > > at > resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50) > at > com.avast.filerep.saver.RequestSaver$.$anonfun$main$6(RequestSaver.scala:19) > at > com.avast.filerep.saver.RequestSaver$.$anonfun$main$6$adapted(RequestSaver.scala:18) > > at > resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88) > > at > scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252) > at scala.util.control.Exception$Catch.apply(Exception.scala:228) > at scala.util.control.Exception$Catch.either(Exception.scala:252) > at > resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala
[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API
chia7712 commented on a change in pull request #10206: URL: https://github.com/apache/kafka/pull/10206#discussion_r582592473 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -3303,6 +3304,28 @@ class KafkaApis(val requestChannel: RequestChannel, new DescribeTransactionsResponse(response.setThrottleTimeMs(requestThrottleMs))) } + def handleListTransactionsRequest(request: RequestChannel.Request): Unit = { +val listTransactionsRequest = request.body[ListTransactionsRequest] +val response = new ListTransactionsResponseData() + +val filteredProducerIds = listTransactionsRequest.data.producerIdFilter.asScala.map(Long.unbox).toSet +val filteredStates = listTransactionsRequest.data.statesFilter.asScala.toSet + +txnCoordinator.handleListTransactions(filteredProducerIds, filteredStates) match { + case Left(error) => +response.setErrorCode(error.code) + case Right(transactions) => +val authorizedTransactions = transactions.filter { state => + authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, state.transactionalId) Review comment: Why does `TransactionState` not include an error field (similar to `DescribeTransactionsResponseData.TransactionState`)? 
## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ## @@ -223,6 +224,49 @@ class TransactionStateManager(brokerId: Int, throw new IllegalStateException(s"Unexpected empty transaction metadata returned while putting $txnMetadata"))) } + def listTransactionStates( +filterProducerIds: Set[Long], +filterStateNames: Set[String] + ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = { +inReadLock(stateLock) { + if (loadingPartitions.nonEmpty) { +Left(Errors.COORDINATOR_LOAD_IN_PROGRESS) + } else { +val filterStates = filterStateNames.flatMap(TransactionState.fromName) +val states = mutable.ListBuffer.empty[ListTransactionsResponseData.TransactionState] + +def shouldInclude(txnMetadata: TransactionMetadata): Boolean = { + if (txnMetadata.state == Dead) { +// We filter the `Dead` state since it is a transient state which +// indicates that the transactionalId and its metadata are in the Review comment: redundant "space"
[GitHub] [kafka] chia7712 commented on a change in pull request #10162: DOCS: Update protocol doc for missing data type
chia7712 commented on a change in pull request #10162: URL: https://github.com/apache/kafka/pull/10162#discussion_r582587709 ## File path: clients/src/main/resources/common/message/README.md ## @@ -75,16 +75,24 @@ There are several primitive field types available. * "int16": a 16-bit integer. +* "uint16": a 16-bit unsigned integer. + * "int32": a 32-bit integer. +* "uint32": a 32-bit unsigned integer. + * "int64": a 64-bit integer. * "float64": is a double-precision floating point number (IEEE 754). * "string": a UTF-8 string. +* "uuid": a type 4 immutable universally unique identifier. + * "bytes": binary data. +* "records": memory record set or file record set. Review comment: That LGTM :)
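Since Java has no unsigned primitive types, the "uint16" and "uint32" entries in the README diff above are usually decoded into the next wider signed type. The following is a minimal sketch of that idea with plain `java.nio`; the class and method names are hypothetical, it is not Kafka's generated serialization code, and it assumes big-endian (network) byte order, which is `ByteBuffer`'s default.

```java
import java.nio.ByteBuffer;

// Hypothetical helper; not Kafka's generated message code.
class UnsignedFields {

    // Reads two bytes and widens them to an int so the full 0..65535 range fits.
    static int readUint16(ByteBuffer buf) {
        return Short.toUnsignedInt(buf.getShort());
    }

    // Reads four bytes and widens them to a long so the full 0..4294967295 range fits.
    static long readUint32(ByteBuffer buf) {
        return Integer.toUnsignedLong(buf.getInt());
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(6);
        buf.putShort((short) 0xFFFF).putInt(0xFFFFFFFF);
        buf.flip();
        System.out.println(readUint16(buf)); // 65535
        System.out.println(readUint32(buf)); // 4294967295
    }
}
```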
[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start
dengziming commented on a change in pull request #10021: URL: https://github.com/apache/kafka/pull/10021#discussion_r582587511 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -299,7 +290,16 @@ final class KafkaMetadataLog private ( // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists // on the file system, so we should first remove snapshotId and then delete snapshot file. expiredSnapshotIdsIter.remove() - Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId) + + val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId) + val destination = Snapshots.deleteRename(path, snapshotId) + try { +Utils.atomicMoveWithFallback(path, destination) + } catch { +case e: IOException => + warn("Error renaming snapshot file: " + path + " to :" + destination + ", " + e.getMessage) + } + scheduler.schedule("delete-snapshot-file", () => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId)) Review comment: Done!
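The pattern in the diff above (rename the file synchronously, then delete it from a background task) can be sketched with only JDK classes. This is a simplified illustration, not the Kafka implementation: the `.deleted` suffix, the class name and the method names are assumptions, and the fallback here only handles `AtomicMoveNotSupportedException`, which may be narrower than what `Utils.atomicMoveWithFallback` does.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class RenameThenDelete {

    // Hypothetical marker suffix; the real naming scheme is not shown in this thread.
    static final String DELETE_SUFFIX = ".deleted";

    // Step 1: rename synchronously so readers stop seeing the file right away.
    static Path markForDeletion(Path file) {
        Path target = file.resolveSibling(file.getFileName() + DELETE_SUFFIX);
        try {
            try {
                Files.move(file, target, StandardCopyOption.ATOMIC_MOVE);
            } catch (AtomicMoveNotSupportedException e) {
                // Fall back to a plain move when the file system cannot move atomically.
                Files.move(file, target, StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target;
    }

    // Step 2: do the actual deletion later, off the calling thread.
    static Future<?> scheduleDelete(ScheduledExecutorService scheduler, Path renamed, long delayMs) {
        return scheduler.schedule(() -> {
            try {
                Files.deleteIfExists(renamed);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }

    // Runs both steps against a temporary directory and reports whether they behaved as expected.
    static boolean demo() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            Path dir = Files.createTempDirectory("snapshots");
            Path snapshot = Files.createFile(dir.resolve("snapshot.bin"));
            Path renamed = markForDeletion(snapshot);
            boolean hiddenImmediately = !Files.exists(snapshot) && Files.exists(renamed);
            scheduleDelete(scheduler, renamed, 10).get();
            boolean deletedLater = !Files.exists(renamed);
            Files.deleteIfExists(dir);
            return hiddenImmediately && deletedLater;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rename-then-delete worked: " + demo());
    }
}
```

The point of splitting the two steps is that the rename is cheap and makes the snapshot invisible under its original name immediately, while the potentially slower delete does not block the caller.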
[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start
dengziming commented on a change in pull request #10021: URL: https://github.com/apache/kafka/pull/10021#discussion_r582587332 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -341,7 +342,7 @@ object KafkaMetadataLog { } } -val replicatedLog = new KafkaMetadataLog(log, snapshotIds, topicPartition, maxFetchSizeInBytes) +val replicatedLog = new KafkaMetadataLog(log, scheduler, snapshotIds, topicPartition, maxFetchSizeInBytes) Review comment: I also find it awkward to represent snapshot files using `OffsetAndEpoch` while we still need to rely on `Snapshots` to remove and delete snapshots. I will file a ticket to discuss consolidating file management; my first thought is to add a `MetadataLogSnapshot` class to represent a snapshot file.
[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API
chia7712 commented on a change in pull request #10206: URL: https://github.com/apache/kafka/pull/10206#discussion_r582586860 ## File path: clients/src/main/resources/common/message/ListTransactionsRequest.json ## @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 66, + "type": "request", + "listeners": ["zkBroker", "broker"], + "name": "ListTransactionsRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "StatesFilter", "type": "[]string", "versions": "0+", + "about": "The transaction states to filter by: if empty, all transactions are returned; if non-empty, then only transactions matching one of the filtered states will be returned" +}, +{ "name": "ProducerIdFilter", "type": "[]int64", "versions": "0+", + "about": "The producerIds to filter by: if empty, no transactions will be returned; if non-empty, only transactions which match one of the filtered producerIds will be returned" Review comment: > On a side note, do you think it would be clearer if we used null to indicate the absence of the filter? it should be fine in this case. 
However, the `empty` collection gets weird as it absolutely gets nothing :(
[GitHub] [kafka] vamossagar12 commented on pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore
vamossagar12 commented on pull request #10052: URL: https://github.com/apache/kafka/pull/10052#issuecomment-785663227 Thanks @cadonna, I have committed your suggestions.
[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API
chia7712 commented on a change in pull request #10206: URL: https://github.com/apache/kafka/pull/10206#discussion_r582583372 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ## @@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int, throw new IllegalStateException(s"Unexpected empty transaction metadata returned while putting $txnMetadata"))) } + def listTransactionStates( +filterProducerIds: Set[Long], +filterStateNames: Set[String] + ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = { +inReadLock(stateLock) { + if (loadingPartitions.nonEmpty) { +Left(Errors.COORDINATOR_LOAD_IN_PROGRESS) + } else { +val filterStates = filterStateNames.flatMap(TransactionState.fromName) Review comment: Probably the response should have a field showing the invalid filters. The callers would know which filters are NOT working.
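To illustrate the suggestion above: `flatMap(TransactionState.fromName)` silently drops names it does not recognize, whereas a caller-friendly variant could keep them so they can be reported back. The sketch below is one possible shape for that, written in Java for illustration; `ParsedFilter` and its `unknown` set are hypothetical and are not part of the PR or of the `ListTransactions` response schema. The state names are the ones listed in the `TransactionState` diff in this thread.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

class StateFilterParsing {

    // State names copied from the TransactionState diff in this thread.
    enum TxnState {
        Empty, Ongoing, PrepareCommit, PrepareAbort,
        CompleteCommit, CompleteAbort, Dead, PrepareEpochFence
    }

    // Returns the matching state, or an empty Optional for an unrecognized name.
    static Optional<TxnState> fromName(String name) {
        for (TxnState state : TxnState.values()) {
            if (state.name().equals(name)) {
                return Optional.of(state);
            }
        }
        return Optional.empty();
    }

    // Hypothetical result type: recognized states plus the names that matched nothing.
    static final class ParsedFilter {
        final Set<TxnState> known = EnumSet.noneOf(TxnState.class);
        final Set<String> unknown = new TreeSet<>();
    }

    static ParsedFilter parse(Collection<String> names) {
        ParsedFilter parsed = new ParsedFilter();
        for (String name : names) {
            Optional<TxnState> state = fromName(name);
            if (state.isPresent()) {
                parsed.known.add(state.get());
            } else {
                parsed.unknown.add(name);
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        ParsedFilter parsed = parse(Arrays.asList("Ongoing", "Dead", "NotAState"));
        System.out.println(parsed.known);   // [Ongoing, Dead]
        System.out.println(parsed.unknown); // [NotAState]
    }
}
```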
[GitHub] [kafka] hachikuji commented on pull request #10206: KAFKA-12369; Implement `ListTransactions` API
hachikuji commented on pull request #10206: URL: https://github.com/apache/kafka/pull/10206#issuecomment-785640598 @chia7712 Thanks, I really appreciate your help reviewing these patches. I pushed a couple comments and left a few responses to your questions.
[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API
hachikuji commented on a change in pull request #10206: URL: https://github.com/apache/kafka/pull/10206#discussion_r582568344 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala ## @@ -25,8 +25,50 @@ import org.apache.kafka.common.record.RecordBatch import scala.collection.{immutable, mutable} + +object TransactionState { + val AllStates = Set( +Empty, +Ongoing, +PrepareCommit, +PrepareAbort, +CompleteCommit, +CompleteAbort, +Dead, +PrepareEpochFence + ) + + def fromName(name: String): Option[TransactionState] = { +name match { + case "Empty" => Some(Empty) + case "Ongoing" => Some(Ongoing) + case "PrepareCommit" => Some(PrepareCommit) + case "PrepareAbort" => Some(PrepareAbort) + case "CompleteCommit" => Some(CompleteCommit) + case "CompleteAbort" => Some(CompleteAbort) + case "PrepareEpochFence" => Some(PrepareEpochFence) + case "Dead" => Some(Dead) + case _ => None +} + } + + def fromId(id: Byte): TransactionState = { Review comment: I left this one as it was since I am not 100% sure whether it has any impact on the performance of transaction loading.
[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API
hachikuji commented on a change in pull request #10206: URL: https://github.com/apache/kafka/pull/10206#discussion_r582567640 ## File path: clients/src/main/resources/common/message/ListTransactionsRequest.json ## @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 66, + "type": "request", + "listeners": ["zkBroker", "broker"], + "name": "ListTransactionsRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "StatesFilter", "type": "[]string", "versions": "0+", + "about": "The transaction states to filter by: if empty, all transactions are returned; if non-empty, then only transactions matching one of the filtered states will be returned" +}, +{ "name": "ProducerIdFilter", "type": "[]int64", "versions": "0+", + "about": "The producerIds to filter by: if empty, no transactions will be returned; if non-empty, only transactions which match one of the filtered producerIds will be returned" Review comment: Yeah, my bad. On a side note, do you think it would be clearer if we used `null` to indicate the absence of the filter?
[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API
hachikuji commented on a change in pull request #10206: URL: https://github.com/apache/kafka/pull/10206#discussion_r582566067 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ## @@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int, throw new IllegalStateException(s"Unexpected empty transaction metadata returned while putting $txnMetadata"))) } + def listTransactionStates( +filterProducerIds: Set[Long], +filterStateNames: Set[String] + ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = { +inReadLock(stateLock) { + if (loadingPartitions.nonEmpty) { Review comment: The problem is that we don't have a way to map from the producerId to the partition.
[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API
hachikuji commented on a change in pull request #10206: URL: https://github.com/apache/kafka/pull/10206#discussion_r582565318 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ## @@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int, throw new IllegalStateException(s"Unexpected empty transaction metadata returned while putting $txnMetadata"))) } + def listTransactionStates( +filterProducerIds: Set[Long], +filterStateNames: Set[String] + ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = { +inReadLock(stateLock) { + if (loadingPartitions.nonEmpty) { +Left(Errors.COORDINATOR_LOAD_IN_PROGRESS) + } else { +val filterStates = filterStateNames.flatMap(TransactionState.fromName) Review comment: I could be persuaded probably. Suppose we add a new state in the future. If a user tried to query that state on an old broker, definitely no transactions would exist in that state. From that perspective, the implementation would return an accurate result. On the other hand, I can see how that might be misleading. If we decide to return an error code, then I think we'll have to treat the state filter a little more carefully from a versioning perspective. A new state would probably demand a new version. I think we tried to view the states a little bit more loosely in the `ListGroups` API (which this is modeled after) because we considered the state machine more of an internal implementation detail. This, by the way, is why we went with the string representation rather than numeric ids. This became more of a grey area though when the state filter was added to the request...
[GitHub] [kafka] dengziming commented on a change in pull request #10162: DOCS: Update protocol doc for missing data type
dengziming commented on a change in pull request #10162: URL: https://github.com/apache/kafka/pull/10162#discussion_r582564994 ## File path: clients/src/main/resources/common/message/README.md ## @@ -75,16 +75,24 @@ There are several primitive field types available. * "int16": a 16-bit integer. +* "uint16": a 16-bit unsigned integer. + * "int32": a 32-bit integer. +* "uint32": a 32-bit unsigned integer. + * "int64": a 64-bit integer. * "float64": is a double-precision floating point number (IEEE 754). * "string": a UTF-8 string. +* "uuid": a type 4 immutable universally unique identifier. + * "bytes": binary data. +* "records": memory record set or file record set. Review comment: I think we should avoid listing subclasses, since we would have to update the docs every time we add a new subclass. How about rewording this to "recordset such as memory recordset"?
[jira] [Commented] (KAFKA-10847) Avoid spurious left/outer join results in stream-stream join
[ https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290685#comment-17290685 ] Guozhang Wang commented on KAFKA-10847: --- I agree that we should delete upon emitting expired records' joined results. Currently since we do range-query + deletion per input record, I guess in practice each time we would only expire very few records. If range query + deletion turns out to be an overhead in practice, we can consider 1) do range-query + deletion less frequently so that each time we would get a reasonable number of records to expire, and 2) use range deletion (https://rocksdb.org/blog/2018/11/21/delete-range.html), which would be efficient especially if we have more records to expire in one call. bq. I do a single-lookup in the store to check if the key is there, if not, then it continues; otherwise it calls the put(key, null) to delete it. Just a syntax sugar, you can just call `putIfAbsent(key, null)` instead. > Avoid spurious left/outer join results in stream-stream join > - > > Key: KAFKA-10847 > URL: https://issues.apache.org/jira/browse/KAFKA-10847 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Sergio Peña >Priority: Major > > KafkaStreams follows an eager execution model, ie, it never buffers input > records but processes them right away. For left/outer stream-stream join, > this implies that left/outer join result might be emitted before the window > end (or window close) time is reached. Thus, a record that will be an > inner-join result might produce an eager (and spurious) left/outer join > result. > We should change the implementation of the join, to not emit eager left/outer > join result, but instead delay the emission of such result after the window > grace period passed.
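The difference between "range query + deletion" per record and a single range deletion can be sketched with an in-memory sorted map standing in for the store. This is only an analogy under stated assumptions: the `NavigableMap` keyed by timestamp, the class name and both method names are hypothetical, and nothing here uses the Kafka Streams or RocksDB APIs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// In-memory stand-in for a timestamp-ordered store; not the Kafka Streams API.
class ExpireByRange {

    // Range query followed by one deletion per expired entry.
    static List<String> expireOneByOne(NavigableMap<Long, String> store, long cutoff) {
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, String>> it = store.headMap(cutoff, false).entrySet().iterator();
        while (it.hasNext()) {
            emitted.add(it.next().getValue());
            it.remove();
        }
        return emitted;
    }

    // Range query followed by a single bulk removal of the whole expired range.
    static List<String> expireInBulk(NavigableMap<Long, String> store, long cutoff) {
        NavigableMap<Long, String> expired = store.headMap(cutoff, false);
        List<String> emitted = new ArrayList<>(expired.values());
        expired.clear();
        return emitted;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> store = new TreeMap<>();
        store.put(10L, "a");
        store.put(20L, "b");
        store.put(30L, "c");
        System.out.println(expireInBulk(store, 25L)); // [a, b]
        System.out.println(store);                    // {30=c}
    }
}
```

Both variants emit the same expired values; the bulk form only pays off when many entries fall below the cutoff in one call, which matches the trade-off described in the comment.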
[GitHub] [kafka] ivanyu commented on a change in pull request #9990: KAFKA-12235: Fix ZkAdminManager.describeConfigs on 2+ config keys
ivanyu commented on a change in pull request #9990: URL: https://github.com/apache/kafka/pull/9990#discussion_r582559897 ## File path: core/src/main/scala/kafka/server/ConfigHelper.scala ## @@ -47,11 +47,11 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo def createResponseConfig(configs: Map[String, Any], createConfigEntry: (String, Any) => DescribeConfigsResponseData.DescribeConfigsResourceResult): DescribeConfigsResponseData.DescribeConfigsResult = { -val filteredConfigPairs = if (resource.configurationKeys == null) +val filteredConfigPairs = if (resource.configurationKeys == null || resource.configurationKeys.isEmpty) Review comment: I agree with this. However it seems treating empty as null is de facto behavior at the moment (before the fix). To be more precise, here https://github.com/apache/kafka/blob/e2a0d0c90e1916d77223a420e3595e8aba643001/core/src/main/scala/kafka/server/ConfigHelper.scala#L53-L55 `resource.configurationKeys` being empty means `true` for each element of `configs`, so effectively no filtering. Which, in turn, is equivalent to `resource.configurationKeys == null`. For example, `testDescribeConfigsWithDocumentation` and `testDescribeConfigsWithEmptyConfigurationKeys` in `ZkAdminManagerTest` break after the fix if treating empty as null is not kept. I guess it originates from that the default constructor of `DescribeConfigsResource` makes `configurationKeys` an empty array. We can either 1) keep the current behavior (the current approach); or 2) stick to the spec (also potentially adding `"default": "null"` to the field description). I'd prefer 2. What do you think, @cmccabe? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
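The two options in this comment differ only in how an empty key list is treated. A minimal sketch of both behaviours, assuming that "stick to the spec" means only `null` selects everything while an empty list selects nothing; `ConfigKeyFilter` and its method names are hypothetical and are not part of `ConfigHelper`:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ConfigKeyFilter {
    /** Option 1 (current de facto behaviour): null and empty both mean "no filtering". */
    static Map<String, Object> lenient(Map<String, Object> configs, List<String> keys) {
        if (keys == null || keys.isEmpty()) {
            return new LinkedHashMap<>(configs);
        }
        return select(configs, keys);
    }

    /** Option 2 (assumed spec behaviour): only null means "all"; an empty list matches nothing. */
    static Map<String, Object> strict(Map<String, Object> configs, List<String> keys) {
        if (keys == null) {
            return new LinkedHashMap<>(configs);
        }
        return select(configs, keys);
    }

    /** Keeps only the entries whose key was explicitly requested. */
    private static Map<String, Object> select(Map<String, Object> configs, List<String> keys) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : configs.entrySet()) {
            if (keys.contains(e.getKey())) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }
}
```

Under option 2, a request object whose default constructor produces an empty array would return no configs, which is why the comment also suggests a `"default": "null"` in the field description.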
[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API
hachikuji commented on a change in pull request #10206: URL: https://github.com/apache/kafka/pull/10206#discussion_r582558759 ## File path: clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.ListTransactionsRequestData; +import org.apache.kafka.common.message.ListTransactionsResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class ListTransactionsRequest extends AbstractRequest { +public static class Builder extends AbstractRequest.Builder { +public final ListTransactionsRequestData data; + +public Builder(ListTransactionsRequestData data) { +super(ApiKeys.LIST_TRANSACTIONS); +this.data = data; +} + +@Override +public ListTransactionsRequest build(short version) { +return new ListTransactionsRequest(data, version); +} + +@Override +public String toString() { +return data.toString(); +} +} + +private final ListTransactionsRequestData data; + +private ListTransactionsRequest(ListTransactionsRequestData data, short version) { +super(ApiKeys.LIST_TRANSACTIONS, version); +this.data = data; +} + +public ListTransactionsRequestData data() { +return data; +} + +@Override +public ListTransactionsResponse getErrorResponse(int throttleTimeMs, Throwable e) { +Errors error = Errors.forException(e); +ListTransactionsResponseData response = new ListTransactionsResponseData() Review comment: Haha, I should really have checked for that after the last PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API
chia7712 commented on a change in pull request #10206: URL: https://github.com/apache/kafka/pull/10206#discussion_r582558391 ## File path: clients/src/main/resources/common/message/ListTransactionsRequest.json ## @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 66, + "type": "request", + "listeners": ["zkBroker", "broker"], + "name": "ListTransactionsRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "StatesFilter", "type": "[]string", "versions": "0+", + "about": "The transaction states to filter by: if empty, all transactions are returned; if non-empty, then only transactions matching one of the filtered states will be returned" +}, +{ "name": "ProducerIdFilter", "type": "[]int64", "versions": "0+", + "about": "The producerIds to filter by: if empty, no transactions will be returned; if non-empty, only transactions which match one of the filtered producerIds will be returned" Review comment: `no transactions will be returned` ^^ this document is a bit weird since the code shows that all transactions are returned if this filter is empty. This is an automated message from the Apache Git Service. 
[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API
chia7712 commented on a change in pull request #10206: URL: https://github.com/apache/kafka/pull/10206#discussion_r582556446 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ## @@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int, throw new IllegalStateException(s"Unexpected empty transaction metadata returned while putting $txnMetadata"))) } + def listTransactionStates( +filterProducerIds: Set[Long], +filterStateNames: Set[String] + ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = { +inReadLock(stateLock) { + if (loadingPartitions.nonEmpty) { +Left(Errors.COORDINATOR_LOAD_IN_PROGRESS) + } else { +val filterStates = filterStateNames.flatMap(TransactionState.fromName) +val states = mutable.ListBuffer.empty[ListTransactionsResponseData.TransactionState] + +def shouldInclude(txnMetadata: TransactionMetadata): Boolean = { + if (txnMetadata.state == Dead) { Review comment: Could you add a comment for this case explaining why it is excluded? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API
chia7712 commented on a change in pull request #10206: URL: https://github.com/apache/kafka/pull/10206#discussion_r582549509 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ## @@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int, throw new IllegalStateException(s"Unexpected empty transaction metadata returned while putting $txnMetadata"))) } + def listTransactionStates( +filterProducerIds: Set[Long], +filterStateNames: Set[String] + ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = { +inReadLock(stateLock) { + if (loadingPartitions.nonEmpty) { +Left(Errors.COORDINATOR_LOAD_IN_PROGRESS) + } else { +val filterStates = filterStateNames.flatMap(TransactionState.fromName) +val states = mutable.ListBuffer.empty[ListTransactionsResponseData.TransactionState] + +def shouldInclude(txnMetadata: TransactionMetadata): Boolean = { + if (txnMetadata.state == Dead) { +false + } else if (filterProducerIds.nonEmpty && !filterProducerIds.contains(txnMetadata.producerId)) { +false + } else if (filterStateNames.nonEmpty && !filterStates.contains(txnMetadata.state)) { +false + } else { +true + } +} + +transactionMetadataCache.foreach { case (_, cache) => Review comment: Could you use `forKeyValue` instead of `foreach`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
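The filter in the diff above treats an empty filter set as "match everything" and always excludes `Dead` transactions. A simplified Java rendering of that predicate, for illustration only: `TxnFilter` and its reduced `State` enum are hypothetical, and the sketch takes already-parsed states rather than state names, so it does not reproduce the case where every requested name is unknown.

```java
import java.util.Set;

final class TxnFilter {
    /** Hypothetical subset of the transaction states named in the diff. */
    enum State { EMPTY, ONGOING, DEAD }

    /** Returns true if a transaction should appear in the listing. */
    static boolean shouldInclude(long producerId,
                                 State state,
                                 Set<Long> filterProducerIds,
                                 Set<State> filterStates) {
        if (state == State.DEAD) {
            return false; // Dead transactions are never listed
        }
        if (!filterProducerIds.isEmpty() && !filterProducerIds.contains(producerId)) {
            return false; // a non-empty producerId filter must match
        }
        if (!filterStates.isEmpty() && !filterStates.contains(state)) {
            return false; // a non-empty state filter must match
        }
        return true; // empty filters impose no restriction
    }
}
```

Read this way, an empty `ProducerIdFilter` returns all transactions, which matches the review remark that the "no transactions will be returned" wording in the JSON schema disagrees with the code.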
[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API
chia7712 commented on a change in pull request #10206: URL: https://github.com/apache/kafka/pull/10206#discussion_r582547244 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala ## @@ -25,8 +25,50 @@ import org.apache.kafka.common.record.RecordBatch import scala.collection.{immutable, mutable} + +object TransactionState { + val AllStates = Set( +Empty, +Ongoing, +PrepareCommit, +PrepareAbort, +CompleteCommit, +CompleteAbort, +Dead, +PrepareEpochFence + ) + + def fromName(name: String): Option[TransactionState] = { +name match { + case "Empty" => Some(Empty) + case "Ongoing" => Some(Ongoing) + case "PrepareCommit" => Some(PrepareCommit) + case "PrepareAbort" => Some(PrepareAbort) + case "CompleteCommit" => Some(CompleteCommit) + case "CompleteAbort" => Some(CompleteAbort) + case "PrepareEpochFence" => Some(PrepareEpochFence) + case "Dead" => Some(Dead) + case _ => None +} + } + + def fromId(id: Byte): TransactionState = { Review comment: How about `AllStates.find(_.id == id)`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API
chia7712 commented on a change in pull request #10206: URL: https://github.com/apache/kafka/pull/10206#discussion_r582547180 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala ## @@ -25,8 +25,50 @@ import org.apache.kafka.common.record.RecordBatch import scala.collection.{immutable, mutable} + +object TransactionState { + val AllStates = Set( +Empty, +Ongoing, +PrepareCommit, +PrepareAbort, +CompleteCommit, +CompleteAbort, +Dead, +PrepareEpochFence + ) + + def fromName(name: String): Option[TransactionState] = { Review comment: How about `AllStates.find(_.name == name)`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
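The suggestion in these two review comments is to derive both lookups from the single set of states instead of maintaining a hand-written `match`. A rough Java analogue using an enum, as a sketch only: `TxnState`, `displayName`, and the numeric ids shown are illustrative assumptions and are not taken from the PR.

```java
import java.util.Arrays;
import java.util.Optional;

/** Hypothetical Java analogue of the Scala TransactionState objects. */
enum TxnState {
    EMPTY((byte) 0, "Empty"),
    ONGOING((byte) 1, "Ongoing"),
    DEAD((byte) 6, "Dead");

    final byte id;            // illustrative id, assumed for this sketch
    final String displayName; // the name exchanged over the wire as a string

    TxnState(byte id, String displayName) {
        this.id = id;
        this.displayName = displayName;
    }

    /** Equivalent in spirit to AllStates.find(_.name == name). */
    static Optional<TxnState> fromName(String name) {
        return Arrays.stream(values()).filter(s -> s.displayName.equals(name)).findFirst();
    }

    /** Equivalent in spirit to AllStates.find(_.id == id). */
    static Optional<TxnState> fromId(byte id) {
        return Arrays.stream(values()).filter(s -> s.id == id).findFirst();
    }
}
```

A single source of truth means a newly added state cannot be forgotten in one of the lookups. Note that the `fromId` in the PR returns a `TransactionState` rather than an `Option`, so a direct port of the `find` suggestion would still need to decide what to do with an unknown id.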
[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API
chia7712 commented on a change in pull request #10206: URL: https://github.com/apache/kafka/pull/10206#discussion_r582545612 ## File path: clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.ListTransactionsRequestData; +import org.apache.kafka.common.message.ListTransactionsResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class ListTransactionsRequest extends AbstractRequest { +public static class Builder extends AbstractRequest.Builder { +public final ListTransactionsRequestData data; + +public Builder(ListTransactionsRequestData data) { +super(ApiKeys.LIST_TRANSACTIONS); +this.data = data; +} + +@Override +public ListTransactionsRequest build(short version) { +return new ListTransactionsRequest(data, version); +} + +@Override +public String toString() { +return data.toString(); +} +} + +private final ListTransactionsRequestData data; + +private ListTransactionsRequest(ListTransactionsRequestData data, short version) { +super(ApiKeys.LIST_TRANSACTIONS, version); +this.data = data; +} + +public ListTransactionsRequestData data() { +return data; +} + +@Override +public ListTransactionsResponse getErrorResponse(int throttleTimeMs, Throwable e) { +Errors error = Errors.forException(e); +ListTransactionsResponseData response = new ListTransactionsResponseData() Review comment: `throttleTimeMs` is neglected. 
## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ## @@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int, throw new IllegalStateException(s"Unexpected empty transaction metadata returned while putting $txnMetadata"))) } + def listTransactionStates( +filterProducerIds: Set[Long], +filterStateNames: Set[String] + ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = { +inReadLock(stateLock) { + if (loadingPartitions.nonEmpty) { +Left(Errors.COORDINATOR_LOAD_IN_PROGRESS) + } else { +val filterStates = filterStateNames.flatMap(TransactionState.fromName) +val states = mutable.ListBuffer.empty[ListTransactionsResponseData.TransactionState] + +def shouldInclude(txnMetadata: TransactionMetadata): Boolean = { + if (txnMetadata.state == Dead) { +false + } else if (filterProducerIds.nonEmpty && !filterProducerIds.contains(txnMetadata.producerId)) { +false + } else if (filterStateNames.nonEmpty && !filterStates.contains(txnMetadata.state)) { +false + } else { +true + } +} + +transactionMetadataCache.foreach { case (_, cache) => + cache.metadataPerTransactionalId.values.foreach { txnMetadata => +txnMetadata.inLock { + if (shouldInclude(txnMetadata)) { +states += new ListTransactionsResponseData.TransactionState() + .setTransactionalId(txnMetadata.transactionalId) + .setProducerId(txnMetadata.producerId) + .setTransactionState(txnMetadata.state.toString) Review comment: `txnMetadata.state.toString` -> `txnMetadata.state.name` ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala ## @@ -25,8 +25,50 @@ import org.apache.kafka.common.record.RecordBatch import scala.collection.{immutable, mutable} + +object TransactionState { + val AllStates = Set( +Empty, +Ongoing, +PrepareCommit, +PrepareAbort, +CompleteCommit, +CompleteAbort, +Dead, +PrepareEpochFence + ) + + def fromName(name: String): Option[TransactionState] = { Review comment
[GitHub] [kafka] omkreddy edited a comment on pull request #10059: KAFKA-8562: SaslChannelBuilder - avoid (reverse) DNS lookup while building underlying SslTransportLayer
omkreddy edited a comment on pull request #10059: URL: https://github.com/apache/kafka/pull/10059#issuecomment-785604829 @dpoldrugo Thanks for the PR. LGTM. As @hachikuji mentioned, we should update `SaslAuthenticatorFailureNoDelayTest`, `SaslAuthenticatorTest` tests. I think, we may not need config, as we are already avoiding DNS lookup for SSL. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy commented on pull request #10059: KAFKA-8562: SaslChannelBuilder - avoid (reverse) DNS lookup while building underlying SslTransportLayer
omkreddy commented on pull request #10059: URL: https://github.com/apache/kafka/pull/10059#issuecomment-785604829 @dpoldrugo Thanks for the PR. LGTM. As @hachikuji mentioned, we should update `SaslAuthenticatorFailureNoDelayTest`, `SaslAuthenticatorTest` tests. I think, we may not need config, as we are already avoiding DNS look up for SSL authentication. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #10162: DOCS: Update protocol doc for missing data type
chia7712 commented on a change in pull request #10162: URL: https://github.com/apache/kafka/pull/10162#discussion_r582529627 ## File path: clients/src/main/resources/common/message/README.md ## @@ -75,16 +75,24 @@ There are several primitive field types available. * "int16": a 16-bit integer. +* "uint16": a 16-bit unsigned integer. + * "int32": a 32-bit integer. +* "uint32": a 32-bit unsigned integer. + * "int64": a 64-bit integer. * "float64": is a double-precision floating point number (IEEE 754). * "string": a UTF-8 string. +* "uuid": a type 4 immutable universally unique identifier. + * "bytes": binary data. +* "records": memory record set or file record set. Review comment: There are many subclasses of this type, since the Java type used by KRPC is `BaseRecords`. We do make the protocol data carry different subclasses while processing data (before serialization). Hence, listing specific subclasses in this document seems a bit odd to me. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] twmb commented on pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API
twmb commented on pull request #10183: URL: https://github.com/apache/kafka/pull/10183#issuecomment-785551804 Also is there a reason not to include the `LastUpdateTimestamp`? It seems pretty valuable to know how long ago the transaction state was modified. I'm not sure how valuable LastProducerId or LastProducerEpoch would be, but I know those are available too, although (I think) those are in-memory only values used only for allowing retries of transactional calls. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] twmb edited a comment on pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API
twmb edited a comment on pull request #10183: URL: https://github.com/apache/kafka/pull/10183#issuecomment-785548579 Can the KIP be updated to change TransactionState from an int8 to a string (as implemented in this PR)? Alternatively, is there value in using a string vs. an int8? The string is more descriptive, but since the states are a defined enum, it'd be smaller to serialize/deserialize the int8 and then a client can perform the int8 => string conversion. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start
hachikuji commented on a change in pull request #10021: URL: https://github.com/apache/kafka/pull/10021#discussion_r582491373 ## File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java ## @@ -92,4 +100,22 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial)); } + +/** + * Delete this snapshot from the filesystem. + */ +public static CompletableFuture deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) throws IOException { +Path path = snapshotPath(logDir, snapshotId); +Path destination = Snapshots.deleteRename(path, snapshotId); +Utils.atomicMoveWithFallback(path, destination); + +return CompletableFuture.supplyAsync(() -> { Review comment: Probably not since the reference would be through the file descriptor. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start
hachikuji commented on a change in pull request #10021: URL: https://github.com/apache/kafka/pull/10021#discussion_r582471506 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -299,7 +302,16 @@ final class KafkaMetadataLog private ( // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists // on the file system, so we should first remove snapshotId and then delete snapshot file. expiredSnapshotIdsIter.remove() - Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId) + + val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId) + val destination = Snapshots.deleteRename(path, snapshotId) + try { +Utils.atomicMoveWithFallback(path, destination) + } catch { +case e: IOException => + warn("Error renaming snapshot file: " + path + " to :" + destination + ", " + e.getMessage) Review comment: nit: Could this be error level? Can we use `$` substitutions? Also, let's include the full exception instead of just the message. ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -341,7 +342,7 @@ object KafkaMetadataLog { } } -val replicatedLog = new KafkaMetadataLog(log, snapshotIds, topicPartition, maxFetchSizeInBytes) +val replicatedLog = new KafkaMetadataLog(log, scheduler, snapshotIds, topicPartition, maxFetchSizeInBytes) Review comment: Not something we need to solve here, but I think we should put some thought into consolidating file management. Right now it's a little awkward to divide responsibility between `KafkaMetadataLog` and `Log`. For example, I think we are trying to say that `KafkaMetadataLog` is responsible for snapshots. That is mostly true, but we are relying on `Log.loadSegmentFiles` for the deletion of orphaned `.delete` snapshots. 
## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -299,7 +290,16 @@ final class KafkaMetadataLog private ( // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists // on the file system, so we should first remove snapshotId and then delete snapshot file. expiredSnapshotIdsIter.remove() - Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId) + + val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId) + val destination = Snapshots.deleteRename(path, snapshotId) + try { +Utils.atomicMoveWithFallback(path, destination) + } catch { +case e: IOException => + warn("Error renaming snapshot file: " + path + " to :" + destination + ", " + e.getMessage) + } + scheduler.schedule("delete-snapshot-file", () => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId)) Review comment: Similar to `Log.deleteSegmentFiles`, we should probably use a delay here. I think it would be ok to either hard-code this to the default of 60s, or use `file.delete.delay.ms` from the default configuration. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
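The rename-then-delete-later pattern discussed in this review can be sketched with the JDK alone. This is a hedged illustration rather than the PR's code: `DelayedSnapshotDelete`, the `.deleted` suffix, and the delay value are assumptions, and a plain `ScheduledExecutorService` stands in for Kafka's scheduler.

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class DelayedSnapshotDelete {
    /** Renames the snapshot so new readers no longer find it; falls back to a non-atomic move. */
    static Path renameForDelete(Path snapshot) throws IOException {
        Path destination = snapshot.resolveSibling(snapshot.getFileName() + ".deleted");
        try {
            return Files.move(snapshot, destination, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            return Files.move(snapshot, destination, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    /** Deletes the renamed file after a delay, giving in-flight readers time to finish. */
    static ScheduledFuture<Boolean> scheduleDelete(ScheduledExecutorService scheduler,
                                                   Path renamed,
                                                   long delayMs) {
        Callable<Boolean> task = () -> Files.deleteIfExists(renamed);
        return scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS);
    }

    /** Usage example on a temporary directory; returns true if the rename and delete both worked. */
    static boolean demo() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            Path dir = Files.createTempDirectory("snapshots");
            Path snapshot = Files.createFile(dir.resolve("example.checkpoint"));
            Path renamed = renameForDelete(snapshot);
            boolean hidden = !Files.exists(snapshot) && Files.exists(renamed);
            boolean deleted = scheduleDelete(scheduler, renamed, 50L).get();
            Files.deleteIfExists(dir);
            return hidden && deleted;
        } catch (Exception e) {
            return false;
        } finally {
            scheduler.shutdown();
        }
    }
}
```

The demo uses a 50 ms delay only to keep the example fast; the review suggests something on the order of the 60 s default or the value of `file.delete.delay.ms`.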
[GitHub] [kafka] twmb commented on pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API
twmb commented on pull request #10183: URL: https://github.com/apache/kafka/pull/10183#issuecomment-785548579 Can the KIP be updated to change TransactionState from an int8 to a string (as implemented in this PR)? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start
dengziming commented on a change in pull request #10021: URL: https://github.com/apache/kafka/pull/10021#discussion_r582479381 ## File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java ## @@ -92,4 +100,22 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial)); } + +/** + * Delete this snapshot from the filesystem. + */ +public static CompletableFuture deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) throws IOException { +Path path = snapshotPath(logDir, snapshotId); +Path destination = Snapshots.deleteRename(path, snapshotId); +Utils.atomicMoveWithFallback(path, destination); + +return CompletableFuture.supplyAsync(() -> { Review comment: If the file is deleted while Kafka is still reading the file, won't the reading fail since we have already renamed the log file? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start
dengziming commented on a change in pull request #10021: URL: https://github.com/apache/kafka/pull/10021#discussion_r582477733 ## File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java ## @@ -92,4 +100,22 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial)); } + +/** + * Delete this snapshot from the filesystem. + */ +public static CompletableFuture deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) throws IOException { +Path path = snapshotPath(logDir, snapshotId); +Path destination = Snapshots.deleteRename(path, snapshotId); +Utils.atomicMoveWithFallback(path, destination); + +return CompletableFuture.supplyAsync(() -> { +try { +return Files.deleteIfExists(destination); +} catch (IOException e) { +throw new RuntimeException("Error deleting snapshot file " + destination + ":" + e.getMessage()); +} +}); Review comment: Added schedule to `KafkaMetadataLog`, PTAL. Do you think we should add a `MetadataLogSnapshot` class instead of using `OffsetAndEpoch` to represent a snapshot? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji opened a new pull request #10206: KAFKA-12369; Implement `ListTransactions` API
hachikuji opened a new pull request #10206: URL: https://github.com/apache/kafka/pull/10206 This patch implements the `ListTransactions` API as documented in KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions. This is only the server-side implementation and does not contain the `Admin` API. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12371) MirrorMaker 2.0 documentation is incorrect
[ https://issues.apache.org/jira/browse/KAFKA-12371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290598#comment-17290598 ] Luke Chen commented on KAFKA-12371: --- [~Scott-kirk], thanks for reporting the issue. I've fixed this issue in KAFKA-12350. You should be able to view the correct one after the kafka-site PR merged. Thank you. > MirrorMaker 2.0 documentation is incorrect > -- > > Key: KAFKA-12371 > URL: https://issues.apache.org/jira/browse/KAFKA-12371 > Project: Kafka > Issue Type: Improvement > Components: docs, documentation >Affects Versions: 2.7.0 >Reporter: Scott Kirkpatrick >Priority: Minor > > There are a few places in the official MirrorMaker 2.0 docs that are either > confusing or incorrect. Here are a few examples I've found: > The documentation for the 'sync.group.offsets.enabled' config states that > it's enabled by default > [here|https://github.com/apache/kafka-site/blob/61f4707381c369a98a7a77e1a7c3a11d5983909c/27/ops.html#L802], > but the actual source code indicates that it's disabled by default > [here|https://github.com/apache/kafka/blob/f75efb96fae99a22eb54b5d0ef4e23b28fe8cd2d/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java#L185]. > I'm unsure if the intent is to have it enabled or disabled by default. > There are also some numerical typos, > [here|https://github.com/apache/kafka-site/blob/61f4707381c369a98a7a77e1a7c3a11d5983909c/27/ops.html#L791] > and > [here|https://github.com/apache/kafka-site/blob/61f4707381c369a98a7a77e1a7c3a11d5983909c/27/ops.html#L793]. 
> These lines state that the default is 6000 seconds (and incorrectly that > it's equal to 10 minutes), but the actual default is 600 seconds, shown > [here|https://github.com/apache/kafka/blob/f75efb96fae99a22eb54b5d0ef4e23b28fe8cd2d/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java#L145] > and > [here|https://github.com/apache/kafka/blob/f75efb96fae99a22eb54b5d0ef4e23b28fe8cd2d/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java#L152] -- This message was sent by Atlassian Jira (v8.3.4#803005)
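The arithmetic behind the numerical typo reported above can be checked directly: 600 seconds is 10 minutes, while 6000 seconds would be 100 minutes. A minimal sketch using `java.time.Duration`; the class and method names are hypothetical and not part of Kafka.

```java
import java.time.Duration;

final class IntervalCheck {
    // Converts a number of seconds to whole minutes.
    static long toMinutes(long seconds) {
        return Duration.ofSeconds(seconds).toMinutes();
    }
}
```

Here `IntervalCheck.toMinutes(600)` yields 10 and `IntervalCheck.toMinutes(6000)` yields 100, which matches the report that the documented value and its "10 minutes" gloss disagree.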
[GitHub] [kafka] dengziming commented on a change in pull request #10162: DOCS: Update protocol doc for missing data type
dengziming commented on a change in pull request #10162: URL: https://github.com/apache/kafka/pull/10162#discussion_r582432795 ## File path: clients/src/main/resources/common/message/README.md ## @@ -75,16 +75,24 @@ There are several primitive field types available. * "int16": a 16-bit integer. +* "uint16": a 16-bit unsigned integer. + * "int32": a 32-bit integer. +* "uint32": a 32-bit unsigned integer. + * "int64": a 64-bit integer. * "float64": is a double-precision floating point number (IEEE 754). * "string": a UTF-8 string. +* "uuid": a type 4 immutable universally unique identifier. + * "bytes": binary data. +* "records": memory record set or file record set. Review comment: Only `MemoryRecords` are supported when receiving records, but we can also set `FileRecords` when writing records. For example, `KafkaRaftClient.tryCompleteFetchRequest()` will call `partitionData.setRecordSet(log.read(xxx))`.
[GitHub] [kafka] mjsax merged pull request #10186: MINOR: bump release version to 3.0.0-SNAPSHOT
mjsax merged pull request #10186: URL: https://github.com/apache/kafka/pull/10186 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #10205: KAFKA-12323 Follow-up: Refactor the unit test a bit
guozhangwang commented on a change in pull request #10205: URL: https://github.com/apache/kafka/pull/10205#discussion_r582428237 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ## @@ -1936,7 +1934,7 @@ public void process(final Object key, final Object value) { clientSupplier.consumer.addRecord(new ConsumerRecord<>( topic1, 1, -0L, +100L, Review comment: Ack. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #10205: KAFKA-12323 Follow-up: Refactor the unit test a bit
guozhangwang commented on a change in pull request #10205: URL: https://github.com/apache/kafka/pull/10205#discussion_r582426740 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ## @@ -1844,19 +1844,17 @@ public void process(final Object key, final Object value) {} assertEquals(0, punctuatedWallClockTime.size()); mockTime.sleep(100L); -for (long i = 0L; i < 10L; i++) { Review comment: Since we do not need it really: a single record is sufficient to trigger the punctuation. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] highluck commented on a change in pull request #9851: KAFKA-10769 Remove JoinGroupRequest#containsValidPattern as it is dup…
highluck commented on a change in pull request #9851: URL: https://github.com/apache/kafka/pull/9851#discussion_r582425104 ## File path: clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java ## @@ -59,38 +59,12 @@ public String toString() { public static final int UNKNOWN_GENERATION_ID = -1; public static final String UNKNOWN_PROTOCOL_NAME = ""; -private static final int MAX_GROUP_INSTANCE_ID_LENGTH = 249; - /** * Ported from class Topic in {@link org.apache.kafka.common.internals} to restrict the charset for * static member id. */ public static void validateGroupInstanceId(String id) { -if (id.equals("")) -throw new InvalidConfigurationException("Group instance id must be non-empty string"); Review comment: I modified code thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r582415502 ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -701,11 +719,17 @@ private[log] class Cleaner(val id: Int, // if any messages are to be retained, write them out val outputBuffer = result.outputBuffer if (outputBuffer.position() > 0) { +if (destSegment.isEmpty) { + // create a new segment with a suffix appended to the name of the log and indexes + destSegment = Some(LogCleaner.createNewCleanedSegment(log, result.minOffset())) + transactionMetadata.cleanedIndex = Some(destSegment.get.txnIndex) Review comment: It may be possible to call `transactionMetadata.appendTransactionIndex()` here though to append a little bit sooner. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r582413737 ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -701,11 +719,17 @@ private[log] class Cleaner(val id: Int, // if any messages are to be retained, write them out val outputBuffer = result.outputBuffer if (outputBuffer.position() > 0) { +if (destSegment.isEmpty) { + // create a new segment with a suffix appended to the name of the log and indexes + destSegment = Some(LogCleaner.createNewCleanedSegment(log, result.minOffset())) + transactionMetadata.cleanedIndex = Some(destSegment.get.txnIndex) Review comment: Re: changing transaction code. Since cleanedIndex is not created until here, we need to delay appends. Originally it was created at the start of `cleanSegments`. In that method, we now cannot guarantee the log has been created until `cleanInto` returns with a defined `cleanedSegment`. In `cleanInto` there is a chance to call `onControlBatchRead`, which previously just appended transactions, but can no longer do so if `cleanedIndex` is not defined.
[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r582413737 ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -701,11 +719,17 @@ private[log] class Cleaner(val id: Int, // if any messages are to be retained, write them out val outputBuffer = result.outputBuffer if (outputBuffer.position() > 0) { +if (destSegment.isEmpty) { + // create a new segment with a suffix appended to the name of the log and indexes + destSegment = Some(LogCleaner.createNewCleanedSegment(log, result.minOffset())) + transactionMetadata.cleanedIndex = Some(destSegment.get.txnIndex) Review comment: Re: changing transaction code. Since cleanedIndex is not created until here, we need to delay appends. Originally it was created at the start of `cleanSegments`. In that method, we now cannot guarantee the log has been created until `cleanInto` returns with a defined `cleanedSegment`. In `cleanSegments` there is a chance to call `onControlBatchRead`, which previously just appended transactions, but can no longer do so if `cleanedIndex` is not defined.
[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r582413737 ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -701,11 +719,17 @@ private[log] class Cleaner(val id: Int, // if any messages are to be retained, write them out val outputBuffer = result.outputBuffer if (outputBuffer.position() > 0) { +if (destSegment.isEmpty) { + // create a new segment with a suffix appended to the name of the log and indexes + destSegment = Some(LogCleaner.createNewCleanedSegment(log, result.minOffset())) + transactionMetadata.cleanedIndex = Some(destSegment.get.txnIndex) Review comment: Re: changing transaction code. Since cleanedIndex is not created until here, we need to delay appends. Originally it was created at the start of `cleanSegments`. In that method, we now cannot guarantee the log has been created until `cleanInto` returns with a defined `cleanedSegment`. In `cleanSegments` there is a chance to call `onControlBatchRead`, which previously just appended transactions, but can no longer do so.
[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r582413737 ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -701,11 +719,17 @@ private[log] class Cleaner(val id: Int, // if any messages are to be retained, write them out val outputBuffer = result.outputBuffer if (outputBuffer.position() > 0) { +if (destSegment.isEmpty) { + // create a new segment with a suffix appended to the name of the log and indexes + destSegment = Some(LogCleaner.createNewCleanedSegment(log, result.minOffset())) + transactionMetadata.cleanedIndex = Some(destSegment.get.txnIndex) Review comment: Re: changing transaction code. Since cleanedIndex is not created until here, we need to delay appends. Originally it was created at the start of `cleanSegments`. In that method, we cannot guarantee the log has been created until `cleanInto` returns with a defined `cleanedSegment`. In `cleanSegments` there is a chance to call `onControlBatchRead`, which previously just appended transactions, but can no longer do so.
[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r582407274 ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -599,21 +606,30 @@ private[log] class Cleaner(val id: Int, } currentSegmentOpt = nextSegmentOpt } - - cleaned.onBecomeInactiveSegment() - // flush new segment to disk before swap - cleaned.flush() - - // update the modification date to retain the last modified date of the original files - val modified = segments.last.lastModified - cleaned.lastModified = modified - - // swap in new segment - info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log") - log.replaceSegments(List(cleaned), segments) + + // Result of cleaning included at least one record. + if (cleanedSegment.isDefined) { +val cleaned = cleanedSegment.get +cleaned.onBecomeInactiveSegment() +// flush new segment to disk before swap +cleaned.flush() + +// update the modification date to retain the last modified date of the original files +val modified = segments.last.lastModified +cleaned.lastModified = modified + +// swap in new segment +info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log") +log.replaceSegments(List(cleaned), segments) Review comment: As for the first part about `replaceSegments`. It seems that there are a few other times we call this method. I'm wondering if we would want to update the logStartOffset in these cases (probably) and what the reason should be. I'm also wondering if we should include the reason as a parameter to `replaceSegments`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r582406089 ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -599,21 +606,30 @@ private[log] class Cleaner(val id: Int, } currentSegmentOpt = nextSegmentOpt } - - cleaned.onBecomeInactiveSegment() - // flush new segment to disk before swap - cleaned.flush() - - // update the modification date to retain the last modified date of the original files - val modified = segments.last.lastModified - cleaned.lastModified = modified - - // swap in new segment - info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log") - log.replaceSegments(List(cleaned), segments) + + // Result of cleaning included at least one record. + if (cleanedSegment.isDefined) { +val cleaned = cleanedSegment.get +cleaned.onBecomeInactiveSegment() +// flush new segment to disk before swap +cleaned.flush() + +// update the modification date to retain the last modified date of the original files +val modified = segments.last.lastModified +cleaned.lastModified = modified + +// swap in new segment +info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log") +log.replaceSegments(List(cleaned), segments) Review comment: Right. I wasn't sure if it made sense to change the deleteSegments code to support the SegmentCompaction reason. The entire segment is being deleted here, but I see how this may be confusing when reading in the log. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12211) NoSuchFileException will be thrown if hasPersistentStores is false when creating stateDir
[ https://issues.apache.org/jira/browse/KAFKA-12211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-12211. Resolution: Fixed > NoSuchFileException will be thrown if hasPersistentStores is false when > creating stateDir > - > > Key: KAFKA-12211 > URL: https://issues.apache.org/jira/browse/KAFKA-12211 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0, 2.6.1 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Minor > Fix For: 2.8.0, 2.7.1, 2.6.2 > > > We improved the state directory folder/file permission setting in > KAFKA-10705. But we forgot to consider one situation: if user doesn't have > PersistentStores, we won't need to create base dir and state dir. And if > there's no such dir/file, and we tried to set permission to them, we'll have > *NoSuchFileException* > > {code:java} > ERROR Error changing permissions for the state or base directory > /var/folders/4l/393lkmzx3zvftwynjngzdsfwgn/T/kafka-11254487964259813330/appId_AdjustStreamThreadCountTestshouldAddStreamThread > (org.apache.kafka.streams.processor.internals.StateDirectory:117) > 2021-01-15T16:10:44.570+0800 [DEBUG] [TestEventLogger] > java.nio.file.NoSuchFileException: > /var/folders/4l/393lkmzx3zvftwynjngzdsfwgn/T/kafka-11254487964259813330/appId_AdjustStreamThreadCountTestshouldAddStreamThread > 2021-01-15T16:10:44.570+0800 [DEBUG] [TestEventLogger] at > java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92) > 2021-01-15T16:10:44.570+0800 [DEBUG] [TestEventLogger] at > java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111) > 2021-01-15T16:10:44.570+0800 [DEBUG] [TestEventLogger] at > java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116) > 2021-01-15T16:10:44.570+0800 [DEBUG] [TestEventLogger] at > java.base/sun.nio.fs.UnixFileAttributeViews$Posix.setMode(UnixFileAttributeViews.java:254) > 2021-01-15T16:10:44.570+0800 [DEBUG] 
[TestEventLogger] at > java.base/sun.nio.fs.UnixFileAttributeViews$Posix.setPermissions(UnixFileAttributeViews.java:276) > 2021-01-15T16:10:44.570+0800 [DEBUG] [TestEventLogger] at > java.base/java.nio.file.Files.setPosixFilePermissions(Files.java:2079) > 2021-01-15T16:10:44.570+0800 [DEBUG] [TestEventLogger] at > org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:115) > {code}
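The failure mode in the trace above, `Files.setPosixFilePermissions` being called on a path that was never created, can be avoided by checking for the directory first. The following is a minimal sketch of that guard, not the actual fix made in `StateDirectory`: the class name, the method name, and the `rwxr-x---` permission string are assumptions chosen for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

final class StateDirPermissionsSketch {
    // Restricts permissions on dir only if it exists and the file system is POSIX.
    // Returns true when permissions were actually changed.
    static boolean restrictIfPresent(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            // Nothing was created (e.g. no persistent stores), so there is nothing to secure.
            return false;
        }
        if (!dir.getFileSystem().supportedFileAttributeViews().contains("posix")) {
            // Non-POSIX file systems would throw UnsupportedOperationException.
            return false;
        }
        // Hypothetical permission set, used here only as an example.
        Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-x---");
        Files.setPosixFilePermissions(dir, perms);
        return true;
    }
}
```

Calling `restrictIfPresent` on a missing directory returns `false` instead of throwing `NoSuchFileException`.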
[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r582402100 ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -582,13 +586,16 @@ private[log] class Cleaner(val id: Int, transactionMetadata.addAbortedTransactions(abortedTransactions) val retainDeletesAndTxnMarkers = currentSegment.lastModified > deleteHorizonMs -info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " + +info(s"Cleaning $currentSegment in log ${log.name} into ${currentSegment.baseOffset} " + Review comment: Hmmm I think I didn't really understand this log at first. It does make sense to move it to when we know what the new segment is. I think the issue I had was what to do with including `retainDeletesAndTxnMarkers`. Maybe it makes sense to have a log saying the current segment and the deletion info and a second log to say what the new segment's baseOffset is later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r582402100 ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -582,13 +586,16 @@ private[log] class Cleaner(val id: Int, transactionMetadata.addAbortedTransactions(abortedTransactions) val retainDeletesAndTxnMarkers = currentSegment.lastModified > deleteHorizonMs -info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " + +info(s"Cleaning $currentSegment in log ${log.name} into ${currentSegment.baseOffset} " + Review comment: Hmmm I think I didn't really understand this log at first. It does make sense to move it to when we know what the new segment is. I think the issue I had was what to do with `retainDeletesAndTxnMarkers`. Maybe it makes sense to have a log saying the current segment and the deletion info and a second log to say what the new segment's baseOffset is later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed
hachikuji commented on a change in pull request #10112: URL: https://github.com/apache/kafka/pull/10112#discussion_r582378783 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ## @@ -475,11 +476,15 @@ public boolean commitOffsets() { synchronized (this) { // First we need to make sure we snapshot everything in exactly the current state. This // means both the current set of messages we're still waiting to finish, stored in this -// class, which setting flushing = true will handle by storing any new values into a new +// class, which setting recordFlushPending = true will handle by storing any new values into a new // buffer; and the current set of user-specified offsets, stored in the // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot. -flushing = true; -boolean flushStarted = offsetWriter.beginFlush(); +// No need to begin a new offset flush if we timed out waiting for records to be flushed to +// Kafka in a prior attempt. +if (!recordFlushPending) { Review comment: > (Please correct me if I'm wrong on this point; my core knowledge is a little fuzzy and maybe there are stronger guarantees than I'm aware of) Out-of-order acknowledgment of records makes tracking the latest offset for a given source partition a little less trivial than it seems initially. For example, if a task produces two records with the same source partition that end up being delivered to different topic-partitions, the second record may be ack'd before the first, and when it comes time for offset commit, the framework would have to refrain from committing offsets for that second record until the first is also ack'd. Ok, that rings a bell. I think I see how the logic works now and I don't see an obvious way to make it simpler. Doing something finer-grained as you said might be the way to go. Anyway, I agree this is something to save for a follow-up improvement. 
> I think it's a necessary evil, since source task offset commits are conducted on a single thread. Without a timeout for offset commits, a single task could block indefinitely and disable offset commits for all other tasks on the cluster. Hmm.. This is suspicious. Why do we need to block the executor while we wait for the flush? Would it be simpler to let the worker source task finish the flush and the offset commit in its own event thread? We end up blocking the event thread anyway because of the need to do it under the lock. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
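The out-of-order acknowledgment problem described in the quoted text can be illustrated with a small tracker that only advances the committable offset across a contiguous prefix of acknowledged records. This is a sketch for one source partition under simplifying assumptions (single-threaded, offsets as plain `long` values); the class and method names are hypothetical and this is not how `WorkerSourceTask` is implemented.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.OptionalLong;

final class InOrderAckTracker {
    private static final class InFlight {
        final long sourceOffset;
        boolean acked;

        InFlight(long sourceOffset) {
            this.sourceOffset = sourceOffset;
        }
    }

    // Records in the order they were sent; the head is the oldest unresolved record.
    private final Deque<InFlight> inFlight = new ArrayDeque<>();
    private OptionalLong committable = OptionalLong.empty();

    // Registers a sent record and returns the callback to run when it is acknowledged.
    Runnable send(long sourceOffset) {
        InFlight record = new InFlight(sourceOffset);
        inFlight.addLast(record);
        return () -> ack(record);
    }

    private void ack(InFlight record) {
        record.acked = true;
        // Advance only while the oldest in-flight record has been acknowledged,
        // so a later ack never commits past an earlier unacknowledged record.
        while (!inFlight.isEmpty() && inFlight.peekFirst().acked) {
            committable = OptionalLong.of(inFlight.pollFirst().sourceOffset);
        }
    }

    // Highest source offset that is safe to commit, if any.
    OptionalLong committableOffset() {
        return committable;
    }
}
```

If two records are sent and the second is acknowledged first, `committableOffset()` stays empty until the first is acknowledged too, at which point it jumps to the second record's offset.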
[jira] [Closed] (KAFKA-10810) Add a replace thread option to the streams uncaught exception handler
[ https://issues.apache.org/jira/browse/KAFKA-10810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson closed KAFKA-10810. -- > Add a replace thread option to the streams uncaught exception handler > --- > > Key: KAFKA-10810 > URL: https://issues.apache.org/jira/browse/KAFKA-10810 > Project: Kafka > Issue Type: Improvement >Reporter: Walker Carlson >Assignee: Walker Carlson >Priority: Major > Labels: streams > Fix For: 2.8.0 > > > Add an option to replace threads that have died. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (KAFKA-9331) Add option to terminate application when StreamThread(s) die
[ https://issues.apache.org/jira/browse/KAFKA-9331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson closed KAFKA-9331. - > Add option to terminate application when StreamThread(s) die > > > Key: KAFKA-9331 > URL: https://issues.apache.org/jira/browse/KAFKA-9331 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.4.0 >Reporter: Michael Bingham >Assignee: Walker Carlson >Priority: Minor > Labels: needs-kip > > Currently, if a {{StreamThread}} dies due to an unexpected exception, the > Streams application continues running. Even if all {{StreamThread}}(s) die, > the application will continue running, but will be in an {{ERROR}} state. > Many users want or expect the application to terminate in the event of a > fatal exception that kills one or more {{StreamThread}}(s). Currently, this > requires extra work from the developer to register an uncaught exception > handler on the {{KafkaStreams}} object and trigger a shutdown as needed. > It would be useful to provide a configurable option for the Streams > application to have it automatically terminate with an exception if one or > more {{StreamThread}}(s) die. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (KAFKA-4748) Need a way to shutdown all workers in a Streams application at the same time
[ https://issues.apache.org/jira/browse/KAFKA-4748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson closed KAFKA-4748. - > Need a way to shutdown all workers in a Streams application at the same time > > > Key: KAFKA-4748 > URL: https://issues.apache.org/jira/browse/KAFKA-4748 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.1.1 >Reporter: Elias Levy >Assignee: Walker Carlson >Priority: Major > Fix For: 2.8.0 > > > If you have a fleet of Stream workers for an application and attempt to shut > them down simultaneously (e.g. via SIGTERM and > Runtime.getRuntime().addShutdownHook() and streams.close()), a large number > of the workers fail to shut down. > The problem appears to be a race condition between the shutdown signal and > the consumer rebalancing that is triggered by some of the workers exiting > before others. Apparently, workers that receive the signal later fail to > exit because they are caught in the rebalance. > Terminating workers in a rolling fashion is not advisable in some situations. > The rolling shutdown will result in many unnecessary rebalances and may > fail, as the application may have a large amount of local state that a smaller > number of nodes may not be able to store. > It would appear that there is a need for a protocol change to allow the > coordinator to signal a consumer group to shut down without leading to > rebalancing.
[jira] [Closed] (KAFKA-6943) Have option to shutdown KS cleanly if any threads crashes, or if all threads crash
[ https://issues.apache.org/jira/browse/KAFKA-6943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson closed KAFKA-6943. - > Have option to shutdown KS cleanly if any threads crashes, or if all threads > crash > -- > > Key: KAFKA-6943 > URL: https://issues.apache.org/jira/browse/KAFKA-6943 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Antony Stubbs >Assignee: Walker Carlson >Priority: Major > Labels: user-experience > Fix For: 2.8.0 > > > At the moment, users have to implement this themselves. It might be nice to have an option > to initiate a clean shutdown if all threads crash, or if any thread crashes. > This also has a gotcha: at the moment, if you call KS#close without a timeout from > the uncaught exception handler, you deadlock.
[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r582363641 ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -1094,6 +1119,8 @@ private[log] class CleanedTransactionMetadata { // Output cleaned index to write retained aborted transactions var cleanedIndex: Option[TransactionIndex] = None + + var toAppend: List[AbortedTxn] = List.empty Review comment: I believe the issue is that this code expects the log to be created already but it is not. I think my intention was to delay this operation until the log is created. I'll double check this is actually the case. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
hachikuji commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r582343277 ## File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ## @@ -198,7 +198,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable deleteHorizonMs -info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " + +info(s"Cleaning $currentSegment in log ${log.name} into ${currentSegment.baseOffset} " + s"with deletion horizon $deleteHorizonMs, " + s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.") try { - cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletesAndTxnMarkers, log.config.maxMessageSize, + cleanedSegment = cleanInto(log, log.topicPartition, currentSegment.log, cleanedSegment, map, retainDeletesAndTxnMarkers, log.config.maxMessageSize, Review comment: nit: it shouldn't be necessary to pass both `log` and `log.topicPartition`. ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -582,13 +586,16 @@ private[log] class Cleaner(val id: Int, transactionMetadata.addAbortedTransactions(abortedTransactions) val retainDeletesAndTxnMarkers = currentSegment.lastModified > deleteHorizonMs -info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " + +info(s"Cleaning $currentSegment in log ${log.name} into ${currentSegment.baseOffset} " + Review comment: This log message no longer makes sense. Is the intent to use `cleanedSegment` instead of `currentSegment`? Perhaps we should consider moving this log line to after the cleaning when we know what the new segment is. 
## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -599,21 +606,30 @@ private[log] class Cleaner(val id: Int, } currentSegmentOpt = nextSegmentOpt } - - cleaned.onBecomeInactiveSegment() - // flush new segment to disk before swap - cleaned.flush() - - // update the modification date to retain the last modified date of the original files - val modified = segments.last.lastModified - cleaned.lastModified = modified - - // swap in new segment - info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log") - log.replaceSegments(List(cleaned), segments) + + // Result of cleaning included at least one record. + if (cleanedSegment.isDefined) { +val cleaned = cleanedSegment.get +cleaned.onBecomeInactiveSegment() +// flush new segment to disk before swap +cleaned.flush() + +// update the modification date to retain the last modified date of the original files +val modified = segments.last.lastModified +cleaned.lastModified = modified + +// swap in new segment +info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log") +log.replaceSegments(List(cleaned), segments) Review comment: Would it make more sense to update log start offset in `replaceSegments` when we are protected by the lock? Otherwise, it seems like the log start offset will be out of sync with the segments until all of them have been cleaned and `maybeIncrementLogStartOffset` is called. Note that `deleteSegments` does update the log start offset. Interestingly, `SegmentDeletion` is used for the reason in this case, and not `SegmentCompaction`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10847) Avoid spurious left/outer join results in stream-stream join
[ https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290502#comment-17290502 ] Sergio Peña commented on KAFKA-10847: - [~mjsax] I just ran a quick test that deletes the emitted non-joined record. It didn't affect performance much. I will keep this deletion if it causes a problem when a rebalance happens. > Avoid spurious left/outer join results in stream-stream join > - > > Key: KAFKA-10847 > URL: https://issues.apache.org/jira/browse/KAFKA-10847 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Sergio Peña >Priority: Major > > KafkaStreams follows an eager execution model, i.e., it never buffers input > records but processes them right away. For left/outer stream-stream join, > this implies that a left/outer join result might be emitted before the window > end (or window close) time is reached. Thus, a record that will be an > inner-join result might produce an eager (and spurious) left/outer join > result. > We should change the implementation of the join to not emit eager left/outer > join results, but instead delay the emission of such results until after the window > grace period has passed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] C0urante commented on a change in pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed
C0urante commented on a change in pull request #10112: URL: https://github.com/apache/kafka/pull/10112#discussion_r582351074 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ## @@ -475,11 +476,15 @@ public boolean commitOffsets() { synchronized (this) { // First we need to make sure we snapshot everything in exactly the current state. This // means both the current set of messages we're still waiting to finish, stored in this -// class, which setting flushing = true will handle by storing any new values into a new +// class, which setting recordFlushPending = true will handle by storing any new values into a new // buffer; and the current set of user-specified offsets, stored in the // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot. -flushing = true; -boolean flushStarted = offsetWriter.beginFlush(); +// No need to begin a new offset flush if we timed out waiting for records to be flushed to +// Kafka in a prior attempt. +if (!recordFlushPending) { Review comment: 1. I think it's a necessary evil, since source task offset commits are [conducted on a single thread](https://github.com/apache/kafka/blob/3f09fb97b6943c0612488dfa8e5eab8078fd7ca0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L64). Without a timeout for offset commits, a single task could block indefinitely and disable offset commits for all other tasks on the cluster. 2. This is definitely possible; I think the only saving grace here is that the combined sizes of the `outstandingMessages` and `outstandingMessagesBacklog` fields is going to be naturally throttled by the producer's buffer. If too many records are accumulated, the call to `Producer::send` will block synchronously until space is freed up, at which point, the worker can continue polling the task for new records. 
This isn't ideal as it will essentially cause the producer's entire buffer to be occupied until the throughput of record production from the task decreases and/or the write throughput of the producer rises to meet it, but it at least establishes an upper bound for how large a single batch of records in the `outstandingMessages` field ever gets. It may take several offset commit attempts for all of the records in that batch to be ack'd, with all but the last (successful) attempt timing out and failing, but forward progress with offset commits should still be possible. I share your feelings about the complexity here. I think ultimately it arises from two constraints: 1. A worker-global producer is used to write source offsets to the internal offsets topic right now. Although this doesn't necessarily require the single-threaded logic for offset commits mentioned above, things become simpler with it. 2. (Please correct me if I'm wrong on this point; my core knowledge is a little fuzzy and maybe there are stronger guarantees than I'm aware of) Out-of-order acknowledgment of records makes tracking the latest offset for a given source partition a little less trivial than it seems initially. For example, if a task produces two records with the same source partition that end up being delivered to different topic-partitions, the second record may be ack'd before the first, and when it comes time for offset commit, the framework would have to refrain from committing offsets for that second record until the first is also ack'd. I don't think either of these points makes it impossible to add even more fine-grained offset commit behavior and/or remove offset commit timeouts, but the work involved would be a fair amount heavier than this relatively minor patch. 
If you'd prefer to see something along those lines, could we consider merging this patch for the moment and perform a more serious overhaul of the source task offset commit logic as a follow-up, possibly with a small design discussion on a Jira ticket to make sure there's alignment on the new behavior? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
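The out-of-order acknowledgment constraint described in point 2 above can be illustrated with a minimal sketch. `InOrderOffsetTracker` and `Ticket` are hypothetical names introduced here for illustration; they are not Kafka Connect classes, and this is not how `WorkerSourceTask` is implemented. The sketch assumes one tracker per source partition and only shows the idea of releasing offsets across a contiguous prefix of ack'd records.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical helper (not a Kafka Connect class): tracks the highest source
 * offset that is safe to commit for ONE source partition when producer acks
 * may arrive out of order.
 */
final class InOrderOffsetTracker {

    /** Handle returned by sent() and passed back to acked(). */
    static final class Ticket {
        final long sourceOffset;
        boolean acked;

        Ticket(long sourceOffset) {
            this.sourceOffset = sourceOffset;
        }
    }

    // Records in dispatch order; the head is the oldest record not yet released.
    private final Deque<Ticket> inFlight = new ArrayDeque<>();
    private long committable = -1L; // -1 means "nothing committable yet"

    /** Registers a record that was handed to the producer. */
    synchronized Ticket sent(long sourceOffset) {
        Ticket ticket = new Ticket(sourceOffset);
        inFlight.addLast(ticket);
        return ticket;
    }

    /** Marks a record as acknowledged and releases any contiguous acked prefix. */
    synchronized void acked(Ticket ticket) {
        ticket.acked = true;
        // Advance only across a contiguous prefix of acked records, so a later
        // record never becomes committable before an earlier one.
        while (!inFlight.isEmpty() && inFlight.peekFirst().acked) {
            committable = inFlight.pollFirst().sourceOffset;
        }
    }

    synchronized long committableOffset() {
        return committable;
    }
}
```

If the second of two records is ack'd first, `committableOffset()` stays at its previous value until the first record is ack'd as well, which is the "refrain from committing" behavior described in the comment.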
[jira] [Created] (KAFKA-12374) Add missing config sasl.mechanism.controller.protocol
Ron Dagostino created KAFKA-12374: - Summary: Add missing config sasl.mechanism.controller.protocol Key: KAFKA-12374 URL: https://issues.apache.org/jira/browse/KAFKA-12374 Project: Kafka Issue Type: Bug Components: config Affects Versions: 2.8.0 Reporter: Ron Dagostino Assignee: Ron Dagostino Fix For: 2.8.0 The config `sasl.mechanism.controller.protocol` from KIP-631 is not implemented. Furthermore, `KafkaRaftManager` is using inter-broker security information when it connects to the Raft controller quorum. KafkaRaftClient should use the first entry in `controller.listener.names` to determine the listener name; that listener name's mapped value in the `listener.security.protocol.map` (if such a mapping exists, otherwise the listener name itself) for the security protocol; and the value of `sasl.mechanism.controller.protocol` for the SASL mechanism. Finally, `RaftControllerNodeProvider` needs to use the value of `sasl.mechanism.controller.protocol` instead of the inter-broker sasl mechanism (it currently determines the listener name and security protocol correctly) -- This message was sent by Atlassian Jira (v8.3.4#803005)
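The lookup order described in this ticket can be sketched as follows. This is a simplified illustration under stated assumptions, not the `KafkaRaftManager` or `KafkaConfig` code: `ControllerSecuritySketch` and `Resolved` are hypothetical names, configs are modeled as a plain string map, listener names are matched case-sensitively, and no default is applied when `sasl.mechanism.controller.protocol` is unset.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the resolution order described in KAFKA-12374. */
final class ControllerSecuritySketch {

    static final class Resolved {
        final String listenerName;
        final String securityProtocol;
        final String saslMechanism; // null when the config is not set

        Resolved(String listenerName, String securityProtocol, String saslMechanism) {
            this.listenerName = listenerName;
            this.securityProtocol = securityProtocol;
            this.saslMechanism = saslMechanism;
        }
    }

    static Resolved resolve(Map<String, String> props) {
        // 1. The listener name is the first entry of controller.listener.names.
        String names = props.getOrDefault("controller.listener.names", "");
        String[] parts = names.split(",");
        if (parts.length == 0 || parts[0].trim().isEmpty()) {
            throw new IllegalArgumentException("controller.listener.names must be set");
        }
        String listener = parts[0].trim();

        // 2. The security protocol is the listener's mapped value, if a mapping
        //    exists; otherwise the listener name itself.
        Map<String, String> protocolMap = new HashMap<>();
        String rawMap = props.getOrDefault("listener.security.protocol.map", "");
        for (String entry : rawMap.split(",")) {
            String[] kv = entry.trim().split(":", 2);
            if (kv.length == 2) {
                protocolMap.put(kv[0].trim(), kv[1].trim());
            }
        }
        String protocol = protocolMap.getOrDefault(listener, listener);

        // 3. The SASL mechanism comes from the controller-specific config,
        //    not from the inter-broker mechanism.
        String mechanism = props.get("sasl.mechanism.controller.protocol");
        return new Resolved(listener, protocol, mechanism);
    }
}
```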
[GitHub] [kafka] mjsax commented on a change in pull request #10205: KAFKA-12323 Follow-up: Refactor the unit test a bit
mjsax commented on a change in pull request #10205: URL: https://github.com/apache/kafka/pull/10205#discussion_r582349190 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ## @@ -1844,19 +1844,17 @@ public void process(final Object key, final Object value) {} assertEquals(0, punctuatedWallClockTime.size()); mockTime.sleep(100L); -for (long i = 0L; i < 10L; i++) { Review comment: Why do we remove this loop? ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ## @@ -1936,7 +1934,7 @@ public void process(final Object key, final Object value) { clientSupplier.consumer.addRecord(new ConsumerRecord<>( topic1, 1, -0L, +100L, Review comment: nit: can we pick a different value to disambiguate (wall-clock time is set to `100L` above)? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed
hachikuji commented on a change in pull request #10112: URL: https://github.com/apache/kafka/pull/10112#discussion_r582259574 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ## @@ -98,7 +98,8 @@ private IdentityHashMap, ProducerRecord> outstandingMessages; // A second buffer is used while an offset flush is running private IdentityHashMap, ProducerRecord> outstandingMessagesBacklog; -private boolean flushing; +private boolean recordFlushPending; +private boolean offsetFlushPending; private CountDownLatch stopRequestedLatch; Review comment: nit: while we're at it, this could be `final` ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ## @@ -475,11 +476,15 @@ public boolean commitOffsets() { synchronized (this) { // First we need to make sure we snapshot everything in exactly the current state. This // means both the current set of messages we're still waiting to finish, stored in this -// class, which setting flushing = true will handle by storing any new values into a new +// class, which setting recordFlushPending = true will handle by storing any new values into a new // buffer; and the current set of user-specified offsets, stored in the // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot. -flushing = true; -boolean flushStarted = offsetWriter.beginFlush(); +// No need to begin a new offset flush if we timed out waiting for records to be flushed to +// Kafka in a prior attempt. +if (!recordFlushPending) { Review comment: If I understand it correctly, the main difference in this patch is that we no longer fail the flush if the messages cannot be drained quickly enough from `outstandingMessages`. A few questions come to mind: 1. Is the flush timeout still a useful configuration? Was it ever? Even if we timeout, we still have to wait for the records that were sent to the producer. 2. 
While we are waiting for `outstandingMessages` to be drained, we are still accumulating messages in `outstandingMessagesBacklog`. I imagine we can get into a pattern here once we fill up the accumulator. While we're waiting for `outstandingMessages` to complete, we fill `outstandingMessagesBacklog`. Once the flush completes, `outstandingMessagesBacklog` becomes `outstandingMessages` and we are stuck waiting again. Could this prevent us from satisfying the commit interval? Overall, I can't shake the feeling that this logic is more complicated than necessary. Why do we need the concept of flushing at all? It would be more intuitive to just commit whatever the latest offsets are. Note that we do not use `outstandingMessages` for the purpose of retries. Once a request has been handed off to the producer successfully, we rely on the producer to handle retries. Any delivery failure after that is treated as fatal. So then does `outstandingMessages` serve any purpose other than tracking flushing? I am probably missing something here. It has been a long time since I reviewed this logic.
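The two-buffer pattern discussed in this comment can be sketched roughly as below. `DoubleBufferSketch` is a hypothetical class written for illustration; it mirrors the roles of `outstandingMessages` and `outstandingMessagesBacklog` but is not the `WorkerSourceTask` code, and it omits timeouts and offset writing entirely.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Hypothetical sketch of a snapshot buffer plus a backlog used during a flush. */
final class DoubleBufferSketch<R> {
    // Identity-based sets, since two distinct records may compare equal.
    private Set<R> outstanding = Collections.newSetFromMap(new IdentityHashMap<>());
    private Set<R> backlog = Collections.newSetFromMap(new IdentityHashMap<>());
    private boolean flushing;

    /** A record was handed to the producer. */
    synchronized void sent(R record) {
        // While a flush is running, new records must not join the snapshot.
        (flushing ? backlog : outstanding).add(record);
    }

    /** The producer acknowledged a record. */
    synchronized void acked(R record) {
        if (!outstanding.remove(record)) {
            backlog.remove(record);
        }
    }

    /** Freezes the current contents of `outstanding` as the snapshot to drain. */
    synchronized void beginFlush() {
        flushing = true;
    }

    /** True once every record in the snapshot has been acknowledged. */
    synchronized boolean snapshotDrained() {
        return flushing && outstanding.isEmpty();
    }

    /** Ends the flush; the backlog becomes the new outstanding set. */
    synchronized void finishFlush() {
        Set<R> drained = outstanding;
        outstanding = backlog;
        backlog = drained;
        backlog.clear(); // a no-op when the snapshot was fully drained
        flushing = false;
    }

    synchronized int outstandingCount() {
        return outstanding.size();
    }
}
```

The swap in `finishFlush()` is what produces the pattern the comment worries about: whatever accumulated in the backlog during one flush is the set the next flush has to wait for.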
[GitHub] [kafka] guozhangwang opened a new pull request #10205: KAFKA-12323 Follow-up: Refactor the unit test a bit
guozhangwang opened a new pull request #10205: URL: https://github.com/apache/kafka/pull/10205 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-12267) Implement DescribeTransactions API
[ https://issues.apache.org/jira/browse/KAFKA-12267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-12267. - Resolution: Fixed > Implement DescribeTransactions API > -- > > Key: KAFKA-12267 > URL: https://issues.apache.org/jira/browse/KAFKA-12267 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji merged pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API
hachikuji merged pull request #10183: URL: https://github.com/apache/kafka/pull/10183
[GitHub] [kafka] cmccabe commented on pull request #9990: KAFKA-12235: Fix ZkAdminManager.describeConfigs on 2+ config keys
cmccabe commented on pull request #9990: URL: https://github.com/apache/kafka/pull/9990#issuecomment-785357271 ping @ivanyu
[GitHub] [kafka] cmccabe commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on pull request #10184: URL: https://github.com/apache/kafka/pull/10184#issuecomment-785354588 rebased on trunk
[jira] [Updated] (KAFKA-12373) Improve KafkaRaftClient handling of graceful shutdown
[ https://issues.apache.org/jira/browse/KAFKA-12373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-12373: --- Description: The current implementation simply closes the metrics group when it is closed. When closing the KafkaRaftClient that is the leader it should perform at least the following steps: # Stop accepting new schedule append operations # Append to the log the batches currently in the BatchAccumulator # Wait with a timeout for the high-watermark to reach the LEO # Cooperatively resign as leader from the quorum was: The current implementation simply closes the metrics group when it is closed. When closing the KafkaRaftClient that is the leader it should perform at least the following steps: # Stop accepting new schedule append operations # Append to the log batches currently in the BatchAccumulator # Wait with a timeout for the high-watermark to reach the LEO # Cooperatively resign as leader from the quorum > Improve KafkaRaftClient handling of graceful shutdown > - > > Key: KAFKA-12373 > URL: https://issues.apache.org/jira/browse/KAFKA-12373 > Project: Kafka > Issue Type: Sub-task > Components: replication >Reporter: Jose Armando Garcia Sancio >Priority: Major > > The current implementation simply closes the metrics group when it is closed. > When closing the KafkaRaftClient that is the leader it should perform at > least the following steps: > # Stop accepting new schedule append operations > # Append to the log the batches currently in the BatchAccumulator > # Wait with a timeout for the high-watermark to reach the LEO > # Cooperatively resign as leader from the quorum -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on pull request #10204: MINOR: Fix the generation extraction util
guozhangwang commented on pull request #10204: URL: https://github.com/apache/kafka/pull/10204#issuecomment-785350477 > Sorry that I didn't notice this issue :( No worries!! :)
[GitHub] [kafka] guozhangwang merged pull request #10204: MINOR: Fix the generation extraction util
guozhangwang merged pull request #10204: URL: https://github.com/apache/kafka/pull/10204
[jira] [Created] (KAFKA-12373) Improve KafkaRaftClient handling of graceful shutdown
Jose Armando Garcia Sancio created KAFKA-12373: -- Summary: Improve KafkaRaftClient handling of graceful shutdown Key: KAFKA-12373 URL: https://issues.apache.org/jira/browse/KAFKA-12373 Project: Kafka Issue Type: Sub-task Components: replication Reporter: Jose Armando Garcia Sancio The current implementation simply closes the metrics group when it is closed. When closing the KafkaRaftClient that is the leader it should perform at least the following steps: # Stop accepting new schedule append operations # Append to the log batches currently in the BatchAccumulator # Wait with a timeout for the high-watermark to reach the LEO # Cooperatively resign as leader from the quorum -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
ijuma commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r582266116 ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { +if (!config.deleteTopicEnable) { + if (request.context.apiVersion() < 3) { +throw new InvalidRequestException("Topic deletion is disabled.") + } else { +throw new TopicDeletionDisabledException() + } +} +val deleteTopicsRequest = request.body[DeleteTopicsRequest] +val nameToId = new mutable.HashMap[String, Uuid] +deleteTopicsRequest.data().topicNames().iterator().asScala.foreach { + name => nameToId.put(name, Uuid.ZERO_UUID) +} +deleteTopicsRequest.data().topics().iterator().asScala.foreach { + nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId()) +} +val (describable, deletable) = { + if (authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME)) { +(nameToId.keySet, nameToId.keySet) + } else { +val authorizedDescribeTopics: Set[String] = authHelper.filterByAuthorized( + request.context, DESCRIBE, TOPIC, nameToId.keys)(n => n) +val authorizedDeleteTopics: Set[String] = authHelper.filterByAuthorized( + request.context, DELETE, TOPIC, nameToId.keys)(n => n) +(authorizedDescribeTopics, authorizedDeleteTopics) + } +} +def sendResponse(response: DeleteTopicsResponseData): Unit = { + nameToId.keysIterator.foreach { Review comment: There is no copy when using `foreach` (in either the Java or the Scala version). And Scala lambdas are automatically converted to Java lambdas.
[jira] [Commented] (KAFKA-12158) Consider better return type of RaftClient.scheduleAppend
[ https://issues.apache.org/jira/browse/KAFKA-12158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290233#comment-17290233 ] Jose Armando Garcia Sancio commented on KAFKA-12158: Yes [~sagarrao]. Feel free to pick this up. > Consider better return type of RaftClient.scheduleAppend > > > Key: KAFKA-12158 > URL: https://issues.apache.org/jira/browse/KAFKA-12158 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Priority: Major > > Currently `RaftClient` has the following Append API: > {code} > Long scheduleAppend(int epoch, List records); > {code} > There are a few possible cases that the single return value is trying to > handle: > 1. The epoch doesn't match or we are not the current leader => return > Long.MaxValue > 2. We failed to allocate memory to write the batch (backpressure case) => > return null > 3. We successfully scheduled the append => return the expected offset > It might be better to define a richer type so that the cases that must be > handled are clearer. At a minimum, it would be better to return > `OptionalLong` and get rid of the null case.
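One possible shape for the "richer type" suggested in this ticket is sketched below. `AppendResult`, its `Status` values, and `fromLegacy` are hypothetical names chosen for illustration; the ticket does not prescribe a design, and this is not the type that `RaftClient` uses. The sketch only shows how the three cases listed above could be made explicit, with the offset exposed as an `OptionalLong`.

```java
import java.util.OptionalLong;

/** Hypothetical result type making the three scheduleAppend outcomes explicit. */
final class AppendResult {

    enum Status { SCHEDULED, NOT_LEADER_OR_STALE_EPOCH, BACKPRESSURE }

    private final Status status;
    private final long offset; // meaningful only when status == SCHEDULED

    private AppendResult(Status status, long offset) {
        this.status = status;
        this.offset = offset;
    }

    static AppendResult scheduled(long expectedOffset) {
        return new AppendResult(Status.SCHEDULED, expectedOffset);
    }

    static AppendResult notLeaderOrStaleEpoch() {
        return new AppendResult(Status.NOT_LEADER_OR_STALE_EPOCH, -1L);
    }

    static AppendResult backpressure() {
        return new AppendResult(Status.BACKPRESSURE, -1L);
    }

    /** Translates the Long-based convention described in the ticket. */
    static AppendResult fromLegacy(Long legacy) {
        if (legacy == null) {
            return backpressure();            // case 2: memory allocation failed
        }
        if (legacy == Long.MAX_VALUE) {
            return notLeaderOrStaleEpoch();   // case 1: wrong epoch or not leader
        }
        return scheduled(legacy);             // case 3: expected offset
    }

    Status status() {
        return status;
    }

    /** The expected offset, present only for a successfully scheduled append. */
    OptionalLong offset() {
        return status == Status.SCHEDULED ? OptionalLong.of(offset) : OptionalLong.empty();
    }
}
```

A caller can then switch on `status()` rather than comparing against `null` and `Long.MAX_VALUE`, which makes the cases that must be handled visible in the type.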
[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r582259112 ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { +if (!config.deleteTopicEnable) { + if (request.context.apiVersion() < 3) { +throw new InvalidRequestException("Topic deletion is disabled.") + } else { +throw new TopicDeletionDisabledException() + } +} +val deleteTopicsRequest = request.body[DeleteTopicsRequest] +val nameToId = new mutable.HashMap[String, Uuid] +deleteTopicsRequest.data().topicNames().iterator().asScala.foreach { + name => nameToId.put(name, Uuid.ZERO_UUID) +} +deleteTopicsRequest.data().topics().iterator().asScala.foreach { + nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId()) +} +val (describable, deletable) = { + if (authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME)) { +(nameToId.keySet, nameToId.keySet) + } else { +val authorizedDescribeTopics: Set[String] = authHelper.filterByAuthorized( + request.context, DESCRIBE, TOPIC, nameToId.keys)(n => n) +val authorizedDeleteTopics: Set[String] = authHelper.filterByAuthorized( + request.context, DELETE, TOPIC, nameToId.keys)(n => n) +(authorizedDescribeTopics, authorizedDeleteTopics) + } +} +def sendResponse(response: DeleteTopicsResponseData): Unit = { + nameToId.keysIterator.foreach { Review comment: the main rationale for this is that I don't want scala to perform any copying, and I know for sure that if I give an iterator that it won't. I wouldn't mind using Java's forEach directly but I couldn't find a good way to convert a scala closure to a java closure (maybe this is obvious and I missed it?) This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r582257154 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -541,6 +559,41 @@ static void validateNewTopicNames(Map topicErrors, return configChanges; } +ControllerResult deleteTopics(Map nameToId) { +DeleteTopicsResponseData result = new DeleteTopicsResponseData(); +List records = new ArrayList<>(); +for (Entry entry : nameToId.entrySet()) { +ApiError error = deleteTopic(entry.getKey(), entry.getValue(), records); +result.responses().add(new DeletableTopicResult(). +setName(entry.getKey()). +setTopicId(entry.getValue()). +setErrorCode(error.error().code()). +setErrorMessage(error.message())); +} +return new ControllerResult<>(records, result); +} + +ApiError deleteTopic(String name, + Uuid providedId, + List records) { +Uuid realId = topicsByName.get(name); +if (realId == null) { +return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, +"Unable to locate the provided topic name."); +} +if (!providedId.equals(Uuid.ZERO_UUID) && !providedId.equals(realId)) { +return new ApiError(UNKNOWN_TOPIC_ID, +"The provided topic ID does not match the provided topic name."); +} +TopicControlInfo topic = topics.get(realId); +if (topic == null) { +return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "Unable to locate topic id."); Review comment: the code has changed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r582257154 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -541,6 +559,41 @@ static void validateNewTopicNames(Map topicErrors, return configChanges; } +ControllerResult deleteTopics(Map nameToId) { +DeleteTopicsResponseData result = new DeleteTopicsResponseData(); +List records = new ArrayList<>(); +for (Entry entry : nameToId.entrySet()) { +ApiError error = deleteTopic(entry.getKey(), entry.getValue(), records); +result.responses().add(new DeletableTopicResult(). +setName(entry.getKey()). +setTopicId(entry.getValue()). +setErrorCode(error.error().code()). +setErrorMessage(error.message())); +} +return new ControllerResult<>(records, result); +} + +ApiError deleteTopic(String name, + Uuid providedId, + List records) { +Uuid realId = topicsByName.get(name); +if (realId == null) { +return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, +"Unable to locate the provided topic name."); +} +if (!providedId.equals(Uuid.ZERO_UUID) && !providedId.equals(realId)) { +return new ApiError(UNKNOWN_TOPIC_ID, +"The provided topic ID does not match the provided topic name."); +} +TopicControlInfo topic = topics.get(realId); +if (topic == null) { +return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "Unable to locate topic id."); Review comment: this is out of date This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r582256432 ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { +if (!config.deleteTopicEnable) { + if (request.context.apiVersion() < 3) { +throw new InvalidRequestException("Topic deletion is disabled.") Review comment: As @ijuma said, in this case the error message gives extra information which would be very helpful to users. If we always wanted the same message, there would be no reason to have the string field in the wire protocol.
[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r582256432 ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { +if (!config.deleteTopicEnable) { + if (request.context.apiVersion() < 3) { +throw new InvalidRequestException("Topic deletion is disabled.") Review comment: The error message should reflect the problem, right?
[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r582255681 ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { +if (!config.deleteTopicEnable) { + if (request.context.apiVersion() < 3) { +throw new InvalidRequestException("Topic deletion is disabled.") + } else { +throw new TopicDeletionDisabledException() + } +} +val deleteTopicsRequest = request.body[DeleteTopicsRequest] +val nameToId = new mutable.HashMap[String, Uuid] +deleteTopicsRequest.data().topicNames().iterator().asScala.foreach { + name => nameToId.put(name, Uuid.ZERO_UUID) +} +deleteTopicsRequest.data().topics().iterator().asScala.foreach { + nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId()) +} +val (describable, deletable) = { + if (authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME)) { +(nameToId.keySet, nameToId.keySet) + } else { +val authorizedDescribeTopics: Set[String] = authHelper.filterByAuthorized( + request.context, DESCRIBE, TOPIC, nameToId.keys)(n => n) +val authorizedDeleteTopics: Set[String] = authHelper.filterByAuthorized( + request.context, DELETE, TOPIC, nameToId.keys)(n => n) +(authorizedDescribeTopics, authorizedDeleteTopics) + } +} +def sendResponse(response: DeleteTopicsResponseData): Unit = { + nameToId.keysIterator.foreach { +name => if (!deletable.contains(name)) { + val result = if (describable.contains(name)) { +new DeletableTopicResult().setName(name).setErrorCode(TOPIC_AUTHORIZATION_FAILED.code) + } else { +new DeletableTopicResult().setName(name).setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code) + } + response.responses().add(result) +} + } + requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { +response.setThrottleTimeMs(throttleTimeMs) 
+new DeleteTopicsResponse(response) + }) +} + val future = controller.deleteTopics( + nameToId.view.filterKeys(deletable.contains(_)).toMap.asJava) Review comment: this has changed
[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r582255514 ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { +if (!config.deleteTopicEnable) { + if (request.context.apiVersion() < 3) { +throw new InvalidRequestException("Topic deletion is disabled.") + } else { +throw new TopicDeletionDisabledException() + } +} +val deleteTopicsRequest = request.body[DeleteTopicsRequest] +val nameToId = new mutable.HashMap[String, Uuid] +deleteTopicsRequest.data().topicNames().iterator().asScala.foreach { + name => nameToId.put(name, Uuid.ZERO_UUID) +} +deleteTopicsRequest.data().topics().iterator().asScala.foreach { + nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId()) +} +val (describable, deletable) = { + if (authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME)) { +(nameToId.keySet, nameToId.keySet) + } else { +val authorizedDescribeTopics: Set[String] = authHelper.filterByAuthorized( Review comment: Thanks, this is a good point. I fixed the code to accept only topic IDs. I also found a few more bugs, and added a unit test for the deletion code in controller apis.
[GitHub] [kafka] hachikuji commented on pull request #10059: KAFKA-8562: SaslChannelBuilder - avoid (reverse) DNS lookup while building underlying SslTransportLayer
hachikuji commented on pull request #10059: URL: https://github.com/apache/kafka/pull/10059#issuecomment-785326212 I think the test failures are caused by this: https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java#L222. We are relying on the reverse DNS to map "127.0.0.1" to "localhost." If we use "localhost" directly, then the tests pass. It seems reasonable to me to change the test case. I cannot think of a reason why this would change the reasoning that is documented under `ChannelBuilderUtils.peerHost`, which seems to apply just as well to SASL_SSL as SSL.
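To illustrate the reverse-DNS point in the comment above, here is a minimal sketch that uses only the JDK's `java.net.InetSocketAddress`. It shows how a host string can be read from a socket address without attempting a reverse lookup, which is why a test that connects to "127.0.0.1" would see the IP literal rather than "localhost". The names `PeerHostSketch` and `peerHostWithoutLookup` are hypothetical and are not Kafka's `ChannelBuilderUtils.peerHost`; port 9092 is only an example value.

```java
import java.net.InetSocketAddress;

class PeerHostSketch {
    // getHostString() returns the hostname the address was created with, or the
    // textual IP address when no hostname is attached. It does not attempt a
    // reverse name lookup, unlike getHostName().
    static String peerHostWithoutLookup(InetSocketAddress address) {
        return address.getHostString();
    }

    public static void main(String[] args) {
        // Created from an IP literal: no hostname is attached to the address.
        InetSocketAddress byLiteral = new InetSocketAddress("127.0.0.1", 9092);
        // Created from a name without resolving it: the name is kept as given.
        InetSocketAddress byName = InetSocketAddress.createUnresolved("localhost", 9092);

        System.out.println(peerHostWithoutLookup(byLiteral));
        System.out.println(peerHostWithoutLookup(byName));
    }
}
```

Under this assumption, a test that expects "localhost" as the peer host would need to connect using "localhost" directly, as the comment suggests.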
[GitHub] [kafka] vvcephei commented on pull request #10051: Adding documentation for KIP-614
vvcephei commented on pull request #10051: URL: https://github.com/apache/kafka/pull/10051#issuecomment-785322634 Merged to 2.8
[GitHub] [kafka] vvcephei merged pull request #10051: Adding documentation for KIP-614
vvcephei merged pull request #10051: URL: https://github.com/apache/kafka/pull/10051
[GitHub] [kafka] vvcephei commented on pull request #10051: Adding documentation for KIP-614
vvcephei commented on pull request #10051: URL: https://github.com/apache/kafka/pull/10051#issuecomment-785321000 Fixed the merge conflict.
[jira] [Updated] (KAFKA-12372) Enhance TimestampCoverter to handle multiple timestamp or date fields
[ https://issues.apache.org/jira/browse/KAFKA-12372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hoa Le updated KAFKA-12372: --- Summary: Enhance TimestampCoverter to handle multiple timestamp or date fields (was: Enhance TimestampCoverter to handle fields schema name to convert to Timestamp or Date) > Enhance TimestampCoverter to handle multiple timestamp or date fields > - > > Key: KAFKA-12372 > URL: https://issues.apache.org/jira/browse/KAFKA-12372 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.7.0 >Reporter: Hoa Le >Priority: Minor > Fix For: 3.0.0 > > > Hi team, > > Our team is having an issue of handling multiple timestamp fields in a kafka > message, so for now if we use the > [converter|https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java] > then we have to add more fields in the config. > We can implement it in a generic way to check if the field.schema().name() is > timestamp or date then we can convert it. > Please let me know if I can contribute to this. > > Thanks, > Hoa. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12372) Enhance TimestampCoverter to handle fields schema name to convert to Timestamp or Date
Hoa Le created KAFKA-12372: -- Summary: Enhance TimestampCoverter to handle fields schema name to convert to Timestamp or Date Key: KAFKA-12372 URL: https://issues.apache.org/jira/browse/KAFKA-12372 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 2.7.0 Reporter: Hoa Le Fix For: 3.0.0 Hi team, Our team is having an issue of handling multiple timestamp fields in a kafka message, so for now if we use the [converter|https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java] then we have to add more fields in the config. We can implement it in a generic way to check if the field.schema().name() is timestamp or date then we can convert it. Please let me know if I can contribute to this. Thanks, Hoa.
[GitHub] [kafka] hachikuji commented on pull request #10059: KAFKA-8562: SaslChannelBuilder - avoid (reverse) DNS lookup while building underlying SslTransportLayer
hachikuji commented on pull request #10059: URL: https://github.com/apache/kafka/pull/10059#issuecomment-785315990 Hmm.. I may have been too hasty with my approval. We need to investigate the test failures. I've triggered another build.
[GitHub] [kafka] rondagostino commented on pull request #10199: MINOR: Fix security_test system test for Raft case
rondagostino commented on pull request #10199: URL: https://github.com/apache/kafka/pull/10199#issuecomment-785311895 @mumrah I separated out the communication-to-quorum failure cases into a separate test -- so now we confirm that hostname verification failure to both ZK and Raft Controller causes Kafka to be unable to start. I did not add the test where communication to the Raft quorum is okay but the inter-broker protocol is SSL and failing due to hostname verification. Let me know if you think it is necessary and I can add it. @abbccdda I "fixed" the ZooKeeper test as mentioned above -- we can revert if you decide you need to fix something related to the auto topic creation manager instead.
[GitHub] [kafka] hachikuji commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API
hachikuji commented on a change in pull request #10183: URL: https://github.com/apache/kafka/pull/10183#discussion_r582219028 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala ## @@ -255,6 +256,47 @@ class TransactionCoordinator(brokerId: Int, } } + def handleDescribeTransactions( +transactionalId: String + ): DescribeTransactionsResponseData.TransactionState = { +val transactionState = new DescribeTransactionsResponseData.TransactionState() + .setTransactionalId(transactionalId) + +if (!isActive.get()) { Review comment: That's a fair point. The difference is that none of the other APIs are batched. I thought it was simpler to let the coordinator API work only with individual transactionalIds and leave the batching aspect to `KafkaApis`. The downside of having this potentially redundant check seems not too bad.
[GitHub] [kafka] hachikuji commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API
hachikuji commented on a change in pull request #10183: URL: https://github.com/apache/kafka/pull/10183#discussion_r582213713 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala ## @@ -255,6 +256,47 @@ class TransactionCoordinator(brokerId: Int, } } + def handleDescribeTransactions( +transactionalId: String + ): DescribeTransactionsResponseData.TransactionState = { +val transactionState = new DescribeTransactionsResponseData.TransactionState() + .setTransactionalId(transactionalId) + +if (!isActive.get()) { + transactionState.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code) +} else if (transactionalId == null || transactionalId.isEmpty) { Review comment: I agree it seems inconsistent. The protocol definition does not allow it to be nullable, but I think I added the check because the method is one step removed from the request. Perhaps I can change it to raise `IllegalArgumentException` if the `transactionalId` is null.
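The change floated in the comment above could look roughly like the following. This is a sketch in plain Java rather than the Scala of `TransactionCoordinator`, and every name in it (`DescribeTransactionsValidationSketch`, `SketchError`, `validate`) is hypothetical. In particular, the error returned for an empty ID is an assumption, since the quoted diff is cut off before that branch's body.

```java
class DescribeTransactionsValidationSketch {
    // Hypothetical stand-in for an error code; this is not Kafka's Errors enum.
    enum SketchError { NONE, INVALID_REQUEST }

    static SketchError validate(String transactionalId) {
        if (transactionalId == null) {
            // The protocol field is described as non-nullable, so a null here would
            // indicate a caller bug rather than bad client input: fail loudly.
            throw new IllegalArgumentException("transactionalId must not be null");
        }
        if (transactionalId.isEmpty()) {
            // Assumption: an empty ID is reported back to the client as an error code.
            return SketchError.INVALID_REQUEST;
        }
        return SketchError.NONE;
    }

    public static void main(String[] args) {
        System.out.println(validate("txn-1"));
        System.out.println(validate(""));
    }
}
```

The design choice is to separate what a client can send (an empty string, answered with an error code) from what only a programming error can produce (null, answered with an exception).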
[GitHub] [kafka] mjsax commented on a change in pull request #10170: KAFKA-12323: Set timestamp in record context when punctuate
mjsax commented on a change in pull request #10170: URL: https://github.com/apache/kafka/pull/10170#discussion_r582202700 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ## @@ -1874,6 +1873,85 @@ public void close() {} assertEquals(2, punctuatedWallClockTime.size()); } +@Test +public void shouldPunctuateWithTimestampPreservedInProcessorContext() { +final org.apache.kafka.streams.kstream.TransformerSupplier> punctuateProcessor = +() -> new org.apache.kafka.streams.kstream.Transformer>() { +@Override +public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { +context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, timestamp -> context.forward("key", "value")); +context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, timestamp -> context.forward("key", "value")); +} + +@Override +public KeyValue transform(final Object key, final Object value) { +return null; +} + +@Override +public void close() {} +}; + +final List peekedContextTime = new ArrayList<>(); +final org.apache.kafka.streams.processor.ProcessorSupplier peekProcessor = +() -> new org.apache.kafka.streams.processor.AbstractProcessor() { +@Override +public void process(final Object key, final Object value) { +peekedContextTime.add(context.timestamp()); +} +}; + +internalStreamsBuilder.stream(Collections.singleton(topic1), consumed) +.transform(punctuateProcessor) +.process(peekProcessor); +internalStreamsBuilder.buildAndOptimizeTopology(); + +final long currTime = mockTime.milliseconds(); +final StreamThread thread = createStreamThread(CLIENT_ID, config, false); + +thread.setState(StreamThread.State.STARTING); +thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); +final List assignedPartitions = new ArrayList<>(); + +final Map> activeTasks = new HashMap<>(); + +// assign single partition +assignedPartitions.add(t1p1); +activeTasks.put(task1, Collections.singleton(t1p1)); + 
+thread.taskManager().handleAssignment(activeTasks, emptyMap()); + +clientSupplier.consumer.assign(assignedPartitions); + clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); +thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); + +thread.runOnce(); +assertEquals(0, peekedContextTime.size()); + +mockTime.sleep(100L); +thread.runOnce(); + +assertEquals(1, peekedContextTime.size()); +assertEquals(currTime + 100L, peekedContextTime.get(0).longValue()); + +clientSupplier.consumer.addRecord(new ConsumerRecord<>( +topic1, +1, +0L, + 100L, + TimestampType.CREATE_TIME, + ConsumerRecord.NULL_CHECKSUM, + "K".getBytes().length, + "V".getBytes().length, + "K".getBytes(), + "V".getBytes())); + +thread.runOnce(); + +assertEquals(2, peekedContextTime.size()); +assertEquals(0L, peekedContextTime.get(1).longValue()); Review comment: So to make sure we actually use "stream-time", should we change the test to actually punctuate twice? Would you mind doing a follow-up PR? It might also be worth using a timestamp that differs from "wall-clock time" to make sure we don't pass in the current wall-clock time by accident.
[GitHub] [kafka] mjsax commented on a change in pull request #10170: KAFKA-12323: Set timestamp in record context when punctuate
mjsax commented on a change in pull request #10170: URL: https://github.com/apache/kafka/pull/10170#discussion_r582202700 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ## @@ -1874,6 +1873,85 @@ public void close() {} assertEquals(2, punctuatedWallClockTime.size()); } +@Test +public void shouldPunctuateWithTimestampPreservedInProcessorContext() { +final org.apache.kafka.streams.kstream.TransformerSupplier> punctuateProcessor = +() -> new org.apache.kafka.streams.kstream.Transformer>() { +@Override +public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { +context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, timestamp -> context.forward("key", "value")); +context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, timestamp -> context.forward("key", "value")); +} + +@Override +public KeyValue transform(final Object key, final Object value) { +return null; +} + +@Override +public void close() {} +}; + +final List peekedContextTime = new ArrayList<>(); +final org.apache.kafka.streams.processor.ProcessorSupplier peekProcessor = +() -> new org.apache.kafka.streams.processor.AbstractProcessor() { +@Override +public void process(final Object key, final Object value) { +peekedContextTime.add(context.timestamp()); +} +}; + +internalStreamsBuilder.stream(Collections.singleton(topic1), consumed) +.transform(punctuateProcessor) +.process(peekProcessor); +internalStreamsBuilder.buildAndOptimizeTopology(); + +final long currTime = mockTime.milliseconds(); +final StreamThread thread = createStreamThread(CLIENT_ID, config, false); + +thread.setState(StreamThread.State.STARTING); +thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); +final List assignedPartitions = new ArrayList<>(); + +final Map> activeTasks = new HashMap<>(); + +// assign single partition +assignedPartitions.add(t1p1); +activeTasks.put(task1, Collections.singleton(t1p1)); + 
+thread.taskManager().handleAssignment(activeTasks, emptyMap()); + +clientSupplier.consumer.assign(assignedPartitions); + clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); +thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); + +thread.runOnce(); +assertEquals(0, peekedContextTime.size()); + +mockTime.sleep(100L); +thread.runOnce(); + +assertEquals(1, peekedContextTime.size()); +assertEquals(currTime + 100L, peekedContextTime.get(0).longValue()); + +clientSupplier.consumer.addRecord(new ConsumerRecord<>( +topic1, +1, +0L, + 100L, + TimestampType.CREATE_TIME, + ConsumerRecord.NULL_CHECKSUM, + "K".getBytes().length, + "V".getBytes().length, + "K".getBytes(), + "V".getBytes())); + +thread.runOnce(); + +assertEquals(2, peekedContextTime.size()); +assertEquals(0L, peekedContextTime.get(1).longValue()); Review comment: So to make sure we actually use "stream-time", should we change the test to actually punctuate twice? Would you mind doing a follow-up PR?
[jira] [Commented] (KAFKA-10847) Avoid spurious left/outer join results in stream-stream join
[ https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290176#comment-17290176 ] Matthias J. Sax commented on KAFKA-10847: - Thanks for the update! {quote}I still need to know how to get the grace period. {quote} I guess we will need to pass it into the `Processor` when creating it during `Topology` build time. {quote}I do not delete any records when emitted. {quote} I assume you refer to the case when computing left/outer join results? I think we might need to delete though, to ensure we don't produce duplicates (e.g., when a rebalance happens). {quote}I don't know if bloom filters are active {quote} They should be active – we enable them by default. > Avoid spurious left/outer join results in stream-stream join > - > > Key: KAFKA-10847 > URL: https://issues.apache.org/jira/browse/KAFKA-10847 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Sergio Peña >Priority: Major > > KafkaStreams follows an eager execution model, i.e., it never buffers input > records but processes them right away. For a left/outer stream-stream join, > this implies that a left/outer join result might be emitted before the window > end (or window close) time is reached. Thus, a record that will be an > inner-join result might produce an eager (and spurious) left/outer join > result. > We should change the implementation of the join so that it does not emit eager left/outer > join results, but instead delays the emission of such results until after the window > grace period has passed.
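The delayed-emission idea in the issue description above can be sketched in plain Java as follows. This is an illustration under strong simplifying assumptions and is not the Kafka Streams implementation: it keeps at most one pending left record per key, tracks stream time as the maximum timestamp seen, and represents results as strings. All names (`DelayedLeftJoinSketch`, `onLeft`, `onRight`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DelayedLeftJoinSketch {
    private final long windowSizeMs;
    private final long graceMs;
    // Left records that have not found a join partner yet: key -> record timestamp.
    private final Map<String, Long> unmatchedLeft = new LinkedHashMap<>();
    private long streamTime = Long.MIN_VALUE;

    DelayedLeftJoinSketch(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    // Buffer the left record instead of emitting a left-only result eagerly.
    List<String> onLeft(String key, long timestamp) {
        unmatchedLeft.put(key, timestamp);
        return advance(timestamp);
    }

    // A right record inside the window yields the inner-join result and cancels
    // the pending left-only result for that key.
    List<String> onRight(String key, long timestamp) {
        List<String> out = new ArrayList<>();
        Long leftTs = unmatchedLeft.get(key);
        if (leftTs != null && Math.abs(timestamp - leftTs) <= windowSizeMs) {
            unmatchedLeft.remove(key);
            out.add(key + ":inner");
        }
        out.addAll(advance(timestamp));
        return out;
    }

    // Emit left-only results whose window plus grace period has passed, and delete
    // them from the buffer so they cannot be emitted a second time.
    private List<String> advance(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = unmatchedLeft.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() + windowSizeMs + graceMs < streamTime) {
                out.add(e.getKey() + ":left-only");
                it.remove();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        DelayedLeftJoinSketch join = new DelayedLeftJoinSketch(10L, 5L);
        System.out.println(join.onLeft("a", 0L));
        System.out.println(join.onRight("a", 5L));
        System.out.println(join.onLeft("b", 6L));
        System.out.println(join.onLeft("c", 30L));
    }
}
```

Removing an entry when its left-only result is emitted mirrors the comment's concern about duplicates. How that deletion interacts with rebalances and state stores is outside what this sketch models.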
[jira] [Commented] (KAFKA-12360) Improve documentation of max.task.idle.ms (kafka-streams)
[ https://issues.apache.org/jira/browse/KAFKA-12360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290164#comment-17290164 ] ASF GitHub Bot commented on KAFKA-12360: mjsax commented on pull request #333: URL: https://github.com/apache/kafka-site/pull/333#issuecomment-785274324 @nicodds Thanks for the PR! We also need a second PR for https://github.com/apache/kafka that is the source of truth for the docs. > Improve documentation of max.task.idle.ms (kafka-streams) > - > > Key: KAFKA-12360 > URL: https://issues.apache.org/jira/browse/KAFKA-12360 > Project: Kafka > Issue Type: Improvement > Components: docs, streams >Reporter: Domenico Delle Side >Priority: Minor > Labels: beginner, newbie, trivial > > _max.task.idle.ms_ is a handy way to pause processing in a *_kafka-streams_* > application. This is very useful when you need to join two topics that are > out of sync, i.e. when data in a topic may be produced _before_ you receive > join information in the other topic. > In the documentation, however, it is not specified that the value of > _max.task.idle.ms_ *must* be lower than _max.poll.interval.ms_, otherwise > you'll run into an endless rebalancing problem. > I think it is better to clearly state this in the documentation.
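The constraint described in the issue above can be checked before an application starts. The sketch below uses only `java.util.Properties` and the two configuration keys named in the issue. The helper `idleBelowPollInterval` and the class name are hypothetical, the numeric values are examples only, and the helper assumes both keys are set explicitly (it does not know the library defaults).

```java
import java.util.Properties;

class IdleConfigSketch {
    // Returns true when max.task.idle.ms is strictly lower than max.poll.interval.ms,
    // which is the relationship the issue reporter says is required.
    // Assumption: both keys are present; a missing key raises NumberFormatException.
    static boolean idleBelowPollInterval(Properties props) {
        long idleMs = Long.parseLong(props.getProperty("max.task.idle.ms"));
        long pollIntervalMs = Long.parseLong(props.getProperty("max.poll.interval.ms"));
        return idleMs < pollIntervalMs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("max.task.idle.ms", "1000");        // example value
        props.setProperty("max.poll.interval.ms", "300000");  // example value
        System.out.println(idleBelowPollInterval(props));
    }
}
```

A check like this could fail fast at startup instead of leaving the application in the rebalancing loop the reporter describes.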