[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r580825461 ## File path: config/log4j.properties ## @@ -61,11 +61,11 @@ log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n log4j.logger.org.apache.zookeeper=INFO # Change the two lines below to adjust the general broker logging level (output to server.log and stdout) -log4j.logger.kafka=INFO -log4j.logger.org.apache.kafka=INFO +log4j.logger.kafka=TRACE +log4j.logger.org.apache.kafka=TRACE # Change to DEBUG or TRACE to enable request logging -log4j.logger.kafka.request.logger=WARN, requestAppender +log4j.logger.kafka.request.logger=TRACE, requestAppender Review comment: yes, these are accidental. will revert. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
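For reference, reverting would presumably restore the defaults shown in the removed lines of the diff above (a sketch of the restored `config/log4j.properties` values, taken from the `-` lines, not a new change):

```properties
# Change the two lines below to adjust the general broker logging level (output to server.log and stdout)
log4j.logger.kafka=INFO
log4j.logger.org.apache.kafka=INFO

# Change to DEBUG or TRACE to enable request logging
log4j.logger.kafka.request.logger=WARN, requestAppender
```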
[GitHub] [kafka] omkreddy opened a new pull request #10188: MINOR: Update HttpClient to "4.5.13"
omkreddy opened a new pull request #10188: URL: https://github.com/apache/kafka/pull/10188 Update HttpClient to recent bug fix version 4.5.13. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #10172: MINOR: add toString to Subscription classes
showuon commented on pull request #10172: URL: https://github.com/apache/kafka/pull/10172#issuecomment-783954552 @chia7712 , thanks for the comments. Please take a look again. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #10172: MINOR: add toString to Subscription classes
showuon commented on a change in pull request #10172: URL: https://github.com/apache/kafka/pull/10172#discussion_r580802728 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java ## @@ -132,6 +132,16 @@ public void setGroupInstanceId(Optional groupInstanceId) { public Optional groupInstanceId() { return groupInstanceId; } + +@Override +public String toString() { +return "Subscription(" + +"topics=" + topics + +", userData=" + userData + Review comment: Oh, I didn't notice it! I logged the remaining size of the userData like the `Assignment.toString()` did. Thank you. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #10172: MINOR: add toString to Subscription classes
showuon commented on a change in pull request #10172: URL: https://github.com/apache/kafka/pull/10172#discussion_r580802108 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java ## @@ -132,6 +132,16 @@ public void setGroupInstanceId(Optional groupInstanceId) { public Optional groupInstanceId() { return groupInstanceId; } + +@Override +public String toString() { +return "Subscription(" + +"topics=" + topics + +", userData=" + userData + +", ownedPartitions=" + ownedPartitions + +(groupInstanceId.isPresent() ? ", groupInstanceId=" + groupInstanceId.get() : "") + Review comment: Updated. Thank you! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #10170: KAFKA-12323: Set timestamp in record context when punctuate
guozhangwang commented on a change in pull request #10170: URL: https://github.com/apache/kafka/pull/10170#discussion_r580793422 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -792,7 +790,16 @@ public void punctuate(final ProcessorNode node, throw new IllegalStateException(String.format("%sCurrent node is not null", logPrefix)); } -updateProcessorContext(node, time.milliseconds(), null); +// when punctuating, we need to preserve the timestamp (this can be either system time or event time) +// while other record context are set as dummy: null topic, -1 partition, -1 offset and empty header +final ProcessorRecordContext recordContext = new ProcessorRecordContext( +timestamp, +-1L, +-1, +null, +new RecordHeaders() Review comment: I'm following the existing behavior here: if the record context is `null`, we also just `return new RecordHeaders();`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #10167: KAFKA-12261: Mention about potential delivery loss on partition split when auto.offset.reset = latest
showuon commented on pull request #10167: URL: https://github.com/apache/kafka/pull/10167#issuecomment-783951911 > What do you mean by "splitting partitions"? @ijuma , @ocadaruma is saying that the partition number is increased/decreased. You are right, the phrase **splitting partitions** is not clear. Might be better to change to: `Note that altering partition numbers while setting this config to latest may cause...` What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #10167: KAFKA-12261: Mention about potential delivery loss on partition split when auto.offset.reset = latest
ijuma commented on pull request #10167: URL: https://github.com/apache/kafka/pull/10167#issuecomment-783944686 What do you mean by "splitting partitions"? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #10169: MINOR: Update Scala to 2.13.5
ijuma commented on pull request #10169: URL: https://github.com/apache/kafka/pull/10169#issuecomment-783943805 Merged to trunk and 2.8 branches. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] modax commented on pull request #9904: KAFKA-12211: don't change perm for base/state dir when no persistent store
modax commented on pull request #9904: URL: https://github.com/apache/kafka/pull/9904#issuecomment-783943018 Hi, Is there any reason this is not in 2.6? 2.6.1 is affected by this problem. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #10167: KAFKA-12261: Mention about potential delivery loss on partition split when auto.offset.reset = latest
showuon commented on pull request #10167: URL: https://github.com/apache/kafka/pull/10167#issuecomment-783941027 @ijuma , could you take a look at this PR? Thank you. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API
chia7712 commented on a change in pull request #10183: URL: https://github.com/apache/kafka/pull/10183#discussion_r580788136 ## File path: clients/src/main/resources/common/message/DescribeProducersResponse.json ## @@ -35,7 +35,7 @@ { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The partition error message, which may be null if no additional details are available" }, { "name": "ActiveProducers", "type": "[]ProducerState", "versions": "0+", "fields": [ - { "name": "ProducerId", "type": "int64", "versions": "0+" }, + { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId" }, { "name": "ProducerEpoch", "type": "int32", "versions": "0+" }, Review comment: The type confuses me. `DescribeProducersResponse` and this protocol use `int64` but other protocols choose `int32`. Also, the type of `TransactionMetadata#producerEpoch` is `short` rather than `integer`. Which one is correct? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma merged pull request #10169: MINOR: Update Scala to 2.13.5
ijuma merged pull request #10169: URL: https://github.com/apache/kafka/pull/10169 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API
guozhangwang commented on a change in pull request #10137: URL: https://github.com/apache/kafka/pull/10137#discussion_r580786789 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java ## @@ -243,6 +244,11 @@ */ Map endOffsets(Collection partitions, Duration timeout); +/** + * @see KafkaConsumer#currentLag(TopicPartition) + */ +OptionalLong currentLag(TopicPartition topicPartition); Review comment: If we are concerned that users may call this function too frequently, looping over a large number of partitions, and that each call synchronizes on the subscription state, then maybe we can make it a batching API. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #10169: MINOR: Update Scala to 2.13.5
ijuma commented on pull request #10169: URL: https://github.com/apache/kafka/pull/10169#issuecomment-783932706 Unrelated flaky failures: > Build / JDK 15 / kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch() > Build / JDK 11 / org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining > This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API
chia7712 commented on a change in pull request #10183: URL: https://github.com/apache/kafka/pull/10183#discussion_r580779375 ## File path: clients/src/main/java/org/apache/kafka/common/requests/DescribeTransactionsRequest.java ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.DescribeTransactionsRequestData; +import org.apache.kafka.common.message.DescribeTransactionsResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class DescribeTransactionsRequest extends AbstractRequest { +public static class Builder extends AbstractRequest.Builder { +public final DescribeTransactionsRequestData data; + +public Builder(DescribeTransactionsRequestData data) { +super(ApiKeys.DESCRIBE_TRANSACTIONS); +this.data = data; +} + +@Override +public DescribeTransactionsRequest build(short version) { +return new DescribeTransactionsRequest(data, version); +} + +@Override +public String toString() { +return data.toString(); +} +} + +private final DescribeTransactionsRequestData data; + +private DescribeTransactionsRequest(DescribeTransactionsRequestData data, short version) { +super(ApiKeys.DESCRIBE_TRANSACTIONS, version); +this.data = data; +} + +@Override +public DescribeTransactionsRequestData data() { +return data; +} + +@Override +public DescribeTransactionsResponse getErrorResponse(int throttleTimeMs, Throwable e) { +Errors error = Errors.forException(e); +DescribeTransactionsResponseData response = new DescribeTransactionsResponseData(); Review comment: the `throttleTimeMs` is not added to response ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala ## @@ -255,6 +258,45 @@ class TransactionCoordinator(brokerId: Int, } } + def handleDescribeTransactions( +transactionalId: String + ): DescribeTransactionsResponseData.TransactionState = { +val transactionState = new DescribeTransactionsResponseData.TransactionState() + .setTransactionalId(transactionalId) + +if (!isActive.get()) { + transactionState.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code) +} else if (transactionalId == null || transactionalId.isEmpty) { + transactionState.setErrorCode(Errors.INVALID_REQUEST.code) +} else { + txnManager.getTransactionState(transactionalId) match { +case Left(error) => + transactionState.setErrorCode(error.code) +case Right(None) => + transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code) +case Right(Some(coordinatorEpochAndMetadata)) => + val txnMetadata = coordinatorEpochAndMetadata.transactionMetadata + txnMetadata.inLock { +val partitionsByTopic = 
CollectionUtils.groupPartitionsByTopic(txnMetadata.topicPartitions.asJava) +partitionsByTopic.forEach { (topic, partitions) => + val topicData = new DescribeTransactionsResponseData.TopicData() +.setTopic(topic) +.setPartitions(partitions) + transactionState.topics.add(topicData) +} + +transactionState + .setErrorCode(Errors.NONE.code) + .setProducerId(txnMetadata.producerId) + .setProducerEpoch(txnMetadata.producerEpoch) + .setTransactionState(txnMetadata.state.toString) Review comment: As it is a part of serialized data, should we add constant string to those enums instead of calling `toString`? ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -3271,6 +3272,34 @@ class KafkaApis(val requestChannel: RequestChannel, "Apache ZooKeeper mode.") } + def handleDescribeTransactionsRequest(request: RequestChannel.Request): Unit = { +val
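Regarding the first review comment in this thread (the `throttleTimeMs` not being added to the error response), the fix would presumably look something like the following inside `getErrorResponse(int throttleTimeMs, Throwable e)`. This is only a sketch; it assumes the generated `DescribeTransactionsResponseData` class exposes a fluent `setThrottleTimeMs` setter for the `ThrottleTimeMs` field defined in the response schema:

```java
// carry the throttle time on the error response as well (sketch)
DescribeTransactionsResponseData response = new DescribeTransactionsResponseData()
    .setThrottleTimeMs(throttleTimeMs);
```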
[GitHub] [kafka] guozhangwang commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API
guozhangwang commented on a change in pull request #10137: URL: https://github.com/apache/kafka/pull/10137#discussion_r580782275 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java ## @@ -243,6 +244,11 @@ */ Map endOffsets(Collection partitions, Duration timeout); +/** + * @see KafkaConsumer#currentLag(TopicPartition) + */ +OptionalLong currentLag(TopicPartition topicPartition); Review comment: For API calls that may incur a broker round trip, having batching of partitions makes sense. For this API I think a single-partition lookup is good enough. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ## @@ -156,24 +134,24 @@ public boolean readyToProcess(final long wallClockTime) { final TopicPartition partition = entry.getKey(); final RecordQueue queue = entry.getValue(); -final Long nullableFetchedLag = fetchedLags.get(partition); +final OptionalLong fetchedLag = lagProvider.apply(partition); Review comment: Wearing my paranoid hat here: `readyToProcess` is on the critical path, called per record, while we would only update the underlying lag at most as frequently as the consumer poll rate. And in practice we would fall into the first condition `!queue.isEmpty()` most of the time. On the other hand, the `partitionLag` call on `SubscriptionState` is synchronized and could slow down the fetching thread (well, maybe just a bit). So could we call the provider only when necessary, i.e. when the queue is empty and the lag is either == 0 or not present? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
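A rough sketch of the guard being suggested (a hypothetical helper, not the actual `PartitionGroup` code): the synchronized lag provider is consulted only when the local queue is empty, so the common buffered-record case never touches `SubscriptionState`.

```java
import java.util.OptionalLong;
import java.util.function.Function;

import org.apache.kafka.common.TopicPartition;

final class LagGuardSketch {
    // Returns true if the partition should block readiness; the lag provider is
    // only invoked when the queue is empty, keeping it off the per-record hot path.
    static boolean blocksReadiness(final boolean queueIsEmpty,
                                   final Function<TopicPartition, OptionalLong> lagProvider,
                                   final TopicPartition partition) {
        if (!queueIsEmpty) {
            return false; // buffered records exist; no need to look up the lag
        }
        final OptionalLong fetchedLag = lagProvider.apply(partition);
        // block if the lag is unknown, or if there is known data still to be fetched
        return !fetchedLag.isPresent() || fetchedLag.getAsLong() > 0;
    }
}
```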
[GitHub] [kafka] ijuma commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
ijuma commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r580781460 ## File path: config/log4j.properties ## @@ -61,11 +61,11 @@ log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n log4j.logger.org.apache.zookeeper=INFO # Change the two lines below to adjust the general broker logging level (output to server.log and stdout) -log4j.logger.kafka=INFO -log4j.logger.org.apache.kafka=INFO +log4j.logger.kafka=TRACE +log4j.logger.org.apache.kafka=TRACE # Change to DEBUG or TRACE to enable request logging -log4j.logger.kafka.request.logger=WARN, requestAppender +log4j.logger.kafka.request.logger=TRACE, requestAppender Review comment: Are these accidental? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #10179: MINOR: tune KIP-631 configurations
ijuma commented on pull request #10179: URL: https://github.com/apache/kafka/pull/10179#issuecomment-783923893 @hachikuji can you please review these timeouts? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #10179: MINOR: tune KIP-631 configurations
cmccabe commented on pull request #10179: URL: https://github.com/apache/kafka/pull/10179#issuecomment-783922535 I added some JavaDoc to the RaftConfig class This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #10174: KAFKA-12357: Do not inline methods from the scala package by default
ijuma commented on pull request #10174: URL: https://github.com/apache/kafka/pull/10174#issuecomment-783892451 Merged to trunk and 2.8 branches. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma merged pull request #10174: KAFKA-12357: Do not inline methods from the scala package by default
ijuma merged pull request #10174: URL: https://github.com/apache/kafka/pull/10174 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #10174: KAFKA-12357: Do not inline methods from the scala package by default
chia7712 commented on pull request #10174: URL: https://github.com/apache/kafka/pull/10174#issuecomment-783879649 > It seems better to aim for ease of use until there's a better way of handling this Makes sense. Be a friendly library :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma edited a comment on pull request #10174: KAFKA-12357: Do not inline methods from the scala package by default
ijuma edited a comment on pull request #10174: URL: https://github.com/apache/kafka/pull/10174#issuecomment-783878266 @chia7712 For better or worse, libraries like https://github.com/embeddedkafka/embedded-kafka are often used. And there isn't a great alternative. Getting an exception when there's a mismatch in the patch version of scala-library is pretty surprising. It seems better to aim for ease of use until there's a better way of handling this. Do you agree? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #10174: KAFKA-12357: Do not inline methods from the scala package by default
ijuma commented on pull request #10174: URL: https://github.com/apache/kafka/pull/10174#issuecomment-783878266 @chia7712 For better or worse, libraries like https://github.com/embeddedkafka/embedded-kafka are often used. And there isn't a great alternative. Getting an exception when there's a mismatch in the patch version of scala-library is pretty surprising. It seems better to aim for ease of use until there's a better way of handling this. Would you agree? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #10172: MINOR: add toString to Subscription classes
chia7712 commented on a change in pull request #10172: URL: https://github.com/apache/kafka/pull/10172#discussion_r580767910 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java ## @@ -132,6 +132,16 @@ public void setGroupInstanceId(Optional groupInstanceId) { public Optional groupInstanceId() { return groupInstanceId; } + +@Override +public String toString() { +return "Subscription(" + +"topics=" + topics + +", userData=" + userData + +", ownedPartitions=" + ownedPartitions + +(groupInstanceId.isPresent() ? ", groupInstanceId=" + groupInstanceId.get() : "") + Review comment: How about using lambda? `(groupInstanceId.map(s -> ", groupInstanceId=" + s).orElse(""))` ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java ## @@ -132,6 +132,16 @@ public void setGroupInstanceId(Optional groupInstanceId) { public Optional groupInstanceId() { return groupInstanceId; } + +@Override +public String toString() { +return "Subscription(" + +"topics=" + topics + +", userData=" + userData + Review comment: The type of `userData` is `ByteBuffer` so it makes no sense to print it by `toString`. Maybe we can print the size? ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java ## @@ -132,6 +132,16 @@ public void setGroupInstanceId(Optional groupInstanceId) { public Optional groupInstanceId() { return groupInstanceId; } + +@Override +public String toString() { +return "Subscription(" + +"topics=" + topics + +", userData=" + userData + +", ownedPartitions=" + ownedPartitions + +(groupInstanceId.isPresent() ? ", groupInstanceId=" + groupInstanceId.get() : "") + Review comment: For another, it seems to me "null" is more suitable than empty string. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
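Putting those suggestions together, the resulting method might look roughly like the sketch below (illustration only, not the final patch; it assumes `userData` is a nullable `ByteBuffer` and reports its remaining size rather than its contents, and it prints "null" when the `Optional` group instance id is empty):

```java
@Override
public String toString() {
    return "Subscription(" +
        "topics=" + topics +
        // a ByteBuffer's toString is not meaningful, so report the remaining size instead
        ", userDataSize=" + (userData == null ? 0 : userData.remaining()) +
        ", ownedPartitions=" + ownedPartitions +
        // always include the field, printing "null" when the Optional is empty
        ", groupInstanceId=" + groupInstanceId.orElse("null") +
        ")";
}
```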
[GitHub] [kafka] chia7712 merged pull request #10166: MINOR: update the memberMetadata output
chia7712 merged pull request #10166: URL: https://github.com/apache/kafka/pull/10166 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji merged pull request #9912: MINOR: Move `RequestChannel.Response` creation logic into `RequestChannel`
hachikuji merged pull request #9912: URL: https://github.com/apache/kafka/pull/9912 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API
hachikuji commented on pull request #10183: URL: https://github.com/apache/kafka/pull/10183#issuecomment-783868752 @dengziming Good call. I've added a few test cases. I think we were missing authorization of the topics included in the response. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start
dengziming commented on a change in pull request #10021: URL: https://github.com/apache/kafka/pull/10021#discussion_r580761072 ## File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java ## @@ -92,4 +93,13 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial)); } + +/** + * Delete this snapshot from the filesystem. + */ +public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) throws IOException { +Path path = snapshotPath(logDir, snapshotId); +return Files.deleteIfExists(path); Review comment: This is a good catch, I changed the procedure to rename and async delete. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #10146: HOTFIX: fix Scala 2.12 build error caused by ControllerApisTest.scala
chia7712 commented on pull request #10146: URL: https://github.com/apache/kafka/pull/10146#issuecomment-783863300 The 2.8 build with Scala 2.12 is also broken, so I backported this PR to 2.8. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #10187: MINOR: document restriction against running multiple Streams apps on same state.dir
ableegoldman commented on pull request #10187: URL: https://github.com/apache/kafka/pull/10187#issuecomment-783862480 Call for review: any of @cadonna @lct45 @wcarlson5 @rodesai @agavra This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #10187: MINOR: document restriction against running multiple Streams apps on same state.dir
ableegoldman commented on pull request #10187: URL: https://github.com/apache/kafka/pull/10187#issuecomment-783862209 This should be cherrypicked back to the 2.8 branch (it's just docs) -- cc/ 2.8 release manager @vvcephei This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman opened a new pull request #10187: MINOR: document restriction against running multiple Streams apps on same state.dir
ableegoldman opened a new pull request #10187: URL: https://github.com/apache/kafka/pull/10187 Running more than one Streams instance on the same physical state directory has never been supported, but until now it's also not really been enforced. Since we fixed Streams to fail fast in the case of this misconfiguration instead of offering vague symptoms, we will throw an exception on startup if it is detected. We should document this clearly as some users may have been using this setup for testing (note: there's no reason to do this in a real production app, as you should either scale up by adding more threads and/or scale out by adding new instances with their own storage) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API
chia7712 commented on a change in pull request #10137: URL: https://github.com/apache/kafka/pull/10137#discussion_r580741128 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ## @@ -2219,6 +2221,25 @@ public void resume(Collection partitions) { } } +/** + * Get the consumer's current lag on the partition. Returns an "empty" {@link OptionalLong} if the lag is not known, + * for example if there is no position yet, or if the end offset is not known yet. + * + * + * This method uses locally cached metadata and never makes a remote call. + * + * @param topicPartition The partition to get the lag for. + * + * @return This {@code Consumer} instance's current lag for the given partition. + * + * @throws IllegalStateException if the {@code topicPartition} is not assigned + **/ +@Override +public OptionalLong currentLag(TopicPartition topicPartition) { +final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel); Review comment: Other methods call `acquireAndEnsureOpen();` first and then call `release()` in the finally block. Should this new method follow same pattern? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
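For illustration, applying that pattern to the new method could look like the fragment below (a sketch, not the final patch; it assumes `acquireAndEnsureOpen()`/`release()` are the same helpers used by the other `KafkaConsumer` methods, and completes the body shown in the diff with a null check):

```java
@Override
public OptionalLong currentLag(TopicPartition topicPartition) {
    acquireAndEnsureOpen();
    try {
        final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);
        // a null lag means either the position or the end offset is not known yet
        return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
    } finally {
        release();
    }
}
```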
[GitHub] [kafka] mjsax closed pull request #9690: KAFKA-10017: fix flaky EOS-beta upgrade test
mjsax closed pull request #9690: URL: https://github.com/apache/kafka/pull/9690 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #9690: KAFKA-10017: fix flaky EOS-beta upgrade test
mjsax commented on pull request #9690: URL: https://github.com/apache/kafka/pull/9690#issuecomment-783848142 As we have a `2.8` release branch now, it no longer seems worth backporting this fix to the `2.6` branch. Closing this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #10166: MINOR: update the memberMetadata output
showuon commented on pull request #10166: URL: https://github.com/apache/kafka/pull/10166#issuecomment-783847072 @chia7712 , could you have a look at this one line change? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12273) InterBrokerSendThread#pollOnce throws FatalExitError even though it is shutdown correctly
[ https://issues.apache.org/jira/browse/KAFKA-12273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-12273. Resolution: Fixed > InterBrokerSendThread#pollOnce throws FatalExitError even though it is > shutdown correctly > - > > Key: KAFKA-12273 > URL: https://issues.apache.org/jira/browse/KAFKA-12273 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > Fix For: 3.0.0, 2.8.0 > > > kafka tests sometimes shutdown gradle with non-zero code. The (one of) root > cause is that InterBrokerSendThread#pollOnce encounters DisconnectException > when NetworkClient is closing. DisconnectException should be viewed as > "expected" error as we do close it. In other words, > InterBrokerSendThread#pollOnce should swallow it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on pull request #10024: KAFKA-12273 InterBrokerSendThread#pollOnce throws FatalExitError even…
chia7712 commented on pull request #10024: URL: https://github.com/apache/kafka/pull/10024#issuecomment-783842651 Pushed to trunk and 2.8. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 merged pull request #10024: KAFKA-12273 InterBrokerSendThread#pollOnce throws FatalExitError even…
chia7712 merged pull request #10024: URL: https://github.com/apache/kafka/pull/10024 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax opened a new pull request #10186: MINOR: bump release version to 3.0.0-SNAPSHOT
mjsax opened a new pull request #10186: URL: https://github.com/apache/kafka/pull/10186 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-10665) Flaky Test StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization = all]
[ https://issues.apache.org/jira/browse/KAFKA-10665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-10665. --- Resolution: Fixed > Flaky Test > StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization > = all] > > > Key: KAFKA-10665 > URL: https://issues.apache.org/jira/browse/KAFKA-10665 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Luke Chen >Priority: Major > Labels: flaky-test > > {code:java} > java.nio.file.DirectoryNotEmptyException: > /tmp/kafka-13241964730537515637/app-StreamTableJoinTopologyOptimizationIntegrationTestshouldDoStreamTableJoinWithDifferentNumberOfPartitions_Optimization___all_ > at > java.base/sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:246) > at > java.base/sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:105) > at java.base/java.nio.file.Files.delete(Files.java:1146) > at > org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:869) > at > org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:839) > at java.base/java.nio.file.Files.walkFileTree(Files.java:2822) > at java.base/java.nio.file.Files.walkFileTree(Files.java:2876) > at org.apache.kafka.common.utils.Utils.delete(Utils.java:839) > at org.apache.kafka.common.utils.Utils.delete(Utils.java:825) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:151) > at > org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest.whenShuttingDown(StreamTableJoinTopologyOptimizationIntegrationTest.java:122) > {code} > https://github.com/apache/kafka/pull/9515/checks?check_run_id=1333753280 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] showuon commented on pull request #10185: KAFKA-12284: wait for mm2 auto-created the topic
showuon commented on pull request #10185: URL: https://github.com/apache/kafka/pull/10185#issuecomment-783839489 @mimaison , could you help review this PR? Thank you. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon opened a new pull request #10185: KAFKA-12284: wait for mm2 auto-created the topic
showuon opened a new pull request #10185: URL: https://github.com/apache/kafka/pull/10185 The reason the test sometimes failed with `TopicExistsException: Topic 'primary.test-topic-2' already exists.` is that we tried to create a topic that MM2 had already created for us. That is, ```java primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS); // after the primary cluster creates the "test-topic-2" topic, the backup cluster will auto-create the topic "primary.test-topic-2" for us backup.kafka().createTopic("primary.test-topic-2", 1); // this line races with MM2 ``` Fix the issue by explicitly waiting for the topic that MM2 auto-creates. Also fix 2 issues in the tests: 1. Handle the delete-topics exception properly 2. Fix a resource leak caused by not closing the adminClient ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
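An illustrative version of that wait (a hypothetical helper, not the actual test utility in the PR): poll the backup cluster's admin client until the replicated topic shows up, instead of racing MM2 to create it.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

final class TopicWaitSketch {
    // Block until the topic is visible on the cluster, or fail after the timeout.
    static void waitForTopicCreated(final Admin admin, final String topic, final Duration timeout)
            throws InterruptedException, ExecutionException {
        final long deadlineMs = System.currentTimeMillis() + timeout.toMillis();
        while (System.currentTimeMillis() < deadlineMs) {
            try {
                admin.describeTopics(Collections.singleton(topic)).all().get();
                return; // MM2 has created the topic
            } catch (final ExecutionException e) {
                if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                    throw e; // unexpected error
                }
                Thread.sleep(500); // not replicated yet, retry
            }
        }
        throw new AssertionError("Timed out waiting for topic " + topic);
    }
}
```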
[GitHub] [kafka] mjsax commented on a change in pull request #10180: KAFKA- 12347: expose offsets to streams client (WIP)
mjsax commented on a change in pull request #10180: URL: https://github.com/apache/kafka/pull/10180#discussion_r580731514 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -91,6 +91,7 @@ // includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance private final Set lockedTaskDirectories = new HashSet<>(); private java.util.function.Consumer> resetter; +private Map committedOffsets = new HashMap<>(); Review comment: During a rebalance, we should delete all entries for partitions we don't own any longer. Should we also pre-populate this map when we init a task (cf `StreamsTask#initializeMetadata()`) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
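A minimal sketch of the first suggestion (a hypothetical helper; the real change would live inside `TaskManager`'s rebalance handling): drop committed-offset entries for partitions that are no longer assigned.

```java
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

final class CommittedOffsetsCleanupSketch {
    // Keep only the committed-offset entries for partitions we still own after a rebalance.
    static void retainAssignedPartitions(final Map<TopicPartition, Long> committedOffsets,
                                         final Set<TopicPartition> assignedPartitions) {
        committedOffsets.keySet().retainAll(assignedPartitions);
    }
}
```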
[GitHub] [kafka] mjsax commented on a change in pull request #10180: KAFKA- 12347: expose offsets to streams client (WIP)
mjsax commented on a change in pull request #10180: URL: https://github.com/apache/kafka/pull/10180#discussion_r580731029 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -307,6 +308,9 @@ public boolean isRunning() { private final ProcessingMode processingMode; private AtomicBoolean leaveGroupRequested; +private final Map committedOffsets; Review comment: Seems to be unused? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API
dengziming commented on a change in pull request #10183: URL: https://github.com/apache/kafka/pull/10183#discussion_r580710886 ## File path: clients/src/main/resources/common/message/DescribeTransactionsResponse.json ## @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 65, + "type": "response", + "name": "DescribeTransactionsResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, +"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [ +{ "name": "ErrorCode", "type": "int16", "versions": "0+" }, +{ "name": "TransactionalId", "type": "string", "versions": "0+" }, Review comment: we can add `"entityType": "transactionalId"` to this field. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #9627: KAFKA-10746: Change to Warn logs when necessary to notify users
showuon commented on pull request #9627: URL: https://github.com/apache/kafka/pull/9627#issuecomment-783805863 @abbccdda @guozhangwang @mjsax , please help review this PR. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe opened a new pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe opened a new pull request #10184: URL: https://github.com/apache/kafka/pull/10184 Enable the new KIP-500 controller to delete topics. Fix a bug where feature level records were not correctly replayed. Fix a bug in TimelineHashMap#remove where the wrong type was being returned. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start
hachikuji commented on a change in pull request #10021: URL: https://github.com/apache/kafka/pull/10021#discussion_r580705481 ## File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java ## @@ -92,4 +93,13 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial)); } + +/** + * Delete this snapshot from the filesystem. + */ +public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) throws IOException { +Path path = snapshotPath(logDir, snapshotId); +return Files.deleteIfExists(path); Review comment: When we remove other log files in `Log`, we first rename them to `{file}.deleted`. Then we schedule them for background deletion. Do you think we should do similarly for snapshot files? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
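A rough sketch of the rename-then-background-delete pattern being described above (the `.deleted` suffix and the single-thread scheduler are illustrative assumptions, not the actual `Snapshots` or `Log` code):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SnapshotDeleterSketch {
    private static final ScheduledExecutorService SCHEDULER =
        Executors.newSingleThreadScheduledExecutor();

    static void scheduleDeletion(final Path snapshotPath, final long delayMs) throws IOException {
        // 1) atomically rename the snapshot so readers no longer see it
        final Path renamed = snapshotPath.resolveSibling(snapshotPath.getFileName() + ".deleted");
        Files.move(snapshotPath, renamed, StandardCopyOption.ATOMIC_MOVE);

        // 2) delete the renamed file later on a background thread
        SCHEDULER.schedule(() -> {
            try {
                Files.deleteIfExists(renamed);
            } catch (final IOException e) {
                // in real code this would be logged and retried
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }
}
```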
[jira] [Resolved] (KAFKA-12160) KafkaStreams configs are documented incorrectly
[ https://issues.apache.org/jira/browse/KAFKA-12160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-12160. - Fix Version/s: 2.8.0 Resolution: Fixed > KafkaStreams configs are documented incorrectly > --- > > Key: KAFKA-12160 > URL: https://issues.apache.org/jira/browse/KAFKA-12160 > Project: Kafka > Issue Type: Bug > Components: docs, streams >Reporter: Matthias J. Sax >Assignee: James Galasyn >Priority: Minor > Fix For: 2.8.0 > > > In version 2.3, we removed the KafkaStreams default of `max.poll.interval.ms` > and fall back to the consumer default. However, the docs still contain > `Integer.MAX_VALUE` as default. > Because we rely on the consumer default, we should actually remove > `max.poll.interval.ms` from the Kafka Streams docs completely. We might want > to fix this in some older versions, too. Not sure how far back we want to go. > Furthermore, in 2.7 docs, the sections "Default Values" and "Parameters > controlled by Kafka Streams" contain incorrect information. > cf > https://kafka.apache.org/27/documentation/streams/developer-guide/config-streams.html#default-values > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on pull request #10182: KAFKA-12160: Remove max.poll.interval.ms from config docs (and KSTREAMS-4881)
mjsax commented on pull request #10182: URL: https://github.com/apache/kafka/pull/10182#issuecomment-783792561 Merged to `trunk` and cherry-picked to `2.8` branch. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #10182: KAFKA-12160: Remove max.poll.interval.ms from config docs (and KSTREAMS-4881)
mjsax merged pull request #10182: URL: https://github.com/apache/kafka/pull/10182 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji opened a new pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API
hachikuji opened a new pull request #10183: URL: https://github.com/apache/kafka/pull/10183 This patch implements the `DescribeTransactions` API as documented in KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] JimGalasyn opened a new pull request #10182: KAFKA-12160: Remove max.poll.interval.ms from config docs
JimGalasyn opened a new pull request #10182: URL: https://github.com/apache/kafka/pull/10182 [KAFKA-12160](https://issues.apache.org/jira/browse/KAFKA-12160): Remove `max.poll.interval.ms` from the Kafka Streams docs. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9958: MINOR: Clean up group instance id handling in `GroupCoordinator`
hachikuji commented on a change in pull request #9958: URL: https://github.com/apache/kafka/pull/9958#discussion_r580635693 ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -725,50 +890,67 @@ class GroupCoordinator(val brokerId: Int, offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = { group.inLock { - if (group.is(Dead)) { -// if the group is marked as dead, it means some other thread has just removed the group -// from the coordinator metadata; it is likely that the group has migrated to some other -// coordinator OR the group is in a transient unstable phase. Let the member retry -// finding the correct coordinator and rejoin. -responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.COORDINATOR_NOT_AVAILABLE }) - } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "txn-commit-offsets")) { -responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.FENCED_INSTANCE_ID }) - } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) { -// Enforce member id when it is set. -responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.UNKNOWN_MEMBER_ID }) - } else if (generationId >= 0 && generationId != group.generationId) { -// Enforce generation check when it is set. -responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION }) + val validationErrorOpt = validateTxnOffsetCommit( +group, +generationId, +memberId, +groupInstanceId + ) + + if (validationErrorOpt.isDefined) { +responseCallback(offsetMetadata.map { case (k, _) => k -> validationErrorOpt.get }) Review comment: Yeah, I agree. I think I was trying to avoid a big indent of the block below. I found a reasonable way to do it in the latest commit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12363) Simplify static group memberId update logic
[ https://issues.apache.org/jira/browse/KAFKA-12363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12363: Description: In KAFKA-10284, we amended the JoinGroup logic to ensure that the memberId of static members always gets persisted. The way this works is the following: 1. When the JoinGroup is received, we immediately replace the current memberId with the updated memberId. 2. We then send an append to the log to update group metadata 3. If the append succeeds, we return the new memberId in the JoinGroup response. 4. If the append fails, we revert to the old memberId and we return UNKNOWN_MEMBER_ID in the response for the new member. I am not sure if there are any correctness problems with this logic, but it does seem strange. For example, we can end up fencing the old memberId after step 1 even if we end up reverting in step 3. I think it would be simpler to structure this as follows: 1. When the JoinGroup is received, send an append to the log to update group metadata 2. If the append succeeds, replace the existing memberId with the new committed memberId. 3. If the append fails, return UNKNOWN_MEMBER_ID to let the new member retry. Basically we don't surface the effect of the member replacement until we know it has been committed to the log, which avoids the weird revert logic. was: In KAFKA-10284, we amended the JoinGroup logic to ensure that the memberId of static members always gets persisted. The way this works is the following: 1. When the JoinGroup is received, we immediately replace the current memberId with the updated memberId. 2. We then send an append to the log to update group metadata 3. If the append is unsuccessful, we revert to the old memberId and we return UNKNOWN_MEMBER_ID in the response for the new member. 4. If the append is successful, we return the new memberId in the JoinGroup response. I am not sure if there are any correctness problems with this logic, but it does seem strange. For example, we can end up fencing the old memberId after step 1 even if we end up reverting in step 3. I think it would be simpler to structure this as follows: 1. When the JoinGroup is received, send an append to the log to update group metadata 2. If the append succeeds, replace the existing memberId with the new committed memberId. 3. If the append fails, return UNKNOWN_MEMBER_ID to let the new member retry. Basically we don't surface the effect of the member replacement until we know it has been committed to the log, which avoids the weird revert logic. > Simplify static group memberId update logic > --- > > Key: KAFKA-12363 > URL: https://issues.apache.org/jira/browse/KAFKA-12363 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > In KAFKA-10284, we amended the JoinGroup logic to ensure that the memberId of > static members always gets persisted. The way this works is the following: > 1. When the JoinGroup is received, we immediately replace the current > memberId with the updated memberId. > 2. We then send an append to the log to update group metadata > 3. If the append succeeds, we return the new memberId in the JoinGroup > response. > 4. If the append fails, we revert to the old memberId and we return > UNKNOWN_MEMBER_ID in the response for the new member. > I am not sure if there are any correctness problems with this logic, but it > does seem strange. For example, we can end up fencing the old memberId after > step 1 even if we end up reverting in step 3. 
I think it would be simpler to > structure this as follows: > 1. When the JoinGroup is received, send an append to the log to update group > metadata > 2. If the append succeeds, replace the existing memberId with the new > committed memberId. > 3. If the append fails, return UNKNOWN_MEMBER_ID to let the new member retry. > Basically we don't surface the effect of the member replacement until we know > it has been committed to the log, which avoids the weird revert logic. -- This message was sent by Atlassian Jira (v8.3.4#803005)
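As a minimal sketch of the restructuring proposed in KAFKA-12363 (illustrative Java only; the real logic lives in GroupCoordinator.scala and none of the names below are the actual coordinator API), the key point is that the in-memory memberId swap happens only after the log append is known to be committed:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch of the proposed JoinGroup handling for static members:
// persist the replacement first, apply it in memory only on success.
public class StaticMemberJoinSketch {
    static final String UNKNOWN_MEMBER_ID = "";

    private final AtomicReference<String> currentMemberId = new AtomicReference<>("old-member-id");

    public String handleJoin(String newMemberId, Supplier<Boolean> appendGroupMetadata) {
        // 1. Append the updated group metadata to the log first; appendGroupMetadata is a
        //    stand-in for the real storeGroup call and returns true only if the write commits.
        boolean committed = appendGroupMetadata.get();
        if (committed) {
            // 2. Only a committed write is surfaced: replace the old memberId now.
            currentMemberId.set(newMemberId);
            return newMemberId;            // returned in the JoinGroup response
        }
        // 3. The append failed and nothing was applied, so there is nothing to revert.
        return UNKNOWN_MEMBER_ID;          // lets the new member retry the JoinGroup
    }
}
```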
[GitHub] [kafka] hachikuji commented on a change in pull request #9958: MINOR: Clean up group instance id handling in `GroupCoordinator`
hachikuji commented on a change in pull request #9958: URL: https://github.com/apache/kafka/pull/9958#discussion_r580672171 ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -1055,11 +1227,15 @@ class GroupCoordinator(val brokerId: Int, val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap groupManager.storeGroup(group, groupAssignment, error => { if (error != Errors.NONE) { + + // TODO: This logic seems questionable. The write was not committed, but that doesn't + // mean it wasn't written to the log and cannot eventually become committed. Review comment: Done. I filed https://issues.apache.org/jira/browse/KAFKA-12363. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12363) Simplify static group memberId update logic
Jason Gustafson created KAFKA-12363: --- Summary: Simplify static group memberId update logic Key: KAFKA-12363 URL: https://issues.apache.org/jira/browse/KAFKA-12363 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson In KAFKA-10284, we amended the JoinGroup logic to ensure that the memberId of static members always gets persisted. The way this works is the following: 1. When the JoinGroup is received, we immediately replace the current memberId with the updated memberId. 2. We then send an append to the log to update group metadata 3. If the append is unsuccessful, we revert to the old memberId and we return UNKNOWN_MEMBER_ID in the response for the new member. 4. If the append is successful, we return the new memberId in the JoinGroup response. I am not sure if there are any correctness problems with this logic, but it does seem strange. For example, we can end up fencing the old memberId after step 1 even if we end up reverting in step 3. I think it would be simpler to structure this as follows: 1. When the JoinGroup is received, send an append to the log to update group metadata 2. If the append succeeds, replace the existing memberId with the new committed memberId. 3. If the append fails, return UNKNOWN_MEMBER_ID to let the new member retry. Basically we don't surface the effect of the member replacement until we know it has been committed to the log, which avoids the weird revert logic. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cmccabe merged pull request #10181: MINOR: fix a couple of failing system tests
cmccabe merged pull request #10181: URL: https://github.com/apache/kafka/pull/10181 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on pull request #10181: MINOR: fix a couple of failing system tests
rondagostino commented on pull request #10181: URL: https://github.com/apache/kafka/pull/10181#issuecomment-783734434 @cmccabe Can you take a look? I found these issues after the test run on the 2.8 PR branch completed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino opened a new pull request #10181: MINOR: fix a couple of failing system tests
rondagostino opened a new pull request #10181: URL: https://github.com/apache/kafka/pull/10181 This patch fixes a couple of failing system tests due to https://github.com/apache/kafka/pull/10105/ and should be merged to both `trunk` and `2.8` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12362) Determine if a Task is idling
Walker Carlson created KAFKA-12362: -- Summary: Determine if a Task is idling Key: KAFKA-12362 URL: https://issues.apache.org/jira/browse/KAFKA-12362 Project: Kafka Issue Type: Improvement Components: streams Reporter: Walker Carlson Fix For: 3.0.0 Determine if a task is idling given the task ID. https://github.com/apache/kafka/pull/10180 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] wcarlson5 opened a new pull request #10180: KAFKA-12347: expose offsets to streams client
wcarlson5 opened a new pull request #10180: URL: https://github.com/apache/kafka/pull/10180 Collect the offsets after they are committed. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma edited a comment on pull request #10179: MINOR: tune KIP-631 configurations
ijuma edited a comment on pull request #10179: URL: https://github.com/apache/kafka/pull/10179#issuecomment-783724833 Can we add a comment to the code explaining why these timeouts are lower than the ZK equivalent ones (similar to what's in the PR description). May even be worth including it in the documentation string (people will wonder). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #10179: MINOR: tune KIP-631 configurations
ijuma commented on pull request #10179: URL: https://github.com/apache/kafka/pull/10179#issuecomment-783724833 Can we add a comment to the code explaining why these timeouts are lower than the ZK equivalent ones (similar to what's in the PR description). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe opened a new pull request #10179: MINOR: tune KIP-631 configurations
cmccabe opened a new pull request #10179: URL: https://github.com/apache/kafka/pull/10179 Since we expect KIP-631 controller failovers to be fairly cheap, tune the default raft configuration parameters so that we detect node failures more quickly. Reduce the broker session timeout as well so that broker failures are detected more quickly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9958: MINOR: Clean up group instance id handling in `GroupCoordinator`
hachikuji commented on a change in pull request #9958: URL: https://github.com/apache/kafka/pull/9958#discussion_r580635693 ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -725,50 +890,67 @@ class GroupCoordinator(val brokerId: Int, offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = { group.inLock { - if (group.is(Dead)) { -// if the group is marked as dead, it means some other thread has just removed the group -// from the coordinator metadata; it is likely that the group has migrated to some other -// coordinator OR the group is in a transient unstable phase. Let the member retry -// finding the correct coordinator and rejoin. -responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.COORDINATOR_NOT_AVAILABLE }) - } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "txn-commit-offsets")) { -responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.FENCED_INSTANCE_ID }) - } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) { -// Enforce member id when it is set. -responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.UNKNOWN_MEMBER_ID }) - } else if (generationId >= 0 && generationId != group.generationId) { -// Enforce generation check when it is set. -responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION }) + val validationErrorOpt = validateTxnOffsetCommit( +group, +generationId, +memberId, +groupInstanceId + ) + + if (validationErrorOpt.isDefined) { +responseCallback(offsetMetadata.map { case (k, _) => k -> validationErrorOpt.get }) Review comment: Yeah, I agree. I think I was trying to avoid a big indent. I think I found a reasonable way to do it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe closed pull request #10175: MINOR: V2.8 system tests for Raft-based metadata quorums
cmccabe closed pull request #10175: URL: https://github.com/apache/kafka/pull/10175 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #10175: MINOR: V2.8 system tests for Raft-based metadata quorums
cmccabe commented on pull request #10175: URL: https://github.com/apache/kafka/pull/10175#issuecomment-783709474 thanks, @rondagostino! I backported this as part of the #10105 PR as we usually do (we don't usually have a separate PR for the branch). Thanks for showing me how to resolve the conflicts in `get_offset_shell_test.py`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #10105: MINOR: System tests for Raft-based metadata quorums
cmccabe merged pull request #10105: URL: https://github.com/apache/kafka/pull/10105 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12242) Decouple state store materialization enforcement from name/serde provider
[ https://issues.apache.org/jira/browse/KAFKA-12242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17288646#comment-17288646 ] John Roesler commented on KAFKA-12242: -- Thanks for raising this, [~guozhang], I have seen several people getting tripped up by this issue. Just to throw it out there, another approach to solve this would be to begin to go down the road I proposed here: [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar] Specifically, what I have in mind is that attaching serdes to a processor node is logically independent from requesting materialization of that node's view. This ticket highlights the awkwardness of using a cross-cutting "Materialized" config at all. In contrast, if it were just "MapValuesParameters" with independent settings for specifying serdes and for requesting materialization, it wouldn't be an issue. I definitely don't insist on the proposed grammar, but wanted to document the relationship with that proposal. > Decouple state store materialization enforcement from name/serde provider > - > > Key: KAFKA-12242 > URL: https://issues.apache.org/jira/browse/KAFKA-12242 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: needs-kip > > Many users of Streams would want the following: let the Streams runtime > decide whether or not to materialize a state store; AND if it decides to do > so, use the store name / serdes I provided ahead of time, if not, then > nothing happens (the provided store name and serdes can just be dropped). > However, Streams today take `Materialized` as an indicator to enforce the > materialization. We should think of a way for users to optionally decouple > materialization enforcement from name/serde provider. -- This message was sent by Atlassian Jira (v8.3.4#803005)
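Purely to illustrate the shape of the idea in the comment above (none of these classes exist in Kafka Streams today, and this is not a proposal of concrete names), a per-operation parameter object that decouples serdes from materialization might look like:

```java
import org.apache.kafka.common.serialization.Serde;

// Hypothetical illustration only: serde configuration and the materialization request
// are independent settings, unlike the cross-cutting Materialized config used today.
public final class MapValuesParameters<K, V> {
    private Serde<K> keySerde;
    private Serde<V> valueSerde;
    private String storeName;              // used only if the runtime decides to materialize
    private boolean materializationRequested;

    public MapValuesParameters<K, V> withSerdes(final Serde<K> keySerde, final Serde<V> valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        return this;
    }

    public MapValuesParameters<K, V> withStoreName(final String storeName) {
        this.storeName = storeName;
        return this;
    }

    public MapValuesParameters<K, V> requestMaterialization() {
        this.materializationRequested = true;
        return this;
    }
}
```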
[GitHub] [kafka] hachikuji commented on a change in pull request #9912: MINOR: Let TestRaftRequestHandler use RequestHandlerHelper
hachikuji commented on a change in pull request #9912: URL: https://github.com/apache/kafka/pull/9912#discussion_r580606013 ## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ## @@ -2127,21 +2118,15 @@ class KafkaApisTest { error = Errors.NONE )) -val response = readResponse(joinGroupRequest, capturedResponse) - .asInstanceOf[JoinGroupResponse] +val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse] assertEquals(Errors.NONE, response.error) assertEquals(0, response.data.members.size) assertEquals(memberId, response.data.memberId) assertEquals(0, response.data.generationId) assertEquals(memberId, response.data.leader) assertEquals(protocolName, response.data.protocolName) - -if (version >= 7) { Review comment: Now these test cases are working with the raw response object returned by `KafkaApis`. This means we don't have to deal with fields which are dropped during serialization. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12347) Improve Kafka Streams ability to track progress
[ https://issues.apache.org/jira/browse/KAFKA-12347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson updated KAFKA-12347: --- Description: Add methods to track records being consumed fully and to tell if tasks are idling. This will allow users of streams to build uptime metrics around streams with less difficulty. KIP-715: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-715%3A+Expose+Committed+offset+in+streams] was: Add metrics to track records being consumed fully and to tell if tasks are idling. This will allow users of streams to build uptime metrics around streams with less difficulty. KIP-715: https://cwiki.apache.org/confluence/display/KAFKA/KIP-715%3A+Expose+Committed+offset+in+streams > Improve Kafka Streams ability to track progress > --- > > Key: KAFKA-12347 > URL: https://issues.apache.org/jira/browse/KAFKA-12347 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Walker Carlson >Assignee: Walker Carlson >Priority: Major > Labels: kip > Fix For: 3.0.0 > > > Add methods to track records being consumed fully and to tell if tasks are > idling. This will allow users of streams to build uptime metrics around > streams with less difficulty. > KIP-715: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-715%3A+Expose+Committed+offset+in+streams] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rhauch commented on a change in pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string
rhauch commented on a change in pull request #9950: URL: https://github.com/apache/kafka/pull/9950#discussion_r580584221 ## File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java ## @@ -364,11 +365,24 @@ private static String castToString(Object value) { if (value instanceof java.util.Date) { java.util.Date dateValue = (java.util.Date) value; return Values.dateFormatFor(dateValue).format(dateValue); +} else if (value instanceof ByteBuffer) { +ByteBuffer byteBuffer = (ByteBuffer) value; +return castByteArrayToString(byteBuffer.array()); +} else if (value instanceof byte[]) { +return castByteArrayToString((byte[]) value); } else { return value.toString(); } } +private static String castByteArrayToString(byte[] array) { +StringBuilder sbuf = new StringBuilder(); +for (byte b : array) { +sbuf.append(String.format("%02X", b)); Review comment: As noted in my previous comment, I agree with @kkonstantine that base64 would be preferable. Doing that would align better with the existing `Values` class used in Connect's header converter mechanism. If we want to add support for multiple encodings, we would need to have a KIP since it would likely mean changing the SMT configuration. ## File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java ## @@ -364,11 +365,31 @@ private static String castToString(Object value) { if (value instanceof java.util.Date) { java.util.Date dateValue = (java.util.Date) value; return Values.dateFormatFor(dateValue).format(dateValue); +} else if (value instanceof ByteBuffer) { +ByteBuffer byteBuffer = (ByteBuffer) value; +if (byteBuffer.hasArray()) { +return castByteArrayToString(byteBuffer.array()); +} +else { +byte[] array = new byte[byteBuffer.remaining()]; +byteBuffer.get(array); +return castByteArrayToString(array); +} Review comment: The use of `ByteBuffer.get(...)` here does not account for the fact that it may not be positioned at the beginning. Kafka has two `Utils.readBytes(...)` methods that we should probably use. This code would then simplify to: ```suggestion byte[] rawBytes = Utils.readBytes(byteBuffer); return castByteArrayToString(rawBytes); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
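For comparison with the hex helper shown in the diff above, a base64 variant along the lines being suggested could look like the following sketch (illustrative only; whether the SMT ends up using hex or base64, and how it handles buffer position, is exactly what is under review here):

```java
import java.nio.ByteBuffer;
import java.util.Base64;

// Sketch of a base64-based cast for byte[] and ByteBuffer values, assuming the
// surrounding Cast.castToString dispatch stays as in the PR. Not the merged code.
final class ByteCastSketch {
    static String castByteArrayToString(byte[] array) {
        return Base64.getEncoder().encodeToString(array);
    }

    static String castToString(ByteBuffer byteBuffer) {
        // Copy only the remaining bytes, without assuming the buffer is array-backed
        // or positioned at the beginning (the concern raised in the review above).
        byte[] raw = new byte[byteBuffer.remaining()];
        byteBuffer.duplicate().get(raw);
        return castByteArrayToString(raw);
    }
}
```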
[GitHub] [kafka] C0urante commented on pull request #10178: KAFKA-12361: Use default request.timeout.ms value for Connect producers
C0urante commented on pull request #10178: URL: https://github.com/apache/kafka/pull/10178#issuecomment-783669441 @hachikuji Does this look alright to you? The changes are trivial but I'm also hoping my summary of the situation in the description is correct and won't mislead anyone. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante opened a new pull request #10178: KAFKA-12361: Use default request.timeout.ms value for Connect producers
C0urante opened a new pull request #10178: URL: https://github.com/apache/kafka/pull/10178 [Jira](https://issues.apache.org/jira/browse/KAFKA-12361) The super-high request timeout makes it harder for the producer to gracefully handle unclean connection terminations, which might happen in the case of sudden broker death. Reducing that value to the default of 30 seconds should address that issue, without compromising the existing delivery guarantees of the Connect framework. Since the delivery timeout is still set to a very-high value, this change shouldn't make it more likely for `Producer::send` to throw an exception and fail the task. This may make duplicate record delivery more likely in cases with extremely-slow broker response time, but that can be addressed by enabling idempotence in the underlying producers for the connector's tasks. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
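A rough sketch of the kind of change described in this PR (the values and surrounding shape are assumptions for illustration; the merged worker code is authoritative):

```java
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.HashMap;
import java.util.Map;

// Illustrative before/after of the producer overrides Connect applies to source tasks.
public class ConnectProducerDefaultsSketch {
    public static Map<String, String> producerProps() {
        Map<String, String> producerProps = new HashMap<>();

        // Before this change: request.timeout.ms was pinned to Int.MaxValue so records
        // could not be expired in the accumulator before being sent.
        // producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));

        // After this change: fall back to the producer's default request timeout and keep
        // a very high delivery timeout so retriable failures are retried until delivery.
        producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
        return producerProps;
    }
}
```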
[GitHub] [kafka] rhauch edited a comment on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string
rhauch edited a comment on pull request #9950: URL: https://github.com/apache/kafka/pull/9950#issuecomment-783634110 Before this fix, the changes made in #4820 ([KAFKA-6684](https://issues.apache.org/jira/browse/KAFKA-6684)) resulted in `toString()` being called on `byte[]` and `ByteBuffer`. As highlighted in this PR and issue that `ByteBuffer.toString()` is not useful, but the `toString()` on `byte[]` still works. This PR seems to change that behavior, which would not be backward compatible. The discussion on PR #4820 also talked about making this compatible with `Values.convertToString(...)`, which for `byte[]` and `ByteBuffer` results in a base 64 encoded string (with `ISO-8859-1` encoding). See the [Values code for details](https://github.com/apache/kafka/blob/95f51539c8d0b88bd7f285011d42e2d1117107de/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L673-L682). ~Because of this, it seems much more sensible to also use base64 here for `ByteBuffer` so it matches the existing behavior with `byte[]`.~ Actually, the existing code simply outputs the `toString()` result of a `byte[]` object, which is of the form `[B@22ef9844`, which is the `Object.toString()` implementation that includes the class alias (e.g., `[B` is a byte array) and the hex representation of the object's `hashCode()`. However, given that none of the cast forms of `byte[]` or `ByteBuffer[]` are useful, then perhaps it's worth adding the previous discussion on #4820 mentioning that: > As @ewencp suggested, for consistency perhaps we can use the same string formats as SimpleHeaderConverter: https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java, As mentioned above, the `Values.convertToString(...)` uses a base 64 encoding. And this aligns with @kkonstantine's suggestion: > Why are we selecting hex here and not base64 for example? Hex of course is less efficient. I agree that it maybe doesn't suffice in all user situations, so if we also want to support other encodings we'd need other config changes that will require using the KIP mechanism to propose such enhancements. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch edited a comment on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string
rhauch edited a comment on pull request #9950: URL: https://github.com/apache/kafka/pull/9950#issuecomment-783634110 Before this fix, the changes made in #4820 ([KAFKA-6684](https://issues.apache.org/jira/browse/KAFKA-6684)) resulted in `toString()` being called on `byte[]` and `ByteBuffer`. As highlighted in this PR and issue that `ByteBuffer.toString()` is not useful, but the `toString()` on `byte[]` still works. This PR seems to change that behavior, which would not be backward compatible. The discussion on PR #4820 also talked about making this compatible with `Values.convertToString(...)`, which for `byte[]` and `ByteBuffer` results in a base 64 encoded string (with `ISO-8859-1` encoding). See the [Values code for details](https://github.com/apache/kafka/blob/95f51539c8d0b88bd7f285011d42e2d1117107de/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L673-L682). ~Because of this, it seems much more sensible to also use base64 here for `ByteBuffer` so it matches the existing behavior with `byte[]`.~ ~I agree that it maybe doesn't suffice in all user situations, so if we also want to support other encodings we'd need other config changes that will require using the KIP mechanism to propose such enhancements.~ Actually, the existing code simply outputs the `toString()` result of a `byte[]` object, which is of the form `[B@22ef9844`, which is the `Object.toString()` implementation that includes the class alias (e.g., `[B` is a byte array) and the hex representation of the object's `hashCode()`. However, given that none of the cast forms of `byte[]` or `ByteBuffer[]` are useful, then perhaps it's worth adding the previous discussion on #4820 mentioning that: > As @ewencp suggested, for consistency perhaps we can use the same string formats as SimpleHeaderConverter: https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java, As mentioned above, the `Values.convertToString(...)` uses a base 64 encoding. And this aligns with @kkonstantine's suggestion: > Why are we selecting hex here and not base64 for example? Hex of course is less efficient. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12361) Change default connect producer request timeout
[ https://issues.apache.org/jira/browse/KAFKA-12361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-12361: - Assignee: Chris Egerton > Change default connect producer request timeout > --- > > Key: KAFKA-12361 > URL: https://issues.apache.org/jira/browse/KAFKA-12361 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Chris Egerton >Priority: Major > Labels: connect > > Prior to KIP-91, which introduced delivery.timeout.ms, there was no easy way > to ensure that records sent through the producer would even have the > opportunity to get delivered. Records could be timed out in the accumulator > if `request.timeout.ms` was reached before getting sent. Users worked around > this problem by setting `request.timeout.ms=Int.MaxValue`. The downside is > that this made the producer slower to discover "unclean" connection failures. > Now that we have KIP-91, there shouldn't be any reason to keep this > workaround. > One place it would be good to fix this is in connect's source tasks: > {code} > // These settings will execute infinite retries on retriable > exceptions. They *may* be overridden via configs passed to the worker, > // but this may compromise the delivery guarantees of Kafka Connect. > producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, > Integer.toString(Integer.MAX_VALUE)); > {code} > The comment about delivery guarantees is a little vague, but I think mainly > it is what was discussed above about ensuring at least once delivery. Note > that none of the default configs including both request timeout and delivery > timeout can avoid duplicates in all cases. For that idempotence is needed. It > is worth considering separately for connect whether that should be the > default. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rondagostino commented on a change in pull request #10105: MINOR: System tests for Raft-based metadata quorums
rondagostino commented on a change in pull request #10105: URL: https://github.com/apache/kafka/pull/10105#discussion_r580575104 ## File path: tests/kafkatest/services/performance/end_to_end_latency.py ## @@ -74,12 +74,15 @@ def __init__(self, context, num_nodes, kafka, topic, num_records, compression_ty def start_cmd(self, node): args = self.args.copy() args.update({ -'zk_connect': self.kafka.zk_connect_setting(), 'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol), 'config_file': EndToEndLatencyService.CONFIG_FILE, 'kafka_run_class': self.path.script("kafka-run-class.sh", node), 'java_class_name': self.java_class_name() }) +if node.version < V_0_9_0_0: Review comment: > name it something like consumer_supports_bootstrap_server... bunch of other bootstrap server functions Good point. I renamed it `consumer_supports_bootstrap_server()` as you suggested. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10105: MINOR: System tests for Raft-based metadata quorums
cmccabe commented on a change in pull request #10105: URL: https://github.com/apache/kafka/pull/10105#discussion_r580571155 ## File path: tests/kafkatest/tests/core/round_trip_fault_test.py ## @@ -47,32 +55,48 @@ def __init__(self, test_context): active_topics=active_topics) def setUp(self): -self.zk.start() +if self.zk: +self.zk.start() self.kafka.start() self.trogdor.start() def teardown(self): self.trogdor.stop() self.kafka.stop() -self.zk.stop() +if self.zk: +self.zk.stop() -def test_round_trip_workload(self): +def remote_quorum_nodes(self): +if quorum.for_test(self.test_context) == quorum.zk: +return self.zk.nodes +elif quorum.for_test(self.test_context) == quorum.remote_raft: +return self.kafka.controller_quorum.nodes +else: # co-located case, which we currently don't test but handle here for completeness in case we do test it +return [] Review comment: sounds good This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10105: MINOR: System tests for Raft-based metadata quorums
cmccabe commented on a change in pull request #10105: URL: https://github.com/apache/kafka/pull/10105#discussion_r580570954 ## File path: tests/kafkatest/services/performance/end_to_end_latency.py ## @@ -74,12 +74,15 @@ def __init__(self, context, num_nodes, kafka, topic, num_records, compression_ty def start_cmd(self, node): args = self.args.copy() args.update({ -'zk_connect': self.kafka.zk_connect_setting(), 'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol), 'config_file': EndToEndLatencyService.CONFIG_FILE, 'kafka_run_class': self.path.script("kafka-run-class.sh", node), 'java_class_name': self.java_class_name() }) +if node.version < V_0_9_0_0: Review comment: thanks. that sounds good! can you name it something like `consumer_supports_bootstrap_server`? There are a bunch of other bootstrap server functions (`acl_command_supports_bootstrap_server`, `topic_command_supports_bootstrap_server`, etc.) so it would be good to be clear This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch commented on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string
rhauch commented on pull request #9950: URL: https://github.com/apache/kafka/pull/9950#issuecomment-783634110 Before this fix, the changes made in #4820 ([KAFKA-6684](https://issues.apache.org/jira/browse/KAFKA-6684)) resulted in `toString()` being called on `byte[]` and `ByteBuffer`. As highlighted in this PR and issue that `ByteBuffer.toString()` is not useful, but the `toString()` on `byte[]` still works. This PR seems to change that behavior, which would not be backward compatible. The discussion on PR #4820 also talked about making this compatible with `Values.convertToString(...)`, which for `byte[]` and `ByteBuffer` results in a base 64 encoded string (with `ISO-8859-1` encoding). See the [Values code for details](https://github.com/apache/kafka/blob/95f51539c8d0b88bd7f285011d42e2d1117107de/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L673-L682). Because of this, it seems much more sensible to also use base64 here for `ByteBuffer` so it matches the existing behavior with `byte[]`. I agree that it maybe doesn't suffice in all user situations, so if we also want to support other encodings we'd need other config changes that will require using the KIP mechanism to propose such enhancements. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #10170: KAFKA-12323: Set timestamp in record context when punctuate
mjsax commented on a change in pull request #10170: URL: https://github.com/apache/kafka/pull/10170#discussion_r580546901 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ## @@ -1874,6 +1873,66 @@ public void close() {} assertEquals(2, punctuatedWallClockTime.size()); } +@Test +public void shouldPunctuateWithTimestampPreservedInProcessorContext() { +final org.apache.kafka.streams.kstream.TransformerSupplier> punctuateProcessor = +() -> new org.apache.kafka.streams.kstream.Transformer>() { +@Override +public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { +context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, timestamp -> context.forward("key", "value")); +} + +@Override +public KeyValue transform(final Object key, final Object value) { +return null; +} + +@Override +public void close() {} +}; + +final List peekedContextTime = new ArrayList<>(); +final org.apache.kafka.streams.processor.ProcessorSupplier peekProcessor = +() -> new org.apache.kafka.streams.processor.AbstractProcessor() { +@Override +public void process(final Object key, final Object value) { +peekedContextTime.add(context.timestamp()); +} +}; + +internalStreamsBuilder.stream(Collections.singleton(topic1), consumed) +.transform(punctuateProcessor) +.process(peekProcessor); +internalStreamsBuilder.buildAndOptimizeTopology(); + +final StreamThread thread = createStreamThread(CLIENT_ID, config, false); + +thread.setState(StreamThread.State.STARTING); +thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); +final List assignedPartitions = new ArrayList<>(); + +final Map> activeTasks = new HashMap<>(); + +// assign single partition +assignedPartitions.add(t1p1); +activeTasks.put(task1, Collections.singleton(t1p1)); + +thread.taskManager().handleAssignment(activeTasks, emptyMap()); + +clientSupplier.consumer.assign(assignedPartitions); + clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); +thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); + +thread.runOnce(); +assertEquals(0, peekedContextTime.size()); + +mockTime.sleep(100L); +thread.runOnce(); + +assertEquals(1, peekedContextTime.size()); +assertNotNull(peekedContextTime.get(0)); Review comment: We should verify the actual timestamp. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #10170: KAFKA-12323: Set timestamp in record context when punctuate
mjsax commented on a change in pull request #10170: URL: https://github.com/apache/kafka/pull/10170#discussion_r580546373 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ## @@ -1874,6 +1873,66 @@ public void close() {} assertEquals(2, punctuatedWallClockTime.size()); } +@Test +public void shouldPunctuateWithTimestampPreservedInProcessorContext() { Review comment: I think we should add one more test for `STREAM_TIME` punctuation? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #10170: KAFKA-12323: Set timestamp in record context when punctuate
mjsax commented on a change in pull request #10170: URL: https://github.com/apache/kafka/pull/10170#discussion_r580544814 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -792,7 +790,16 @@ public void punctuate(final ProcessorNode node, throw new IllegalStateException(String.format("%sCurrent node is not null", logPrefix)); } -updateProcessorContext(node, time.milliseconds(), null); +// when punctuating, we need to preserve the timestamp (this can be either system time or event time) +// while other record context are set as dummy: null topic, -1 partition, -1 offset and empty header +final ProcessorRecordContext recordContext = new ProcessorRecordContext( +timestamp, +-1L, +-1, +null, +new RecordHeaders() Review comment: Why not pass `null` ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12323) Record timestamps not populated in event
[ https://issues.apache.org/jira/browse/KAFKA-12323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-12323: --- Assignee: Guozhang Wang > Record timestamps not populated in event > > > Key: KAFKA-12323 > URL: https://issues.apache.org/jira/browse/KAFKA-12323 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0 >Reporter: Adam Bellemare >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.8.0, 2.7.1 > > Attachments: PunctuateTimestampZeroTest.java > > > Upgraded a kafka streams application from 2.6.0 to 2.7.0. Noticed that the > events being produced had a "CreatedAt" timestamp = 0, causing downstream > failures as we depend on those timestamps. Reverting back to 2.6.0/2.6.1 > fixed this issue. This was the only change to the Kafka Streams application. > Consuming the event stream produced by 2.6.0 results in events that, when > consumed using the `kafka-avro-console-consumer` and `--property > print.timestamp=true` result in events prepended with the event times, such > as: > {code:java} > CreateTime:1613072202271 > CreateTime:1613072203412 > CreateTime:1613072205431 > {code} > etc. > However, when those events are produced by the Kafka Streams app using 2.7.0, > we get: > {code:java} > CreateTime:0 > CreateTime:0 > CreateTime:0 > {code} > I don't know if these is a default value somewhere that changed, but this is > actually a blocker for our use-cases as we now need to circumnavigate this > limitation (or roll back to 2.6.1, though there are other issues we must deal > with then). I am not sure which unit tests in the code base to look at to > validate this, but I wanted to log this bug now in case someone else has > already seen this or an open one exists (I didn't see one though). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12360) Improve documentation of max.task.idle.ms (kafka-streams)
[ https://issues.apache.org/jira/browse/KAFKA-12360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-12360: Labels: beginner newbie trivial (was: kafka-streams) > Improve documentation of max.task.idle.ms (kafka-streams) > - > > Key: KAFKA-12360 > URL: https://issues.apache.org/jira/browse/KAFKA-12360 > Project: Kafka > Issue Type: Improvement > Components: docs, streams >Reporter: Domenico Delle Side >Priority: Minor > Labels: beginner, newbie, trivial > > _max.task.idle.ms_ is a handy way to pause processing in a *_kafka-streams_* > application. This is very useful when you need to join two topics that are > out of sync, i.e. when data in a topic may be produced _before_ you receive > join information in the other topic. > In the documentation, however, it is not specified that the value of > _max.task.idle.ms_ *must* be lower than _max.poll.interval.ms_, otherwise > you'll run into an endless rebalancing problem. > I think it is better to clearly state this in the documentation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
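A small configuration sketch consistent with the constraint described in the ticket (values are illustrative; the only point is that max.task.idle.ms stays well below the consumer's max.poll.interval.ms):

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Illustrative Streams configuration: task idling pauses processing for at most 5s,
// far below the consumer's 5-minute poll interval, so idling alone cannot trigger a rebalance.
public class TaskIdleConfigSketch {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "idle-config-example");   // assumed application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // assumed broker address
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 5_000L);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 300_000);
        return props;
    }
}
```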
[GitHub] [kafka] rondagostino commented on pull request #10105: MINOR: System tests for Raft-based metadata quorums
rondagostino commented on pull request #10105: URL: https://github.com/apache/kafka/pull/10105#issuecomment-783624846 > I wonder if we should disallow tests without `[the @cluster]` annotation in the future. I think perhaps yes. Is there a need to allow it? A simple oversight generates a significant parallelism hit at this point -- and they compound quickly as evidenced here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #10105: MINOR: System tests for Raft-based metadata quorums
rondagostino commented on a change in pull request #10105: URL: https://github.com/apache/kafka/pull/10105#discussion_r580532054 ## File path: tests/kafkatest/tests/core/round_trip_fault_test.py ## @@ -47,32 +55,48 @@ def __init__(self, test_context): active_topics=active_topics) def setUp(self): -self.zk.start() +if self.zk: +self.zk.start() self.kafka.start() self.trogdor.start() def teardown(self): self.trogdor.stop() self.kafka.stop() -self.zk.stop() +if self.zk: +self.zk.stop() -def test_round_trip_workload(self): +def remote_quorum_nodes(self): +if quorum.for_test(self.test_context) == quorum.zk: +return self.zk.nodes +elif quorum.for_test(self.test_context) == quorum.remote_raft: +return self.kafka.controller_quorum.nodes +else: # co-located case, which we currently don't test but handle here for completeness in case we do test it +return [] Review comment: > throw an exception or does the current code actually work for this case? The code always needs `` + ``, where the latter are the ZooKeeper or remote Controller Quorum nodes. If we were to run this test with co-located Raft Quorum controllers then those nodes would be accounted for because they are part of the Kafka nodes, so there is no need to explicitly add them. So this code is correct in that it return an empty list for that case. As was indicated in the comment, it's here just in case we ever decide we want to test it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #10105: MINOR: System tests for Raft-based metadata quorums
rondagostino commented on a change in pull request #10105: URL: https://github.com/apache/kafka/pull/10105#discussion_r580527682 ## File path: tests/kafkatest/services/performance/end_to_end_latency.py ## @@ -74,12 +74,15 @@ def __init__(self, context, num_nodes, kafka, topic, num_records, compression_ty def start_cmd(self, node): args = self.args.copy() args.update({ -'zk_connect': self.kafka.zk_connect_setting(), 'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol), 'config_file': EndToEndLatencyService.CONFIG_FILE, 'kafka_run_class': self.path.script("kafka-run-class.sh", node), 'java_class_name': self.java_class_name() }) +if node.version < V_0_9_0_0: Review comment: > would be good to have a function in version.py to reflect [functionality unsupported below 0.9.0] I'll add this function: ``` def supports_bootstrap_server(self): return self >= V_0_9_0_0 ``` I then made the changes all over since this 0.9.0 constant is checked in several places: ``` tests/kafkatest/services/console_consumer.py tests/kafkatest/services/performance/consumer_performance.py tests/kafkatest/services/performance/end_to_end_latency.py tests/kafkatest/services/performance/producer_performance.py tests/kafkatest/tests/core/upgrade_test.py tests/kafkatest/tests/core/upgrade_test.py ``` We can revert if you think I went too far. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12361) Change default connect producer request timeout
[ https://issues.apache.org/jira/browse/KAFKA-12361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12361: Description: Prior to KIP-91, which introduced delivery.timeout.ms, there was no easy way to ensure that records sent through the producer would even have the opportunity to get delivered. Records could be timed out in the accumulator if `request.timeout.ms` was reached before getting sent. Users worked around this problem by setting `request.timeout.ms=Int.MaxValue`. The downside is that this made the producer slower to discover "unclean" connection failures. Now that we have KIP-91, there shouldn't be any reason to keep this workaround. One place it would be good to fix this is in connect's source tasks: {code} // These settings will execute infinite retries on retriable exceptions. They *may* be overridden via configs passed to the worker, // but this may compromise the delivery guarantees of Kafka Connect. producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); {code} The comment about delivery guarantees is a little vague, but I think mainly it is what was discussed above about ensuring at least once delivery. Note that none of the default configs including both request timeout and delivery timeout can avoid duplicates in all cases. For that idempotence is needed. It is worth considering separately for connect whether that should be the default. was: Prior to KIP-91, which introduced delivery.timeout.ms, there was no easy way to ensure that records sent through the producer would even have the opportunity to get delivered. Records could be timed out in the accumulator if `request.timeout.ms` was reached before getting sent. Users worked around this problem by setting `request.timeout.ms=Int.MaxValue`. The downside is that this made the producer slower to discover "unclean" connection failures. Now that we have KIP-91, there shouldn't be any reason to keep this workaround. One place it would be good to fix this is in connect's source tasks: {code} // These settings will execute infinite retries on retriable exceptions. They *may* be overridden via configs passed to the worker, // but this may compromise the delivery guarantees of Kafka Connect. producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); {code} The comment about delivery guarantees is a little vague, but I think mainly it is what was discussed above about ensuring at least once delivery. Note that none of the default configs including request timeout or delivery timeout can avoid duplicates in all cases. For that idempotence is needed. It is worth considering separately for connect whether that should be the default. > Change default connect producer request timeout > --- > > Key: KAFKA-12361 > URL: https://issues.apache.org/jira/browse/KAFKA-12361 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > Labels: connect > > Prior to KIP-91, which introduced delivery.timeout.ms, there was no easy way > to ensure that records sent through the producer would even have the > opportunity to get delivered. Records could be timed out in the accumulator > if `request.timeout.ms` was reached before getting sent. Users worked around > this problem by setting `request.timeout.ms=Int.MaxValue`. The downside is > that this made the producer slower to discover "unclean" connection failures. 
> Now that we have KIP-91, there shouldn't be any reason to keep this > workaround. > One place it would be good to fix this is in connect's source tasks: > {code} > // These settings will execute infinite retries on retriable > exceptions. They *may* be overridden via configs passed to the worker, > // but this may compromise the delivery guarantees of Kafka Connect. > producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, > Integer.toString(Integer.MAX_VALUE)); > {code} > The comment about delivery guarantees is a little vague, but I think mainly > it is what was discussed above about ensuring at least once delivery. Note > that none of the default configs including both request timeout and delivery > timeout can avoid duplicates in all cases. For that idempotence is needed. It > is worth considering separately for connect whether that should be the > default. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12361) Change default connect producer request timeout
Jason Gustafson created KAFKA-12361: --- Summary: Change default connect producer request timeout Key: KAFKA-12361 URL: https://issues.apache.org/jira/browse/KAFKA-12361 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Prior to KIP-91, which introduced delivery.timeout.ms, there was no easy way to ensure that records sent through the producer would even have the opportunity to get delivered. Records could be timed out in the accumulator if `request.timeout.ms` was reached before getting sent. Users worked around this problem by setting `request.timeout.ms=Int.MaxValue`. The downside is that this made the producer slower to discover "unclean" connection failures. Now that we have KIP-91, there shouldn't be any reason to keep this workaround. One place it would be good to fix this is in connect's source tasks: {code} // These settings will execute infinite retries on retriable exceptions. They *may* be overridden via configs passed to the worker, // but this may compromise the delivery guarantees of Kafka Connect. producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); {code} The comment about delivery guarantees is a little vague, but I think mainly it is what was discussed above about ensuring at least once delivery. Note that none of the default configs including request timeout or delivery timeout can avoid duplicates in all cases. For that idempotence is needed. It is worth considering separately for connect whether that should be the default. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cmccabe commented on pull request #10105: MINOR: System tests for Raft-based metadata quorums
cmccabe commented on pull request #10105: URL: https://github.com/apache/kafka/pull/10105#issuecomment-783579134 Good work spotting the missing `@cluster` annotations! That is quite a performance win. I wonder if we should disallow tests without this annotation in the future. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10105: MINOR: System tests for Raft-based metadata quorums
cmccabe commented on a change in pull request #10105: URL: https://github.com/apache/kafka/pull/10105#discussion_r580489004 ## File path: tests/kafkatest/tests/core/round_trip_fault_test.py ## @@ -47,32 +55,48 @@ def __init__(self, test_context): active_topics=active_topics) def setUp(self): -self.zk.start() +if self.zk: +self.zk.start() self.kafka.start() self.trogdor.start() def teardown(self): self.trogdor.stop() self.kafka.stop() -self.zk.stop() +if self.zk: +self.zk.stop() -def test_round_trip_workload(self): +def remote_quorum_nodes(self): +if quorum.for_test(self.test_context) == quorum.zk: +return self.zk.nodes +elif quorum.for_test(self.test_context) == quorum.remote_raft: +return self.kafka.controller_quorum.nodes +else: # co-located case, which we currently don't test but handle here for completeness in case we do test it +return [] Review comment: should this throw an exception? Or does the current code actually work for this case? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10105: MINOR: System tests for Raft-based metadata quorums
cmccabe commented on a change in pull request #10105: URL: https://github.com/apache/kafka/pull/10105#discussion_r580486218 ## File path: tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py ## @@ -46,13 +47,16 @@ def __init__(self, test_context): self.num_consumers = 1 def setUp(self): -self.zk.start() +if self.zk: +self.zk.start() def min_cluster_size(self): # Override this since we're adding services outside of the constructor return super(ClientCompatibilityProduceConsumeTest, self).min_cluster_size() + self.num_producers + self.num_consumers -@parametrize(broker_version=str(DEV_BRANCH)) +@cluster(num_nodes=9) +@matrix(broker_version=[str(DEV_BRANCH)], metadata_quorum=quorum.all_non_upgrade) +@cluster(num_nodes=9) Review comment: `@cluster` is repeated again This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10105: MINOR: System tests for Raft-based metadata quorums
cmccabe commented on a change in pull request #10105: URL: https://github.com/apache/kafka/pull/10105#discussion_r580485952 ## File path: tests/kafkatest/tests/client/client_compatibility_features_test.py ## @@ -120,10 +123,9 @@ def invoke_compatibility_program(self, features): @parametrize(broker_version=str(LATEST_2_3)) @parametrize(broker_version=str(LATEST_2_4)) @parametrize(broker_version=str(LATEST_2_5)) -@parametrize(broker_version=str(LATEST_2_6)) -@parametrize(broker_version=str(LATEST_2_7)) Review comment: hmm. we don't want to drop 2.6 and 2.7 here, do we? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org