[GitHub] [kafka] guozhangwang commented on pull request #8646: KAFKA-9974: Fix flaky test by removing unneeded asserts

2020-06-15 Thread GitBox


guozhangwang commented on pull request #8646:
URL: https://github.com/apache/kafka/pull/8646#issuecomment-644547455


   test this



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8646: KAFKA-9974: Fix flaky test by removing unneeded asserts

2020-06-15 Thread GitBox


guozhangwang commented on pull request #8646:
URL: https://github.com/apache/kafka/pull/8646#issuecomment-644547384


   test this







[GitHub] [kafka] showuon edited a comment on pull request #8646: KAFKA-9974: Fix flaky test by removing unneeded asserts

2020-06-15 Thread GitBox


showuon edited a comment on pull request #8646:
URL: https://github.com/apache/kafka/pull/8646#issuecomment-644543006


   **JDK 8 and Scala 2.12**
   failed kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles
   --> traced in KAFKA-10155, PR: https://github.com/apache/kafka/pull/8853
   
   **JDK 14 and Scala 2.13**
   failed org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
   --> passed locally
   kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles
   --> traced in KAFKA-10155, PR: https://github.com/apache/kafka/pull/8853
   
   **JDK 11 and Scala 2.13**
   org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication
   --> passed locally
   kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition
   kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition
   --> traced in KAFKA-8460 KAFKA-8264
   kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles
   kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles
   --> traced in KAFKA-10155, PR: https://github.com/apache/kafka/pull/8853







[GitHub] [kafka] showuon commented on pull request #8646: KAFKA-9974: Fix flaky test by removing unneeded asserts

2020-06-15 Thread GitBox


showuon commented on pull request #8646:
URL: https://github.com/apache/kafka/pull/8646#issuecomment-644543006


   **JDK 8 and Scala 2.12**
   failed kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles
   --> traced in KAFKA-10155, PR: https://github.com/apache/kafka/pull/8853
   
   **JDK 14 and Scala 2.13**
   failed org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
   --> passed locally
   kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles
   --> traced in KAFKA-10155, PR: https://github.com/apache/kafka/pull/8853
   
   **JDK 11 and Scala 2.13**
   org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication
   --> passed locally
   kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition
   kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition
   --> traced in KAFKA-8460.
   kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles
   kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles
   --> traced in KAFKA-10155, PR: https://github.com/apache/kafka/pull/8853







[GitHub] [kafka] kkonstantine commented on pull request #8839: MINOR: Documentation for KIP-585

2020-06-15 Thread GitBox


kkonstantine commented on pull request #8839:
URL: https://github.com/apache/kafka/pull/8839#issuecomment-644537352


   Merged to `trunk` and `2.6`







[GitHub] [kafka] kkonstantine commented on pull request #8859: MINOR: Upgrading jersey and jetty versions

2020-06-15 Thread GitBox


kkonstantine commented on pull request #8859:
URL: https://github.com/apache/kafka/pull/8859#issuecomment-644535669


   retest this please







[GitHub] [kafka] kkonstantine commented on pull request #8859: MINOR: Upgrading jersey and jetty versions

2020-06-15 Thread GitBox


kkonstantine commented on pull request #8859:
URL: https://github.com/apache/kafka/pull/8859#issuecomment-644535534


   ok to test







[GitHub] [kafka] kkonstantine merged pull request #8839: MINOR: Documentation for KIP-585

2020-06-15 Thread GitBox


kkonstantine merged pull request #8839:
URL: https://github.com/apache/kafka/pull/8839


   







[GitHub] [kafka] chia7712 commented on pull request #8853: KAFKA-10147 MockAdminClient#describeConfigs(Collection

2020-06-15 Thread GitBox


chia7712 commented on pull request #8853:
URL: https://github.com/apache/kafka/pull/8853#issuecomment-644530767


   retest this please







[jira] [Commented] (KAFKA-10168) Rename public StreamsConfig variable

2020-06-15 Thread sats (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136298#comment-17136298
 ] 

sats commented on KAFKA-10168:
--

Thanks [~bchen225242] for the update.

> Rename public StreamsConfig variable
> 
>
> Key: KAFKA-10168
> URL: https://issues.apache.org/jira/browse/KAFKA-10168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Trivial
>  Labels: kip
>
> All Kafka Streams configuration parameter are exposed via public variables 
> that all end with `_CONFIG` suffix. However, we added the variable of 
> `topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of 
> `TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name.
> KIP-626: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name]
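As a concrete illustration of the rename, here is a minimal, hypothetical Java sketch. StreamsConfig.TOPOLOGY_OPTIMIZATION and StreamsConfig.OPTIMIZE are the existing constants; the _CONFIG-suffixed variable is the aligned name proposed by KIP-626 and is shown only as the intended end state:
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class TopologyOptimizationConfigSketch {
    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Today: the one public config variable without the _CONFIG suffix.
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
        // After KIP-626 the aligned name would be used instead (old one deprecated):
        // props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        return props;
    }
}
{code}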



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-06-15 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r440585314



##
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -311,37 +317,47 @@ class GroupMetadataManager(brokerId: Int,
 
           responseCallback(responseError)
         }
-        appendForGroup(group, groupMetadataRecords, putCacheCallback)
-
+        appendForGroup(group, groupMetadataRecords, putCacheCallback, completeDelayedRequests)
       case None =>
         responseCallback(Errors.NOT_COORDINATOR)
-        None
+        Map.empty
     }
   }
 
+  /**
+   * @return Returning a map of successfully appended topic partitions and a flag indicating whether the HWM has been
+   *         incremented. If the caller passes in completeDelayedRequests as false, the caller is expected to complete
+   *         delayed requests for those returned partitions.
+   */
   private def appendForGroup(group: GroupMetadata,
                              records: Map[TopicPartition, MemoryRecords],
-                             callback: Map[TopicPartition, PartitionResponse] => Unit): Unit = {
+                             callback: Map[TopicPartition, PartitionResponse] => Unit,
+                             completeDelayedRequests: Boolean): Map[TopicPartition, LeaderHWChange] = {
     // call replica manager to append the group message
     replicaManager.appendRecords(
       timeout = config.offsetCommitTimeoutMs.toLong,
       requiredAcks = config.offsetCommitRequiredAcks,
       internalTopicsAllowed = true,
       origin = AppendOrigin.Coordinator,
+      completeDelayedRequests = completeDelayedRequests,
       entriesPerPartition = records,
       delayedProduceLock = Some(group.lock),
       responseCallback = callback)
   }
 
   /**
    * Store offsets by appending it to the replicated log and then inserting to cache
+   * @return Returning a map of successfully appended topic partitions and a flag indicating whether the HWM has been
+   *         incremented. If the caller passes in completeDelayedRequests as false, the caller is expected to complete
+   *         delayed requests for those returned partitions.
    */
   def storeOffsets(group: GroupMetadata,
                    consumerId: String,
                    offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                    responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
+                   completeDelayedRequests: Boolean,
Review comment:
   Nice catch. Most methods don't need this flag. Let me revert them :)









[jira] [Commented] (KAFKA-10168) Rename public StreamsConfig variable

2020-06-15 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136292#comment-17136292
 ] 

Boyang Chen commented on KAFKA-10168:
-

[~sbellapu] There is already a WIP PR for this change 

> Rename public StreamsConfig variable
> 
>
> Key: KAFKA-10168
> URL: https://issues.apache.org/jira/browse/KAFKA-10168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Trivial
>  Labels: kip
>
> All Kafka Streams configuration parameter are exposed via public variables 
> that all end with `_CONFIG` suffix. However, we added the variable of 
> `topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of 
> `TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name.
> KIP-626: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name]





[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-06-15 Thread Sean Guo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136283#comment-17136283
 ] 

Sean Guo commented on KAFKA-10134:
--

[~guozhang] 
Cooperative:
{noformat}
ConsumerConfig values: 
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [xxx,xxx,xxx,xxx,xxx,xxx]
check.crcs = true
client.dns.lookup = default
client.id = 
client.rack = 
connections.max.idle.ms = 54
default.api.timeout.ms = 6
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = xxx-consumer-group
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class 
com.cisco.wx2.kafka.serialization.SimpleKafkaDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 180
max.poll.records = 10
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 3
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 3
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class 
com.cisco.wx2.kafka.serialization.SimpleKafkaDeserializer
{noformat}

Eager:
{noformat}
ConsumerConfig values: 
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [xxx,xxx,xxx,xxx,xxx,xxx]
check.crcs = true
client.dns.lookup = default
client.id = 
client.rack = 
connections.max.idle.ms = 54
default.api.timeout.ms = 6
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = xxx-consumer-group
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class 
com.cisco.wx2.kafka.serialization.SimpleKafkaDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 180
max.poll.records = 10
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 3
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.
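
(The Eager dump above is cut off in the archive.) The setting that differs between the two dumps is partition.assignment.strategy. A minimal, hypothetical Java sketch of how the two consumers could be constructed; the bootstrap servers and deserializers below are placeholders, not the reporter's actual values:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignorConfigSketch {
    public static KafkaConsumer<String, String> newConsumer(final boolean cooperative) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "xxx-consumer-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // The setting that differs between the "Cooperative" and "Eager" dumps above:
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  cooperative ? CooperativeStickyAssignor.class.getName()
                              : RangeAssignor.class.getName());
        return new KafkaConsumer<>(props);
    }
}
{code}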

[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-06-15 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r440578202



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -967,7 +967,16 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
+  /**
+   * @param completeDelayedRequests It may require a bunch of group locks when completing delayed requests so it may

Review comment:
   https://issues.apache.org/jira/browse/KAFKA-10170









[jira] [Created] (KAFKA-10170) ReplicaManager should be responsible for checking delayed operations after appending to the log.

2020-06-15 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-10170:
--

 Summary: ReplicaManager should be responsible for checking delayed 
operations after appending to the log.
 Key: KAFKA-10170
 URL: https://issues.apache.org/jira/browse/KAFKA-10170
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


This issue aims to refactor and simplify the code that checks delayed operations. It is inspired by a comment from [~hachikuji] (https://github.com/apache/kafka/pull/8657#discussion_r426943627):

{quote}
Currently we have a somewhat convoluted model where ReplicaManager creates 
delayed operations, but we depend on lower level components like Partition to 
be aware of them and complete them. This breaks encapsulation.

Not something we should try to complete in this PR, but as an eventual goal, I 
think we can consider trying to factor delayed operations out of Partition so 
that they can be managed by ReplicaManager exclusively. If you assume that is 
the end state, then we could drop completeDelayedRequests and let 
ReplicaManager always be responsible for checking delayed operations after 
appending to the log.

Other than ReplicaManager, the only caller of this method is 
GroupMetadataManager which uses it during offset expiration. I think the only 
reason we do this is because we didn't want to waste purgatory space. I don't 
think that's a good enough reason to go outside the normal flow. It would be 
simpler to follow the same path. Potentially we could make the callback an 
Option so that we still have a way to avoid polluting the purgatory.
{quote}





[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-06-15 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-644521032


   > It will be helpful if you could preserve the commit history in future 
updates to the PR since that makes it easier to identify the delta changes.
   
   my bad :(
   
   I'll keep that in mind







[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-06-15 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r440576071



##
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##
@@ -307,8 +307,14 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     override def runWithCallback(member: GroupMember, responseCallback: CompleteTxnCallback): Unit = {
       val producerId = 1000L
       val offsetsPartitions = (0 to numPartitions).map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
-      groupCoordinator.groupManager.handleTxnCompletion(producerId,
-        offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean)
+      val isCommit = random.nextBoolean
+      try groupCoordinator.groupManager.handleTxnCompletion(producerId,
+        offsetsPartitions.map(_.partition).toSet, isCommit = isCommit)
+      catch {
+        case e: IllegalStateException if isCommit
+          && e.getMessage.contains("though the offset commit record itself hasn't been appended to the log") =>

Review comment:
   > That seems a bug.
   
   The root cause (changed by this PR) is that the "txn initialization" and "txn append" are not executed within the same lock.
   
   **The test story is shown below.**
   
   ```CommitTxnOffsetsOperation``` calls ```GroupMetadata.prepareTxnOffsetCommit``` to add ```CommitRecordMetadataAndOffset(None, offsetAndMetadata)``` to ```pendingTransactionalOffsetCommits``` (this is the link you attached).
   
   ```GroupMetadata.completePendingTxnOffsetCommit```, called by ```CompleteTxnOperation```, throws ```IllegalStateException``` if ```CommitRecordMetadataAndOffset.appendedBatchOffset``` is ```None``` (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L664).
   
   **Why did it not cause an error before?**
   
   ```CommitRecordMetadataAndOffset.appendedBatchOffset``` is updated by the callback ```putCacheCallback``` (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L407). ```TestReplicaManager``` always creates a ```delayedProduce``` to handle the ```putCacheCallback``` (https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala#L188). The condition to complete the ```delayedProduce``` is ```completeAttempts.incrementAndGet() >= 3```, and it only becomes true after both ```producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)``` and ```tryCompleteDelayedRequests()``` have been called, since the former calls ```tryComplete``` twice and the latter calls it once. That means ```putCacheCallback``` is always executed by ```TestReplicaManager.appendRecords```, and note that ```TestReplicaManager.appendRecords``` is executed within a group lock (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L738). In short, txn initialization (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L464) and txn append (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L407) are executed under the same group lock, so the following execution order is impossible:
   
   1. txn initialization
   1. txn completion
   1. txn append
   
   However, this PR no longer completes delayed requests while the group lock is held by the caller, so the ```putCacheCallback``` which appends the txn has to acquire the group lock again.
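   
   To make the interleaving concrete, here is a small, self-contained Java sketch (illustrative only, not Kafka code): while initialization and append happen under one lock acquisition, the completion step can never observe a pending commit whose ```appendedBatchOffset``` is still empty; once the append is deferred to a later lock acquisition, the order initialization -> completion -> append becomes possible and the ```IllegalStateException``` can fire.
   
```java
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: mirrors the three steps discussed above, not actual Kafka classes.
public class TxnOrderingSketch {
    private final ReentrantLock groupLock = new ReentrantLock();
    private volatile Optional<Long> appendedBatchOffset = Optional.empty();

    // Old behavior: "txn initialization" and "txn append" under one lock acquisition.
    public void initAndAppendUnderOneLock(final long offset) {
        groupLock.lock();
        try {
            appendedBatchOffset = Optional.empty();    // txn initialization
            appendedBatchOffset = Optional.of(offset); // txn append (putCacheCallback)
        } finally {
            groupLock.unlock();
        }
    }

    // "txn completion": fails if it runs between initialization and a deferred append.
    public void completePendingTxn() {
        groupLock.lock();
        try {
            if (!appendedBatchOffset.isPresent()) {
                throw new IllegalStateException(
                    "offset commit record itself hasn't been appended to the log");
            }
        } finally {
            groupLock.unlock();
        }
    }
}
```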
 
   
   









[jira] [Commented] (KAFKA-10168) Rename public StreamsConfig variable

2020-06-15 Thread sats (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136266#comment-17136266
 ] 

sats commented on KAFKA-10168:
--

Hi [~mjsax], this looks like low-hanging fruit for me to start contributing to this project. Can you please assign it to me? Or please point me to some tickets that are easy for a starter. Thanks

> Rename public StreamsConfig variable
> 
>
> Key: KAFKA-10168
> URL: https://issues.apache.org/jira/browse/KAFKA-10168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Trivial
>  Labels: kip
>
> All Kafka Streams configuration parameter are exposed via public variables 
> that all end with `_CONFIG` suffix. However, we added the variable of 
> `topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of 
> `TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name.
> KIP-626: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name]





[jira] [Commented] (KAFKA-10169) KafkaException: Failing batch since transaction was aborted

2020-06-15 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136255#comment-17136255
 ] 

Sophie Blee-Goldman commented on KAFKA-10169:
-

I haven't seen anything damning (or even particularly interesting) in the logs 
so far. The only consistent pattern was that it tended to occur after a 
#handleAssignment where the doomed task was both a previous and current active 
task (ie owned before and after the rebalance). But it might just be a 
coincidence, and/or completely unrelated. I'll keep a look out for new thread 
deaths

> KafkaException: Failing batch since transaction was aborted
> ---
>
> Key: KAFKA-10169
> URL: https://issues.apache.org/jira/browse/KAFKA-10169
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.6.0
>
>
> We've seen the following exception in our eos-beta test application recently:
> {code:java}
> [2020-06-13T00:09:14-07:00] 
> (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) 
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic 
> stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-25-changelog for task 
> 1_2 due to: [2020-06-13T00:09:14-07:00] 
> (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted [2020-06-13T00:09:14-07:00] 
> (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) 
> Exception handler choose to FAIL the processing, no more records would be 
> sent. at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:213)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:185)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1347)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>  at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748) [2020-06-13T00:09:14-07:00] 
> (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) 
> Caused by: org.apache.kafka.common.KafkaException: Failing batch since 
> transaction was aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  ... 3 more
> {code}
> Somewhat unclear if this is an issue with eos-beta specifically, or just eos 
> in general. But several threads have died over the course of a few days in 
> the eos-beta application, while none so far have died on the eos-alpha 
> application.
> It's also unclear (at least to me) whether this is definitely an issue in 
> Streams or possibly a bug in the producer (or even the broker, although that 
> seems unlikely)





[GitHub] [kafka] satishbellapu opened a new pull request #8877: KAFKA-9194: Missing documentation for replicaMaxWaitTimeMs config value

2020-06-15 Thread GitBox


satishbellapu opened a new pull request #8877:
URL: https://github.com/apache/kafka/pull/8877


   Looks like it is a typo: the actual key is supposed to be #replicaFetchWaitMaxTimeMs (replica.fetch.wait.max.ms), but the docs refer to #replicaMaxWaitTimeMs instead.
   
   ### Committer Checklist (excluded from commit message)
   - [*] Verify design and implementation 
   - [*] Verify test coverage and CI build status
   - [*] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

2020-06-15 Thread GitBox


ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r440555918



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -714,13 +696,20 @@ void shutdown(final boolean clean) {
 }
 }
 
-if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+try {
+if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+}
+for (final TaskId taskId : 
consumedOffsetsAndMetadataPerTask.keySet()) {
+final Task task = tasks.get(taskId);
+task.postCommit();
+}
+} catch (final RuntimeException e) {
+firstException.compareAndSet(null, e);

Review comment:
   I see. Then I think it makes sense to always attempt to write the 
checkpoint/call `postCommit` for a task that was successfully committed, 
regardless of whether something went wrong during `postCommit` with a different 
task 
   
   And I agree, we should not make assumptions about the current code not throwing, unless it's explicitly in the contract of the method that it will never throw (which is not the case for `postCommit`).









[GitHub] [kafka] mjsax commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

2020-06-15 Thread GitBox


mjsax commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-644498795


   Retest this please.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

2020-06-15 Thread GitBox


ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r440555458



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -215,91 +215,54 @@ public void handleAssignment(final Map> activeTasks,
  "\tExisting standby tasks: {}",
  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), 
standbyTaskIds());
 
-final Map> activeTasksToCreate = new 
HashMap<>(activeTasks);
-final Map> standbyTasksToCreate = new 
HashMap<>(standbyTasks);
-final Set tasksToRecycle = new HashSet<>();
-
 builder.addSubscribedTopicsFromAssignment(
 
activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
 logPrefix
 );
 
-// first rectify all existing tasks
 final LinkedHashMap taskCloseExceptions = 
new LinkedHashMap<>();
 
-final Set tasksToClose = new HashSet<>();
-final Map> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
-final Set additionalTasksForCommitting = new HashSet<>();
+final Map> activeTasksToCreate = new 
HashMap<>(activeTasks);
+final Map> standbyTasksToCreate = new 
HashMap<>(standbyTasks);
+final LinkedList tasksToClose = new LinkedList<>();
+final Set tasksToRecycle = new HashSet<>();
 final Set dirtyTasks = new HashSet<>();
 
+// first rectify all existing tasks
 for (final Task task : tasks.values()) {
 if (activeTasks.containsKey(task.id()) && task.isActive()) {
 updateInputPartitionsAndResume(task, 
activeTasks.get(task.id()));
-if (task.commitNeeded()) {
-additionalTasksForCommitting.add(task);
-}
 activeTasksToCreate.remove(task.id());
 } else if (standbyTasks.containsKey(task.id()) && 
!task.isActive()) {
 updateInputPartitionsAndResume(task, 
standbyTasks.get(task.id()));
 standbyTasksToCreate.remove(task.id());
-// check for tasks that were owned previously but have changed 
active/standby status
 } else if (activeTasks.containsKey(task.id()) || 
standbyTasks.containsKey(task.id())) {
+// check for tasks that were owned previously but have changed 
active/standby status
 tasksToRecycle.add(task);
 } else {
-try {
-task.suspend();
-final Map 
committableOffsets = task.prepareCommit();
-
-tasksToClose.add(task);
-if (!committableOffsets.isEmpty()) {
-consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
-}
-} catch (final RuntimeException e) {
-final String uncleanMessage = String.format(
-"Failed to close task %s cleanly. Attempting to close 
remaining tasks before re-throwing:",
-task.id());
-log.error(uncleanMessage, e);
-taskCloseExceptions.put(task.id(), e);
-// We've already recorded the exception (which is the 
point of clean).
-// Now, we should go ahead and complete the close because 
a half-closed task is no good to anyone.
-dirtyTasks.add(task);
-}
+tasksToClose.add(task);
 }
 }
 
-if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
+for (final Task task : tasksToClose) {
 try {
-for (final Task task : additionalTasksForCommitting) {
-final Map 
committableOffsets = task.prepareCommit();
-if (!committableOffsets.isEmpty()) {
-consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
+task.suspend(); // Should be a no-op for active tasks, unless 
we hit an exception during handleRevocation

Review comment:
   `postCommit` will always write the checkpoint if the task is in 
SUSPENDED, which it should always be before being closed









[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

2020-06-15 Thread GitBox


mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r440555142



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -215,91 +215,54 @@ public void handleAssignment(final Map> activeTasks,
  "\tExisting standby tasks: {}",
  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), 
standbyTaskIds());
 
-final Map> activeTasksToCreate = new 
HashMap<>(activeTasks);
-final Map> standbyTasksToCreate = new 
HashMap<>(standbyTasks);
-final Set tasksToRecycle = new HashSet<>();
-
 builder.addSubscribedTopicsFromAssignment(
 
activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
 logPrefix
 );
 
-// first rectify all existing tasks
 final LinkedHashMap taskCloseExceptions = 
new LinkedHashMap<>();
 
-final Set tasksToClose = new HashSet<>();
-final Map> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
-final Set additionalTasksForCommitting = new HashSet<>();
+final Map> activeTasksToCreate = new 
HashMap<>(activeTasks);
+final Map> standbyTasksToCreate = new 
HashMap<>(standbyTasks);
+final LinkedList tasksToClose = new LinkedList<>();
+final Set tasksToRecycle = new HashSet<>();
 final Set dirtyTasks = new HashSet<>();
 
+// first rectify all existing tasks
 for (final Task task : tasks.values()) {
 if (activeTasks.containsKey(task.id()) && task.isActive()) {
 updateInputPartitionsAndResume(task, 
activeTasks.get(task.id()));
-if (task.commitNeeded()) {
-additionalTasksForCommitting.add(task);
-}
 activeTasksToCreate.remove(task.id());
 } else if (standbyTasks.containsKey(task.id()) && 
!task.isActive()) {
 updateInputPartitionsAndResume(task, 
standbyTasks.get(task.id()));
 standbyTasksToCreate.remove(task.id());
-// check for tasks that were owned previously but have changed 
active/standby status
 } else if (activeTasks.containsKey(task.id()) || 
standbyTasks.containsKey(task.id())) {
+// check for tasks that were owned previously but have changed 
active/standby status
 tasksToRecycle.add(task);
 } else {
-try {
-task.suspend();
-final Map 
committableOffsets = task.prepareCommit();
-
-tasksToClose.add(task);
-if (!committableOffsets.isEmpty()) {
-consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
-}
-} catch (final RuntimeException e) {
-final String uncleanMessage = String.format(
-"Failed to close task %s cleanly. Attempting to close 
remaining tasks before re-throwing:",
-task.id());
-log.error(uncleanMessage, e);
-taskCloseExceptions.put(task.id(), e);
-// We've already recorded the exception (which is the 
point of clean).
-// Now, we should go ahead and complete the close because 
a half-closed task is no good to anyone.
-dirtyTasks.add(task);
-}
+tasksToClose.add(task);
 }
 }
 
-if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
+for (final Task task : tasksToClose) {
 try {
-for (final Task task : additionalTasksForCommitting) {
-final Map 
committableOffsets = task.prepareCommit();
-if (!committableOffsets.isEmpty()) {
-consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
+task.suspend(); // Should be a no-op for active tasks, unless 
we hit an exception during handleRevocation

Review comment:
   `postCommit` only writes a checkpoint for non-eos. Thus, we still need 
to write a checkpoint in `close()` for the eos-case (if just blindly for all 
cases as we do atm).









[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

2020-06-15 Thread GitBox


mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r440555142



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -215,91 +215,54 @@ public void handleAssignment(final Map> activeTasks,
  "\tExisting standby tasks: {}",
  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), 
standbyTaskIds());
 
-final Map> activeTasksToCreate = new 
HashMap<>(activeTasks);
-final Map> standbyTasksToCreate = new 
HashMap<>(standbyTasks);
-final Set tasksToRecycle = new HashSet<>();
-
 builder.addSubscribedTopicsFromAssignment(
 
activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
 logPrefix
 );
 
-// first rectify all existing tasks
 final LinkedHashMap taskCloseExceptions = 
new LinkedHashMap<>();
 
-final Set tasksToClose = new HashSet<>();
-final Map> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
-final Set additionalTasksForCommitting = new HashSet<>();
+final Map> activeTasksToCreate = new 
HashMap<>(activeTasks);
+final Map> standbyTasksToCreate = new 
HashMap<>(standbyTasks);
+final LinkedList tasksToClose = new LinkedList<>();
+final Set tasksToRecycle = new HashSet<>();
 final Set dirtyTasks = new HashSet<>();
 
+// first rectify all existing tasks
 for (final Task task : tasks.values()) {
 if (activeTasks.containsKey(task.id()) && task.isActive()) {
 updateInputPartitionsAndResume(task, 
activeTasks.get(task.id()));
-if (task.commitNeeded()) {
-additionalTasksForCommitting.add(task);
-}
 activeTasksToCreate.remove(task.id());
 } else if (standbyTasks.containsKey(task.id()) && 
!task.isActive()) {
 updateInputPartitionsAndResume(task, 
standbyTasks.get(task.id()));
 standbyTasksToCreate.remove(task.id());
-// check for tasks that were owned previously but have changed 
active/standby status
 } else if (activeTasks.containsKey(task.id()) || 
standbyTasks.containsKey(task.id())) {
+// check for tasks that were owned previously but have changed 
active/standby status
 tasksToRecycle.add(task);
 } else {
-try {
-task.suspend();
-final Map 
committableOffsets = task.prepareCommit();
-
-tasksToClose.add(task);
-if (!committableOffsets.isEmpty()) {
-consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
-}
-} catch (final RuntimeException e) {
-final String uncleanMessage = String.format(
-"Failed to close task %s cleanly. Attempting to close 
remaining tasks before re-throwing:",
-task.id());
-log.error(uncleanMessage, e);
-taskCloseExceptions.put(task.id(), e);
-// We've already recorded the exception (which is the 
point of clean).
-// Now, we should go ahead and complete the close because 
a half-closed task is no good to anyone.
-dirtyTasks.add(task);
-}
+tasksToClose.add(task);
 }
 }
 
-if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
+for (final Task task : tasksToClose) {
 try {
-for (final Task task : additionalTasksForCommitting) {
-final Map 
committableOffsets = task.prepareCommit();
-if (!committableOffsets.isEmpty()) {
-consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
+task.suspend(); // Should be a no-op for active tasks, unless 
we hit an exception during handleRevocation

Review comment:
   `postCommit` only writes a checkpoint for non-eos. Thus, we still need to write a checkpoint for the eos case in `close()`.









[GitHub] [kafka] mjsax commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

2020-06-15 Thread GitBox


mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r440554249



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -714,13 +696,20 @@ void shutdown(final boolean clean) {
 }
 }
 
-if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+try {
+if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+}
+for (final TaskId taskId : 
consumedOffsetsAndMetadataPerTask.keySet()) {
+final Task task = tasks.get(taskId);
+task.postCommit();
+}
+} catch (final RuntimeException e) {
+firstException.compareAndSet(null, e);

Review comment:
   I meant the latter. And I agree that if `commit` fails, we should not call `postCommit()`.
   
   For failure in `postCommit`: we would be making assumptions about the current code, which seems dangerous (i.e., not future proof)? -- IMHO, if `postCommit` fails, we need to close the corresponding task dirty and either recreate it or rebalance, but we should also continue to call `postCommit()` for all other tasks?
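   
   A hypothetical, simplified Java sketch of the loop shape being discussed (not the actual `TaskManager` code; `Task`, `postCommitAll`, and the dirty-close list are stand-in names): attempt `postCommit()` for every successfully committed task, remember only the first failure, and mark failing tasks for a dirty close.
   
```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class PostCommitLoopSketch {
    interface Task {
        String id();
        void postCommit();
    }

    // Returns the first exception encountered (or null) so the caller can re-throw it
    // after all tasks have been given a chance to post-commit.
    static RuntimeException postCommitAll(final List<Task> committedTasks,
                                          final List<Task> tasksToCloseDirty) {
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>();
        for (final Task task : committedTasks) {
            try {
                task.postCommit();
            } catch (final RuntimeException e) {
                firstException.compareAndSet(null, e);
                tasksToCloseDirty.add(task); // close this one dirty later, but keep going
            }
        }
        return firstException.get();
    }
}
```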









[jira] [Created] (KAFKA-10169) KafkaException: Failing batch since transaction was aborted

2020-06-15 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10169:
---

 Summary: KafkaException: Failing batch since transaction was 
aborted
 Key: KAFKA-10169
 URL: https://issues.apache.org/jira/browse/KAFKA-10169
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Sophie Blee-Goldman
 Fix For: 2.6.0


We've seen the following exception in our eos-beta test application recently:
{code:java}
[2020-06-13T00:09:14-07:00] 
(streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) 
org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
record to topic 
stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-25-changelog for task 
1_2 due to: [2020-06-13T00:09:14-07:00] 
(streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) 
org.apache.kafka.common.KafkaException: Failing batch since transaction was 
aborted [2020-06-13T00:09:14-07:00] 
(streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) 
Exception handler choose to FAIL the processing, no more records would be sent. 
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:213)
 at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:185)
 at 
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1347)
 at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
 at 
org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
 at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
 at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
 at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
java.lang.Thread.run(Thread.java:748) [2020-06-13T00:09:14-07:00] 
(streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) 
Caused by: org.apache.kafka.common.KafkaException: Failing batch since 
transaction was aborted at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
 ... 3 more
{code}
Somewhat unclear if this is an issue with eos-beta specifically, or just eos in 
general. But several threads have died over the course of a few days in the 
eos-beta application, while none so far have died on the eos-alpha application.

It's also unclear (at least to me) whether this is definitely an issue in 
Streams or possibly a bug in the producer (or even the broker, although that 
seems unlikely)





[jira] [Updated] (KAFKA-10168) Rename public StreamsConfig variable

2020-06-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10168:

Description: 
All Kafka Streams configuration parameter are exposed via public variables that 
all end with `_CONFIG` suffix. However, we added the variable of 
`topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of 
`TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name.

KIP-626: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name]

  was:
All Kafka Streams configuration parameter are exposed via public variables that 
all end with `_CONFIG` suffix. However, we added the variable of 
`topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of 
`TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name.

KIP-629: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-629%3A+Rename+StreamsConfig+config+variable+name]


> Rename public StreamsConfig variable
> 
>
> Key: KAFKA-10168
> URL: https://issues.apache.org/jira/browse/KAFKA-10168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Trivial
>  Labels: kip
>
> All Kafka Streams configuration parameter are exposed via public variables 
> that all end with `_CONFIG` suffix. However, we added the variable of 
> `topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of 
> `TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name.
> KIP-626: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name]





[jira] [Updated] (KAFKA-10168) Rename public StreamsConfig variable

2020-06-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10168:

Description: 
All Kafka Streams configuration parameter are exposed via public variables that 
all end with `_CONFIG` suffix. However, we added the variable of 
`topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of 
`TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name.

KIP-629: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-629%3A+Rename+StreamsConfig+config+variable+name]

  was:All Kafka Streams configuration parameter are exposed via public 
variables that all end with `_CONFIG` suffix. However, we added the variable of 
`topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of 
`TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name.


> Rename public StreamsConfig variable
> 
>
> Key: KAFKA-10168
> URL: https://issues.apache.org/jira/browse/KAFKA-10168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Trivial
>  Labels: kip
>
> All Kafka Streams configuration parameter are exposed via public variables 
> that all end with `_CONFIG` suffix. However, we added the variable of 
> `topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of 
> `TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name.
> KIP-629: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-629%3A+Rename+StreamsConfig+config+variable+name]





[jira] [Created] (KAFKA-10168) Rename public StreamsConfig variable

2020-06-15 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-10168:
---

 Summary: Rename public StreamsConfig variable
 Key: KAFKA-10168
 URL: https://issues.apache.org/jira/browse/KAFKA-10168
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


All Kafka Streams configuration parameter are exposed via public variables that 
all end with `_CONFIG` suffix. However, we added the variable of 
`topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of 
`TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name.





[jira] [Assigned] (KAFKA-10168) Rename public StreamsConfig variable

2020-06-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-10168:
---

Assignee: Matthias J. Sax

> Rename public StreamsConfig variable
> 
>
> Key: KAFKA-10168
> URL: https://issues.apache.org/jira/browse/KAFKA-10168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Trivial
>  Labels: kip
>
> All Kafka Streams configuration parameter are exposed via public variables 
> that all end with `_CONFIG` suffix. However, we added the variable of 
> `topology.optimization` as `TOPOLOGY_OPTIMIZATION` instead of 
> `TOPLOGY_OPTIMIZATION_CONFIG`. We should align the variable name.





[GitHub] [kafka] mjsax commented on pull request #8865: MINOR: fix StreamsConfig parameter name variable

2020-06-15 Thread GitBox


mjsax commented on pull request #8865:
URL: https://github.com/apache/kafka/pull/8865#issuecomment-644485931


   Not sure if changing KIP-295 would be a good call. We made a mistake and can 
be honest about it. I'll do a quick KIP.







[GitHub] [kafka] mjsax commented on pull request #8861: MINOR: clean up unused checkstyle suppressions for Streams

2020-06-15 Thread GitBox


mjsax commented on pull request #8861:
URL: https://github.com/apache/kafka/pull/8861#issuecomment-644484131


   Retest this please.







[GitHub] [kafka] mjsax commented on pull request #8871: MINOR: code cleanup for inconsistent naming

2020-06-15 Thread GitBox


mjsax commented on pull request #8871:
URL: https://github.com/apache/kafka/pull/8871#issuecomment-644483903


   Retest this please.







[jira] [Commented] (KAFKA-10062) Add a method to retrieve the current timestamp as known by the Streams app

2020-06-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136232#comment-17136232
 ] 

Matthias J. Sax commented on KAFKA-10062:
-

The testing is done using `TopologyTestDriver` (cf 
[https://kafka.apache.org/25/documentation/streams/developer-guide/testing.html])
 – the test driver mocks wall-clock time and allows users to manipulate 
wall-clock time to control when wall-clock-time-based punctuation should fire. 
However, the mocked wall-clock time is not exposed via the `context`.

Note that, internally, all code always uses the `Time` interface if it needs access to system time. In a real deployment this translates to system time, while the test driver switches the implementation to use `MockTime`.

Does this help?
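
For illustration, a minimal sketch of such a test, assuming the pre-KIP-478 Processor API available in 2.5; the topic names and the 10-second punctuation interval are arbitrary:
{code:java}
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class WallClockPunctuationSketch {
    public static void main(final String[] args) {
        final Topology topology = new Topology();
        topology.addSource("source", "input-topic");
        topology.addProcessor("proc", () -> new Processor<String, String>() {
            @Override
            public void init(final ProcessorContext context) {
                // Uses the driver's mocked wall-clock time, not System.currentTimeMillis().
                context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME,
                    timestamp -> System.out.println("punctuated at " + timestamp));
            }
            @Override
            public void process(final String key, final String value) { }
            @Override
            public void close() { }
        }, "source");

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wall-clock-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the driver
        final TopologyTestDriver driver = new TopologyTestDriver(topology, props);
        try {
            driver.advanceWallClockTime(Duration.ofSeconds(30)); // should fire the punctuator three times
        } finally {
            driver.close();
        }
    }
}
{code}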

> Add a method to retrieve the current timestamp as known by the Streams app
> --
>
> Key: KAFKA-10062
> URL: https://issues.apache.org/jira/browse/KAFKA-10062
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Piotr Smolinski
>Assignee: William Bottrell
>Priority: Major
>  Labels: needs-kip, newbie
>
> Please add to the ProcessorContext a method to retrieve the current timestamp 
> compatible with the Punctuator#punctuate(long) method.
> Proposal in ProcessorContext:
> long getTimestamp(PunctuationType type);
> The method should return the time value as known by the Punctuator scheduler 
> for the respective PunctuationType.
> The use-case is tracking of a process with timeout-based escalation.
> A transformer receives process events and, in case of a missing event, 
> executes an action (emits a message) after a given escalation timeout (several 
> stages). The initial message may already arrive with a reference timestamp in 
> the past and may trigger a different action upon arrival depending on how far 
> in the past it is.
> If the timeout only needs to be computed against some future time, Punctuator 
> is perfectly sufficient. The problem is that I have to evaluate the current 
> time-related state once the message arrives.
> I am using wall-clock time. Normally accessing System.currentTimeMillis() is 
> sufficient, but it breaks in unit testing with TopologyTestDriver, where the 
> app's wall-clock time is different from the system-wide one.
> To access the mentioned clock I am using reflection to access 
> ProcessorContextImpl#task and then StreamTask#time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

2020-06-15 Thread GitBox


junrao commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r440539291



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -595,8 +595,10 @@ private[log] class Cleaner(val id: Int,
   log.replaceSegments(List(cleaned), segments)
 } catch {
   case e: LogCleaningAbortedException =>
-try cleaned.deleteIfExists()
-catch {
+try {
+  cleaned.deleteIfExists()
+  log.producerStateManager.deleteIfExists(cleaned.baseOffset)

Review comment:
   Hmm, the cleaned segment has the same base offset as the first segment. 
So, we don't want to delete that snapshot file.

##
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##
@@ -751,6 +751,25 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
   None
   }
 
-  private def listSnapshotFiles: Seq[File] = 
ProducerStateManager.listSnapshotFiles(logDir)
+  private[log] def listSnapshotFiles: Seq[File] = 
ProducerStateManager.listSnapshotFiles(logDir)
 
+  /**
+   * Remove any producer state snapshot files which do not have a 
corresponding offset provided
+   * in keepOffsets. The latest snapshot file will always be kept.

Review comment:
   What's keepOffsets?

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2421,6 +2396,7 @@ class Log(@volatile private var _dir: File,
 newSegments.foreach { splitSegment =>
   splitSegment.close()
   splitSegment.deleteIfExists()
+  producerStateManager.deleteIfExists(splitSegment.baseOffset)

Review comment:
   It doesn't seem that we generate producer snapshot files for those new 
segments?

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2237,7 +2209,10 @@ class Log(@volatile private var _dir: File,
 def deleteSegments(): Unit = {
   info(s"Deleting segments ${segments.mkString(",")}")
   maybeHandleIOException(s"Error while deleting segments for 
$topicPartition in dir ${dir.getParent}") {
-segments.foreach(_.deleteIfExists())
+segments.foreach { segment =>
+  segment.deleteIfExists()
+  producerStateManager.deleteIfExists(segment.baseOffset)

Review comment:
   Hmm, this can be a bit tricky. When we replace old segments with a new 
segment in the LogCleaner, each of the old segments will be deleted. However, 
the first old segment has the same base offset as the new segment, so we don't 
want to just delete the producer snapshot corresponding to that first old 
segment.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] andrewchoi5 commented on pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception

2020-06-15 Thread GitBox


andrewchoi5 commented on pull request #8479:
URL: https://github.com/apache/kafka/pull/8479#issuecomment-644482079


   Thanks, @junrao .
   Removed the population of responseMap with the `ERROR` code.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed

2020-06-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-10167:
---

Assignee: Guozhang Wang

> Streams EOS-Beta should not try to get end-offsets as read-committed
> 
>
> Key: KAFKA-10167
> URL: https://issues.apache.org/jira/browse/KAFKA-10167
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0
>
>
> This is a bug discovered with the new EOS protocol (KIP-447); here's the 
> context:
> In Streams, when we are assigned new active tasks, we would first try to 
> restore the state from the changelog topic all the way to the log end offset, 
> and then we can transition from the `restoring` to the `running` state to 
> start processing the task.
> Before KIP-447, the end-offset call is only triggered after we've passed the 
> synchronization barrier at the txn-coordinator, which guarantees that the 
> txn-marker has been sent and received (otherwise we would error with 
> CONCURRENT_TRANSACTIONS and let the producer retry). When the txn-marker is 
> received, it also means that the marker has been fully replicated, which in 
> turn guarantees that the data written before that marker has been fully 
> replicated. As a result, when we send the list-offset request with the 
> `read-committed` flag, we are guaranteed that the returned offset == LSO == 
> high-watermark.
> After KIP-447, however, we do not fence on the txn-coordinator but on the 
> group-coordinator upon offset-fetch, and the group-coordinator returns the 
> fetched offsets right after it has replicated the txn-marker sent to it. 
> However, the txn-markers are sent to different brokers in parallel, and even 
> within the same broker, markers of different partitions are appended and 
> replicated independently, so when the fetch-offset request returns it is NOT 
> guaranteed that the LSO on the other data partitions has advanced as well. 
> Hence, in that case the `endOffset` call may return a smaller offset, causing 
> data loss.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #8876: KAFKA-10167: use the admin client to read end-offset

2020-06-15 Thread GitBox


mjsax commented on a change in pull request #8876:
URL: https://github.com/apache/kafka/pull/8876#discussion_r440535836



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
##
@@ -834,6 +855,13 @@ public AlterClientQuotasResult 
alterClientQuotas(Collection 
newOffsets) {
+beginningOffsets.putAll(newOffsets);
+}
+public synchronized void updateEndOffsets(final Map 
newOffsets) {

Review comment:
   nit: empty line.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##
@@ -564,8 +576,15 @@ private void restoreChangelog(final ChangelogMetadata 
changelogMetadata) {
 return Collections.emptyMap();
 
 try {
-return restoreConsumer.endOffsets(partitions);
-} catch (final TimeoutException e) {
+if (adminClient != null) {

Review comment:
   Why do we need this distinction? Seems we set `adminClient` in any case 
and it should never be `null`?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -273,13 +273,13 @@ public boolean isRunning() {
 private volatile ThreadMetadata threadMetadata;
 private StreamThread.StateListener stateListener;
 
-private final Admin adminClient;
 private final ChangelogReader changelogReader;
 
 // package-private for testing
 final ConsumerRebalanceListener rebalanceListener;
 final Consumer mainConsumer;
 final Consumer restoreConsumer;
+final Admin adminClient;

Review comment:
   Why is `adminClient` not final any longer? We still pass it into the 
`StreamThread` constructor.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception

2020-06-15 Thread GitBox


junrao commented on pull request #8479:
URL: https://github.com/apache/kafka/pull/8479#issuecomment-644476784


   @andrewchoi5 : Since the controller only checks KAFKA_STORAGE_ERROR in 
LeaderAndIsrResponse now, perhaps we can just log an error without sending an 
error code back for now.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed

2020-06-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10167:

Priority: Blocker  (was: Major)

> Streams EOS-Beta should not try to get end-offsets as read-committed
> 
>
> Key: KAFKA-10167
> URL: https://issues.apache.org/jira/browse/KAFKA-10167
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Blocker
>
> This is a bug discovered with the new EOS protocol (KIP-447); here's the 
> context:
> In Streams, when we are assigned new active tasks, we would first try to 
> restore the state from the changelog topic all the way to the log end offset, 
> and then we can transition from the `restoring` to the `running` state to 
> start processing the task.
> Before KIP-447, the end-offset call is only triggered after we've passed the 
> synchronization barrier at the txn-coordinator, which guarantees that the 
> txn-marker has been sent and received (otherwise we would error with 
> CONCURRENT_TRANSACTIONS and let the producer retry). When the txn-marker is 
> received, it also means that the marker has been fully replicated, which in 
> turn guarantees that the data written before that marker has been fully 
> replicated. As a result, when we send the list-offset request with the 
> `read-committed` flag, we are guaranteed that the returned offset == LSO == 
> high-watermark.
> After KIP-447, however, we do not fence on the txn-coordinator but on the 
> group-coordinator upon offset-fetch, and the group-coordinator returns the 
> fetched offsets right after it has replicated the txn-marker sent to it. 
> However, the txn-markers are sent to different brokers in parallel, and even 
> within the same broker, markers of different partitions are appended and 
> replicated independently, so when the fetch-offset request returns it is NOT 
> guaranteed that the LSO on the other data partitions has advanced as well. 
> Hence, in that case the `endOffset` call may return a smaller offset, causing 
> data loss.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed

2020-06-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10167:

Affects Version/s: 2.6.0

> Streams EOS-Beta should not try to get end-offsets as read-committed
> 
>
> Key: KAFKA-10167
> URL: https://issues.apache.org/jira/browse/KAFKA-10167
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0
>
>
> This is a bug discovered with the new EOS protocol (KIP-447); here's the 
> context:
> In Streams, when we are assigned new active tasks, we would first try to 
> restore the state from the changelog topic all the way to the log end offset, 
> and then we can transition from the `restoring` to the `running` state to 
> start processing the task.
> Before KIP-447, the end-offset call is only triggered after we've passed the 
> synchronization barrier at the txn-coordinator, which guarantees that the 
> txn-marker has been sent and received (otherwise we would error with 
> CONCURRENT_TRANSACTIONS and let the producer retry). When the txn-marker is 
> received, it also means that the marker has been fully replicated, which in 
> turn guarantees that the data written before that marker has been fully 
> replicated. As a result, when we send the list-offset request with the 
> `read-committed` flag, we are guaranteed that the returned offset == LSO == 
> high-watermark.
> After KIP-447, however, we do not fence on the txn-coordinator but on the 
> group-coordinator upon offset-fetch, and the group-coordinator returns the 
> fetched offsets right after it has replicated the txn-marker sent to it. 
> However, the txn-markers are sent to different brokers in parallel, and even 
> within the same broker, markers of different partitions are appended and 
> replicated independently, so when the fetch-offset request returns it is NOT 
> guaranteed that the LSO on the other data partitions has advanced as well. 
> Hence, in that case the `endOffset` call may return a smaller offset, causing 
> data loss.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed

2020-06-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10167:

Fix Version/s: 2.6.0

> Streams EOS-Beta should not try to get end-offsets as read-committed
> 
>
> Key: KAFKA-10167
> URL: https://issues.apache.org/jira/browse/KAFKA-10167
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0
>
>
> This is a bug discovered with the new EOS protocol (KIP-447); here's the 
> context:
> In Streams, when we are assigned new active tasks, we would first try to 
> restore the state from the changelog topic all the way to the log end offset, 
> and then we can transition from the `restoring` to the `running` state to 
> start processing the task.
> Before KIP-447, the end-offset call is only triggered after we've passed the 
> synchronization barrier at the txn-coordinator, which guarantees that the 
> txn-marker has been sent and received (otherwise we would error with 
> CONCURRENT_TRANSACTIONS and let the producer retry). When the txn-marker is 
> received, it also means that the marker has been fully replicated, which in 
> turn guarantees that the data written before that marker has been fully 
> replicated. As a result, when we send the list-offset request with the 
> `read-committed` flag, we are guaranteed that the returned offset == LSO == 
> high-watermark.
> After KIP-447, however, we do not fence on the txn-coordinator but on the 
> group-coordinator upon offset-fetch, and the group-coordinator returns the 
> fetched offsets right after it has replicated the txn-marker sent to it. 
> However, the txn-markers are sent to different brokers in parallel, and even 
> within the same broker, markers of different partitions are appended and 
> replicated independently, so when the fetch-offset request returns it is NOT 
> guaranteed that the LSO on the other data partitions has advanced as well. 
> Hence, in that case the `endOffset` call may return a smaller offset, causing 
> data loss.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed

2020-06-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10167:

Component/s: streams

> Streams EOS-Beta should not try to get end-offsets as read-committed
> 
>
> Key: KAFKA-10167
> URL: https://issues.apache.org/jira/browse/KAFKA-10167
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> This is a bug discovered with the new EOS protocol (KIP-447); here's the 
> context:
> In Streams, when we are assigned new active tasks, we would first try to 
> restore the state from the changelog topic all the way to the log end offset, 
> and then we can transition from the `restoring` to the `running` state to 
> start processing the task.
> Before KIP-447, the end-offset call is only triggered after we've passed the 
> synchronization barrier at the txn-coordinator, which guarantees that the 
> txn-marker has been sent and received (otherwise we would error with 
> CONCURRENT_TRANSACTIONS and let the producer retry). When the txn-marker is 
> received, it also means that the marker has been fully replicated, which in 
> turn guarantees that the data written before that marker has been fully 
> replicated. As a result, when we send the list-offset request with the 
> `read-committed` flag, we are guaranteed that the returned offset == LSO == 
> high-watermark.
> After KIP-447, however, we do not fence on the txn-coordinator but on the 
> group-coordinator upon offset-fetch, and the group-coordinator returns the 
> fetched offsets right after it has replicated the txn-marker sent to it. 
> However, the txn-markers are sent to different brokers in parallel, and even 
> within the same broker, markers of different partitions are appended and 
> replicated independently, so when the fetch-offset request returns it is NOT 
> guaranteed that the LSO on the other data partitions has advanced as well. 
> Hence, in that case the `endOffset` call may return a smaller offset, causing 
> data loss.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang opened a new pull request #8876: KAFKA-10167: use the admin client to read end-offset

2020-06-15 Thread GitBox


guozhangwang opened a new pull request #8876:
URL: https://github.com/apache/kafka/pull/8876


   Since the admin client allows us to use a flexible offset-spec, we can 
always use read-uncommitted regardless of the EOS config.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed

2020-06-15 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136216#comment-17136216
 ] 

Guozhang Wang commented on KAFKA-10167:
---

I've also thought about whether we need to block on mainConsumer#committed 
before getting the end-offset, and now I think that is not necessary either, 
since the end-offset only requires that the data is flushed --- again, remember 
we have this single-writer, single-reader scenario, and when we are in the 
initialize-changelog-reader phase we know that the old producer would not be 
able to write any more unabortable data to that partition.

> Streams EOS-Beta should not try to get end-offsets as read-committed
> 
>
> Key: KAFKA-10167
> URL: https://issues.apache.org/jira/browse/KAFKA-10167
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Major
>
> This is a bug discovered with the new EOS protocol (KIP-447); here's the 
> context:
> In Streams, when we are assigned new active tasks, we would first try to 
> restore the state from the changelog topic all the way to the log end offset, 
> and then we can transition from the `restoring` to the `running` state to 
> start processing the task.
> Before KIP-447, the end-offset call is only triggered after we've passed the 
> synchronization barrier at the txn-coordinator, which guarantees that the 
> txn-marker has been sent and received (otherwise we would error with 
> CONCURRENT_TRANSACTIONS and let the producer retry). When the txn-marker is 
> received, it also means that the marker has been fully replicated, which in 
> turn guarantees that the data written before that marker has been fully 
> replicated. As a result, when we send the list-offset request with the 
> `read-committed` flag, we are guaranteed that the returned offset == LSO == 
> high-watermark.
> After KIP-447, however, we do not fence on the txn-coordinator but on the 
> group-coordinator upon offset-fetch, and the group-coordinator returns the 
> fetched offsets right after it has replicated the txn-marker sent to it. 
> However, the txn-markers are sent to different brokers in parallel, and even 
> within the same broker, markers of different partitions are appended and 
> replicated independently, so when the fetch-offset request returns it is NOT 
> guaranteed that the LSO on the other data partitions has advanced as well. 
> Hence, in that case the `endOffset` call may return a smaller offset, causing 
> data loss.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed

2020-06-15 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136215#comment-17136215
 ] 

Guozhang Wang commented on KAFKA-10167:
---

The proposed solution is that, even under EOS, we should not use the consumer's 
endOffsets call (which sets the `read-committed` flag), but instead issue the 
list-offset request with `read-uncommitted` to get the end-offset.

The rationale is that, since this changelog topic is single-writer / 
single-reader and we control both the writer and the reader, we can safely 
assume that any on-going txn is only from our previous writer.

If the task migration is due to a graceful rebalance (i.e. the task is indeed 
being revoked from the other host), then the old host would always commit, 
during which it blocks on `producer.flush` to make sure all data are written 
(although by default we do not override the replication factor on changelog 
topics or the producer's ack mode, so if users change one without the other 
they may bump into other issues where data are not replicated completely and 
hence the high-watermark returned from list-offset can be smaller). Therefore 
the end-offset returned would be the actual log-end-offset, with or without the 
txn-marker, either of which is fine.

If the task migration is due to an unexpected task migration (i.e. the task was 
not proactively revoked; the old host may not know it is out of the group or 
has crashed), then although not all records sent from the old host are 
guaranteed to be on the broker and covered by the end-offset, that is fine 
since those records will be aborted eventually anyway.
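
To illustrate the proposed direction, here is a rough sketch of such a lookup 
via the admin client's list-offsets API (this is only an illustration of the 
public API, not the actual Streams change; the bootstrap address and changelog 
partition are placeholders):

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;

public class UncommittedEndOffsetsSketch {

    // Look up log-end offsets with READ_UNCOMMITTED, i.e. ignoring the LSO,
    // which is the behavior described above for changelog restoration.
    static Map<TopicPartition, Long> endOffsets(final Admin admin,
                                                final Set<TopicPartition> changelogs) throws Exception {
        final Map<TopicPartition, OffsetSpec> request = changelogs.stream()
            .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));

        final Map<TopicPartition, ListOffsetsResultInfo> result =
            admin.listOffsets(request, new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED))
                 .all()
                 .get();

        return result.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
    }

    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (final Admin admin = Admin.create(props)) {
            System.out.println(endOffsets(admin,
                Collections.singleton(new TopicPartition("app-store-changelog", 0))));
        }
    }
}

Because the isolation level is READ_UNCOMMITTED, the returned offsets are the 
true log-end offsets rather than the LSO, which is the point of the proposal 
above.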

> Streams EOS-Beta should not try to get end-offsets as read-committed
> 
>
> Key: KAFKA-10167
> URL: https://issues.apache.org/jira/browse/KAFKA-10167
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Major
>
> This is a bug discovered with the new EOS protocol (KIP-447); here's the 
> context:
> In Streams, when we are assigned new active tasks, we would first try to 
> restore the state from the changelog topic all the way to the log end offset, 
> and then we can transition from the `restoring` to the `running` state to 
> start processing the task.
> Before KIP-447, the end-offset call is only triggered after we've passed the 
> synchronization barrier at the txn-coordinator, which guarantees that the 
> txn-marker has been sent and received (otherwise we would error with 
> CONCURRENT_TRANSACTIONS and let the producer retry). When the txn-marker is 
> received, it also means that the marker has been fully replicated, which in 
> turn guarantees that the data written before that marker has been fully 
> replicated. As a result, when we send the list-offset request with the 
> `read-committed` flag, we are guaranteed that the returned offset == LSO == 
> high-watermark.
> After KIP-447, however, we do not fence on the txn-coordinator but on the 
> group-coordinator upon offset-fetch, and the group-coordinator returns the 
> fetched offsets right after it has replicated the txn-marker sent to it. 
> However, the txn-markers are sent to different brokers in parallel, and even 
> within the same broker, markers of different partitions are appended and 
> replicated independently, so when the fetch-offset request returns it is NOT 
> guaranteed that the LSO on the other data partitions has advanced as well. 
> Hence, in that case the `endOffset` call may return a smaller offset, causing 
> data loss.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed

2020-06-15 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-10167:
-

 Summary: Streams EOS-Beta should not try to get end-offsets as 
read-committed
 Key: KAFKA-10167
 URL: https://issues.apache.org/jira/browse/KAFKA-10167
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang


This is a bug discovered with the new EOS protocol (KIP-447); here's the 
context:

In Streams, when we are assigned new active tasks, we would first try to 
restore the state from the changelog topic all the way to the log end offset, 
and then we can transition from the `restoring` to the `running` state to start 
processing the task.

Before KIP-447, the end-offset call is only triggered after we've passed the 
synchronization barrier at the txn-coordinator, which guarantees that the 
txn-marker has been sent and received (otherwise we would error with 
CONCURRENT_TRANSACTIONS and let the producer retry). When the txn-marker is 
received, it also means that the marker has been fully replicated, which in 
turn guarantees that the data written before that marker has been fully 
replicated. As a result, when we send the list-offset request with the 
`read-committed` flag, we are guaranteed that the returned offset == LSO == 
high-watermark.

After KIP-447, however, we do not fence on the txn-coordinator but on the 
group-coordinator upon offset-fetch, and the group-coordinator returns the 
fetched offsets right after it has replicated the txn-marker sent to it. 
However, the txn-markers are sent to different brokers in parallel, and even 
within the same broker, markers of different partitions are appended and 
replicated independently, so when the fetch-offset request returns it is NOT 
guaranteed that the LSO on the other data partitions has advanced as well. 
Hence, in that case the `endOffset` call may return a smaller offset, causing 
data loss.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-06-15 Thread GitBox


junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r440514740



##
File path: 
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##
@@ -307,8 +307,14 @@ class GroupCoordinatorConcurrencyTest extends 
AbstractCoordinatorConcurrencyTest
 override def runWithCallback(member: GroupMember, responseCallback: 
CompleteTxnCallback): Unit = {
   val producerId = 1000L
   val offsetsPartitions = (0 to numPartitions).map(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
-  groupCoordinator.groupManager.handleTxnCompletion(producerId,
-offsetsPartitions.map(_.partition).toSet, isCommit = 
random.nextBoolean)
+  val isCommit = random.nextBoolean
+  try groupCoordinator.groupManager.handleTxnCompletion(producerId,
+offsetsPartitions.map(_.partition).toSet, isCommit = isCommit)
+  catch {
+case e: IllegalStateException if isCommit
+  && e.getMessage.contains("though the offset commit record itself 
hasn't been appended to the log")=>

Review comment:
   Thanks. I am still not sure that I fully understand this. It seems that 
by not completing the delayedProduce within the group lock, we are hitting 
IllegalStateException. That seems like a bug. Do you know which code depends on 
that? It seems that we do hold a group lock when updating the txnOffset.
   
   
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L462





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-06-15 Thread GitBox


junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r440514740



##
File path: 
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##
@@ -307,8 +307,14 @@ class GroupCoordinatorConcurrencyTest extends 
AbstractCoordinatorConcurrencyTest
 override def runWithCallback(member: GroupMember, responseCallback: 
CompleteTxnCallback): Unit = {
   val producerId = 1000L
   val offsetsPartitions = (0 to numPartitions).map(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
-  groupCoordinator.groupManager.handleTxnCompletion(producerId,
-offsetsPartitions.map(_.partition).toSet, isCommit = 
random.nextBoolean)
+  val isCommit = random.nextBoolean
+  try groupCoordinator.groupManager.handleTxnCompletion(producerId,
+offsetsPartitions.map(_.partition).toSet, isCommit = isCommit)
+  catch {
+case e: IllegalStateException if isCommit
+  && e.getMessage.contains("though the offset commit record itself 
hasn't been appended to the log")=>

Review comment:
   Thanks. I am still not sure that I fully understand this. It seems that 
by not completing the delayedProduce within the group lock, we are hitting 
IllegalStateException. Do you know which code depends on that? It seems that we 
do hold a group lock when updating the txnOffset.
   
   
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L462

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -967,7 +967,16 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, 
requiredAcks: Int): LogAppendInfo = {
+  /**
+   * @param completeDelayedRequests It may requires a bunch of group locks 
when completing delayed requests so it may

Review comment:
   Yes, we can refactor that in a separate PR. Could you file a followup 
jira for that?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

2020-06-15 Thread GitBox


ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r440473726



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -539,19 +537,18 @@ private void writeCheckpointIfNeed() {
 /**
  * 
  * the following order must be followed:
- *  1. checkpoint the state manager -- even if we crash before this step, 
EOS is still guaranteed
+ *  1. commit/checkpoint the state manager -- even if we crash before this 
step, EOS is still guaranteed

Review comment:
   ack

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -215,91 +215,54 @@ public void handleAssignment(final Map> activeTasks,
  "\tExisting standby tasks: {}",
  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), 
standbyTaskIds());
 
-final Map> activeTasksToCreate = new 
HashMap<>(activeTasks);
-final Map> standbyTasksToCreate = new 
HashMap<>(standbyTasks);
-final Set tasksToRecycle = new HashSet<>();
-
 builder.addSubscribedTopicsFromAssignment(
 
activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
 logPrefix
 );
 
-// first rectify all existing tasks
 final LinkedHashMap taskCloseExceptions = 
new LinkedHashMap<>();
 
-final Set tasksToClose = new HashSet<>();
-final Map> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
-final Set additionalTasksForCommitting = new HashSet<>();
+final Map> activeTasksToCreate = new 
HashMap<>(activeTasks);
+final Map> standbyTasksToCreate = new 
HashMap<>(standbyTasks);
+final LinkedList tasksToClose = new LinkedList<>();
+final Set tasksToRecycle = new HashSet<>();
 final Set dirtyTasks = new HashSet<>();
 
+// first rectify all existing tasks
 for (final Task task : tasks.values()) {
 if (activeTasks.containsKey(task.id()) && task.isActive()) {
 updateInputPartitionsAndResume(task, 
activeTasks.get(task.id()));
-if (task.commitNeeded()) {
-additionalTasksForCommitting.add(task);
-}
 activeTasksToCreate.remove(task.id());
 } else if (standbyTasks.containsKey(task.id()) && 
!task.isActive()) {
 updateInputPartitionsAndResume(task, 
standbyTasks.get(task.id()));
 standbyTasksToCreate.remove(task.id());
-// check for tasks that were owned previously but have changed 
active/standby status
 } else if (activeTasks.containsKey(task.id()) || 
standbyTasks.containsKey(task.id())) {
+// check for tasks that were owned previously but have changed 
active/standby status
 tasksToRecycle.add(task);
 } else {
-try {
-task.suspend();
-final Map 
committableOffsets = task.prepareCommit();
-
-tasksToClose.add(task);
-if (!committableOffsets.isEmpty()) {
-consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
-}
-} catch (final RuntimeException e) {
-final String uncleanMessage = String.format(
-"Failed to close task %s cleanly. Attempting to close 
remaining tasks before re-throwing:",
-task.id());
-log.error(uncleanMessage, e);
-taskCloseExceptions.put(task.id(), e);
-// We've already recorded the exception (which is the 
point of clean).
-// Now, we should go ahead and complete the close because 
a half-closed task is no good to anyone.
-dirtyTasks.add(task);
-}
+tasksToClose.add(task);
 }
 }
 
-if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
+for (final Task task : tasksToClose) {
 try {
-for (final Task task : additionalTasksForCommitting) {
-final Map 
committableOffsets = task.prepareCommit();
-if (!committableOffsets.isEmpty()) {
-consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
+task.suspend(); // Should be a no-op for active tasks, unless 
we hit an exception during handleRevocation

Review comment:
   1. I think you're right, we don't need to keep track of the current 
`checkpoint` offsets at all and can just write the current 
`checkpointableOffsets` in `postCommit`
   2. done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[jira] [Commented] (KAFKA-10062) Add a method to retrieve the current timestamp as known by the Streams app

2020-06-15 Thread William Bottrell (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136199#comment-17136199
 ] 

William Bottrell commented on KAFKA-10062:
--

Hi, [~psmolinski]. I'm looking into this request and I would appreciate your 
help understanding your use case better. Could you give me some more details on 
how you are currently doing your testing, so I could reproduce and see exactly 
what you mean? Thanks! Please forgive me for not understanding, I'm extremely 
new to Kafka.

> Add a method to retrieve the current timestamp as known by the Streams app
> --
>
> Key: KAFKA-10062
> URL: https://issues.apache.org/jira/browse/KAFKA-10062
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Piotr Smolinski
>Assignee: William Bottrell
>Priority: Major
>  Labels: needs-kip, newbie
>
> Please add to the ProcessorContext a method to retrieve the current timestamp 
> compatible with the Punctuator#punctuate(long) method.
> Proposal in ProcessorContext:
> long getTimestamp(PunctuationType type);
> The method should return the time value as known by the Punctuator scheduler 
> for the respective PunctuationType.
> The use-case is tracking of a process with timeout-based escalation.
> A transformer receives process events and, in case of a missing event, 
> executes an action (emits a message) after a given escalation timeout (several 
> stages). The initial message may already arrive with a reference timestamp in 
> the past and may trigger a different action upon arrival depending on how far 
> in the past it is.
> If the timeout only needs to be computed against some future time, Punctuator 
> is perfectly sufficient. The problem is that I have to evaluate the current 
> time-related state once the message arrives.
> I am using wall-clock time. Normally accessing System.currentTimeMillis() is 
> sufficient, but it breaks in unit testing with TopologyTestDriver, where the 
> app's wall-clock time is different from the system-wide one.
> To access the mentioned clock I am using reflection to access 
> ProcessorContextImpl#task and then StreamTask#time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] michael-carter-instaclustr commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

2020-06-15 Thread GitBox


michael-carter-instaclustr commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-63698


   Failing unit test 
kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles is unrelated 
to this pull request and is tracked by: 
https://issues.apache.org/jira/browse/KAFKA-10155
   https://issues.apache.org/jira/browse/KAFKA-10147



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] benhannel edited a comment on pull request #4204: KAFKA-5238: BrokerTopicMetrics can be recreated after topic is deleted

2020-06-15 Thread GitBox


benhannel edited a comment on pull request #4204:
URL: https://github.com/apache/kafka/pull/4204#issuecomment-644416082


   Perfect is the enemy of good.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] benhannel commented on pull request #4204: KAFKA-5238: BrokerTopicMetrics can be recreated after topic is deleted

2020-06-15 Thread GitBox


benhannel commented on pull request #4204:
URL: https://github.com/apache/kafka/pull/4204#issuecomment-644416082


   Perfect is the enemy of good. This issue has been unresolved for years 
because of idealism.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman closed pull request #8868: Revert "KAFKA-9983: KIP-613: add INFO level e2e latency metrics (#8697)"

2020-06-15 Thread GitBox


ableegoldman closed pull request #8868:
URL: https://github.com/apache/kafka/pull/8868


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on pull request #8839: KIP-585: Documentation

2020-06-15 Thread GitBox


rhauch commented on pull request #8839:
URL: https://github.com/apache/kafka/pull/8839#issuecomment-644382512


   @tombentley, yes we'll want to merge this and backport to the `2.6` branch. 
That branch is not yet frozen for documentation or tests.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thomasrockhu closed pull request #8875: Update to run coverage on Jenkins

2020-06-15 Thread GitBox


thomasrockhu closed pull request #8875:
URL: https://github.com/apache/kafka/pull/8875


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8874: HOTFIX: checkstyle error in ProcessorStateManager

2020-06-15 Thread GitBox


guozhangwang commented on pull request #8874:
URL: https://github.com/apache/kafka/pull/8874#issuecomment-644329675


   To 2.6 as well.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #8874: HOTFIX: checkstyle error in ProcessorStateManager

2020-06-15 Thread GitBox


guozhangwang merged pull request #8874:
URL: https://github.com/apache/kafka/pull/8874


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8874: HOTFIX: checkstyle error in ProcessorStateManager

2020-06-15 Thread GitBox


guozhangwang commented on pull request #8874:
URL: https://github.com/apache/kafka/pull/8874#issuecomment-644329167


   Ran checkstyle and spotbug locally and looks good.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thomasrockhu opened a new pull request #8875: Update to run coverage on Jenkins

2020-06-15 Thread GitBox


thomasrockhu opened a new pull request #8875:
URL: https://github.com/apache/kafka/pull/8875


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8874: HOTFIX: checkstyle error in ProcessorStateManager

2020-06-15 Thread GitBox


mjsax commented on pull request #8874:
URL: https://github.com/apache/kafka/pull/8874#issuecomment-644328634


   Thanks! Was just about to do a hotfix PR myself.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10166) Excessive TaskCorruptedException seen in testing

2020-06-15 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10166:

Description: 
As the title indicates, long-running test applications with injected network 
"outages" seem to hit TaskCorruptedException more than expected.

Seen occasionally on the ALOS application (~20 times in two days in one case, 
for example), and very frequently with EOS (many times per day)

  was:As the title indicates. Seen occasionally with ALOS (~20 times in two 
days in one case, for example), and very frequently with EOS (many times per 
day)


> Excessive TaskCorruptedException seen in testing
> 
>
> Key: KAFKA-10166
> URL: https://issues.apache.org/jira/browse/KAFKA-10166
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.6.0
>
>
> As the title indicates, long-running test applications with injected network 
> "outages" seem to hit TaskCorruptedException more than expected.
> Seen occasionally on the ALOS application (~20 times in two days in one case, 
> for example), and very frequently with EOS (many times per day)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10166) Excessive TaskCorruptedException seen in test applications

2020-06-15 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10166:

Description: As the title indicates. Seen occasionally with ALOS (~20 times 
in two days in one case, for example), and very frequently with EOS (many times 
per day)  (was: As the title indicates. Seen occasionally in the ALOS (~20 
times in two days on one soak), and very frequently in the EOS (many times per 
day))

> Excessive TaskCorruptedException seen in test applications
> --
>
> Key: KAFKA-10166
> URL: https://issues.apache.org/jira/browse/KAFKA-10166
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.6.0
>
>
> As the title indicates. Seen occasionally with ALOS (~20 times in two days in 
> one case, for example), and very frequently with EOS (many times per day)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10166) Excessive TaskCorruptedException seen in test applications

2020-06-15 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-10166:
---

Assignee: (was: Sophie Blee-Goldman)

> Excessive TaskCorruptedException seen in test applications
> --
>
> Key: KAFKA-10166
> URL: https://issues.apache.org/jira/browse/KAFKA-10166
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.6.0
>
>
> As the title indicates. Seen occasionally with ALOS (~20 times in two days in 
> one case, for example), and very frequently with EOS (many times per day)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10166) Excessive TaskCorruptedException seen in testing

2020-06-15 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10166:

Summary: Excessive TaskCorruptedException seen in testing  (was: Excessive 
TaskCorruptedException seen in test applications)

> Excessive TaskCorruptedException seen in testing
> 
>
> Key: KAFKA-10166
> URL: https://issues.apache.org/jira/browse/KAFKA-10166
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.6.0
>
>
> As the title indicates. Seen occasionally with ALOS (~20 times in two days in 
> one case, for example), and very frequently with EOS (many times per day)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10166) Excessive TaskCorruptedException seen in test applications

2020-06-15 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-10166:
---

Assignee: Sophie Blee-Goldman

> Excessive TaskCorruptedException seen in test applications
> --
>
> Key: KAFKA-10166
> URL: https://issues.apache.org/jira/browse/KAFKA-10166
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.6.0
>
>
> As the title indicates. Seen occasionally with ALOS (~20 times in two days in 
> one case, for example), and very frequently with EOS (many times per day)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10166) Excessive TaskCorruptedException seen in test applications

2020-06-15 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10166:

Summary: Excessive TaskCorruptedException seen in test applications  (was: 
Excessive TaskCorruptedException seen in soak)

> Excessive TaskCorruptedException seen in test applications
> --
>
> Key: KAFKA-10166
> URL: https://issues.apache.org/jira/browse/KAFKA-10166
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.6.0
>
>
> As the title indicates. Seen occasionally in the ALOS (~20 times in two days 
> on one soak), and very frequently in the EOS (many times per day)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #8855: DO NOT MERGE

2020-06-15 Thread GitBox


mjsax commented on pull request #8855:
URL: https://github.com/apache/kafka/pull/8855#issuecomment-644312790


   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10166) Excessive TaskCorruptedException seen in soak

2020-06-15 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10166:
---

 Summary: Excessive TaskCorruptedException seen in soak
 Key: KAFKA-10166
 URL: https://issues.apache.org/jira/browse/KAFKA-10166
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Sophie Blee-Goldman
 Fix For: 2.6.0


As the title indicates. Seen occasionally in the ALOS (~20 times in two days on 
one soak), and very frequently in the EOS (many times per day)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #8874: HOTFIX: checkstyle error in ProcessorStateManager

2020-06-15 Thread GitBox


chia7712 commented on pull request #8874:
URL: https://github.com/apache/kafka/pull/8874#issuecomment-644293179


   @cadonna @guozhangwang Could you take a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 opened a new pull request #8874: HOTFIX: checkstyle error in ProcessorStateManager

2020-06-15 Thread GitBox


[jira] [Commented] (KAFKA-9681) Change whitelist/blacklist terms

2020-06-15 Thread sats (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136084#comment-17136084
 ] 

sats commented on KAFKA-9681:
-

This Requires a KIP. 

> Change whitelist/blacklist terms
> 
>
> Key: KAFKA-9681
> URL: https://issues.apache.org/jira/browse/KAFKA-9681
> Project: Kafka
>  Issue Type: Wish
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Gérald Quintana
>Priority: Major
>
> The whitelist/blacklist terms are not very inclusive, and can be perceived as 
> racist.
> Using allow/deny or include/exclude for example is more neutral



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10104) Remove deprecated --zookeeper flags as specified in KIP-604

2020-06-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136078#comment-17136078
 ] 

Matthias J. Sax commented on KAFKA-10104:
-

It's for 3.0 (or maybe even 4.0), as removing stuff is a breaking change. Not 
sure which release [~cmccabe] had in mind.

> Remove deprecated --zookeeper flags as specified in KIP-604
> ---
>
> Key: KAFKA-10104
> URL: https://issues.apache.org/jira/browse/KAFKA-10104
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
>
> Remove deprecated --zookeeper flags as specified in KIP-604



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang merged pull request #8872: Fix log message for transition from standby to active

2020-06-15 Thread GitBox


guozhangwang merged pull request #8872:
URL: https://github.com/apache/kafka/pull/8872


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8872: Fix log message for transition from standby to active

2020-06-15 Thread GitBox


guozhangwang commented on pull request #8872:
URL: https://github.com/apache/kafka/pull/8872#issuecomment-644281735


   Cherry-picked to 2.6.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8872: Fix log message for transition from standby to active

2020-06-15 Thread GitBox


guozhangwang commented on a change in pull request #8872:
URL: https://github.com/apache/kafka/pull/8872#discussion_r440348339



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -509,11 +509,12 @@ void transitionTaskType(final TaskType newType, final 
LogContext logContext) {
 throw new IllegalStateException("Tried to recycle state for task 
type conversion but new type was the same.");
 }
 
+TaskType oldType = taskType;
 taskType = newType;
 log = logContext.logger(ProcessorStateManager.class);
 logPrefix = logContext.logPrefix();
 
-log.debug("Transitioning state manager for {} task {} to {}", 
taskType, taskId, newType);
+log.debug("Transitioning state manager for {} task {} to {}", oldType, 
taskId, newType);

Review comment:
   I think using oldType/newType is fine here, since then we do not need to 
keep the ordering of each call in mind?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #8873: Avoid WARN log message when re-init from checkpoint skipped

2020-06-15 Thread GitBox


guozhangwang merged pull request #8873:
URL: https://github.com/apache/kafka/pull/8873


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8873: Avoid WARN log message when re-init from checkpoint skipped

2020-06-15 Thread GitBox


guozhangwang commented on pull request #8873:
URL: https://github.com/apache/kafka/pull/8873#issuecomment-644280199


   Cherry-picked to 2.6



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-06-15 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136069#comment-17136069
 ] 

Guozhang Wang commented on KAFKA-10134:
---

This is a bit weird to me -- the coordinator discovery logic did not change 
from 2.4 -> 2.5 AFAIK.

[~seanguo] could you list the consumer configs you used with cooperative 
rebalance vs. eager rebalance?
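
For reference, a minimal Java sketch of how the two protocols are typically 
selected on the consumer side (hypothetical bootstrap/group values; the only 
difference between the cooperative and eager runs is the 
partition.assignment.strategy setting):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignorConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Cooperative (incremental) rebalancing, as used in the report:
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName());
        // Eager rebalancing would instead use e.g. the default RangeAssignor:
        // props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        //           "org.apache.kafka.clients.consumer.RangeAssignor");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual; only the assignor differs between the two runs
        }
    }
}
{code}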

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Priority: Major
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during rebalances, as our tasks are long-running.
> But after the upgrade, when we kill an instance to trigger a rebalance while 
> there is some load (some long-running tasks take >30s), the CPU goes 
> sky-high. It reads ~700% in our metrics, so several threads must be in a 
> tight loop. We have several consumer threads consuming from different 
> partitions during the rebalance. This is reproducible with both the new 
> CooperativeStickyAssignor and the old eager rebalance protocol. The 
> difference is that with the old eager protocol the high CPU usage drops once 
> the rebalance is done, but with the cooperative one the consumer threads seem 
> to be stuck on something and cannot finish the rebalance, so the high CPU 
> usage does not drop until we stop the load. A small load without long-running 
> tasks also does not cause sustained high CPU usage, because the rebalance can 
> finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable 
> [0x7fe119aab000]
>java.lang.Thread.State: RUNNABLE
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code, we found that the clients appear to be in a loop 
> trying to find the coordinator.
> I also tried the old (eager) rebalance protocol on the new version; the issue 
> still exists, but the CPU returns to normal once the rebalance is done.
> I also tried the same on 2.4.1, which does not seem to have this issue, so it 
> appears to be related to something that changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dongjinleekr commented on pull request #8117: KAFKA-8403: Suppress needs a Materialized variant

2020-06-15 Thread GitBox


dongjinleekr commented on pull request #8117:
URL: https://github.com/apache/kafka/pull/8117#issuecomment-644277987


   @vvcephei Here is the fix. The implementation you saw was the draft from 
before the discussion - it is now updated and rebased onto the latest trunk. 
Don't hesitate to let me know if you think we need additional tests, such as 
for a queryable store.
   
   Thanks again for your detailed review! :smile:



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10165) Percentiles metric leaking memory

2020-06-15 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10165:
---

 Summary: Percentiles metric leaking memory
 Key: KAFKA-10165
 URL: https://issues.apache.org/jira/browse/KAFKA-10165
 Project: Kafka
  Issue Type: Bug
  Components: metrics, streams
Reporter: Sophie Blee-Goldman
Assignee: Sophie Blee-Goldman
 Fix For: 2.6.0


We've hit several OOM in our soak cluster lately. We were finally able to get a 
heap dump right after the OOM, and found over 3.5 GB of memory being retained 
by the percentiles (or specifically by the 1MB float[] used by the 
percentiles). 

The leak does seem specific to the Percentiles class, as we see ~3000 instances 
of the Percentiles object vs. only ~500 instances of the Max object, which is 
also used in the same sensor as the Percentiles.

We did recently lower the size from 1MB to 100kB, but it's clear there is a 
leak of some kind and a "smaller leak" is not an acceptable solution. If the 
cause of the leak is not immediately obvious, we should just revert the 
percentiles in 2.6 and work on stabilizing them for 2.7.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8362) LogCleaner gets stuck after partition move between log directories

2020-06-15 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-8362:

Component/s: jbod

> LogCleaner gets stuck after partition move between log directories
> --
>
> Key: KAFKA-8362
> URL: https://issues.apache.org/jira/browse/KAFKA-8362
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod, log cleaner
>Reporter: Julio Ng
>Priority: Major
>
> When a partition is moved from one directory to another, its checkpoint 
> entry in the cleaner-offset-checkpoint file is not removed from the source 
> directory.
> As a consequence, when we read the last firstDirtyOffset, we might get a 
> stale value from the old checkpoint file.
> Basically, we need to clean up the entry from the checkpoint file in the 
> source directory when the move is completed.
> The current issue is that the code in LogCleanerManager:
> {noformat}
> /**
>  * @return the position processed for all logs.
>  */
> def allCleanerCheckpoints: Map[TopicPartition, Long] = {
>   inLock(lock) {
> checkpoints.values.flatMap(checkpoint => {
>   try {
> checkpoint.read()
>   } catch {
> case e: KafkaStorageException =>
>   error(s"Failed to access checkpoint file ${checkpoint.file.getName} 
> in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
>   Map.empty[TopicPartition, Long]
>   }
> }).toMap
>   }
> }{noformat}
> collapses the offsets when multiple entries exist for the topicPartition
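
To make the collapsing behaviour concrete, here is a minimal, self-contained 
Java sketch (hypothetical partition names and offsets; the actual Kafka code 
quoted above is Scala): when the per-directory checkpoint maps are flattened 
into a single map, duplicate keys are silently collapsed and whichever entry 
is read last wins, which can be the stale entry left in the source directory.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class CheckpointCollapseSketch {
    public static void main(String[] args) {
        // Checkpoint left behind in the source log dir after the partition move (stale offset).
        Map<String, Long> sourceDirCheckpoint = Map.of("topicA-0", 100L);
        // Checkpoint written in the destination log dir after the move (current offset).
        Map<String, Long> destDirCheckpoint = Map.of("topicA-0", 250L);

        // Mimics flatMap(...).toMap in allCleanerCheckpoints: all entries are flattened
        // into one map, so duplicate partitions are collapsed and the last one read wins.
        Map<String, Long> merged = new HashMap<>();
        merged.putAll(destDirCheckpoint);
        merged.putAll(sourceDirCheckpoint); // source dir read last -> stale value wins

        System.out.println(merged); // {topicA-0=100}: the cleaner would resume from a stale offset
    }
}
{code}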



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9989) Wait for stable assignment StreamsUpgradeTest.test_metadata_upgrade

2020-06-15 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9989:
---
Description: 
After some discussion we believe there is room to improve the accuracy of this 
test, by enforcing the wait for a stable rebalance result after KIP-441. 

===

System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:

"Never saw output 'processed [0-9]* records' on ubuntu@worker6"

Taking a closer look, the rebalance happens but produces no task assignment. 
We should fix this by making the rebalance result part of the check and 
waiting for a finalized (non-empty) assignment before kicking off the 
record-processing validation. 

  was:
System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:

"Never saw output 'processed [0-9]* records' on ubuntu@worker6"

Taking a closer look, the rebalance happens but produces no task assignment. 
We should fix this by making the rebalance result part of the check and 
waiting for a finalized (non-empty) assignment before kicking off the 
record-processing validation. 


> Wait for stable assignment StreamsUpgradeTest.test_metadata_upgrade 
> 
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Assignee: HaiyuanZhao
>Priority: Major
>  Labels: newbie
> Attachments: 166.tgz
>
>
> After some discussion we believe there is room to improve the accuracy of 
> this test, by enforcing the wait for a stable rebalance result after KIP-441. 
> ===
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> Taking a closer look, the rebalance happens but produces no task assignment. 
> We should fix this by making the rebalance result part of the check and 
> waiting for a finalized (non-empty) assignment before kicking off the 
> record-processing validation. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9989) Wait for stable assignment StreamsUpgradeTest.test_metadata_upgrade

2020-06-15 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9989:
---
Summary: Wait for stable assignment 
StreamsUpgradeTest.test_metadata_upgrade   (was: 
StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets 
assigned task)

> Wait for stable assignment StreamsUpgradeTest.test_metadata_upgrade 
> 
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Assignee: HaiyuanZhao
>Priority: Major
>  Labels: newbie
> Attachments: 166.tgz
>
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> Taking a closer look, the rebalance happens but produces no task assignment. 
> We should fix this by making the rebalance result part of the check and 
> waiting for a finalized (non-empty) assignment before kicking off the 
> record-processing validation. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10105) Regression in group coordinator dealing with flaky clients joining while leaving

2020-06-15 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136012#comment-17136012
 ] 

Sophie Blee-Goldman commented on KAFKA-10105:
-

Did you upgrade the clients as well (or at all), or just the cluster? What 
version were the consumers on?

> Regression in group coordinator dealing with flaky clients joining while 
> leaving
> 
>
> Key: KAFKA-10105
> URL: https://issues.apache.org/jira/browse/KAFKA-10105
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.1
> Environment: Kafka 1.1.0 on jre 8 on debian 9 in docker
> Kafka 2.4.1 on jre 11 on debian 9 in docker
>Reporter: William Reynolds
>Priority: Major
>
> Since upgrading a cluster from 1.1.0 to 2.4.1, the broker no longer deals 
> correctly with a consumer that sends a join after a leave.
> What happens now is that if a consumer sends a leave and then, while shutting 
> down, follows up by sending a join again, the group coordinator adds the 
> leaving member to the group but never seems to heartbeat that member.
> Since that consumer is gone, when it joins again after restarting it is added 
> as a new member, but the zombie member remains and is included in the 
> partition assignment, which means those partitions never get consumed from. 
> One of the zombies can also become group leader, in which case the rebalance 
> gets stuck forever and the group is entirely blocked.
> I have not been able to track down where this was introduced between 1.1.0 
> and 2.4.1, but I will look further into this. Unfortunately the logs are 
> essentially silent about the zombie members, and I only had INFO-level 
> logging on during the issue. By stopping all the consumers in the group and 
> restarting the broker coordinating that group, we could get back to a working 
> state.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #8646: KAFKA-9974: Fix flaky test by removing unneeded asserts

2020-06-15 Thread GitBox


guozhangwang commented on pull request #8646:
URL: https://github.com/apache/kafka/pull/8646#issuecomment-644247677


   test this



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8872: Fix log message for transition from standby to active

2020-06-15 Thread GitBox


ableegoldman commented on a change in pull request #8872:
URL: https://github.com/apache/kafka/pull/8872#discussion_r440299033



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -509,11 +509,12 @@ void transitionTaskType(final TaskType newType, final 
LogContext logContext) {
 throw new IllegalStateException("Tried to recycle state for task 
type conversion but new type was the same.");
 }
 
+TaskType oldType = taskType;
 taskType = newType;
 log = logContext.logger(ProcessorStateManager.class);
 logPrefix = logContext.logPrefix();
 
-log.debug("Transitioning state manager for {} task {} to {}", 
taskType, taskId, newType);
+log.debug("Transitioning state manager for {} task {} to {}", oldType, 
taskId, newType);

Review comment:
   I agree it would be useful to prefix the logs with `standby` or 
`active`, but I'd prefer to do that everywhere in a separate PR.
   
   Can we just move the log message to before we reassign `taskType = newType`? 
Or do you think we might forget/not notice that the ordering is relevant and 
accidentally move it back at some future time?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8865: MINOR: fix StreamsConfig parameter name variable

2020-06-15 Thread GitBox


vvcephei commented on pull request #8865:
URL: https://github.com/apache/kafka/pull/8865#issuecomment-644211672


   Looks like a compile warning:
   
   ```
   00:07:57.788 > Task :streams:compileJava
   00:07:57.788 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:530:
 warning: [dep-ann] deprecated item is not annotated with @Deprecated
   00:07:57.788 public static final String TOPOLOGY_OPTIMIZATION = 
"topology.optimization";
   00:07:57.789^
   00:07:57.789 error: warnings found and -Werror specified
   ```
   
   I'm +1 on this change, and I think it would be fine to just amend KIP-295.
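
   For context, `[dep-ann]` is the javac lint that fires when something is 
documented as deprecated in Javadoc but not annotated, and with `-Werror` that 
fails the build. A minimal sketch of the pattern (hypothetical class and 
replacement constant, not the actual StreamsConfig change):

   ```java
   public class DeprecationSketch {
       /**
        * @deprecated Use {@link #TOPOLOGY_OPTIMIZATION_CONFIG} instead.
        */
       @Deprecated // without this annotation, javac -Xlint:dep-ann warns:
                   // "[dep-ann] deprecated item is not annotated with @Deprecated"
       public static final String TOPOLOGY_OPTIMIZATION = "topology.optimization";

       /** Hypothetical replacement name used only for this illustration. */
       public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
   }
   ```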



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand

2020-06-15 Thread GitBox


tombentley commented on pull request #8808:
URL: https://github.com/apache/kafka/pull/8808#issuecomment-644211634


   @dajac the problem happens when `--command-config` is given on the command 
line, and the config file defines a `client.id`. The `AppInfoParser` tries to 
register an MBean whose name is the same as the name used by the existing admin 
client instance, and the platform MBean server throws the exception given in 
the issue. Sorry, the initial description in 
[KAFKA-10109](https://issues.apache.org/jira/browse/KAFKA-10109) omitted this 
important detail (now fixed).
   
   Your point about false positives when verifying the output is correct; 
that's because the test currently verifies only the logging, not the output. 
Catching this error by verifying the standard output of the command alone 
would require changing the `log4j.properties` for all tests in core, which I 
felt was probably not warranted for this bug alone: I assume they've been set 
to `ERROR` for a more important reason. Or are you suggesting we use both 
`grabConsoleOutput()` and `LogCaptureAppender`?
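
   For readers unfamiliar with the failure mode: the exception in the issue is 
just the platform MBean server refusing a second registration under a name 
that is already in use. A minimal, self-contained Java sketch (hypothetical 
MBean class, not Kafka's `AppInfoParser` code) that reproduces the same 
`InstanceAlreadyExistsException`:

   ```java
   import java.lang.management.ManagementFactory;
   import javax.management.InstanceAlreadyExistsException;
   import javax.management.MBeanServer;
   import javax.management.ObjectName;

   public class DuplicateMBeanSketch {
       // Standard MBean: the interface name is the class name plus the "MBean" suffix.
       public interface DemoMBean { }
       public static class Demo implements DemoMBean { }

       public static void main(String[] args) throws Exception {
           MBeanServer server = ManagementFactory.getPlatformMBeanServer();
           ObjectName name =
               new ObjectName("kafka.admin.client:type=app-info,id=administrator_data");
           server.registerMBean(new Demo(), name); // first client registers fine
           try {
               server.registerMBean(new Demo(), name); // second registration, same id
           } catch (InstanceAlreadyExistsException e) {
               System.out.println("Second registration failed: " + e); // what Kafka logs as a WARN
           }
       }
   }
   ```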



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10109) kafka-acls.sh/AclCommand opens multiple AdminClients

2020-06-15 Thread Tom Bentley (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Bentley updated KAFKA-10109:

Description: 
{{AclCommand.AclCommandService}} uses {{withAdminClient(opts: 
AclCommandOptions)(f: Admin => Unit)}} to abstract the execution of an action 
using an {{AdminClient}} instance. Unfortunately, the use of this method in 
implementing {{addAcls()}} and {{removeAcls()}} calls {{listAcls()}}. This 
causes the creation of a second {{AdminClient}} instance. When the 
{{--command-config}} option has been used to specify a {{client.id}} for the 
Admin client, the second instance fails to register an MBean, resulting in a 
warning being logged.

{code}
./bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config 
config/broker_connection.conf.reproducing --add --allow-principal User:alice 
--operation Describe --topic 'test' --resource-pattern-type prefixed
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, 
patternType=PREFIXED)`: 
(principal=User:alice, host=*, operation=DESCRIBE, 
permissionType=ALLOW) 

[2020-06-03 18:43:12,190] WARN Error registering AppInfo mbean 
(org.apache.kafka.common.utils.AppInfoParser)
javax.management.InstanceAlreadyExistsException: 
kafka.admin.client:type=app-info,id=administrator_data
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
at 
org.apache.kafka.clients.admin.KafkaAdminClient.(KafkaAdminClient.java:500)
at 
org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:444)
at org.apache.kafka.clients.admin.Admin.create(Admin.java:59)
at 
org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39)
at 
kafka.admin.AclCommand$AdminClientService.withAdminClient(AclCommand.scala:105)
at 
kafka.admin.AclCommand$AdminClientService.listAcls(AclCommand.scala:146)
at 
kafka.admin.AclCommand$AdminClientService.$anonfun$addAcls$1(AclCommand.scala:123)
at 
kafka.admin.AclCommand$AdminClientService.$anonfun$addAcls$1$adapted(AclCommand.scala:116)
at 
kafka.admin.AclCommand$AdminClientService.withAdminClient(AclCommand.scala:108)
at 
kafka.admin.AclCommand$AdminClientService.addAcls(AclCommand.scala:116)
at kafka.admin.AclCommand$.main(AclCommand.scala:78)
at kafka.admin.AclCommand.main(AclCommand.scala)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, 
patternType=PREFIXED)`: 
(principal=User:alice, host=*, operation=DESCRIBE, permissionType=ALLOW)
{code}

  was:
{{AclCommand.AclCommandService}} uses {{withAdminClient(opts: 
AclCommandOptions)(f: Admin => Unit)}} to abstract the execution of an action 
using an {{AdminClient}} instance. Unfortunately the use of this method in 
implemeting {{addAcls()}} and {{removeAcls()}} calls {{listAcls()}}. This 
causes the creation of a second {{AdminClient}} instance which then fails to 
register an MBean, resulting in a warning being logged.

{code}
./bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config 
config/broker_connection.conf.reproducing --add --allow-principal User:alice 
--operation Describe --topic 'test' --resource-pattern-type prefixed
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, 
patternType=PREFIXED)`: 
(principal=User:alice, host=*, operation=DESCRIBE, 
permissionType=ALLOW) 

[2020-06-03 18:43:12,190] WARN Error registering AppInfo mbean 
(org.apache.kafka.common.utils.AppInfoParser)
javax.management.InstanceAlreadyExistsException: 
kafka.admin.client:type=app-info,id=administrator_data
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at 
com.sun.jmx.mbe

[GitHub] [kafka] dajac commented on pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand

2020-06-15 Thread GitBox


dajac commented on pull request #8808:
URL: https://github.com/apache/kafka/pull/8808#issuecomment-644192842


   @tombentley I am not sure I follow what the `AppInfoParser` is doing 
here... Could you elaborate a bit more?
   
   Regarding option 2), I had a quick look at the implementation and it seems 
that it verifies that no errors are printed out, but it does not really verify 
that the output is correct. For instance, if we removed the 
`listAcls(adminClient)` at L115, we would not catch it, would we?
   
   Perhaps we could add a small test for the `listAcls()` method which adds a 
few ACLs, grabs the output with `TestUtils.grabConsoleOutput` and verifies it. 
I think that is probably enough given the scope of the change.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #8839: KIP-585: Documentation

2020-06-15 Thread GitBox


tombentley commented on pull request #8839:
URL: https://github.com/apache/kafka/pull/8839#issuecomment-644176096


   @kkonstantine @rhauch any chance this could be merged for 2.6, or is it too 
late now?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #6229: KAFKA-6786: Removing additional configs for StreamsBrokerDownResilienceTest

2020-06-15 Thread GitBox


bbejeck commented on pull request #6229:
URL: https://github.com/apache/kafka/pull/6229#issuecomment-644170697


   System test failed with `Failed while trying to discover tests: Didn't find 
any tests for symbol 
tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py`.
   
   @sh-abhi can you take a look? I'll look into this soon as well.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #8646: KAFKA-9974: Fix flaky test by removing unneeded asserts

2020-06-15 Thread GitBox


showuon edited a comment on pull request #8646:
URL: https://github.com/apache/kafka/pull/8646#issuecomment-644030596







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #8831: KAFKA-8657:Client-side automatic topic creation on Producer

2020-06-15 Thread GitBox


abbccdda commented on pull request #8831:
URL: https://github.com/apache/kafka/pull/8831#issuecomment-644109922


   @jiameixie Let's continue the discussion on the mailing thread to get 
KIP-487 approved first.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

2020-06-15 Thread GitBox


dajac commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r440028429



##
File path: clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
##
@@ -572,16 +572,18 @@ public synchronized void removeReporter(MetricsReporter 
reporter) {
 }
 }
 
-synchronized void registerMetric(KafkaMetric metric) {
+synchronized void registerMetric(KafkaMetric metric, boolean report) {

Review comment:
   Thinking a bit more about this, did you consider adding the flag to 
`MetricConfig`? It may be a bit simpler and cleaner as it avoids having to add 
the flag to all the methods. What do you think?

##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1256,11 +1272,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) 
extends Logging {
   private def waitForConnectionSlot(listenerName: ListenerName,
 acceptorBlockedPercentMeter: 
com.yammer.metrics.core.Meter): Unit = {
 counts.synchronized {
-  if (!connectionSlotAvailable(listenerName)) {
+  val startTimeMs = time.milliseconds()
+  val throttleTimeMs = 
math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startTimeMs), 0)
+
+  if (throttleTimeMs > 0 || !connectionSlotAvailable(listenerName)) {
 val startNs = time.nanoseconds
+val endThrottleTimeMs = startTimeMs + throttleTimeMs
+var remainingThrottleTimeMs = throttleTimeMs
 do {
-  counts.wait()
-} while (!connectionSlotAvailable(listenerName))
+  counts.wait(remainingThrottleTimeMs)

Review comment:
   Thanks for the clarification. I am sorry but I misread the code the 
first time.

##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) 
extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
 
maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation 
rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window 
size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, 
timeMs: Long): Long = {
+val listenerThrottleTimeMs = maxConnectionsPerListener
+  .get(listenerName)
+  .map(listenerQuota => 
recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+  .getOrElse(0)
+
+if (protectedListener(listenerName)) {
+  listenerThrottleTimeMs
+} else {
+  val brokerThrottleTimeMs = 
recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+  math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+}
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+try {
+  sensor.record(1.0, timeMs)
+  0
+} catch {
+  case e: QuotaViolationException =>
+val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+  e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), 
maxThrottleTimeMs).toInt
+debug(s"Quota violated for sensor (${sensor.name}). Delay time: 
$throttleTimeMs ms")
+throttleTimeMs
+}
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and 
corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: 
Option[String] = None): Sensor = {
+val quotaEntity = listenerOpt.getOrElse("broker")
+val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", 
rateQuotaMetricConfig(quotaLimit))
+sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false)
+info(s"Created ConnectionCreationRate-$quotaEntity sensor, 
quotaLimit=$quotaLimit")
+sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: 
Option[String] = None): Unit = {
+val metric = metrics.metric(connectionRateMetricName((listenerOpt)))
+metric.config(rateQuotaMetricConfig(quotaLimit))
+info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation 
rate to $quotaLimit")
+  }
+
+  private def connectionRateMetricName(listenerOpt: Option[String]): 
MetricName = {
+val quotaEntity = listenerOpt.getOrElse("broker")
+metrics.metricName(
+  s"connection-creation-rate-$quotaEntity",
+  "connection-quota-no-jmx",
+  s"Tracking $quota

[GitHub] [kafka] cadonna commented on pull request #8873: Avoid WARN log message when re-init from checkpoint skipped

2020-06-15 Thread GitBox


cadonna commented on pull request #8873:
URL: https://github.com/apache/kafka/pull/8873#issuecomment-644109233


   Call for review: @ableegoldman @guozhangwang @mjsax 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna opened a new pull request #8873: Avoid WARN log message when re-init from checkpoint skipped

2020-06-15 Thread GitBox


cadonna opened a new pull request #8873:
URL: https://github.com/apache/kafka/pull/8873


   When a re-initialisation from checkpoint is skipped, the following log 
messages appear in the logs. 
   
   ```
   DEBUG stream-thread 
[EosTest-a2c3b21b-7af1-4dce-a3e0-6dc10932e5a2-StreamThread-1] task [0_2] 
Skipping re-initialization of offset from checkpoint for recycled store 
KSTREAM-AGGREGATE-STATE-STORE-03 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)
   DEBUG stream-thread 
[EosTest-a2c3b21b-7af1-4dce-a3e0-6dc10932e5a2-StreamThread-1] task [0_2] 
Skipping re-initialization of offset from checkpoint for recycled store 
KSTREAM-AGGREGATE-STATE-STORE-07 
   WARN stream-thread 
[EosTest-a2c3b21b-7af1-4dce-a3e0-6dc10932e5a2-StreamThread-1] task [0_2] Some 
loaded checkpoint offsets cannot find their corresponding state stores: 
{EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-2=1491, 
EosTest-KSTREAM-AGGREGATE-STATE-STORE-07-changelog-2=1491} 
   ```
   
   The warning appears because the skipped offsets are not removed from the 
checkpoint. However, there is nothing to warn about, because the offsets found 
there do have corresponding state stores and re-initialization for those 
stores was intentionally skipped. 
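
   A rough, self-contained Java sketch of the general idea (changelog names 
taken from the log above; this is an illustration, not the actual Streams 
patch): once the offsets for recycled stores are dropped from the loaded map, 
the leftover-offsets check no longer fires.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;

   public class CheckpointWarningSketch {
       public static void main(String[] args) {
           // Offsets loaded from the checkpoint file (changelog partition -> offset), as in the log above.
           Map<String, Long> loadedCheckpoints = new HashMap<>();
           loadedCheckpoints.put("EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-2", 1491L);
           loadedCheckpoints.put("EosTest-KSTREAM-AGGREGATE-STATE-STORE-07-changelog-2", 1491L);

           // Changelogs of recycled stores whose re-initialization was skipped.
           Set<String> skippedChangelogs = Set.of(
               "EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-2",
               "EosTest-KSTREAM-AGGREGATE-STATE-STORE-07-changelog-2");

           // Dropping the skipped entries before the final check avoids the spurious WARN.
           loadedCheckpoints.keySet().removeAll(skippedChangelogs);

           if (!loadedCheckpoints.isEmpty()) {
               System.out.println("WARN Some loaded checkpoint offsets cannot find their "
                   + "corresponding state stores: " + loadedCheckpoints);
           }
       }
   }
   ```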

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



