[jira] [Resolved] (KAFKA-9080) System Test Failure: MessageFormatChangeTest.testCompatibilty
[ https://issues.apache.org/jira/browse/KAFKA-9080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar resolved KAFKA-9080. -- Resolution: Fixed > System Test Failure: MessageFormatChangeTest.testCompatibilty > - > > Key: KAFKA-9080 > URL: https://issues.apache.org/jira/browse/KAFKA-9080 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0 >Reporter: Manikumar >Assignee: Tu Tran >Priority: Blocker > Fix For: 2.4.0 > > > MessageFormatChangeTest tests are failing on 2.4 and trunk for 0.9.0.1 > version. > http://confluent-kafka-2-4-system-test-results.s3-us-west-2.amazonaws.com/2019-10-21--001.1571716233--confluentinc--2.4--cb4944f/report.html > {code} > Module: kafkatest.tests.client.message_format_change_test > Class: MessageFormatChangeTest > Method: test_compatibility > Arguments: > { > "consumer_version": "0.9.0.1", > "producer_version": "0.9.0.1" > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9080) System Test Failure: MessageFormatChangeTest.testCompatibilty
[ https://issues.apache.org/jira/browse/KAFKA-9080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16965169#comment-16965169 ] ASF GitHub Bot commented on KAFKA-9080: --- omkreddy commented on pull request #7628: KAFKA-9080: Addresses MessageFormatChangeTest.testCompatibilty with version 0.9.0.1 URL: https://github.com/apache/kafka/pull/7628 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-8972) KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback state
[ https://issues.apache.org/jira/browse/KAFKA-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-8972. -- Resolution: Fixed
> KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback state
>
> Key: KAFKA-8972
> URL: https://issues.apache.org/jira/browse/KAFKA-8972
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.4.0
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Blocker
> Fix For: 2.4.0
>
> The current ordering of calls in {{KafkaConsumer.unsubscribe}} is the following:
> {code}
> this.subscriptions.unsubscribe();
> this.coordinator.onLeavePrepare();
> this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
> {code}
> Inside {{onLeavePrepare}} we look at the assignment, try to revoke the assigned partitions and notify users via {{RebalanceListener#onPartitionsRevoked}}, and then clear the assignment.
> However, the subscription's assignment has already been cleared by {{this.subscriptions.unsubscribe();}}, which means the user's rebalance listener is never triggered. In other words, from the consumer client's point of view nothing is owned after unsubscribe, but from the caller's point of view the partitions have not been revoked yet. For callers like Kafka Streams, which rely on the rebalance listener to maintain their internal state, this leads to inconsistent state management and failure cases.
> Before KIP-429 this issue was hidden, since every time the consumer re-joined the group later it would revoke everything anyway, regardless of the parameters passed to the rebalance listener; with KIP-429 it is easier to reproduce.
> I think we can summarize our fix as:
> • Inside `unsubscribe`, first do `onLeavePrepare / maybeLeaveGroup` and then `subscription.unsubscribe`. This way we are guaranteed that the streams' tasks are all closed as revoked by then.
> • [Optimization] If the generation is reset due to a fatal error from a join / heartbeat response etc., then we know that all partitions are lost, and we should not trigger `onPartitionsRevoked`, but instead just `onPartitionsLost` inside `onLeavePrepare`.
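The ordering problem described in KAFKA-8972 can be illustrated with a small toy model. This is only a sketch: `Subscriptions`, `RecordingListener` and the static `onLeavePrepare` helper are hypothetical stand-ins written for this example, not the real consumer internals.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model: every class here is a hypothetical stand-in, not Kafka's real code.
class UnsubscribeOrderSketch {

    /** Minimal stand-in for the consumer's subscription state. */
    static final class Subscriptions {
        private final Set<String> assigned = new LinkedHashSet<>();
        void assign(String partition) { assigned.add(partition); }
        Set<String> assignedPartitions() { return new LinkedHashSet<>(assigned); }
        void unsubscribe() { assigned.clear(); }
    }

    /** Records which partitions the user's listener was told were revoked. */
    static final class RecordingListener {
        final List<String> revoked = new ArrayList<>();
        void onPartitionsRevoked(Set<String> partitions) { revoked.addAll(partitions); }
    }

    // Notifies the listener about whatever is still assigned at this point.
    static void onLeavePrepare(Subscriptions subs, RecordingListener listener) {
        Set<String> owned = subs.assignedPartitions();
        if (!owned.isEmpty()) {
            listener.onPartitionsRevoked(owned);
        }
    }

    // Ordering from the report: the assignment is cleared first, so nothing is left to revoke.
    static List<String> unsubscribeClearFirst() {
        Subscriptions subs = new Subscriptions();
        subs.assign("topic-0");
        RecordingListener listener = new RecordingListener();
        subs.unsubscribe();
        onLeavePrepare(subs, listener);
        return listener.revoked;
    }

    // Proposed ordering: notify the listener first, then clear the assignment.
    static List<String> unsubscribeNotifyFirst() {
        Subscriptions subs = new Subscriptions();
        subs.assign("topic-0");
        RecordingListener listener = new RecordingListener();
        onLeavePrepare(subs, listener);
        subs.unsubscribe();
        return listener.revoked;
    }

    public static void main(String[] args) {
        System.out.println("clear first:  " + unsubscribeClearFirst());
        System.out.println("notify first: " + unsubscribeNotifyFirst());
    }
}
```

In this model the first ordering leaves the listener's `revoked` list empty, while the second reports `topic-0`, which mirrors the inconsistency the ticket describes.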
[jira] [Commented] (KAFKA-8972) KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback state
[ https://issues.apache.org/jira/browse/KAFKA-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16965144#comment-16965144 ] ASF GitHub Bot commented on KAFKA-8972: --- guozhangwang commented on pull request #7620: KAFKA-8972 (2.4 blocker): TaskManager state should always be updated after rebalance URL: https://github.com/apache/kafka/pull/7620
[jira] [Created] (KAFKA-9132) Refactor StreamThread to take advantage of new ConsumerRebalanceListener exception handling
Sophie Blee-Goldman created KAFKA-9132: -- Summary: Refactor StreamThread to take advantage of new ConsumerRebalanceListener exception handling Key: KAFKA-9132 URL: https://issues.apache.org/jira/browse/KAFKA-9132 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 2.4.0 Reporter: Sophie Blee-Goldman As part of KIP-429 we solved the long-standing issue where exceptions thrown during the ConsumerRebalanceListener's callbacks were swallowed, and changed the behavior so that these exceptions are now bubbled all the way up to the Consumer#poll call. Because of the original behavior, any exceptions thrown during task creation, suspension, closure, etc. had to be caught by the rebalance listener and passed on to the calling StreamThread by setting a "rebalanceException" field. This field then has to be checked after every polling loop. We should refactor this in light of the new, fixed behavior, so that we can simply catch rebalance exceptions thrown from poll rather than check for them explicitly after every call. This has the additional benefit of letting us react to the exception immediately (whereas currently we have to go through the remainder of the entire `StreamThread#runOnce` loop before we notice it).
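The difference between the two patterns in KAFKA-9132 can be sketched as follows. This is a simplified illustration under stated assumptions: `FakeConsumer`, `Listener` and the two `runOnce...` methods are hypothetical names for this example and do not reproduce the real `StreamThread` code.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: FakeConsumer and Listener are hypothetical stand-ins, not Kafka classes.
class RebalanceExceptionSketch {

    interface Listener { void onPartitionsAssigned(); }

    /** Stand-in consumer whose poll() runs the listener and lets its exceptions propagate. */
    static final class FakeConsumer {
        private final Listener listener;
        FakeConsumer(Listener listener) { this.listener = listener; }
        void poll() { listener.onPartitionsAssigned(); }
    }

    // Old pattern: the listener stashes the exception in a field-like holder,
    // and the loop only looks at it after the rest of the iteration has run.
    static List<String> runOnceWithField() {
        List<String> trace = new ArrayList<>();
        RuntimeException[] rebalanceException = new RuntimeException[1];
        FakeConsumer consumer = new FakeConsumer(() -> {
            try {
                throw new IllegalStateException("task creation failed");
            } catch (RuntimeException e) {
                rebalanceException[0] = e;
            }
        });
        consumer.poll();
        trace.add("process records");
        trace.add("commit");
        if (rebalanceException[0] != null) {
            trace.add("noticed: " + rebalanceException[0].getMessage());
        }
        return trace;
    }

    // New pattern: the exception surfaces from poll(), so the loop reacts immediately.
    static List<String> runOnceWithCatch() {
        List<String> trace = new ArrayList<>();
        FakeConsumer consumer = new FakeConsumer(() -> {
            throw new IllegalStateException("task creation failed");
        });
        try {
            consumer.poll();
            trace.add("process records");
            trace.add("commit");
        } catch (IllegalStateException e) {
            trace.add("noticed: " + e.getMessage());
        }
        return trace;
    }
}
```

In the first variant the trace contains the processing and commit steps before the failure is noticed; in the second the failure is the only entry.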
[jira] [Assigned] (KAFKA-9099) Reassignments should be retried after unexpected errors
[ https://issues.apache.org/jira/browse/KAFKA-9099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski reassigned KAFKA-9099: -- Assignee: (was: Stanislav Kozlovski) > Reassignments should be retried after unexpected errors > --- > > Key: KAFKA-9099 > URL: https://issues.apache.org/jira/browse/KAFKA-9099 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > Currently we do not catch unexpected exceptions in `onPartitionReassignment` > in the controller. Generally this can lead to state inconsistencies and a > failure to complete a reassignment. We should add exception handling and come > up with a retry approach. For example, we could track failed reassignments in > the controller context and retry them periodically. -- This message was sent by Atlassian Jira (v8.3.4#803005)
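One possible shape for the retry approach suggested in KAFKA-9099 is sketched below. It is an assumption-laden illustration, not the controller's design: `ReassignmentRetrySketch`, `ReassignmentAction` and `retryFailed` are hypothetical names, and a real controller would call the retry from a periodic task.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;

// Toy model: all names here are hypothetical, not the Kafka controller's API.
class ReassignmentRetrySketch {

    /** The work performed for one partition; may fail unexpectedly. */
    interface ReassignmentAction { void apply(String partition) throws Exception; }

    private final ReassignmentAction action;
    // Failed reassignments remembered for a later retry.
    private final Set<String> failed = new LinkedHashSet<>();

    ReassignmentRetrySketch(ReassignmentAction action) { this.action = action; }

    // Catch unexpected errors instead of letting them escape, and remember the partition.
    void onPartitionReassignment(String partition) {
        try {
            action.apply(partition);
            failed.remove(partition);
        } catch (Exception e) {
            failed.add(partition);
        }
    }

    // Intended to be called periodically; retries everything that failed earlier.
    void retryFailed() {
        for (String partition : new ArrayList<>(failed)) {
            onPartitionReassignment(partition);
        }
    }

    Set<String> failedPartitions() { return new LinkedHashSet<>(failed); }
}
```

Copying the failed set before iterating avoids modifying it while it is being traversed.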
[jira] [Created] (KAFKA-9131) failed producer metadata updates result in the unrelated error message
Ask Zuckerberg created KAFKA-9131: - Summary: failed producer metadata updates result in the unrelated error message Key: KAFKA-9131 URL: https://issues.apache.org/jira/browse/KAFKA-9131 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.3.0 Reporter: Ask Zuckerberg
{{Producer Metadata TimeoutException}} is processed as a generic RetriableException in RecordCollectorImpl.sendError. This results in an irrelevant error message.
We were supposed to see this:
"Timeout exception caught when sending record to topic %s. " +
"This might happen if the producer cannot send data to the Kafka cluster and thus, " +
"its internal buffer fills up. " +
"This can also happen if the broker is slow to respond, if the network connection to " +
"the broker was interrupted, or if similar circumstances arise. " +
"You can increase producer parameter `max.block.ms` to increase this timeout."
but got this:
"You can increase the producer configs `delivery.timeout.ms` and/or " +
"`retries` to avoid this error. Note that `retries` is set to infinite by default."
These params are not applicable to metadata updates.
Technical details:
(1) Lines 221 - 236 in kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java are dead code. They are never executed because {{producer.send}} never throws TimeoutException, but returns a failed future. You can see it in lines 948-955 in kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
(2) The exception is then processed in a callback function in the method {{recordSendError}} on line 202. The DefaultProductionExceptionHandler is used.
(3) In {{recordSendError}} in the same class the timeout exception is processed as a RetriableException at lines 133-136. The error message is simply wrong because tweaking {{delivery.timeout.ms}} and {{retries}} has nothing to do with the issue in this case.
Proposed solution:
(1) Remove the unreachable catch (final TimeoutException e) in RecordCollectorImpl.java, as Producer does not throw ApiExceptions.
(2) Move the aforementioned catch clause to the recordSendError method.
(3) Process TimeoutException separately from RetriableException.
(4) Implement a unit test to cover this corner case.
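Step (3) of the proposal can be sketched as a type check that tests the more specific exception first. The nested exception classes below are local stand-ins that only mimic the subclass relationship (a timeout being a kind of retriable error), and `hintFor` is a hypothetical helper, not the actual `recordSendError` implementation.

```java
// Toy model: the exception classes are local stand-ins, and hintFor is hypothetical.
class SendErrorMessageSketch {

    static class RetriableException extends RuntimeException {
        RetriableException(String message) { super(message); }
    }

    /** A timeout is modelled as a special case of a retriable error. */
    static class TimeoutException extends RetriableException {
        TimeoutException(String message) { super(message); }
    }

    // The subclass must be checked before its parent, otherwise the generic
    // retriable branch would also swallow timeouts and print the wrong hint.
    static String hintFor(Exception e) {
        if (e instanceof TimeoutException) {
            return "You can increase producer parameter `max.block.ms` to increase this timeout.";
        }
        if (e instanceof RetriableException) {
            return "You can increase the producer configs `delivery.timeout.ms` and/or `retries`.";
        }
        return "No configuration hint available.";
    }

    public static void main(String[] args) {
        System.out.println(hintFor(new TimeoutException("metadata update timed out")));
        System.out.println(hintFor(new RetriableException("transient broker error")));
    }
}
```

Swapping the two `instanceof` checks would reproduce the misleading message from the report, since every timeout in this model is also a retriable error.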
[jira] [Commented] (KAFKA-8868) Consider auto-generating Streams binary protocol messages
[ https://issues.apache.org/jira/browse/KAFKA-8868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964986#comment-16964986 ] ASF GitHub Bot commented on KAFKA-8868: --- guozhangwang commented on pull request #7248: KAFKA-8868: Generate SubscriptionInfo protocol message URL: https://github.com/apache/kafka/pull/7248 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Consider auto-generating Streams binary protocol messages > - > > Key: KAFKA-8868 > URL: https://issues.apache.org/jira/browse/KAFKA-8868 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Minor > > Rather than maintain hand coded protocol serialization code, Streams could > use the same code-generation framework as Clients/Core. > There isn't a perfect match, since the code generation framework includes an > assumption that you're generating "protocol messages", rather than just > arbitrary blobs, but I think it's close enough to justify using it, and > improving it over time. > Using the code generation allows us to drop a lot of detail-oriented, > brittle, and hard-to-maintain serialization logic in favor of a schema spec. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8250) Flaky Test DelegationTokenEndToEndAuthorizationTest#testProduceConsumeViaAssign
[ https://issues.apache.org/jira/browse/KAFKA-8250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964984#comment-16964984 ] Matthias J. Sax commented on KAFKA-8250: https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/9099/testReport/junit/kafka.api/DelegationTokenEndToEndAuthorizationTest/testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl/
{quote}org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout instead of the expected 1 records
at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
at org.scalatest.Assertions.fail(Assertions.scala:1091)
at org.scalatest.Assertions.fail$(Assertions.scala:1087)
at org.scalatest.Assertions$.fail(Assertions.scala:1389)
at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:842)
at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1343)
at kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:529)
at kafka.api.EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(EndToEndAuthorizationTest.scala:368){quote}
> Flaky Test DelegationTokenEndToEndAuthorizationTest#testProduceConsumeViaAssign
>
> Key: KAFKA-8250
> URL: https://issues.apache.org/jira/browse/KAFKA-8250
> Project: Kafka
> Issue Type: Bug
> Components: core, unit tests
> Affects Versions: 2.3.0
> Reporter: Matthias J. Sax
> Priority: Critical
> Labels: flaky-test
> Fix For: 2.5.0
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk11/detail/kafka-trunk-jdk11/442/tests]
> {quote}java.lang.AssertionError: Consumed more records than expected expected:<1> but was:<2>
> at org.junit.Assert.fail(Assert.java:89)
> at org.junit.Assert.failNotEquals(Assert.java:835)
> at org.junit.Assert.assertEquals(Assert.java:647)
> at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1288)
> at kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:460)
> at kafka.api.EndToEndAuthorizationTest.testProduceConsumeViaAssign(EndToEndAuthorizationTest.scala:209){quote}
[jira] [Commented] (KAFKA-8677) Flakey test GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
[ https://issues.apache.org/jira/browse/KAFKA-8677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964982#comment-16964982 ] Matthias J. Sax commented on KAFKA-8677: https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/9099/testReport/junit/kafka.api/DelegationTokenEndToEndAuthorizationTest/testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl/
> Flakey test GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
>
> Key: KAFKA-8677
> URL: https://issues.apache.org/jira/browse/KAFKA-8677
> Project: Kafka
> Issue Type: Bug
> Components: core, security, unit tests
> Affects Versions: 2.4.0
> Reporter: Boyang Chen
> Assignee: Anastasia Vela
> Priority: Blocker
> Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6325/console]
>
> *18:43:39* kafka.api.GroupEndToEndAuthorizationTest > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED
> *18:44:00* kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl.test.stdout
> *18:44:00*
> *18:44:00* kafka.api.GroupEndToEndAuthorizationTest > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl FAILED
> *18:44:00* org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout instead of the expected 1 records
> ---
> I found that this flaky test is actually exposing a real bug in the consumer: within {{KafkaConsumer.poll}}, we have an optimization that tries to send the next fetch request before returning the data, in order to pipeline the fetch requests:
> {code}
> if (!records.isEmpty()) {
>     // before returning the fetched records, we can send off the next round of fetches
>     // and avoid block waiting for their responses to enable pipelining while the user
>     // is handling the fetched records.
>     //
>     // NOTE: since the consumed position has already been updated, we must not allow
>     // wakeups or any other errors to be triggered prior to returning the fetched records.
>     if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
>         client.pollNoWakeup();
>     }
>     return this.interceptors.onConsume(new ConsumerRecords<>(records));
> }
> {code}
> As the NOTE mentions, this pollNoWakeup should NOT throw any exceptions, since at this point the fetch position has already been updated. If an exception is thrown here and the caller decides to catch it and continue, those records would never be returned again, causing data loss.
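The data-loss scenario from KAFKA-8677 can be reproduced in a toy model. This sketch assumes a drastically simplified consumer: `PollPositionSketch` and its fields are hypothetical, one record is returned per poll, and the "prefetch" failure is injected by a flag rather than by a real network error.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model: a hypothetical single-partition consumer, not KafkaConsumer.
class PollPositionSketch {
    private final List<String> log;
    private int position = 0;
    private boolean failNextPrefetch;

    PollPositionSketch(List<String> log, boolean failNextPrefetch) {
        this.log = log;
        this.failNextPrefetch = failNextPrefetch;
    }

    List<String> poll() {
        if (position >= log.size()) {
            return new ArrayList<>();
        }
        List<String> records = new ArrayList<>(log.subList(position, position + 1));
        // The consumed position moves forward before the records reach the caller.
        position++;
        // Simulated failure while sending the next (pipelined) fetch.
        if (failNextPrefetch) {
            failNextPrefetch = false;
            throw new IllegalStateException("error while sending the next fetch");
        }
        return records;
    }

    // A caller that catches the error and keeps polling, as described in the ticket.
    static List<String> consumeCatchingErrors(boolean failFirstPrefetch) {
        PollPositionSketch consumer =
            new PollPositionSketch(Arrays.asList("a", "b", "c"), failFirstPrefetch);
        List<String> seen = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            try {
                seen.addAll(consumer.poll());
            } catch (IllegalStateException e) {
                // The caller logs and continues; the fetched record is gone.
            }
        }
        return seen;
    }
}
```

With the injected failure the caller sees only `b` and `c`: record `a` was fetched and its position advanced, but it was never handed over.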
[jira] [Commented] (KAFKA-9128) ConsumerCoordinator ignores exceptions from ConsumerRebalanceListener
[ https://issues.apache.org/jira/browse/KAFKA-9128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964975#comment-16964975 ] Guozhang Wang commented on KAFKA-9128: -- This seems a duplicate of https://issues.apache.org/jira/browse/KAFKA-4600, which is already resolved in 2.4.0
> ConsumerCoordinator ignores exceptions from ConsumerRebalanceListener
>
> Key: KAFKA-9128
> URL: https://issues.apache.org/jira/browse/KAFKA-9128
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 2.3.0
> Reporter: Oleg Muravskiy
> Priority: Critical
>
> I'm using a custom ConsumerRebalanceListener with a plain old Kafka consumer to manage offsets in an external storage as described in ConsumerRebalanceListener's javadoc.
> When that storage is not available, I'm throwing an exception from my listener. However, the exception is simply logged and ignored by the ConsumerCoordinator, as could be seen in these two code snippets from it:
> {code:java}
> try {
>     listener.onPartitionsRevoked(revoked);
> } catch (WakeupException | InterruptException e) {
>     throw e;
> } catch (Exception e) {
>     log.error("User provided listener {} failed on partition revocation", listener.getClass().getName(), e);
> }{code}
> and
> {code:java}
> try {
>     listener.onPartitionsAssigned(assignedPartitions);
> } catch (WakeupException | InterruptException e) {
>     throw e;
> } catch (Exception e) {
>     log.error("User provided listener {} failed on partition assignment", listener.getClass().getName(), e);
> }{code}
> The consumption continues as if nothing has happened.
[jira] [Created] (KAFKA-9130) Allow listing consumer groups per state
Mickael Maison created KAFKA-9130: - Summary: Allow listing consumer groups per state Key: KAFKA-9130 URL: https://issues.apache.org/jira/browse/KAFKA-9130 Project: Kafka Issue Type: Improvement Reporter: Mickael Maison Assignee: Mickael Maison Ticket for KIP-518: https://cwiki.apache.org/confluence/display/KAFKA/KIP-518%3A+Allow+listing+consumer+groups+per+state -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9129) Add Thread ID to the InternalProcessorContext
Bruno Cadonna created KAFKA-9129: Summary: Add Thread ID to the InternalProcessorContext Key: KAFKA-9129 URL: https://issues.apache.org/jira/browse/KAFKA-9129 Project: Kafka Issue Type: Improvement Components: streams Reporter: Bruno Cadonna When we added client metrics we had to move the {{StreamsMetricsImpl}} object to the client level. That means that now instead of having one {{StreamsMetricsImpl}} object per thread, we have now one per client. That also means that we cannot store the thread ID in the {{StreamsMetricsImpl}} anymore. Currently, we get the thread ID from {{Thread.currentThread().getName()}} when we need to create a sensor. However, that is not robust against code refactoring because we need to ensure that the thread that creates the sensor is also the one that records the metrics. To be more flexible, we should expose the ID of the thread that executes a processor in the {{InternalProcessorContext}} like it already exposes the task ID. -- This message was sent by Atlassian Jira (v8.3.4#803005)
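The fragility described in KAFKA-9129 can be shown with a small sketch. `ProcessorContextStub`, its `threadId()` accessor and the sensor-name helpers are hypothetical names for this example; the ticket only proposes exposing the thread ID on `InternalProcessorContext` and does not specify this shape.

```java
// Toy model: ProcessorContextStub and the sensor-name helpers are hypothetical.
class ThreadIdSketch {

    /** Stand-in for a context that exposes the ID of the thread executing the processor. */
    static final class ProcessorContextStub {
        private final String threadId;
        ProcessorContextStub(String threadId) { this.threadId = threadId; }
        String threadId() { return threadId; }
    }

    // Current approach: depends on which thread happens to create the sensor.
    static String sensorNameFromCurrentThread() {
        return Thread.currentThread().getName() + ".process-latency";
    }

    // Proposed approach: take the thread ID from the context instead.
    static String sensorNameFromContext(ProcessorContextStub context) {
        return context.threadId() + ".process-latency";
    }

    // Creates both sensor names on a different thread than the one named in the context.
    static String[] createOnHelperThread(ProcessorContextStub context) {
        String[] names = new String[2];
        Thread helper = new Thread(() -> {
            names[0] = sensorNameFromCurrentThread();
            names[1] = sensorNameFromContext(context);
        }, "helper-thread");
        helper.start();
        try {
            helper.join(); // join() makes the helper's writes visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }
}
```

When the sensor is created on `helper-thread`, the name derived from the current thread no longer matches the stream thread recorded in the context, while the context-based name stays stable.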
[jira] [Commented] (KAFKA-8953) Consider renaming `UsePreviousTimeOnInvalidTimestamp` timestamp extractor
[ https://issues.apache.org/jira/browse/KAFKA-8953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964812#comment-16964812 ] ASF GitHub Bot commented on KAFKA-8953: --- ravowlga123 commented on pull request #7633: KAFKA-8953 Consider renaming UsePreviousTimeOnInvalidTimestamp timestamp extractor URL: https://github.com/apache/kafka/pull/7633
*Added a deprecation annotation to UsePreviousTimeOnInvalidTimestamp and created a new class UsePartitionTimeOnInvalidTimestamp. Also replaced usages of UsePreviousTimeOnInvalidTimestamp with the new class name; I don't know whether this was necessary, but it can be reverted if needed.*
*Also renamed the test class of UsePreviousTimeOnInvalidTimestamp to the new class name.*
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
> Consider renaming `UsePreviousTimeOnInvalidTimestamp` timestamp extractor
>
> Key: KAFKA-8953
> URL: https://issues.apache.org/jira/browse/KAFKA-8953
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Rabi Kumar K C
> Priority: Trivial
> Labels: beginner, kip, newbie
>
> Kafka Streams ships a couple of different timestamp extractors, one named `UsePreviousTimeOnInvalidTimestamp`.
> Given the latest improvements with regard to time tracking, it seems appropriate to rename this class to `UsePartitionTimeOnInvalidTimestamp`, as we now have a fixed definition of partition time, and also pass partition time into the `#extract(...)` method, instead of some non-well-defined "previous timestamp".
> KIP-530: [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=130028807]
[jira] [Comment Edited] (KAFKA-6542) Tables should trigger joins too, not just streams
[ https://issues.apache.org/jira/browse/KAFKA-6542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964779#comment-16964779 ] Chad Preisler edited comment on KAFKA-6542 at 11/1/19 11:50 AM:
>KTables are not really bootstrapped – compare KAFKA-4113 for a detailed discussion.
I've read KAFKA-4113, but we've had missed joins on startup because the kTable was still building. That issue referenced this one as addressing the "gaps".
> Correct, but this describes only the failure case scenario.
> a new application will have an empty KTable state when it's started.
It seems reasonable to expect that if a record exists on the right side of the join condition at application startup, the join should succeed regardless of the state of the "current" kTable. If we can't rely on that assumption, then we can't rely on kStream to kTable joins in general. When running applications in a containerized environment (k8s), I would say it is more normal to have to rebuild a kTable from scratch than it is to have an existing kTable on startup. We've had joins fail on startup with the following scenario:
- Exactly-once semantics turned on.
- Pod crashes.
- Existing kTable state is gone.
- Application restarts.
- Last failed record is sent.
- Join fails because the kTable is still building and the right side of the join does not exist yet.
In this situation the right side record existed on a topic before the left side record was created on the topic. I'll assume that there is a scenario where having this situation is okay, but there should at least be an option to "bootstrap" the kTable on startup. I know global kTable does bootstrap, but kTable is more appealing for large topics because we can run more instances and reduce startup time.
> Tables should trigger joins too, not just streams
>
> Key: KAFKA-6542
> URL: https://issues.apache.org/jira/browse/KAFKA-6542
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 1.1.0
> Reporter: Antony Stubbs
> Priority: Major
>
> At the moment it's quite possible to have a race condition when joining a stream with a table, if the stream event arrives first, before the table event, in which case the join will fail.
> This is also related to bootstrapping KTables (which is what a GKTable does).
> Related to: KAFKA-4113 Allow KTable bootstrap
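The race described in KAFKA-6542 can be modelled in a few lines. This is a sketch with assumed semantics: `StreamTableJoinSketch` is a hypothetical in-memory inner join in which only stream records trigger output, and it does not use the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: a hypothetical in-memory stream-table join, not the Kafka Streams API.
class StreamTableJoinSketch {
    private final Map<String, String> table = new HashMap<>();
    private final List<String> joined = new ArrayList<>();

    // Table records only update state; they never trigger a join result.
    void onTableRecord(String key, String value) {
        table.put(key, value);
    }

    // Stream records look up the table as it is right now.
    void onStreamRecord(String key, String value) {
        String tableValue = table.get(key);
        if (tableValue != null) {
            joined.add(key + ":" + value + "+" + tableValue);
        }
    }

    List<String> joined() { return joined; }

    // The stream event is processed before the table has caught up: no output.
    static List<String> streamArrivesFirst() {
        StreamTableJoinSketch join = new StreamTableJoinSketch();
        join.onStreamRecord("user-1", "click");
        join.onTableRecord("user-1", "alice");
        return join.joined();
    }

    // The table is populated first: the join succeeds.
    static List<String> tableArrivesFirst() {
        StreamTableJoinSketch join = new StreamTableJoinSketch();
        join.onTableRecord("user-1", "alice");
        join.onStreamRecord("user-1", "click");
        return join.joined();
    }
}
```

In this model the processing order alone decides whether the join produces a result, which is the situation the restart scenario in the comment runs into while the table is still being rebuilt.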
[jira] [Commented] (KAFKA-8723) flaky test LeaderElectionCommandTest#testAllTopicPartition
[ https://issues.apache.org/jira/browse/KAFKA-8723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964782#comment-16964782 ] Stanislav Kozlovski commented on KAFKA-8723: It is likely that https://issues.apache.org/jira/browse/KAFKA-8962 fixed this issue. I will punt on my PR for now
> flaky test LeaderElectionCommandTest#testAllTopicPartition
>
> Key: KAFKA-8723
> URL: https://issues.apache.org/jira/browse/KAFKA-8723
> Project: Kafka
> Issue Type: Bug
> Reporter: Boyang Chen
> Assignee: Stanislav Kozlovski
> Priority: Major
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/23737/console]
>
> *15:52:26* kafka.admin.LeaderElectionCommandTest > testAllTopicPartition STARTED
> *15:53:08* kafka.admin.LeaderElectionCommandTest.testAllTopicPartition failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11@2/core/build/reports/testOutput/kafka.admin.LeaderElectionCommandTest.testAllTopicPartition.test.stdout
> *15:53:08*
> *15:53:08* kafka.admin.LeaderElectionCommandTest > testAllTopicPartition FAILED
> *15:53:08* kafka.common.AdminCommandFailedException: Timeout waiting for election results
> *15:53:08* at kafka.admin.LeaderElectionCommand$.electLeaders(LeaderElectionCommand.scala:133)
> *15:53:08* at kafka.admin.LeaderElectionCommand$.run(LeaderElectionCommand.scala:88)
> *15:53:08* at kafka.admin.LeaderElectionCommand$.main(LeaderElectionCommand.scala:41)
> *15:53:08* at kafka.admin.LeaderElectionCommandTest$$anonfun$testAllTopicPartition$1.apply(LeaderElectionCommandTest.scala:91)
> *15:53:08* at kafka.admin.LeaderElectionCommandTest$$anonfun$testAllTopicPartition$1.apply(LeaderElectionCommandTest.scala:74)
> *15:53:08* at kafka.utils.TestUtils$.resource(TestUtils.scala:1588)
> *15:53:08* at kafka.admin.LeaderElectionCommandTest.testAllTopicPartition(LeaderElectionCommandTest.scala:74)
> *15:53:08*
> *15:53:08* Caused by:
> *15:53:08* org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
> *15:53:08*
[jira] [Commented] (KAFKA-8723) flaky test LeaderElectionCommandTest#testAllTopicPartition
[ https://issues.apache.org/jira/browse/KAFKA-8723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964781#comment-16964781 ] ASF GitHub Bot commented on KAFKA-8723: --- stanislavkozlovski commented on pull request #7314: KAFKA-8723: Increase timeout on LeaderElectionCommandTest URL: https://github.com/apache/kafka/pull/7314
[jira] [Commented] (KAFKA-6542) Tables should trigger joins too, not just streams
[ https://issues.apache.org/jira/browse/KAFKA-6542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964779#comment-16964779 ]

Chad Preisler commented on KAFKA-6542:
--------------------------------------

> KTables are not really bootstrapped – compare KAFKA-4113 for a detailed discussion.

I've read KAFKA-4113, but we've had missed joins on startup because the KTable was still building. That issue referenced this one as addressing the "gaps".

> Correct, but this describes only the failure case scenario.
> a new application will have an empty KTable state when it's started.

It seems reasonable to expect that if a record exists on the right side of the join condition at application startup, the join should succeed regardless of the state of the "current" KTable. If we can't rely on that assumption, then we can't rely on KStream-to-KTable joins in general. When running applications in a containerized environment (k8s), I would say it is more normal to have to rebuild a KTable from scratch than to have an existing KTable on startup.

We've had joins fail on startup in the following scenario:
- Exactly-once semantics turned on.
- Pod crashes.
- Existing KTable state is gone.
- Application restarts.
- Last failed record is sent.
- Join fails because the KTable is still building and the right side of the join does not exist yet.

In this situation the right-side record existed before the left-side record was created. I'll assume that there is a scenario where this is okay, but there should at least be an option to "bootstrap" the KTable on startup. I know GlobalKTable does bootstrap, but KTable is more appealing for large topics because we can run more instances and reduce startup time.

> Tables should trigger joins too, not just streams
> -------------------------------------------------
>
> Key: KAFKA-6542
> URL: https://issues.apache.org/jira/browse/KAFKA-6542
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 1.1.0
> Reporter: Antony Stubbs
> Priority: Major
>
> At the moment it's quite possible to have a race condition when joining a stream with a table, if the stream event arrives first, before the table event, in which case the join will fail.
> This is also related to bootstrapping KTables (which is what a GKTable does).
> Related to: KAFKA-4113 Allow KTable bootstrap
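
The race described in this issue can be illustrated without Kafka at all. What follows is a minimal sketch in plain Java, not Kafka Streams code: `JoinRaceDemo`, `onTableRecord`, and `onStreamRecord` are hypothetical names, and a `HashMap` stands in for the KTable state. It assumes the behaviour the issue describes, where only a stream record triggers the join and a table record merely updates state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a stream-table inner join; not Kafka Streams code.
class JoinRaceDemo {
    private final Map<String, String> table = new HashMap<>(); // stands in for KTable state
    private final List<String> output = new ArrayList<>();

    // A table record only updates state; it does not trigger a join.
    void onTableRecord(String key, String value) {
        table.put(key, value);
    }

    // A stream record looks up the current table state; no match means no output.
    void onStreamRecord(String key, String value) {
        String tableValue = table.get(key);
        if (tableValue != null) {
            output.add(key + ":" + value + "+" + tableValue);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        JoinRaceDemo tableFirst = new JoinRaceDemo();
        tableFirst.onTableRecord("k1", "profile");
        tableFirst.onStreamRecord("k1", "click");
        System.out.println("table first:  " + tableFirst.output());

        JoinRaceDemo streamFirst = new JoinRaceDemo();
        streamFirst.onStreamRecord("k1", "click");  // table still empty, so the join is missed
        streamFirst.onTableRecord("k1", "profile"); // arrives too late to produce a result
        System.out.println("stream first: " + streamFirst.output());
    }
}
```

In the second run the table record exists by the end, yet no join result is ever emitted, which is the situation the restart scenario in the comment above ends up in while the table is still being rebuilt.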
[jira] [Commented] (KAFKA-8977) Remove MockStreamsMetrics Since it is not a Mock
[ https://issues.apache.org/jira/browse/KAFKA-8977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964717#comment-16964717 ]

Bruno Cadonna commented on KAFKA-8977:
--------------------------------------

I agree with you that mocks are often just used to mock direct dependencies and verify their usage. However, mocks can also be used to just return values for the methods of the mocked object without any verification. That is also how `MockStreamsMetrics` is used at the moment, except that it uses production code to compute the return values. I would like to have a mock that just returns constants. Maybe for some methods, we will need to provide some functionality to set the returned values during test setup.

With this I would like to achieve better isolation of unit tests. That is, I do not want a unit test that uses `StreamsMetricsImpl` to fail because there is a bug in `StreamsMetricsImpl`. Only the tests in `StreamsMetricsImplTest` should fail in that case. The goal is to reduce the maintenance costs of unit tests.

If you just want to replace {{MockStreamsMetrics}} with {{StreamsMetricsImpl}}, that is fine with me. IMO, this is already a step in the right direction.

> Remove MockStreamsMetrics Since it is not a Mock
> ------------------------------------------------
>
> Key: KAFKA-8977
> URL: https://issues.apache.org/jira/browse/KAFKA-8977
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Bruno Cadonna
> Assignee: bibin sebastian
> Priority: Minor
> Labels: newbie
>
> The class {{MockStreamsMetrics}} is used throughout unit tests as a mock but it is not really a mock since it only hides two parameters of the {{StreamsMetricsImpl}} constructor. Either a real mock or the real {{StreamsMetricsImpl}} should be used in the tests.
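
The idea in the comment above, a test double that returns constants and lets test setup override individual values, could look roughly like the sketch below. Everything in it is hypothetical: `Metrics`, `StubMetrics`, and `LagReporter` are illustrative names, not Kafka Streams classes, and the sketch assumes the class under test depends on an interface, which the thread notes is often not the case for `StreamsMetricsImpl`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical names throughout; this is not the Kafka Streams metrics API.
class ConstantStubDemo {

    // Minimal stand-in for a metrics dependency.
    interface Metrics {
        double value(String metricName);
    }

    // Stub that returns a fixed default, with optional per-metric overrides set during test setup.
    static final class StubMetrics implements Metrics {
        private final double defaultValue;
        private final Map<String, Double> overrides = new HashMap<>();

        StubMetrics(double defaultValue) {
            this.defaultValue = defaultValue;
        }

        StubMetrics returning(String metricName, double value) {
            overrides.put(metricName, value);
            return this;
        }

        @Override
        public double value(String metricName) {
            return overrides.getOrDefault(metricName, defaultValue);
        }
    }

    // Class under test: depends only on the interface, so no production metrics code runs.
    static final class LagReporter {
        private final Metrics metrics;

        LagReporter(Metrics metrics) {
            this.metrics = metrics;
        }

        boolean isLagging(double threshold) {
            return metrics.value("lag") > threshold;
        }
    }

    public static void main(String[] args) {
        Metrics stub = new StubMetrics(0.0).returning("lag", 42.0);
        LagReporter reporter = new LagReporter(stub);
        System.out.println(reporter.isLagging(10.0));
        System.out.println(reporter.isLagging(100.0));
    }
}
```

Because the stub computes nothing, a bug in the real metrics implementation cannot make a test of `LagReporter` fail, which is the isolation the comment is after. No call verification is involved.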
[jira] [Comment Edited] (KAFKA-8977) Remove MockStreamsMetrics Since it is not a Mock
[ https://issues.apache.org/jira/browse/KAFKA-8977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964717#comment-16964717 ]

Bruno Cadonna edited comment on KAFKA-8977 at 11/1/19 8:39 AM:
---------------------------------------------------------------

I agree with you that mocks are often just used to mock direct dependencies and verify their usage. However, mocks can also be used to just return values for the methods of the mocked object without any verification. That is also how {{MockStreamsMetrics}} is used at the moment, except that it uses production code to compute the return values. I would like to have a mock that just returns constants. Maybe for some methods, we will need to provide some functionality to set the returned values during test setup.

With this I would like to achieve better isolation of unit tests. That is, I do not want a unit test that uses {{StreamsMetricsImpl}} to fail because there is a bug in {{StreamsMetricsImpl}}. Only the tests in {{StreamsMetricsImplTest}} should fail in that case. The goal is to reduce the maintenance costs of unit tests.

If you just want to replace {{MockStreamsMetrics}} with {{StreamsMetricsImpl}}, that is fine with me. IMO, this is already a step in the right direction.

was (Author: cadonna):

I agree with you that mocks are often just used to mock direct dependencies and verify their usage. However, mocks can also be used to just return values for the methods of the mocked object without any verification. That is also how `MockStreamsMetrics` is used at the moment, except that it uses production code to compute the return values. I would like to have a mock that just returns constants. Maybe for some methods, we will need to provide some functionality to set the returned values during test setup.

With this I would like to achieve better isolation of unit tests. That is, I do not want a unit test that uses `StreamsMetricsImpl` to fail because there is a bug in `StreamsMetricsImpl`. Only the tests in `StreamsMetricsImplTest` should fail in that case. The goal is to reduce the maintenance costs of unit tests.

If you just want to replace {{MockStreamsMetrics}} with {{StreamsMetricsImpl}}, that is fine with me. IMO, this is already a step in the right direction.

> Remove MockStreamsMetrics Since it is not a Mock
> ------------------------------------------------
>
> Key: KAFKA-8977
> URL: https://issues.apache.org/jira/browse/KAFKA-8977
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Bruno Cadonna
> Assignee: bibin sebastian
> Priority: Minor
> Labels: newbie
>
> The class {{MockStreamsMetrics}} is used throughout unit tests as a mock but it is not really a mock since it only hides two parameters of the {{StreamsMetricsImpl}} constructor. Either a real mock or the real {{StreamsMetricsImpl}} should be used in the tests.
[jira] [Created] (KAFKA-9128) ConsumerCoordinator ignores exceptions from ConsumerRebalanceListener
Oleg Muravskiy created KAFKA-9128:
----------------------------------

Summary: ConsumerCoordinator ignores exceptions from ConsumerRebalanceListener
Key: KAFKA-9128
URL: https://issues.apache.org/jira/browse/KAFKA-9128
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 2.3.0
Reporter: Oleg Muravskiy

I'm using a custom ConsumerRebalanceListener with a plain old Kafka consumer to manage offsets in an external storage as described in ConsumerRebalanceListener's javadoc.

When that storage is not available, I'm throwing an exception from my listener. However, the exception is simply logged and ignored by the ConsumerCoordinator, as could be seen in these two code snippets from it:

{code:java}
try {
    listener.onPartitionsRevoked(revoked);
} catch (WakeupException | InterruptException e) {
    throw e;
} catch (Exception e) {
    log.error("User provided listener {} failed on partition revocation", listener.getClass().getName(), e);
}{code}

and

{code:java}
try {
    listener.onPartitionsAssigned(assignedPartitions);
} catch (WakeupException | InterruptException e) {
    throw e;
} catch (Exception e) {
    log.error("User provided listener {} failed on partition assignment", listener.getClass().getName(), e);
}{code}

The consumption continues as if nothing has happened.
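
One way an application might cope with the behaviour reported above is to have the listener remember its failure and let the polling thread rethrow it. The following is only a sketch under stated assumptions: it does not use the Kafka client library, `RebalanceCallback`, `coordinatorInvoke`, and `CapturingCallback` are hypothetical stand-ins, and `coordinatorInvoke` merely imitates the log-and-swallow pattern quoted in the report.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-ins; this sketch does not use the Kafka client library.
class ListenerFailureDemo {

    // Mirrors the shape of a rebalance callback, simplified to partition numbers.
    interface RebalanceCallback {
        void onPartitionsAssigned(List<Integer> partitions);
    }

    // Imitates the behaviour quoted in the report: listener exceptions are logged and swallowed.
    static void coordinatorInvoke(RebalanceCallback listener, List<Integer> partitions) {
        try {
            listener.onPartitionsAssigned(partitions);
        } catch (Exception e) {
            System.err.println("User provided listener failed on partition assignment: " + e);
        }
    }

    // Wrapper that remembers the first failure so the polling thread can rethrow it.
    static final class CapturingCallback implements RebalanceCallback {
        private final RebalanceCallback delegate;
        private final AtomicReference<RuntimeException> failure = new AtomicReference<>();

        CapturingCallback(RebalanceCallback delegate) {
            this.delegate = delegate;
        }

        @Override
        public void onPartitionsAssigned(List<Integer> partitions) {
            try {
                delegate.onPartitionsAssigned(partitions);
            } catch (RuntimeException e) {
                failure.compareAndSet(null, e); // keep only the first failure
                throw e;
            }
        }

        // Call after each poll; rethrows and clears a captured failure, if any.
        void rethrowIfFailed() {
            RuntimeException e = failure.getAndSet(null);
            if (e != null) {
                throw e;
            }
        }
    }

    public static void main(String[] args) {
        CapturingCallback callback = new CapturingCallback(partitions -> {
            throw new IllegalStateException("offset storage unavailable");
        });
        coordinatorInvoke(callback, List.of(0, 1)); // the exception is swallowed here
        try {
            callback.rethrowIfFailed(); // but surfaces here, on the caller's side
        } catch (IllegalStateException e) {
            System.out.println("stopping consumption: " + e.getMessage());
        }
    }
}
```

In a real consumer loop the check would sit right after the poll call, so that consumption stops instead of continuing with offsets that could not be loaded or stored. This works around the reported behaviour on the application side; it says nothing about how the coordinator itself should be changed.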
[jira] [Commented] (KAFKA-8977) Remove MockStreamsMetrics Since it is not a Mock
[ https://issues.apache.org/jira/browse/KAFKA-8977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964669#comment-16964669 ]

bibin sebastian commented on KAFKA-8977:
----------------------------------------

I assume you are suggesting that we create a generic mock of `StreamsMetricsImpl` using PowerMock/EasyMock and externalise it for all tests. Technically I think that is possible. (Actually I think that is the only way to create a mock for `StreamsMetricsImpl`. Creating the mock by implementing the interface `StreamsMetrics` won't really work because in most places `StreamsMetricsImpl` is referenced directly rather than through the interface. Also, this class has multiple static methods, so I have a feeling that it is used like a util class.)

Going by your suggestion, even if I create a generic mock, I am still not convinced of the value of creating a mock in these scenarios where we have only indirect access to `StreamsMetricsImpl`. Let me try to explain my thought process here.

Generally I use a mock instance to mock any external (direct) dependencies of the class/method under test, and I verify the usage of the mock as part of the assertions at the end. In our case, since `StreamsMetricsImpl` is (most of the time) not referenced directly in the classes/methods under test, even if we create a mock we won't realistically be able to verify the usage (method invocations) on the mock (using the verify(mock) method). This is because we wouldn't want our tests to fail an assertion if the indirect usage of `StreamsMetricsImpl` changes at a later point in time, so verifying the mock usage in the assertions won't really make sense.

So my point is: what is the real value we gain if we can't verify the usage of the mock? That is why I preferred to leave its usage as it is, in its full form, rather than using a mock.

What do you think? [~cadonna]

> Remove MockStreamsMetrics Since it is not a Mock
> ------------------------------------------------
>
> Key: KAFKA-8977
> URL: https://issues.apache.org/jira/browse/KAFKA-8977
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Bruno Cadonna
> Assignee: bibin sebastian
> Priority: Minor
> Labels: newbie
>
> The class {{MockStreamsMetrics}} is used throughout unit tests as a mock but it is not really a mock since it only hides two parameters of the {{StreamsMetricsImpl}} constructor. Either a real mock or the real {{StreamsMetricsImpl}} should be used in the tests.