[jira] [Updated] (KAFKA-10147) MockAdminClient#describeConfigs(Collection) is unable to handle broker resource

2020-06-11 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-10147:
---
Fix Version/s: (was: 2.7.0)
   2.6.0

> MockAdminClient#describeConfigs(Collection) is unable to 
> handle broker resource
> ---
>
> Key: KAFKA-10147
> URL: https://issues.apache.org/jira/browse/KAFKA-10147
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 2.6.0
>
>
> MockAdminClient#describeConfigs(Collection) has a new implementation introduced by 
> https://github.com/apache/kafka/commit/48b56e533b3ff22ae0e2cf7fcc649e7df19f2b06.
> It does not handle the broker resource, so ReassignPartitionsUnitTest#testModifyBrokerThrottles throws an NPE





[jira] [Updated] (KAFKA-10147) MockAdminClient#describeConfigs(Collection) is unable to handle broker resource

2020-06-11 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-10147:
---
Fix Version/s: 2.7.0

> MockAdminClient#describeConfigs(Collection) is unable to 
> handle broker resource
> ---
>
> Key: KAFKA-10147
> URL: https://issues.apache.org/jira/browse/KAFKA-10147
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 2.7.0
>
>
> MockAdminClient#describeConfigs(Collection) has a new implementation introduced by 
> https://github.com/apache/kafka/commit/48b56e533b3ff22ae0e2cf7fcc649e7df19f2b06.
> It does not handle the broker resource, so ReassignPartitionsUnitTest#testModifyBrokerThrottles throws an NPE





[GitHub] [kafka] kkonstantine commented on a change in pull request #8839: KIP-585: Documentation

2020-06-11 Thread GitBox


kkonstantine commented on a change in pull request #8839:
URL: https://github.com/apache/kafka/pull/8839#discussion_r439227501



##
File path: docs/connect.html
##
@@ -180,13 +182,80 @@ TransformationsSetSchemaMetadata - modify the schema name or version
 TimestampRouter - Modify the topic of a record based on original 
topic and timestamp. Useful when using a sink that needs to write to different 
tables or indexes based on timestamps
 RegexRouter - modify the topic of a record based on original 
topic, replacement string and a regular expression
+Filter - Removes messages from all further processing. This is 
used with a predicate to selectively filter 
certain messages.
 
 
 Details on how to configure each transformation are listed below:
 
 
 
 
+
+Predicates
+
+Transformations can be configured with prediates so that the 
transformation is applied only to messages which satisfy some condition. In 
particular, when combined with the Filter transformation predicates can 
be used to selectively filter out certain messages.

Review comment:
   ```suggestion
   Transformations can be configured with predicates so that the 
transformation is applied only to messages which satisfy some condition. In 
particular, when combined with the Filter transformation predicates can 
be used to selectively filter out certain messages.
   ```

##
File path: docs/connect.html
##
@@ -180,13 +182,80 @@ TransformationsSetSchemaMetadata - modify the schema name or version
 TimestampRouter - Modify the topic of a record based on original 
topic and timestamp. Useful when using a sink that needs to write to different 
tables or indexes based on timestamps
 RegexRouter - modify the topic of a record based on original 
topic, replacement string and a regular expression
+Filter - Removes messages from all further processing. This is 
used with a predicate to selectively filter 
certain messages.
 
 
 Details on how to configure each transformation are listed below:
 
 
 
 
+
+Predicates
+
+Transformations can be configured with prediates so that the 
transformation is applied only to messages which satisfy some condition. In 
particular, when combined with the Filter transformation predicates can 
be used to selectively filter out certain messages.
+
+Predicates are specified in the connector configuration.
+
+
+predicates - Set of aliases for the predicates to be 
applied to some of the transformations.
+predicates.$alias.type - Fully qualified class name 
for the predicate.
+predicates.$alias.$predicateSpecificConfig - 
Configuration properties for the predicate.
+
+
+All transformations have the implicit config properties 
predicate and negate. A predicular predicate is 
associated with a transformation by setting the transformation's 
predicate config to the predicate's alias. The predicate's value 
can be reversed using the negate configuration property.
+
+For example, suppose you have a source connector which produces 
messages to many different topics and you want to:
+
+filter out the messages in the 'foo' topic entirely
+apply the ExtractField transformation with the field name 
'other_field' to records in all topics except the topic 'bar'
+
+
+To do this we need to first to filter out the records destined for the 
topic 'foo'. The Filter transformation removes records from further processing, 
and can use the TopicNameMatches predicate to apply the transformation only to 
records in topics which match a certain regular expression. TopicNameMatches's 
only configuration property is pattern which is a Java regular 
expression for matching against the topic name. The configuration would look 
like this:
+
+
+transforms=Filter
+transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
+transforms.Filter.predicate=IsFoo
+
+predicates=IsFoo
+
predicates.IsFoo.type=org.apache.kafka.connect.predicates.TopicNameMatches
+predicates.IsFoo.pattern=foo
+
+
+Next we need to apply ExtractField only when the topic name of the 
record is not 'bar'. We can't just use TopicNameMatches directly, because that 
would apply the transformation to matching topic names, not topic names which 
do not match. The transformation's implicit negate config 
properties allows us to invert the set of records which a predicate matches. 
Adding the configuration for this to the previous example we arrive at:
+
+
+transforms=Filter,Extract
+transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
+transforms.Filter.predicate=IsFoo
+
+
transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
+transforms.Extract.field=other_field
+transforms.Extract.predicate=IsBar
+transforms.Extract.negate=true

[GitHub] [kafka] ijuma commented on pull request #8857: KAFKA-10157: Fix broken tests due to InterruptedException from FinalizedFeatureChangeListener

2020-06-11 Thread GitBox


ijuma commented on pull request #8857:
URL: https://github.com/apache/kafka/pull/8857#issuecomment-643081702


   ok to test







[GitHub] [kafka] kowshik commented on pull request #8857: KAFKA-10157: Fix broken tests due to InterruptedException from FinalizedFeatureChangeListener

2020-06-11 Thread GitBox


kowshik commented on pull request #8857:
URL: https://github.com/apache/kafka/pull/8857#issuecomment-643080803


   cc @ijuma @apovzner @junrao @abbccdda for review.







[GitHub] [kafka] kowshik opened a new pull request #8857: KAFKA-10157: Fix broken tests due to FinalizedFeatureChangeListener i…

2020-06-11 Thread GitBox


kowshik opened a new pull request #8857:
URL: https://github.com/apache/kafka/pull/8857


   The call to `ChangeNotificationProcessorThread.queue.take()` could throw an 
`InterruptedException`. While the queue is empty and the thread is blocked 
taking an item from the queue, a concurrent call to 
`FinalizedFeatureChangeListener.close()` could interrupt the thread and cause 
an `InterruptedException` to be raised from `queue.take()`. In such a case, it 
is safe to ignore the exception since the thread is being shut down.
   
   Ignoring the `InterruptedException` for the above reason was clearly the 
intent of the existing code, which used the `ignoring` clause for that purpose. 
Unfortunately, the `ignoring` clause does not ignore `InterruptedException`, so 
that approach doesn't work for us. To confirm this theory, I found the following 
code in `scala.util.control.Exception.scala`: 
https://github.com/scala/scala/blob/v2.12.0/src/library/scala/util/control/Exception.scala#L167-L176.
   
   The fix in this PR is to simply not use the `ignoring` clause.
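   
   A minimal sketch of the idea (illustrative only; hypothetical names, not the 
actual patch):
   
   ```scala
   // Sketch: catch InterruptedException from queue.take() explicitly instead of
   // relying on scala.util.control.Exception.ignoring, and swallow it only while
   // the listener is shutting down.
   import java.util.concurrent.LinkedBlockingQueue
   
   class ChangeNotificationProcessorSketch(queue: LinkedBlockingQueue[String],
                                           isShuttingDown: () => Boolean) {
     def doWork(): Unit = {
       try {
         val notification = queue.take() // blocks; may be interrupted by close()
         process(notification)
       } catch {
         case _: InterruptedException if isShuttingDown() =>
           () // safe to ignore: close() interrupts the blocked take() during shutdown
       }
     }
   
     private def process(notification: String): Unit = ()
   }
   ```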
   
   **Test plan:**
   Ran the unit and integration tests and found that the test failures are gone 
now.
   I will wait for CI to pass before merging this PR.







[jira] [Updated] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-06-11 Thread Sean Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Guo updated KAFKA-10134:
-
Description: 
We want to utilize the new rebalance protocol to mitigate the stop-the-world 
effect during the rebalance, as our tasks are long-running.

But after the upgrade, when we kill an instance to trigger a rebalance while 
there is some load (some long-running tasks >30s), the CPU goes sky-high. It 
reads ~700% in our metrics, so several threads are likely in a tight loop. We 
have several consumer threads consuming from different partitions during the 
rebalance. This is reproducible with both the new CooperativeStickyAssignor and 
the old eager rebalance protocol. The difference is that with the old eager 
protocol the high CPU usage drops after the rebalance is done, whereas with the 
cooperative one the consumer threads seem stuck on something and can't finish 
the rebalance, so the high CPU usage won't drop until we stop our load. A small 
load without long-running tasks also doesn't cause continuous high CPU usage, 
as the rebalance can finish in that case.

 

"executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
[0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) at

 

By debugging into the code, we found that the clients appear to be in a loop 
trying to find the coordinator.

I also tried the old rebalance protocol on the new version; the issue still 
exists, but the CPU goes back to normal when the rebalance is done.

I also tried the same on 2.4.1, which doesn't seem to have this issue, so it 
seems related to something that changed between 2.4.1 and 2.5.0.

 

  was:
We want to utilize the new rebalance protocol to mitigate the stop-the-world 
effect during the rebalance as our tasks are long running task.

But after the upgrade when we try to kill an instance when there is some 
load(long running tasks >30S) there, the CPU will go sky-high. It reads ~700% 
in our metrics so it should several threads are in a tight loop.

 

"executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
[0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) at

 

By debugging into the code we found it looks like the clients are  in a loop on 
finding the coordinator.

I also tried the old rebalance protocol for the new version the issue still 
exists but the CPU will be back to normal when the rebalance is done.

Also tried the same on the 2.4.1 which seems don't have this issue. So it seems 
related something changed between 2.4.1 and 2.5.0.

 


> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Priority: Major
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance as our tasks are long running task.
> But after the upgrade when we try to kill an instance to let rebalance happen 
> when there is some load(some are long running tasks >30S) there, the CPU will 
> go sky-high. It reads ~700% in our metrics so there should be several threads 
> are in a tight loop. We have several consumer threads consuming from 
> different partitions during the rebalance. This is reproducible in both the 
> new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The 
> difference is that with old ea

[jira] [Created] (KAFKA-10157) Multiple tests failed due to "Failed to process feature ZK node change event"

2020-06-11 Thread Anna Povzner (Jira)
Anna Povzner created KAFKA-10157:


 Summary: Multiple tests failed due to "Failed to process feature 
ZK node change event"
 Key: KAFKA-10157
 URL: https://issues.apache.org/jira/browse/KAFKA-10157
 Project: Kafka
  Issue Type: Bug
Reporter: Anna Povzner


Multiple tests failed due to "Failed to process feature ZK node change event". 
This looks like a result of the merge of this PR: 
[https://github.com/apache/kafka/pull/8680]

Note that running tests without `--info` gives output like this one: 
{quote}Process 'Gradle Test Executor 36' finished with non-zero exit value 1
{quote}
kafka.network.DynamicConnectionQuotaTest failed:
{quote}
kafka.network.DynamicConnectionQuotaTest > testDynamicConnectionQuota 
STANDARD_OUT
 [2020-06-11 20:52:42,596] ERROR [feature-zk-node-event-process-thread]: Failed 
to process feature ZK node change event. The broker will eventually exit. 
(kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread:76)
 java.lang.InterruptedException
 at 
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056)
 at 
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2090)
 at 
java.base/java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:433)
 at 
kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.$anonfun$doWork$1(FinalizedFeatureChangeListener.scala:147){quote}
 

kafka.api.CustomQuotaCallbackTest failed:

{quote}
[2020-06-11 21:07:36,745] ERROR [feature-zk-node-event-process-thread]: Failed to process feature ZK node change event. The broker will eventually exit. (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread:76)
java.lang.InterruptedException
 at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056)
 at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2090)
 at java.base/java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:433)
 at kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.$anonfun$doWork$1(FinalizedFeatureChangeListener.scala:147)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
 at scala.util.control.Exception$Catch.apply(Exception.scala:227)
 at kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.doWork(FinalizedFeatureChangeListener.scala:147)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
{quote}
 

kafka.server.DynamicBrokerReconfigurationTest failed:

{quote}
[2020-06-11 21:13:01,207] ERROR [feature-zk-node-event-process-thread]: Failed to process feature ZK node change event. The broker will eventually exit. (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread:76)
java.lang.InterruptedException
 at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056)
 at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2090)
 at java.base/java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:433)
 at kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.$anonfun$doWork$1(FinalizedFeatureChangeListener.scala:147)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
 at scala.util.control.Exception$Catch.apply(Exception.scala:227)
 at kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.doWork(FinalizedFeatureChangeListener.scala:147)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
{quote}
 

 





[jira] [Assigned] (KAFKA-10155) Flaky Test ReassignPartitionsUnitTest.testModifyBrokerThrottles

2020-06-11 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-10155:
---

Assignee: Chia-Ping Tsai

> Flaky Test ReassignPartitionsUnitTest.testModifyBrokerThrottles
> ---
>
> Key: KAFKA-10155
> URL: https://issues.apache.org/jira/browse/KAFKA-10155
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Reporter: Sophie Blee-Goldman
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> Seems to fail more often than not. I've seen this fail on almost every single 
> build of the latest PRs
>  
> h3. Stacktrace
> java.lang.NullPointerException at 
> kafka.admin.ReassignPartitionsUnitTest.verifyBrokerThrottleResults(ReassignPartitionsUnitTest.scala:577)
>  at 
> kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles(ReassignPartitionsUnitTest.scala:540)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:564) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
> java.base/java.lang.Thread.run(Thread.java:830)
> h3. Standard Output
> Current partition replica assignment 
> \{"version":1,"partitions":[{"topic":"bar","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}
>  Proposed partition reassignment configuration 
> \{"version":1,"partitions":[{"topic":"bar","partition":0,"replicas":[0,3,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[0,1,3],"log_dirs":["any","any","any"]}]}
>  Current partition replica assignment 
> \{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}
>  Proposed partition reassignment configuration 
> \{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}





[jira] [Commented] (KAFKA-10155) Flaky Test ReassignPartitionsUnitTest.testModifyBrokerThrottles

2020-06-11 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17133891#comment-17133891
 ] 

Sophie Blee-Goldman commented on KAFKA-10155:
-

Oh cool, thanks for the heads up. I'll assign this ticket to you then

> Flaky Test ReassignPartitionsUnitTest.testModifyBrokerThrottles
> ---
>
> Key: KAFKA-10155
> URL: https://issues.apache.org/jira/browse/KAFKA-10155
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> Seems to fail more often than not. I've seen this fail on almost every single 
> build of the latest PRs
>  
> h3. Stacktrace
> java.lang.NullPointerException at 
> kafka.admin.ReassignPartitionsUnitTest.verifyBrokerThrottleResults(ReassignPartitionsUnitTest.scala:577)
>  at 
> kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles(ReassignPartitionsUnitTest.scala:540)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:564) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
> java.base/java.lang.Thread.run(Thread.java:830)
> h3. Standard Output
> Current partition replica assignment 
> \{"version":1,"partitions":[{"topic":"bar","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}
>  Proposed partition reassignment configuration 
> \{"version":1,"partitions":[{"topic":"bar","partition":0,"replicas":[0,3,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[0,1,3],"log_dirs":["any","any","any"]}]}
>  Current partition replica assignment 
> \{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}
>  Proposed partition reassignment configuration 
> \{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}





[jira] [Commented] (KAFKA-10155) Flaky Test ReassignPartitionsUnitTest.testModifyBrokerThrottles

2020-06-11 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17133856#comment-17133856
 ] 

Chia-Ping Tsai commented on KAFKA-10155:


This issue will be resolved by KAFKA-10147

> Flaky Test ReassignPartitionsUnitTest.testModifyBrokerThrottles
> ---
>
> Key: KAFKA-10155
> URL: https://issues.apache.org/jira/browse/KAFKA-10155
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> Seems to fail more often than not. I've seen this fail on almost every single 
> build of the latest PRs
>  
> h3. Stacktrace
> java.lang.NullPointerException at 
> kafka.admin.ReassignPartitionsUnitTest.verifyBrokerThrottleResults(ReassignPartitionsUnitTest.scala:577)
>  at 
> kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles(ReassignPartitionsUnitTest.scala:540)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:564) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
> java.base/java.lang.Thread.run(Thread.java:830)
> h3. Standard Output
> Current partition replica assignment 
> \{"version":1,"partitions":[{"topic":"bar","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}
>  Proposed partition reassignment configuration 
> \{"version":1,"partitions":[{"topic":"bar","partition":0,"replicas":[0,3,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[0,1,3],"log_dirs":["any","any","any"]}]}
>  Current partition replica assignment 
> \{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}
>  Proposed partition reassignment configuration 
> \{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}





[GitHub] [kafka] mumrah commented on pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

2020-06-11 Thread GitBox


mumrah commented on pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#issuecomment-643035118


   The test failures seem to be flaky (KAFKA-10155)







[GitHub] [kafka] chia7712 commented on a change in pull request #8853: KAFKA-10147 MockAdminClient#describeConfigs(Collection

2020-06-11 Thread GitBox


chia7712 commented on a change in pull request #8853:
URL: https://github.com/apache/kafka/pull/8853#discussion_r439170177



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
##
@@ -560,10 +529,14 @@ synchronized private Config 
getResourceDescription(ConfigResource resource) {
 }
 case TOPIC: {
 TopicMetadata topicMetadata = allTopics.get(resource.name());
-if (topicMetadata == null) {
-throw new UnknownTopicOrPartitionException();
+if (topicMetadata != null && !topicMetadata.markedForDeletion) 
{
+if (topicMetadata.fetchesRemainingUntilVisible > 0) 
topicMetadata.fetchesRemainingUntilVisible--;

Review comment:
   done









[GitHub] [kafka] chia7712 commented on a change in pull request #8853: KAFKA-10147 MockAdminClient#describeConfigs(Collection

2020-06-11 Thread GitBox


chia7712 commented on a change in pull request #8853:
URL: https://github.com/apache/kafka/pull/8853#discussion_r439167614



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
##
@@ -365,51 +366,6 @@ synchronized public DescribeTopicsResult 
describeTopics(Collection topic
 return new DescribeTopicsResult(topicDescriptions);
 }
 
-@Override
-public DescribeConfigsResult describeConfigs(Collection 
resources) {

Review comment:
   this method has a default implementation (see 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java#L340)
 which calls the variant ```describeConfigs(Collection, 
DescribeConfigsOptions)```, so it is ok to remove this overriding implementation.









[GitHub] [kafka] ableegoldman commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

2020-06-11 Thread GitBox


ableegoldman commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-643022606


   Also failed with unrelated (extremely) flaky 
   `ReassignPartitionsUnitTest.testModifyBrokerThrottles`
   and
`SslSelectorTest.testCloseOldestConnection` (tickets created)







[jira] [Created] (KAFKA-10156) Flaky Test SslSelectorTest.testCloseOldestConnection

2020-06-11 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10156:
---

 Summary: Flaky Test SslSelectorTest.testCloseOldestConnection
 Key: KAFKA-10156
 URL: https://issues.apache.org/jira/browse/KAFKA-10156
 Project: Kafka
  Issue Type: Bug
  Components: network
Reporter: Sophie Blee-Goldman


Failed twice on the same build with
h3. Stacktrace

java.lang.AssertionError: The idle connection should have been closed at 
org.junit.Assert.fail(Assert.java:89) at 
org.junit.Assert.assertTrue(Assert.java:42) at 
org.apache.kafka.common.network.SelectorTest.testCloseOldestConnection(SelectorTest.java:466)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:566) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
 at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
java.base/java.lang.Thread.run(Thread.java:834)





[GitHub] [kafka] ableegoldman commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-11 Thread GitBox


ableegoldman commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-643022251


   Java 11 and 8 failed with (extremely) flaky 
`ReassignPartitionsUnitTest.testModifyBrokerThrottles` -- 
https://issues.apache.org/jira/browse/KAFKA-10155
   
   Java 14 failed with `SslSelectorTest.testCloseOldestConnection`-- 
https://issues.apache.org/jira/browse/KAFKA-10156







[jira] [Commented] (KAFKA-10147) MockAdminClient#describeConfigs(Collection) is unable to handle broker resource

2020-06-11 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17133836#comment-17133836
 ] 

Chia-Ping Tsai commented on KAFKA-10147:


Set priority to Blocker as it breaks a test 
(ReassignPartitionsUnitTest#testModifyBrokerThrottles).

> MockAdminClient#describeConfigs(Collection) is unable to 
> handle broker resource
> ---
>
> Key: KAFKA-10147
> URL: https://issues.apache.org/jira/browse/KAFKA-10147
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Blocker
>
> MockAdminClient#describeConfigs(Collection) has a new implementation introduced by 
> https://github.com/apache/kafka/commit/48b56e533b3ff22ae0e2cf7fcc649e7df19f2b06.
> It does not handle the broker resource, so ReassignPartitionsUnitTest#testModifyBrokerThrottles throws an NPE





[jira] [Created] (KAFKA-10155) Flaky Test ReassignPartitionsUnitTest.testModifyBrokerThrottles

2020-06-11 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10155:
---

 Summary: Flaky Test 
ReassignPartitionsUnitTest.testModifyBrokerThrottles
 Key: KAFKA-10155
 URL: https://issues.apache.org/jira/browse/KAFKA-10155
 Project: Kafka
  Issue Type: Bug
  Components: admin
Reporter: Sophie Blee-Goldman


Seems to fail more often than not. I've seen this fail on almost every single 
build of the latest PRs

 
h3. Stacktrace

java.lang.NullPointerException at 
kafka.admin.ReassignPartitionsUnitTest.verifyBrokerThrottleResults(ReassignPartitionsUnitTest.scala:577)
 at 
kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles(ReassignPartitionsUnitTest.scala:540)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:564) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
 at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
java.base/java.lang.Thread.run(Thread.java:830)
h3. Standard Output

Current partition replica assignment 
\{"version":1,"partitions":[{"topic":"bar","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}
 Proposed partition reassignment configuration 
\{"version":1,"partitions":[{"topic":"bar","partition":0,"replicas":[0,3,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[0,1,3],"log_dirs":["any","any","any"]}]}
 Current partition replica assignment 
\{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}
 Proposed partition reassignment configuration 
\{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},\{"topic":"foo","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}





[jira] [Updated] (KAFKA-10147) MockAdminClient#describeConfigs(Collection) is unable to handle broker resource

2020-06-11 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-10147:
---
Priority: Blocker  (was: Major)

> MockAdminClient#describeConfigs(Collection) is unable to 
> handle broker resource
> ---
>
> Key: KAFKA-10147
> URL: https://issues.apache.org/jira/browse/KAFKA-10147
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Blocker
>
> MockAdminClient#describeConfigs(Collection) has a new implementation introduced by 
> https://github.com/apache/kafka/commit/48b56e533b3ff22ae0e2cf7fcc649e7df19f2b06.
> It does not handle the broker resource, so ReassignPartitionsUnitTest#testModifyBrokerThrottles throws an NPE





[jira] [Created] (KAFKA-10154) Issue in updating metadata if not exists during sending message to different topics

2020-06-11 Thread Dipti Gupta (Jira)
Dipti Gupta created KAFKA-10154:
---

 Summary: Issue in updating metadata if not exists during sending 
message to different topics
 Key: KAFKA-10154
 URL: https://issues.apache.org/jira/browse/KAFKA-10154
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 2.5.0
Reporter: Dipti Gupta


A project demonstrating the following behaviour is at: 
[https://github.com/DiptiGupta/kafka-producer-issue]

 

I referenced this fixed issue: 
https://issues.apache.org/jira/browse/KAFKA-8623

But on the latest version, I'm still getting the following exception while 
sending messages to different topics, i.e. Topic1 and Topic2.

The exception occurs once, when metadata for *`Topic2`* does not yet exist.

 
{code:java}
org.springframework.kafka.KafkaException: Send failed; nested exception is 
org.apache.kafka.common.errors.TimeoutException: Topic Topic2 not present in 
metadata after 1 ms.
 at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:570) ~[spring-kafka-2.5.1.RELEASE.jar:2.5.1.RELEASE]
{code}
 





[GitHub] [kafka] andrewchoi5 commented on pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception

2020-06-11 Thread GitBox


andrewchoi5 commented on pull request #8479:
URL: https://github.com/apache/kafka/pull/8479#issuecomment-643017385


   > > In the catch case for ZooKeeperClientException, I have populated the 
responseMap with the topic partition and the `Errors.NETWORK_EXCEPTION`. If you 
suggest any other Error to be populated in this responseMap, please let me know 
and I will change it accordingly.
   > 
   > I haven't looked at this code in a while so I may not have enough context 
at this point, but I don't think we should use the network exception error code 
- i.e., this isn't a network issue between the coordinator and broker but 
between the broker and zk. Also, there doesn't seem to be any active retry 
attempt from the controller to resend the request in this scenario.
   
   Correct -- I wasn't able to find the best, close enough Errors exception to 
populate, especially since there was none related to ZooKeeper in that class. 







[GitHub] [kafka] jjkoshy commented on pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception

2020-06-11 Thread GitBox


jjkoshy commented on pull request #8479:
URL: https://github.com/apache/kafka/pull/8479#issuecomment-643015314


   
   > In the catch case for ZooKeeperClientException, I have populated the 
responseMap with the topic partition and the `Errors.NETWORK_EXCEPTION`. If you 
suggest any other Error to be populated in this responseMap, please let me know 
and I will change it accordingly.
   
   I haven't looked at this code in a while so I may not have enough context at 
this point, but I don't think we should use the network exception error code - 
i.e., this isn't a network issue between the coordinator and broker but between 
the broker and zk. Also, there doesn't seem to be any active retry attempt from 
the controller to resend the request in this scenario.







[GitHub] [kafka] skaundinya15 commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages

2020-06-11 Thread GitBox


skaundinya15 commented on pull request #8850:
URL: https://github.com/apache/kafka/pull/8850#issuecomment-643012766


   Thanks for the reviews @hachikuji. I updated the PR per your suggestions and 
left some follow up comments for clarification. Whenever you're ready, the PR 
is ready for another review.







[GitHub] [kafka] skaundinya15 commented on a change in pull request #8850: KAFKA-10141: Add more detail to log segment delete messages

2020-06-11 Thread GitBox


skaundinya15 commented on a change in pull request #8850:
URL: https://github.com/apache/kafka/pull/8850#discussion_r439156564



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1784,8 +1785,24 @@ class Log(@volatile private var _dir: File,
   private def deleteRetentionMsBreachedSegments(): Int = {
 if (config.retentionMs < 0) return 0
 val startMs = time.milliseconds
-deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > 
config.retentionMs,
-  reason = s"retention time ${config.retentionMs}ms breach")
+
+def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) 
= {
+  if (startMs - segment.largestTimestamp > config.retentionMs) {
+segment.largestRecordTimestamp match {
+  case Some(ts) =>
+info(s"Segment with base offset ${segment.baseOffset} will be 
deleted due to" +
+  s" retentionMs breach. Largest record timestamp of segment is 
$ts")

Review comment:
   > Also, we seem to have lost mention o
   
   I'll clarify the log message as you suggested. Could you explain what you 
meant by the above?
   









[GitHub] [kafka] andrewchoi5 commented on pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception

2020-06-11 Thread GitBox


andrewchoi5 commented on pull request #8479:
URL: https://github.com/apache/kafka/pull/8479#issuecomment-643010366


   Hi @junrao. Thank you for your review -- I have further synced up with 
@jjkoshy on this PR.
   
   The partition's new leader epoch update is actually happening after the 
point at which ZooKeeper Exception is thrown. Therefore, when the `createLogs` 
throws ZooKeeper Exception, the new leader epoch does not actually get updated.
   
   In the catch case for ZooKeeperClientException, I have populated the 
responseMap with the topic partition and the `Errors.NETWORK_EXCEPTION`. If you 
suggest any other Error to be populated in this responseMap, please let me know 
and I will change it accordingly. 







[GitHub] [kafka] skaundinya15 commented on a change in pull request #8850: KAFKA-10141: Add more detail to log segment delete messages

2020-06-11 Thread GitBox


skaundinya15 commented on a change in pull request #8850:
URL: https://github.com/apache/kafka/pull/8850#discussion_r439155424



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1702,11 +1702,12 @@ class Log(@volatile private var _dir: File,
*  (if there is one) and returns true iff it is deletable
* @return The number of segments deleted
*/
-  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean, reason: String): Int = {
+  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean) = {

Review comment:
   Wouldn't keeping the reason be redundant? For every segment we delete, we 
are logging exactly why we are deleting it and the details surrounding the 
deletion.









[GitHub] [kafka] guozhangwang commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

2020-06-11 Thread GitBox


guozhangwang commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-643007770


   test this please







[GitHub] [kafka] guozhangwang commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-11 Thread GitBox


guozhangwang commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-643007836


   test this please







[GitHub] [kafka] guozhangwang commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-11 Thread GitBox


guozhangwang commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-643007754


   test this please







[GitHub] [kafka] ableegoldman commented on pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

2020-06-11 Thread GitBox


ableegoldman commented on pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#issuecomment-643003389


   @guozhangwang @mjsax @vvcephei 







[GitHub] [kafka] ableegoldman opened a new pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup

2020-06-11 Thread GitBox


ableegoldman opened a new pull request #8856:
URL: https://github.com/apache/kafka/pull/8856


   1. KAFKA-10150: 
   - always transition to SUSPENDED during `suspend`, no matter the current 
state
   - only call `prepareCommit` before closing if `task.commitNeeded` is true
   2. Don't commit any consumed offsets during `handleAssignment` -- revoked 
active tasks (and any others that need committing) will be committed during 
`handleRevocation`, so we only need to worry about cleaning them up in 
`handleAssignment`
   3. KAFKA-10152: when recycling a task we should always commit consumed 
offsets (if any), but don't need to write the checkpoint (since changelog 
offsets are preserved across task transitions) 







[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

2020-06-11 Thread GitBox


apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r439143850



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1256,11 +1272,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) 
extends Logging {
   private def waitForConnectionSlot(listenerName: ListenerName,
 acceptorBlockedPercentMeter: 
com.yammer.metrics.core.Meter): Unit = {
 counts.synchronized {
-  if (!connectionSlotAvailable(listenerName)) {
+  val startTimeMs = time.milliseconds()
+  val throttleTimeMs = 
math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startTimeMs), 0)
+
+  if (throttleTimeMs > 0 || !connectionSlotAvailable(listenerName)) {
 val startNs = time.nanoseconds
+val endThrottleTimeMs = startTimeMs + throttleTimeMs
+var remainingThrottleTimeMs = throttleTimeMs
 do {
-  counts.wait()
-} while (!connectionSlotAvailable(listenerName))
+  counts.wait(remainingThrottleTimeMs)

Review comment:
   This is exactly the behavior proposed in the KIP -- if we reach any limit 
(number of connections or connection rate), we need to wait. So, if there is no 
space for a new connection and the delay due to the rate limit has passed, we 
still have to wait for a connection slot. However, remember that if we are 
waiting on an inter-broker connection slot, the broker finds and closes a 
connection of another listener to accommodate the inter-broker connection. See 
KIP-402.
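   
   A rough sketch of the wait loop described above (simplified, with hypothetical 
helper names; not the actual SocketServer code):
   
   ```scala
   object ConnectionQuotaWaitSketch {
     // Wait while either the rate-limit throttle has not elapsed or no connection
     // slot is available, re-checking on each notify or when the remaining
     // throttle time runs out; wait(0) blocks until notified.
     def waitForSlot(hasSlot: () => Boolean, throttleTimeMs: Long,
                     nowMs: () => Long, lock: AnyRef): Unit = lock.synchronized {
       val endThrottleTimeMs = nowMs() + throttleTimeMs
       var remainingThrottleTimeMs = throttleTimeMs
       if (remainingThrottleTimeMs > 0 || !hasSlot()) {
         do {
           lock.wait(remainingThrottleTimeMs)
           remainingThrottleTimeMs = math.max(endThrottleTimeMs - nowMs(), 0L)
         } while (remainingThrottleTimeMs > 0 || !hasSlot())
       }
     }
   }
   ```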
   









[GitHub] [kafka] guozhangwang commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-11 Thread GitBox


guozhangwang commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642992126


   test this please







[GitHub] [kafka] mjsax opened a new pull request #8855: DO NOT MERGE

2020-06-11 Thread GitBox


mjsax opened a new pull request #8855:
URL: https://github.com/apache/kafka/pull/8855


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2020-06-11 Thread Cheng Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan updated KAFKA-9800:
-
Description: 
Design:

The main idea is to bookkeep the failed attempts. Currently, the retry backoff 
has two main usage patterns:
 # Synchronous retries in a blocking loop. The thread sleeps for the retry 
backoff ms in each iteration.
 # Async retries. In each poll, retries that do not yet meet the backoff are 
filtered out. The data class often maintains a 1:1 mapping to a set of requests 
which are logically associated (i.e. a set contains only one initial request 
and only its retries).

For type 1, we can utilize a local failure counter of a Java generic data type.

For case 2, I already wrapped the exponential backoff/timeout util class in my 
KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the number of attempts and returns the backoff/timeout value at 
the corresponding level. Thus, we can add a new class property to those classes 
containing retriable data in order to record the number of failed attempts.

 

Changes:

KafkaProducer:
 # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each 
ProducerBatch in the Accumulator, which already has an attribute attempts 
recording the number of failed attempts. So we can let the Accumulator calculate 
the new retry backoff for each batch when it enqueues them, to avoid 
instantiating the util class multiple times.
 # Transaction request (ApiKeys..*TXN). TxnRequestHandler will have a new class 
property of type `Long` to record the number of attempts.

KafkaConsumer:
 # Some synchronous retry use cases. Record the failed attempts in the blocking 
loop.
 # Partition request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). 
Though the actual requests are packed for each node, the current implementation 
is applying backoff to each topic partition, where the backoff value is kept by 
TopicPartitionState. Thus, TopicPartitionState will have the new property 
recording the number of attempts.

Metadata:
 #  Metadata lives as a singleton in many clients. Add a new property recording 
the number of attempts

 AdminClient:
 # AdminClient has its own request abstraction Call. The failed attempts are 
already kept by the abstraction. So probably clean the Call class logic a bit.

There are other common usages that look like client.poll(timeout), where the timeout 
passed in is the retry backoff value. We won't change these usages since their 
underlying logic is nioSelector.select(timeout) and nioSelector.selectNow(), 
which means that if no interested op exists, the client will block for retry backoff 
milliseconds. This is an optimization for when there is no request that needs to be 
sent but the client is waiting for responses. Specifically, if the client fails 
the in-flight requests before the retry backoff milliseconds have passed, it still 
needs to wait until that amount of time has passed, unless there is a new request 
that needs to be sent.
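
For illustration, a rough sketch of what such a backoff util could look like (an 
assumption-laden sketch with made-up parameter names, not the actual KIP-601 
implementation): it takes the number of failed attempts and returns an 
exponentially growing, jittered backoff capped at a maximum.
{code:java}
import java.util.concurrent.ThreadLocalRandom;

// Sketch only: backoff(n) = min(initialMs * multiplier^n, maxMs), with +/- jitter.
public class ExponentialBackoffSketch {
    private final long initialMs;
    private final double multiplier;
    private final long maxMs;
    private final double jitter;

    public ExponentialBackoffSketch(long initialMs, double multiplier, long maxMs, double jitter) {
        this.initialMs = initialMs;
        this.multiplier = multiplier;
        this.maxMs = maxMs;
        this.jitter = jitter;
    }

    public long backoff(long attempts) {
        double exp = Math.min(initialMs * Math.pow(multiplier, attempts), maxMs);
        double rand = 1.0 + ThreadLocalRandom.current().nextDouble(-jitter, jitter);
        return (long) (exp * rand);
    }
}

// e.g. new ExponentialBackoffSketch(100, 2.0, 1000, 0.2).backoff(3) returns roughly 640-960 ms
{code}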

 

  was:
Design:

The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
has two main usage patterns:
 # Synchronous retires and blocking loop. The thread will sleep in each 
iteration for retry backoff ms.
 # Async retries. In each polling, the retries do not meet the backoff will be 
filtered. The data class often maintains a 1:1 mapping to a set of requests 
which are logically associated. (i.e. a set contains only one initial request 
and only its retries.)

For type 1, we can utilize a local failure counter of a Java generic data type.

For case 2, I already wrapped the exponential backoff/timeout util class in my 
KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the number of attempts and returns the backoff/timeout value at 
the corresponding level. Thus, we can add a new class property to those classes 
containing retriable data in order to record the number of failed attempts.

 

Changes:

KafkaProducer:
 # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each 
ProducerBatch in Accumulator, which already has an attribute attempts recording 
the number of failed attempts. So probably clean up the logic a little bit by 
hiding the failed attempts property and the getter method by aggregation.
 # Transaction request (ApiKeys..*TXN). TxnRequestHandler will aggregate the 
context class and record each failed attempt.

KafkaConsumer:
 # Some synchronous retry use cases. Record the failed attempts in the blocking 
loop.
 # Partition request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). 
Though the actual requests are packed for each node, the current implementation 
is applying backoff to each topic partition, where the backoff value is kept by 
TopicPartitionState. Thus, TopicPartitionState can aggregate the context.

Metadata:
 #

[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2020-06-11 Thread Cheng Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan updated KAFKA-9800:
-
Description: 
Design:

The main idea is to bookkeep the failed attempts. Currently, the retry backoff 
has two main usage patterns:
 # Synchronous retries in a blocking loop. The thread sleeps for the retry 
backoff ms in each iteration.
 # Async retries. In each poll, retries that do not yet meet the backoff are 
filtered out. The data class often maintains a 1:1 mapping to a set of requests 
which are logically associated (i.e. a set contains only one initial request 
and only its retries).

For type 1, we can utilize a local failure counter of a Java generic data type.

For case 2, I already wrapped the exponential backoff/timeout util class in my 
KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the number of attempts and returns the backoff/timeout value at 
the corresponding level. Thus, we can add a new class property to those classes 
containing retriable data in order to record the number of failed attempts.

 

Changes:

KafkaProducer:
 # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each 
ProducerBatch in Accumulator, which already has an attribute attempts recording 
the number of failed attempts. So probably clean up the logic a little bit by 
hiding the failed attempts property and the getter method by aggregation.
 # Transaction request (ApiKeys..*TXN). TxnRequestHandler will aggregate the 
context class and record each failed attempt.

KafkaConsumer:
 # Some synchronous retry use cases. Record the failed attempts in the blocking 
loop.
 # Partition request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). 
Though the actual requests are packed for each node, the current implementation 
is applying backoff to each topic partition, where the backoff value is kept by 
TopicPartitionState. Thus, TopicPartitionState can aggregate the context.

Metadata:
 #  Metadata lives as a singleton in many clients. It can aggregate the context.

 AdminClient:
 # AdminClient has its own request abstraction Call. The failed attempts are 
kept by the abstraction. So probably clean the Call class logic a bit by 
aggregation.

There are other common usages that look like client.poll(timeout), where the timeout 
passed in is the retry backoff value. We won't change these usages since their 
underlying logic is nioSelector.select(timeout) and nioSelector.selectNow(), 
which means that if no interested op exists, the client will block for retry backoff 
milliseconds. This is an optimization for when there is no request that needs to be 
sent but the client is waiting for responses. Specifically, if the client fails 
the in-flight requests before the retry backoff milliseconds have passed, it still 
needs to wait until that amount of time has passed, unless there is a new request 
that needs to be sent.

 

  was:
Design:

The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
has two main usage patterns:
 # Synchronous retires and blocking loop. The thread will sleep in each 
iteration for retry backoff ms.
 # Async retries. In each polling, the retries do not meet the backoff will be 
filtered. The data class often maintains a 1:1 mapping to a set of requests 
which are logically associated. (i.e. a set contains only one initial request 
and only its retries.)

For type 1, we can utilize a local failure counter of a Java generic data type.

For case 2, we can make those classes containing retriable data aggregate an 
abstract class FailureContext. Retriable will implement interfaces recording 
the number of failed attempts. I already wrapped the exponential 
backoff/timeout util class in my KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the number of failures and returns the backoff/timeout value at 
the corresponding level.

 

Changes:

KafkaProducer:
 # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each 
ProducerBatch in Accumulator, which already has an attribute attempts recording 
the number of failed attempts. So probably clean up the logic a little bit by 
hiding the failed attempts property and the getter method by aggregation.
 # Transaction request (ApiKeys..*TXN). TxnRequestHandler will aggregate the 
context class and record each failed attempt.

KafkaConsumer:
 # Some synchronous retry use cases. Record the failed attempts in the blocking 
loop.
 # Partition request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). 
Though the actual requests are packed for each node, the current implementation 
is applying backoff to each topic partition, where the backoff value is kept by 
TopicPartitionState. Thus, TopicPartitionState can aggregate the context.

Metadata:
 #  Metadata lives as a singleton in many clients. It c

[GitHub] [kafka] ableegoldman commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-11 Thread GitBox


ableegoldman commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642984741


   Unrelated flaky test failed: 
`kafka.admin.ReassignPartitionsUnitTest.testModifyBrokerThrottles` 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-8686) Flaky test ExampleConnectIntegrationTest#testSinkConnector

2020-06-11 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-8686.
---
Resolution: Fixed

This test class has been running dependably for some time. Closing as fixed. 

> Flaky test ExampleConnectIntegrationTest#testSinkConnector
> --
>
> Key: KAFKA-8686
> URL: https://issues.apache.org/jira/browse/KAFKA-8686
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, unit tests
>Affects Versions: 2.4.0
>Reporter: Boyang Chen
>Assignee: Konstantine Karantasis
>Priority: Major
>  Labels: flaky-test
>
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/429/console
> org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector.test.stdout*20:09:20*
>  *20:09:20* 
> org.apache.kafka.connect.integration.ExampleConnectIntegrationTest > 
> testSinkConnector FAILED*20:09:20* java.lang.AssertionError: Condition 
> not met within timeout 15000. Connector tasks were not assigned a partition 
> each.*20:09:20* at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)*20:09:20*
>  at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353)*20:09:20*
>  at 
> org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector(ExampleConnectIntegrationTest.java:128)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine commented on pull request #7224: KAFKA-8391, KAFKA-8661: Improve flaky Connect rebalance integration tests

2020-06-11 Thread GitBox


kkonstantine commented on pull request #7224:
URL: https://github.com/apache/kafka/pull/7224#issuecomment-642975105


   @rhauch https://github.com/apache/kafka/pull/8805 seems to have fixed the 
root cause of these failures. The tests haven't failed since then. They seem 
more reliable now, at least compared to the previous state. 
   
   You might want to consider closing or refactoring this PR to keep only the 
useful parts (logging / testing hooks) in favor of 
https://github.com/apache/kafka/pull/8805



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-8661) Flaky Test RebalanceSourceConnectorsIntegrationTest#testStartTwoConnectors

2020-06-11 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-8661.
---
Resolution: Fixed

The tests have been fixed and reenabled with 


[https://github.com/apache/kafka/pull/8805]

and have been running dependably since then 

> Flaky Test RebalanceSourceConnectorsIntegrationTest#testStartTwoConnectors
> --
>
> Key: KAFKA-8661
> URL: https://issues.apache.org/jira/browse/KAFKA-8661
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, unit tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Randall Hauch
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/224/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testStartTwoConnectors/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. 
> Connector tasks did not start in time. at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353) at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testStartTwoConnectors(RebalanceSourceConnectorsIntegrationTest.java:120){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8661) Flaky Test RebalanceSourceConnectorsIntegrationTest#testStartTwoConnectors

2020-06-11 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-8661:
--
Fix Version/s: 2.5.1
   2.4.2
   2.3.2

> Flaky Test RebalanceSourceConnectorsIntegrationTest#testStartTwoConnectors
> --
>
> Key: KAFKA-8661
> URL: https://issues.apache.org/jira/browse/KAFKA-8661
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, unit tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Randall Hauch
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/224/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testStartTwoConnectors/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. 
> Connector tasks did not start in time. at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353) at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testStartTwoConnectors(RebalanceSourceConnectorsIntegrationTest.java:120){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-8686) Flaky test ExampleConnectIntegrationTest#testSinkConnector

2020-06-11 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis reassigned KAFKA-8686:
-

Assignee: Konstantine Karantasis

> Flaky test ExampleConnectIntegrationTest#testSinkConnector
> --
>
> Key: KAFKA-8686
> URL: https://issues.apache.org/jira/browse/KAFKA-8686
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, unit tests
>Affects Versions: 2.4.0
>Reporter: Boyang Chen
>Assignee: Konstantine Karantasis
>Priority: Major
>  Labels: flaky-test
>
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/429/console
> org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector.test.stdout*20:09:20*
>  *20:09:20* 
> org.apache.kafka.connect.integration.ExampleConnectIntegrationTest > 
> testSinkConnector FAILED*20:09:20* java.lang.AssertionError: Condition 
> not met within timeout 15000. Connector tasks were not assigned a partition 
> each.*20:09:20* at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)*20:09:20*
>  at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353)*20:09:20*
>  at 
> org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector(ExampleConnectIntegrationTest.java:128)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-8555) Flaky test ExampleConnectIntegrationTest#testSourceConnector

2020-06-11 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis reassigned KAFKA-8555:
-

Assignee: Konstantine Karantasis

> Flaky test ExampleConnectIntegrationTest#testSourceConnector
> 
>
> Key: KAFKA-8555
> URL: https://issues.apache.org/jira/browse/KAFKA-8555
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Boyang Chen
>Assignee: Konstantine Karantasis
>Priority: Major
> Attachments: log-job139.txt, log-job141.txt, log-job23145.txt, 
> log-job23215.txt, log-job6046.txt
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/22798/console]
> *02:03:21* 
> org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector.test.stdout*02:03:21*
>  *02:03:21* 
> org.apache.kafka.connect.integration.ExampleConnectIntegrationTest > 
> testSourceConnector FAILED*02:03:21* 
> org.apache.kafka.connect.errors.DataException: Insufficient records committed 
> by connector simple-conn in 15000 millis. Records expected=2000, 
> actual=1013*02:03:21* at 
> org.apache.kafka.connect.integration.ConnectorHandle.awaitCommits(ConnectorHandle.java:188)*02:03:21*
>  at 
> org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector(ExampleConnectIntegrationTest.java:181)*02:03:21*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector

2020-06-11 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-8391.
---
Resolution: Fixed

The tests have been fixed and reenabled with 


[https://github.com/apache/kafka/pull/8805]

and have been running dependably since then 

> Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
> ---
>
> Key: KAFKA-8391
> URL: https://issues.apache.org/jira/browse/KAFKA-8391
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Randall Hauch
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
> Attachments: 100-gradle-builds.tar
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. 
> Connector tasks did not stop in time. at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector

2020-06-11 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-8391:
--
Fix Version/s: 2.5.1
   2.4.2
   2.3.2

> Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
> ---
>
> Key: KAFKA-8391
> URL: https://issues.apache.org/jira/browse/KAFKA-8391
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Randall Hauch
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
> Attachments: 100-gradle-builds.tar
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. 
> Connector tasks did not stop in time. at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-11 Thread GitBox


mjsax commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642968031







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8852: MINOR: code cleanup for Kafka Streams task classes

2020-06-11 Thread GitBox


mjsax commented on pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#issuecomment-642966020


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8853: KAFKA-10147 MockAdminClient#describeConfigs(Collection

2020-06-11 Thread GitBox


abbccdda commented on a change in pull request #8853:
URL: https://github.com/apache/kafka/pull/8853#discussion_r439101876



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
##
@@ -560,10 +529,14 @@ synchronized private Config 
getResourceDescription(ConfigResource resource) {
 }
 case TOPIC: {
 TopicMetadata topicMetadata = allTopics.get(resource.name());
-if (topicMetadata == null) {
-throw new UnknownTopicOrPartitionException();
+if (topicMetadata != null && !topicMetadata.markedForDeletion) 
{
+if (topicMetadata.fetchesRemainingUntilVisible > 0) 
topicMetadata.fetchesRemainingUntilVisible--;

Review comment:
   nit: topicMetadata.fetchesRemainingUntilVisible = Math.max(0, 
topicMetadata.fetchesRemainingUntilVisible -1);

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
##
@@ -365,51 +366,6 @@ synchronized public DescribeTopicsResult 
describeTopics(Collection topic
 return new DescribeTopicsResult(topicDescriptions);
 }
 
-@Override
-public DescribeConfigsResult describeConfigs(Collection 
resources) {

Review comment:
   Maybe just deprecate public method instead of removing it





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management

2020-06-11 Thread GitBox


guozhangwang commented on a change in pull request #8833:
URL: https://github.com/apache/kafka/pull/8833#discussion_r439093372



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -210,9 +214,8 @@ public void handleAssignment(final Map> activeTasks,
 tasksToRecycle.add(task);
 } else {
 try {
-task.prepareCloseClean();
-final Map 
committableOffsets = task
-.committableOffsetsAndMetadata();
+task.suspend();

Review comment:
   We may not call `handleRevocation` before calling `handleAssignment`, so the 
tasks to close may not be in SUSPENDED state yet, and hence to close them we 
need to commit their state. The other tasks do not necessarily need to commit, 
but I think the point was that since we are going to send one `commit` request 
anyway, we may as well commit for everyone. Note that flushing can indeed be 
skipped, which is what KAFKA-9450 covers.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management

2020-06-11 Thread GitBox


ableegoldman commented on a change in pull request #8833:
URL: https://github.com/apache/kafka/pull/8833#discussion_r439086416



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -210,9 +214,8 @@ public void handleAssignment(final Map> activeTasks,
 tasksToRecycle.add(task);
 } else {
 try {
-task.prepareCloseClean();
-final Map 
committableOffsets = task
-.committableOffsetsAndMetadata();
+task.suspend();

Review comment:
   @mjsax @guozhangwang why do we need to commit at all during 
`handleAssignment`? Shouldn't we have already committed all tasks that _need_ 
to be committed during `handleRevocation`? 
   
   That's not exactly a bug, I'm just wondering if it's necessary?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10149) Do not prevent automatic preferred election when reassignment in progress

2020-06-11 Thread Bob Barrett (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bob Barrett reassigned KAFKA-10149:
---

Assignee: Bob Barrett

> Do not prevent automatic preferred election when reassignment in progress
> -
>
> Key: KAFKA-10149
> URL: https://issues.apache.org/jira/browse/KAFKA-10149
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Bob Barrett
>Priority: Major
>
> Currently the controller will not do preferred leader elections automatically 
> when a reassignment is in progress. If a broker crashes or is restarted with 
> a reassignment in progress, this leaves the leadership massively skewed until 
> the reassignment completes. I am not sure if there is a good reason for this, 
> but it seems not ideal.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10153) Documentation for the Errant Record Reporter

2020-06-11 Thread Aakash Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aakash Shah updated KAFKA-10153:

Component/s: KafkaConnect

> Documentation for the Errant Record Reporter
> 
>
> Key: KAFKA-10153
> URL: https://issues.apache.org/jira/browse/KAFKA-10153
> Project: Kafka
>  Issue Type: Task
>  Components: documentation, KafkaConnect
>Affects Versions: 2.6.0
>Reporter: Aakash Shah
>Assignee: Aakash Shah
>Priority: Major
>
> Add documentation for the new Errant Record Reporter API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10153) Documentation for the Errant Record Reporter

2020-06-11 Thread Aakash Shah (Jira)
Aakash Shah created KAFKA-10153:
---

 Summary: Documentation for the Errant Record Reporter
 Key: KAFKA-10153
 URL: https://issues.apache.org/jira/browse/KAFKA-10153
 Project: Kafka
  Issue Type: Task
  Components: documentation
Affects Versions: 2.6.0
Reporter: Aakash Shah
Assignee: Aakash Shah


Add documentation for the new Errant Record Reporter API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management

2020-06-11 Thread GitBox


ableegoldman commented on a change in pull request #8833:
URL: https://github.com/apache/kafka/pull/8833#discussion_r439075789



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -504,88 +438,66 @@ public void update(final Set 
topicPartitions, final Map
- * the following order must be followed:
- *  1. first close topology to make sure all cached records in the 
topology are processed
- *  2. then flush the state, send any left changelog records
- *  3. then flush the record collector
- * 
- *
- * @param cleanshut down cleanly (ie, incl. flush) if {@code true} --
- * otherwise, just close open resources
- * @throws TaskMigratedException if the task producer got fenced (EOS)
- */
-private void prepareClose(final boolean clean) {
-// Reset any previously scheduled checkpoint.
-checkpoint = null;
-
+private void maybeScheduleCheckpoint() {
 switch (state()) {
-case CREATED:
-// the task is created and not initialized, just re-write the 
checkpoint file
-scheduleCheckpoint(emptyMap());
+case RESTORING:
+this.checkpoint = checkpointableOffsets();
+
 break;
 
 case RUNNING:
-closeTopology(clean);
-
-if (clean) {
-stateMgr.flush();
-recordCollector.flush();
-scheduleCheckpoint(checkpointableOffsets());
-} else {
-executeAndMaybeSwallow(false, stateMgr::flush, "state 
manager flush", log);
+if (!eosEnabled) {
+this.checkpoint = checkpointableOffsets();
 }
 
 break;
 
-case RESTORING:
-executeAndMaybeSwallow(clean, stateMgr::flush, "state manager 
flush", log);
-scheduleCheckpoint(emptyMap());
+case SUSPENDED:
+this.checkpoint = checkpointableOffsets();
 
 break;
 
-case SUSPENDED:
+case CREATED:
 case CLOSED:
-// not need to checkpoint, since when suspending we've already 
committed the state
-break;
+throw new IllegalStateException("Illegal state " + state() + " 
while scheduling checkpoint for active task " + id);
 
 default:
-throw new IllegalStateException("Unknown state " + state() + " 
while prepare closing active task " + id);
+throw new IllegalStateException("Unknown state " + state() + " 
while scheduling checkpoint for active task " + id);
 }
 }
 
-private void scheduleCheckpoint(final Map 
checkpoint) {
-this.checkpoint = checkpoint;
-}
-
 private void writeCheckpointIfNeed() {
+if (commitNeeded) {

Review comment:
   Too late, I already created a ticket for it 🙂 But after starting to work 
on it, I agree, they should be addressed in one PR





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management

2020-06-11 Thread GitBox


guozhangwang commented on a change in pull request #8833:
URL: https://github.com/apache/kafka/pull/8833#discussion_r439074869



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -504,88 +438,66 @@ public void update(final Set 
topicPartitions, final Map
- * the following order must be followed:
- *  1. first close topology to make sure all cached records in the 
topology are processed
- *  2. then flush the state, send any left changelog records
- *  3. then flush the record collector
- * 
- *
- * @param cleanshut down cleanly (ie, incl. flush) if {@code true} --
- * otherwise, just close open resources
- * @throws TaskMigratedException if the task producer got fenced (EOS)
- */
-private void prepareClose(final boolean clean) {
-// Reset any previously scheduled checkpoint.
-checkpoint = null;
-
+private void maybeScheduleCheckpoint() {
 switch (state()) {
-case CREATED:
-// the task is created and not initialized, just re-write the 
checkpoint file
-scheduleCheckpoint(emptyMap());
+case RESTORING:
+this.checkpoint = checkpointableOffsets();
+
 break;
 
 case RUNNING:
-closeTopology(clean);
-
-if (clean) {
-stateMgr.flush();
-recordCollector.flush();
-scheduleCheckpoint(checkpointableOffsets());
-} else {
-executeAndMaybeSwallow(false, stateMgr::flush, "state 
manager flush", log);
+if (!eosEnabled) {
+this.checkpoint = checkpointableOffsets();
 }
 
 break;
 
-case RESTORING:
-executeAndMaybeSwallow(clean, stateMgr::flush, "state manager 
flush", log);
-scheduleCheckpoint(emptyMap());
+case SUSPENDED:
+this.checkpoint = checkpointableOffsets();
 
 break;
 
-case SUSPENDED:
+case CREATED:
 case CLOSED:
-// not need to checkpoint, since when suspending we've already 
committed the state
-break;
+throw new IllegalStateException("Illegal state " + state() + " 
while scheduling checkpoint for active task " + id);
 
 default:
-throw new IllegalStateException("Unknown state " + state() + " 
while prepare closing active task " + id);
+throw new IllegalStateException("Unknown state " + state() + " 
while scheduling checkpoint for active task " + id);
 }
 }
 
-private void scheduleCheckpoint(final Map 
checkpoint) {
-this.checkpoint = checkpoint;
-}
-
 private void writeCheckpointIfNeed() {
+if (commitNeeded) {

Review comment:
   Nice catch. Let's just fix it along with 10150?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10152) Attempt to write checkpoint without first committing during recycle

2020-06-11 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-10152:
---

Assignee: Sophie Blee-Goldman

> Attempt to write checkpoint without first committing during recycle
> ---
>
> Key: KAFKA-10152
> URL: https://issues.apache.org/jira/browse/KAFKA-10152
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.6.0
>
>
> Seen causing failure of 
> `SmokeTestDriverIntegrationTest#shouldWorkWithRebalance` locally:
> {code:java}
> Caused by: java.lang.IllegalStateException: A checkpoint should only be 
> written if no commit is needed. at 
> org.apache.kafka.streams.processor.internals.StreamTask.writeCheckpointIfNeed(StreamTask.java:534)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.closeAndRecycleState(StreamTask.java:482)
>  at 
> org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:115)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:288)
> {code}
> See comment here: https://github.com/apache/kafka/pull/8833/files#r438953766



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes

2020-06-11 Thread GitBox


vvcephei commented on pull request #8764:
URL: https://github.com/apache/kafka/pull/8764#issuecomment-642902146


   I've noticed that if you complain, you tend to get results...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes

2020-06-11 Thread GitBox


vvcephei commented on pull request #8764:
URL: https://github.com/apache/kafka/pull/8764#issuecomment-642902034


   Ah, there we go :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes

2020-06-11 Thread GitBox


vvcephei commented on pull request #8764:
URL: https://github.com/apache/kafka/pull/8764#issuecomment-642901512


   Hey @bellemare , Thanks so much for the update! Jenkins has been a bit lazy 
recently, so I'll probably be pinging it for a while...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes

2020-06-11 Thread GitBox


vvcephei commented on pull request #8764:
URL: https://github.com/apache/kafka/pull/8764#issuecomment-642901027







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes

2020-06-11 Thread GitBox


vvcephei commented on pull request #8764:
URL: https://github.com/apache/kafka/pull/8764#issuecomment-642900612







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10152) Attempt to write checkpoint without first committing during recycle

2020-06-11 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10152:
---

 Summary: Attempt to write checkpoint without first committing 
during recycle
 Key: KAFKA-10152
 URL: https://issues.apache.org/jira/browse/KAFKA-10152
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Sophie Blee-Goldman
 Fix For: 2.6.0


Seen causing failure of `SmokeTestDriverIntegrationTest#shouldWorkWithRebalance` 
locally:
{code:java}
Caused by: java.lang.IllegalStateException: A checkpoint should only be written 
if no commit is needed. at 
org.apache.kafka.streams.processor.internals.StreamTask.writeCheckpointIfNeed(StreamTask.java:534)
 at 
org.apache.kafka.streams.processor.internals.StreamTask.closeAndRecycleState(StreamTask.java:482)
 at 
org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:115)
 at 
org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:288)
{code}
See comment here: https://github.com/apache/kafka/pull/8833/files#r438953766



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10151) Flaky Test StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi

2020-06-11 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10151:

Summary: Flaky Test 
StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi
  (was: Flaky Test 
StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapiStoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi)

> Flaky Test 
> StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi
> --
>
> Key: KAFKA-10151
> URL: https://issues.apache.org/jira/browse/KAFKA-10151
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test, integration-test
> Attachments: 
> StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi.stdout.rtf
>
>
> I've started seeing this fail in the past week or so. Checked out the logs 
> and there's nothing obviously wrong (ie no ERROR or exception) so it might 
> just be flaky? 
>  
> java.lang.AssertionError: Condition not met within timeout 6. Could not 
> get expected result in time. at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:388) at 
> org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.verifyCountWithTimestamp(StoreUpgradeIntegrationTest.java:367)
>  at 
> org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(StoreUpgradeIntegrationTest.java:183)
>  at 
> org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi(StoreUpgradeIntegrationTest.java:109)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

2020-06-11 Thread GitBox


hachikuji commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r439009094



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##
@@ -978,7 +1000,7 @@ public boolean hasValidPosition() {
 
 @Override
 public boolean hasPosition() {

Review comment:
   Sure, but that led to the opposite problem, in which the enum was 
inconsistent with the state. In regard to position, I think we should handle 
this at transition time as mentioned below. If we ensure that position is not 
null in the fetching and validating states, then I don't see a problem changing 
`hasPosition` to check it directly.
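   Just to sketch what that could look like (a simplified, hypothetical state 
holder for illustration, not the actual `SubscriptionState` internals):
   ```java
// Simplified sketch: derive hasPosition() from the position field itself,
// relying on the invariant that FETCHING and AWAIT_VALIDATION always carry a position.
class TopicPartitionStateSketch {
    enum FetchState { INITIALIZING, FETCHING, AWAIT_RESET, AWAIT_VALIDATION }

    private FetchState fetchState = FetchState.INITIALIZING;
    private Long position; // null until a position has been established

    boolean hasPosition() {
        return position != null;
    }

    boolean hasValidPosition() {
        return fetchState == FetchState.FETCHING && position != null;
    }
}
   ```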





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10151) Flaky Test StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapiStoreUpgradeIntegrationTest.shouldMigratePersistentKeyValue

2020-06-11 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10151:
---

 Summary: Flaky Test 
StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapiStoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi
 Key: KAFKA-10151
 URL: https://issues.apache.org/jira/browse/KAFKA-10151
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Sophie Blee-Goldman
 Attachments: 
StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi.stdout.rtf

I've started seeing this fail in the past week or so. Checked out the logs and 
there's nothing obviously wrong (ie no ERROR or exception) so it might just be 
flaky? 

 

java.lang.AssertionError: Condition not met within timeout 6. Could not get 
expected result in time. at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) 
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398) at 
org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:388) at 
org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.verifyCountWithTimestamp(StoreUpgradeIntegrationTest.java:367)
 at 
org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(StoreUpgradeIntegrationTest.java:183)
 at 
org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi(StoreUpgradeIntegrationTest.java:109)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management

2020-06-11 Thread GitBox


ableegoldman commented on a change in pull request #8833:
URL: https://github.com/apache/kafka/pull/8833#discussion_r439004982



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -504,88 +438,66 @@ public void update(final Set 
topicPartitions, final Map
- * the following order must be followed:
- *  1. first close topology to make sure all cached records in the 
topology are processed
- *  2. then flush the state, send any left changelog records
- *  3. then flush the record collector
- * 
- *
- * @param cleanshut down cleanly (ie, incl. flush) if {@code true} --
- * otherwise, just close open resources
- * @throws TaskMigratedException if the task producer got fenced (EOS)
- */
-private void prepareClose(final boolean clean) {
-// Reset any previously scheduled checkpoint.
-checkpoint = null;
-
+private void maybeScheduleCheckpoint() {
 switch (state()) {
-case CREATED:
-// the task is created and not initialized, just re-write the 
checkpoint file
-scheduleCheckpoint(emptyMap());
+case RESTORING:
+this.checkpoint = checkpointableOffsets();
+
 break;
 
 case RUNNING:
-closeTopology(clean);
-
-if (clean) {
-stateMgr.flush();
-recordCollector.flush();
-scheduleCheckpoint(checkpointableOffsets());
-} else {
-executeAndMaybeSwallow(false, stateMgr::flush, "state 
manager flush", log);
+if (!eosEnabled) {
+this.checkpoint = checkpointableOffsets();
 }
 
 break;
 
-case RESTORING:
-executeAndMaybeSwallow(clean, stateMgr::flush, "state manager 
flush", log);
-scheduleCheckpoint(emptyMap());
+case SUSPENDED:
+this.checkpoint = checkpointableOffsets();
 
 break;
 
-case SUSPENDED:
+case CREATED:
 case CLOSED:
-// not need to checkpoint, since when suspending we've already 
committed the state
-break;
+throw new IllegalStateException("Illegal state " + state() + " 
while scheduling checkpoint for active task " + id);
 
 default:
-throw new IllegalStateException("Unknown state " + state() + " 
while prepare closing active task " + id);
+throw new IllegalStateException("Unknown state " + state() + " 
while scheduling checkpoint for active task " + id);
 }
 }
 
-private void scheduleCheckpoint(final Map 
checkpoint) {
-this.checkpoint = checkpoint;
-}
-
 private void writeCheckpointIfNeed() {
+if (commitNeeded) {

Review comment:
   Just saw this in the 
`SmokeTestDriverIntegrationTest#shouldWorkWithRebalance` -- not sure if it 
merits a separate ticket or can just be fixed together with 
https://issues.apache.org/jira/browse/KAFKA-10150 ?
   ```
   Caused by: java.lang.IllegalStateException: A checkpoint should only be 
written if no commit is needed.
at 
org.apache.kafka.streams.processor.internals.StreamTask.writeCheckpointIfNeed(StreamTask.java:534)
at 
org.apache.kafka.streams.processor.internals.StreamTask.closeAndRecycleState(StreamTask.java:482)
at 
org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:115)
at 
org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:288)
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -504,88 +438,66 @@ public void update(final Set 
topicPartitions, final Map
- * the following order must be followed:
- *  1. first close topology to make sure all cached records in the 
topology are processed
- *  2. then flush the state, send any left changelog records
- *  3. then flush the record collector
- * 
- *
- * @param cleanshut down cleanly (ie, incl. flush) if {@code true} --
- * otherwise, just close open resources
- * @throws TaskMigratedException if the task producer got fenced (EOS)
- */
-private void prepareClose(final boolean clean) {
-// Reset any previously scheduled checkpoint.
-checkpoint = null;
-
+private void maybeScheduleCheckpoint() {
 switch (state()) {
-case CREATED:
-// the task is created and not initialized, just re-write the 
checkpoint file
-scheduleCheckpoint(emptyMap());
+case RESTORING:
+this.checkpoint = checkpointableOffsets();
+
 break;
 
 case RUNNING:
-

[GitHub] [kafka] hachikuji commented on a change in pull request #8850: KAFKA-10141: Add more detail to log segment delete messages

2020-06-11 Thread GitBox


hachikuji commented on a change in pull request #8850:
URL: https://github.com/apache/kafka/pull/8850#discussion_r438997622



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1702,11 +1702,12 @@ class Log(@volatile private var _dir: File,
*  (if there is one) and returns true iff it is deletable
* @return The number of segments deleted
*/
-  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean, reason: String): Int = {
+  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean) = {

Review comment:
   Can we keep the reason? On second thought, maybe it's fine to leave this 
as is.

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1794,20 +1811,29 @@ class Log(@volatile private var _dir: File,
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) 
= {
   if (diff - segment.size >= 0) {
 diff -= segment.size
+info(s"Segment with base offset ${segment.baseOffset} will be deleted 
due to" +

Review comment:
   In addition, it might be useful to know the total log size. Maybe we 
could include `size - diff` as the size after deletion?

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1784,8 +1785,24 @@ class Log(@volatile private var _dir: File,
   private def deleteRetentionMsBreachedSegments(): Int = {
 if (config.retentionMs < 0) return 0
 val startMs = time.milliseconds
-deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > 
config.retentionMs,
-  reason = s"retention time ${config.retentionMs}ms breach")
+
+def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) 
= {
+  if (startMs - segment.largestTimestamp > config.retentionMs) {
+segment.largestRecordTimestamp match {
+  case Some(ts) =>
+info(s"Segment with base offset ${segment.baseOffset} will be 
deleted due to" +
+  s" retentionMs breach. Largest record timestamp of segment is 
$ts")

Review comment:
   nit: could we make the connection clearer? How about this?
   ```scala
  info(s"Segment with base offset ${segment.baseOffset} will be deleted due 
to" +
 s" retentionMs breach based on the largest record timestamp 
from the segment, which is $ts")
   ```
   
   Also, we seem to have lost mention o





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #8312: KAFKA-9432 automated protocol for DescribeConfigs

2020-06-11 Thread GitBox


tombentley commented on pull request #8312:
URL: https://github.com/apache/kafka/pull/8312#issuecomment-642863307


   @mimaison done.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10027) Implement read path for feature versioning scheme

2020-06-11 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-10027.
-
Fix Version/s: 2.7.0
 Assignee: Kowshik Prakasam
   Resolution: Fixed

merged the PR to trunk

> Implement read path for feature versioning scheme
> -
>
> Key: KAFKA-10027
> URL: https://issues.apache.org/jira/browse/KAFKA-10027
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Assignee: Kowshik Prakasam
>Priority: Major
> Fix For: 2.7.0
>
>
> Goal is to implement various classes and integration for the read path of the 
> feature versioning system 
> ([KIP-584|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features]).
>  The ultimate plan is that the cluster-wide *finalized* features information 
> is going to be stored in ZK under the node {{/feature}}. The read path 
> implemented in this PR is centered around reading this *finalized* features 
> information from ZK, and, processing it inside the Broker.
>  
> Here is a summary of what's needed for this Jira (a lot of it is *new* 
> classes):
>  * A facility is provided in the broker to declare it's supported features, 
> and advertise it's supported features via it's own {{BrokerIdZNode}} under a 
> {{features}} key.
>  * A facility is provided in the broker to listen to and propagate 
> cluster-wide *finalized* feature changes from ZK.
>  * When new *finalized* features are read from ZK, feature incompatibilities 
> are detected by comparing against the broker's own supported features.
>  * {{ApiVersionsResponse}} is now served containing supported and finalized 
> feature information (using the newly added tagged fields).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao merged pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)

2020-06-11 Thread GitBox


junrao merged pull request #8680:
URL: https://github.com/apache/kafka/pull/8680


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)

2020-06-11 Thread GitBox


junrao commented on pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#issuecomment-642855293


   The test failures seem unrelated. Merging this to trunk.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10150) IllegalStateException when revoking task in CREATED state

2020-06-11 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10150:
---

 Summary: IllegalStateException when revoking task in CREATED state
 Key: KAFKA-10150
 URL: https://issues.apache.org/jira/browse/KAFKA-10150
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Sophie Blee-Goldman
Assignee: Sophie Blee-Goldman
 Fix For: 2.6.0


Seen killing threads in soak. During handleAssignment we call #suspend and 
#prepareCommit on all revoked tasks. Only RUNNING tasks are transitioned to 
SUSPENDED in #suspend, but #prepareCommit will assert that the state is either 
RUNNING or SUSPENDED.

So tasks that get revoked while in CREATED will hit an IllegalStateException 
during prepareCommit:
{code:java}
[2020-06-11T00:39:57-07:00] 
(streams-soak-KAFKA-10144-corrupted-standby-fix-eos-beta-broker-trunk_soak_i-09ef677ab88c040c8_streamslog)
 [2020-06-11 07:39:56,852] ERROR 
[stream-soak-test-90c4a6b9-e730-4211-9edb-146b74c54c8f-StreamThread-2] 
stream-thread 
[stream-soak-test-90c4a6b9-e730-4211-9edb-146b74c54c8f-StreamThread-2] Failed 
to close task 1_2 cleanly. Attempting to close remaining tasks before 
re-throwing: 
(org.apache.kafka.streams.processor.internals.TaskManager)[2020-06-11T00:39:57-07:00]
 
(streams-soak-KAFKA-10144-corrupted-standby-fix-eos-beta-broker-trunk_soak_i-09ef677ab88c040c8_streamslog)
 [2020-06-11 07:39:56,852] ERROR 
[stream-soak-test-90c4a6b9-e730-4211-9edb-146b74c54c8f-StreamThread-2] 
stream-thread 
[stream-soak-test-90c4a6b9-e730-4211-9edb-146b74c54c8f-StreamThread-2] Failed 
to close task 1_2 cleanly. Attempting to close remaining tasks before 
re-throwing: 
(org.apache.kafka.streams.processor.internals.TaskManager)[2020-06-11T00:39:57-07:00]
 
(streams-soak-KAFKA-10144-corrupted-standby-fix-eos-beta-broker-trunk_soak_i-09ef677ab88c040c8_streamslog)
 java.lang.IllegalStateException: Illegal state CREATED while preparing standby 
task 1_2 for committing  at 
org.apache.kafka.streams.processor.internals.StandbyTask.prepareCommit(StandbyTask.java:142)
 at 
org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:228)
 at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1350)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:279)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:421)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:506)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1263)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1229) 
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1204) 
at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:763)
 at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:623)
 at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
 at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
{code}
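
As a minimal sketch of the state handling described above (hypothetical names, not the actual StandbyTask/StreamTask code): #suspend only moves RUNNING tasks to SUSPENDED, while #prepareCommit rejects everything except RUNNING and SUSPENDED, so a task revoked while still CREATED falls through to the IllegalStateException.

{code:java}
// Hypothetical sketch of the described state checks; not the real task classes.
enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

class TaskSketch {
    private State state = State.CREATED;

    void suspend() {
        // only RUNNING tasks are transitioned to SUSPENDED
        if (state == State.RUNNING) {
            state = State.SUSPENDED;
        }
    }

    void prepareCommit() {
        // asserts RUNNING or SUSPENDED, so a task revoked while CREATED throws here
        if (state != State.RUNNING && state != State.SUSPENDED) {
            throw new IllegalStateException("Illegal state " + state + " while preparing task for committing");
        }
    }
}
{code}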



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10149) Do not prevent automatic preferred election when reassignment in progress

2020-06-11 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10149:
---

 Summary: Do not prevent automatic preferred election when 
reassignment in progress
 Key: KAFKA-10149
 URL: https://issues.apache.org/jira/browse/KAFKA-10149
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


Currently the controller will not do preferred leader elections automatically 
when a reassignment is in progress. If a broker crashes or is restarted with a 
reassignment in progress, this leaves the leadership massively skewed until the 
reassignment completes. I am not sure if there is a good reason for this, but 
it seems not ideal.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management

2020-06-11 Thread GitBox


ableegoldman commented on a change in pull request #8833:
URL: https://github.com/apache/kafka/pull/8833#discussion_r438957621



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -504,88 +438,66 @@ public void update(final Set 
topicPartitions, final Map
- * the following order must be followed:
- *  1. first close topology to make sure all cached records in the 
topology are processed
- *  2. then flush the state, send any left changelog records
- *  3. then flush the record collector
- * 
- *
- * @param cleanshut down cleanly (ie, incl. flush) if {@code true} --
- * otherwise, just close open resources
- * @throws TaskMigratedException if the task producer got fenced (EOS)
- */
-private void prepareClose(final boolean clean) {
-// Reset any previously scheduled checkpoint.
-checkpoint = null;
-
+private void maybeScheduleCheckpoint() {
 switch (state()) {
-case CREATED:
-// the task is created and not initialized, just re-write the 
checkpoint file
-scheduleCheckpoint(emptyMap());
+case RESTORING:
+this.checkpoint = checkpointableOffsets();
+
 break;
 
 case RUNNING:
-closeTopology(clean);
-
-if (clean) {
-stateMgr.flush();
-recordCollector.flush();
-scheduleCheckpoint(checkpointableOffsets());
-} else {
-executeAndMaybeSwallow(false, stateMgr::flush, "state 
manager flush", log);
+if (!eosEnabled) {
+this.checkpoint = checkpointableOffsets();
 }
 
 break;
 
-case RESTORING:
-executeAndMaybeSwallow(clean, stateMgr::flush, "state manager 
flush", log);
-scheduleCheckpoint(emptyMap());
+case SUSPENDED:
+this.checkpoint = checkpointableOffsets();
 
 break;
 
-case SUSPENDED:
+case CREATED:
 case CLOSED:
-// not need to checkpoint, since when suspending we've already 
committed the state
-break;
+throw new IllegalStateException("Illegal state " + state() + " 
while scheduling checkpoint for active task " + id);
 
 default:
-throw new IllegalStateException("Unknown state " + state() + " 
while prepare closing active task " + id);
+throw new IllegalStateException("Unknown state " + state() + " 
while scheduling checkpoint for active task " + id);
 }
 }
 
-private void scheduleCheckpoint(final Map 
checkpoint) {
-this.checkpoint = checkpoint;
-}
-
 private void writeCheckpointIfNeed() {
+if (commitNeeded) {

Review comment:
   Maybe I'm thinking of standby tasks (ie we only skip checkpointing for 
recycled standbys). For active tasks, we should probably commit them before 
recycling right? Or is it ok to skip committing altogether 🤔 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management

2020-06-11 Thread GitBox


ableegoldman commented on a change in pull request #8833:
URL: https://github.com/apache/kafka/pull/8833#discussion_r438953766



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -504,88 +438,66 @@ public void update(final Set 
topicPartitions, final Map
- * the following order must be followed:
- *  1. first close topology to make sure all cached records in the 
topology are processed
- *  2. then flush the state, send any left changelog records
- *  3. then flush the record collector
- * 
- *
- * @param cleanshut down cleanly (ie, incl. flush) if {@code true} --
- * otherwise, just close open resources
- * @throws TaskMigratedException if the task producer got fenced (EOS)
- */
-private void prepareClose(final boolean clean) {
-// Reset any previously scheduled checkpoint.
-checkpoint = null;
-
+private void maybeScheduleCheckpoint() {
 switch (state()) {
-case CREATED:
-// the task is created and not initialized, just re-write the 
checkpoint file
-scheduleCheckpoint(emptyMap());
+case RESTORING:
+this.checkpoint = checkpointableOffsets();
+
 break;
 
 case RUNNING:
-closeTopology(clean);
-
-if (clean) {
-stateMgr.flush();
-recordCollector.flush();
-scheduleCheckpoint(checkpointableOffsets());
-} else {
-executeAndMaybeSwallow(false, stateMgr::flush, "state 
manager flush", log);
+if (!eosEnabled) {
+this.checkpoint = checkpointableOffsets();
 }
 
 break;
 
-case RESTORING:
-executeAndMaybeSwallow(clean, stateMgr::flush, "state manager 
flush", log);
-scheduleCheckpoint(emptyMap());
+case SUSPENDED:
+this.checkpoint = checkpointableOffsets();
 
 break;
 
-case SUSPENDED:
+case CREATED:
 case CLOSED:
-// not need to checkpoint, since when suspending we've already 
committed the state
-break;
+throw new IllegalStateException("Illegal state " + state() + " 
while scheduling checkpoint for active task " + id);
 
 default:
-throw new IllegalStateException("Unknown state " + state() + " 
while prepare closing active task " + id);
+throw new IllegalStateException("Unknown state " + state() + " 
while scheduling checkpoint for active task " + id);
 }
 }
 
-private void scheduleCheckpoint(final Map 
checkpoint) {
-this.checkpoint = checkpoint;
-}
-
 private void writeCheckpointIfNeed() {
+if (commitNeeded) {

Review comment:
   @mjsax  By `should only be written if no commit is needed` do you mean 
`...if a commit was just completed`? 
   Doesn't this break `closeAndRecycleState` (I thought iI saw in another 
comment that we don't write checkpoints during recycle anymore?)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management

2020-06-11 Thread GitBox


ableegoldman commented on a change in pull request #8833:
URL: https://github.com/apache/kafka/pull/8833#discussion_r438953766



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -504,88 +438,66 @@ public void update(final Set 
topicPartitions, final Map
- * the following order must be followed:
- *  1. first close topology to make sure all cached records in the 
topology are processed
- *  2. then flush the state, send any left changelog records
- *  3. then flush the record collector
- * 
- *
- * @param cleanshut down cleanly (ie, incl. flush) if {@code true} --
- * otherwise, just close open resources
- * @throws TaskMigratedException if the task producer got fenced (EOS)
- */
-private void prepareClose(final boolean clean) {
-// Reset any previously scheduled checkpoint.
-checkpoint = null;
-
+private void maybeScheduleCheckpoint() {
 switch (state()) {
-case CREATED:
-// the task is created and not initialized, just re-write the 
checkpoint file
-scheduleCheckpoint(emptyMap());
+case RESTORING:
+this.checkpoint = checkpointableOffsets();
+
 break;
 
 case RUNNING:
-closeTopology(clean);
-
-if (clean) {
-stateMgr.flush();
-recordCollector.flush();
-scheduleCheckpoint(checkpointableOffsets());
-} else {
-executeAndMaybeSwallow(false, stateMgr::flush, "state 
manager flush", log);
+if (!eosEnabled) {
+this.checkpoint = checkpointableOffsets();
 }
 
 break;
 
-case RESTORING:
-executeAndMaybeSwallow(clean, stateMgr::flush, "state manager 
flush", log);
-scheduleCheckpoint(emptyMap());
+case SUSPENDED:
+this.checkpoint = checkpointableOffsets();
 
 break;
 
-case SUSPENDED:
+case CREATED:
 case CLOSED:
-// not need to checkpoint, since when suspending we've already 
committed the state
-break;
+throw new IllegalStateException("Illegal state " + state() + " 
while scheduling checkpoint for active task " + id);
 
 default:
-throw new IllegalStateException("Unknown state " + state() + " 
while prepare closing active task " + id);
+throw new IllegalStateException("Unknown state " + state() + " 
while scheduling checkpoint for active task " + id);
 }
 }
 
-private void scheduleCheckpoint(final Map 
checkpoint) {
-this.checkpoint = checkpoint;
-}
-
 private void writeCheckpointIfNeed() {
+if (commitNeeded) {

Review comment:
   @mjsax  By `should only be written if no commit is needed` do you mean 
`...if a commit was just completed`? 
   Doesn't this break `closeAndRecycleState` and `closeDirty` (since we tend to 
call `closeDirty` after the commit failed and would not have set `commitNeeded 
= false`)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-8555) Flaky test ExampleConnectIntegrationTest#testSourceConnector

2020-06-11 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-8555.
---
Resolution: Fixed

This test class has been running dependably for some time. Closing as fixed. 

> Flaky test ExampleConnectIntegrationTest#testSourceConnector
> 
>
> Key: KAFKA-8555
> URL: https://issues.apache.org/jira/browse/KAFKA-8555
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Boyang Chen
>Priority: Major
> Attachments: log-job139.txt, log-job141.txt, log-job23145.txt, 
> log-job23215.txt, log-job6046.txt
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/22798/console]
> *02:03:21* 
> org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector.test.stdout*02:03:21*
>  *02:03:21* 
> org.apache.kafka.connect.integration.ExampleConnectIntegrationTest > 
> testSourceConnector FAILED*02:03:21* 
> org.apache.kafka.connect.errors.DataException: Insufficient records committed 
> by connector simple-conn in 15000 millis. Records expected=2000, 
> actual=1013*02:03:21* at 
> org.apache.kafka.connect.integration.ConnectorHandle.awaitCommits(ConnectorHandle.java:188)*02:03:21*
>  at 
> org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector(ExampleConnectIntegrationTest.java:181)*02:03:21*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9851) Revoking Connect tasks due to connectivity issues should also clear running assignment

2020-06-11 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9851:
-
Component/s: KafkaConnect

> Revoking Connect tasks due to connectivity issues should also clear running 
> assignment
> --
>
> Key: KAFKA-9851
> URL: https://issues.apache.org/jira/browse/KAFKA-9851
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Major
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> https://issues.apache.org/jira/browse/KAFKA-9184 fixed an issue with workers 
> continuing to run tasks even after they'd lose connectivity with the broker 
> coordinator and they'd detect that they are out of the group. 
>  
> However, because the revocation of tasks in this case is voluntary and does 
> not come with an explicit assignment (containing revoked tasks) from the 
> leader worker, the worker that quits running its tasks due to connectivity 
> issues needs to also clear its running task assignment snapshot. 
> This will allow for proper restart of the stopped tasks after the worker 
> rejoins the group when connectivity returns and get assigned the same 
> connectors or tasks. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bbejeck commented on pull request #6229: KAFKA-6786: Removing additional configs for StreamsBrokerDownResilienceTest

2020-06-11 Thread GitBox


bbejeck commented on pull request #6229:
URL: https://github.com/apache/kafka/pull/6229#issuecomment-642777124


   Hi @sh-abhi,
   
   > I'm having some difficulty doing a rebase successfully after trying some 
different approaches based on some references I found online.
   
   This PR looks good to me; can you say what your difficulties are?
   
   At any rate, I'll run this system test to verify the changes soon.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #6229: KAFKA-6786: Removing additional configs for StreamsBrokerDownResilienceTest

2020-06-11 Thread GitBox


bbejeck commented on pull request #6229:
URL: https://github.com/apache/kafka/pull/6229#issuecomment-642774237


   Ok to test.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)

2020-06-11 Thread GitBox


junrao commented on pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#issuecomment-642751664


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)

2020-06-11 Thread GitBox


junrao commented on pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#issuecomment-642750609


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10146) Backport KAFKA-9066 to 2.5 and 2.4 branches

2020-06-11 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-10146:
--
Labels: backport  (was: )

> Backport KAFKA-9066 to 2.5 and 2.4 branches
> ---
>
> Key: KAFKA-10146
> URL: https://issues.apache.org/jira/browse/KAFKA-10146
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Major
>  Labels: backport
> Fix For: 2.4.2, 2.5.2
>
>
> KAFKA-9066 was merged on the same day we were trying to release 2.5.1, so 
> this was not backported at the time. However, once 2.5.1 is out the door, the 
> `775f0d484` commit on `trunk` should be backported to the `2.5` and `2.4` 
> branches.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10086) Standby state isn't always re-used when transitioning to active

2020-06-11 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10086:
-
Affects Version/s: (was: 2.7.0)

> Standby state isn't always re-used when transitioning to active
> ---
>
> Key: KAFKA-10086
> URL: https://issues.apache.org/jira/browse/KAFKA-10086
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.6.0
>
>
> This ticket was initially just to write an integration test, but I escalated 
> it to a blocker and changed the title when the integration test actually 
> surfaced two bugs:
>  # Offset positions were not reported for in-memory stores, so tasks with 
> in-memory stores would never be considered as "caught up" and could not take 
> over active processing, preventing clusters from ever achieving balance. This 
> is a regression in 2.6
>  # When the TaskAssignor decided to switch active processing from a former 
> owner to a new one that had a standby, the lower-level cooperative rebalance 
> protocol would first de-schedule the task completely, and only later would 
> assign it to the new owner. For in-memory stores, this causes the standby 
> state not to be re-used, and for persistent stores, it creates a window in 
> which the cleanup thread might delete the state directory. In both cases, 
> even though the instance previously had a standby, once it gets the active, 
> it still had to restore the entire state from the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10146) Backport KAFKA-9066 to 2.5 and 2.4 branches

2020-06-11 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-10146:
--
Issue Type: Task  (was: Bug)

> Backport KAFKA-9066 to 2.5 and 2.4 branches
> ---
>
> Key: KAFKA-10146
> URL: https://issues.apache.org/jira/browse/KAFKA-10146
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Major
> Fix For: 2.4.2, 2.5.2
>
>
> KAFKA-9066 was merged on the same day we were trying to release 2.5.1, so 
> this was not backported at the time. However, once 2.5.1 is out the door, the 
> `775f0d484` commit on `trunk` should be backported to the `2.5` and `2.4` 
> branches.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10086) Standby state isn't always re-used when transitioning to active

2020-06-11 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10086:
-
Fix Version/s: (was: 2.7.0)

> Standby state isn't always re-used when transitioning to active
> ---
>
> Key: KAFKA-10086
> URL: https://issues.apache.org/jira/browse/KAFKA-10086
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 2.6.0, 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.6.0
>
>
> This ticket was initially just to write an integration test, but I escalated 
> it to a blocker and changed the title when the integration test actually 
> surfaced two bugs:
>  # Offset positions were not reported for in-memory stores, so tasks with 
> in-memory stores would never be considered as "caught up" and could not take 
> over active processing, preventing clusters from ever achieving balance. This 
> is a regression in 2.6
>  # When the TaskAssignor decided to switch active processing from a former 
> owner to a new one that had a standby, the lower-level cooperative rebalance 
> protocol would first de-schedule the task completely, and only later would 
> assign it to the new owner. For in-memory stores, this causes the standby 
> state not to be re-used, and for persistent stores, it creates a window in 
> which the cleanup thread might delete the state directory. In both cases, 
> even though the instance previously had a standby, once it gets the active, 
> it still had to restore the entire state from the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10146) Backport KAFKA-9066 to 2.5 and 2.4 branches

2020-06-11 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17133340#comment-17133340
 ] 

Randall Hauch commented on KAFKA-10146:
---

Added PR: https://github.com/apache/kafka/pull/8854

> Backport KAFKA-9066 to 2.5 and 2.4 branches
> ---
>
> Key: KAFKA-10146
> URL: https://issues.apache.org/jira/browse/KAFKA-10146
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Major
> Fix For: 2.4.2, 2.5.2
>
>
> KAFKA-9066 was merged on the same day we were trying to release 2.5.1, so 
> this was not backported at the time. However, once 2.5.1 is out the door, the 
> `775f0d484` commit on `trunk` should be backported to the `2.5` and `2.4` 
> branches.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10146) Backport KAFKA-9066 to 2.5 and 2.4 branches

2020-06-11 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17133343#comment-17133343
 ] 

Randall Hauch commented on KAFKA-10146:
---

Be sure to update KAFKA-9066 fix versions when this is merged.

> Backport KAFKA-9066 to 2.5 and 2.4 branches
> ---
>
> Key: KAFKA-10146
> URL: https://issues.apache.org/jira/browse/KAFKA-10146
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Major
> Fix For: 2.4.2, 2.5.2
>
>
> KAFKA-9066 was merged on the same day we were trying to release 2.5.1, so 
> this was not backported at the time. However, once 2.5.1 is out the door, the 
> `775f0d484` commit on `trunk` should be backported to the `2.5` and `2.4` 
> branches.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

2020-06-11 Thread GitBox


mumrah commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r438870452



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##
@@ -978,7 +1000,7 @@ public boolean hasValidPosition() {
 
 @Override
 public boolean hasPosition() {

Review comment:
   I thought about this too, but if I recall, the whole point of adding the 
state enum was to not rely on the instance variables to deduce the state. I 
think another solution here would be to have some kind of empty position monad 
to avoid the null in the first place. 
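
   A rough sketch of that idea (hypothetical, assuming the position stays a field alongside the `fetchState` enum in `SubscriptionState`; not the actual code): wrap it in `Optional` so callers never observe a raw null.

   ```java
   // Hypothetical sketch of the "empty position" idea; names are illustrative only.
   private Optional<FetchPosition> position = Optional.empty();

   public boolean hasPosition() {
       return position.isPresent();
   }

   public FetchPosition position() {
       // absence is representable without null, so no separate null guard is needed
       return position.orElseThrow(() ->
           new IllegalStateException("No position while in state " + fetchState));
   }
   ```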





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch opened a new pull request #8854: KAFKA-10146, KAFKA-9066: Retain metrics for failed tasks (#8502)

2020-06-11 Thread GitBox


rhauch opened a new pull request #8854:
URL: https://github.com/apache/kafka/pull/8854


   Targets the `2.5` branch, and should be backported to the `2.4` branch.
   
   This backports the KAFKA-9066 / #8502 changes to retain metrics for failed 
tasks that were already merged to `trunk` and backported to the `2.6` branch. 
   
   This PR has one change relative to the original PR: it removes an 
integration test added in the `2.6` branch for KIP-158 and modified as part of 
KAFKA-9066 / #8502.
   
   Ping @C0urante (original author)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

2020-06-11 Thread GitBox


mumrah commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r438870452



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##
@@ -978,7 +1000,7 @@ public boolean hasValidPosition() {
 
 @Override
 public boolean hasPosition() {

Review comment:
   I thought about this too, but if I recall, the whole point of adding the 
state enum was to not rely on the instance variables to deduce the state. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9374) Worker can be disabled by blocked connectors

2020-06-11 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-9374.
--
  Reviewer: Konstantine Karantasis
Resolution: Fixed

Merged to `trunk` and backported to the `2.6` branch for inclusion in 2.6.0.

> Worker can be disabled by blocked connectors
> 
>
> Key: KAFKA-9374
> URL: https://issues.apache.org/jira/browse/KAFKA-9374
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.6.0
>
>
> If a connector hangs during any of its {{initialize}}, {{start}}, {{stop}}, 
> {{taskConfigs}}, {{taskClass}}, {{version}}, {{config}}, or {{validate}} 
> methods, the worker will be disabled for some types of requests thereafter, 
> including connector creation, connector reconfiguration, and connector 
> deletion.
>  -This only occurs in distributed mode and is due to the threading model used 
> by the 
> [DistributedHerder|https://github.com/apache/kafka/blob/03f763df8a8d9482d8c099806336f00cf2521465/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java]
>  class.- This affects both distributed and standalone mode. Distributed 
> herders perform some connector work synchronously in their {{tick}} thread, 
> which also handles group membership and some REST requests. The majority of 
> the herder methods for the standalone herder are {{synchronized}}, including 
> those for creating, updating, and deleting connectors; as long as one of 
> those methods blocks, all subsequent calls to any of these methods will also 
> be blocked.
>  
> One potential solution could be to treat connectors that fail to start, stop, 
> etc. in time similarly to tasks that fail to stop within the [task graceful 
> shutdown timeout 
> period|https://github.com/apache/kafka/blob/03f763df8a8d9482d8c099806336f00cf2521465/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L121-L126]
>  by handling all connector interactions on a separate thread, waiting for 
> them to complete within a timeout, and abandoning the thread (and 
> transitioning the connector to the {{FAILED}} state, if it has been created 
> at all) if that timeout expires.
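
A minimal sketch of the potential approach described above (hypothetical, not the actual Worker or Herder code): run the connector lifecycle call on its own thread and give up after a timeout instead of blocking the herder.

{code:java}
import java.util.Map;
import java.util.concurrent.*;
import org.apache.kafka.connect.connector.Connector;

// Hypothetical sketch of the timeout-based approach; not the real Connect runtime code.
ExecutorService connectorExecutor = Executors.newCachedThreadPool();

void startConnectorWithTimeout(Connector connector, Map<String, String> config, long timeoutMs) {
    Future<?> start = connectorExecutor.submit(() -> connector.start(config));
    try {
        start.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        // abandon the blocked thread and transition the connector to FAILED
        start.cancel(true);
    } catch (InterruptedException | ExecutionException e) {
        // surface as a connector failure as well
    }
}
{code}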



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9374) Worker can be disabled by blocked connectors

2020-06-11 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9374:
-
Fix Version/s: 2.6.0

> Worker can be disabled by blocked connectors
> 
>
> Key: KAFKA-9374
> URL: https://issues.apache.org/jira/browse/KAFKA-9374
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.6.0
>
>
> If a connector hangs during any of its {{initialize}}, {{start}}, {{stop}}, 
> {{taskConfigs}}, {{taskClass}}, {{version}}, {{config}}, or {{validate}} 
> methods, the worker will be disabled for some types of requests thereafter, 
> including connector creation, connector reconfiguration, and connector 
> deletion.
>  -This only occurs in distributed mode and is due to the threading model used 
> by the 
> [DistributedHerder|https://github.com/apache/kafka/blob/03f763df8a8d9482d8c099806336f00cf2521465/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java]
>  class.- This affects both distributed and standalone mode. Distributed 
> herders perform some connector work synchronously in their {{tick}} thread, 
> which also handles group membership and some REST requests. The majority of 
> the herder methods for the standalone herder are {{synchronized}}, including 
> those for creating, updating, and deleting connectors; as long as one of 
> those methods blocks, all subsequent calls to any of these methods will also 
> be blocked.
>  
> One potential solution could be to treat connectors that fail to start, stop, 
> etc. in time similarly to tasks that fail to stop within the [task graceful 
> shutdown timeout 
> period|https://github.com/apache/kafka/blob/03f763df8a8d9482d8c099806336f00cf2521465/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L121-L126]
>  by handling all connector interactions on a separate thread, waiting for 
> them to complete within a timeout, and abandoning the thread (and 
> transitioning the connector to the {{FAILED}} state, if it has been created 
> at all) if that timeout expires.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

2020-06-11 Thread GitBox


mumrah commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r438867535



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -675,36 +676,41 @@ private ListOffsetResult 
fetchOffsetsByTimes(Map timestamp
 completedFetch.partition);
 } else {
 FetchPosition position = 
subscriptions.position(completedFetch.partition);
-if (completedFetch.nextFetchOffset == position.offset) {
-List> partRecords = 
completedFetch.fetchRecords(maxRecords);
-
-log.trace("Returning {} fetched records at offset {} for 
assigned partition {}",
-partRecords.size(), position, 
completedFetch.partition);
-
-if (completedFetch.nextFetchOffset > position.offset) {
-FetchPosition nextPosition = new FetchPosition(
-completedFetch.nextFetchOffset,
-completedFetch.lastEpoch,
-position.currentLeader);
-log.trace("Update fetching position to {} for partition 
{}", nextPosition, completedFetch.partition);
-subscriptions.position(completedFetch.partition, 
nextPosition);
-}
+if (position != null) {

Review comment:
   I think we need the check in transition since we set the position in the 
runnable sometimes. E.g., 
   
   ```java
   private void transitionState(FetchState newState, Runnable 
runIfTransitioned) {
   FetchState nextState = this.fetchState.transitionTo(newState);
   if (nextState.equals(newState)) {
   this.fetchState = nextState;
   runIfTransitioned.run();
   if (this.position == null && nextState.hasPosition()) {
   throw new IllegalStateException("Transitioned 
subscription state to " + nextState + ", but position is null");
   }
   }
   }
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9969) ConnectorClientConfigRequest is loaded in isolation and throws LinkageError

2020-06-11 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-9969.
--
  Reviewer: Konstantine Karantasis
Resolution: Fixed

[~kkonstantine] merged to `trunk` and backported to:
* `2.6` for inclusion in upcoming 2.6.0
* `2.5` for inclusion in upcoming 2.5.1
* `2.4` for inclusion in a future 2.4.2
* `2.3` for inclusion in a future 2.3.2

> ConnectorClientConfigRequest is loaded in isolation and throws LinkageError
> ---
>
> Key: KAFKA-9969
> URL: https://issues.apache.org/jira/browse/KAFKA-9969
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> ConnectorClientConfigRequest (added by 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy])
>  is a class in connect-api, and should always be loaded by the system 
> classloader. If a plugin packages the connect-api jar, the REST API may fail 
> with the following stacktrace:
> {noformat}
> java.lang.LinkageError: loader constraint violation: loader (instance of 
> sun/misc/Launcher$AppClassLoader) previously initiated loading for a 
> different type with name 
> "org/apache/kafka/connect/connector/policy/ConnectorClientConfigRequest" at 
> java.lang.ClassLoader.defineClass1(Native Method) at 
> java.lang.ClassLoader.defineClass(ClassLoader.java:763) at 
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at 
> java.net.URLClassLoader.defineClass(URLClassLoader.java:468) at 
> java.net.URLClassLoader.access$100(URLClassLoader.java:74) at 
> java.net.URLClassLoader$1.run(URLClassLoader.java:369) at 
> java.net.URLClassLoader$1.run(URLClassLoader.java:363) at 
> java.security.AccessController.doPrivileged(Native Method) at 
> java.net.URLClassLoader.findClass(URLClassLoader.java:362) at 
> java.lang.ClassLoader.loadClass(ClassLoader.java:424) at 
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) at 
> java.lang.ClassLoader.loadClass(ClassLoader.java:357) at 
> org.apache.kafka.connect.runtime.AbstractHerder.validateClientOverrides(AbstractHerder.java:416)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:342)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:745)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:742)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:342)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:282)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ... 1 more
> {noformat}
> It appears that the other class in org.apache.kafka.connect.connector.policy, 
> ConnectorClientConfigOverridePolicy had a similar issue in KAFKA-8415, and 
> received a fix.
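
For context, a minimal sketch (hypothetical, not the actual PluginClassLoader) of the kind of parent-first delegation that avoids this: framework classes such as ConnectorClientConfigRequest are always loaded from the parent, so only one copy of the class can exist.

{code:java}
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical sketch of parent-first delegation for connect-api classes; not the real plugin isolation code.
class PluginClassLoaderSketch extends URLClassLoader {
    PluginClassLoaderSketch(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        if (name.startsWith("org.apache.kafka.connect.")) {
            // always delegate framework/API classes to the parent to avoid LinkageError
            return getParent().loadClass(name);
        }
        return super.loadClass(name, resolve);
    }
}
{code}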



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9969) ConnectorClientConfigRequest is loaded in isolation and throws LinkageError

2020-06-11 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9969:
-
Fix Version/s: 2.5.1
   2.4.2
   2.6.0
   2.3.2

> ConnectorClientConfigRequest is loaded in isolation and throws LinkageError
> ---
>
> Key: KAFKA-9969
> URL: https://issues.apache.org/jira/browse/KAFKA-9969
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> ConnectorClientConfigRequest (added by 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy])
>  is a class in connect-api, and should always be loaded by the system 
> classloader. If a plugin packages the connect-api jar, the REST API may fail 
> with the following stacktrace:
> {noformat}
> java.lang.LinkageError: loader constraint violation: loader (instance of 
> sun/misc/Launcher$AppClassLoader) previously initiated loading for a 
> different type with name 
> "org/apache/kafka/connect/connector/policy/ConnectorClientConfigRequest" at 
> java.lang.ClassLoader.defineClass1(Native Method) at 
> java.lang.ClassLoader.defineClass(ClassLoader.java:763) at 
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at 
> java.net.URLClassLoader.defineClass(URLClassLoader.java:468) at 
> java.net.URLClassLoader.access$100(URLClassLoader.java:74) at 
> java.net.URLClassLoader$1.run(URLClassLoader.java:369) at 
> java.net.URLClassLoader$1.run(URLClassLoader.java:363) at 
> java.security.AccessController.doPrivileged(Native Method) at 
> java.net.URLClassLoader.findClass(URLClassLoader.java:362) at 
> java.lang.ClassLoader.loadClass(ClassLoader.java:424) at 
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) at 
> java.lang.ClassLoader.loadClass(ClassLoader.java:357) at 
> org.apache.kafka.connect.runtime.AbstractHerder.validateClientOverrides(AbstractHerder.java:416)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:342)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:745)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:742)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:342)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:282)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ... 1 more
> {noformat}
> It appears that the other class in org.apache.kafka.connect.connector.policy, 
> ConnectorClientConfigOverridePolicy had a similar issue in KAFKA-8415, and 
> received a fix.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10148) Kafka Streams Restores too few Records with eos-beta Enabled

2020-06-11 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-10148:
--
Description: 
System test {{StreamsEosTest.test_failure_and_recovery}} for eos-beta exposes a 
bug that produces wrong results in the output topic. The cause seems to be an 
end offset that is too low during restoration of a state store.

Example:

The system test computes a minimum aggregate over records in an input topic and 
writes the results to an output topic. The input topic partition {{data-1}} 
contains the following records among others:

{code}
...
offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 
headerKeys: [] key: 14920 payload: 9215
...
offset: 1611 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1611 
headerKeys: [] key: 14920 payload: 1595
...
offset: 2104 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 2104 
headerKeys: [] key: 14920 payload: 9274
...
{code}

The output topic partition {{min-1}} contains:

{code}
...
offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 
headerKeys: [] key: 14920 payload: 9215
...
offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 
headerKeys: [] key: 14920 payload: 1595
...
offset: 2324 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 10 
headerKeys: [] key: 14920 payload: 9215
...
{code} 

The last record is obviously wrong because 1595 is less than 9215.

To test the resilience to an unexpected failure of a Streams client, the system 
test aborts a Streams client, i.e., the client is closed in a dirty manner. 
This dirty close causes the Streams client to restore its local state store 
that maintains the minimum aggregate from the beginning of the changelog topic 
partitions {{EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1}}. 
The partition {{EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1}} 
contains:

{code}
...
offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 
headerKeys: [] key: 14920 payload: 9215
...
offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 
headerKeys: [] key: 14920 payload: 1595
...
offset: 2324 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 10 
headerKeys: [] key: 14920 payload: 9215
...
{code}

Also here the last record is wrong. 

During the restoration, the Streams client uses its Kafka consumer to issue a 
list offsets request to get the end offset of the changelog topic partition. 
The response to the list offsets request contains end offset 1518 for 
{{EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1}} as can be seen 
here:

{code}
[2020-06-09 08:11:49,250] DEBUG [Consumer 
clientId=EosTest-12216046-2d5d-48cd-864c-21b0aa570fae-StreamThread-1-restore-consumer,
 groupId=null] Received LIST_OFFSETS response from node 2 for request with 
header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=5, 
clientId=EosTest-12216046-2d5d-48cd-864c-21b0aa570fae-StreamThread-1-restore-consumer,
 correlationId=3): (type=ListOffsetResponse, throttleTimeMs=0, 
responseData={EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-4=PartitionData(errorCode:
 0, timestamp: -1, offset: 1478, leaderEpoch: Optional[0]), 
EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1=PartitionData(errorCode:
 0, timestamp: -1, offset: 1518, leaderEpoch: Optional[0])}) 
(org.apache.kafka.clients.NetworkClient)
{code}

Offset 1518 is before the following record in 
{{EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1}}

{code}
offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 
headerKeys: [] key: 14920 payload: 1595
{code}

Hence, this record is not restored into the local state store. However, after 
the restoration the input topic partition {{data-1}} is read starting with 
offset 2094. That means that record

{code}
offset: 1611 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1611 
headerKeys: [] key: 14920 payload: 1595
{code} 

is not read there either because it has a lower offset. Instead the following 
record with key 14920 and value 9274 is read, but since 9274 is not less 
than 9215, value 9215 is written a second time to the output topic.

I ran the system tests 10x with eos_alpha and 10x with eos_beta and only 
eos_beta failed a couple of times.
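
As a rough illustration of the restore path described above (hypothetical consumer usage; the restore consumer and the store write are stand-ins, not the actual Streams restore code), restoration only reads the changelog up to the reported end offset, so with a too-low end offset such as 1518 the record at offset 1828 never reaches the store:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch; restoreConsumer and putIntoStore stand in for the real restore machinery.
void restoreUpToEndOffset(KafkaConsumer<byte[], byte[]> restoreConsumer,
                          BiConsumer<byte[], byte[]> putIntoStore) {
    TopicPartition changelog = new TopicPartition(
        "EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog", 1);
    long endOffset = restoreConsumer.endOffsets(Collections.singleton(changelog)).get(changelog); // 1518 here

    restoreConsumer.assign(Collections.singleton(changelog));
    restoreConsumer.seekToBeginning(Collections.singleton(changelog));
    while (restoreConsumer.position(changelog) < endOffset) {
        for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(Duration.ofMillis(100))) {
            // a real implementation would also drop any records at or beyond endOffset in the last batch;
            // either way, the record at offset 1828 is never restored when endOffset is 1518
            putIntoStore.accept(record.key(), record.value());
        }
    }
}
{code}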

  was:
System test {{StreamsEosTest.test_failure_and_recovery}} for eos-beta exposes a 
bug that results in wrong results in the output topic. The cause seems to be a 
too low end offset during restoration of a state store.

Example:

The system test computes a minimum aggregate over records in an input topic and 
writes the results to an output topic. The input topic partition {{data-1}} 
contains the following records among others:

{code}
...
offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 
headerKeys: [] key: 14920 payload: 9

[jira] [Updated] (KAFKA-10148) Kafka Streams Restores too few Records with eos-beta Enabled

2020-06-11 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-10148:
--
Description: 
System test {{StreamsEosTest.test_failure_and_recovery}} for eos-beta exposes a 
bug that produces wrong results in the output topic. The cause seems to be an 
end offset that is too low during restoration of a state store.

Example:

The system test computes a minimum aggregate over records in an input topic and 
writes the results to an output topic. The input topic partition {{data-1}} 
contains the following records among others:

{code}
...
offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 
headerKeys: [] key: 14920 payload: 9215
...
offset: 1611 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1611 
headerKeys: [] key: 14920 payload: 1595
...
offset: 2104 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 2104 
headerKeys: [] key: 14920 payload: 9274
...
{code}

The output topic partition {{min-1}} contains:

{code}
...
offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 
headerKeys: [] key: 14920 payload: 9215
...
offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 
headerKeys: [] key: 14920 payload: 1595
...
offset: 2324 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 10 
headerKeys: [] key: 14920 payload: 9215
...
{code} 

The last record is obviously wrong because 1595 is less than 9215.

To test the resilience to an unexpected failure of a Streams client, the system 
test aborts a Streams client, i.e., the client is closed in a dirty manner. 
This dirty close causes the Streams client to restore its local state store 
that maintains the minimum aggregate from the beginning of the changelog topic 
partitions {{EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1}}. 
The partition {{EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1}} 
contains:

{code}
...
offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 
headerKeys: [] key: 14920 payload: 9215
...
offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 
headerKeys: [] key: 14920 payload: 1595
...
offset: 2324 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 10 
headerKeys: [] key: 14920 payload: 9215
...
{code}

Also here the last record is wrong. 

During the restoration, the Streams client uses its Kafka consumer to issue a 
list offsets request to get the end offset of the changelog topic partition. 
The response to the list offsets request contains end offset 1518 for 
{{EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1}} as can be seen 
here:

{code}
[2020-06-09 08:11:49,250] DEBUG [Consumer 
clientId=EosTest-12216046-2d5d-48cd-864c-21b0aa570fae-StreamThread-1-restore-consumer,
 groupId=null] Received LIST_OFFSETS response from node 2 for request with 
header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=5, 
clientId=EosTest-12216046-2d5d-48cd-864c-21b0aa570fae-StreamThread-1-restore-consumer,
 correlationId=3): (type=ListOffsetResponse, throttleTimeMs=0, 
responseData={EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-4=PartitionData(errorCode:
 0, timestamp: -1, offset: 1478, leaderEpoch: Optional[0]), 
EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1=PartitionData(errorCode:
 0, timestamp: -1, offset: 1518, leaderEpoch: Optional[0])}) 
(org.apache.kafka.clients.NetworkClient)
{code}

Offset 1518 is before the following record in 
{{EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1}}

{code}
offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 
headerKeys: [] key: 14920 payload: 1595
{code}

Hence, this record is not restored into the local state store. However, after 
the restoration the input topic partition {{data-1}} is read starting with 
offset 2094. That means that record

{code}
offset: 1611 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1611 
headerKeys: [] key: 14920 payload: 1595
{code} 

is not read there either because it has a lower offset. Instead the following 
record with key 14920 and value 9274 is read, but since 9274 is not less 
than 9215, value 9215 is written a second time to the output topic.

I ran the system tests 10x with eos_alpha and 10x with eos_beta and only 
eos_beta failed a couple of times.

  was:
System test {{StreamsEosTest.test_failure_and_recovery}} for eos-beta exposes a 
bug that results in wrong results in the output topic. The cause seems to be a 
too low end offset during restoration of a state store.

Example:

The system test computes a minimum aggregate over records in an input topic and 
writes the results to an output topic. The input topic partition {{data-1}} 
contains the following records among others:

{code}
...
offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 
headerKeys: [] key: 14920 payload: 9

[GitHub] [kafka] bellemare commented on pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes

2020-06-11 Thread GitBox


bellemare commented on pull request #8764:
URL: https://github.com/apache/kafka/pull/8764#issuecomment-642720296


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



