[GitHub] [kafka] vvcephei merged pull request #8667: KAFKA-9994: Handle task migrated inside corruption path

2020-05-14 Thread GitBox


vvcephei merged pull request #8667:
URL: https://github.com/apache/kafka/pull/8667


   







[jira] [Updated] (KAFKA-10000) Atomic commit of source connector records and offsets

2020-05-14 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-10000:
--
Labels: needs-kip  (was: )

> Atomic commit of source connector records and offsets
> -
>
> Key: KAFKA-10000
> URL: https://issues.apache.org/jira/browse/KAFKA-10000
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Major
>  Labels: needs-kip
>
> It'd be nice to be able to configure source connectors such that their 
> offsets are committed if and only if all records up to that point have been 
> ack'd by the producer. This would go a long way towards EOS for source 
> connectors.
>  
> This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is 
> marked as {{WONTFIX}} since it only concerns enabling the idempotent producer 
> for source connectors and is not concerned with source connector offsets.
> This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, 
> which had a lot of discussion around allowing connector-defined transaction 
> boundaries. The suggestion in this ticket is to only use source connector 
> offset commits as the transaction boundaries for connectors; allowing 
> connector-specified transaction boundaries can be addressed separately.
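As a rough illustration of the idea, the records and the offsets could be written inside a single producer transaction so that both commit or neither does. This is only a hedged sketch of the concept using the standard transactional producer API; the topic names, keys, and values are placeholders, not Connect's actual offset format:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AtomicSourceCommitSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "source-connector-1"); // required for transactions
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            try {
                // 1. The source records themselves.
                producer.send(new ProducerRecord<>("data-topic", "record-key", "record-value"));
                // 2. The source offset, written in the same transaction so it is
                //    committed if and only if the records are.
                producer.send(new ProducerRecord<>("connect-offsets", "source-partition", "source-offset"));
                producer.commitTransaction();
            } catch (final Exception e) {
                producer.abortTransaction();
                throw e;
            }
        }
    }
}
{code}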





[jira] [Updated] (KAFKA-10000) Atomic commit of source connector records and offsets

2020-05-14 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-10000:
--
Description: 
It'd be nice to be able to configure source connectors such that their offsets 
are committed if and only if all records up to that point have been ack'd by 
the producer. This would go a long way towards EOS for source connectors.

 

This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is 
marked as {{WONTFIX}} since it only concerns enabling the idempotent producer 
for source connectors and is not concerned with source connector offsets.

This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, which 
had a lot of discussion around allowing connector-defined transaction 
boundaries. The suggestion in this ticket is to only use source connector 
offset commits as the transaction boundaries for connectors; allowing 
connector-specified transaction boundaries can be addressed separately.

  was:It'd be nice to be able to configure source connectors such that their 
offsets are committed if and only if all records up to that point have been 
ack'd by the producer. This would go a long way towards EOS for source 
connectors.


> Atomic commit of source connector records and offsets
> -
>
> Key: KAFKA-10000
> URL: https://issues.apache.org/jira/browse/KAFKA-10000
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Major
>
> It'd be nice to be able to configure source connectors such that their 
> offsets are committed if and only if all records up to that point have been 
> ack'd by the producer. This would go a long way towards EOS for source 
> connectors.
>  
> This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is 
> marked as {{WONTFIX}} since it only concerns enabling the idempotent producer 
> for source connectors and is not concerned with source connector offsets.
> This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, 
> which had a lot of discussion around allowing connector-defined transaction 
> boundaries. The suggestion in this ticket is to only use source connector 
> offset commits as the transaction boundaries for connectors; allowing 
> connector-specified transaction boundaries can be addressed separately.





[jira] [Created] (KAFKA-10000) Atomic commit of source connector records and offsets

2020-05-14 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-10000:
-

 Summary: Atomic commit of source connector records and offsets
 Key: KAFKA-10000
 URL: https://issues.apache.org/jira/browse/KAFKA-10000
 Project: Kafka
  Issue Type: New Feature
  Components: KafkaConnect
Reporter: Chris Egerton


It'd be nice to be able to configure source connectors such that their offsets 
are committed if and only if all records up to that point have been ack'd by 
the producer. This would go a long way towards EOS for source connectors.





[GitHub] [kafka] ableegoldman commented on a change in pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-14 Thread GitBox


ableegoldman commented on a change in pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#discussion_r425467074



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##
@@ -198,433 +196,51 @@ public StateStore getStateStore(final String name) {
 
     @Override
     public void commit() {
-        task.requestCommit();
+        throwUnsupportedOperationExceptionIfStandby("commit");
+        applyStreamTaskOperation(StreamTask::requestCommit);
     }
 
     @Override
     @Deprecated
     public Cancellable schedule(final long intervalMs,
                                 final PunctuationType type,
                                 final Punctuator callback) {
+        throwUnsupportedOperationExceptionIfStandby("schedule");
         if (intervalMs < 1) {
             throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond.");
         }
-        return task.schedule(intervalMs, type, callback);
+        return returnStreamTaskOperation(t -> t.schedule(intervalMs, type, callback));
     }
 
     @SuppressWarnings("deprecation") // removing #schedule(final long intervalMs,...) will fix this
     @Override
     public Cancellable schedule(final Duration interval,
                                 final PunctuationType type,
                                 final Punctuator callback) throws IllegalArgumentException {
+        throwUnsupportedOperationExceptionIfStandby("schedule");
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval, "interval");
         return schedule(ApiUtils.validateMillisecondDuration(interval, msgPrefix), type, callback);
     }
 
-    private abstract static class StateStoreReadOnlyDecorator

Review comment:
   I felt these were just cluttering up this class so I moved them to a new 
file
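
   For context, the helpers this diff calls are defined elsewhere in the PR and are not shown in this hunk. Based purely on the call sites above, they presumably look something like this hedged sketch (signatures inferred, not copied from the PR; `task` and `TaskType` are assumed from context):
```java
private void throwUnsupportedOperationExceptionIfStandby(final String operationName) {
    if (taskType() == TaskType.STANDBY) {
        throw new UnsupportedOperationException(operationName + "() is not supported by standby tasks");
    }
}

private void applyStreamTaskOperation(final java.util.function.Consumer<StreamTask> operation) {
    operation.accept((StreamTask) task); // safe: the standby check above already passed
}

private <T> T returnStreamTaskOperation(final java.util.function.Function<StreamTask, T> operation) {
    return operation.apply((StreamTask) task);
}
```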









[GitHub] [kafka] ableegoldman commented on pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-14 Thread GitBox


ableegoldman commented on pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#issuecomment-628915649


   Call for review @cadonna @guozhangwang @vvcephei 







[GitHub] [kafka] ableegoldman opened a new pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-14 Thread GitBox


ableegoldman opened a new pull request #8669:
URL: https://github.com/apache/kafka/pull/8669


   This is a prerequisite for 
[KAFKA-9501](https://github.com/apache/kafka/pull/8248/) and will also be 
useful for [KAFKA-9603](https://github.com/apache/kafka/pull/8661)
   
   There should be no logical changes here: the main difference is the removal 
of `StandbyContextImpl` in preparation for contexts to transition between 
active and standby. 
   
   Also includes some minor cleanup, e.g. pulling the ReadOnly/ReadWrite decorators out into a separate file.







[jira] [Commented] (KAFKA-9993) Think about inheritance in the protocol generation framework

2020-05-14 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107726#comment-17107726
 ] 

Boyang Chen commented on KAFKA-9993:


Thanks [~vvcephei]. An example is from the recent implementation for Raft:

[https://github.com/confluentinc/kafka/pull/319/files#r424848803] a couple of common fields, such as error code, leader id, and later on cluster.id, are attached to all the generated RPCs (vote, fetch, begin/end quorum, etc.), but you have to extract them one by one manually because there is no common ancestor. It would be good to consolidate them.

The group coordinator and txn coordinator code could likewise be simplified, as those RPCs also share a lot of common fields.
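
To make the consolidation concrete, here is a hedged sketch of the kind of shared ancestor being discussed; the interface and method names are illustrative, not part of the real generated code:
{code:java}
// Hypothetical common interface that generated response classes
// (VoteResponseData, FetchResponseData, ...) could implement.
public interface CommonResponseData {
    short errorCode();
}

// A service receiving many RPC types could then extract the error code once,
// instead of duplicating the logic per generated class:
public final class ErrorCheck {
    public static void checkError(final CommonResponseData response) {
        if (response.errorCode() != 0) {
            throw new IllegalStateException("RPC failed with error code " + response.errorCode());
        }
    }
}
{code}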

> Think about inheritance in the protocol generation framework
> 
>
> Key: KAFKA-9993
> URL: https://issues.apache.org/jira/browse/KAFKA-9993
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Priority: Major
>
> We have seen that there are a lot of common fields inside the 
> request/response templates that could be extracted into a super class for the 
> auto-generated classes. For example, most responses contain a top-level error 
> code. Currently, to build a service receiving multiple RPCs, the code template 
> produces a lot of redundant error-code extraction logic, which is far from 
> ideal. 
> What we want to discuss is whether to enable a general inheritance 
> mechanism in this framework, what the trade-off and complexity increase would 
> be, and whether there is any workaround that would simply reduce the boilerplate.





[GitHub] [kafka] OneCricketeer commented on pull request #8489: KAFKA-9857:Failed to build image ducker-ak-openjdk-8 on arm

2020-05-14 Thread GitBox


OneCricketeer commented on pull request #8489:
URL: https://github.com/apache/kafka/pull/8489#issuecomment-628906219


   Since Java 8 is EOL, why not upgrade to openjdk 11?







[GitHub] [kafka] mumrah commented on a change in pull request #8569: KIP-551: Expose disk read and write metrics

2020-05-14 Thread GitBox


mumrah commented on a change in pull request #8569:
URL: https://github.com/apache/kafka/pull/8569#discussion_r425443889



##
File path: core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
##
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.nio.file.{Files, Paths}
+
+import org.apache.kafka.common.utils.Time
+import org.slf4j.Logger
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Retrieves Linux /proc/self/io metrics.
+ */
+class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: Logger) {
+  import LinuxIoMetricsCollector._
+  var lastUpdateMs = -1L
+  var cachedReadBytes = 0L
+  var cachedWriteBytes = 0L
+  val path = Paths.get(procRoot, "self", "io")
+
+  def readBytes(): Long = this.synchronized {
+    val curMs = time.milliseconds()

Review comment:
   minor nit: could move the time check to the updateValues method since 
that's where lastUpdateMs is set? 
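
   A hedged sketch of the caching pattern this nit describes, written in Java for illustration rather than the PR's Scala; on Linux, /proc/self/io contains lines such as "read_bytes: 12345" and "write_bytes: 67890":
```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

class IoStatsSketch {
    private final Path path = Paths.get("/proc/self/io");
    private long lastUpdateMs = -1L;
    private long cachedReadBytes = 0L;
    private long cachedWriteBytes = 0L;

    synchronized long readBytes(long nowMs) throws IOException {
        maybeUpdateValues(nowMs);
        return cachedReadBytes;
    }

    synchronized long writeBytes(long nowMs) throws IOException {
        maybeUpdateValues(nowMs);
        return cachedWriteBytes;
    }

    // The staleness check lives next to where lastUpdateMs is set, as the
    // review suggests: re-read the file at most once per millisecond.
    private void maybeUpdateValues(long nowMs) throws IOException {
        if (nowMs == lastUpdateMs) {
            return; // cached values are still fresh
        }
        for (String line : Files.readAllLines(path)) {
            if (line.startsWith("read_bytes: ")) {
                cachedReadBytes = Long.parseLong(line.substring("read_bytes: ".length()).trim());
            } else if (line.startsWith("write_bytes: ")) {
                cachedWriteBytes = Long.parseLong(line.substring("write_bytes: ".length()).trim());
            }
        }
        lastUpdateMs = nowMs;
    }
}
```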









[GitHub] [kafka] rhauch commented on pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

2020-05-14 Thread GitBox


rhauch commented on pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#issuecomment-628898153


   ok to test







[GitHub] [kafka] d8tltanc removed a comment on pull request #8528: System tests should use --bootstrap-server rather than --zookeeper when testing new Kafka versions

2020-05-14 Thread GitBox


d8tltanc removed a comment on pull request #8528:
URL: https://github.com/apache/kafka/pull/8528#issuecomment-62030


   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3931/
   Link for the system tests ⬆️







[GitHub] [kafka] rhauch commented on pull request #8090: KAFKA-9537 - Cleanup error messages for abstract transformations

2020-05-14 Thread GitBox


rhauch commented on pull request #8090:
URL: https://github.com/apache/kafka/pull/8090#issuecomment-628895259


   ok to test







[GitHub] [kafka] mjsax merged pull request #8665: KAFKA-9984 Should fail the subscription when pattern is empty

2020-05-14 Thread GitBox


mjsax merged pull request #8665:
URL: https://github.com/apache/kafka/pull/8665


   







[GitHub] [kafka] mjsax commented on pull request #8665: KAFKA-9984 Should fail the subscription when pattern is empty

2020-05-14 Thread GitBox


mjsax commented on pull request #8665:
URL: https://github.com/apache/kafka/pull/8665#issuecomment-628892193


   Thanks for the fix @zhaohaidao! Merged to `trunk`.







[jira] [Resolved] (KAFKA-9984) Should fail the subscription when pattern is empty

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9984.

Fix Version/s: 2.6.0
   Resolution: Fixed

> Should fail the subscription when pattern is empty
> --
>
> Key: KAFKA-9984
> URL: https://issues.apache.org/jira/browse/KAFKA-9984
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Boyang Chen
>Assignee: HaiyuanZhao
>Priority: Major
> Fix For: 2.6.0
>
>
> We have seen a case where the consumer subscribes to an empty string pattern:
> ```
> [Consumer ...  ] Subscribed to pattern:  ''
> ```
> which doesn't make any sense and usually indicates a configuration error. The 
> `consumer.subscribe(pattern)` call should fail with illegal argument for this 
> case.
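
A minimal sketch of the proposed guard, assuming the check happens at subscribe() time (illustrative only; the actual fix is in PR #8665):
{code:java}
import java.util.regex.Pattern;

class SubscriptionValidation {
    // Reject a null or empty pattern instead of silently subscribing to nothing.
    static void validatePattern(final Pattern pattern) {
        if (pattern == null || pattern.toString().isEmpty()) {
            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be "
                + (pattern == null ? "null" : "empty"));
        }
    }
}
{code}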





[GitHub] [kafka] rhauch commented on pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations

2020-05-14 Thread GitBox


rhauch commented on pull request #8654:
URL: https://github.com/apache/kafka/pull/8654#issuecomment-628885132


   I've added some integration tests for creating the internal topic, including 
verifying some existing functionality w/r/t the replication factor and number 
of partitions with various Kafka cluster sizes.







[jira] [Commented] (KAFKA-9676) Add test coverage for new ActiveTaskCreator and StandbyTaskCreator

2020-05-14 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107672#comment-17107672
 ] 

Boyang Chen commented on KAFKA-9676:


The current trunk has logic coverage for StandbyTaskCreator in TaskManagerTest 
and StreamThreadTest. I feel we don't necessarily need a dedicated test class 
for it.

> Add test coverage for new ActiveTaskCreator and StandbyTaskCreator
> --
>
> Key: KAFKA-9676
> URL: https://issues.apache.org/jira/browse/KAFKA-9676
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Boyang Chen
>Priority: Major
>  Labels: help-wanted, newbie
>
> The newly separated ActiveTaskCreator and StandbyTaskCreator have no unit 
> test coverage. We should add corresponding tests.





[GitHub] [kafka] vvcephei commented on pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics

2020-05-14 Thread GitBox


vvcephei commented on pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#issuecomment-628881057


   Thanks, @ableegoldman !







[GitHub] [kafka] vvcephei merged pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics

2020-05-14 Thread GitBox


vvcephei merged pull request #8662:
URL: https://github.com/apache/kafka/pull/8662


   







[GitHub] [kafka] ableegoldman commented on pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics

2020-05-14 Thread GitBox


ableegoldman commented on pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#issuecomment-628879075


   Java 14 build passed, but Java 8 didn't build due to 
   ```
   11:29:13 # There is insufficient memory for the Java Runtime Environment to 
continue.
   11:29:13 # Cannot create GC thread. Out of system resources.
   ``` 







[jira] [Created] (KAFKA-9999) Topic description should be triggered after each failed topic creation iteration

2020-05-14 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9999:
--

 Summary: Topic description should be triggered after each failed 
topic creation iteration 
 Key: KAFKA-9999
 URL: https://issues.apache.org/jira/browse/KAFKA-9999
 Project: Kafka
  Issue Type: Bug
  Components: admin, streams
Affects Versions: 2.4.0
Reporter: Boyang Chen


We spotted a system test failure where the topic already exists but the admin 
client still attempts to recreate it:

 
{code:java}
[2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog. Topic is probably marked for deletion (number of partitions is unknown).
Will retry to create this topic in 100 ms (to let broker finish async delete operation first).
Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-uwin-cnt-changelog. Topic is probably marked for deletion (number of partitions is unknown).
Will retry to create this topic in 100 ms (to let broker finish async delete operation first).
Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-uwin-cnt-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-cntByCnt-changelog. Topic is probably marked for deletion (number of partitions is unknown).
Will retry to create this topic in 100 ms (to let broker finish async delete operation first).
Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-cntByCnt-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,120] INFO stream-thread [main] Topics [SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog, SmokeTest-uwin-cnt-changelog, SmokeTest-cntByCnt-changelog] can not be made ready with 5 retries left (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,220] ERROR stream-thread [main] Could not create topics after 5 retries. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,221] ERROR stream-thread [SmokeTest-05374457-074b-4d33-bca0-8686465e8157-StreamThread-2] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: (org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.StreamsException: Could not create topics after 5 retries. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.
        at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:171)
        at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229)
        at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
        at

[jira] [Commented] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-14 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107623#comment-17107623
 ] 

Boyang Chen commented on KAFKA-9989:


Discussed with [~cadonna], and it seems to be caused by some transient bug in 
the KIP-441 assignor. The more general question is whether we should account 
for the scenario of an empty assignment in this test at all. Agreed, this helps 
catch assignor bugs like this one, but is this the right place for such coverage?

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> On closer inspection, the rebalance happens but produces no task assignment. 
> We should fix this by making the rebalance result part of the check, and by 
> skipping the record-processing validation when the assignment is empty. 





[jira] [Assigned] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-14 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-9989:
--

Assignee: (was: Boyang Chen)

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> On closer inspection, the rebalance happens but produces no task assignment. 
> We should fix this by making the rebalance result part of the check, and by 
> skipping the record-processing validation when the assignment is empty. 





[jira] [Created] (KAFKA-9998) KafkaProducer.close(timeout) still may block indefinitely

2020-05-14 Thread radai rosenblatt (Jira)
radai rosenblatt created KAFKA-9998:
---

 Summary: KafkaProducer.close(timeout) still may block indefinitely
 Key: KAFKA-9998
 URL: https://issues.apache.org/jira/browse/KAFKA-9998
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.4.1
Reporter: radai rosenblatt


Looking at KafkaProducer.close(timeout), we have this:
{code:java}
private void close(Duration timeout, boolean swallowException) {
    long timeoutMs = timeout.toMillis();
    if (timeoutMs < 0)
        throw new IllegalArgumentException("The timeout cannot be negative.");
    log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);

    // this will keep track of the first encountered exception
    AtomicReference<Throwable> firstException = new AtomicReference<>();
    boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
    if (timeoutMs > 0) {
        if (invokedFromCallback) {
            log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
                    "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.",
                    timeoutMs);
        } else {
            // Try to close gracefully.
            if (this.sender != null)
                this.sender.initiateClose();
            if (this.ioThread != null) {
                try {
                    this.ioThread.join(timeoutMs);  // <-- GRACEFUL JOIN
                } catch (InterruptedException t) {
                    firstException.compareAndSet(null, new InterruptException(t));
                    log.error("Interrupted while joining ioThread", t);
                }
            }
        }
    }

    if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
        log.info("Proceeding to force close the producer since pending requests could not be completed " +
                "within timeout {} ms.", timeoutMs);
        this.sender.forceClose();
        // Only join the sender thread when not calling from callback.
        if (!invokedFromCallback) {
            try {
                this.ioThread.join();  // <-- UNBOUNDED JOIN
            } catch (InterruptedException e) {
                firstException.compareAndSet(null, new InterruptException(e));
            }
        }
    }
    ...
}

{code}
Specifically, in our case the ioThread was running a (very) long-running 
user-provided callback, which was preventing the producer from closing within 
the given timeout.

I think the 2nd join() call should either be _VERY_ short (since we're already 
past the timeout at that stage) or should not happen at all.
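
A minimal sketch of the bounded alternative, assuming hypothetical bookkeeping of the close deadline (`deadlineMs` is not a real field in KafkaProducer; it stands in for "close() entry time plus timeoutMs"):
{code:java}
// Only join the sender thread when not calling from callback.
if (!invokedFromCallback) {
    try {
        // Bound the force-close join by whatever time remains, instead of joining forever.
        long remainingMs = Math.max(1L, deadlineMs - time.milliseconds());
        this.ioThread.join(remainingMs);
    } catch (InterruptedException e) {
        firstException.compareAndSet(null, new InterruptException(e));
    }
}
{code}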





[GitHub] [kafka] vvcephei commented on a change in pull request #8667: KAFKA-9994: Handle task migrated inside corruption path

2020-05-14 Thread GitBox


vvcephei commented on a change in pull request #8667:
URL: https://github.com/apache/kafka/pull/8667#discussion_r425386296



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -555,28 +555,35 @@ void runLoop() {
             } catch (final TaskCorruptedException e) {
                 log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
                          "Will close the task as dirty and re-create and bootstrap from scratch.", e);
-
-                taskManager.commit(
-                    taskManager.tasks()
-                        .values()
-                        .stream()
-                        .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-                        .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
-                        .collect(Collectors.toSet())
-                );
-                taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+                try {
+                    taskManager.commit(
+                        taskManager.tasks()
+                            .values()
+                            .stream()
+                            .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+                            .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
+                            .collect(Collectors.toSet())
+                    );
+                    taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+                } catch (final TaskMigratedException taskMigrated) {
+                    handleTaskMigrated(taskMigrated);

Review comment:
   Sounds legit. Thanks.









[jira] [Commented] (KAFKA-9987) Improve sticky partition assignor algorithm

2020-05-14 Thread Hai Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107584#comment-17107584
 ] 

Hai Lin commented on KAFKA-9987:


Thanks for putting this up. From the above algorithm, it looks like we will 
introduce a cap on the number of partitions assigned per rebalance. Can you 
highlight the differences from the existing algorithm a bit; is the only 
difference the new C_f and C_c? So the existing one will assign everything, 
filling up all unfilled_members and all partitions, right?

Also, what does the time frame for the implementation look like? Will it come 
with some benchmark to evaluate the performance with a large consumer group 
(> 2k)? Will it come to 2.4 as a minor release? The reason we want to upgrade 
from 2.2 to 2.4 is to get better performance for large consumer groups (> 2k), 
and this (the stability of large consumer groups) has been a pain point for us 
for a while.

 

> Improve sticky partition assignor algorithm
> ---
>
> Key: KAFKA-9987
> URL: https://issues.apache.org/jira/browse/KAFKA-9987
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> In 
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  we added the new CooperativeStickyAssignor, which leverages the underlying 
> sticky assignment algorithm of the existing StickyAssignor (moved to 
> AbstractStickyAssignor). The algorithm is fairly complex as it tries to 
> optimize stickiness while satisfying perfect balance _in the case individual 
> consumers may be subscribed to different subsets of the topics._ While it 
> does a pretty good job at what it promises to do, it doesn't scale well with 
> large numbers of consumers and partitions.
> To give a concrete example, users have reported that it takes 2.5 minutes for 
> the assignment to complete with just 2100 consumers reading from 2100 
> partitions. Since partitions revoked during the first of two cooperative 
> rebalances will remain unassigned until the end of the second rebalance, it's 
> important for the rebalance to be as fast as possible. And since one of the 
> primary improvements of the cooperative rebalancing protocol is better 
> scaling experience, the only OOTB cooperative assignor should not itself 
> scale poorly.
> If we can constrain the problem a bit, we can simplify the algorithm greatly. 
> In many cases the individual consumers won't be subscribed to some random 
> subset of the total subscription, they will all be subscribed to the same set 
> of topics and rely on the assignor to balance the partition workload.
> We can detect this case by checking the group's individual subscriptions and 
> call on a more efficient assignment algorithm. 
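
A hedged sketch of the detection step described above (the method and parameter names are illustrative, not the actual assignor code):
{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class SubscriptionCheck {
    // Returns true when every member subscribes to the same topic set, in which
    // case a simpler constrained sticky assignment algorithm can be used.
    static boolean allSubscriptionsEqual(final Map<String, List<String>> memberSubscriptions) {
        Set<String> first = null;
        for (final List<String> topics : memberSubscriptions.values()) {
            final Set<String> topicSet = new HashSet<>(topics);
            if (first == null) {
                first = topicSet;
            } else if (!first.equals(topicSet)) {
                return false; // subscriptions differ: fall back to the general algorithm
            }
        }
        return true;
    }
}
{code}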





[jira] [Commented] (KAFKA-9987) Improve sticky partition assignor algorithm

2020-05-14 Thread Hai Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107585#comment-17107585
 ] 

Hai Lin commented on KAFKA-9987:


Saw the PR, I will take a look.

> Improve sticky partition assignor algorithm
> ---
>
> Key: KAFKA-9987
> URL: https://issues.apache.org/jira/browse/KAFKA-9987
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> In 
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  we added the new CooperativeStickyAssignor, which leverages the underlying 
> sticky assignment algorithm of the existing StickyAssignor (moved to 
> AbstractStickyAssignor). The algorithm is fairly complex as it tries to 
> optimize stickiness while satisfying perfect balance _in the case individual 
> consumers may be subscribed to different subsets of the topics._ While it 
> does a pretty good job at what it promises to do, it doesn't scale well with 
> large numbers of consumers and partitions.
> To give a concrete example, users have reported that it takes 2.5 minutes for 
> the assignment to complete with just 2100 consumers reading from 2100 
> partitions. Since partitions revoked during the first of two cooperative 
> rebalances will remain unassigned until the end of the second rebalance, it's 
> important for the rebalance to be as fast as possible. And since one of the 
> primary improvements of the cooperative rebalancing protocol is better 
> scaling experience, the only OOTB cooperative assignor should not itself 
> scale poorly.
> If we can constrain the problem a bit, we can simplify the algorithm greatly. 
> In many cases the individual consumers won't be subscribed to some random 
> subset of the total subscription, they will all be subscribed to the same set 
> of topics and rely on the assignor to balance the partition workload.
> We can detect this case by checking the group's individual subscriptions and 
> call on a more efficient assignment algorithm. 





[GitHub] [kafka] abbccdda commented on a change in pull request #8667: KAFKA-9994: Handle task migrated inside corruption path

2020-05-14 Thread GitBox


abbccdda commented on a change in pull request #8667:
URL: https://github.com/apache/kafka/pull/8667#discussion_r425366150



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -555,28 +555,35 @@ void runLoop() {
             } catch (final TaskCorruptedException e) {
                 log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
                          "Will close the task as dirty and re-create and bootstrap from scratch.", e);
-
-                taskManager.commit(
-                    taskManager.tasks()
-                        .values()
-                        .stream()
-                        .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-                        .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
-                        .collect(Collectors.toSet())
-                );
-                taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+                try {
+                    taskManager.commit(
+                        taskManager.tasks()
+                            .values()
+                            .stream()
+                            .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+                            .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
+                            .collect(Collectors.toSet())
+                    );
+                    taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+                } catch (final TaskMigratedException taskMigrated) {
+                    handleTaskMigrated(taskMigrated);

Review comment:
   Good point. I was also wondering whether we should run the corruption logic 
no matter what. But if we hit a TaskMigratedException, `taskManager.handleLostAll` 
will wipe out all the task states as dirty, which seems like a super-set of what 
handleCorruption does. If we fail the commit, maybe we should just skip the 
corruption logic?









[GitHub] [kafka] mimaison merged pull request #8222: KAFKA-9650: include human readable units in ms and bytes configs

2020-05-14 Thread GitBox


mimaison merged pull request #8222:
URL: https://github.com/apache/kafka/pull/8222


   







[GitHub] [kafka] ableegoldman opened a new pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

2020-05-14 Thread GitBox


ableegoldman opened a new pull request #8668:
URL: https://github.com/apache/kafka/pull/8668


   Motivation and pseudo code algorithm in the ticket.
   
   [WIP] still need to finish writing tests and gather rough benchmark results







[jira] [Created] (KAFKA-9997) upgrade log4j lib to address CVE-2020-9488

2020-05-14 Thread Emanuele Maccherani (Jira)
Emanuele Maccherani created KAFKA-9997:
--

 Summary: upgrade log4j lib to address CVE-2020-9488
 Key: KAFKA-9997
 URL: https://issues.apache.org/jira/browse/KAFKA-9997
 Project: Kafka
  Issue Type: Bug
  Components: packaging
Affects Versions: 2.5.0
Reporter: Emanuele Maccherani


The latest Kafka release uses log4j 1.2.17, which is affected by CVE-2020-9488. 





[GitHub] [kafka] vvcephei commented on a change in pull request #8667: KAFKA-9994: Handle task migrated inside corruption path

2020-05-14 Thread GitBox


vvcephei commented on a change in pull request #8667:
URL: https://github.com/apache/kafka/pull/8667#discussion_r425348145



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -555,28 +555,35 @@ void runLoop() {
             } catch (final TaskCorruptedException e) {
                 log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
                          "Will close the task as dirty and re-create and bootstrap from scratch.", e);
-
-                taskManager.commit(
-                    taskManager.tasks()
-                        .values()
-                        .stream()
-                        .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-                        .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
-                        .collect(Collectors.toSet())
-                );
-                taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+                try {
+                    taskManager.commit(
+                        taskManager.tasks()
+                            .values()
+                            .stream()
+                            .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+                            .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
+                            .collect(Collectors.toSet())
+                    );
+                    taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+                } catch (final TaskMigratedException taskMigrated) {
+                    handleTaskMigrated(taskMigrated);

Review comment:
   Should we also handle the corrupted tasks here (before this line), so 
that they can be already cleaned up before the next round? Or, alternatively, 
should we move `taskManager.handleCorruption(e.corruptedTaskWithChangelogs());` 
to before the attempted commit (it looks like it could be outside the try block 
as well).









[jira] [Created] (KAFKA-9996) upgrade zookeeper to 3.5.8 to address security vulnerabilities

2020-05-14 Thread Emanuele Maccherani (Jira)
Emanuele Maccherani created KAFKA-9996:
--

 Summary: upgrade zookeeper to 3.5.8 to address security 
vulnerabilities
 Key: KAFKA-9996
 URL: https://issues.apache.org/jira/browse/KAFKA-9996
 Project: Kafka
  Issue Type: Bug
  Components: packaging
Affects Versions: 2.5.0
Reporter: Emanuele Maccherani


Kafka currently uses ZooKeeper 3.5.7, which is affected by CVE-2020-8840 and 
CVE-2020-9488. Both are resolved in 3.5.8.





[GitHub] [kafka] mjsax commented on pull request #8558: KAFKA-8611 / KIP-221 documentation

2020-05-14 Thread GitBox


mjsax commented on pull request #8558:
URL: https://github.com/apache/kafka/pull/8558#issuecomment-628812878


   Yes, on the voting thread: I proposed to also deprecate `through()` via 
KIP-221 and Guozhang replied. But it's your KIP and I don't want to "hijack" it 
:) -- I am willing to do the follow up PR, if you agree on the change (I can 
also update the KIP itself). But won't do it without your consensus :) 







[GitHub] [kafka] mjsax commented on pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics

2020-05-14 Thread GitBox


mjsax commented on pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#issuecomment-62881


   Retest this please.







[GitHub] [kafka] mjsax commented on pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics

2020-05-14 Thread GitBox


mjsax commented on pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#issuecomment-628810936


   Retest this please.







[GitHub] [kafka] guozhangwang commented on a change in pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics

2020-05-14 Thread GitBox


guozhangwang commented on a change in pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#discussion_r425343094



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -1520,38 +1534,6 @@ private static void validateActiveTaskEncoding(final List<TopicPartition> partit
         }
     }
 
-    /**
-     * Internal helper function that creates a Kafka topic
-     *
-     * @param topicPartitions Map that contains the topic names to be created with the number of partitions
-     */
-    private void prepareTopic(final Map<String, InternalTopicConfig> topicPartitions) {
-        log.debug("Starting to validate internal topics {} in partition assignor.", topicPartitions);
-
-        // first construct the topics to make ready
-        final Map<String, InternalTopicConfig> topicsToMakeReady = new HashMap<>();
-
-        for (final InternalTopicConfig topic : topicPartitions.values()) {
-            final Optional<Integer> numPartitions = topic.numberOfPartitions();
-            if (!numPartitions.isPresent()) {
-                throw new StreamsException(
-                    String.format("%sTopic [%s] number of partitions not defined",
                                  logPrefix, topic.name())
-                );
-            }
-            if (!topic.hasEnforcedNumberOfPartitions()) {
-                topic.setNumberOfPartitions(numPartitions.get());
-            }
-            topicsToMakeReady.put(topic.name(), topic);
-        }

Review comment:
   The logic indeed seem redundant to me.









[jira] [Commented] (KAFKA-9676) Add test coverage for new ActiveTaskCreator and StandbyTaskCreator

2020-05-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107523#comment-17107523
 ] 

Matthias J. Sax commented on KAFKA-9676:


[~bchen225242] There is still no `StandbyTaskCreatorTest` – IMHO we should keep 
this ticket open.

> Add test coverage for new ActiveTaskCreator and StandbyTaskCreator
> --
>
> Key: KAFKA-9676
> URL: https://issues.apache.org/jira/browse/KAFKA-9676
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Boyang Chen
>Priority: Major
>  Labels: help-wanted, newbie
>
> The newly separated ActiveTaskCreator and StandbyTaskCreator have no unit 
> test coverage. We should add corresponding tests.





[GitHub] [kafka] vvcephei commented on a change in pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics

2020-05-14 Thread GitBox


vvcephei commented on a change in pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#discussion_r425330736



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -169,6 +173,9 @@ public void makeReady(final Map<String, InternalTopicConfig> topics) {
             log.error(timeoutAndRetryError);
             throw new StreamsException(timeoutAndRetryError);
         }
+        log.debug("Completed validating internal topics and created {}", newlyCreatedTopics);

Review comment:
   Thanks! Will do. I just wanted to bounce the idea off you first, in case 
it was stupid.









[GitHub] [kafka] mjsax commented on pull request #8667: KAFKA-9994: Handle task migrated inside corruption path

2020-05-14 Thread GitBox


mjsax commented on pull request #8667:
URL: https://github.com/apache/kafka/pull/8667#issuecomment-628795711


   Retest this please.







[GitHub] [kafka] abbccdda opened a new pull request #8667: KAFKA-9994: Handle task migrated inside corruption path

2020-05-14 Thread GitBox


abbccdda opened a new pull request #8667:
URL: https://github.com/apache/kafka/pull/8667


   The TaskMigratedException should always be non-fatal and caught within the 
run loop, so adding a try-catch to the corruption-handling path as well is 
necessary.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] ableegoldman commented on a change in pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics

2020-05-14 Thread GitBox


ableegoldman commented on a change in pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#discussion_r425314470



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -169,6 +173,9 @@ public void makeReady(final Map<String, InternalTopicConfig> topics) {
             log.error(timeoutAndRetryError);
             throw new StreamsException(timeoutAndRetryError);
         }
+        log.debug("Completed validating internal topics and created {}", newlyCreatedTopics);

Review comment:
   I do agree it would be useful though. Feel free to create a ticket :P 









[GitHub] [kafka] ableegoldman commented on a change in pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics

2020-05-14 Thread GitBox


ableegoldman commented on a change in pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#discussion_r425314263



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -169,6 +173,9 @@ public void makeReady(final Map<String, InternalTopicConfig> topics) {
             log.error(timeoutAndRetryError);
             throw new StreamsException(timeoutAndRetryError);
         }
+        log.debug("Completed validating internal topics and created {}", newlyCreatedTopics);

Review comment:
   I think this race condition was particularly severe since we do the 
listOffsets request pretty much immediately after creating the topics, whereas 
whatever we do with the topic next won't happen until the rebalance has 
completed.
   
   AFAIK we've never had any users report subsequent operations failing after 
the first rebalance due to not-yet-fully-created topics, but it could have just 
slipped past us.









[GitHub] [kafka] mjsax commented on pull request #8665: KAFKA-9984 Should fail the subscription when pattern is empty

2020-05-14 Thread GitBox


mjsax commented on pull request #8665:
URL: https://github.com/apache/kafka/pull/8665#issuecomment-628779775


   Retest this please.







[jira] [Commented] (KAFKA-9993) Think about inheritance in the protocol generation framework

2020-05-14 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107487#comment-17107487
 ] 

John Roesler commented on KAFKA-9993:
-

Thanks for the idea, Boyang. Can you clarify whether the problem is boilerplate 
code inside the auto generated class, or boilerplate code that we have to write 
when using the generated classes?

> Think about inheritance in the protocol generation framework
> 
>
> Key: KAFKA-9993
> URL: https://issues.apache.org/jira/browse/KAFKA-9993
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Priority: Major
>
> We have seen that there are a lot of common fields inside the 
> request/response templates that could be extracted into a super class for the 
> auto-generated classes. For example, most responses contain a top-level error 
> code. Currently, to build a service receiving multiple RPCs, the code template 
> produces a lot of redundant error-code extraction logic, which is far from 
> ideal. 
> What we want to discuss is whether to enable a general inheritance 
> mechanism in this framework, what the trade-off and complexity increase would 
> be, and whether there is any workaround that would simply reduce the boilerplate.





[jira] [Created] (KAFKA-9995) IllegalStateException when offsets not found

2020-05-14 Thread James Hay (Jira)
James Hay created KAFKA-9995:


 Summary: IllegalStateException when offsets not found
 Key: KAFKA-9995
 URL: https://issues.apache.org/jira/browse/KAFKA-9995
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.4.1
Reporter: James Hay


 

I have a recently upgraded Kafka Streams 2.4.1 application, and we have started 
seeing it periodically crash with the following error:
{code:java}
2020-05-14T16:53:03.839Z DEBUG <> [chat-c199e52b-9344-457c-a2ae-04a669a7e0fb-StreamThread-2] o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=chat-c199e52b-9344-457c-a2ae-04a669a7e0fb-StreamThread-2-consumer, groupId=chat] Fetching committed offsets for partitions: [private.chat.endpoint-0, public.chat.message-0]
2020-05-14T16:53:03.841Z INFO  <> [chat-c199e52b-9344-457c-a2ae-04a669a7e0fb-StreamThread-2] o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=chat-c199e52b-9344-457c-a2ae-04a669a7e0fb-StreamThread-2-consumer, groupId=chat] Found no committed offset for partition private.chat.endpoint-0
2020-05-14T16:53:03.842Z ERROR <> [chat-c199e52b-9344-457c-a2ae-04a669a7e0fb-StreamThread-2] o.a.k.s.p.internals.StreamThread - stream-thread [chat-c199e52b-9344-457c-a2ae-04a669a7e0fb-StreamThread-2] Encountered the following error during processing: java.lang.IllegalStateException: Offset limit should monotonically increase, but was reduced. New limit: 0. Previous limit: 857859
        at org.apache.kafka.streams.processor.internals.StandbyTask.updateOffsetLimits(StandbyTask.java:215)
        at org.apache.kafka.streams.processor.internals.StandbyTask.update(StandbyTask.java:181)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1048)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:825)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
{code}
Is this a known issue? What can cause offsets to not be found?

Other things to note:
 * We have TOPOLOGY_OPTIMIZATION set to OPTIMIZE.
 * We are only seeing this on two of our three environments (at the moment). 
The two environments where we see the problem have little traffic, and the error 
only seems to impact machines that are mostly idle. Our prod environment, which 
consumes events regularly, shows no signs of the same problem.
 * There is some evidence of a pattern to the timing of this error: although 
not always the case, 24 hours between errors is common.

 





[jira] [Updated] (KAFKA-9994) Catch TaskMigrated exception in task corruption code path

2020-05-14 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9994:
---
Description: 
We have seen a case where a TaskMigratedException gets thrown from 
taskManager.commit(). This should be prevented by catching it properly.

Looking at the stack trace, the TaskMigratedException was thrown from the 
preCommit() call inside the corrupted-task commit path.
{code:java}
[2020-05-14T05:47:25-07:00] 
(streams-soak-trunk-eos_soak_i-0b5b559dda7970618_streamslog) [2020-05-14 
12:47:25,635] ERROR 
[stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] 
stream-thread 
[stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] 
Encountered the following exception during processing and the thread is going 
to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
[2020-05-14T05:47:25-07:00] 
(streams-soak-trunk-eos_soak_i-0b5b559dda7970618_streamslog) 
org.apache.kafka.streams.errors.TaskMigratedException: Producer got fenced 
trying to send a record [stream-thread 
[stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] task 
[1_1]]; it means all tasks belonging to this thread should be migrated.
        at 
org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:216)
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
        at 
org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
        at 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedWindowBytesStore.log(ChangeLoggingTimestampedWindowBytesStore.java:36)
        at 
org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:112)
        at 
org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:34)
        at 
org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:111)
        at 
org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:91)
        at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
        at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
        at 
org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
        at 
org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:296)
        at 
org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
        at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$flush$4(MeteredWindowStore.java:200)
        at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
        at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:200)
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:402)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:317)
        at 
org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:753)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:573)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)
{code}

  was:We have seen a case where the TaskMigrated exception gets thrown from 
taskManager.commit(). This should be prevented by proper catching.


> Catch TaskMigrated exception in task corruption code path 
> --
>
> Key: KAFKA-9994
> URL: https://issues.apache.org/jira/browse/KAFKA-9994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Boyang Chen
>Priority: Major
>
> We have seen a case where a TaskMigratedException gets thrown from 
> taskManager.commit(). This should be prevented by catching it properly.
> Looking at the stack trace, the TaskMigratedException was thrown from the 
> preCommit() call inside the corrupted-task commit path.
> {code:java}
> [2020-05-14T05:47:25-07:00] 
> (streams-soak-trunk-eos_soak_i-0b5b559dda7970618_streamslog) [2020-05-14 
> 12:47:25,635] ERROR 
> [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] 
> stream-thread 
> [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-14T05:47:25-07:00] 
> (streams-soak-trunk-eos_soak_i-0b5b559dda7970618_streamslog) 
> 

[jira] [Created] (KAFKA-9994) Catch TaskMigrated exception in task corruption code path

2020-05-14 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9994:
--

 Summary: Catch TaskMigrated exception in task corruption code path 
 Key: KAFKA-9994
 URL: https://issues.apache.org/jira/browse/KAFKA-9994
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Boyang Chen


We have seen a case where a TaskMigratedException gets thrown from 
taskManager.commit(). This should be prevented by catching it properly.
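As a rough sketch of the intended handling (method names approximate the 
StreamThread/TaskManager internals; this is not the actual patch):
{code:java}
// Illustrative sketch: catch TaskMigratedException around the commit inside
// the corruption path so it triggers the normal rebalance handling instead of
// killing the stream thread. Names are approximations.
private void commitCorruptedTasks(final Collection<Task> corruptedTasks) {
    try {
        taskManager.commit(corruptedTasks);
    } catch (final TaskMigratedException e) {
        // All tasks belonging to this thread should be migrated; rejoin the group.
        handleTaskMigrated(e);
    }
}
{code}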





[jira] [Created] (KAFKA-9993) Think about inheritance in the protocol generation framework

2020-05-14 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9993:
--

 Summary: Think about inheritance in the protocol generation 
framework
 Key: KAFKA-9993
 URL: https://issues.apache.org/jira/browse/KAFKA-9993
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


We have seen that there are a lot of common fields inside the request/response 
templates that could be extracted into a superclass for the auto-generated 
classes. For example, most responses contain a top-level error code. Currently, 
to build a service receiving multiple RPCs, the code template produces a lot of 
redundant error-code extraction logic, which is far from ideal. 

What we want to discuss is whether to enable a general inheritance mechanism 
in this framework, what the trade-offs and added complexity would be, and 
whether there is any workaround that simply reduces the boilerplate.
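To make the idea concrete, a shared base for the generated classes might look 
like the hypothetical sketch below; all names are invented for illustration, 
and nothing like this exists in the framework today:
{code:java}
// Hypothetical sketch: a common superclass for generated response classes so
// callers can extract the error code uniformly. All names are invented.
public abstract class AbstractResponseData {
    private short errorCode;

    public short errorCode() {
        return errorCode;
    }

    public void setErrorCode(final short errorCode) {
        this.errorCode = errorCode;
    }
}

// A generated class would then only add its message-specific fields:
class SomeRpcResponseData extends AbstractResponseData {
    private int throttleTimeMs;
    // ... message-specific fields and (de)serialization logic ...
}
{code}
A service handling many RPCs could then call errorCode() generically instead of 
duplicating per-type extraction logic.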





[GitHub] [kafka] mimaison commented on pull request #8222: KAFKA-9650: include human readable units in ms and bytes configs

2020-05-14 Thread GitBox


mimaison commented on pull request #8222:
URL: https://github.com/apache/kafka/pull/8222#issuecomment-628745671


   Tests passed locally and the generated docs looked good, merging to trunk







[GitHub] [kafka] jeffhuang26 commented on pull request #7898: KAFKA-9366: please consider upgrade log4j to log4j2 due to critical security problem CVE-2019-17571

2020-05-14 Thread GitBox


jeffhuang26 commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-628732206


   What is timeline for merging this PR? 







[GitHub] [kafka] mimaison commented on pull request #8222: KAFKA-9650: include human readable units in ms and bytes configs

2020-05-14 Thread GitBox


mimaison commented on pull request #8222:
URL: https://github.com/apache/kafka/pull/8222#issuecomment-628718604


   Retest this please







[jira] [Commented] (KAFKA-9992) EmbeddedKafkaCluster not working with kafka_2.13

2020-05-14 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107394#comment-17107394
 ] 

John Roesler commented on KAFKA-9992:
-

Hello Andras,

Just to make sure I understand, is this just a problem with our test artifact? 
The test artifacts are not public APIs, and I would not recommend depending on 
them.

If you want to submit a PR for your patch, we can merge it, but I don't think we 
can call this a bug. 

Note that there have been a few conversations in Jira and on the mailing list 
about actually creating a public EmbeddedKafkaCluster for testing. So far, no 
one has picked it up, though. If you’re interested in doing that, it would be 
appreciated, and I can help with the KIP process. 

My standard advice is to copy/paste the EmbeddedKafkaCluster into your own test 
module so that you don’t depend on upstream tests. 

> EmbeddedKafkaCluster not working with kafka_2.13
> 
>
> Key: KAFKA-9992
> URL: https://issues.apache.org/jira/browse/KAFKA-9992
> Project: Kafka
>  Issue Type: Bug
>  Components: packaging, streams
>Affects Versions: 2.4.1
>Reporter: Andras Katona
>Priority: Major
>
> The Kafka Streams artifact depends on kafka_2.12 as of now; this is in the 
> [kafka-streams-2.4.1.pom|https://repo1.maven.org/maven2/org/apache/kafka/kafka-streams/2.4.1/kafka-streams-2.4.1.pom]:
> {code}
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka_2.12</artifactId>
>   <version>2.4.1</version>
>   <scope>test</scope>
> </dependency>
> {code}
> But it is not hardcoded: whatever Scala version was used to compile this 
> component before uploading will be present in the pom.
> When I'm using these deps:
> {code}
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-streams</artifactId>
>   <version>2.4.1</version>
>   <classifier>test</classifier>
>   <scope>test</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka_2.13</artifactId>
>   <version>2.4.1</version>
>   <classifier>test</classifier>
>   <scope>test</scope>
> </dependency>
> {code}
> My test fails with the following exception (deleteTopicAndWait is called in 
> my @After method):
> {noformat}
> java.lang.NoSuchMethodError: 
> scala.collection.JavaConverters.setAsJavaSetConverter(Lscala/collection/Set;)Lscala/collection/convert/Decorators$AsJava;
> at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster$TopicsDeletedCondition.conditionMet(EmbeddedKafkaCluster.java:316)
> at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:370)
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:356)
> at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:266)
> at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicAndWait(EmbeddedKafkaCluster.java:221)
> {noformat}
> I modified kafka build locally to separate artifacts based on scala version 
> just like it is done with kafka core, and I pulled in kafka-streams_2.13 from 
> my local mvn repo and test was working again.
> I was only trying 2.4.1, but I assume other versions are also 
> affected; please add the proper versions and components (in case the 
> component is not packaging).





[jira] [Commented] (KAFKA-9479) Describe consumer group --all-groups shows header for each entry

2020-05-14 Thread Vince Mu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107374#comment-17107374
 ] 

Vince Mu commented on KAFKA-9479:
-

PR created [https://github.com/apache/kafka/pull/8666] 

> Describe consumer group --all-groups shows header for each entry
> 
>
> Key: KAFKA-9479
> URL: https://issues.apache.org/jira/browse/KAFKA-9479
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jeff Kim
>Priority: Major
>  Labels: newbie
>
> When using `bin/kafka-consumer-groups.sh --describe --state --all-groups`, we 
> print output like the following:
> {code}
> GROUP   COORDINATOR (ID)    ASSIGNMENT-STRATEGY  STATE   #MEMBERS
> group1  localhost:9092 (3)  range                Stable  1
>
> GROUP   COORDINATOR (ID)    ASSIGNMENT-STRATEGY  STATE   #MEMBERS
> group2  localhost:9092 (3)  range                Stable  1
> {code}
> It would be nice if we did not show the header for every entry.





[GitHub] [kafka] VinceMu opened a new pull request #8666: KAFKA-9479 Describe consumer group --state --all-groups show header once

2020-05-14 Thread GitBox


VinceMu opened a new pull request #8666:
URL: https://github.com/apache/kafka/pull/8666


   Used the [previous PR](https://github.com/apache/kafka/pull/8096) made by 
vetler as a starting point.
   
   Updated the printState() method in the ConsumerGroup command to print the 
header only once when the following options are set: `--describe --state 
--all-groups`.
   
   Modified the testDescribeAllExistingGroups test in DescribeConsumerGroupTest 
so that it accounts for the case where the header is printed only once; in that 
case the number of lines is equal to the length of DescribeTypes + 1. 
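   As an indicative sketch of the print-once behavior (the actual command is 
implemented in Scala; the row type and names below are invented for 
illustration):
   ```java
   // Indicative sketch: print the shared header once, then one row per group.
   // GroupStateRow is an invented holder type for illustration.
   final class GroupStateRow {
       String group; String coordinator; String strategy; String state; int numMembers;
   }

   static void printStates(final java.util.List<GroupStateRow> rows) {
       boolean headerPrinted = false;
       for (final GroupStateRow row : rows) {
           if (!headerPrinted) {
               System.out.printf("%-20s %-22s %-22s %-10s %s%n",
                       "GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS");
               headerPrinted = true;
           }
           System.out.printf("%-20s %-22s %-22s %-10s %d%n",
                   row.group, row.coordinator, row.strategy, row.state, row.numMembers);
       }
   }
   ```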
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] rhauch commented on pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations

2020-05-14 Thread GitBox


rhauch commented on pull request #8654:
URL: https://github.com/apache/kafka/pull/8654#issuecomment-628682601


   BTW, I've improved the error message when Connect's `TopicAdmin` fails to 
create a topic because of an unknown topic setting:
   
   > Uncaught exception in herder work thread, exiting:  
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:297)
   org.apache.kafka.connect.errors.ConnectException: Unable to create topic(s) 
'mm2-offsets.backup.internal': Unknown topic config name: 
topic.replication.factor
   
   
   Here's what this looks like in the log message just before the herder exits:
   ```
   [2020-05-14 09:36:22,348] ERROR [Worker clientId=connect-2, 
groupId=backup-mm2] Uncaught exception in herder work thread, exiting:  
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:297)
   org.apache.kafka.connect.errors.ConnectException: Unable to create topic(s) 
'mm2-offsets.backup.internal': Unknown topic config name: 
topic.replication.factor
at 
org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:305)
at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:105)
at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:128)
at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:115)
at org.apache.kafka.connect.runtime.Worker.start(Worker.java:186)
at 
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:123)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:284)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: 
Unknown topic config name: topic.replication.factor
   ```







[GitHub] [kafka] bbejeck commented on pull request #6229: KAFKA-6786: Removing additional configs for StreamsBrokerDownResilienceTest

2020-05-14 Thread GitBox


bbejeck commented on pull request #6229:
URL: https://github.com/apache/kafka/pull/6229#issuecomment-628670456


   @sh-abhi apologies for letting this slip.  Can you rebase this PR? Then we 
can get this merged.
   
   Thanks. 







[GitHub] [kafka] vvcephei merged pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-14 Thread GitBox


vvcephei merged pull request #8588:
URL: https://github.com/apache/kafka/pull/8588


   







[GitHub] [kafka] vvcephei commented on pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-14 Thread GitBox


vvcephei commented on pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#issuecomment-628635918


   The test failure was unrelated: 
kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition







[GitHub] [kafka] mmolimar commented on a change in pull request #8663: KAFKA-9985: Sink connector may exhaust broker when writing in DLQ

2020-05-14 Thread GitBox


mmolimar commented on a change in pull request #8663:
URL: https://github.com/apache/kafka/pull/8663#discussion_r425135341



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
##
@@ -96,6 +101,25 @@ public static void validate(Map<String, String> props) {
 throw new ConfigException("Must configure one of " +
 SinkTask.TOPICS_CONFIG + " or " + 
SinkTask.TOPICS_REGEX_CONFIG);
 }
+
+if (hasDlqTopicConfig) {
+String dlqTopic = props.get(DLQ_TOPIC_NAME_CONFIG).trim();
+if (hasTopicsConfig) {
+List<String> topics = parseTopicsList(props);
+if (topics.contains(dlqTopic)) {
+throw new ConfigException(DLQ_TOPIC_NAME_CONFIG + " has a 
topic name which is already in " +

Review comment:
   There will be just one topic in the DLQ topic config. We could add it, 
but I'm not sure it's strictly necessary.
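   For context, the effect of the new check can be sketched as follows; the 
property keys are the standard Connect sink configs, and the failure is the 
`ConfigException` thrown by the `validate` method shown in this diff:
   ```java
   // Sketch: a sink config whose DLQ topic also appears in the consumed
   // topics list should now be rejected during validation.
   final Map<String, String> props = new HashMap<>();   // java.util.*
   props.put("topics", "orders,orders.dlq");
   props.put("errors.deadletterqueue.topic.name", "orders.dlq");
   SinkConnectorConfig.validate(props);                 // expected to throw ConfigException
   ```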









[jira] [Updated] (KAFKA-9992) EmbeddedKafkaCluster not working with kafka_2.13

2020-05-14 Thread Andras Katona (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andras Katona updated KAFKA-9992:
-
Summary: EmbeddedKafkaCluster not working with kafka_2.13  (was: 
EmbeddedKafka not working with kafka_2.13)

> EmbeddedKafkaCluster not working with kafka_2.13
> 
>
> Key: KAFKA-9992
> URL: https://issues.apache.org/jira/browse/KAFKA-9992
> Project: Kafka
>  Issue Type: Bug
>  Components: packaging, streams
>Affects Versions: 2.4.1
>Reporter: Andras Katona
>Priority: Major
>
> The Kafka Streams artifact depends on kafka_2.12 as of now; this is in the 
> [kafka-streams-2.4.1.pom|https://repo1.maven.org/maven2/org/apache/kafka/kafka-streams/2.4.1/kafka-streams-2.4.1.pom]:
> {code}
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka_2.12</artifactId>
>   <version>2.4.1</version>
>   <scope>test</scope>
> </dependency>
> {code}
> But it is not hardcoded: whatever Scala version was used to compile this 
> component before uploading will be present in the pom.
> When I'm using these deps:
> {code}
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-streams</artifactId>
>   <version>2.4.1</version>
>   <classifier>test</classifier>
>   <scope>test</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka_2.13</artifactId>
>   <version>2.4.1</version>
>   <classifier>test</classifier>
>   <scope>test</scope>
> </dependency>
> {code}
> My test fails with the following exception (deleteTopicAndWait is called in 
> my @After method):
> {noformat}
> java.lang.NoSuchMethodError: 
> scala.collection.JavaConverters.setAsJavaSetConverter(Lscala/collection/Set;)Lscala/collection/convert/Decorators$AsJava;
> at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster$TopicsDeletedCondition.conditionMet(EmbeddedKafkaCluster.java:316)
> at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:370)
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:356)
> at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:266)
> at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicAndWait(EmbeddedKafkaCluster.java:221)
> {noformat}
> I modified kafka build locally to separate artifacts based on scala version 
> just like it is done with kafka core, and I pulled in kafka-streams_2.13 from 
> my local mvn repo and test was working again.
> I was only trying 2.4.1, but I assume other versions are also 
> affected; please add the proper versions and components (in case the 
> component is not packaging).





[jira] [Created] (KAFKA-9992) EmbeddedKafka not working with kafka_2.13

2020-05-14 Thread Andras Katona (Jira)
Andras Katona created KAFKA-9992:


 Summary: EmbeddedKafka not working with kafka_2.13
 Key: KAFKA-9992
 URL: https://issues.apache.org/jira/browse/KAFKA-9992
 Project: Kafka
  Issue Type: Bug
  Components: packaging, streams
Affects Versions: 2.4.1
Reporter: Andras Katona


The Kafka Streams artifact depends on kafka_2.12 as of now; this is in the 
[kafka-streams-2.4.1.pom|https://repo1.maven.org/maven2/org/apache/kafka/kafka-streams/2.4.1/kafka-streams-2.4.1.pom]:
{code}
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.12</artifactId>
  <version>2.4.1</version>
  <scope>test</scope>
</dependency>
{code}
But it is not hardcoded: whatever Scala version was used to compile this 
component before uploading will be present in the pom.

When I'm using these deps:
{code}
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>2.4.1</version>
  <classifier>test</classifier>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.13</artifactId>
  <version>2.4.1</version>
  <classifier>test</classifier>
  <scope>test</scope>
</dependency>
{code}

My test fails with the following exception (deleteTopicAndWait is called in my 
@After method):
{noformat}
java.lang.NoSuchMethodError: 
scala.collection.JavaConverters.setAsJavaSetConverter(Lscala/collection/Set;)Lscala/collection/convert/Decorators$AsJava;
at 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster$TopicsDeletedCondition.conditionMet(EmbeddedKafkaCluster.java:316)
at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:370)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:356)
at 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:266)
at 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicAndWait(EmbeddedKafkaCluster.java:221)
{noformat}

I modified kafka build locally to separate artifacts based on scala version 
just like it is done with kafka core, and I pulled in kafka-streams_2.13 from 
my local mvn repo and test was working again.

I was only trying 2.4.1, but I assume other versions are also 
affected; please add the proper versions and components (in case the 
component is not packaging).





[GitHub] [kafka] mmolimar commented on a change in pull request #8663: KAFKA-9985: Sink connector may exhaust broker when writing in DLQ

2020-05-14 Thread GitBox


mmolimar commented on a change in pull request #8663:
URL: https://github.com/apache/kafka/pull/8663#discussion_r425115475



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
##
@@ -108,6 +132,20 @@ public static boolean hasTopicsRegexConfig(Map<String, String> props) {
 return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
 }
 
+public static boolean hasDlqTopicConfig(Map<String, String> props) {
+String dqlTopicStr = props.get(DLQ_TOPIC_NAME_CONFIG);
+return dqlTopicStr != null && !dqlTopicStr.trim().isEmpty();
+}
+
+public static List<String> parseTopicsList(Map<String, String> props) {
+List<String> topics = (List<String>) 
ConfigDef.parseType(TOPICS_CONFIG, props.get(TOPICS_CONFIG), Type.LIST);
+return topics
+.stream()
+.filter(topic -> !topic.isEmpty())
+.distinct()

Review comment:
   That's what I thought, but we could also end up with a topic name that is 
an empty string.









[GitHub] [kafka] zhaohaidao commented on a change in pull request #8665: KAFKA-9984 Should fail the subscription when pattern is empty

2020-05-14 Thread GitBox


zhaohaidao commented on a change in pull request #8665:
URL: https://github.com/apache/kafka/pull/8665#discussion_r425097747



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##
@@ -1015,8 +1015,8 @@ public void subscribe(Collection<String> topics) {
 @Override
 public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
 maybeThrowInvalidGroupIdException();
-if (pattern == null)
-throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be null");
+if (pattern == null || pattern.toString().equals(""))
+throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be " + pattern == null ? "null" : "empty");

Review comment:
   Thanks. I have fixed this issue. Could you continue to review it?









[GitHub] [kafka] d8tltanc edited a comment on pull request #8528: System tests should use --bootstrap-server rather than --zookeeper when testing new Kafka versions

2020-05-14 Thread GitBox


d8tltanc edited a comment on pull request #8528:
URL: https://github.com/apache/kafka/pull/8528#issuecomment-628340161


   Latest dev branch builds here (REMOVE_SYSTEM_4): 
   
   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3944/







[GitHub] [kafka] showuon edited a comment on pull request #8622: MINOR: Update stream documentation

2020-05-14 Thread GitBox


showuon edited a comment on pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#issuecomment-628563253


   Hi @bbejeck , I appended more fixes for the streams documents. Please review 
them when you have a chance. Thank you.







[GitHub] [kafka] showuon edited a comment on pull request #8622: MINOR: Update stream documentation

2020-05-14 Thread GitBox


showuon edited a comment on pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#issuecomment-628563253


   Hi @bbejeck , I appended more fixes for the streams documents as I read 
through them. Please review them when you have a chance. Thank you.







[GitHub] [kafka] showuon commented on pull request #8622: MINOR: Update stream documentation

2020-05-14 Thread GitBox


showuon commented on pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#issuecomment-628563253


   Hi @bbejeck , I appended more fixes for the streams documents. Please review 
them when you have a chance. Thank you.







[GitHub] [kafka] showuon commented on a change in pull request #8622: MINOR: Update stream documentation

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#discussion_r425055023



##
File path: docs/streams/developer-guide/write-streams.html
##
@@ -208,7 +208,7 @@ Using Kafka Streams within your application code
-  Testing a Streams application
+  Testing a Streams application

Review comment:
   wrong HTML formatting: the opening/closing anchor and heading tags are 
nested in the wrong order.









[GitHub] [kafka] showuon commented on a change in pull request #8622: MINOR: Update stream documentation

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#discussion_r425055442



##
File path: docs/streams/developer-guide/running-app.html
##
@@ -51,7 +51,7 @@
   Running 
Streams Applications
   You can run Java applications that use the Kafka Streams library 
without any additional configuration or requirements. Kafka Streams
   also provides the ability to receive notification of the various 
states of the application. The ability to monitor the runtime
-  status is discussed in the 
monitoring guide.
+  status is discussed in the 
monitoring guide.

Review comment:
   Fix the broken link. 









[GitHub] [kafka] showuon commented on a change in pull request #8622: MINOR: Update stream documentation

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#discussion_r425055023



##
File path: docs/streams/developer-guide/write-streams.html
##
@@ -208,7 +208,7 @@ Using Kafka Streams within your application code
-  Testing a Streams application
+  Testing a Streams application

Review comment:
   wrong HTML formatting: the anchor and heading tags are nested in the wrong order.









[GitHub] [kafka] showuon commented on a change in pull request #8622: MINOR: Update stream documentation

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#discussion_r425054711



##
File path: docs/streams/developer-guide/write-streams.html
##
@@ -154,7 +154,7 @@ Using Kafka Streams within your application codeStream Partitions and Tasks and Threading Model.
+For more information, see Stream Partitions and Tasks and Threading Model.

Review comment:
   fix the broken link from wrong 
`href="../architecture.html#streams-architecture-threads"` to 
`href="../architecture.html#streams_architecture_threads"` (the one with 
underscore)









[GitHub] [kafka] showuon commented on a change in pull request #8622: MINOR: Update stream documentation

2020-05-14 Thread GitBox


showuon commented on a change in pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#discussion_r425054297



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3738,7 +3738,7 @@ KTable-KTable 
Foreign-Key
 
 
 
-Testing a Streams application
+Testing a Streams application

Review comment:
   Currently, the `TESTING A STREAMS APPLICATION` header is in blue, which 
is different from the others. It turns out this is because the anchor and 
heading tags are nested in the wrong order, which causes the wrong CSS 
format to be applied.
   
![image](https://user-images.githubusercontent.com/43372967/81927117-b1d18780-9615-11ea-979b-db48c294e742.png)
   









[GitHub] [kafka] chia7712 commented on a change in pull request #8665: KAFKA-9984 Should fail the subscription when pattern is empty

2020-05-14 Thread GitBox


chia7712 commented on a change in pull request #8665:
URL: https://github.com/apache/kafka/pull/8665#discussion_r425001861



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##
@@ -1015,8 +1015,8 @@ public void subscribe(Collection<String> topics) {
 @Override
 public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
 maybeThrowInvalidGroupIdException();
-if (pattern == null)
-throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be null");
+if (pattern == null || pattern.toString().equals(""))
+throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be " + pattern == null ? "null" : "empty");

Review comment:
   the condition is always false if you don't add brackets: `+` binds tighter 
than `==`, so the whole concatenated string is compared against null.
   
   ```
   throw new IllegalArgumentException("Topic pattern to subscribe to cannot be 
" + (pattern == null ? "null" : "empty"));
   ```
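   The precedence pitfall is easy to reproduce in isolation; this minimal 
example prints `empty` for a null pattern because `+` binds tighter than `==`:
   ```java
   public class TernaryPrecedence {
       public static void main(final String[] args) {
           final Object pattern = null;
           // Parses as ("cannot be " + pattern) == null ? ... -- always false.
           final String broken = "cannot be " + pattern == null ? "null" : "empty";
           // With brackets, the intent is preserved.
           final String fixed = "cannot be " + (pattern == null ? "null" : "empty");
           System.out.println(broken); // prints "empty"
           System.out.println(fixed);  // prints "cannot be null"
       }
   }
   ```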









[jira] [Resolved] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2020-05-14 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-7965.

Resolution: Fixed

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 1.1.1, 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: David Jacot
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.6.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}





[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2020-05-14 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107107#comment-17107107
 ] 

David Jacot commented on KAFKA-7965:


Closing it. Please re-open it if needed.

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 1.1.1, 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: David Jacot
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}





[jira] [Updated] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2020-05-14 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-7965:
---
Fix Version/s: (was: 2.3.0)
   2.6.0

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 1.1.1, 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: David Jacot
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.6.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}





[GitHub] [kafka] cadonna commented on a change in pull request #8254: KIP-557: Add Emit On Change Support

2020-05-14 Thread GitBox


cadonna commented on a change in pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#discussion_r424984599



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
##
@@ -53,4 +56,48 @@ void initStoreSerde(final ProcessorContext context) {
 keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
 valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
 }
-}
\ No newline at end of file
+
+public RawAndDeserializedValue<V> getWithBinary(final K key) {
+try {
+return maybeMeasureLatency(() -> { 
+final byte[] serializedValue = wrapped().get(keyBytes(key));
+return new RawAndDeserializedValue<>(serializedValue, outerValue(serializedValue));
+}, time, getSensor);
+} catch (final ProcessorStateException e) {
+final String message = String.format(e.getMessage(), key);
+throw new ProcessorStateException(message, e);
+}
+}
+
+public boolean putIfDifferentValues(final K key,
+final ValueAndTimestamp<V> newValue,
+final byte[] oldSerializedValue) {
+try {
+return maybeMeasureLatency(
+() -> {
+final byte[] newSerializedValue = 
serdes.rawValue(newValue);
+if 
(ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue,
 newSerializedValue)) {
+return false;
+} else {
+wrapped().put(keyBytes(key), newSerializedValue);

Review comment:
   > Don't we still want to put the value in the store (even if we don't 
forward it on to the next context) if the values are the same but the timestamp 
is newer?
   
   If we just put the value in the store but did not forward it, then the store 
would actually be corrupted, because the local state would not be consistent 
with downstream anymore.
   
   Not putting a record with the same value but a newer timestamp in the store 
and not forwarding it was the main point of this KIP.









[GitHub] [kafka] cadonna commented on a change in pull request #8254: KIP-557: Add Emit On Change Support

2020-05-14 Thread GitBox


cadonna commented on a change in pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#discussion_r424984599



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
##
@@ -53,4 +56,48 @@ void initStoreSerde(final ProcessorContext context) {
 keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
 valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
 }
-}
\ No newline at end of file
+
+public RawAndDeserializedValue<V> getWithBinary(final K key) {
+try {
+return maybeMeasureLatency(() -> { 
+final byte[] serializedValue = wrapped().get(keyBytes(key));
+return new RawAndDeserializedValue<>(serializedValue, outerValue(serializedValue));
+}, time, getSensor);
+} catch (final ProcessorStateException e) {
+final String message = String.format(e.getMessage(), key);
+throw new ProcessorStateException(message, e);
+}
+}
+
+public boolean putIfDifferentValues(final K key,
+final ValueAndTimestamp<V> newValue,
+final byte[] oldSerializedValue) {
+try {
+return maybeMeasureLatency(
+() -> {
+final byte[] newSerializedValue = 
serdes.rawValue(newValue);
+if 
(ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue,
 newSerializedValue)) {
+return false;
+} else {
+wrapped().put(keyBytes(key), newSerializedValue);

Review comment:
   > Don't we still want to put the value in the store (even if we don't 
forward it on to the next context) if the values are the same but the timestamp 
is newer?
   
   If we just put the value in the store but did not forward it, then the store 
would actually be corrupted, because the local state would not be consistent 
with downstream anymore.









[GitHub] [kafka] cadonna commented on a change in pull request #8254: KIP-557: Add Emit On Change Support

2020-05-14 Thread GitBox


cadonna commented on a change in pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#discussion_r424971223



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
##
@@ -53,4 +56,48 @@ void initStoreSerde(final ProcessorContext context) {
 keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
 valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
 }
-}
\ No newline at end of file
+
+public RawAndDeserializedValue<V> getWithBinary(final K key) {
+try {
+return maybeMeasureLatency(() -> { 
+final byte[] serializedValue = wrapped().get(keyBytes(key));
+return new RawAndDeserializedValue<>(serializedValue, outerValue(serializedValue));
+}, time, getSensor);
+} catch (final ProcessorStateException e) {
+final String message = String.format(e.getMessage(), key);
+throw new ProcessorStateException(message, e);
+}
+}
+
+public boolean putIfDifferentValues(final K key,
+final ValueAndTimestamp<V> newValue,
+final byte[] oldSerializedValue) {
+try {
+return maybeMeasureLatency(
+() -> {
+final byte[] newSerializedValue = 
serdes.rawValue(newValue);
+if 
(ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue,
 newSerializedValue)) {
+return false;
+} else {
+wrapped().put(keyBytes(key), newSerializedValue);

Review comment:
   > Why when the timestamp of the newer value is lower, do we want to put 
the new value into the store? Surely the store should have the value with the 
newer timestamp? Otherwise we could wind up with a corrupt store.
   
   This behavior was also there before this PR. If an out-of-order record is 
encountered, a log message is written, but the record is nevertheless put 
into the state store (cf. 
https://github.com/apache/kafka/blob/7624e6247984223901aa34d7b7c2789c3e1d0c3f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L122).
 The only thing that changed is that if the serialized value of the new record 
is equal to the serialized value of the old record and the timestamp of the new 
record is equal or newer, we drop the record because it is an idempotent update.
   Could you elaborate on why a store should get corrupted because of this? 
   
   > would result in the table containing K: X, V: B, which is wrong.
   
   As said above, this behavior should not have been changed.
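   To make the drop condition concrete, here is a simplified sketch of the 
semantics being discussed (the real helper is 
`ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp`; this 
version assumes the serialized form is an 8-byte timestamp followed by the 
value bytes, and it is not the actual implementation):
   ```java
   import java.nio.ByteBuffer;
   import java.util.Arrays;

   final class EmitOnChangeCheck {
       // Returns true (drop as idempotent update) only when the value bytes
       // are unchanged and the timestamp did not decrease.
       static boolean dropAsIdempotentUpdate(final byte[] oldSerialized, final byte[] newSerialized) {
           if (oldSerialized == null || newSerialized == null) {
               return false; // nothing to compare against; always forward
           }
           final long oldTs = ByteBuffer.wrap(oldSerialized).getLong();
           final long newTs = ByteBuffer.wrap(newSerialized).getLong();
           final boolean sameValue = Arrays.equals(
                   Arrays.copyOfRange(oldSerialized, 8, oldSerialized.length),
                   Arrays.copyOfRange(newSerialized, 8, newSerialized.length));
           return sameValue && newTs >= oldTs;
       }
   }
   ```
   Under these semantics, a record with a changed value is always written and 
forwarded, even if its timestamp is older.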









[GitHub] [kafka] cadonna commented on a change in pull request #8254: KIP-557: Add Emit On Change Support

2020-05-14 Thread GitBox


cadonna commented on a change in pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#discussion_r424971223



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
##
@@ -53,4 +56,48 @@ void initStoreSerde(final ProcessorContext context) {
 keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
 valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
 }
-}
\ No newline at end of file
+
+public RawAndDeserializedValue<V> getWithBinary(final K key) {
+try {
+return maybeMeasureLatency(() -> { 
+final byte[] serializedValue = wrapped().get(keyBytes(key));
+return new RawAndDeserializedValue<>(serializedValue, outerValue(serializedValue));
+}, time, getSensor);
+} catch (final ProcessorStateException e) {
+final String message = String.format(e.getMessage(), key);
+throw new ProcessorStateException(message, e);
+}
+}
+
+public boolean putIfDifferentValues(final K key,
+final ValueAndTimestamp<V> newValue,
+final byte[] oldSerializedValue) {
+try {
+return maybeMeasureLatency(
+() -> {
+final byte[] newSerializedValue = 
serdes.rawValue(newValue);
+if 
(ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue,
 newSerializedValue)) {
+return false;
+} else {
+wrapped().put(keyBytes(key), newSerializedValue);

Review comment:
   > Why when the timestamp of the newer value is lower, do we want to put 
the new value into the store? Surely the store should have the value with the 
newer timestamp? Otherwise we could wind up with a corrupt store.
   
   This behavior was also there before this PR. If an out-of-order record is 
encountered, a log message is written, but the record is nevertheless put 
into the state store (cf. 
https://github.com/apache/kafka/blob/7624e6247984223901aa34d7b7c2789c3e1d0c3f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L122).
 The only thing that changed is that if the serialized value of the new record 
is equal to the serialized value of the old record and the timestamp of the new 
record is equal or newer, we drop the record because it is an idempotent update.
   Could you elaborate on why a store should get corrupted because of this? 









[GitHub] [kafka] rodesai commented on a change in pull request #8254: KIP-557: Add Emit On Change Support

2020-05-14 Thread GitBox


rodesai commented on a change in pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#discussion_r424941145



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
##
@@ -53,4 +56,48 @@ void initStoreSerde(final ProcessorContext context) {
 keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
 valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
 }
-}
\ No newline at end of file
+
+public RawAndDeserializedValue<V> getWithBinary(final K key) {
+try {
+return maybeMeasureLatency(() -> { 
+final byte[] serializedValue = wrapped().get(keyBytes(key));
+return new RawAndDeserializedValue<>(serializedValue, outerValue(serializedValue));
+}, time, getSensor);
+} catch (final ProcessorStateException e) {
+final String message = String.format(e.getMessage(), key);
+throw new ProcessorStateException(message, e);
+}
+}
+
+public boolean putIfDifferentValues(final K key,
+final ValueAndTimestamp<V> newValue,
+final byte[] oldSerializedValue) {
+try {
+return maybeMeasureLatency(
+() -> {
+final byte[] newSerializedValue = 
serdes.rawValue(newValue);
+if 
(ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue,
 newSerializedValue)) {
+return false;
+} else {
+wrapped().put(keyBytes(key), newSerializedValue);

Review comment:
   @ConcurrencyPractitioner @vvcephei I'm trying to understand this to 
debug some broken tests in ksql. A couple of questions:
   
   When the timestamp of the newer value is lower (ignoring the value), why do 
we want to put the new value into the store? Surely the store should have the 
value with the newer timestamp? Otherwise we could wind up with a corrupt store.
   
   Don't we still want to put the value in the store (even if we don't forward 
it on to the next context) if the values are the same but the timestamp is 
newer? Otherwise if we get an out-of-order update with a different value, but a 
timestamp in between the rows with the same value, we'd incorrectly put that 
value into the store, e.g. the following updates:
   
   TS: 1, K: X, V: A
   TS: 3, K: X, V: A
   TS: 2, K: X, V: B
   
   would result in the table containing `K: X, V: B`, which is wrong.









[GitHub] [kafka] rodesai commented on a change in pull request #8254: KIP-557: Add Emit On Change Support

2020-05-14 Thread GitBox


rodesai commented on a change in pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#discussion_r424941145



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
##
@@ -53,4 +56,48 @@ void initStoreSerde(final ProcessorContext context) {
 keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
 valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
 }
-}
\ No newline at end of file
+
+public RawAndDeserializedValue<V> getWithBinary(final K key) {
+try {
+return maybeMeasureLatency(() -> { 
+final byte[] serializedValue = wrapped().get(keyBytes(key));
+return new RawAndDeserializedValue<>(serializedValue, outerValue(serializedValue));
+}, time, getSensor);
+} catch (final ProcessorStateException e) {
+final String message = String.format(e.getMessage(), key);
+throw new ProcessorStateException(message, e);
+}
+}
+
+public boolean putIfDifferentValues(final K key,
+final ValueAndTimestamp<V> newValue,
+final byte[] oldSerializedValue) {
+try {
+return maybeMeasureLatency(
+() -> {
+final byte[] newSerializedValue = 
serdes.rawValue(newValue);
+if 
(ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue,
 newSerializedValue)) {
+return false;
+} else {
+wrapped().put(keyBytes(key), newSerializedValue);

Review comment:
   @ConcurrencyPractitioner @vvcephei I'm trying to understand this to 
debug some broken tests in ksql. A couple of questions:
   
   Why, when the timestamp of the newer value is lower, do we want to put the 
new value into the store? Surely the store should have the value with the newer 
timestamp? Otherwise we could wind up with a corrupt store.
   
   Don't we still want to put the value in the store if the values are the same 
but the timestamp is newer? Otherwise if we get an out-of-order update with a 
different value, but a timestamp in between the rows with the same value, we'd 
incorrectly put that value into the store, e.g. the following updates:
   
   TS: 1, K: X, V: A
   TS: 3, K: X, V: A
   TS: 2, K: X, V: B
   
   would result in the table containing `K: X, V: B`, which is wrong.
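   To make the second question concrete, here is a hedged sketch of the policy this comment argues for: always accept a put whose timestamp moves forward, and use value equality only to decide whether to emit downstream. It assumes the serialized form is an 8-byte big-endian timestamp followed by the raw value bytes (the layout `ValueAndTimestampSerializer` produces); the helper itself is illustrative, not the actual implementation.
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.Arrays;
   
   // Illustrative decision helpers, not Kafka's implementation.
   final class PutDecisionSketch {
   
       // Assumed layout: [8-byte big-endian timestamp][value bytes].
       static long timestampOf(final byte[] serialized) {
           return ByteBuffer.wrap(serialized, 0, Long.BYTES).getLong();
       }
   
       static boolean sameValue(final byte[] a, final byte[] b) {
           return Arrays.equals(a, Long.BYTES, a.length,
                                b, Long.BYTES, b.length);
       }
   
       // Write whenever time moves forward, so the store never keeps a stale row.
       static boolean shouldPut(final byte[] oldSerialized, final byte[] newSerialized) {
           return oldSerialized == null
               || timestampOf(newSerialized) >= timestampOf(oldSerialized);
       }
   
       // Emit-on-change: forward downstream only when the value actually changed.
       static boolean shouldForward(final byte[] oldSerialized, final byte[] newSerialized) {
           return oldSerialized == null || !sameValue(oldSerialized, newSerialized);
       }
   }
   ```
   
   Under this policy the TS 1 / TS 3 / TS 2 sequence above leaves the table at `TS: 3, V: A`: the second update is stored but not forwarded, and the out-of-order third update is rejected.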





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on pull request #8558: KAFKA-8611 / KIP-221 documentation

2020-05-14 Thread GitBox


lkokhreidze commented on pull request #8558:
URL: https://github.com/apache/kafka/pull/8558#issuecomment-628419029


   Hi @mjsax,
   
   I've rebased the branch.
   Do you mean the comment by Guozhang in the voting thread? If not, I missed it and can't find anything new in the DISCUSS thread. Can you point me to where it was asked?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9676) Add test coverage for new ActiveTaskCreator and StandbyTaskCreator

2020-05-14 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-9676.

Resolution: Fixed

The current unit test coverage is pretty good now, closing the ticket.

> Add test coverage for new ActiveTaskCreator and StandbyTaskCreator
> --
>
> Key: KAFKA-9676
> URL: https://issues.apache.org/jira/browse/KAFKA-9676
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Boyang Chen
>Priority: Major
>  Labels: help-wanted, newbie
>
> The newly separated ActiveTaskCreator and StandbyTaskCreator have no unit 
> test coverage. We should add corresponding tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-14 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17106950#comment-17106950
 ] 

Boyang Chen commented on KAFKA-9989:


I didn't find that in the log [~ableegoldman]

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> which, if we take a closer look, shows that the rebalance happens but results
> in no task assignment. We should fix this problem by making the rebalance
> result part of the check, and skipping the record processing validation when
> the assignment is empty.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda commented on a change in pull request #8395: Added doc for KIP-535 and updated it for KIP-562

2020-05-14 Thread GitBox


abbccdda commented on a change in pull request #8395:
URL: https://github.com/apache/kafka/pull/8395#discussion_r404455662



##
File path: docs/upgrade.html
##
@@ -39,7 +39,8 @@ Notable changes in 2
 https://github.com/apache/kafka/tree/2.5/examples (examples folder). Check out
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics (KIP-447) for the full details.
-Deprecated KafkaStreams.store(String, QueryableStoreType) and replaced it with KafkaStreams.store(StoreQueryParameters).
+Provided support to query stale stores(for high availability) and the stores belonging to a specific partition by deprecating KafkaStreams.store(String, QueryableStoreType) and replacing it with KafkaStreams.store(StoreQueryParameters).

Review comment:
   
![image](https://user-images.githubusercontent.com/5845561/78615480-809bb580-7826-11ea-9f59-2e7c3cf1a901.png)
   Let's add a space before the parenthesis, i.e. change `stores(` to `stores (`, and add a period after `KafkaStreams.allLocalStorePartitionLags()`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



