[GitHub] [kafka] guozhangwang commented on pull request #8646: KAFKA-9974: Fix flaky test by removing unneeded asserts
guozhangwang commented on pull request #8646: URL: https://github.com/apache/kafka/pull/8646#issuecomment-644547455 test this This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #8646: KAFKA-9974: Fix flaky test by removing unneeded asserts
guozhangwang commented on pull request #8646: URL: https://github.com/apache/kafka/pull/8646#issuecomment-644547384 test this This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon edited a comment on pull request #8646: KAFKA-9974: Fix flaky test by removing unneeded asserts
showuon edited a comment on pull request #8646: URL: https://github.com/apache/kafka/pull/8646#issuecomment-644543006
**JDK 8 and Scala 2.12** failed
kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles --> traced in KAFKA-10155, PR: https://github.com/apache/kafka/pull/8853
**JDK 14 and Scala 2.13** failed
org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores --> passed locally
kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles --> traced in KAFKA-10155, PR: https://github.com/apache/kafka/pull/8853
**JDK 11 and Scala 2.13**
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication --> passed locally
kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition --> traced in KAFKA-8460 KAFKA-8264
kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles --> traced in KAFKA-10155, PR: https://github.com/apache/kafka/pull/8853
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #8646: KAFKA-9974: Fix flaky test by removing unneeded asserts
showuon commented on pull request #8646: URL: https://github.com/apache/kafka/pull/8646#issuecomment-644543006
**JDK 8 and Scala 2.12** failed
kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles --> traced in KAFKA-10155, PR: https://github.com/apache/kafka/pull/8853
**JDK 14 and Scala 2.13** failed
org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores --> passed locally
kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles --> traced in KAFKA-10155, PR: https://github.com/apache/kafka/pull/8853
**JDK 11 and Scala 2.13**
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication --> passed locally
kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition --> traced in KAFKA-8460.
kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles --> traced in KAFKA-10155, PR: https://github.com/apache/kafka/pull/8853
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine commented on pull request #8839: MINOR: Documentation for KIP-585
kkonstantine commented on pull request #8839: URL: https://github.com/apache/kafka/pull/8839#issuecomment-644537352 Merged to `trunk` and `2.6` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine commented on pull request #8859: MINOR: Upgrading jersey and jetty versions
kkonstantine commented on pull request #8859: URL: https://github.com/apache/kafka/pull/8859#issuecomment-644535669 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine commented on pull request #8859: MINOR: Upgrading jersey and jetty versions
kkonstantine commented on pull request #8859: URL: https://github.com/apache/kafka/pull/8859#issuecomment-644535534 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine merged pull request #8839: MINOR: Documentation for KIP-585
kkonstantine merged pull request #8839: URL: https://github.com/apache/kafka/pull/8839 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #8853: KAFKA-10147 MockAdminClient#describeConfigs(Collection
chia7712 commented on pull request #8853: URL: https://github.com/apache/kafka/pull/8853#issuecomment-644530767 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10168) Rename public StreamsConfig variable
[ https://issues.apache.org/jira/browse/KAFKA-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136298#comment-17136298 ] sats commented on KAFKA-10168: -- Thanks [~bchen225242] for the update. > Rename public StreamsConfig variable > > > Key: KAFKA-10168 > URL: https://issues.apache.org/jira/browse/KAFKA-10168 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Trivial > Labels: kip > > All Kafka Streams configuration parameter are exposed via public variables > that all end with `_CONFIG` suffix. However, we added the variable of > `topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of > `TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name. > KIP-626: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name] -- This message was sent by Atlassian Jira (v8.3.4#803005)
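To make the naming inconsistency concrete, here is a minimal Java sketch of how it appears to a Streams user today; the application id and bootstrap servers are placeholder values. Every other StreamsConfig key constant carries the `_CONFIG` suffix, while the optimization flag does not (the name KIP-626 proposes to align):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class OptimizationConfigNaming {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Other StreamsConfig key constants end with the _CONFIG suffix ...
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");        // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // ... but the topology.optimization constant does not
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
    }
}
```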
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r440585314 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -311,37 +317,47 @@ class GroupMetadataManager(brokerId: Int, responseCallback(responseError) } -appendForGroup(group, groupMetadataRecords, putCacheCallback) - +appendForGroup(group, groupMetadataRecords, putCacheCallback, completeDelayedRequests) case None => responseCallback(Errors.NOT_COORDINATOR) -None +Map.empty } } + /** + * @return Returning a map of successfully appended topic partitions and a flag indicting whether the HWM has been + * incremented. If the caller passes in completeDelayedRequests as false, the caller is expected to complete + * delayed requests for those returned partitions. + */ private def appendForGroup(group: GroupMetadata, records: Map[TopicPartition, MemoryRecords], - callback: Map[TopicPartition, PartitionResponse] => Unit): Unit = { + callback: Map[TopicPartition, PartitionResponse] => Unit, + completeDelayedRequests: Boolean): Map[TopicPartition, LeaderHWChange] = { // call replica manager to append the group message replicaManager.appendRecords( timeout = config.offsetCommitTimeoutMs.toLong, requiredAcks = config.offsetCommitRequiredAcks, internalTopicsAllowed = true, origin = AppendOrigin.Coordinator, + completeDelayedRequests = completeDelayedRequests, entriesPerPartition = records, delayedProduceLock = Some(group.lock), responseCallback = callback) } /** * Store offsets by appending it to the replicated log and then inserting to cache + * @return Returning a map of successfully appended topic partitions and a flag indicting whether the HWM has been + * incremented. If the caller passes in completeDelayedRequests as false, the caller is expected to complete + * delayed requests for those returned partitions. */ def storeOffsets(group: GroupMetadata, consumerId: String, offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], responseCallback: immutable.Map[TopicPartition, Errors] => Unit, + completeDelayedRequests: Boolean, Review comment: nice caching. Most methods don't need this flag. Let me revert them :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10168) Rename public StreamsConfig variable
[ https://issues.apache.org/jira/browse/KAFKA-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136292#comment-17136292 ] Boyang Chen commented on KAFKA-10168: - [~sbellapu] There is already a WIP PR for this change > Rename public StreamsConfig variable > > > Key: KAFKA-10168 > URL: https://issues.apache.org/jira/browse/KAFKA-10168 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Trivial > Labels: kip > > All Kafka Streams configuration parameter are exposed via public variables > that all end with `_CONFIG` suffix. However, we added the variable of > `topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of > `TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name. > KIP-626: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136283#comment-17136283 ] Sean Guo commented on KAFKA-10134: -- [~guozhang] Cooperative: {noformat} ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [xxx,xxx,xxx,xxx,xxx,xxx] check.crcs = true client.dns.lookup = default client.id = client.rack = connections.max.idle.ms = 54 default.api.timeout.ms = 6 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = xxx-consumer-group group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class com.cisco.wx2.kafka.serialization.SimpleKafkaDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 180 max.poll.records = 10 metadata.max.age.ms = 30 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 3 partition.assignment.strategy = [org.apache.kafka.clients.consumer.CooperativeStickyAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 3 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 6 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = SSL security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 3 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class com.cisco.wx2.kafka.serialization.SimpleKafkaDeserializer {noformat} Eager: {noformat} ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [xxx,xxx,xxx,xxx,xxx,xxx] check.crcs = true client.dns.lookup = default client.id = client.rack = connections.max.idle.ms = 54 default.api.timeout.ms = 6 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = xxx-consumer-group group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class com.cisco.wx2.kafka.serialization.SimpleKafkaDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 180 max.poll.records = 10 metadata.max.age.ms = 30 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 3 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] 
receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 3 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 6 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.
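For readers comparing the two config dumps above, the relevant difference is `partition.assignment.strategy` (CooperativeStickyAssignor vs. the eager RangeAssignor). A minimal sketch of how that switch is made on the consumer; the broker list and group id are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;

public class AssignorConfigSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-consumer-group");  // placeholder

        // Eager rebalancing (the default): all partitions are revoked before reassignment.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  RangeAssignor.class.getName());

        // Cooperative (incremental) rebalancing, as in the first dump above:
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName());
    }
}
```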
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r440578202 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -967,7 +967,16 @@ class Partition(val topicPartition: TopicPartition, } } - def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = { + /** + * @param completeDelayedRequests It may requires a bunch of group locks when completing delayed requests so it may Review comment: https://issues.apache.org/jira/browse/KAFKA-10170 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10170) ReplicaManager should be responsible for checking delayed operations after appending to the log.
Chia-Ping Tsai created KAFKA-10170: -- Summary: ReplicaManager should be responsible for checking delayed operations after appending to the log. Key: KAFKA-10170 URL: https://issues.apache.org/jira/browse/KAFKA-10170 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai This issue aims to refactor code to simplify the code of checking delayed operations. This issue is inspired by [~hachikuji] (https://github.com/apache/kafka/pull/8657#discussion_r426943627) {quote} Currently we have a somewhat convoluted model where ReplicaManager creates delayed operations, but we depend on lower level components like Partition to be aware of them and complete them. This breaks encapsulation. Not something we should try to complete in this PR, but as an eventual goal, I think we can consider trying to factor delayed operations out of Partition so that they can be managed by ReplicaManager exclusively. If you assume that is the end state, then we could drop completeDelayedRequests and let ReplicaManager always be responsible for checking delayed operations after appending to the log. Other than ReplicaManager, the only caller of this method is GroupMetadataManager which uses it during offset expiration. I think the only reason we do this is because we didn't want to waste purgatory space. I don't think that's a good enough reason to go outside the normal flow. It would be simpler to follow the same path. Potentially we could make the callback an Option so that we still have a way to avoid polluting the purgatory. {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-644521032 > It will be helpful if you could preserve the commit history in future updates to the PR since that makes it easier to identify the delta changes. my bad :( I'll keep that in mind This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r440576071
## File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -307,8 +307,14 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest override def runWithCallback(member: GroupMember, responseCallback: CompleteTxnCallback): Unit = { val producerId = 1000L val offsetsPartitions = (0 to numPartitions).map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _)) - groupCoordinator.groupManager.handleTxnCompletion(producerId, -offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean) + val isCommit = random.nextBoolean + try groupCoordinator.groupManager.handleTxnCompletion(producerId, +offsetsPartitions.map(_.partition).toSet, isCommit = isCommit) + catch { +case e: IllegalStateException if isCommit + && e.getMessage.contains("though the offset commit record itself hasn't been appended to the log")=>
Review comment:
> That seems a bug.

The root cause (changed by this PR) is that the "txn initialization" and "txn append" are not executed within the same lock. **The test story is shown below.**

```CommitTxnOffsetsOperation``` calls ```GroupMetadata.prepareTxnOffsetCommit``` to add ```CommitRecordMetadataAndOffset(None, offsetAndMetadata)``` to ```pendingTransactionalOffsetCommits``` (this is the link you attached). ```GroupMetadata.completePendingTxnOffsetCommit```, called by ```CompleteTxnOperation```, throws ```IllegalStateException``` if ```CommitRecordMetadataAndOffset.appendedBatchOffset``` is ```None``` (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L664).

**Why did it not cause an error before?**

```CommitRecordMetadataAndOffset.appendedBatchOffset``` is updated by the callback ```putCacheCallback``` (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L407). ```TestReplicManager``` always creates a ```delayedProduce``` to handle the ```putCacheCallback``` (https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala#L188). The condition to complete the ```delayedProduce``` is ```completeAttempts.incrementAndGet() >= 3```, and that condition becomes true when both ```producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)``` and ```tryCompleteDelayedRequests()``` are called, since the former calls ```tryComplete``` two times and the latter calls ```tryComplete``` once. It means ```putCacheCallback``` is always executed by ```TestReplicManager.appendRecords```, and note that ```TestReplicManager.appendRecords``` is executed within a group lock (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L738). In short, txn initialization (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L464) and txn append (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L407) are executed under the same group lock. Hence, the following execution order is impossible:
1. txn initialization
2. txn completion
3. txn append

However, this PR disallows completing delayed requests within the group lock held by the caller, so the ```putCacheCallback``` which is used to append the txn needs to acquire the group lock again.
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
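As a rough illustration of the pattern this PR moves towards, here is a hand-written Java sketch with hypothetical names (not the actual Scala code in GroupMetadataManager/ReplicaManager): the append runs while holding the group lock, but completion of delayed requests is handed back to the caller so that it happens only after the lock is released.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class DeferredCompletionSketch {

    // Hypothetical stand-in for a purgatory entry (delayed produce/fetch operation).
    interface DelayedOperation {
        void tryComplete();
    }

    private final ReentrantLock groupLock = new ReentrantLock();
    private final List<DelayedOperation> purgatory = new ArrayList<>();

    // Appends while holding the group lock. With completeDelayedRequests == false the
    // affected operations are returned instead of being completed inline, because
    // completing them may need other (group) locks and risks lock-ordering deadlocks.
    List<DelayedOperation> appendForGroup(final boolean completeDelayedRequests) {
        groupLock.lock();
        try {
            // ... append records to the log here ...
            if (completeDelayedRequests) {
                purgatory.forEach(DelayedOperation::tryComplete); // old behavior: inside the lock
                return Collections.emptyList();
            }
            return new ArrayList<>(purgatory); // caller completes these after releasing the lock
        } finally {
            groupLock.unlock();
        }
    }

    void callerFlow() {
        final List<DelayedOperation> pending = appendForGroup(false);
        // Outside the group lock: now it is safe to try to complete the delayed requests.
        pending.forEach(DelayedOperation::tryComplete);
    }
}
```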
[jira] [Commented] (KAFKA-10168) Rename public StreamsConfig variable
[ https://issues.apache.org/jira/browse/KAFKA-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136266#comment-17136266 ] sats commented on KAFKA-10168: -- Hi [~mjsax], this looks like a low-hanging task for me to start contributing to this project. Can you please assign it to me? Or please point me to some tickets that are easy for a starter. Thanks > Rename public StreamsConfig variable > > > Key: KAFKA-10168 > URL: https://issues.apache.org/jira/browse/KAFKA-10168 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Trivial > Labels: kip > > All Kafka Streams configuration parameter are exposed via public variables > that all end with `_CONFIG` suffix. However, we added the variable of > `topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of > `TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name. > KIP-626: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10169) KafkaException: Failing batch since transaction was aborted
[ https://issues.apache.org/jira/browse/KAFKA-10169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136255#comment-17136255 ] Sophie Blee-Goldman commented on KAFKA-10169: - I haven't seen anything damning (or even particularly interesting) in the logs so far. The only consistent pattern was that it tended to occur after a #handleAssignment where the doomed task was both a previous and current active task (ie owned before and after the rebalance). But it might just be a coincidence, and/or completely unrelated. I'll keep a look out for new thread deaths > KafkaException: Failing batch since transaction was aborted > --- > > Key: KAFKA-10169 > URL: https://issues.apache.org/jira/browse/KAFKA-10169 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.6.0 > > > We've seen the following exception in our eos-beta test application recently: > {code:java} > [2020-06-13T00:09:14-07:00] > (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) > org.apache.kafka.streams.errors.StreamsException: Error encountered sending > record to topic > stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-25-changelog for task > 1_2 due to: [2020-06-13T00:09:14-07:00] > (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) > org.apache.kafka.common.KafkaException: Failing batch since transaction was > aborted [2020-06-13T00:09:14-07:00] > (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) > Exception handler choose to FAIL the processing, no more records would be > sent. at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:213) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:185) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1347) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781) > at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748) [2020-06-13T00:09:14-07:00] > (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) > Caused by: org.apache.kafka.common.KafkaException: Failing batch since > transaction was aborted at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423) > ... 3 more > {code} > Somewhat unclear if this is an issue with eos-beta specifically, or just eos > in general. But several threads have died over the course of a few days in > the eos-beta application, while none so far have died on the eos-alpha > application. > It's also unclear (at least to me) whether this is definitely an issue in > Streams or possibly a bug in the producer (or even the broker, although that > seems unlikely) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] satishbellapu opened a new pull request #8877: KAFKA-9194: Missing documentation for replicaMaxWaitTimeMs config value
satishbellapu opened a new pull request #8877: URL: https://github.com/apache/kafka/pull/8877
Looks like it is a typo: the actual key is supposed to be #replicaFetchWaitMaxTimeMs (replica.fetch.wait.max.ms), but instead the docs reference #replicaMaxWaitTimeMs.
### Committer Checklist (excluded from commit message)
- [*] Verify design and implementation
- [*] Verify test coverage and CI build status
- [*] Verify documentation (including upgrade notes)
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup
ableegoldman commented on a change in pull request #8856: URL: https://github.com/apache/kafka/pull/8856#discussion_r440555918 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -714,13 +696,20 @@ void shutdown(final boolean clean) { } } -if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) { -commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); +try { +if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) { +commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); +} +for (final TaskId taskId : consumedOffsetsAndMetadataPerTask.keySet()) { +final Task task = tasks.get(taskId); +task.postCommit(); +} +} catch (final RuntimeException e) { +firstException.compareAndSet(null, e); Review comment: I see. Then I think it makes sense to always attempt to write the checkpoint/call `postCommit` for a task that was successfully committed, regardless of whether something went wrong during `postCommit` with a different task And I agree, we should not make assumptions about the current code not throwing, unless it's explicitly in the contract of the method that it will never throw (which is not the case for `postCommit` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup
mjsax commented on pull request #8856: URL: https://github.com/apache/kafka/pull/8856#issuecomment-644498795 Retest this please. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup
ableegoldman commented on a change in pull request #8856: URL: https://github.com/apache/kafka/pull/8856#discussion_r440555458 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -215,91 +215,54 @@ public void handleAssignment(final Map> activeTasks, "\tExisting standby tasks: {}", activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds()); -final Map> activeTasksToCreate = new HashMap<>(activeTasks); -final Map> standbyTasksToCreate = new HashMap<>(standbyTasks); -final Set tasksToRecycle = new HashSet<>(); - builder.addSubscribedTopicsFromAssignment( activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()), logPrefix ); -// first rectify all existing tasks final LinkedHashMap taskCloseExceptions = new LinkedHashMap<>(); -final Set tasksToClose = new HashSet<>(); -final Map> consumedOffsetsAndMetadataPerTask = new HashMap<>(); -final Set additionalTasksForCommitting = new HashSet<>(); +final Map> activeTasksToCreate = new HashMap<>(activeTasks); +final Map> standbyTasksToCreate = new HashMap<>(standbyTasks); +final LinkedList tasksToClose = new LinkedList<>(); +final Set tasksToRecycle = new HashSet<>(); final Set dirtyTasks = new HashSet<>(); +// first rectify all existing tasks for (final Task task : tasks.values()) { if (activeTasks.containsKey(task.id()) && task.isActive()) { updateInputPartitionsAndResume(task, activeTasks.get(task.id())); -if (task.commitNeeded()) { -additionalTasksForCommitting.add(task); -} activeTasksToCreate.remove(task.id()); } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) { updateInputPartitionsAndResume(task, standbyTasks.get(task.id())); standbyTasksToCreate.remove(task.id()); -// check for tasks that were owned previously but have changed active/standby status } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) { +// check for tasks that were owned previously but have changed active/standby status tasksToRecycle.add(task); } else { -try { -task.suspend(); -final Map committableOffsets = task.prepareCommit(); - -tasksToClose.add(task); -if (!committableOffsets.isEmpty()) { -consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); -} -} catch (final RuntimeException e) { -final String uncleanMessage = String.format( -"Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", -task.id()); -log.error(uncleanMessage, e); -taskCloseExceptions.put(task.id(), e); -// We've already recorded the exception (which is the point of clean). -// Now, we should go ahead and complete the close because a half-closed task is no good to anyone. -dirtyTasks.add(task); -} +tasksToClose.add(task); } } -if (!consumedOffsetsAndMetadataPerTask.isEmpty()) { +for (final Task task : tasksToClose) { try { -for (final Task task : additionalTasksForCommitting) { -final Map committableOffsets = task.prepareCommit(); -if (!committableOffsets.isEmpty()) { -consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); +task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation Review comment: `postCommit` will always write the checkpoint if the task is in SUSPENDED, which it should always be before being closed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup
mjsax commented on a change in pull request #8856: URL: https://github.com/apache/kafka/pull/8856#discussion_r440555142 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -215,91 +215,54 @@ public void handleAssignment(final Map> activeTasks, "\tExisting standby tasks: {}", activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds()); -final Map> activeTasksToCreate = new HashMap<>(activeTasks); -final Map> standbyTasksToCreate = new HashMap<>(standbyTasks); -final Set tasksToRecycle = new HashSet<>(); - builder.addSubscribedTopicsFromAssignment( activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()), logPrefix ); -// first rectify all existing tasks final LinkedHashMap taskCloseExceptions = new LinkedHashMap<>(); -final Set tasksToClose = new HashSet<>(); -final Map> consumedOffsetsAndMetadataPerTask = new HashMap<>(); -final Set additionalTasksForCommitting = new HashSet<>(); +final Map> activeTasksToCreate = new HashMap<>(activeTasks); +final Map> standbyTasksToCreate = new HashMap<>(standbyTasks); +final LinkedList tasksToClose = new LinkedList<>(); +final Set tasksToRecycle = new HashSet<>(); final Set dirtyTasks = new HashSet<>(); +// first rectify all existing tasks for (final Task task : tasks.values()) { if (activeTasks.containsKey(task.id()) && task.isActive()) { updateInputPartitionsAndResume(task, activeTasks.get(task.id())); -if (task.commitNeeded()) { -additionalTasksForCommitting.add(task); -} activeTasksToCreate.remove(task.id()); } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) { updateInputPartitionsAndResume(task, standbyTasks.get(task.id())); standbyTasksToCreate.remove(task.id()); -// check for tasks that were owned previously but have changed active/standby status } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) { +// check for tasks that were owned previously but have changed active/standby status tasksToRecycle.add(task); } else { -try { -task.suspend(); -final Map committableOffsets = task.prepareCommit(); - -tasksToClose.add(task); -if (!committableOffsets.isEmpty()) { -consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); -} -} catch (final RuntimeException e) { -final String uncleanMessage = String.format( -"Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", -task.id()); -log.error(uncleanMessage, e); -taskCloseExceptions.put(task.id(), e); -// We've already recorded the exception (which is the point of clean). -// Now, we should go ahead and complete the close because a half-closed task is no good to anyone. -dirtyTasks.add(task); -} +tasksToClose.add(task); } } -if (!consumedOffsetsAndMetadataPerTask.isEmpty()) { +for (final Task task : tasksToClose) { try { -for (final Task task : additionalTasksForCommitting) { -final Map committableOffsets = task.prepareCommit(); -if (!committableOffsets.isEmpty()) { -consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); +task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation Review comment: `postCommit` only writes a checkpoint for non-eos. Thus, we still need to write a checkpoint in `close()` for the eos-case (if just blindly for all cases as we do atm). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup
mjsax commented on a change in pull request #8856: URL: https://github.com/apache/kafka/pull/8856#discussion_r440555142 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -215,91 +215,54 @@ public void handleAssignment(final Map> activeTasks, "\tExisting standby tasks: {}", activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds()); -final Map> activeTasksToCreate = new HashMap<>(activeTasks); -final Map> standbyTasksToCreate = new HashMap<>(standbyTasks); -final Set tasksToRecycle = new HashSet<>(); - builder.addSubscribedTopicsFromAssignment( activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()), logPrefix ); -// first rectify all existing tasks final LinkedHashMap taskCloseExceptions = new LinkedHashMap<>(); -final Set tasksToClose = new HashSet<>(); -final Map> consumedOffsetsAndMetadataPerTask = new HashMap<>(); -final Set additionalTasksForCommitting = new HashSet<>(); +final Map> activeTasksToCreate = new HashMap<>(activeTasks); +final Map> standbyTasksToCreate = new HashMap<>(standbyTasks); +final LinkedList tasksToClose = new LinkedList<>(); +final Set tasksToRecycle = new HashSet<>(); final Set dirtyTasks = new HashSet<>(); +// first rectify all existing tasks for (final Task task : tasks.values()) { if (activeTasks.containsKey(task.id()) && task.isActive()) { updateInputPartitionsAndResume(task, activeTasks.get(task.id())); -if (task.commitNeeded()) { -additionalTasksForCommitting.add(task); -} activeTasksToCreate.remove(task.id()); } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) { updateInputPartitionsAndResume(task, standbyTasks.get(task.id())); standbyTasksToCreate.remove(task.id()); -// check for tasks that were owned previously but have changed active/standby status } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) { +// check for tasks that were owned previously but have changed active/standby status tasksToRecycle.add(task); } else { -try { -task.suspend(); -final Map committableOffsets = task.prepareCommit(); - -tasksToClose.add(task); -if (!committableOffsets.isEmpty()) { -consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); -} -} catch (final RuntimeException e) { -final String uncleanMessage = String.format( -"Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", -task.id()); -log.error(uncleanMessage, e); -taskCloseExceptions.put(task.id(), e); -// We've already recorded the exception (which is the point of clean). -// Now, we should go ahead and complete the close because a half-closed task is no good to anyone. -dirtyTasks.add(task); -} +tasksToClose.add(task); } } -if (!consumedOffsetsAndMetadataPerTask.isEmpty()) { +for (final Task task : tasksToClose) { try { -for (final Task task : additionalTasksForCommitting) { -final Map committableOffsets = task.prepareCommit(); -if (!committableOffsets.isEmpty()) { -consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); +task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation Review comment: `postCommit` only write a checkpoint for non-eos. Thus, we still need to write a checkpoint for eos case in `close()`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup
mjsax commented on a change in pull request #8856: URL: https://github.com/apache/kafka/pull/8856#discussion_r440554249
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -714,13 +696,20 @@ void shutdown(final boolean clean) { } } -if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) { -commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); +try { +if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) { +commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); +} +for (final TaskId taskId : consumedOffsetsAndMetadataPerTask.keySet()) { +final Task task = tasks.get(taskId); +task.postCommit(); +} +} catch (final RuntimeException e) { +firstException.compareAndSet(null, e);
Review comment:
I meant the latter. And I agree that if `commit` fails, we should not call `postCommit()`. For a failure in `postCommit`: we make assumptions about the current code, which seems dangerous (i.e., not future-proof)? -- IMHO, if `postCommit` fails, we need to close the corresponding task dirty and either recreate it or rebalance, but we should also continue to call `postCommit()` for all other tasks?
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
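A minimal sketch of the error-handling policy being converged on in this thread, using a hypothetical stand-in for the Streams-internal Task interface (this is not the actual TaskManager code): record the first failure, close that task dirty, keep calling `postCommit()` on the remaining committed tasks, and rethrow at the end.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class PostCommitPolicySketch {

    // Hypothetical stand-in for the Streams-internal Task interface.
    interface Task {
        String id();
        void postCommit();
        void closeDirty();
    }

    static void postCommitAll(final List<Task> committedTasks) {
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>();
        for (final Task task : committedTasks) {
            try {
                task.postCommit();
            } catch (final RuntimeException e) {
                // Remember only the first exception, but keep going for the other tasks.
                firstException.compareAndSet(null, e);
                // A task whose postCommit failed is closed dirty rather than left half-closed.
                task.closeDirty();
            }
        }
        if (firstException.get() != null) {
            throw firstException.get();
        }
    }
}
```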
[jira] [Created] (KAFKA-10169) KafkaException: Failing batch since transaction was aborted
Sophie Blee-Goldman created KAFKA-10169: --- Summary: KafkaException: Failing batch since transaction was aborted Key: KAFKA-10169 URL: https://issues.apache.org/jira/browse/KAFKA-10169 Project: Kafka Issue Type: Bug Components: streams Reporter: Sophie Blee-Goldman Fix For: 2.6.0 We've seen the following exception in our eos-beta test application recently: {code:java} [2020-06-13T00:09:14-07:00] (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-25-changelog for task 1_2 due to: [2020-06-13T00:09:14-07:00] (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted [2020-06-13T00:09:14-07:00] (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) Exception handler choose to FAIL the processing, no more records would be sent. at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:213) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:185) at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1347) at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231) at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159) at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781) at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at java.lang.Thread.run(Thread.java:748) [2020-06-13T00:09:14-07:00] (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) Caused by: org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423) ... 3 more {code} Somewhat unclear if this is an issue with eos-beta specifically, or just eos in general. But several threads have died over the course of a few days in the eos-beta application, while none so far have died on the eos-alpha application. It's also unclear (at least to me) whether this is definitely an issue in Streams or possibly a bug in the producer (or even the broker, although that seems unlikely) -- This message was sent by Atlassian Jira (v8.3.4#803005)
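For context on the "eos-alpha" and "eos-beta" applications mentioned in this report: the two modes are selected via the `processing.guarantee` Streams config. A minimal sketch of both settings follows; the application id and bootstrap servers are placeholders, and the soak applications' remaining configuration is not shown here.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class EosGuaranteeConfig {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-soak-app");       // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

        // eos-alpha: the original exactly-once mode, one transactional producer per task.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        // eos-beta (new in 2.6.0, KIP-447): one transactional producer per stream thread.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
    }
}
```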
[jira] [Updated] (KAFKA-10168) Rename public StreamsConfig variable
[ https://issues.apache.org/jira/browse/KAFKA-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10168: Description: All Kafka Streams configuration parameter are exposed via public variables that all end with `_CONFIG` suffix. However, we added the variable of `topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of `TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name. KIP-626: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name] was: All Kafka Streams configuration parameter are exposed via public variables that all end with `_CONFIG` suffix. However, we added the variable of `topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of `TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name. KIP-629: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-629%3A+Rename+StreamsConfig+config+variable+name] > Rename public StreamsConfig variable > > > Key: KAFKA-10168 > URL: https://issues.apache.org/jira/browse/KAFKA-10168 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Trivial > Labels: kip > > All Kafka Streams configuration parameter are exposed via public variables > that all end with `_CONFIG` suffix. However, we added the variable of > `topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of > `TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name. > KIP-626: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10168) Rename public StreamsConfig variable
[ https://issues.apache.org/jira/browse/KAFKA-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10168: Description: All Kafka Streams configuration parameter are exposed via public variables that all end with `_CONFIG` suffix. However, we added the variable of `topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of `TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name. KIP-629: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-629%3A+Rename+StreamsConfig+config+variable+name] was:All Kafka Streams configuration parameter are exposed via public variables that all end with `_CONFIG` suffix. However, we added the variable of `topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of `TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name. > Rename public StreamsConfig variable > > > Key: KAFKA-10168 > URL: https://issues.apache.org/jira/browse/KAFKA-10168 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Trivial > Labels: kip > > All Kafka Streams configuration parameter are exposed via public variables > that all end with `_CONFIG` suffix. However, we added the variable of > `topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of > `TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name. > KIP-629: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-629%3A+Rename+StreamsConfig+config+variable+name] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10168) Rename public StreamsConfig variable
Matthias J. Sax created KAFKA-10168: --- Summary: Rename public StreamsConfig variable Key: KAFKA-10168 URL: https://issues.apache.org/jira/browse/KAFKA-10168 Project: Kafka Issue Type: Improvement Components: streams Reporter: Matthias J. Sax All Kafka Streams configuration parameter are exposed via public variables that all end with `_CONFIG` suffix. However, we added the variable of `topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of `TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-10168) Rename public StreamsConfig variable
[ https://issues.apache.org/jira/browse/KAFKA-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-10168: --- Assignee: Matthias J. Sax > Rename public StreamsConfig variable > > > Key: KAFKA-10168 > URL: https://issues.apache.org/jira/browse/KAFKA-10168 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Trivial > Labels: kip > > All Kafka Streams configuration parameter are exposed via public variables > that all end with `_CONFIG` suffix. However, we added the variable of > `topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of > `TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on pull request #8865: MINOR: fix StreamsConfig parameter name variable
mjsax commented on pull request #8865: URL: https://github.com/apache/kafka/pull/8865#issuecomment-644485931 Not sure if changing KIP-295 would be a good call. We made a mistake and can be honest about it. I'll do a quick KIP. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #8861: MINOR: clean up unused checkstyle suppressions for Streams
mjsax commented on pull request #8861: URL: https://github.com/apache/kafka/pull/8861#issuecomment-644484131 Retest this please. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #8871: MINOR: code cleanup for inconsistent naming
mjsax commented on pull request #8871: URL: https://github.com/apache/kafka/pull/8871#issuecomment-644483903 Retest this please. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10062) Add a method to retrieve the current timestamp as known by the Streams app
[ https://issues.apache.org/jira/browse/KAFKA-10062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136232#comment-17136232 ] Matthias J. Sax commented on KAFKA-10062: - The testing is done using `TopologyTestDriver` (cf [https://kafka.apache.org/25/documentation/streams/developer-guide/testing.html]) – the test driver mocks wall-clock time and allows users to manipulate wall-clock time to control when wall-clock-time-based punctuation should fire. However, the mocked wall-clock time is not exposed via the `context`. Note that, internally, all code always uses the `Time` interface when it needs access to system time. In a real deployment this translates to system time, while the test driver switches the implementation to use `MockTime`. Does this help? > Add a method to retrieve the current timestamp as known by the Streams app > -- > > Key: KAFKA-10062 > URL: https://issues.apache.org/jira/browse/KAFKA-10062 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Piotr Smolinski >Assignee: William Bottrell >Priority: Major > Labels: needs-kip, newbie > > Please add to the ProcessorContext a method to retrieve the current timestamp > compatible with the Punctuator#punctuate(long) method. > Proposal in ProcessorContext: > long getTimestamp(PunctuationType type); > The method should return the time value as known by the Punctuator scheduler with > the respective PunctuationType. > The use-case is tracking of a process with timeout-based escalation. > A transformer receives process events and, in case of a missing event, executes > an action (emit message) after a given escalation timeout (several stages). The > initial message may already arrive with a reference timestamp in the past and > may trigger a different action upon arrival depending on how far in the past it > is. > If the timeout should be computed against some further time only, Punctuator > is perfectly sufficient. The problem is that I have to evaluate the current > time-related state once the message arrives. > I am using wall-clock time. Normally accessing System.currentTimeMillis() is > sufficient, but it breaks in unit testing with TopologyTestDriver, where the > app wall clock time is different from the system-wide one. > To access the mentioned clock I am using reflection to access > ProcessorContextImpl#task and then StreamTask#time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
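A small, self-contained sketch of the mocked wall-clock behavior described above; the topic name, processor, and punctuation interval are made up for illustration. The punctuator fires when the driver's wall-clock time is advanced, independent of System.currentTimeMillis():

```java
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class WallClockPunctuationSketch {
    public static void main(final String[] args) {
        final Topology topology = new Topology();
        topology.addSource("source", "input-topic");
        topology.addProcessor("proc", () -> new AbstractProcessor<String, String>() {
            @Override
            public void init(final ProcessorContext context) {
                super.init(context);
                // Scheduled against the (mocked) wall-clock, not System.currentTimeMillis().
                context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME,
                    timestamp -> System.out.println("punctuate fired at " + timestamp));
            }

            @Override
            public void process(final String key, final String value) { }
        }, "source");

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wall-clock-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            // Advancing the mocked wall-clock by 30s fires the 10s punctuator three times.
            driver.advanceWallClockTime(Duration.ofSeconds(30));
        }
    }
}
```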
[GitHub] [kafka] junrao commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories
junrao commented on a change in pull request #7929: URL: https://github.com/apache/kafka/pull/7929#discussion_r440539291 ## File path: core/src/main/scala/kafka/log/LogCleaner.scala ## @@ -595,8 +595,10 @@ private[log] class Cleaner(val id: Int, log.replaceSegments(List(cleaned), segments) } catch { case e: LogCleaningAbortedException => -try cleaned.deleteIfExists() -catch { +try { + cleaned.deleteIfExists() + log.producerStateManager.deleteIfExists(cleaned.baseOffset) Review comment: Hmm, the cleaned segment has the same base offset as the first segment. So, we don't want to delete that snapshot file. ## File path: core/src/main/scala/kafka/log/ProducerStateManager.scala ## @@ -751,6 +751,25 @@ class ProducerStateManager(val topicPartition: TopicPartition, None } - private def listSnapshotFiles: Seq[File] = ProducerStateManager.listSnapshotFiles(logDir) + private[log] def listSnapshotFiles: Seq[File] = ProducerStateManager.listSnapshotFiles(logDir) + /** + * Remove any producer state snapshot files which do not have a corresponding offset provided + * in keepOffsets. The latest snapshot file will always be kept. Review comment: What's keepOffsets? ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -2421,6 +2396,7 @@ class Log(@volatile private var _dir: File, newSegments.foreach { splitSegment => splitSegment.close() splitSegment.deleteIfExists() + producerStateManager.deleteIfExists(splitSegment.baseOffset) Review comment: It doesn't seem that we generate producer snapshot files for those new segments? ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -2237,7 +2209,10 @@ class Log(@volatile private var _dir: File, def deleteSegments(): Unit = { info(s"Deleting segments ${segments.mkString(",")}") maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") { -segments.foreach(_.deleteIfExists()) +segments.foreach { segment => + segment.deleteIfExists() + producerStateManager.deleteIfExists(segment.baseOffset) Review comment: Hmm, this can be a bit tricky. When we replace old segments with a new segment in LogCleaner, each of the old segment will be deleted. However, the first old segment has the same offset as the new segment. So, we don't want to just delete the producer snapshot corresponding to the first old segment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] andrewchoi5 commented on pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception
andrewchoi5 commented on pull request #8479: URL: https://github.com/apache/kafka/pull/8479#issuecomment-644482079 Thanks, @junrao . Removed the population of responseMap with the `ERROR` code. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed
[ https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-10167: --- Assignee: Guozhang Wang > Streams EOS-Beta should not try to get end-offsets as read-committed > > > Key: KAFKA-10167 > URL: https://issues.apache.org/jira/browse/KAFKA-10167 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.6.0 > > > This is a bug discovered with the new EOS protocol (KIP-447), here's the > context: > In Streams when we are assigned with the new active tasks, we would first try > to restore the state from the changelog topic all the way to the log end > offset, and then we can transit from the `restoring` to the `running` state > to start processing the task. > Before KIP-447, the end-offset call is only triggered after we've passed the > synchronization barrier at the txn-coordinator which would guarantee that the > txn-marker has been sent and received (otherwise we would error with > CONCURRENT_TRANSACTIONS and let the producer retry), and when the txn-marker > is received, it also means that the marker has been fully replicated, which > in turn guarantees that the data written before that marker has been fully > replicated. As a result, when we send the list-offset with `read-committed` > flag we are guaranteed that the returned offset == LSO == high-watermark. > After KIP-447 however, we do not fence on the txn-coordinator but on > group-coordinator upon offset-fetch, and the group-coordinator would return > the fetching offset right after it has received the replicated the txn-marker > sent to it. However, since the txn-marker are sent to different brokers in > parallel, and even within the same broker markers of different partitions are > appended / replicated independently as well, so when the fetch-offset request > returns it is NOT guaranteed that the LSO on other data partitions would have > been advanced as well. And hence in that case the `endOffset` call may > returned a smaller offset, causing data loss. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on a change in pull request #8876: KAFKA-10167: use the admin client to read end-offset
mjsax commented on a change in pull request #8876: URL: https://github.com/apache/kafka/pull/8876#discussion_r440535836 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java ## @@ -834,6 +855,13 @@ public AlterClientQuotasResult alterClientQuotas(Collection newOffsets) { +beginningOffsets.putAll(newOffsets); +} +public synchronized void updateEndOffsets(final Map newOffsets) { Review comment: nit: empty line. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ## @@ -564,8 +576,15 @@ private void restoreChangelog(final ChangelogMetadata changelogMetadata) { return Collections.emptyMap(); try { -return restoreConsumer.endOffsets(partitions); -} catch (final TimeoutException e) { +if (adminClient != null) { Review comment: Why do we need this distinction? Seems we set `adminClient` in any case and it should never be `null`? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -273,13 +273,13 @@ public boolean isRunning() { private volatile ThreadMetadata threadMetadata; private StreamThread.StateListener stateListener; -private final Admin adminClient; private final ChangelogReader changelogReader; // package-private for testing final ConsumerRebalanceListener rebalanceListener; final Consumer mainConsumer; final Consumer restoreConsumer; +final Admin adminClient; Review comment: Why is `adminClient` not final any longer? We still pass it into the `StreamThread` constructor. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception
junrao commented on pull request #8479: URL: https://github.com/apache/kafka/pull/8479#issuecomment-644476784 @andrewchoi5 : Since the controller only checks KAFKA_STORAGE_ERROR in LeaderAndIsrResponse now, perhaps we can just log an error without sending an error code back for now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed
[ https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10167: Priority: Blocker (was: Major) > Streams EOS-Beta should not try to get end-offsets as read-committed > > > Key: KAFKA-10167 > URL: https://issues.apache.org/jira/browse/KAFKA-10167 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Priority: Blocker > > This is a bug discovered with the new EOS protocol (KIP-447), here's the > context: > In Streams when we are assigned with the new active tasks, we would first try > to restore the state from the changelog topic all the way to the log end > offset, and then we can transit from the `restoring` to the `running` state > to start processing the task. > Before KIP-447, the end-offset call is only triggered after we've passed the > synchronization barrier at the txn-coordinator which would guarantee that the > txn-marker has been sent and received (otherwise we would error with > CONCURRENT_TRANSACTIONS and let the producer retry), and when the txn-marker > is received, it also means that the marker has been fully replicated, which > in turn guarantees that the data written before that marker has been fully > replicated. As a result, when we send the list-offset with `read-committed` > flag we are guaranteed that the returned offset == LSO == high-watermark. > After KIP-447 however, we do not fence on the txn-coordinator but on > group-coordinator upon offset-fetch, and the group-coordinator would return > the fetching offset right after it has received the replicated the txn-marker > sent to it. However, since the txn-marker are sent to different brokers in > parallel, and even within the same broker markers of different partitions are > appended / replicated independently as well, so when the fetch-offset request > returns it is NOT guaranteed that the LSO on other data partitions would have > been advanced as well. And hence in that case the `endOffset` call may > returned a smaller offset, causing data loss. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed
[ https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10167: Affects Version/s: 2.6.0 > Streams EOS-Beta should not try to get end-offsets as read-committed > > > Key: KAFKA-10167 > URL: https://issues.apache.org/jira/browse/KAFKA-10167 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Guozhang Wang >Priority: Blocker > Fix For: 2.6.0 > > > This is a bug discovered with the new EOS protocol (KIP-447), here's the > context: > In Streams when we are assigned with the new active tasks, we would first try > to restore the state from the changelog topic all the way to the log end > offset, and then we can transit from the `restoring` to the `running` state > to start processing the task. > Before KIP-447, the end-offset call is only triggered after we've passed the > synchronization barrier at the txn-coordinator which would guarantee that the > txn-marker has been sent and received (otherwise we would error with > CONCURRENT_TRANSACTIONS and let the producer retry), and when the txn-marker > is received, it also means that the marker has been fully replicated, which > in turn guarantees that the data written before that marker has been fully > replicated. As a result, when we send the list-offset with `read-committed` > flag we are guaranteed that the returned offset == LSO == high-watermark. > After KIP-447 however, we do not fence on the txn-coordinator but on > group-coordinator upon offset-fetch, and the group-coordinator would return > the fetching offset right after it has received the replicated the txn-marker > sent to it. However, since the txn-marker are sent to different brokers in > parallel, and even within the same broker markers of different partitions are > appended / replicated independently as well, so when the fetch-offset request > returns it is NOT guaranteed that the LSO on other data partitions would have > been advanced as well. And hence in that case the `endOffset` call may > returned a smaller offset, causing data loss. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed
[ https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10167: Fix Version/s: 2.6.0 > Streams EOS-Beta should not try to get end-offsets as read-committed > > > Key: KAFKA-10167 > URL: https://issues.apache.org/jira/browse/KAFKA-10167 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Priority: Blocker > Fix For: 2.6.0 > > > This is a bug discovered with the new EOS protocol (KIP-447), here's the > context: > In Streams when we are assigned with the new active tasks, we would first try > to restore the state from the changelog topic all the way to the log end > offset, and then we can transit from the `restoring` to the `running` state > to start processing the task. > Before KIP-447, the end-offset call is only triggered after we've passed the > synchronization barrier at the txn-coordinator which would guarantee that the > txn-marker has been sent and received (otherwise we would error with > CONCURRENT_TRANSACTIONS and let the producer retry), and when the txn-marker > is received, it also means that the marker has been fully replicated, which > in turn guarantees that the data written before that marker has been fully > replicated. As a result, when we send the list-offset with `read-committed` > flag we are guaranteed that the returned offset == LSO == high-watermark. > After KIP-447 however, we do not fence on the txn-coordinator but on > group-coordinator upon offset-fetch, and the group-coordinator would return > the fetching offset right after it has received the replicated the txn-marker > sent to it. However, since the txn-marker are sent to different brokers in > parallel, and even within the same broker markers of different partitions are > appended / replicated independently as well, so when the fetch-offset request > returns it is NOT guaranteed that the LSO on other data partitions would have > been advanced as well. And hence in that case the `endOffset` call may > returned a smaller offset, causing data loss. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed
[ https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10167: Component/s: streams > Streams EOS-Beta should not try to get end-offsets as read-committed > > > Key: KAFKA-10167 > URL: https://issues.apache.org/jira/browse/KAFKA-10167 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Priority: Major > > This is a bug discovered with the new EOS protocol (KIP-447), here's the > context: > In Streams when we are assigned with the new active tasks, we would first try > to restore the state from the changelog topic all the way to the log end > offset, and then we can transit from the `restoring` to the `running` state > to start processing the task. > Before KIP-447, the end-offset call is only triggered after we've passed the > synchronization barrier at the txn-coordinator which would guarantee that the > txn-marker has been sent and received (otherwise we would error with > CONCURRENT_TRANSACTIONS and let the producer retry), and when the txn-marker > is received, it also means that the marker has been fully replicated, which > in turn guarantees that the data written before that marker has been fully > replicated. As a result, when we send the list-offset with `read-committed` > flag we are guaranteed that the returned offset == LSO == high-watermark. > After KIP-447 however, we do not fence on the txn-coordinator but on > group-coordinator upon offset-fetch, and the group-coordinator would return > the fetching offset right after it has received the replicated the txn-marker > sent to it. However, since the txn-marker are sent to different brokers in > parallel, and even within the same broker markers of different partitions are > appended / replicated independently as well, so when the fetch-offset request > returns it is NOT guaranteed that the LSO on other data partitions would have > been advanced as well. And hence in that case the `endOffset` call may > returned a smaller offset, causing data loss. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang opened a new pull request #8876: KAFKA-10167: use the admin client to read end-offset
guozhangwang opened a new pull request #8876: URL: https://github.com/apache/kafka/pull/8876 Since the admin client allows us to use a flexible offset spec, we can always use read-uncommitted regardless of the EOS config. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
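For readers following along, here is a rough sketch of what reading the end offset through the admin client with read-uncommitted looks like against the public `Admin` API. The bootstrap address and changelog partition are placeholders, and this is not the code from the PR itself.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;

public class EndOffsetLookupSketch {

    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        final TopicPartition changelog = new TopicPartition("app-store-changelog", 0); // placeholder

        try (final Admin admin = Admin.create(props)) {
            // READ_UNCOMMITTED returns the log-end offset (high watermark) rather than the LSO,
            // so an open transaction on the changelog does not hold the reported end offset back.
            final Map<TopicPartition, ListOffsetsResultInfo> result = admin
                .listOffsets(Collections.singletonMap(changelog, OffsetSpec.latest()),
                             new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED))
                .all()
                .get();
            System.out.println("end offset: " + result.get(changelog).offset());
        }
    }
}
```

Because the lookup ignores the last stable offset, a transaction left open by the previous task owner cannot make the reported end offset lag behind the real log end.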
[jira] [Commented] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed
[ https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136216#comment-17136216 ] Guozhang Wang commented on KAFKA-10167: --- I've also thought about whether we need to block on mainConsumer#committed before getting the end-offset, and now I think that is not necessary either since the end-offset only requires that data is flushed --- again, remember we have this single-writer single-reader scenario and when we are in the initialize-changelog-reader phase, we know that the other old producer would not be able to write any more unabortable data to that partition. > Streams EOS-Beta should not try to get end-offsets as read-committed > > > Key: KAFKA-10167 > URL: https://issues.apache.org/jira/browse/KAFKA-10167 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang >Priority: Major > > This is a bug discovered with the new EOS protocol (KIP-447), here's the > context: > In Streams when we are assigned with the new active tasks, we would first try > to restore the state from the changelog topic all the way to the log end > offset, and then we can transit from the `restoring` to the `running` state > to start processing the task. > Before KIP-447, the end-offset call is only triggered after we've passed the > synchronization barrier at the txn-coordinator which would guarantee that the > txn-marker has been sent and received (otherwise we would error with > CONCURRENT_TRANSACTIONS and let the producer retry), and when the txn-marker > is received, it also means that the marker has been fully replicated, which > in turn guarantees that the data written before that marker has been fully > replicated. As a result, when we send the list-offset with `read-committed` > flag we are guaranteed that the returned offset == LSO == high-watermark. > After KIP-447 however, we do not fence on the txn-coordinator but on > group-coordinator upon offset-fetch, and the group-coordinator would return > the fetching offset right after it has received the replicated the txn-marker > sent to it. However, since the txn-marker are sent to different brokers in > parallel, and even within the same broker markers of different partitions are > appended / replicated independently as well, so when the fetch-offset request > returns it is NOT guaranteed that the LSO on other data partitions would have > been advanced as well. And hence in that case the `endOffset` call may > returned a smaller offset, causing data loss. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed
[ https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136215#comment-17136215 ] Guozhang Wang commented on KAFKA-10167: --- The proposed solution is that even under EOS, do not try to use consumer.endOffset that would set `read-committed` flag, but to just use list-offset with `read-uncommitted` to get the end-offset. The rationale is that, since we know that this changelog-topic is a single-writer, single-reader, and we control all the writer / reader of it, we can safely assume that the on-going txn is only from our previous writer. If the task migration is due to a graceful rebalance (i.e. the task is indeed being revoked from the other host), then the old host would always commit in which it would block on `producer.flush` to make sure all data are written (although by default we do not override replication factor on changelog topics and producer's ack.mode, so if user change the one without the other they may bump into other issues where data are not replicated completely and hence high-watermark returned from list-offset can be smaller). And therefore the end-offset returned would return the actual log-end-offset with or without the txn-marker, either of which is fine. If the task migration is due to an unexpected task migration (i.e. the task was not proactively revoked, the old host may not know it is out of the group or has been crashed), then although not all records sent from the old host are guaranteed to be on the broker and be covered with end-offset, it is fine since these records will be aborted eventually anyways. > Streams EOS-Beta should not try to get end-offsets as read-committed > > > Key: KAFKA-10167 > URL: https://issues.apache.org/jira/browse/KAFKA-10167 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang >Priority: Major > > This is a bug discovered with the new EOS protocol (KIP-447), here's the > context: > In Streams when we are assigned with the new active tasks, we would first try > to restore the state from the changelog topic all the way to the log end > offset, and then we can transit from the `restoring` to the `running` state > to start processing the task. > Before KIP-447, the end-offset call is only triggered after we've passed the > synchronization barrier at the txn-coordinator which would guarantee that the > txn-marker has been sent and received (otherwise we would error with > CONCURRENT_TRANSACTIONS and let the producer retry), and when the txn-marker > is received, it also means that the marker has been fully replicated, which > in turn guarantees that the data written before that marker has been fully > replicated. As a result, when we send the list-offset with `read-committed` > flag we are guaranteed that the returned offset == LSO == high-watermark. > After KIP-447 however, we do not fence on the txn-coordinator but on > group-coordinator upon offset-fetch, and the group-coordinator would return > the fetching offset right after it has received the replicated the txn-marker > sent to it. However, since the txn-marker are sent to different brokers in > parallel, and even within the same broker markers of different partitions are > appended / replicated independently as well, so when the fetch-offset request > returns it is NOT guaranteed that the LSO on other data partitions would have > been advanced as well. And hence in that case the `endOffset` call may > returned a smaller offset, causing data loss. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed
Guozhang Wang created KAFKA-10167: - Summary: Streams EOS-Beta should not try to get end-offsets as read-committed Key: KAFKA-10167 URL: https://issues.apache.org/jira/browse/KAFKA-10167 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang This is a bug discovered with the new EOS protocol (KIP-447), here's the context: In Streams when we are assigned with the new active tasks, we would first try to restore the state from the changelog topic all the way to the log end offset, and then we can transition from the `restoring` to the `running` state to start processing the task. Before KIP-447, the end-offset call is only triggered after we've passed the synchronization barrier at the txn-coordinator which would guarantee that the txn-marker has been sent and received (otherwise we would error with CONCURRENT_TRANSACTIONS and let the producer retry), and when the txn-marker is received, it also means that the marker has been fully replicated, which in turn guarantees that the data written before that marker has been fully replicated. As a result, when we send the list-offset with `read-committed` flag we are guaranteed that the returned offset == LSO == high-watermark. After KIP-447 however, we do not fence on the txn-coordinator but on group-coordinator upon offset-fetch, and the group-coordinator would return the fetching offset right after it has received the replicated txn-marker sent to it. However, since the txn-markers are sent to different brokers in parallel, and even within the same broker markers of different partitions are appended / replicated independently as well, so when the fetch-offset request returns it is NOT guaranteed that the LSO on other data partitions would have been advanced as well. And hence in that case the `endOffset` call may return a smaller offset, causing data loss. -- This message was sent by Atlassian Jira (v8.3.4#803005)
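As a point of comparison with the description above, the consumer-side `endOffsets()` call honours the consumer's `isolation.level`, which is why a `read_committed` lookup returns the LSO rather than the high watermark. A minimal sketch with placeholder broker and topic names:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class LsoVsHighWatermarkSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        // With read_committed, endOffsets() reports the last stable offset (LSO),
        // which can trail the high watermark while a transaction is still open.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        final TopicPartition tp = new TopicPartition("app-store-changelog", 0); // placeholder
        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            final Map<TopicPartition, Long> endOffsets =
                consumer.endOffsets(Collections.singleton(tp));
            System.out.println("reported end offset (LSO): " + endOffsets.get(tp));
        }
    }
}
```

Per the description above, under EOS-beta this LSO can lag the true log end while txn-markers are still in flight, which is the offset gap the ticket is about.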
[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r440514740 ## File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala ## @@ -307,8 +307,14 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest override def runWithCallback(member: GroupMember, responseCallback: CompleteTxnCallback): Unit = { val producerId = 1000L val offsetsPartitions = (0 to numPartitions).map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _)) - groupCoordinator.groupManager.handleTxnCompletion(producerId, -offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean) + val isCommit = random.nextBoolean + try groupCoordinator.groupManager.handleTxnCompletion(producerId, +offsetsPartitions.map(_.partition).toSet, isCommit = isCommit) + catch { +case e: IllegalStateException if isCommit + && e.getMessage.contains("though the offset commit record itself hasn't been appended to the log")=> Review comment: Thanks. I am still not sure that I fully understand this. It seems that by not completing the delayedProduce within the group lock, we are hitting IllegalStateException. That seems a bug. Do you know which code depends on that? It seems that we do hold a group lock when updating the txnOffset. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L462 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r440514740 ## File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala ## @@ -307,8 +307,14 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest override def runWithCallback(member: GroupMember, responseCallback: CompleteTxnCallback): Unit = { val producerId = 1000L val offsetsPartitions = (0 to numPartitions).map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _)) - groupCoordinator.groupManager.handleTxnCompletion(producerId, -offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean) + val isCommit = random.nextBoolean + try groupCoordinator.groupManager.handleTxnCompletion(producerId, +offsetsPartitions.map(_.partition).toSet, isCommit = isCommit) + catch { +case e: IllegalStateException if isCommit + && e.getMessage.contains("though the offset commit record itself hasn't been appended to the log")=> Review comment: Thanks. I am still not sure that I fully understand this. It seems that by not completing the delayedProduce within the group lock, we are hitting IllegalStateException. Do you know which code depends on that? It seems that we do hold a group lock when updating the txnOffset. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L462 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -967,7 +967,16 @@ class Partition(val topicPartition: TopicPartition, } } - def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = { + /** + * @param completeDelayedRequests It may requires a bunch of group locks when completing delayed requests so it may Review comment: Yes, we can refactor that in a separate PR. Could you file a followup jira for that? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup
ableegoldman commented on a change in pull request #8856: URL: https://github.com/apache/kafka/pull/8856#discussion_r440473726 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -539,19 +537,18 @@ private void writeCheckpointIfNeed() { /** * * the following order must be followed: - * 1. checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed + * 1. commit/checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed Review comment: ack ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -215,91 +215,54 @@ public void handleAssignment(final Map> activeTasks, "\tExisting standby tasks: {}", activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds()); -final Map> activeTasksToCreate = new HashMap<>(activeTasks); -final Map> standbyTasksToCreate = new HashMap<>(standbyTasks); -final Set tasksToRecycle = new HashSet<>(); - builder.addSubscribedTopicsFromAssignment( activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()), logPrefix ); -// first rectify all existing tasks final LinkedHashMap taskCloseExceptions = new LinkedHashMap<>(); -final Set tasksToClose = new HashSet<>(); -final Map> consumedOffsetsAndMetadataPerTask = new HashMap<>(); -final Set additionalTasksForCommitting = new HashSet<>(); +final Map> activeTasksToCreate = new HashMap<>(activeTasks); +final Map> standbyTasksToCreate = new HashMap<>(standbyTasks); +final LinkedList tasksToClose = new LinkedList<>(); +final Set tasksToRecycle = new HashSet<>(); final Set dirtyTasks = new HashSet<>(); +// first rectify all existing tasks for (final Task task : tasks.values()) { if (activeTasks.containsKey(task.id()) && task.isActive()) { updateInputPartitionsAndResume(task, activeTasks.get(task.id())); -if (task.commitNeeded()) { -additionalTasksForCommitting.add(task); -} activeTasksToCreate.remove(task.id()); } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) { updateInputPartitionsAndResume(task, standbyTasks.get(task.id())); standbyTasksToCreate.remove(task.id()); -// check for tasks that were owned previously but have changed active/standby status } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) { +// check for tasks that were owned previously but have changed active/standby status tasksToRecycle.add(task); } else { -try { -task.suspend(); -final Map committableOffsets = task.prepareCommit(); - -tasksToClose.add(task); -if (!committableOffsets.isEmpty()) { -consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); -} -} catch (final RuntimeException e) { -final String uncleanMessage = String.format( -"Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", -task.id()); -log.error(uncleanMessage, e); -taskCloseExceptions.put(task.id(), e); -// We've already recorded the exception (which is the point of clean). -// Now, we should go ahead and complete the close because a half-closed task is no good to anyone. 
-dirtyTasks.add(task); -} +tasksToClose.add(task); } } -if (!consumedOffsetsAndMetadataPerTask.isEmpty()) { +for (final Task task : tasksToClose) { try { -for (final Task task : additionalTasksForCommitting) { -final Map committableOffsets = task.prepareCommit(); -if (!committableOffsets.isEmpty()) { -consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); +task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation Review comment: 1. I think you're right, we don't need to keep track of the current `checkpoint` offsets at all and can just write the current `checkpointableOffsets` in `postCommit` 2. done This is an automated message from the Apache Git Service. To respond to
[jira] [Commented] (KAFKA-10062) Add a method to retrieve the current timestamp as known by the Streams app
[ https://issues.apache.org/jira/browse/KAFKA-10062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136199#comment-17136199 ] William Bottrell commented on KAFKA-10062: -- Hi, [~psmolinski]. I'm looking into this request and I would appreciate your help understanding your use case better. Could you give me some more details on how you are currently doing your testing, so I could reproduce and see exactly what you mean? Thanks! Please forgive me for not understanding, I'm extremely new to Kafka. > Add a method to retrieve the current timestamp as known by the Streams app > -- > > Key: KAFKA-10062 > URL: https://issues.apache.org/jira/browse/KAFKA-10062 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Piotr Smolinski >Assignee: William Bottrell >Priority: Major > Labels: needs-kip, newbie > > Please add to the ProcessorContext a method to retrieve current timestamp > compatible with Punctuator#punctate(long) method. > Proposal in ProcessorContext: > long getTimestamp(PunctuationType type); > The method should return time value as known by the Punctuator scheduler with > the respective PunctuationType. > The use-case is tracking of a process with timeout-based escalation. > A transformer receives process events and in case of missing an event execute > an action (emit message) after given escalation timeout (several stages). The > initial message may already arrive with reference timestamp in the past and > may trigger different action upon arrival depending on how far in the past it > is. > If the timeout should be computed against some further time only, Punctuator > is perfectly sufficient. The problem is that I have to evaluate the current > time-related state once the message arrives. > I am using wall-clock time. Normally accessing System.currentTimeMillis() is > sufficient, but it breaks in unit testing with TopologyTestDriver, where the > app wall clock time is different from the system-wide one. > To access the mentioned clock I am using reflection to access > ProcessorContextImpl#task and then StreamTask#time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
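For clarity on the workaround mentioned at the end of the ticket, the reflection hack might look roughly like the sketch below. The field names `task` and `time` are taken from the ticket text; they are internal implementation details, not public API, and the exact classes can differ between versions.

```java
import java.lang.reflect.Field;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.ProcessorContext;

public final class InternalTimeAccessSketch {

    // Illustrative only: reaches into Streams internals (ProcessorContextImpl#task, StreamTask#time)
    // exactly as described in the ticket; none of this is supported API.
    public static long currentStreamsWallClockMs(final ProcessorContext context) throws Exception {
        final Object task = readField(context, "task");
        final Time time = (Time) readField(task, "time");
        return time.milliseconds();
    }

    private static Object readField(final Object target, final String name) throws Exception {
        Class<?> clazz = target.getClass();
        while (clazz != null) {
            try {
                final Field field = clazz.getDeclaredField(name);
                field.setAccessible(true);
                return field.get(target);
            } catch (final NoSuchFieldException e) {
                clazz = clazz.getSuperclass(); // the field may be declared on a parent class
            }
        }
        throw new NoSuchFieldException(name);
    }
}
```

The point of the ticket is to make this unnecessary by exposing the scheduler's notion of time on the `ProcessorContext` directly.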
[GitHub] [kafka] michael-carter-instaclustr commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure
michael-carter-instaclustr commented on pull request #8844: URL: https://github.com/apache/kafka/pull/8844#issuecomment-63698 Failing unit test kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles is unrelated to this pull request and is tracked by: https://issues.apache.org/jira/browse/KAFKA-10155 https://issues.apache.org/jira/browse/KAFKA-10147 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] benhannel edited a comment on pull request #4204: KAFKA-5238: BrokerTopicMetrics can be recreated after topic is deleted
benhannel edited a comment on pull request #4204: URL: https://github.com/apache/kafka/pull/4204#issuecomment-644416082 Perfect is the enemy of good. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] benhannel commented on pull request #4204: KAFKA-5238: BrokerTopicMetrics can be recreated after topic is deleted
benhannel commented on pull request #4204: URL: https://github.com/apache/kafka/pull/4204#issuecomment-644416082 Perfect is the enemy of good. This issue has been unresolved for years because of idealism. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman closed pull request #8868: Revert "KAFKA-9983: KIP-613: add INFO level e2e latency metrics (#8697)"
ableegoldman closed pull request #8868: URL: https://github.com/apache/kafka/pull/8868 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch commented on pull request #8839: KIP-585: Documentation
rhauch commented on pull request #8839: URL: https://github.com/apache/kafka/pull/8839#issuecomment-644382512 @tombentley, yes we'll want to merge this and backport to the `2.6` branch. That branch is not yet frozen for documentation or tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] thomasrockhu closed pull request #8875: Update to run coverage on Jenkins
thomasrockhu closed pull request #8875: URL: https://github.com/apache/kafka/pull/8875 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #8874: HOTFIX: checkstyle error in ProcessorStateManager
guozhangwang commented on pull request #8874: URL: https://github.com/apache/kafka/pull/8874#issuecomment-644329675 To 2.6 as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #8874: HOTFIX: checkstyle error in ProcessorStateManager
guozhangwang merged pull request #8874: URL: https://github.com/apache/kafka/pull/8874 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #8874: HOTFIX: checkstyle error in ProcessorStateManager
guozhangwang commented on pull request #8874: URL: https://github.com/apache/kafka/pull/8874#issuecomment-644329167 Ran checkstyle and spotbugs locally and it looks good. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] thomasrockhu opened a new pull request #8875: Update to run coverage on Jenkins
thomasrockhu opened a new pull request #8875: URL: https://github.com/apache/kafka/pull/8875 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #8874: HOTFIX: checkstyle error in ProcessorStateManager
mjsax commented on pull request #8874: URL: https://github.com/apache/kafka/pull/8874#issuecomment-644328634 Thanks! Was just about to do a hotfix PR myself. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10166) Excessive TaskCorruptedException seen in testing
[ https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-10166: Description: As the title indicates, long-running test applications with injected network "outages" seem to hit TaskCorruptedException more than expected. Seen occasionally on the ALOS application (~20 times in two days in one case, for example), and very frequently with EOS (many times per day) was:As the title indicates. Seen occasionally with ALOS (~20 times in two days in one case, for example), and very frequently with EOS (many times per day) > Excessive TaskCorruptedException seen in testing > > > Key: KAFKA-10166 > URL: https://issues.apache.org/jira/browse/KAFKA-10166 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.6.0 > > > As the title indicates, long-running test applications with injected network > "outages" seem to hit TaskCorruptedException more than expected. > Seen occasionally on the ALOS application (~20 times in two days in one case, > for example), and very frequently with EOS (many times per day) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10166) Excessive TaskCorruptedException seen in test applications
[ https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-10166: Description: As the title indicates. Seen occasionally with ALOS (~20 times in two days in one case, for example), and very frequently with EOS (many times per day) (was: As the title indicates. Seen occasionally in the ALOS (~20 times in two days on one soak), and very frequently in the EOS (many times per day)) > Excessive TaskCorruptedException seen in test applications > -- > > Key: KAFKA-10166 > URL: https://issues.apache.org/jira/browse/KAFKA-10166 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.6.0 > > > As the title indicates. Seen occasionally with ALOS (~20 times in two days in > one case, for example), and very frequently with EOS (many times per day) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-10166) Excessive TaskCorruptedException seen in test applications
[ https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman reassigned KAFKA-10166: --- Assignee: (was: Sophie Blee-Goldman) > Excessive TaskCorruptedException seen in test applications > -- > > Key: KAFKA-10166 > URL: https://issues.apache.org/jira/browse/KAFKA-10166 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.6.0 > > > As the title indicates. Seen occasionally with ALOS (~20 times in two days in > one case, for example), and very frequently with EOS (many times per day) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10166) Excessive TaskCorruptedException seen in testing
[ https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-10166: Summary: Excessive TaskCorruptedException seen in testing (was: Excessive TaskCorruptedException seen in test applications) > Excessive TaskCorruptedException seen in testing > > > Key: KAFKA-10166 > URL: https://issues.apache.org/jira/browse/KAFKA-10166 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.6.0 > > > As the title indicates. Seen occasionally with ALOS (~20 times in two days in > one case, for example), and very frequently with EOS (many times per day) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-10166) Excessive TaskCorruptedException seen in test applications
[ https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman reassigned KAFKA-10166: --- Assignee: Sophie Blee-Goldman > Excessive TaskCorruptedException seen in test applications > -- > > Key: KAFKA-10166 > URL: https://issues.apache.org/jira/browse/KAFKA-10166 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.6.0 > > > As the title indicates. Seen occasionally with ALOS (~20 times in two days in > one case, for example), and very frequently with EOS (many times per day) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10166) Excessive TaskCorruptedException seen in test applications
[ https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-10166: Summary: Excessive TaskCorruptedException seen in test applications (was: Excessive TaskCorruptedException seen in soak) > Excessive TaskCorruptedException seen in test applications > -- > > Key: KAFKA-10166 > URL: https://issues.apache.org/jira/browse/KAFKA-10166 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.6.0 > > > As the title indicates. Seen occasionally in the ALOS (~20 times in two days > on one soak), and very frequently in the EOS (many times per day) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on pull request #8855: DO NOT MERGE
mjsax commented on pull request #8855: URL: https://github.com/apache/kafka/pull/8855#issuecomment-644312790 Retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10166) Excessive TaskCorruptedException seen in soak
Sophie Blee-Goldman created KAFKA-10166: --- Summary: Excessive TaskCorruptedException seen in soak Key: KAFKA-10166 URL: https://issues.apache.org/jira/browse/KAFKA-10166 Project: Kafka Issue Type: Bug Components: streams Reporter: Sophie Blee-Goldman Fix For: 2.6.0 As the title indicates. Seen occasionally in the ALOS (~20 times in two days on one soak), and very frequently in the EOS (many times per day) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on pull request #8874: HOTFIX: checkstyle error in ProcessorStateManager
chia7712 commented on pull request #8874: URL: https://github.com/apache/kafka/pull/8874#issuecomment-644293179 @cadonna @guozhangwang Could you take a look? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 opened a new pull request #8874: HOTFIX: checkstyle error in ProcessorStateManager
[jira] [Commented] (KAFKA-9681) Change whitelist/blacklist terms
[ https://issues.apache.org/jira/browse/KAFKA-9681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136084#comment-17136084 ] sats commented on KAFKA-9681: - This Requires a KIP. > Change whitelist/blacklist terms > > > Key: KAFKA-9681 > URL: https://issues.apache.org/jira/browse/KAFKA-9681 > Project: Kafka > Issue Type: Wish > Components: KafkaConnect, mirrormaker >Affects Versions: 2.4.0 >Reporter: Gérald Quintana >Priority: Major > > The whitelist/blacklist terms are not very inclusive, and can be perceived as > racist. > Using allow/deny or include/exclude for example is more neutral -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10104) Remove deprecated --zookeeper flags as specified in KIP-604
[ https://issues.apache.org/jira/browse/KAFKA-10104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136078#comment-17136078 ] Matthias J. Sax commented on KAFKA-10104: - It's for 3.0 (or maybe even 4.0), as removing stuff is a breaking change. Not sure which release [~cmccabe] had in mind. > Remove deprecated --zookeeper flags as specified in KIP-604 > --- > > Key: KAFKA-10104 > URL: https://issues.apache.org/jira/browse/KAFKA-10104 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > > Remove deprecated --zookeeper flags as specified in KIP-604 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang merged pull request #8872: Fix log message for transition from standby to active
guozhangwang merged pull request #8872: URL: https://github.com/apache/kafka/pull/8872 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #8872: Fix log message for transition from standby to active
guozhangwang commented on pull request #8872: URL: https://github.com/apache/kafka/pull/8872#issuecomment-644281735 Cherry-picked to 2.6. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #8872: Fix log message for transition from standby to active
guozhangwang commented on a change in pull request #8872: URL: https://github.com/apache/kafka/pull/8872#discussion_r440348339 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -509,11 +509,12 @@ void transitionTaskType(final TaskType newType, final LogContext logContext) { throw new IllegalStateException("Tried to recycle state for task type conversion but new type was the same."); } +TaskType oldType = taskType; taskType = newType; log = logContext.logger(ProcessorStateManager.class); logPrefix = logContext.logPrefix(); -log.debug("Transitioning state manager for {} task {} to {}", taskType, taskId, newType); +log.debug("Transitioning state manager for {} task {} to {}", oldType, taskId, newType); Review comment: I think using oldType/newType is fine here since then we do not need to keep in mind of the ordering of each call? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #8873: Avoid WARN log message when re-init from checkpoint skipped
guozhangwang merged pull request #8873: URL: https://github.com/apache/kafka/pull/8873 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #8873: Avoid WARN log message when re-init from checkpoint skipped
guozhangwang commented on pull request #8873: URL: https://github.com/apache/kafka/pull/8873#issuecomment-644280199 Cherry-picked to 2.6 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136069#comment-17136069 ] Guozhang Wang commented on KAFKA-10134: --- This is a bit weird to me -- discover of coordinator logic did not change from 2.4 -> 2.5 AFAIK. [~seanguo] could you list the configs of consumer when you used cooperative rebalance, v.s. eager rebalance? > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5 > > > Key: KAFKA-10134 > URL: https://issues.apache.org/jira/browse/KAFKA-10134 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Sean Guo >Priority: Major > > We want to utilize the new rebalance protocol to mitigate the stop-the-world > effect during the rebalance as our tasks are long running task. > But after the upgrade when we try to kill an instance to let rebalance happen > when there is some load(some are long running tasks >30S) there, the CPU will > go sky-high. It reads ~700% in our metrics so there should be several threads > are in a tight loop. We have several consumer threads consuming from > different partitions during the rebalance. This is reproducible in both the > new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The > difference is that with old eager rebalance rebalance protocol used the high > CPU usage will dropped after the rebalance done. But when using cooperative > one, it seems the consumers threads are stuck on something and couldn't > finish the rebalance so the high CPU usage won't drop until we stopped our > load. Also a small load without long running task also won't cause continuous > high CPU usage as the rebalance can finish in that case. > > "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 > cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable > [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 > os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 > runnable [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > at > > By debugging into the code we found it looks like the clients are in a loop > on finding the coordinator. > I also tried the old rebalance protocol for the new version the issue still > exists but the CPU will be back to normal when the rebalance is done. > Also tried the same on the 2.4.1 which seems don't have this issue. So it > seems related something changed between 2.4.1 and 2.5.0. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
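For reference on the question about configs, the client-side difference between the two modes is essentially the assignor setting. A hedged sketch with placeholder bootstrap and group values (not taken from the reporter's setup):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;

public class RebalanceProtocolConfigsSketch {

    private static Properties baseConfig() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder
        return props;
    }

    public static Properties cooperative() {
        final Properties props = baseConfig();
        // Incremental cooperative rebalancing (KIP-429)
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName());
        return props;
    }

    public static Properties eager() {
        final Properties props = baseConfig();
        // Classic eager ("stop-the-world") protocol, e.g. the range assignor
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  RangeAssignor.class.getName());
        return props;
    }
}
```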
[GitHub] [kafka] dongjinleekr commented on pull request #8117: KAFKA-8403: Suppress needs a Materialized variant
dongjinleekr commented on pull request #8117: URL: https://github.com/apache/kafka/pull/8117#issuecomment-644277987 @vvcephei Here is the fix. The implementation you saw was the draft before the discussion - Now it is updated, with rebasing onto the latest trunk. Don't hesitate to talk to me if you think we need additional tests like a queriable store. Thanks again for your detailed review! :smile: This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10165) Percentiles metric leaking memory
Sophie Blee-Goldman created KAFKA-10165: --- Summary: Percentiles metric leaking memory Key: KAFKA-10165 URL: https://issues.apache.org/jira/browse/KAFKA-10165 Project: Kafka Issue Type: Bug Components: metrics, streams Reporter: Sophie Blee-Goldman Assignee: Sophie Blee-Goldman Fix For: 2.6.0 We've hit several OOMs in our soak cluster lately. We were finally able to get a heap dump right after the OOM, and found over 3.5 GB of memory being retained by the percentiles (or specifically by the 1MB float[] used by the percentiles). The leak does seem specific to the Percentiles class, as we see ~3000 instances of the Percentiles object vs only ~500 instances of the Max object, which is also used in the same sensor as the Percentiles. We did recently lower the size from 1MB to 100kB, but it's clear there is a leak of some kind and a "smaller leak" is not an acceptable solution. If the cause of the leak is not immediately obvious we should just revert the percentiles in 2.6 and work on stabilizing them for 2.7 -- This message was sent by Atlassian Jira (v8.3.4#803005)
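For context on where that 1 MB (now 100 kB) buffer comes from: each `Percentiles` stat allocates a histogram sized by its `sizeInBytes` argument, so one instance per sensor per task or store multiplies quickly. A rough sketch of how such a stat is registered; the metric names, group, and latency bound are invented for illustration and this is not the actual Streams metrics code.

```java
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Percentile;
import org.apache.kafka.common.metrics.stats.Percentiles;
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;

public class E2eLatencySensorSketch {

    public static void main(final String[] args) {
        final Metrics metrics = new Metrics();
        final Sensor sensor = metrics.sensor("e2e-latency"); // hypothetical sensor name

        final MetricName p99 = metrics.metricName("e2e-latency-p99", "example-group");
        final MetricName maxName = metrics.metricName("e2e-latency-max", "example-group");

        // The first argument is the histogram size in bytes -- this is the buffer that was
        // reduced from 1MB to 100kB and that is retained once per live Percentiles instance.
        sensor.add(new Percentiles(100 * 1024,
                                   0.0,
                                   10 * 60 * 1000.0,            // assumed max latency bound in ms
                                   BucketSizing.LINEAR,
                                   new Percentile(p99, 99)));
        sensor.add(maxName, new Max());

        sensor.record(42.0); // record a latency measurement in ms
    }
}
```

With ~3000 live `Percentiles` instances, even the reduced 100 kB buffer would account for roughly 300 MB on its own, which is why reverting rather than merely shrinking is on the table.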
[jira] [Updated] (KAFKA-8362) LogCleaner gets stuck after partition move between log directories
[ https://issues.apache.org/jira/browse/KAFKA-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-8362: Component/s: jbod > LogCleaner gets stuck after partition move between log directories > -- > > Key: KAFKA-8362 > URL: https://issues.apache.org/jira/browse/KAFKA-8362 > Project: Kafka > Issue Type: Bug > Components: jbod, log cleaner >Reporter: Julio Ng >Priority: Major > > When a partition is moved from one directory to another, its checkpoint entry in the cleaner-offset-checkpoint file is not removed from the source directory. > As a consequence, when we read the last firstDirtyOffset we might get a stale value from the old checkpoint file. > Basically, we need to clean up the entry in the checkpoint file in the source directory when the move is completed. > The current issue is that the code in LogCleanerManager: > {noformat} > /** > * @return the position processed for all logs. > */ > def allCleanerCheckpoints: Map[TopicPartition, Long] = { > inLock(lock) { > checkpoints.values.flatMap(checkpoint => { > try { > checkpoint.read() > } catch { > case e: KafkaStorageException => > error(s"Failed to access checkpoint file ${checkpoint.file.getName} > in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e) > Map.empty[TopicPartition, Long] > } > }).toMap > } > }{noformat} > collapses the offsets when multiple entries exist for the same TopicPartition -- This message was sent by Atlassian Jira (v8.3.4#803005)
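The fix the reporter is describing is essentially bookkeeping: when the move completes, the moved partition's entry should be dropped from the source directory's cleaner-offset-checkpoint so a stale firstDirtyOffset can no longer be read from it. A sketch of that idea in Java with hypothetical names; the real code lives in the Scala LogCleanerManager:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class CheckpointCleanupSketch {
    /**
     * Hypothetical helper: given the entries read from the source directory's
     * cleaner-offset-checkpoint, drop the entry for the moved partition. The caller
     * would then rewrite the source directory's checkpoint file with the result.
     */
    static Map<TopicPartition, Long> withoutMovedPartition(Map<TopicPartition, Long> sourceCheckpoint,
                                                           TopicPartition movedPartition) {
        Map<TopicPartition, Long> updated = new HashMap<>(sourceCheckpoint);
        updated.remove(movedPartition);
        return updated;
    }

    public static void main(String[] args) {
        Map<TopicPartition, Long> checkpoint = new HashMap<>();
        checkpoint.put(new TopicPartition("test", 0), 42L);
        checkpoint.put(new TopicPartition("test", 1), 17L);
        // After moving test-0 to another log directory, its stale entry is removed:
        System.out.println(withoutMovedPartition(checkpoint, new TopicPartition("test", 0)));
    }
}
```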
[jira] [Updated] (KAFKA-9989) Wait for stable assignment StreamsUpgradeTest.test_metadata_upgrade
[ https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-9989: --- Description: After some discussion we believe there is room to improve the accuracy of this test by enforcing the wait for a stable rebalance result after KIP-441. === System test StreamsUpgradeTest.test_metadata_upgrade could fail due to: "Never saw output 'processed [0-9]* records' on ubuntu@worker6". If we take a closer look, the rebalance happens but there is no task assignment. We should fix this problem by making the rebalance result part of the check, and wait for a finalized (non-empty) assignment before kicking off the record processing validation. was: System test StreamsUpgradeTest.test_metadata_upgrade could fail due to: "Never saw output 'processed [0-9]* records' on ubuntu@worker6" which if we take a closer look at, the rebalance happens but has no task assignment. We should fix this problem by making the rebalance result as part of the check, and wait for the finalized assignment (non-empty) before kicking off the record processing validation. > Wait for stable assignment StreamsUpgradeTest.test_metadata_upgrade > > > Key: KAFKA-9989 > URL: https://issues.apache.org/jira/browse/KAFKA-9989 > Project: Kafka > Issue Type: Bug > Components: streams, system tests >Reporter: Boyang Chen >Assignee: HaiyuanZhao >Priority: Major > Labels: newbie > Attachments: 166.tgz > > > After some discussion we believe there is room to improve the accuracy of this test by enforcing the wait for a stable rebalance result after KIP-441. > === > System test StreamsUpgradeTest.test_metadata_upgrade could fail due to: "Never saw output 'processed [0-9]* records' on ubuntu@worker6". > If we take a closer look, the rebalance happens but there is no task assignment. We should fix this problem by making the rebalance result part of the check, and wait for a finalized (non-empty) assignment before kicking off the record processing validation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9989) Wait for stable assignment StreamsUpgradeTest.test_metadata_upgrade
[ https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-9989: --- Summary: Wait for stable assignment StreamsUpgradeTest.test_metadata_upgrade (was: StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task) > Wait for stable assignment StreamsUpgradeTest.test_metadata_upgrade > > > Key: KAFKA-9989 > URL: https://issues.apache.org/jira/browse/KAFKA-9989 > Project: Kafka > Issue Type: Bug > Components: streams, system tests >Reporter: Boyang Chen >Assignee: HaiyuanZhao >Priority: Major > Labels: newbie > Attachments: 166.tgz > > > System test StreamsUpgradeTest.test_metadata_upgrade could fail due to: "Never saw output 'processed [0-9]* records' on ubuntu@worker6". > If we take a closer look, the rebalance happens but there is no task assignment. We should fix this problem by making the rebalance result part of the check, and wait for a finalized (non-empty) assignment before kicking off the record processing validation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10105) Regression in group coordinator dealing with flaky clients joining while leaving
[ https://issues.apache.org/jira/browse/KAFKA-10105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136012#comment-17136012 ] Sophie Blee-Goldman commented on KAFKA-10105: - Did you upgrade the clients as well (or at all), or just the cluster? What version were the consumers on? > Regression in group coordinator dealing with flaky clients joining while > leaving > > > Key: KAFKA-10105 > URL: https://issues.apache.org/jira/browse/KAFKA-10105 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.4.1 > Environment: Kafka 1.1.0 on jre 8 on debian 9 in docker > Kafka 2.4.1 on jre 11 on debian 9 in docker >Reporter: William Reynolds >Priority: Major > > Since upgrading a cluster from 1.1.0 to 2.4.1, the broker no longer deals correctly with a consumer that sends a join right after a leave. > What happens now is that if a consumer sends a leave and then, as it is shutting down, follows up by trying to send a join again, the group coordinator adds the leaving member to the group, but that member never heartbeats again. > Since the consumer is gone by then, when it joins again after restarting it is added as a new member, but the zombie member is still there and is included in the partition assignment, which means those partitions never get consumed from. What can also happen is that one of the zombies becomes group leader, so the rebalance gets stuck forever and the group is entirely blocked. > I have not been able to track down where this was introduced between 1.1.0 and 2.4.1, but I will look further into it. Unfortunately the logs are essentially silent about the zombie members, and I only had INFO-level logging on during the issue. By stopping all the consumers in the group and restarting the broker coordinating that group we could get back to a working state. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on pull request #8646: KAFKA-9974: Fix flaky test by removing unneeded asserts
guozhangwang commented on pull request #8646: URL: https://github.com/apache/kafka/pull/8646#issuecomment-644247677 test this This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #8872: Fix log message for transition from standby to active
ableegoldman commented on a change in pull request #8872: URL: https://github.com/apache/kafka/pull/8872#discussion_r440299033 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -509,11 +509,12 @@ void transitionTaskType(final TaskType newType, final LogContext logContext) { throw new IllegalStateException("Tried to recycle state for task type conversion but new type was the same."); } +TaskType oldType = taskType; taskType = newType; log = logContext.logger(ProcessorStateManager.class); logPrefix = logContext.logPrefix(); -log.debug("Transitioning state manager for {} task {} to {}", taskType, taskId, newType); +log.debug("Transitioning state manager for {} task {} to {}", oldType, taskId, newType); Review comment: I agree it would be useful to prefix the logs with `standby` or `active`, but I'd prefer to do that everywhere in a separate PR. Can we just move the log message to before we reassign `taskType = newType`? Or do you think we might forget/not notice that the ordering is relevant and accidentally move it back at some future time? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #8865: MINOR: fix StreamsConfig parameter name variable
vvcephei commented on pull request #8865: URL: https://github.com/apache/kafka/pull/8865#issuecomment-644211672 Looks like a compile warning: ``` 00:07:57.788 > Task :streams:compileJava 00:07:57.788 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:530: warning: [dep-ann] deprecated item is not annotated with @Deprecated 00:07:57.788 public static final String TOPOLOGY_OPTIMIZATION = "topology.optimization"; 00:07:57.789^ 00:07:57.789 error: warnings found and -Werror specified ``` I'm +1 on this change, and I think it would be fine to just amend KIP-295. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
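For readers unfamiliar with the `[dep-ann]` lint: javac flags any element whose Javadoc carries an `@deprecated` tag but which lacks the `@Deprecated` annotation, and with `-Werror` (as in the build output above) that fails compilation. A minimal illustration of the pattern that silences it; the Javadoc text here is illustrative, not the actual StreamsConfig documentation:

```java
public final class DeprecatedConstantExample {

    /**
     * {@code topology.optimization}
     *
     * @deprecated replaced by a successor constant; kept for compatibility.
     */
    @Deprecated // without this annotation, javac reports "[dep-ann] deprecated item is not annotated with @Deprecated"
    public static final String TOPOLOGY_OPTIMIZATION = "topology.optimization";

    private DeprecatedConstantExample() { }
}
```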
[GitHub] [kafka] tombentley commented on pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand
tombentley commented on pull request #8808: URL: https://github.com/apache/kafka/pull/8808#issuecomment-644211634 @dajac the problem happens when `--command-config` is given on the command line, and the config file defines a `client.id`. The `AppInfoParser` tries to register an MBean whose name is the same as the name used by the existing admin client instance, and the platform MBean server throws the exception given in the issue. Sorry, the initial description in [KAFKA-10109](https://issues.apache.org/jira/browse/KAFKA-10109) omitted this important detail (now fixed). Your point about false positives when verifying the output is correct, that's because it's currently not verifying the output, only the logging. To catch this error by verifying the standard output of the command alone would require changing the `log4j.properties` for all tests in core, which I felt was probably not warranted for this bug alone: I assume they've been set at `ERROR` for a more important reason. Or are you suggesting we use both `grabConsoleOutput()` and `LogCaptureAppender`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
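To make the mechanism concrete: AppInfoParser registers one `kafka.admin.client:type=app-info,id=<client.id>` MBean per admin client, so two Admin clients created in the same JVM with the same explicit `client.id` collide on registration, which is what the second client inside AclCommand runs into. A minimal standalone reproduction sketch; the bootstrap address is a placeholder and the client id is taken from the MBean name in the issue's stack trace:

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class DoubleAdminClientSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(AdminClientConfig.CLIENT_ID_CONFIG, "administrator_data");     // fixed client.id, as in the reported command config

        try (Admin first = Admin.create(props);
             Admin second = Admin.create(props)) {
            // Creating the second client is expected to log:
            //   WARN Error registering AppInfo mbean ... InstanceAlreadyExistsException:
            //   kafka.admin.client:type=app-info,id=administrator_data
            // which mirrors what AclCommand does when addAcls()/removeAcls() also call listAcls(),
            // each inside its own withAdminClient(...) block.
        }
    }
}
```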
[jira] [Updated] (KAFKA-10109) kafka-acls.sh/AclCommand opens multiple AdminClients
[ https://issues.apache.org/jira/browse/KAFKA-10109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tom Bentley updated KAFKA-10109: Description: {{AclCommand.AclCommandService}} uses {{withAdminClient(opts: AclCommandOptions)(f: Admin => Unit)}} to abstract the execution of an action using an {{AdminClient}} instance. Unfortunately the use of this method in implementing {{addAcls()}} and {{removeAcls()}} calls {{listAcls()}}. This causes the creation of a second {{AdminClient}} instance. When the {{--command-config}} option has been used to specify a {{client.id}} for the Admin client, the second instance fails to register an MBean, resulting in a warning being logged. {code} ./bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config config/broker_connection.conf.reproducing --add --allow-principal User:alice --operation Describe --topic 'test' --resource-pattern-type prefixed Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=PREFIXED)`: (principal=User:alice, host=*, operation=DESCRIBE, permissionType=ALLOW) [2020-06-03 18:43:12,190] WARN Error registering AppInfo mbean (org.apache.kafka.common.utils.AppInfoParser) javax.management.InstanceAlreadyExistsException: kafka.admin.client:type=app-info,id=administrator_data at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) at org.apache.kafka.clients.admin.KafkaAdminClient.(KafkaAdminClient.java:500) at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:444) at org.apache.kafka.clients.admin.Admin.create(Admin.java:59) at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39) at kafka.admin.AclCommand$AdminClientService.withAdminClient(AclCommand.scala:105) at kafka.admin.AclCommand$AdminClientService.listAcls(AclCommand.scala:146) at kafka.admin.AclCommand$AdminClientService.$anonfun$addAcls$1(AclCommand.scala:123) at kafka.admin.AclCommand$AdminClientService.$anonfun$addAcls$1$adapted(AclCommand.scala:116) at kafka.admin.AclCommand$AdminClientService.withAdminClient(AclCommand.scala:108) at kafka.admin.AclCommand$AdminClientService.addAcls(AclCommand.scala:116) at kafka.admin.AclCommand$.main(AclCommand.scala:78) at kafka.admin.AclCommand.main(AclCommand.scala) Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=PREFIXED)`: (principal=User:alice, host=*, operation=DESCRIBE, permissionType=ALLOW) {code} was: {{AclCommand.AclCommandService}} uses {{withAdminClient(opts: AclCommandOptions)(f: Admin => Unit)}} to abstract the execution of an action using an {{AdminClient}} instance. Unfortunately the use of this method in implemeting {{addAcls()}} and {{removeAcls()}} calls {{listAcls()}}. This causes the creation of a second {{AdminClient}} instance which then fails to register an MBean, resulting in a warning being logged. 
{code} ./bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config config/broker_connection.conf.reproducing --add --allow-principal User:alice --operation Describe --topic 'test' --resource-pattern-type prefixed Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=PREFIXED)`: (principal=User:alice, host=*, operation=DESCRIBE, permissionType=ALLOW) [2020-06-03 18:43:12,190] WARN Error registering AppInfo mbean (org.apache.kafka.common.utils.AppInfoParser) javax.management.InstanceAlreadyExistsException: kafka.admin.client:type=app-info,id=administrator_data at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) at com.sun.jmx.mbe
[GitHub] [kafka] dajac commented on pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand
dajac commented on pull request #8808: URL: https://github.com/apache/kafka/pull/8808#issuecomment-644192842 @tombentley I am not sure I follow what the `AppInfoParser` is doing here... Could you elaborate a bit more? Regarding option 2), I had a quick look at the implementation and it seems that it verifies that no errors are printed out, but it does not really verify that the output is correct. For instance, if we removed the `listAcls(adminClient)` at L115, we would not catch it, would we? Perhaps we could add a small test for the `listAcls()` method which adds a few ACLs, grabs the output with `TestUtils.grabConsoleOutput`, and verifies it. I think that is probably enough given the scope of the change. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on pull request #8839: KIP-585: Documentation
tombentley commented on pull request #8839: URL: https://github.com/apache/kafka/pull/8839#issuecomment-644176096 @kkonstantine @rhauch any chance this could be merged for 2.6, or is it too late now? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #6229: KAFKA-6786: Removing additional configs for StreamsBrokerDownResilienceTest
bbejeck commented on pull request #6229: URL: https://github.com/apache/kafka/pull/6229#issuecomment-644170697 System test failed with `Failed while trying to discover tests: Didn't find any tests for symbol tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py`. @sh-abhi can you take a look? I'll look into this soon as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon edited a comment on pull request #8646: KAFKA-9974: Fix flaky test by removing unneeded asserts
showuon edited a comment on pull request #8646: URL: https://github.com/apache/kafka/pull/8646#issuecomment-644030596 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on pull request #8831: KAFKA-8657:Client-side automatic topic creation on Producer
abbccdda commented on pull request #8831: URL: https://github.com/apache/kafka/pull/8831#issuecomment-644109922 @jiameixie Let's continue the discussion on the mailing thread to get KIP-487 approved first. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…
dajac commented on a change in pull request #8768: URL: https://github.com/apache/kafka/pull/8768#discussion_r440028429 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java ## @@ -572,16 +572,18 @@ public synchronized void removeReporter(MetricsReporter reporter) { } } -synchronized void registerMetric(KafkaMetric metric) { +synchronized void registerMetric(KafkaMetric metric, boolean report) { Review comment: Thinking a bit more about this, did you consider adding the flag to `MetricConfig`? It may be a bit simpler and cleaner as it avoids having to add the flag to all the methods. What do you think? ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1256,11 +1272,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging { private def waitForConnectionSlot(listenerName: ListenerName, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = { counts.synchronized { - if (!connectionSlotAvailable(listenerName)) { + val startTimeMs = time.milliseconds() + val throttleTimeMs = math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startTimeMs), 0) + + if (throttleTimeMs > 0 || !connectionSlotAvailable(listenerName)) { val startNs = time.nanoseconds +val endThrottleTimeMs = startTimeMs + throttleTimeMs +var remainingThrottleTimeMs = throttleTimeMs do { - counts.wait() -} while (!connectionSlotAvailable(listenerName)) + counts.wait(remainingThrottleTimeMs) Review comment: Thanks for the clarification. I am sorry but I misread the code the first time. ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging { private def maxListenerConnections(listenerName: ListenerName): Int = maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue) + /** + * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide + * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp + * + * @param listenerName listener for which calculate the delay + * @param timeMs current time in milliseconds + * @return delay in milliseconds + */ + private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = { +val listenerThrottleTimeMs = maxConnectionsPerListener + .get(listenerName) + .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs)) + .getOrElse(0) + +if (protectedListener(listenerName)) { + listenerThrottleTimeMs +} else { + val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs) + math.max(brokerThrottleTimeMs, listenerThrottleTimeMs) +} + } + + private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = { +try { + sensor.record(1.0, timeMs) + 0 +} catch { + case e: QuotaViolationException => +val throttleTimeMs = QuotaUtils.boundedThrottleTime( + e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt +debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms") +throttleTimeMs +} + } + + /** + * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given + * listener or broker-wide, if listener is not provided. 
+ * @param quotaLimit connection creation rate quota + * @param listenerOpt listener name if sensor is for a listener + */ + private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = { +val quotaEntity = listenerOpt.getOrElse("broker") +val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit)) +sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false) +info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit") +sensor + } + + private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = { +val metric = metrics.metric(connectionRateMetricName((listenerOpt))) +metric.config(rateQuotaMetricConfig(quotaLimit)) +info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation rate to $quotaLimit") + } + + private def connectionRateMetricName(listenerOpt: Option[String]): MetricName = { +val quotaEntity = listenerOpt.getOrElse("broker") +metrics.metricName( + s"connection-creation-rate-$quotaEntity", + "connection-quota-no-jmx", + s"Tracking $quota
[GitHub] [kafka] cadonna commented on pull request #8873: Avoid WARN log message when re-init from checkpoint skipped
cadonna commented on pull request #8873: URL: https://github.com/apache/kafka/pull/8873#issuecomment-644109233 Call for review: @ableegoldman @guozhangwang @mjsax This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna opened a new pull request #8873: Avoid WARN log message when re-init from checkpoint skipped
cadonna opened a new pull request #8873: URL: https://github.com/apache/kafka/pull/8873 When a re-initialization from checkpoint is skipped, the following log messages appear in the logs. ``` DEBUG stream-thread [EosTest-a2c3b21b-7af1-4dce-a3e0-6dc10932e5a2-StreamThread-1] task [0_2] Skipping re-initialization of offset from checkpoint for recycled store KSTREAM-AGGREGATE-STATE-STORE-03 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) DEBUG stream-thread [EosTest-a2c3b21b-7af1-4dce-a3e0-6dc10932e5a2-StreamThread-1] task [0_2] Skipping re-initialization of offset from checkpoint for recycled store KSTREAM-AGGREGATE-STATE-STORE-07 WARN stream-thread [EosTest-a2c3b21b-7af1-4dce-a3e0-6dc10932e5a2-StreamThread-1] task [0_2] Some loaded checkpoint offsets cannot find their corresponding state stores: {EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-2=1491, EosTest-KSTREAM-AGGREGATE-STATE-STORE-07-changelog-2=1491} ``` The warning appears because the skipped offsets are not removed from the loaded checkpoint. However, there is nothing to warn about: the offsets did find their corresponding state stores, and re-initialization of those stores was deliberately skipped. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
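A minimal sketch of the bookkeeping this PR is about, with hypothetical names (the real logic lives in ProcessorStateManager): when re-initialization is skipped for a recycled store, its changelog partition should also be removed from the map of loaded checkpoint offsets, so the later 'cannot find their corresponding state stores' check no longer fires for those stores:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public class SkippedCheckpointSketch {

    // Hypothetical stand-in for the re-initialization pass over loaded checkpoint offsets.
    static void reinitializeFromCheckpoint(Map<TopicPartition, Long> loadedCheckpoints,
                                           Set<TopicPartition> recycledStoreChangelogs) {
        for (TopicPartition changelog : recycledStoreChangelogs) {
            // Re-initialization is skipped for recycled stores; removing the entry here
            // keeps the final "leftover offsets" warning from firing for them.
            loadedCheckpoints.remove(changelog);
        }
        if (!loadedCheckpoints.isEmpty()) {
            System.out.println("WARN Some loaded checkpoint offsets cannot find their "
                    + "corresponding state stores: " + loadedCheckpoints);
        }
    }

    public static void main(String[] args) {
        Map<TopicPartition, Long> loaded = new HashMap<>();
        TopicPartition changelog =
                new TopicPartition("EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog", 2);
        loaded.put(changelog, 1491L);
        reinitializeFromCheckpoint(loaded, Collections.singleton(changelog)); // no warning printed
    }
}
```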