[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-22 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r580825461



##
File path: config/log4j.properties
##
@@ -61,11 +61,11 @@ 
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 log4j.logger.org.apache.zookeeper=INFO
 
 # Change the two lines below to adjust the general broker logging level 
(output to server.log and stdout)
-log4j.logger.kafka=INFO
-log4j.logger.org.apache.kafka=INFO
+log4j.logger.kafka=TRACE
+log4j.logger.org.apache.kafka=TRACE
 
 # Change to DEBUG or TRACE to enable request logging
-log4j.logger.kafka.request.logger=WARN, requestAppender
+log4j.logger.kafka.request.logger=TRACE, requestAppender

Review comment:
   Yes, these are accidental. Will revert.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy opened a new pull request #10188: MINOR: Update HttpClient to "4.5.13"

2021-02-22 Thread GitBox


omkreddy opened a new pull request #10188:
URL: https://github.com/apache/kafka/pull/10188


   Update HttpClient to recent bug fix version 4.5.13.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] showuon commented on pull request #10172: MINOR: add toString to Subscription classes

2021-02-22 Thread GitBox


showuon commented on pull request #10172:
URL: https://github.com/apache/kafka/pull/10172#issuecomment-783954552


   @chia7712 , thanks for the comments. Please take a look again. Thanks.







[GitHub] [kafka] showuon commented on a change in pull request #10172: MINOR: add toString to Subscription classes

2021-02-22 Thread GitBox


showuon commented on a change in pull request #10172:
URL: https://github.com/apache/kafka/pull/10172#discussion_r580802728



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
##
@@ -132,6 +132,16 @@ public void setGroupInstanceId(Optional<String> groupInstanceId) {
 public Optional<String> groupInstanceId() {
 return groupInstanceId;
 }
+
+@Override
+public String toString() {
+return "Subscription(" +
+"topics=" + topics +
+", userData=" + userData +

Review comment:
   Oh, I didn't notice it! I now log the remaining size of `userData`, as 
`Assignment.toString()` does. Thank you.









[GitHub] [kafka] showuon commented on a change in pull request #10172: MINOR: add toString to Subscription classes

2021-02-22 Thread GitBox


showuon commented on a change in pull request #10172:
URL: https://github.com/apache/kafka/pull/10172#discussion_r580802108



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
##
@@ -132,6 +132,16 @@ public void setGroupInstanceId(Optional<String> groupInstanceId) {
 public Optional<String> groupInstanceId() {
 return groupInstanceId;
 }
+
+@Override
+public String toString() {
+return "Subscription(" +
+"topics=" + topics +
+", userData=" + userData +
+", ownedPartitions=" + ownedPartitions +
+(groupInstanceId.isPresent() ? ", groupInstanceId=" + 
groupInstanceId.get() : "") +

Review comment:
   Updated. Thank you!









[GitHub] [kafka] guozhangwang commented on a change in pull request #10170: KAFKA-12323: Set timestamp in record context when punctuate

2021-02-22 Thread GitBox


guozhangwang commented on a change in pull request #10170:
URL: https://github.com/apache/kafka/pull/10170#discussion_r580793422



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -792,7 +790,16 @@ public void punctuate(final ProcessorNode node,
 throw new IllegalStateException(String.format("%sCurrent node is 
not null", logPrefix));
 }
 
-updateProcessorContext(node, time.milliseconds(), null);
+// when punctuating, we need to preserve the timestamp (this can be 
either system time or event time)
+// while other record context are set as dummy: null topic, -1 
partition, -1 offset and empty header
+final ProcessorRecordContext recordContext = new 
ProcessorRecordContext(
+timestamp,
+-1L,
+-1,
+null,
+new RecordHeaders()

Review comment:
   I'm following the existing behavior here: if the record context is 
`null`, we also return `new RecordHeaders()`.









[GitHub] [kafka] showuon commented on pull request #10167: KAFKA-12261: Mention about potential delivery loss on partition split when auto.offset.reset = latest

2021-02-22 Thread GitBox


showuon commented on pull request #10167:
URL: https://github.com/apache/kafka/pull/10167#issuecomment-783951911


   > What do you mean by "splitting partitions"?
   
   @ijuma , @ocadaruma means that the number of partitions is 
increased or decreased. You are right, the phrase **splitting partitions** is not 
clear. It might be better to change it to:
   `Note that altering partition numbers while setting this config to latest 
may cause...`
   
   What do you think?







[GitHub] [kafka] ijuma commented on pull request #10167: KAFKA-12261: Mention about potential delivery loss on partition split when auto.offset.reset = latest

2021-02-22 Thread GitBox


ijuma commented on pull request #10167:
URL: https://github.com/apache/kafka/pull/10167#issuecomment-783944686


   What do you mean by "splitting partitions"?







[GitHub] [kafka] ijuma commented on pull request #10169: MINOR: Update Scala to 2.13.5

2021-02-22 Thread GitBox


ijuma commented on pull request #10169:
URL: https://github.com/apache/kafka/pull/10169#issuecomment-783943805


   Merged to trunk and 2.8 branches.







[GitHub] [kafka] modax commented on pull request #9904: KAFKA-12211: don't change perm for base/state dir when no persistent store

2021-02-22 Thread GitBox


modax commented on pull request #9904:
URL: https://github.com/apache/kafka/pull/9904#issuecomment-783943018


   Hi,
   
   Is there any reason this is not in 2.6? 2.6.1 is affected by this problem.
   
   Thanks!







[GitHub] [kafka] showuon commented on pull request #10167: KAFKA-12261: Mention about potential delivery loss on partition split when auto.offset.reset = latest

2021-02-22 Thread GitBox


showuon commented on pull request #10167:
URL: https://github.com/apache/kafka/pull/10167#issuecomment-783941027


   @ijuma , could you take a look at this PR? Thank you.







[GitHub] [kafka] chia7712 commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

2021-02-22 Thread GitBox


chia7712 commented on a change in pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#discussion_r580788136



##
File path: 
clients/src/main/resources/common/message/DescribeProducersResponse.json
##
@@ -35,7 +35,7 @@
 { "name": "ErrorMessage", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
   "about": "The partition error message, which may be null if no 
additional details are available" },
 { "name": "ActiveProducers", "type": "[]ProducerState", "versions": 
"0+", "fields": [
-  { "name": "ProducerId", "type": "int64", "versions": "0+" },
+  { "name": "ProducerId", "type": "int64", "versions": "0+", 
"entityType": "producerId" },
   { "name": "ProducerEpoch", "type": "int32", "versions": "0+" },

Review comment:
   The type confuses me. `DescribeProducersResponse` and this protocol use 
`int64` but other protocols choose `int32`. Also, the type of 
`TransactionMetadata#producerEpoch` is `short` rather than `integer`. Which one 
is correct?









[GitHub] [kafka] ijuma merged pull request #10169: MINOR: Update Scala to 2.13.5

2021-02-22 Thread GitBox


ijuma merged pull request #10169:
URL: https://github.com/apache/kafka/pull/10169


   







[GitHub] [kafka] guozhangwang commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

2021-02-22 Thread GitBox


guozhangwang commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r580786789



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
##
@@ -243,6 +244,11 @@
  */
 Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
 
+/**
+ * @see KafkaConsumer#currentLag(TopicPartition)
+ */
+OptionalLong currentLag(TopicPartition topicPartition);

Review comment:
   If we are concerned that users may call this function too frequently while 
looping over a large number of partitions, with each call synchronizing on the 
subscription state, then maybe we can offer a batching mode.
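To make the batching idea concrete, here is a minimal sketch. It is not the Kafka implementation: `LagTrackerSketch`, `updateLag`, and `currentLags` are hypothetical names, and partitions are plain strings instead of `TopicPartition`. It only shows how a batched lookup takes the lock once for many partitions, where the single-partition call takes it once per partition.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical stand-in for the consumer's subscription state (assumption, not Kafka code).
final class LagTrackerSketch {
    private final Map<String, Long> lags = new HashMap<>();

    // Called by the (hypothetical) fetch path whenever a new lag value is known.
    synchronized void updateLag(String partition, long lag) {
        lags.put(partition, lag);
    }

    // Single lookup: one lock acquisition per partition.
    synchronized OptionalLong currentLag(String partition) {
        Long lag = lags.get(partition);
        return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
    }

    // Batched lookup: one lock acquisition for the whole collection.
    synchronized Map<String, OptionalLong> currentLags(Collection<String> partitions) {
        Map<String, OptionalLong> result = new HashMap<>();
        for (String partition : partitions) {
            // The monitor is re-entrant, so this does not block on itself.
            result.put(partition, currentLag(partition));
        }
        return result;
    }
}
```

Whether the extra API surface is worth it depends on how contended the lock really is; for a purely local lookup the single-partition form may well be enough.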









[GitHub] [kafka] ijuma commented on pull request #10169: MINOR: Update Scala to 2.13.5

2021-02-22 Thread GitBox


ijuma commented on pull request #10169:
URL: https://github.com/apache/kafka/pull/10169#issuecomment-783932706


   Unrelated flaky failures:
   
   > Build / JDK 15 / 
kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()
   > Build / JDK 11 / 
org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining
   > 







[GitHub] [kafka] chia7712 commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

2021-02-22 Thread GitBox


chia7712 commented on a change in pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#discussion_r580779375



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/DescribeTransactionsRequest.java
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class DescribeTransactionsRequest extends AbstractRequest {
+public static class Builder extends AbstractRequest.Builder<DescribeTransactionsRequest> {
+public final DescribeTransactionsRequestData data;
+
+public Builder(DescribeTransactionsRequestData data) {
+super(ApiKeys.DESCRIBE_TRANSACTIONS);
+this.data = data;
+}
+
+@Override
+public DescribeTransactionsRequest build(short version) {
+return new DescribeTransactionsRequest(data, version);
+}
+
+@Override
+public String toString() {
+return data.toString();
+}
+}
+
+private final DescribeTransactionsRequestData data;
+
+private DescribeTransactionsRequest(DescribeTransactionsRequestData data, 
short version) {
+super(ApiKeys.DESCRIBE_TRANSACTIONS, version);
+this.data = data;
+}
+
+@Override
+public DescribeTransactionsRequestData data() {
+return data;
+}
+
+@Override
+public DescribeTransactionsResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
+Errors error = Errors.forException(e);
+DescribeTransactionsResponseData response = new 
DescribeTransactionsResponseData();

Review comment:
   The `throttleTimeMs` is not added to the response.

##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##
@@ -255,6 +258,45 @@ class TransactionCoordinator(brokerId: Int,
 }
   }
 
+  def handleDescribeTransactions(
+transactionalId: String
+  ): DescribeTransactionsResponseData.TransactionState = {
+val transactionState = new 
DescribeTransactionsResponseData.TransactionState()
+  .setTransactionalId(transactionalId)
+
+if (!isActive.get()) {
+  transactionState.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code)
+} else if (transactionalId == null || transactionalId.isEmpty) {
+  transactionState.setErrorCode(Errors.INVALID_REQUEST.code)
+} else {
+  txnManager.getTransactionState(transactionalId) match {
+case Left(error) =>
+  transactionState.setErrorCode(error.code)
+case Right(None) =>
+  transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code)
+case Right(Some(coordinatorEpochAndMetadata)) =>
+  val txnMetadata = coordinatorEpochAndMetadata.transactionMetadata
+  txnMetadata.inLock {
+val partitionsByTopic = 
CollectionUtils.groupPartitionsByTopic(txnMetadata.topicPartitions.asJava)
+partitionsByTopic.forEach { (topic, partitions) =>
+  val topicData = new DescribeTransactionsResponseData.TopicData()
+.setTopic(topic)
+.setPartitions(partitions)
+  transactionState.topics.add(topicData)
+}
+
+transactionState
+  .setErrorCode(Errors.NONE.code)
+  .setProducerId(txnMetadata.producerId)
+  .setProducerEpoch(txnMetadata.producerEpoch)
+  .setTransactionState(txnMetadata.state.toString)

Review comment:
   As it is part of the serialized data, should we add constant strings to 
those enums instead of calling `toString`?
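One way to read this suggestion is sketched below in Java. The enum, its constants, and the `wireName` strings are illustrative assumptions, not Kafka's actual transaction states. The point is that the serialized value is an explicit constant, so renaming a constant or changing `toString` cannot silently change the wire format.

```java
import java.util.Optional;

// Illustrative enum; constant names and wire names are assumptions for this sketch.
enum TxnStateSketch {
    ONGOING("Ongoing"),
    PREPARE_COMMIT("PrepareCommit"),
    COMPLETE_COMMIT("CompleteCommit");

    // Stable string used in serialized data, decoupled from toString().
    private final String wireName;

    TxnStateSketch(String wireName) {
        this.wireName = wireName;
    }

    String wireName() {
        return wireName;
    }

    // Reverse lookup for the reading side; empty if the name is unknown.
    static Optional<TxnStateSketch> fromWireName(String name) {
        for (TxnStateSketch state : values()) {
            if (state.wireName.equals(name)) {
                return Optional.of(state);
            }
        }
        return Optional.empty();
    }
}
```

A response builder would then call something like `state.wireName()` where the diff above calls `toString`.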

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3271,6 +3272,34 @@ class KafkaApis(val requestChannel: RequestChannel,
   "Apache ZooKeeper mode.")
   }
 
+  def handleDescribeTransactionsRequest(request: RequestChannel.Request): Unit 
= {
+val 

[GitHub] [kafka] guozhangwang commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

2021-02-22 Thread GitBox


guozhangwang commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r580782275



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
##
@@ -243,6 +244,11 @@
  */
 Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
 
+/**
+ * @see KafkaConsumer#currentLag(TopicPartition)
+ */
+OptionalLong currentLag(TopicPartition topicPartition);

Review comment:
   For API calls that may incur a broker round trip, batching partitions 
makes sense. For this API I think a single-partition lookup is good 
enough.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -156,24 +134,24 @@ public boolean readyToProcess(final long wallClockTime) {
 final TopicPartition partition = entry.getKey();
 final RecordQueue queue = entry.getValue();
 
-final Long nullableFetchedLag = fetchedLags.get(partition);
+final OptionalLong fetchedLag = lagProvider.apply(partition);

Review comment:
   Wearing my paranoid hat here: `readyToProcess` is on the critical path, 
called per record, while we would only update the underlying lag at most as 
frequently as the consumer poll rate. And in practice we would fall into the 
first condition `!queue.isEmpty()` most of the time. On the other hand, the 
`partitionLag` call on `SubscriptionState` is synchronized and could slow down 
the fetching thread (well, maybe just a bit). So could we call the provider 
only when necessary, i.e. when the queue is empty and the lag is either == 0 or 
not present?
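The request can be sketched as follows. This is a simplification under stated assumptions, not the `PartitionGroup` code: `LazyLagCheck` and `ready` are hypothetical names, partitions are plain strings, and the return value (ready only when the lag is known to be zero) is chosen for illustration. What matters is that the possibly synchronized provider is consulted only after the cheap queue check fails.

```java
import java.util.OptionalLong;
import java.util.function.Function;

// Hypothetical simplification of a per-partition readiness check.
final class LazyLagCheck {
    // Returns true if the partition has buffered records, or if its queue is
    // empty and the lag is known to be zero (illustrative semantics only).
    static boolean ready(boolean queueEmpty,
                         String partition,
                         Function<String, OptionalLong> lagProvider) {
        if (!queueEmpty) {
            // Common case on the hot path: no lag lookup, no lock taken.
            return true;
        }
        // Only now pay for the provider call.
        OptionalLong lag = lagProvider.apply(partition);
        return lag.isPresent() && lag.getAsLong() == 0L;
    }
}
```

With this ordering, a provider that wraps a synchronized lookup is never touched while records are still buffered.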









[GitHub] [kafka] ijuma commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-22 Thread GitBox


ijuma commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r580781460



##
File path: config/log4j.properties
##
@@ -61,11 +61,11 @@ 
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 log4j.logger.org.apache.zookeeper=INFO
 
 # Change the two lines below to adjust the general broker logging level 
(output to server.log and stdout)
-log4j.logger.kafka=INFO
-log4j.logger.org.apache.kafka=INFO
+log4j.logger.kafka=TRACE
+log4j.logger.org.apache.kafka=TRACE
 
 # Change to DEBUG or TRACE to enable request logging
-log4j.logger.kafka.request.logger=WARN, requestAppender
+log4j.logger.kafka.request.logger=TRACE, requestAppender

Review comment:
   Are these accidental?









[GitHub] [kafka] ijuma commented on pull request #10179: MINOR: tune KIP-631 configurations

2021-02-22 Thread GitBox


ijuma commented on pull request #10179:
URL: https://github.com/apache/kafka/pull/10179#issuecomment-783923893


   @hachikuji can you please review these timeouts?







[GitHub] [kafka] cmccabe commented on pull request #10179: MINOR: tune KIP-631 configurations

2021-02-22 Thread GitBox


cmccabe commented on pull request #10179:
URL: https://github.com/apache/kafka/pull/10179#issuecomment-783922535


   I added some JavaDoc to the RaftConfig class







[GitHub] [kafka] ijuma commented on pull request #10174: KAFKA-12357: Do not inline methods from the scala package by default

2021-02-22 Thread GitBox


ijuma commented on pull request #10174:
URL: https://github.com/apache/kafka/pull/10174#issuecomment-783892451


   Merged to trunk and 2.8 branches.







[GitHub] [kafka] ijuma merged pull request #10174: KAFKA-12357: Do not inline methods from the scala package by default

2021-02-22 Thread GitBox


ijuma merged pull request #10174:
URL: https://github.com/apache/kafka/pull/10174


   







[GitHub] [kafka] chia7712 commented on pull request #10174: KAFKA-12357: Do not inline methods from the scala package by default

2021-02-22 Thread GitBox


chia7712 commented on pull request #10174:
URL: https://github.com/apache/kafka/pull/10174#issuecomment-783879649


   > It seems better to aim for ease of use until there's a better way of 
handling this
   
   Makes sense. Be a friendly library :)







[GitHub] [kafka] ijuma edited a comment on pull request #10174: KAFKA-12357: Do not inline methods from the scala package by default

2021-02-22 Thread GitBox


ijuma edited a comment on pull request #10174:
URL: https://github.com/apache/kafka/pull/10174#issuecomment-783878266


   @chia7712 For better or worse, libraries like 
https://github.com/embeddedkafka/embedded-kafka are often used. And there isn't 
a great alternative. Getting an exception when there's a mismatch in the patch 
version of scala-library is pretty surprising. It seems better to aim for ease 
of use until there's a better way of handling this. Do you agree?







[GitHub] [kafka] ijuma commented on pull request #10174: KAFKA-12357: Do not inline methods from the scala package by default

2021-02-22 Thread GitBox


ijuma commented on pull request #10174:
URL: https://github.com/apache/kafka/pull/10174#issuecomment-783878266


   @chia7712 For better or worse, libraries like 
https://github.com/embeddedkafka/embedded-kafka are often used. And there isn't 
a great alternative. Getting an exception when there's a mismatch in the patch 
version of scala-library is pretty surprising. It seems better to aim for ease 
of use until there's a better way of handling this. Would you agree?







[GitHub] [kafka] chia7712 commented on a change in pull request #10172: MINOR: add toString to Subscription classes

2021-02-22 Thread GitBox


chia7712 commented on a change in pull request #10172:
URL: https://github.com/apache/kafka/pull/10172#discussion_r580767910



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
##
@@ -132,6 +132,16 @@ public void setGroupInstanceId(Optional<String> groupInstanceId) {
 public Optional<String> groupInstanceId() {
 return groupInstanceId;
 }
+
+@Override
+public String toString() {
+return "Subscription(" +
+"topics=" + topics +
+", userData=" + userData +
+", ownedPartitions=" + ownedPartitions +
+(groupInstanceId.isPresent() ? ", groupInstanceId=" + 
groupInstanceId.get() : "") +

Review comment:
   How about using lambda? `(groupInstanceId.map(s -> ", groupInstanceId=" 
+ s).orElse(""))`

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
##
@@ -132,6 +132,16 @@ public void setGroupInstanceId(Optional<String> groupInstanceId) {
 public Optional<String> groupInstanceId() {
 return groupInstanceId;
 }
+
+@Override
+public String toString() {
+return "Subscription(" +
+"topics=" + topics +
+", userData=" + userData +

Review comment:
   The type of `userData` is `ByteBuffer` so it makes no sense to print it 
by `toString`. Maybe we can print the size?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
##
@@ -132,6 +132,16 @@ public void setGroupInstanceId(Optional<String> groupInstanceId) {
 public Optional<String> groupInstanceId() {
 return groupInstanceId;
 }
+
+@Override
+public String toString() {
+return "Subscription(" +
+"topics=" + topics +
+", userData=" + userData +
+", ownedPartitions=" + ownedPartitions +
+(groupInstanceId.isPresent() ? ", groupInstanceId=" + 
groupInstanceId.get() : "") +

Review comment:
   For another, it seems to me "null" is more suitable than empty string.
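Taken together, the review comments in this message suggest a `toString` along the following lines. This is a sketch only: `SubscriptionSketch` is a hypothetical stand-in for the real `Subscription` class, the `userDataSize` label is an assumption, and it uses the empty-string variant for an absent `groupInstanceId` (printing "null" instead, as the last comment proposes, would be `orElse(", groupInstanceId=null")`).

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for ConsumerPartitionAssignor.Subscription (not the real class).
final class SubscriptionSketch {
    private final List<String> topics;
    private final ByteBuffer userData;               // may be null
    private final Optional<String> groupInstanceId;

    SubscriptionSketch(List<String> topics, ByteBuffer userData, Optional<String> groupInstanceId) {
        this.topics = topics;
        this.userData = userData;
        this.groupInstanceId = groupInstanceId;
    }

    @Override
    public String toString() {
        return "Subscription(" +
            "topics=" + topics +
            // Print the remaining byte count instead of the ByteBuffer itself.
            ", userDataSize=" + (userData == null ? 0 : userData.remaining()) +
            // Optional.map/orElse replaces the isPresent()/get() ternary.
            groupInstanceId.map(id -> ", groupInstanceId=" + id).orElse("") +
            ")";
    }
}
```

`ByteBuffer.remaining()` reports the bytes between position and limit, so the output stays short regardless of payload size.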









[GitHub] [kafka] chia7712 merged pull request #10166: MINOR: update the memberMetadata output

2021-02-22 Thread GitBox


chia7712 merged pull request #10166:
URL: https://github.com/apache/kafka/pull/10166


   







[GitHub] [kafka] hachikuji merged pull request #9912: MINOR: Move `RequestChannel.Response` creation logic into `RequestChannel`

2021-02-22 Thread GitBox


hachikuji merged pull request #9912:
URL: https://github.com/apache/kafka/pull/9912


   







[GitHub] [kafka] hachikuji commented on pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

2021-02-22 Thread GitBox


hachikuji commented on pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#issuecomment-783868752


   @dengziming Good call. I've added a few test cases. I think we were missing 
authorization of the topics included in the response. 







[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

2021-02-22 Thread GitBox


dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r580761072



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -92,4 +93,13 @@ public static Path createTempFile(Path logDir, 
OffsetAndEpoch snapshotId) throws
 
 return Optional.of(new SnapshotPath(path, new 
OffsetAndEpoch(endOffset, epoch), partial));
 }
+
+/**
+ * Delete this snapshot from the filesystem.
+ */
+public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch 
snapshotId) throws IOException {
+Path path = snapshotPath(logDir, snapshotId);
+return Files.deleteIfExists(path);

Review comment:
   This is a good catch. I changed the procedure to rename the file and 
then delete it asynchronously.
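A rename followed by an asynchronous delete could look roughly like this. It is a sketch under assumptions, not the code in the pull request: `RenameThenDelete`, `markAndDeleteAsync`, and the `.deleted` suffix are hypothetical, and the common fork-join pool stands in for whatever scheduler the real code uses.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper illustrating "rename first, delete later".
final class RenameThenDelete {
    // Assumed marker suffix; readers would skip files carrying it.
    static final String DELETE_SUFFIX = ".deleted";

    // Renames the file so it is no longer visible under its original name,
    // then deletes the renamed file on another thread.
    static CompletableFuture<Boolean> markAndDeleteAsync(Path file) throws IOException {
        if (!Files.exists(file)) {
            return CompletableFuture.completedFuture(false);
        }
        Path renamed = file.resolveSibling(file.getFileName() + DELETE_SUFFIX);
        Files.move(file, renamed, StandardCopyOption.ATOMIC_MOVE);
        return CompletableFuture.supplyAsync(() -> {
            try {
                return Files.deleteIfExists(renamed);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }
}
```

The rename is the step that takes effect immediately; the actual removal can then happen later without anyone opening the file under its original name.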









[GitHub] [kafka] chia7712 commented on pull request #10146: HOTFIX: fix Scala 2.12 build error caused by ControllerApisTest.scala

2021-02-22 Thread GitBox


chia7712 commented on pull request #10146:
URL: https://github.com/apache/kafka/pull/10146#issuecomment-783863300


   The 2.8 build with Scala 2.12 is also broken, so I backported this PR to 2.8.







[GitHub] [kafka] ableegoldman commented on pull request #10187: MINOR: document restriction against running multiple Streams apps on same state.dir

2021-02-22 Thread GitBox


ableegoldman commented on pull request #10187:
URL: https://github.com/apache/kafka/pull/10187#issuecomment-783862480


   Call for review from any of @cadonna @lct45 @wcarlson5 @rodesai @agavra







[GitHub] [kafka] ableegoldman commented on pull request #10187: MINOR: document restriction against running multiple Streams apps on same state.dir

2021-02-22 Thread GitBox


ableegoldman commented on pull request #10187:
URL: https://github.com/apache/kafka/pull/10187#issuecomment-783862209


   This should be cherry-picked back to the 2.8 branch (it's just docs) -- cc/ 
2.8 release manager @vvcephei 







[GitHub] [kafka] ableegoldman opened a new pull request #10187: MINOR: document restriction against running multiple Streams apps on same state.dir

2021-02-22 Thread GitBox


ableegoldman opened a new pull request #10187:
URL: https://github.com/apache/kafka/pull/10187


   Running more than one Streams instance on the same physical state directory 
has never been supported, but until now it was not really enforced either. 
Now that Streams fails fast on this misconfiguration instead of showing vague 
symptoms, it throws an exception on startup when it detects the problem. We 
should document this clearly, as some users may have been using this setup for 
testing. (Note: there's no reason to do this in a real production app; you 
should scale up by adding more threads and/or scale out by adding new 
instances with their own storage.)
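
To make the fail-fast idea concrete, here is a minimal Java sketch of one way a process could claim a state directory exclusively. It is an illustration only and not the Streams implementation: the `StateDirLock` class and the `.lock` file name are hypothetical, and it relies only on the standard `FileChannel.tryLock()` call.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: claims a directory by locking a marker file inside it.
final class StateDirLock implements AutoCloseable {
    private final FileChannel channel;
    private final FileLock lock;

    StateDirLock(Path stateDir) throws IOException {
        Files.createDirectories(stateDir);
        channel = FileChannel.open(stateDir.resolve(".lock"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        FileLock acquired;
        try {
            // Returns null if another process holds the lock.
            acquired = channel.tryLock();
        } catch (OverlappingFileLockException e) {
            // Thrown if this JVM already holds the lock through another channel.
            acquired = null;
        }
        if (acquired == null) {
            // Caveat: on some platforms closing this channel can also drop OS-level
            // locks the JVM holds on the same file through other channels.
            channel.close();
            throw new IllegalStateException("state directory already in use: " + stateDir);
        }
        lock = acquired;
    }

    @Override
    public void close() throws IOException {
        lock.release();
        channel.close();
    }
}
```

With this shape, a second instance pointed at the same directory gets an exception at construction time rather than failing later in a less obvious way.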







[GitHub] [kafka] chia7712 commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

2021-02-22 Thread GitBox


chia7712 commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r580741128



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##
@@ -2219,6 +2221,25 @@ public void resume(Collection<TopicPartition> partitions) {
 }
 }
 
+/**
+ * Get the consumer's current lag on the partition. Returns an "empty" {@link OptionalLong} if the lag is not known,
+ * for example if there is no position yet, or if the end offset is not known yet.
+ *
+ * 
+ * This method uses locally cached metadata and never makes a remote call.
+ *
+ * @param topicPartition The partition to get the lag for.
+ *
+ * @return This {@code Consumer} instance's current lag for the given partition.
+ *
+ * @throws IllegalStateException if the {@code topicPartition} is not assigned
+ **/
+@Override
+public OptionalLong currentLag(TopicPartition topicPartition) {
+final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);

Review comment:
   Other methods call `acquireAndEnsureOpen();` first and then call 
`release()` in the finally block. Should this new method follow the same pattern?
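
For readers unfamiliar with the pattern mentioned above, here is a minimal Java sketch of an acquire-then-release-in-`finally` guard. `GuardedClient` and its fields are hypothetical stand-ins; only the method names `acquireAndEnsureOpen()` and `release()` come from the comment, and their bodies here are assumptions rather than the actual `KafkaConsumer` code.

```java
import java.util.ConcurrentModificationException;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a client meant for single-threaded use; not the real KafkaConsumer.
final class GuardedClient {
    private static final long NO_OWNER = -1L;
    private final AtomicLong ownerThreadId = new AtomicLong(NO_OWNER);
    private final AtomicInteger refCount = new AtomicInteger(0);
    private volatile boolean closed = false;
    private volatile Long cachedLag = null; // null models "lag not known yet"

    private void acquire() {
        long me = Thread.currentThread().getId();
        // Re-entrant for the owning thread; any other thread is rejected.
        if (ownerThreadId.get() != me && !ownerThreadId.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException("client is not safe for multi-threaded access");
        }
        refCount.incrementAndGet();
    }

    private void acquireAndEnsureOpen() {
        acquire();
        if (closed) {
            release();
            throw new IllegalStateException("client has already been closed");
        }
    }

    private void release() {
        if (refCount.decrementAndGet() == 0) {
            ownerThreadId.set(NO_OWNER);
        }
    }

    // The shape the review comment asks about: acquire first, release in finally.
    OptionalLong currentLag() {
        acquireAndEnsureOpen();
        try {
            Long lag = cachedLag;
            return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
        } finally {
            release();
        }
    }

    void updateLag(long lag) {
        acquireAndEnsureOpen();
        try {
            cachedLag = lag;
        } finally {
            release();
        }
    }

    void close() {
        acquire();
        try {
            closed = true;
        } finally {
            release();
        }
    }
}
```

The point of the `finally` block is that the guard is released even when the guarded body throws, so a failed call does not leave the client permanently "owned" by one thread.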









[GitHub] [kafka] mjsax closed pull request #9690: KAFKA-10017: fix flaky EOS-beta upgrade test

2021-02-22 Thread GitBox


mjsax closed pull request #9690:
URL: https://github.com/apache/kafka/pull/9690


   







[GitHub] [kafka] mjsax commented on pull request #9690: KAFKA-10017: fix flaky EOS-beta upgrade test

2021-02-22 Thread GitBox


mjsax commented on pull request #9690:
URL: https://github.com/apache/kafka/pull/9690#issuecomment-783848142


   As we have a `2.8` release branch now, it no longer seems worthwhile to 
backport the fix to the `2.6` branch. Closing this PR.







[GitHub] [kafka] showuon commented on pull request #10166: MINOR: update the memberMetadata output

2021-02-22 Thread GitBox


showuon commented on pull request #10166:
URL: https://github.com/apache/kafka/pull/10166#issuecomment-783847072


   @chia7712, could you have a look at this one-line change? Thanks.







[jira] [Resolved] (KAFKA-12273) InterBrokerSendThread#pollOnce throws FatalExitError even though it is shutdown correctly

2021-02-22 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-12273.

Resolution: Fixed

> InterBrokerSendThread#pollOnce throws FatalExitError even though it is 
> shutdown correctly
> -
>
> Key: KAFKA-12273
> URL: https://issues.apache.org/jira/browse/KAFKA-12273
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 3.0.0, 2.8.0
>
>
> Kafka tests sometimes shut down Gradle with a non-zero exit code. One of the 
> root causes is that InterBrokerSendThread#pollOnce encounters a 
> DisconnectException while NetworkClient is closing. The DisconnectException 
> should be treated as an "expected" error, since we are the ones closing the 
> client. In other words, InterBrokerSendThread#pollOnce should swallow it.
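
A minimal Java sketch of the behaviour described in this issue, under stated assumptions: the `SendLoop` class, its nested exception types, and the `Client` interface are hypothetical stand-ins named after the concepts in the description, not the real Kafka classes, and the actual fix may be structured differently.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of a send thread that treats a disconnect during shutdown as expected.
final class SendLoop {
    static final class DisconnectException extends RuntimeException { }

    static final class FatalExitError extends Error {
        FatalExitError(Throwable cause) {
            super(cause);
        }
    }

    interface Client {
        void poll();
    }

    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Client client;

    SendLoop(Client client) {
        this.client = client;
    }

    void initiateShutdown() {
        running.set(false);
    }

    void pollOnce() {
        try {
            client.poll();
        } catch (DisconnectException e) {
            if (running.get()) {
                // Unexpected while the loop is still supposed to be running.
                throw new FatalExitError(e);
            }
            // Expected: we initiated the shutdown ourselves, so swallow it.
        } catch (RuntimeException e) {
            throw new FatalExitError(e);
        }
    }
}
```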



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #10024: KAFKA-12273 InterBrokerSendThread#pollOnce throws FatalExitError even…

2021-02-22 Thread GitBox


chia7712 commented on pull request #10024:
URL: https://github.com/apache/kafka/pull/10024#issuecomment-783842651


   Pushed to trunk and 2.8.







[GitHub] [kafka] chia7712 merged pull request #10024: KAFKA-12273 InterBrokerSendThread#pollOnce throws FatalExitError even…

2021-02-22 Thread GitBox


chia7712 merged pull request #10024:
URL: https://github.com/apache/kafka/pull/10024


   







[GitHub] [kafka] mjsax opened a new pull request #10186: MINOR: bump release version to 3.0.0-SNAPSHOT

2021-02-22 Thread GitBox


mjsax opened a new pull request #10186:
URL: https://github.com/apache/kafka/pull/10186


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Resolved] (KAFKA-10665) Flaky Test StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization = all]

2021-02-22 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-10665.
---
Resolution: Fixed

> Flaky Test 
> StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization
>  = all]
> 
>
> Key: KAFKA-10665
> URL: https://issues.apache.org/jira/browse/KAFKA-10665
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Major
>  Labels: flaky-test
>
> {code:java}
> java.nio.file.DirectoryNotEmptyException: 
> /tmp/kafka-13241964730537515637/app-StreamTableJoinTopologyOptimizationIntegrationTestshouldDoStreamTableJoinWithDifferentNumberOfPartitions_Optimization___all_
>   at 
> java.base/sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:246)
>   at 
> java.base/sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:105)
>   at java.base/java.nio.file.Files.delete(Files.java:1146)
>   at 
> org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:869)
>   at 
> org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:839)
>   at java.base/java.nio.file.Files.walkFileTree(Files.java:2822)
>   at java.base/java.nio.file.Files.walkFileTree(Files.java:2876)
>   at org.apache.kafka.common.utils.Utils.delete(Utils.java:839)
>   at org.apache.kafka.common.utils.Utils.delete(Utils.java:825)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:151)
>   at 
> org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest.whenShuttingDown(StreamTableJoinTopologyOptimizationIntegrationTest.java:122)
> {code}
> https://github.com/apache/kafka/pull/9515/checks?check_run_id=1333753280





[GitHub] [kafka] showuon commented on pull request #10185: KAFKA-12284: wait for mm2 auto-created the topic

2021-02-22 Thread GitBox


showuon commented on pull request #10185:
URL: https://github.com/apache/kafka/pull/10185#issuecomment-783839489


   @mimaison, could you help review this PR? Thank you.







[GitHub] [kafka] showuon opened a new pull request #10185: KAFKA-12284: wait for mm2 auto-created the topic

2021-02-22 Thread GitBox


showuon opened a new pull request #10185:
URL: https://github.com/apache/kafka/pull/10185


   The test sometimes failed with `TopicExistsException: Topic 
'primary.test-topic-2' already exists.` because we tried to create a topic 
that MM2 had already created for us. That is, 
   ```java
   primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
   // after the primary cluster creates the "test-topic-2" topic, MM2 auto-creates
   // the topic "primary.test-topic-2" on the backup cluster for us
   backup.kafka().createTopic("primary.test-topic-2", 1); // this line races with MM2
   ```
   
   Fix the issue by explicitly waiting for the MM2 auto-created topic. Also fix 
2 issues in the tests:
   1. We should handle exceptions from topic deletion properly
   2. We had a resource leak because the adminClient was not closed
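
The "explicitly waiting" step can be pictured as a bounded polling loop. The following Java sketch is only an illustration: `TopicWait`, `waitForTopic`, and the `Supplier` that lists topics are hypothetical and stand in for whatever the test utilities in this PR actually use.

```java
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper: polls a topic listing until the expected topic shows up.
final class TopicWait {
    static void waitForTopic(Supplier<Set<String>> listTopics, String topic,
                             long timeoutMs, long pollIntervalMs)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (listTopics.get().contains(topic)) {
                return; // the topic exists, so there is nothing left to race with
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("topic " + topic + " not found within " + timeoutMs + " ms");
            }
            Thread.sleep(pollIntervalMs);
        }
    }
}
```

Waiting for the replicated topic instead of creating it removes the race, because only one party (MM2) ever issues the create request for `primary.test-topic-2`.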
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] mjsax commented on a change in pull request #10180: KAFKA- 12347: expose offsets to streams client (WIP)

2021-02-22 Thread GitBox


mjsax commented on a change in pull request #10180:
URL: https://github.com/apache/kafka/pull/10180#discussion_r580731514



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -91,6 +91,7 @@
 // includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance
 private final Set lockedTaskDirectories = new HashSet<>();
 private java.util.function.Consumer> resetter;
+private Map committedOffsets = new HashMap<>();

Review comment:
   During a rebalance, we should delete all entries for partitions we no 
longer own.
   
   Should we also pre-populate this map when we init a task (cf 
`StreamTask#initializeMetadata()`)?
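
A minimal Java sketch of the pruning suggested in this comment, assuming the map is keyed by partition. `CommittedOffsetsCache` is hypothetical, and plain strings stand in for `TopicPartition` so the example stays self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical cache of committed offsets; String keys stand in for TopicPartition.
final class CommittedOffsetsCache {
    private final Map<String, Long> committedOffsets = new HashMap<>();

    void record(String partition, long offset) {
        committedOffsets.put(partition, offset);
    }

    // On rebalance, keep only the entries for partitions that are still assigned.
    void onAssignment(Set<String> ownedPartitions) {
        committedOffsets.keySet().retainAll(ownedPartitions);
    }

    // Defensive copy so callers cannot mutate the cache.
    Map<String, Long> snapshot() {
        return new HashMap<>(committedOffsets);
    }
}
```

`keySet().retainAll(...)` writes through to the backing map, so stale partitions are dropped in one call without a manual iteration.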









[GitHub] [kafka] mjsax commented on a change in pull request #10180: KAFKA- 12347: expose offsets to streams client (WIP)

2021-02-22 Thread GitBox


mjsax commented on a change in pull request #10180:
URL: https://github.com/apache/kafka/pull/10180#discussion_r580731029



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -307,6 +308,9 @@ public boolean isRunning() {
 private final ProcessingMode processingMode;
 private AtomicBoolean leaveGroupRequested;
 
+private final Map committedOffsets;

Review comment:
   Seems to be unused?









[GitHub] [kafka] dengziming commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

2021-02-22 Thread GitBox


dengziming commented on a change in pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#discussion_r580710886



##
File path: clients/src/main/resources/common/message/DescribeTransactionsResponse.json
##
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 65,
+  "type": "response",
+  "name": "DescribeTransactionsResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+  { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
+"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+  { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
+{ "name": "ErrorCode", "type": "int16", "versions": "0+" },
+{ "name": "TransactionalId", "type": "string", "versions": "0+" },

Review comment:
   We can add `"entityType": "transactionalId"` to this field.









[GitHub] [kafka] showuon commented on pull request #9627: KAFKA-10746: Change to Warn logs when necessary to notify users

2021-02-22 Thread GitBox


showuon commented on pull request #9627:
URL: https://github.com/apache/kafka/pull/9627#issuecomment-783805863


   @abbccdda @guozhangwang @mjsax, please help review this PR. Thanks.







[GitHub] [kafka] cmccabe opened a new pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-22 Thread GitBox


cmccabe opened a new pull request #10184:
URL: https://github.com/apache/kafka/pull/10184


   Enable the new KIP-500 controller to delete topics.
   
   Fix a bug where feature level records were not correctly replayed.
   
   Fix a bug in TimelineHashMap#remove where the wrong type was being
   returned.







[GitHub] [kafka] hachikuji commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

2021-02-22 Thread GitBox


hachikuji commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r580705481



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -92,4 +93,13 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws
 
 return Optional.of(new SnapshotPath(path, new OffsetAndEpoch(endOffset, epoch), partial));
 }
+
+/**
+ * Delete this snapshot from the filesystem.
+ */
+public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) throws IOException {
+Path path = snapshotPath(logDir, snapshotId);
+return Files.deleteIfExists(path);

Review comment:
   When we remove other log files in `Log`, we first rename them to 
`{file}.deleted`. Then we schedule them for background deletion. Do you think 
we should do the same for snapshot files?
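
A minimal Java sketch of the rename-then-delete-in-the-background approach raised here. It is an illustration under assumptions, not the code in `Log` or `Snapshots`: `DeferredDelete`, `renameAndScheduleDelete`, and the delay parameter are hypothetical, and only the `.deleted` suffix is taken from the comment.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: marks a file as deleted by renaming it, then removes it later.
final class DeferredDelete {
    static final String DELETED_SUFFIX = ".deleted";

    // Returns a future for the background deletion, or null if the file does not exist.
    static ScheduledFuture<Boolean> renameAndScheduleDelete(Path file,
                                                            ScheduledExecutorService scheduler,
                                                            long delayMs) throws IOException {
        if (!Files.exists(file)) {
            return null;
        }
        Path renamed = file.resolveSibling(file.getFileName() + DELETED_SUFFIX);
        // After the rename, readers looking for the original name no longer find the file.
        Files.move(file, renamed, StandardCopyOption.ATOMIC_MOVE);
        Callable<Boolean> deletion = () -> Files.deleteIfExists(renamed);
        return scheduler.schedule(deletion, delayMs, TimeUnit.MILLISECONDS);
    }
}
```

The rename is cheap and takes effect immediately, while the potentially slower delete happens off the calling thread.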









[jira] [Resolved] (KAFKA-12160) KafkaStreams configs are documented incorrectly

2021-02-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-12160.
-
Fix Version/s: 2.8.0
   Resolution: Fixed

> KafkaStreams configs are documented incorrectly
> ---
>
> Key: KAFKA-12160
> URL: https://issues.apache.org/jira/browse/KAFKA-12160
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, streams
>Reporter: Matthias J. Sax
>Assignee: James Galasyn
>Priority: Minor
> Fix For: 2.8.0
>
>
> In version 2.3, we removed the KafkaStreams default of `max.poll.interval.ms` 
> and fell back to the consumer default. However, the docs still list 
> `Integer.MAX_VALUE` as the default.
> Because we rely on the consumer default, we should actually remove 
> `max.poll.interval.ms` from the Kafka Streams docs completely. We might want 
> to fix this in some older versions, too. Not sure how far back we want to go.
> Furthermore, in the 2.7 docs, the sections "Default Values" and "Parameters 
> controlled by Kafka Streams" contain incorrect information.
> cf 
> https://kafka.apache.org/27/documentation/streams/developer-guide/config-streams.html#default-values
>  





[GitHub] [kafka] mjsax commented on pull request #10182: KAFKA-12160: Remove max.poll.interval.ms from config docs (and KSTREAMS-4881)

2021-02-22 Thread GitBox


mjsax commented on pull request #10182:
URL: https://github.com/apache/kafka/pull/10182#issuecomment-783792561


   Merged to `trunk` and cherry-picked to `2.8` branch.







[GitHub] [kafka] mjsax merged pull request #10182: KAFKA-12160: Remove max.poll.interval.ms from config docs (and KSTREAMS-4881)

2021-02-22 Thread GitBox


mjsax merged pull request #10182:
URL: https://github.com/apache/kafka/pull/10182


   







[GitHub] [kafka] hachikuji opened a new pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

2021-02-22 Thread GitBox


hachikuji opened a new pull request #10183:
URL: https://github.com/apache/kafka/pull/10183


   This patch implements the `DescribeTransactions` API as documented in 
KIP-664: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] JimGalasyn opened a new pull request #10182: KAFKA-12160: Remove max.poll.interval.ms from config docs

2021-02-22 Thread GitBox


JimGalasyn opened a new pull request #10182:
URL: https://github.com/apache/kafka/pull/10182


   [KAFKA-12160](https://issues.apache.org/jira/browse/KAFKA-12160): Remove 
`max.poll.interval.ms` from the Kafka Streams docs.







[GitHub] [kafka] hachikuji commented on a change in pull request #9958: MINOR: Clean up group instance id handling in `GroupCoordinator`

2021-02-22 Thread GitBox


hachikuji commented on a change in pull request #9958:
URL: https://github.com/apache/kafka/pull/9958#discussion_r580635693



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -725,50 +890,67 @@ class GroupCoordinator(val brokerId: Int,
  offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
  responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
 group.inLock {
-  if (group.is(Dead)) {
-// if the group is marked as dead, it means some other thread has just removed the group
-// from the coordinator metadata; it is likely that the group has migrated to some other
-// coordinator OR the group is in a transient unstable phase. Let the member retry
-// finding the correct coordinator and rejoin.
-responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.COORDINATOR_NOT_AVAILABLE })
-  } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "txn-commit-offsets")) {
-responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.FENCED_INSTANCE_ID })
-  } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
-// Enforce member id when it is set.
-responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.UNKNOWN_MEMBER_ID })
-  } else if (generationId >= 0 && generationId != group.generationId) {
-// Enforce generation check when it is set.
-responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION })
+  val validationErrorOpt = validateTxnOffsetCommit(
+group,
+generationId,
+memberId,
+groupInstanceId
+  )
+
+  if (validationErrorOpt.isDefined) {
+responseCallback(offsetMetadata.map { case (k, _) => k -> validationErrorOpt.get })

Review comment:
   Yeah, I agree. I think I was trying to avoid a big indent of the block 
below. I found a reasonable way to do it in the latest commit.









[jira] [Updated] (KAFKA-12363) Simplify static group memberId update logic

2021-02-22 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12363:

Description: 
In KAFKA-10284, we amended the JoinGroup logic to ensure that the memberId of 
static members always gets persisted. The way this works is the following:

1. When the JoinGroup is received, we immediately replace the current memberId 
with the updated memberId.
2. We then send an append to the log to update group metadata
3. If the append succeeds, we return the new memberId in the JoinGroup response.
4. If the append fails, we revert to the old memberId and we return 
UNKNOWN_MEMBER_ID in the response for the new member.

I am not sure if there are any correctness problems with this logic, but it 
does seem strange. For example, we can end up fencing the old memberId after 
step 1 even if we end up reverting in step 4. I think it would be simpler to 
structure this as follows:

1. When the JoinGroup is received, send an append to the log to update group 
metadata
2. If the append succeeds, replace the existing memberId with the new committed 
memberId.
3. If the append fails, return UNKNOWN_MEMBER_ID to let the new member retry.

Basically we don't surface the effect of the member replacement until we know 
it has been committed to the log, which avoids the weird revert logic.

  was:
In KAFKA-10284, we amended the JoinGroup logic to ensure that the memberId of 
static members always gets persisted. The way this works is the following:

1. When the JoinGroup is received, we immediately replace the current memberId 
with the updated memberId.
2. We then send an append to the log to update group metadata
3. If the append is unsuccessful, we revert to the old memberId and we return 
UNKNOWN_MEMBER_ID in the response for the new member.
4. If the append is successful, we return the new memberId in the JoinGroup 
response.

I am not sure if there are any correctness problems with this logic, but it 
does seem strange. For example, we can end up fencing the old memberId after 
step 1 even if we end up reverting in step 3. I think it would be simpler to 
structure this as follows:

1. When the JoinGroup is received, send an append to the log to update group 
metadata
2. If the append succeeds, replace the existing memberId with the new committed 
memberId.
3. If the append fails, return UNKNOWN_MEMBER_ID to let the new member retry.

Basically we don't surface the effect of the member replacement until we know 
it has been committed to the log, which avoids the weird revert logic.


> Simplify static group memberId update logic
> ---
>
> Key: KAFKA-12363
> URL: https://issues.apache.org/jira/browse/KAFKA-12363
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> In KAFKA-10284, we amended the JoinGroup logic to ensure that the memberId of 
> static members always gets persisted. The way this works is the following:
> 1. When the JoinGroup is received, we immediately replace the current 
> memberId with the updated memberId.
> 2. We then send an append to the log to update group metadata
> 3. If the append succeeds, we return the new memberId in the JoinGroup 
> response.
> 4. If the append fails, we revert to the old memberId and we return 
> UNKNOWN_MEMBER_ID in the response for the new member.
> I am not sure if there are any correctness problems with this logic, but it 
> does seem strange. For example, we can end up fencing the old memberId after 
> step 1 even if we end up reverting in step 4. I think it would be simpler to 
> structure this as follows:
> 1. When the JoinGroup is received, send an append to the log to update group 
> metadata
> 2. If the append succeeds, replace the existing memberId with the new 
> committed memberId.
> 3. If the append fails, return UNKNOWN_MEMBER_ID to let the new member retry.
> Basically we don't surface the effect of the member replacement until we know 
> it has been committed to the log, which avoids the weird revert logic.





[GitHub] [kafka] hachikuji commented on a change in pull request #9958: MINOR: Clean up group instance id handling in `GroupCoordinator`

2021-02-22 Thread GitBox


hachikuji commented on a change in pull request #9958:
URL: https://github.com/apache/kafka/pull/9958#discussion_r580672171



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1055,11 +1227,15 @@ class GroupCoordinator(val brokerId: Int,
   val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
   groupManager.storeGroup(group, groupAssignment, error => {
 if (error != Errors.NONE) {
+
+  // TODO: This logic seems questionable. The write was not committed, but that doesn't
+  //  mean it wasn't written to the log and cannot eventually become committed.

Review comment:
   Done. I filed https://issues.apache.org/jira/browse/KAFKA-12363.









[jira] [Created] (KAFKA-12363) Simplify static group memberId update logic

2021-02-22 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12363:
---

 Summary: Simplify static group memberId update logic
 Key: KAFKA-12363
 URL: https://issues.apache.org/jira/browse/KAFKA-12363
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


In KAFKA-10284, we amended the JoinGroup logic to ensure that the memberId of 
static members always gets persisted. The way this works is the following:

1. When the JoinGroup is received, we immediately replace the current memberId 
with the updated memberId.
2. We then send an append to the log to update group metadata
3. If the append is unsuccessful, we revert to the old memberId and we return 
UNKNOWN_MEMBER_ID in the response for the new member.
4. If the append is successful, we return the new memberId in the JoinGroup 
response.

I am not sure if there are any correctness problems with this logic, but it 
does seem strange. For example, we can end up fencing the old memberId after 
step 1 even if we end up reverting in step 3. I think it would be simpler to 
structure this as follows:

1. When the JoinGroup is received, send an append to the log to update group 
metadata
2. If the append succeeds, replace the existing memberId with the new committed 
memberId.
3. If the append fails, return UNKNOWN_MEMBER_ID to let the new member retry.

Basically we don't surface the effect of the member replacement until we know 
it has been committed to the log, which avoids the weird revert logic.
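
The proposed ordering can be sketched roughly as follows. This is a minimal Python model, not the Scala `GroupCoordinator` code: `Group`, `handle_static_join`, and the `append_to_log` callable are hypothetical names, and the log append is modeled as a function that simply reports whether the write was committed.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict

UNKNOWN_MEMBER_ID = "UNKNOWN_MEMBER_ID"  # stands in for the Kafka error code


@dataclass
class Group:
    # Hypothetical model: group.instance.id -> currently active memberId.
    static_members: Dict[str, str] = field(default_factory=dict)


def handle_static_join(group: Group,
                       instance_id: str,
                       new_member_id: str,
                       append_to_log: Callable[[str, str], bool]) -> str:
    """Return the memberId for the response, or UNKNOWN_MEMBER_ID on failure."""
    # Step 1: attempt the append first; in-memory state is untouched so far.
    committed = append_to_log(instance_id, new_member_id)
    if not committed:
        # Step 3: nothing to revert, because the old memberId was never replaced.
        return UNKNOWN_MEMBER_ID
    # Step 2: only now surface the replacement to the rest of the coordinator.
    group.static_members[instance_id] = new_member_id
    return new_member_id
```

In this shape the old memberId stays valid until the append commits, so a failed append cannot fence it by accident.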



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe merged pull request #10181: MINOR: fix a couple of failing system tests

2021-02-22 Thread GitBox


cmccabe merged pull request #10181:
URL: https://github.com/apache/kafka/pull/10181


   







[GitHub] [kafka] rondagostino commented on pull request #10181: MINOR: fix a couple of failing system tests

2021-02-22 Thread GitBox


rondagostino commented on pull request #10181:
URL: https://github.com/apache/kafka/pull/10181#issuecomment-783734434


   @cmccabe Can you take a look?  I found these issues after the test run on 
the 2.8 PR branch completed.







[GitHub] [kafka] rondagostino opened a new pull request #10181: MINOR: fix a couple of failing system tests

2021-02-22 Thread GitBox


rondagostino opened a new pull request #10181:
URL: https://github.com/apache/kafka/pull/10181


   This patch fixes a couple of failing system tests due to 
https://github.com/apache/kafka/pull/10105/ and should be merged to both 
`trunk` and `2.8`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Created] (KAFKA-12362) Determine if a Task is idling

2021-02-22 Thread Walker Carlson (Jira)
Walker Carlson created KAFKA-12362:
--

 Summary: Determine if a Task is idling
 Key: KAFKA-12362
 URL: https://issues.apache.org/jira/browse/KAFKA-12362
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Walker Carlson
 Fix For: 3.0.0


Determine whether a task is idling, given the task ID.

 

https://github.com/apache/kafka/pull/10180





[GitHub] [kafka] wcarlson5 opened a new pull request #10180: KAFKA-12347: expose offsets to streams client

2021-02-22 Thread GitBox


wcarlson5 opened a new pull request #10180:
URL: https://github.com/apache/kafka/pull/10180


   collect the offsets after they are committed
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] ijuma edited a comment on pull request #10179: MINOR: tune KIP-631 configurations

2021-02-22 Thread GitBox


ijuma edited a comment on pull request #10179:
URL: https://github.com/apache/kafka/pull/10179#issuecomment-783724833


   Can we add a comment to the code explaining why these timeouts are lower 
than the ZK equivalent ones (similar to what's in the PR description)? It may even 
be worth including it in the documentation string (people will wonder).







[GitHub] [kafka] ijuma commented on pull request #10179: MINOR: tune KIP-631 configurations

2021-02-22 Thread GitBox


ijuma commented on pull request #10179:
URL: https://github.com/apache/kafka/pull/10179#issuecomment-783724833


   Can we add a comment to the code explaining why these timeouts are lower 
than the ZK equivalent ones (similar to what's in the PR description)?







[GitHub] [kafka] cmccabe opened a new pull request #10179: MINOR: tune KIP-631 configurations

2021-02-22 Thread GitBox


cmccabe opened a new pull request #10179:
URL: https://github.com/apache/kafka/pull/10179


   Since we expect KIP-631 controller failovers to be fairly cheap, tune
   the default raft configuration parameters so that we detect node
   failures more quickly.
   
   Reduce the broker session timeout as well so that broker failures are
   detected more quickly.







[GitHub] [kafka] hachikuji commented on a change in pull request #9958: MINOR: Clean up group instance id handling in `GroupCoordinator`

2021-02-22 Thread GitBox


hachikuji commented on a change in pull request #9958:
URL: https://github.com/apache/kafka/pull/9958#discussion_r580635693



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -725,50 +890,67 @@ class GroupCoordinator(val brokerId: Int,
  offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
  responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
 group.inLock {
-  if (group.is(Dead)) {
-// if the group is marked as dead, it means some other thread has just removed the group
-// from the coordinator metadata; it is likely that the group has migrated to some other
-// coordinator OR the group is in a transient unstable phase. Let the member retry
-// finding the correct coordinator and rejoin.
-responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.COORDINATOR_NOT_AVAILABLE })
-  } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "txn-commit-offsets")) {
-responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.FENCED_INSTANCE_ID })
-  } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
-// Enforce member id when it is set.
-responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.UNKNOWN_MEMBER_ID })
-  } else if (generationId >= 0 && generationId != group.generationId) {
-// Enforce generation check when it is set.
-responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION })
+  val validationErrorOpt = validateTxnOffsetCommit(
+group,
+generationId,
+memberId,
+groupInstanceId
+  )
+
+  if (validationErrorOpt.isDefined) {
+responseCallback(offsetMetadata.map { case (k, _) => k -> validationErrorOpt.get })

Review comment:
   Yeah, I agree. I think I was trying to avoid a big indent. I think I 
found a reasonable way to do it.









[GitHub] [kafka] cmccabe closed pull request #10175: MINOR: V2.8 system tests for Raft-based metadata quorums

2021-02-22 Thread GitBox


cmccabe closed pull request #10175:
URL: https://github.com/apache/kafka/pull/10175


   







[GitHub] [kafka] cmccabe commented on pull request #10175: MINOR: V2.8 system tests for Raft-based metadata quorums

2021-02-22 Thread GitBox


cmccabe commented on pull request #10175:
URL: https://github.com/apache/kafka/pull/10175#issuecomment-783709474


   thanks, @rondagostino! I backported this as part of the #10105 PR as we 
usually do (we don't usually have a separate PR for the branch).  Thanks for 
showing me how to resolve the conflicts in `get_offset_shell_test.py`.







[GitHub] [kafka] cmccabe merged pull request #10105: MINOR: System tests for Raft-based metadata quorums

2021-02-22 Thread GitBox


cmccabe merged pull request #10105:
URL: https://github.com/apache/kafka/pull/10105


   







[jira] [Commented] (KAFKA-12242) Decouple state store materialization enforcement from name/serde provider

2021-02-22 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17288646#comment-17288646
 ] 

John Roesler commented on KAFKA-12242:
--

Thanks for raising this, [~guozhang]. I have seen several people getting 
tripped up by this issue.

 

Just to throw it out there, another approach to solve this would be to begin to 
go down the road I proposed here: 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar]

Specifically, what I have in mind is that attaching serdes to a processor node 
is logically independent from requesting materialization of that node's view. 
This ticket highlights the awkwardness of using a cross-cutting "Materialized" 
config at all. In contrast, if it were just "MapValuesParameters" with 
independent settings for specifying serdes and for requesting materialization, 
it wouldn't be an issue.

I definitely don't insist on the proposed grammar, but wanted to document the 
relationship with that proposal.
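
To make the separation described above concrete, here is a small Python sketch. It is not the Streams DSL or the grammar from the linked proposal; the `MapValuesParameters` fields and the `must_materialize` helper are hypothetical and only illustrate that supplying serdes or a store name need not imply a materialization request.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class MapValuesParameters:
    # Hypothetical parameter object; field names are illustrative only.
    key_serde: Optional[str] = None
    value_serde: Optional[str] = None
    store_name: Optional[str] = None
    materialize: bool = False  # materialization is requested explicitly


def must_materialize(params: MapValuesParameters) -> bool:
    # Supplying serdes or a store name does not by itself force a store.
    return params.materialize
```

With settings kept independent like this, a caller can provide a name and serdes "just in case" and still leave the materialization decision to the runtime.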

> Decouple state store materialization enforcement from name/serde provider
> -
>
> Key: KAFKA-12242
> URL: https://issues.apache.org/jira/browse/KAFKA-12242
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip
>
> Many users of Streams would want the following: let the Streams runtime 
> decide whether or not to materialize a state store; AND if it decides to do 
> so, use the store name / serdes I provided ahead of time; if not, then 
> nothing happens (the provided store name and serdes can just be dropped).
> However, Streams today takes `Materialized` as an indicator to enforce 
> materialization. We should think of a way for users to optionally decouple 
> materialization enforcement from name/serde provider.





[GitHub] [kafka] hachikuji commented on a change in pull request #9912: MINOR: Let TestRaftRequestHandler use RequestHandlerHelper

2021-02-22 Thread GitBox


hachikuji commented on a change in pull request #9912:
URL: https://github.com/apache/kafka/pull/9912#discussion_r580606013



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -2127,21 +2118,15 @@ class KafkaApisTest {
   error = Errors.NONE
 ))
 
-val response = readResponse(joinGroupRequest, capturedResponse)
-  .asInstanceOf[JoinGroupResponse]
+val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
 
 assertEquals(Errors.NONE, response.error)
 assertEquals(0, response.data.members.size)
 assertEquals(memberId, response.data.memberId)
 assertEquals(0, response.data.generationId)
 assertEquals(memberId, response.data.leader)
 assertEquals(protocolName, response.data.protocolName)
-
-if (version >= 7) {

Review comment:
   Now these test cases are working with the raw response object returned 
by `KafkaApis`. This means we don't have to deal with fields which are dropped 
during serialization.









[jira] [Updated] (KAFKA-12347) Improve Kafka Streams ability to track progress

2021-02-22 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson updated KAFKA-12347:
---
Description: 
Add methods to track records being consumed fully and to tell if tasks are 
idling. This will allow users of streams to build uptime metrics around streams 
with less difficulty.

KIP-715: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-715%3A+Expose+Committed+offset+in+streams]

  was:
Add metrics to track records being consumed fully and to tell if tasks are 
idling. This will allow users of streams to build uptime metrics around streams 
with less difficulty.

KIP-715: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-715%3A+Expose+Committed+offset+in+streams
 


> Improve Kafka Streams ability to track progress
> ---
>
> Key: KAFKA-12347
> URL: https://issues.apache.org/jira/browse/KAFKA-12347
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
>  Labels: kip
> Fix For: 3.0.0
>
>
> Add methods to track records being consumed fully and to tell if tasks are 
> idling. This will allow users of streams to build uptime metrics around 
> streams with less difficulty.
> KIP-715: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-715%3A+Expose+Committed+offset+in+streams]





[GitHub] [kafka] rhauch commented on a change in pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string

2021-02-22 Thread GitBox


rhauch commented on a change in pull request #9950:
URL: https://github.com/apache/kafka/pull/9950#discussion_r580584221



##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
##
@@ -364,11 +365,24 @@ private static String castToString(Object value) {
 if (value instanceof java.util.Date) {
 java.util.Date dateValue = (java.util.Date) value;
 return Values.dateFormatFor(dateValue).format(dateValue);
+} else if (value instanceof ByteBuffer) {
+ByteBuffer byteBuffer = (ByteBuffer) value;
+return castByteArrayToString(byteBuffer.array());
+} else if (value instanceof byte[]) {
+return castByteArrayToString((byte[]) value);
 } else {
 return value.toString();
 }
 }
 
+private static String castByteArrayToString(byte[] array) {
+StringBuilder sbuf = new StringBuilder();
+for (byte b : array) {
+sbuf.append(String.format("%02X", b));

Review comment:
   As noted in my previous comment, I agree with @kkonstantine that base64 
would be preferable. Doing that would align better with the existing `Values` 
class used in Connect's header converter mechanism.
   
   If we want to add support for multiple encodings, we would need to have a 
KIP since it would likely mean changing the SMT configuration. 
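
For illustration only, a short Python sketch (not the Java SMT code) comparing the two encodings for the same bytes. The sample input is arbitrary; the hex branch mimics the `"%02X"` formatting in the patch, and the base64 branch stands in for the encoding that `Values` is described as using.

```python
import base64

raw = b"Kafka"  # arbitrary sample bytes

# Hex: two characters per byte, as with String.format("%02X", b) in the patch.
hex_text = "".join("%02X" % b for b in raw)

# Base64: roughly four characters per three input bytes.
b64_text = base64.b64encode(raw).decode("ascii")

print(hex_text)  # 4B61666B61
print(b64_text)  # S2Fma2E=
```

The size difference grows with the input, which is the efficiency point raised in the review.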

##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
##
@@ -364,11 +365,31 @@ private static String castToString(Object value) {
 if (value instanceof java.util.Date) {
 java.util.Date dateValue = (java.util.Date) value;
 return Values.dateFormatFor(dateValue).format(dateValue);
+} else if (value instanceof ByteBuffer) {
+ByteBuffer byteBuffer = (ByteBuffer) value;
+if (byteBuffer.hasArray()) {
+return castByteArrayToString(byteBuffer.array());
+}
+else {
+byte[] array = new byte[byteBuffer.remaining()];
+byteBuffer.get(array);
+return castByteArrayToString(array);
+}

Review comment:
   The use of `ByteBuffer.get(...)` here does not account for the fact that 
the buffer may not be positioned at the beginning. Kafka has two `Utils.readBytes(...)` 
methods that we should probably use. This code would then simplify to:
   ```suggestion
   byte[] rawBytes = Utils.readBytes(byteBuffer);
   return castByteArrayToString(rawBytes);
   ```









[GitHub] [kafka] C0urante commented on pull request #10178: KAFKA-12361: Use default request.timeout.ms value for Connect producers

2021-02-22 Thread GitBox


C0urante commented on pull request #10178:
URL: https://github.com/apache/kafka/pull/10178#issuecomment-783669441


   @hachikuji Does this look alright to you? The changes are trivial but I'm 
also hoping my summary of the situation in the description is correct and won't 
mislead anyone.







[GitHub] [kafka] C0urante opened a new pull request #10178: KAFKA-12361: Use default request.timeout.ms value for Connect producers

2021-02-22 Thread GitBox


C0urante opened a new pull request #10178:
URL: https://github.com/apache/kafka/pull/10178


   [Jira](https://issues.apache.org/jira/browse/KAFKA-12361)
   
   The extremely high request timeout makes it harder for the producer to 
gracefully handle unclean connection terminations, which might happen in the 
case of sudden broker death.
   
   Reducing that value to the default of 30 seconds should address that issue 
without compromising the existing delivery guarantees of the Connect framework. 
Since the delivery timeout is still set to a very high value, this change 
shouldn't make it more likely for `Producer::send` to throw an exception and 
fail the task.
   
   This may make duplicate record delivery more likely when broker response 
times are extremely slow, but that can be addressed by enabling 
idempotence in the underlying producers for the connector's tasks.
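
As a rough illustration of how these settings relate, here is a Python sketch. The dictionary is a stand-in for producer properties, not Connect's actual code. The 30-second request timeout comes from the description above; the exact delivery timeout value and the `linger.ms` value are assumptions. The helper mirrors the producer's rule that `delivery.timeout.ms` must be at least `linger.ms + request.timeout.ms`.

```python
INT_MAX = 2**31 - 1  # Java's Integer.MAX_VALUE

producer_props = {
    "request.timeout.ms": 30_000,    # default, per the description above
    "delivery.timeout.ms": INT_MAX,  # assumed "very high" value
    "linger.ms": 0,                  # assumed
    "enable.idempotence": True,      # optional; guards against duplicates on retry
}


def timeouts_consistent(props: dict) -> bool:
    # A record may wait linger.ms and then up to one request timeout,
    # so the overall delivery budget must cover at least that much.
    return props["delivery.timeout.ms"] >= props["linger.ms"] + props["request.timeout.ms"]
```

Lowering only the request timeout keeps this relation satisfied, which is why the delivery guarantee is expected to be unaffected.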
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] rhauch edited a comment on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string

2021-02-22 Thread GitBox


rhauch edited a comment on pull request #9950:
URL: https://github.com/apache/kafka/pull/9950#issuecomment-783634110


   Before this fix, the changes made in #4820 
([KAFKA-6684](https://issues.apache.org/jira/browse/KAFKA-6684)) resulted in 
`toString()` being called on `byte[]` and `ByteBuffer`. As highlighted in this 
PR and issue, `ByteBuffer.toString()` is not useful, but the `toString()` 
on `byte[]` still works. This PR seems to change that behavior, which would not 
be backward compatible.
   
   The discussion on PR #4820 also talked about making this compatible with 
`Values.convertToString(...)`, which for `byte[]` and `ByteBuffer` results in a 
base 64 encoded string (with `ISO-8859-1` encoding). See the [Values code for 
details](https://github.com/apache/kafka/blob/95f51539c8d0b88bd7f285011d42e2d1117107de/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L673-L682).
   
   ~Because of this, it seems much more sensible to also use base64 here for 
`ByteBuffer` so it matches the existing behavior with `byte[]`.~
   
   Actually, the existing code simply outputs the `toString()` result of a 
`byte[]` object, which has the form `[B@22ef9844`. That is the 
`Object.toString()` implementation: the class alias (e.g., `[B` is 
a byte array) followed by the hex representation of the object's `hashCode()`.
   
   However, given that none of the cast forms of `byte[]` or `ByteBuffer` are 
useful, perhaps it's worth recalling the previous discussion on #4820, 
which mentioned:
   > As @ewencp suggested, for consistency perhaps we can use the same string 
formats as SimpleHeaderConverter: 
https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java,
   
   As mentioned above, the `Values.convertToString(...)` uses a base 64 
encoding. And this aligns with @kkonstantine's suggestion:
   > Why are we selecting hex here and not base64 for example? Hex of course is 
less efficient.
   
   I agree that it may not suffice in all user situations, so if we also 
want to support other encodings we'd need other config changes, which would 
require using the KIP mechanism to propose such enhancements.
   
   WDYT?







[GitHub] [kafka] rhauch edited a comment on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string

2021-02-22 Thread GitBox


rhauch edited a comment on pull request #9950:
URL: https://github.com/apache/kafka/pull/9950#issuecomment-783634110


   Before this fix, the changes made in #4820 
([KAFKA-6684](https://issues.apache.org/jira/browse/KAFKA-6684)) resulted in 
`toString()` being called on `byte[]` and `ByteBuffer`. As highlighted in this 
PR and issue, `ByteBuffer.toString()` is not useful, but the `toString()` 
on `byte[]` still works. This PR seems to change that behavior, which would not 
be backward compatible.
   
   The discussion on PR #4820 also talked about making this compatible with 
`Values.convertToString(...)`, which for `byte[]` and `ByteBuffer` results in a 
base 64 encoded string (with `ISO-8859-1` encoding). See the [Values code for 
details](https://github.com/apache/kafka/blob/95f51539c8d0b88bd7f285011d42e2d1117107de/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L673-L682).
   
   ~Because of this, it seems much more sensible to also use base64 here for 
`ByteBuffer` so it matches the existing behavior with `byte[]`.~
   
   ~I agree that it maybe doesn't suffice in all user situations, so if we also 
want to support other encodings we'd need other config changes that will 
require using the KIP mechanism to propose such enhancements.~
   
   Actually, the existing code simply outputs the `toString()` result of a 
`byte[]` object, which has the form `[B@22ef9844`. That is the 
`Object.toString()` implementation: the class alias (e.g., `[B` is 
a byte array) followed by the hex representation of the object's `hashCode()`.
   
   However, given that none of the cast forms of `byte[]` or `ByteBuffer` are 
useful, perhaps it's worth recalling the previous discussion on #4820, 
which mentioned:
   > As @ewencp suggested, for consistency perhaps we can use the same string 
formats as SimpleHeaderConverter: 
https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java,
   
   As mentioned above, the `Values.convertToString(...)` uses a base 64 
encoding. And this aligns with @kkonstantine's suggestion:
   > Why are we selecting hex here and not base64 for example? Hex of course is 
less efficient.
   
   WDYT?







[jira] [Assigned] (KAFKA-12361) Change default connect producer request timeout

2021-02-22 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-12361:
-

Assignee: Chris Egerton

> Change default connect producer request timeout
> ---
>
> Key: KAFKA-12361
> URL: https://issues.apache.org/jira/browse/KAFKA-12361
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Chris Egerton
>Priority: Major
>  Labels: connect
>
> Prior to KIP-91, which introduced delivery.timeout.ms, there was no easy way 
> to ensure that records sent through the producer would even have the 
> opportunity to get delivered. Records could be timed out in the accumulator 
> if `request.timeout.ms` was reached before getting sent. Users worked around 
> this problem by setting `request.timeout.ms=Int.MaxValue`. The downside is 
> that this made the producer slower to discover "unclean" connection failures. 
> Now that we have KIP-91, there shouldn't be any reason to keep this 
> workaround. 
> One place it would be good to fix this is in connect's source tasks:
> {code}
> // These settings will execute infinite retries on retriable exceptions. They *may* be overridden via configs passed to the worker,
> // but this may compromise the delivery guarantees of Kafka Connect.
> producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
> {code}
> The comment about delivery guarantees is a little vague, but I think it mainly 
> refers to what was discussed above: ensuring at-least-once delivery. Note 
> that none of the default configs, including both request timeout and delivery 
> timeout, can avoid duplicates in all cases. For that, idempotence is needed. It 
> is worth considering separately for connect whether that should be the 
> default.





[GitHub] [kafka] rondagostino commented on a change in pull request #10105: MINOR: System tests for Raft-based metadata quorums

2021-02-22 Thread GitBox


rondagostino commented on a change in pull request #10105:
URL: https://github.com/apache/kafka/pull/10105#discussion_r580575104



##
File path: tests/kafkatest/services/performance/end_to_end_latency.py
##
@@ -74,12 +74,15 @@ def __init__(self, context, num_nodes, kafka, topic, num_records, compression_ty
 def start_cmd(self, node):
 args = self.args.copy()
 args.update({
-'zk_connect': self.kafka.zk_connect_setting(),
 'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
 'config_file': EndToEndLatencyService.CONFIG_FILE,
 'kafka_run_class': self.path.script("kafka-run-class.sh", node),
 'java_class_name': self.java_class_name()
 })
+if node.version < V_0_9_0_0:

Review comment:
   > name it something like consumer_supports_bootstrap_server... bunch of 
other bootstrap server functions
   
   Good point.  I renamed it `consumer_supports_bootstrap_server()` as you 
suggested.
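
A minimal sketch of what such a helper could look like. Only the helper name and the `0.9.0.0` cutoff come from the diff above; the tuple representation of versions, the function bodies, and `connection_args` are assumptions for illustration, not the kafkatest implementation.

```python
V_0_9_0_0 = (0, 9, 0, 0)  # hypothetical tuple form of the version constant


def consumer_supports_bootstrap_server(version):
    """True when a consumer at `version` can be pointed at brokers directly."""
    return version >= V_0_9_0_0


def connection_args(version, bootstrap_servers, zk_connect):
    # Every version gets the broker list; older consumers also need ZooKeeper.
    args = {"bootstrap_servers": bootstrap_servers}
    if not consumer_supports_bootstrap_server(version):
        args["zk_connect"] = zk_connect
    return args
```

Naming the check after the capability rather than the version number keeps it in line with the other `*_supports_bootstrap_server` helpers mentioned in the review.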
   
   









[GitHub] [kafka] cmccabe commented on a change in pull request #10105: MINOR: System tests for Raft-based metadata quorums

2021-02-22 Thread GitBox


cmccabe commented on a change in pull request #10105:
URL: https://github.com/apache/kafka/pull/10105#discussion_r580571155



##
File path: tests/kafkatest/tests/core/round_trip_fault_test.py
##
@@ -47,32 +55,48 @@ def __init__(self, test_context):
  active_topics=active_topics)
 
 def setUp(self):
-self.zk.start()
+if self.zk:
+self.zk.start()
 self.kafka.start()
 self.trogdor.start()
 
 def teardown(self):
 self.trogdor.stop()
 self.kafka.stop()
-self.zk.stop()
+if self.zk:
+self.zk.stop()
 
-def test_round_trip_workload(self):
+def remote_quorum_nodes(self):
+if quorum.for_test(self.test_context) == quorum.zk:
+return self.zk.nodes
+elif quorum.for_test(self.test_context) == quorum.remote_raft:
+return self.kafka.controller_quorum.nodes
+else: # co-located case, which we currently don't test but handle here for completeness in case we do test it
+return []

Review comment:
   sounds good









[GitHub] [kafka] cmccabe commented on a change in pull request #10105: MINOR: System tests for Raft-based metadata quorums

2021-02-22 Thread GitBox


cmccabe commented on a change in pull request #10105:
URL: https://github.com/apache/kafka/pull/10105#discussion_r580570954



##
File path: tests/kafkatest/services/performance/end_to_end_latency.py
##
@@ -74,12 +74,15 @@ def __init__(self, context, num_nodes, kafka, topic, num_records, compression_ty
 def start_cmd(self, node):
 args = self.args.copy()
 args.update({
-'zk_connect': self.kafka.zk_connect_setting(),
 'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
 'config_file': EndToEndLatencyService.CONFIG_FILE,
 'kafka_run_class': self.path.script("kafka-run-class.sh", node),
 'java_class_name': self.java_class_name()
 })
+if node.version < V_0_9_0_0:

Review comment:
   thanks.  that sounds good!
   
   can you name it something like `consumer_supports_bootstrap_server`?  There 
are a bunch of other bootstrap server functions 
(`acl_command_supports_bootstrap_server`, 
`topic_command_supports_bootstrap_server`, etc.) so it would be good to be clear









[GitHub] [kafka] rhauch commented on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string

2021-02-22 Thread GitBox


rhauch commented on pull request #9950:
URL: https://github.com/apache/kafka/pull/9950#issuecomment-783634110


   Before this fix, the changes made in #4820 
([KAFKA-6684](https://issues.apache.org/jira/browse/KAFKA-6684)) resulted in 
`toString()` being called on `byte[]` and `ByteBuffer`. As highlighted in this 
PR and issue, `ByteBuffer.toString()` is not useful, but the `toString()` 
on `byte[]` still works. This PR seems to change that behavior, which would not 
be backward compatible.
   
   The discussion on PR #4820 also talked about making this compatible with 
`Values.convertToString(...)`, which for `byte[]` and `ByteBuffer` results in a 
base 64 encoded string (with `ISO-8859-1` encoding). See the [Values code for 
details](https://github.com/apache/kafka/blob/95f51539c8d0b88bd7f285011d42e2d1117107de/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L673-L682).
   
   Because of this, it seems much more sensible to also use base64 here for 
`ByteBuffer` so it matches the existing behavior with `byte[]`. 
   
   I agree that it may not suffice in all user situations, so if we also 
want to support other encodings we'd need other config changes, which would 
require using the KIP mechanism to propose such enhancements.
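
To illustrate the base64 string form discussed above, here is a small sketch. It is written in Python rather than Java for brevity and does not reproduce the Connect `Values` code; the helper name `bytes_to_string` is hypothetical:

```python
import base64


def bytes_to_string(raw: bytes) -> str:
    """Encode raw bytes as base64 text, decoded to str using ISO-8859-1."""
    return base64.b64encode(raw).decode("iso-8859-1")


payload = bytes([0x00, 0x01, 0xFF])
encoded = bytes_to_string(payload)

# The encoded text can be turned back into the original bytes without loss.
assert base64.b64decode(encoded) == payload
```

Because the output is reversible, a downstream consumer can recover the original bytes from the string, which is the property that makes a shared encoding for `byte[]` and `ByteBuffer` attractive.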







[GitHub] [kafka] mjsax commented on a change in pull request #10170: KAFKA-12323: Set timestamp in record context when punctuate

2021-02-22 Thread GitBox


mjsax commented on a change in pull request #10170:
URL: https://github.com/apache/kafka/pull/10170#discussion_r580546901



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##
@@ -1874,6 +1873,66 @@ public void close() {}
 assertEquals(2, punctuatedWallClockTime.size());
 }
 
+@Test
+public void shouldPunctuateWithTimestampPreservedInProcessorContext() {
+final org.apache.kafka.streams.kstream.TransformerSupplier> punctuateProcessor =
+() -> new org.apache.kafka.streams.kstream.Transformer>() {
+@Override
+public void init(final 
org.apache.kafka.streams.processor.ProcessorContext context) {
+context.schedule(Duration.ofMillis(100L), 
PunctuationType.WALL_CLOCK_TIME, timestamp -> context.forward("key", "value"));
+}
+
+@Override
+public KeyValue transform(final Object key, 
final Object value) {
+return null;
+}
+
+@Override
+public void close() {}
+};
+
+final List peekedContextTime = new ArrayList<>();
+final org.apache.kafka.streams.processor.ProcessorSupplier peekProcessor =
+() -> new 
org.apache.kafka.streams.processor.AbstractProcessor() {
+@Override
+public void process(final Object key, final Object value) {
+peekedContextTime.add(context.timestamp());
+}
+};
+
+internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
+.transform(punctuateProcessor)
+.process(peekProcessor);
+internalStreamsBuilder.buildAndOptimizeTopology();
+
+final StreamThread thread = createStreamThread(CLIENT_ID, config, 
false);
+
+thread.setState(StreamThread.State.STARTING);
+thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
+final List assignedPartitions = new ArrayList<>();
+
+final Map> activeTasks = new HashMap<>();
+
+// assign single partition
+assignedPartitions.add(t1p1);
+activeTasks.put(task1, Collections.singleton(t1p1));
+
+thread.taskManager().handleAssignment(activeTasks, emptyMap());
+
+clientSupplier.consumer.assign(assignedPartitions);
+
clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
+thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
+
+thread.runOnce();
+assertEquals(0, peekedContextTime.size());
+
+mockTime.sleep(100L);
+thread.runOnce();
+
+assertEquals(1, peekedContextTime.size());
+assertNotNull(peekedContextTime.get(0));

Review comment:
   We should verify the actual timestamp.









[GitHub] [kafka] mjsax commented on a change in pull request #10170: KAFKA-12323: Set timestamp in record context when punctuate

2021-02-22 Thread GitBox


mjsax commented on a change in pull request #10170:
URL: https://github.com/apache/kafka/pull/10170#discussion_r580546373



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##
@@ -1874,6 +1873,66 @@ public void close() {}
 assertEquals(2, punctuatedWallClockTime.size());
 }
 
+@Test
+public void shouldPunctuateWithTimestampPreservedInProcessorContext() {

Review comment:
   I think we should add one more test for `STREAM_TIME` punctuation?









[GitHub] [kafka] mjsax commented on a change in pull request #10170: KAFKA-12323: Set timestamp in record context when punctuate

2021-02-22 Thread GitBox


mjsax commented on a change in pull request #10170:
URL: https://github.com/apache/kafka/pull/10170#discussion_r580544814



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -792,7 +790,16 @@ public void punctuate(final ProcessorNode node,
 throw new IllegalStateException(String.format("%sCurrent node is 
not null", logPrefix));
 }
 
-updateProcessorContext(node, time.milliseconds(), null);
+// when punctuating, we need to preserve the timestamp (this can be 
either system time or event time)
+// while other record context are set as dummy: null topic, -1 
partition, -1 offset and empty header
+final ProcessorRecordContext recordContext = new 
ProcessorRecordContext(
+timestamp,
+-1L,
+-1,
+null,
+new RecordHeaders()

Review comment:
   Why not pass `null` ? 









[jira] [Assigned] (KAFKA-12323) Record timestamps not populated in event

2021-02-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-12323:
---

Assignee: Guozhang Wang

> Record timestamps not populated in event
> 
>
> Key: KAFKA-12323
> URL: https://issues.apache.org/jira/browse/KAFKA-12323
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Adam Bellemare
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.8.0, 2.7.1
>
> Attachments: PunctuateTimestampZeroTest.java
>
>
> Upgraded a kafka streams application from 2.6.0 to 2.7.0. Noticed that the 
> events being produced had a "CreatedAt" timestamp = 0, causing downstream 
> failures as we depend on those timestamps. Reverting back to 2.6.0/2.6.1 
> fixed this issue. This was the only change to the Kafka Streams application.
> When the event stream produced by 2.6.0 is consumed using 
> `kafka-avro-console-consumer` with `--property print.timestamp=true`, the 
> events are prepended with their event times, such as:
> {code:java}
> CreateTime:1613072202271  
> CreateTime:1613072203412  
> CreateTime:1613072205431  
> {code}
> etc.
> However, when those events are produced by the Kafka Streams app using 2.7.0, 
> we get:
> {code:java}
> CreateTime:0  
> CreateTime:0  
> CreateTime:0   
> {code}
> I don't know if there is a default value somewhere that changed, but this is 
> actually a blocker for our use-cases as we now need to work around this 
> limitation (or roll back to 2.6.1, though there are other issues we must deal 
> with then). I am not sure which unit tests in the code base to look at to 
> validate this, but I wanted to log this bug now in case someone else has 
> already seen this or an open one exists (I didn't see one though).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12360) Improve documentation of max.task.idle.ms (kafka-streams)

2021-02-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12360:

Labels: beginner newbie trivial  (was: kafka-streams)

> Improve documentation of max.task.idle.ms (kafka-streams)
> -
>
> Key: KAFKA-12360
> URL: https://issues.apache.org/jira/browse/KAFKA-12360
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs, streams
>Reporter: Domenico Delle Side
>Priority: Minor
>  Labels: beginner, newbie, trivial
>
> _max.task.idle.ms_ is a handy way to pause processing in a *_kafka-streams_* 
> application. This is very useful when you need to join two topics that are 
> out of sync, i.e. when data in a topic may be produced _before_ you receive 
> join information in the other topic.
> In the documentation, however, it is not specified that the value of 
> _max.task.idle.ms_ *must* be lower than _max.poll.interval.ms_, otherwise 
> you'll run into an endless rebalancing problem.
> I think it is better to clearly state this in the documentation.
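
The constraint described in this issue can be sketched as a simple pre-flight check. This is only an illustration in Python: the function `check_idle_vs_poll_interval` is hypothetical, and the rule it enforces is the reporter's claim above, not something taken from the Kafka Streams code:

```python
def check_idle_vs_poll_interval(config: dict) -> None:
    """Raise ValueError if max.task.idle.ms is not lower than max.poll.interval.ms."""
    idle_ms = int(config["max.task.idle.ms"])
    poll_ms = int(config["max.poll.interval.ms"])
    if idle_ms >= poll_ms:
        raise ValueError(
            "max.task.idle.ms (%d) must be lower than max.poll.interval.ms (%d)"
            % (idle_ms, poll_ms)
        )


# Example values chosen for illustration only.
check_idle_vs_poll_interval({"max.task.idle.ms": "1000", "max.poll.interval.ms": "300000"})
```

A check of this kind run at application start-up would surface the misconfiguration before it shows up as repeated rebalances.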





[GitHub] [kafka] rondagostino commented on pull request #10105: MINOR: System tests for Raft-based metadata quorums

2021-02-22 Thread GitBox


rondagostino commented on pull request #10105:
URL: https://github.com/apache/kafka/pull/10105#issuecomment-783624846


   > I wonder if we should disallow tests without `[the @cluster]` annotation 
in the future.
   
   I think perhaps yes.  Is there a need to allow it?  A simple oversight 
generates a significant parallelism hit at this point -- and they compound 
quickly as evidenced here.







[GitHub] [kafka] rondagostino commented on a change in pull request #10105: MINOR: System tests for Raft-based metadata quorums

2021-02-22 Thread GitBox


rondagostino commented on a change in pull request #10105:
URL: https://github.com/apache/kafka/pull/10105#discussion_r580532054



##
File path: tests/kafkatest/tests/core/round_trip_fault_test.py
##
@@ -47,32 +55,48 @@ def __init__(self, test_context):
  active_topics=active_topics)
 
 def setUp(self):
-self.zk.start()
+if self.zk:
+self.zk.start()
 self.kafka.start()
 self.trogdor.start()
 
 def teardown(self):
 self.trogdor.stop()
 self.kafka.stop()
-self.zk.stop()
+if self.zk:
+self.zk.stop()
 
-def test_round_trip_workload(self):
+def remote_quorum_nodes(self):
+if quorum.for_test(self.test_context) == quorum.zk:
+return self.zk.nodes
+elif quorum.for_test(self.test_context) == quorum.remote_raft:
+return self.kafka.controller_quorum.nodes
+else: # co-located case, which we currently don't test but handle here 
for completeness in case we do test it
+return []

Review comment:
   > throw an exception or does the current code actually work for this 
case?
   
   The code always needs `` + ``, where the 
latter are the ZooKeeper or remote Controller Quorum nodes.  If we were to run 
this test with co-located Raft Quorum controllers then those nodes would be 
accounted for because they are part of the Kafka nodes, so there is no need to 
explicitly add them.  So this code is correct in that it returns an empty list 
for that case.  As was indicated in the comment, it's here just in case we ever 
decide we want to test it.
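
The reasoning above can be sketched as follows. This is an illustrative, self-contained Python model rather than the test's code: the string constants and the function signatures are hypothetical simplifications of the quorum cases named in the diff:

```python
ZK = "ZK"
REMOTE_RAFT = "REMOTE_RAFT"
COLOCATED_RAFT = "COLOCATED_RAFT"


def remote_quorum_nodes(quorum_type, zk_nodes, controller_nodes):
    """Nodes that live outside the Kafka brokers for the given quorum type."""
    if quorum_type == ZK:
        return list(zk_nodes)
    if quorum_type == REMOTE_RAFT:
        return list(controller_nodes)
    # Co-located controllers run on the Kafka nodes, so nothing extra to add.
    return []


def all_nodes(quorum_type, kafka_nodes, zk_nodes, controller_nodes):
    """Kafka nodes plus whatever remote quorum nodes exist."""
    return list(kafka_nodes) + remote_quorum_nodes(quorum_type, zk_nodes, controller_nodes)
```

Returning an empty list keeps the caller's concatenation uniform across all three cases, which is why no exception is needed for the co-located one.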









[GitHub] [kafka] rondagostino commented on a change in pull request #10105: MINOR: System tests for Raft-based metadata quorums

2021-02-22 Thread GitBox


rondagostino commented on a change in pull request #10105:
URL: https://github.com/apache/kafka/pull/10105#discussion_r580527682



##
File path: tests/kafkatest/services/performance/end_to_end_latency.py
##
@@ -74,12 +74,15 @@ def __init__(self, context, num_nodes, kafka, topic, 
num_records, compression_ty
 def start_cmd(self, node):
 args = self.args.copy()
 args.update({
-'zk_connect': self.kafka.zk_connect_setting(),
 'bootstrap_servers': 
self.kafka.bootstrap_servers(self.security_config.security_protocol),
 'config_file': EndToEndLatencyService.CONFIG_FILE,
 'kafka_run_class': self.path.script("kafka-run-class.sh", node),
 'java_class_name': self.java_class_name()
 })
+if node.version < V_0_9_0_0:

Review comment:
   > would be good to have a function in version.py to reflect 
[functionality unsupported below 0.9.0]
   
   I'll add this function:
   
   ```
   def supports_bootstrap_server(self):
       return self >= V_0_9_0_0
   ```
   
   I then made the changes all over since this 0.9.0 constant is checked in 
several places:
   
   ```
   tests/kafkatest/services/console_consumer.py
   tests/kafkatest/services/performance/consumer_performance.py
   tests/kafkatest/services/performance/end_to_end_latency.py
   tests/kafkatest/services/performance/producer_performance.py
   tests/kafkatest/tests/core/upgrade_test.py
   tests/kafkatest/tests/core/upgrade_test.py
   ```
   
   We can revert if you think I went too far.









[jira] [Updated] (KAFKA-12361) Change default connect producer request timeout

2021-02-22 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12361:

Description: 
Prior to KIP-91, which introduced delivery.timeout.ms, there was no easy way to 
ensure that records sent through the producer would even have the opportunity 
to get delivered. Records could be timed out in the accumulator if 
`request.timeout.ms` was reached before getting sent. Users worked around this 
problem by setting `request.timeout.ms=Int.MaxValue`. The downside is that this 
made the producer slower to discover "unclean" connection failures. Now that we 
have KIP-91, there shouldn't be any reason to keep this workaround. 

One place it would be good to fix this is in connect's source tasks:
{code}
// These settings will execute infinite retries on retriable 
exceptions. They *may* be overridden via configs passed to the worker,
// but this may compromise the delivery guarantees of Kafka Connect.
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
Integer.toString(Integer.MAX_VALUE));
{code}
The comment about delivery guarantees is a little vague, but I think mainly it 
is what was discussed above about ensuring at least once delivery. Note that 
none of the default configs including both request timeout and delivery timeout 
can avoid duplicates in all cases. For that idempotence is needed. It is worth 
considering separately for connect whether that should be the default.



  was:
Prior to KIP-91, which introduced delivery.timeout.ms, there was no easy way to 
ensure that records sent through the producer would even have the opportunity 
to get delivered. Records could be timed out in the accumulator if 
`request.timeout.ms` was reached before getting sent. Users worked around this 
problem by setting `request.timeout.ms=Int.MaxValue`. The downside is that this 
made the producer slower to discover "unclean" connection failures. Now that we 
have KIP-91, there shouldn't be any reason to keep this workaround. 

One place it would be good to fix this is in connect's source tasks:
{code}
// These settings will execute infinite retries on retriable 
exceptions. They *may* be overridden via configs passed to the worker,
// but this may compromise the delivery guarantees of Kafka Connect.
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
Integer.toString(Integer.MAX_VALUE));
{code}
The comment about delivery guarantees is a little vague, but I think mainly it 
is what was discussed above about ensuring at least once delivery. Note that 
none of the default configs including request timeout or delivery timeout can 
avoid duplicates in all cases. For that idempotence is needed. It is worth 
considering separately for connect whether that should be the default.




> Change default connect producer request timeout
> ---
>
> Key: KAFKA-12361
> URL: https://issues.apache.org/jira/browse/KAFKA-12361
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: connect
>
> Prior to KIP-91, which introduced delivery.timeout.ms, there was no easy way 
> to ensure that records sent through the producer would even have the 
> opportunity to get delivered. Records could be timed out in the accumulator 
> if `request.timeout.ms` was reached before getting sent. Users worked around 
> this problem by setting `request.timeout.ms=Int.MaxValue`. The downside is 
> that this made the producer slower to discover "unclean" connection failures. 
> Now that we have KIP-91, there shouldn't be any reason to keep this 
> workaround. 
> One place it would be good to fix this is in connect's source tasks:
> {code}
> // These settings will execute infinite retries on retriable 
> exceptions. They *may* be overridden via configs passed to the worker,
> // but this may compromise the delivery guarantees of Kafka Connect.
> producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
> Integer.toString(Integer.MAX_VALUE));
> {code}
> The comment about delivery guarantees is a little vague, but I think mainly 
> it is what was discussed above about ensuring at least once delivery. Note 
> that none of the default configs including both request timeout and delivery 
> timeout can avoid duplicates in all cases. For that idempotence is needed. It 
> is worth considering separately for connect whether that should be the 
> default.





[jira] [Created] (KAFKA-12361) Change default connect producer request timeout

2021-02-22 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12361:
---

 Summary: Change default connect producer request timeout
 Key: KAFKA-12361
 URL: https://issues.apache.org/jira/browse/KAFKA-12361
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


Prior to KIP-91, which introduced delivery.timeout.ms, there was no easy way to 
ensure that records sent through the producer would even have the opportunity 
to get delivered. Records could be timed out in the accumulator if 
`request.timeout.ms` was reached before getting sent. Users worked around this 
problem by setting `request.timeout.ms=Int.MaxValue`. The downside is that this 
made the producer slower to discover "unclean" connection failures. Now that we 
have KIP-91, there shouldn't be any reason to keep this workaround. 

One place it would be good to fix this is in connect's source tasks:
{code}
// These settings will execute infinite retries on retriable 
exceptions. They *may* be overridden via configs passed to the worker,
// but this may compromise the delivery guarantees of Kafka Connect.
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
Integer.toString(Integer.MAX_VALUE));
{code}
The comment about delivery guarantees is a little vague, but I think mainly it 
is what was discussed above about ensuring at least once delivery. Note that 
none of the default configs including request timeout or delivery timeout can 
avoid duplicates in all cases. For that idempotence is needed. It is worth 
considering separately for connect whether that should be the default.
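
As a rough illustration of the change proposed here, the sketch below models the producer overrides as a plain dictionary in Python. The helper `drop_request_timeout_workaround` is hypothetical, and the `delivery.timeout.ms` value shown is a placeholder chosen for the example, not a recommendation from this issue:

```python
INT_MAX = 2**31 - 1  # Java's Integer.MAX_VALUE


def drop_request_timeout_workaround(producer_props: dict) -> dict:
    """Return a copy of the props without the request.timeout.ms=Int.MaxValue workaround."""
    cleaned = dict(producer_props)
    if str(cleaned.get("request.timeout.ms")) == str(INT_MAX):
        # Removing the override lets the producer's own default request timeout apply.
        del cleaned["request.timeout.ms"]
    return cleaned


before = {
    "request.timeout.ms": str(INT_MAX),
    "delivery.timeout.ms": "120000",  # placeholder value for illustration
}
after = drop_request_timeout_workaround(before)
```

An explicitly configured, finite `request.timeout.ms` is left untouched by this helper, so user overrides passed to the worker would still take effect.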







[GitHub] [kafka] cmccabe commented on pull request #10105: MINOR: System tests for Raft-based metadata quorums

2021-02-22 Thread GitBox


cmccabe commented on pull request #10105:
URL: https://github.com/apache/kafka/pull/10105#issuecomment-783579134


   Good work spotting the missing `@cluster` annotations! That is quite a 
performance win. I wonder if we should disallow tests without this annotation 
in the future.







[GitHub] [kafka] cmccabe commented on a change in pull request #10105: MINOR: System tests for Raft-based metadata quorums

2021-02-22 Thread GitBox


cmccabe commented on a change in pull request #10105:
URL: https://github.com/apache/kafka/pull/10105#discussion_r580489004



##
File path: tests/kafkatest/tests/core/round_trip_fault_test.py
##
@@ -47,32 +55,48 @@ def __init__(self, test_context):
  active_topics=active_topics)
 
 def setUp(self):
-self.zk.start()
+if self.zk:
+self.zk.start()
 self.kafka.start()
 self.trogdor.start()
 
 def teardown(self):
 self.trogdor.stop()
 self.kafka.stop()
-self.zk.stop()
+if self.zk:
+self.zk.stop()
 
-def test_round_trip_workload(self):
+def remote_quorum_nodes(self):
+if quorum.for_test(self.test_context) == quorum.zk:
+return self.zk.nodes
+elif quorum.for_test(self.test_context) == quorum.remote_raft:
+return self.kafka.controller_quorum.nodes
+else: # co-located case, which we currently don't test but handle here 
for completeness in case we do test it
+return []

Review comment:
   should this throw an exception? Or does the current code actually work 
for this case?









[GitHub] [kafka] cmccabe commented on a change in pull request #10105: MINOR: System tests for Raft-based metadata quorums

2021-02-22 Thread GitBox


cmccabe commented on a change in pull request #10105:
URL: https://github.com/apache/kafka/pull/10105#discussion_r580486218



##
File path: 
tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py
##
@@ -46,13 +47,16 @@ def __init__(self, test_context):
 self.num_consumers = 1
 
 def setUp(self):
-self.zk.start()
+if self.zk:
+self.zk.start()
 
 def min_cluster_size(self):
 # Override this since we're adding services outside of the constructor
 return super(ClientCompatibilityProduceConsumeTest, 
self).min_cluster_size() + self.num_producers + self.num_consumers
 
-@parametrize(broker_version=str(DEV_BRANCH))
+@cluster(num_nodes=9)
+@matrix(broker_version=[str(DEV_BRANCH)], 
metadata_quorum=quorum.all_non_upgrade)
+@cluster(num_nodes=9)

Review comment:
   `@cluster` is repeated again









[GitHub] [kafka] cmccabe commented on a change in pull request #10105: MINOR: System tests for Raft-based metadata quorums

2021-02-22 Thread GitBox


cmccabe commented on a change in pull request #10105:
URL: https://github.com/apache/kafka/pull/10105#discussion_r580485952



##
File path: tests/kafkatest/tests/client/client_compatibility_features_test.py
##
@@ -120,10 +123,9 @@ def invoke_compatibility_program(self, features):
 @parametrize(broker_version=str(LATEST_2_3))
 @parametrize(broker_version=str(LATEST_2_4))
 @parametrize(broker_version=str(LATEST_2_5))
-@parametrize(broker_version=str(LATEST_2_6))
-@parametrize(broker_version=str(LATEST_2_7))

Review comment:
   hmm.  we don't want to drop 2.6 and 2.7 here, do we?








