[jira] [Created] (KAFKA-10478) advertised.listeners should allow duplicated ports
Andre Araujo created KAFKA-10478: Summary: advertised.listeners should allow duplicated ports Key: KAFKA-10478 URL: https://issues.apache.org/jira/browse/KAFKA-10478 Project: Kafka Issue Type: Improvement Components: core Reporter: Andre Araujo The same [validations|https://github.com/apache/kafka/blob/391ad90112fb2e9a85bf76250d57863bbf33b383/core/src/main/scala/kafka/utils/CoreUtils.scala#L259-L260] performed for {{listeners}} endpoints are also applied to [{{advertised.listeners}}|https://github.com/apache/kafka/blob/e8b2dcdee6f25e9344d52b84e86328ec616bf819/core/src/main/scala/kafka/server/KafkaConfig.scala#L1689-L1691]. It makes sense that neither parameter should allow duplicated listener names. The port number restriction is different, though. It makes sense to allow only one listener per port, since two listeners cannot bind to the same port at the same time (considering a single network interface). For advertised listeners, though, this doesn't apply, since Kafka doesn't actually bind to the advertised listener ports. A practical application of relaxing this restriction for {{advertised.listeners}} is the following: when configuring Kafka with Kerberos authentication and a load balancer, we need two SASL_SSL listeners: (A) one running with the {{kafka/hostname}} principal and (B) another using {{kafka/lb_name}}, which is necessary for proper authentication when using the LB FQDN. After bootstrap, though, the client receives the brokers' addresses with the actual host FQDNs advertised by the brokers. To connect to the brokers using the hostnames, the client must connect to listener A to be able to authenticate successfully with Kerberos. -- This message was sent by Atlassian Jira (v8.3.4#803005)
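A hypothetical sketch of the load-balancer setup described in the ticket (hostnames, listener names, and port numbers are illustrative assumptions, not taken from the report). The broker binds its two listeners on distinct ports, but the desired advertised.listeners would repeat port 9093 for two different hostnames, which the current validation rejects even though the broker never binds to advertised addresses:

```
# Two SASL_SSL listeners bound on distinct ports (required for listeners).
listeners=HOST://0.0.0.0:9093,LB://0.0.0.0:9094
listener.security.protocol.map=HOST:SASL_SSL,LB:SASL_SSL
inter.broker.listener.name=HOST
# Desired advertised setup: the same port 9093 for both the broker FQDN and
# the LB FQDN -- currently rejected because advertised.listeners may not
# contain duplicated ports.
advertised.listeners=HOST://broker1.example.com:9093,LB://lb.example.com:9093
```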
[GitHub] [kafka] chia7712 commented on pull request #9271: MINOR: correct package of LinuxIoMetricsCollector
chia7712 commented on pull request #9271: URL: https://github.com/apache/kafka/pull/9271#issuecomment-691397979 ``` Failed: Build / JDK 8 / org.apache.kafka.clients.admin.KafkaAdminClientTest.testMetadataRetries ``` It is already fixed by https://github.com/apache/kafka/commit/e4eab377e1489fc0cc90edec3eeb382b1192a442 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #9149: KAFKA-10340: improve the logging to help user know what is going on
guozhangwang commented on pull request #9149: URL: https://github.com/apache/kafka/pull/9149#issuecomment-691389142 test this please
[jira] [Commented] (KAFKA-6127) Streams should never block infinitely
[ https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194590#comment-17194590 ] Guozhang Wang commented on KAFKA-6127: -- I think we can close this ticket now, since: * All blocking client calls should be covered with KIP-572 now. * Exceptional cases which would make the producer / consumer unable to proceed should be covered with other created tickets now. > Streams should never block infinitely > - > > Key: KAFKA-6127 > URL: https://issues.apache.org/jira/browse/KAFKA-6127 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.0.0 > Reporter: Matthias J. Sax > Priority: Major > Labels: exactly-once > > Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, > {{committed()}}, and {{position()}}. Also {{KafkaProducer#send()}} can block. > If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block > (fixed in KAFKA-6446), and we should double-check the code to confirm we handle this > case correctly. > If we block within one operation, the whole {{StreamThread}} would block, the > instance does not make any progress, becomes unresponsive (for example, > {{KafkaStreams#close()}} suffers), and we also might drop out of the consumer > group. > Thanks to > [KIP-266|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886], > the Consumer now has non-blocking variants that we can use, but the same is > not true of the Producer. We can add non-blocking variants to the Producer as well, > or set the appropriate config options to set the max timeout. > Of course, we'd also need to be sure to catch the appropriate timeout > exceptions.
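KIP-266's Duration-based consumer overloads bound each blocking call with a caller-supplied timeout. As a generic illustration of that pattern only (not Kafka's actual implementation, which uses internal timers rather than a helper thread; the class and method names below are invented for this sketch), a potentially blocking call can be bounded like this:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedCall {
    // Run a possibly blocking call, but give up after timeoutMs and surface
    // a TimeoutException instead of blocking the calling thread forever.
    static <T> T callWithTimeout(Callable<T> call, long timeoutMs) throws TimeoutException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool.submit(call).get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            // Interrupt the worker if it is still blocked.
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callWithTimeout(() -> 42, 1000));
    }
}
```

The caller must still catch the timeout exception and decide whether to retry or fail, which is exactly the follow-up work the ticket describes for Streams.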
[GitHub] [kafka] showuon commented on pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir
showuon commented on pull request #9178: URL: https://github.com/apache/kafka/pull/9178#issuecomment-691365862 @junrao , No, the failing tests are not related to my change. **tests/Build/JDK 11**: PASS **tests/Build/JDK 15**: DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota failed, which is not related to my change **tests/Build/JDK 11**: KafkaConsumerTest.testReturnRecordsDuringRebalance failed, which is not related to my change Thank you.
[GitHub] [kafka] MicahRam commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException
MicahRam commented on pull request #8181: URL: https://github.com/apache/kafka/pull/8181#issuecomment-691360304 @mjsax that sounds like it will work for the majority of the use cases I've come across. One thing that's not clear to me is how KStream operations that are downstream of a processor would handle headers. For instance: ```
from("in")
    .transform(ForwardsFromTransformAndPunctuate::new)
    .transform(ModifyHeadersInTransform::new) // Should this type-check the context before accessing headers?
    .setHeaders()                             // Will this throw when processing driven by punctuate reaches here?
    .to("out")
```
[GitHub] [kafka] ableegoldman commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster
ableegoldman commented on pull request #8892: URL: https://github.com/apache/kafka/pull/8892#issuecomment-691358663 Alright, this should finally be ready for a final pass and merge @vvcephei @cadonna -- sorry for leaving this hanging for so long. After running a few different tests, it seems like the HATA assignor may actually be faster than the old StickyTaskAssignor in most cases. The HATA seems to scale slightly worse with an increasing number of clients, but significantly better with partition count and number of standbys. The `testLargeNumConsumers` with 1,000 clients and 1,000 partitions (and 1 standby) took the HATA 20s for the full test, but only ~1-2s for the STA and FPTA. The `testManyStandbys` with 100 clients, 1,000 partitions, and 50 standbys took the HATA roughly 10s, and the STA/FPTA just under a full minute. The `testLargePartitionCount` with 1 client, 6,000 partitions, and 1 standby took the HATA under 1s. The STA and FPTA both ran out of time, surprisingly on the first assignment alone (taking just over 1.5 minutes). Decided to reduce the number of partitions in the `testLargePartitionCount` test to 3,000 rather than increasing the timeout for all tests, as it's already pretty high. Maybe we can drop the STA sooner or later and then tighten it up.
[GitHub] [kafka] ableegoldman commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
ableegoldman commented on pull request #9280: URL: https://github.com/apache/kafka/pull/9280#issuecomment-691357934 Also, looks like there were checkstyle issues. Probably you just need to add the license header to the new file.
[GitHub] [kafka] ableegoldman commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
ableegoldman commented on a change in pull request #9280: URL: https://github.com/apache/kafka/pull/9280#discussion_r487336115

## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
## @@ -418,10 +419,14 @@ private boolean maybeSendAndPollTransactionalRequest() {
 if (transactionManager.hasAbortableError() || transactionManager.isAborting()) {
     if (accumulator.hasIncomplete()) {
+        // Attempt to get the last error that caused this abort.
         RuntimeException exception = transactionManager.lastError();
+        // If there was no error, but we are still aborting,
+        // then this is most likely a case where there was no fatal error.
         if (exception == null) {
-            exception = new KafkaException("Failing batch since transaction was aborted");
+            exception = new TransactionAbortedException();
         }
+        // Since the transaction is aborted / being aborted, abort all the undrained batches.

Review comment: This comment might be unnecessary, the code is pretty self-explanatory in this case 🙂
[jira] [Comment Edited] (KAFKA-6127) Streams should never block infinitely
[ https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194565#comment-17194565 ] Sophie Blee-Goldman edited comment on KAFKA-6127 at 9/11/20, 11:47 PM: --- [~guozhang] [~vvcephei] any thoughts on this? You probably touched on a lot of these calls back in "The Refactor" – I remember seeing that we started to catch TimeoutException in the Consumer#position call, for example. I know there are some related issues with infinite blocking in exceptional cases, for example when a topic is deleted out from under a Producer, but we already have a separate ticket for that. If that sort of thing is the only one remaining we should maybe close this as a duplicate instead was (Author: ableegoldman): [~guozhang] any thoughts on this? You probably touched on a lot of these calls back in "The Refactor" – I remember seeing that we started to catch TimeoutException in the Consumer#position call, for example. I know there are some related issues with infinite blocking in exceptional cases, for example when a topic is deleted out from under a Producer, but we already have a separate ticket for that. If that sort of thing is the only one remaining we should maybe close this as a duplicate instead
[jira] [Commented] (KAFKA-6127) Streams should never block infinitely
[ https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194565#comment-17194565 ] Sophie Blee-Goldman commented on KAFKA-6127: [~guozhang] any thoughts on this? You probably touched on a lot of these calls back in "The Refactor" – I remember seeing that we started to catch TimeoutException in the Consumer#position call, for example. I know there are some related issues with infinite blocking in exceptional cases, for example when a topic is deleted out from under a Producer, but we already have a separate ticket for that. If that sort of thing is the only one remaining we should maybe close this as a duplicate instead
[GitHub] [kafka] mjsax commented on a change in pull request #9255: MINOR: Consolidate duplicated logic on reset tools
mjsax commented on a change in pull request #9255: URL: https://github.com/apache/kafka/pull/9255#discussion_r487321249

## File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
## @@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
     assertEquals(msg, exception.get().getMessage());
     assertEquals(1, count.get());
 }
+
+@Test
+public void shouldAcceptValidDateFormats() throws ParseException {
+    // check valid formats
+    invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
+    invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+    invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
+    invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
+    invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
+}
+
+@Test
+public void shouldThrowOnInvalidDateFormat() {
+    // check some invalid formats
+    try {
+        invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));
+        fail("Call to getDateTime should fail");
+    } catch (final Exception e) {
+        e.printStackTrace();

Review comment: We should not print the stacktrace imho, but verify the exception message using `assertThat`.

## File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
## @@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
     }
     return map;
 }
+
+/**
+ * Convert an ISO8601 based timestamp to an epoch value

Review comment: nit: I don't know from the top of my head what the standard dictates. Might be worth adding to the comment?

## File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+@Test
+public void shouldThrowOnInvalidDateFormat() {
+    // check some invalid formats
+    try {
+        invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));

Review comment: We should update this test and use `assertThrows` instead of `try-fail-catch`.

## File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+/**
+ * Convert an ISO8601 based timestamp to an epoch value
+ * @param timestamp to be converted
+ * @return epoch value of a given timestamp
+ * @throws ParseException for a timestamp that doesn't follow the ISO8601 format
+ */
+public static long getDateTime(String timestamp) throws ParseException {
+    final String[] timestampParts = timestamp.split("T");
+    if (timestampParts.length < 2) {
+        throw new ParseException("Error parsing timestamp. It does not contain a 'T' according to ISO8601 format", timestamp.length());
+    }
+
+    final String secondPart = timestampParts[1];
+    if (secondPart == null || secondPart.isEmpty()) {
+        throw new ParseException("Error parsing timestamp. Time part after 'T' is null or empty", timestamp.length());
+    }
+
+    if (!(secondPart.contains("+") || secondPart.contains("-") || secondPart.contains("Z"))) {

Review comment: I know that this is existing logic that was just moved, but I am not sure if I understand how it works?

## File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
## @@ -823,7 +809,7 @@ object ConsumerGroupCommand extends Logging {
     case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
 } else if (opts.options.has(opts.resetToDatetimeOpt)) {
-    val timestamp = convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
+    val timestamp = Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt))

Review comment: I am just wondering if we can have a dependency from `core` to the `clients` module? \cc @ijuma
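To make the logic under review concrete, here is a minimal, stdlib-only sketch of the ISO8601-to-epoch conversion being discussed. It is not the actual `Utils.getDateTime` implementation, just an illustration of the same checks: split on `'T'`, then pick a `SimpleDateFormat` pattern with or without a zone suffix depending on whether the time part contains `+`, `-`, or `Z` (the class name is invented, and millisecond precision in the input is assumed):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class DateTimeSketch {
    // Convert an ISO8601-style timestamp to epoch millis. If the time part
    // carries no zone offset ("+", "-", or "Z"), the system default zone is
    // assumed, mirroring the behavior of the code under review.
    public static long getDateTime(String timestamp) throws ParseException {
        final String[] parts = timestamp.split("T");
        if (parts.length < 2) {
            throw new ParseException("Timestamp does not contain a 'T' as required by ISO8601", timestamp.length());
        }
        final String time = parts[1];
        // A '+', '-', or 'Z' after the 'T' means an explicit zone offset is present.
        final boolean hasZone = time.contains("+") || time.contains("-") || time.contains("Z");
        final String pattern = hasZone ? "yyyy-MM-dd'T'HH:mm:ss.SSSXXX" : "yyyy-MM-dd'T'HH:mm:ss.SSS";
        return new SimpleDateFormat(pattern).parse(timestamp).getTime();
    }

    public static void main(String[] args) throws ParseException {
        // A zoned timestamp parses independently of the local time zone.
        System.out.println(getDateTime("2020-09-11T12:00:00.000+00:00"));
    }
}
```

Note that the `contains("-")` check only works because the date part (which always contains dashes) has already been split off, which may be the subtlety behind the "not sure I understand how it works" review comment.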
[GitHub] [kafka] mjsax commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException
mjsax commented on pull request #8181: URL: https://github.com/apache/kafka/pull/8181#issuecomment-691339633 @MicahRam I completely agree with what you are saying, and there are actually two relevant KIPs in flight atm that might help. For the Processor API, we could use the KIP John mentioned above (or start a new KIP) to extend the Processor API for better header support (I just proposed this on the corresponding DISCUSS thread on the mailing list). Furthermore, there is KIP-478, which proposes to split the current `ProcessorContext` into multiple parts, including a `RecordContext`, and we could change the API so that the `RecordContext` is not available within `punctuate` to begin with, and thus `offset/partition` etc. cannot be called. Overall, both KIPs might resolve the issue you are facing. The bottom line still seems to be that there is not really a bug, but "just" a missing feature: it's not possible to add headers to records that are sent downstream from a `punctuation` call. (And you kinda "hack" around the missing feature to make it work -- btw: what workaround do you use?) Would you agree with this assessment?
[jira] [Commented] (KAFKA-6127) Streams should never block infinitely
[ https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194548#comment-17194548 ] Matthias J. Sax commented on KAFKA-6127: Not 100% sure either :) – this ticket is almost 3 years old. If you have time, feel free to double-check the usage of both consumer and producer calls. My gut feeling is that this ticket might be void now (also considering the work on KIP-572), but we should double-check before we close it.
[jira] [Commented] (KAFKA-6127) Streams should never block infinitely
[ https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194543#comment-17194543 ] Sophie Blee-Goldman commented on KAFKA-6127: [~mjsax] is this ticket still relevant? I didn't think any of the Consumer APIs we use block indefinitely anymore, as the default.api.timeout.ms config takes effect. Less sure about the Producer APIs
[jira] [Resolved] (KAFKA-7970) Missing topic causes service shutdown without exception
[ https://issues.apache.org/jira/browse/KAFKA-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman resolved KAFKA-7970. Fix Version/s: 2.7.0 Assignee: Bruno Cadonna Resolution: Fixed > Missing topic causes service shutdown without exception > --- > > Key: KAFKA-7970 > URL: https://issues.apache.org/jira/browse/KAFKA-7970 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.1.0 > Reporter: Jonny Heavey > Assignee: Bruno Cadonna > Priority: Minor > Fix For: 2.7.0 > > > When launching a KafkaStreams application that depends on a topic that > doesn't exist, the streams application correctly logs an error such as: > " is unknown yet during rebalance, please make sure they have > been pre-created before starting the Streams application." > The stream is then shutdown, however, no exception is thrown indicating that > an error has occurred. > In our circumstances, we run our streams app inside a container. The streams > service is shutdown, but the process is not exited, meaning that the > container does not crash (reducing visibility of the issue). > As no exception is thrown in the missing topic scenario described above, our > application code has no way to determine that something is wrong that would > then allow it to terminate the process. > > Could the onPartitionsAssigned method in StreamThread.java throw an exception > when it decides to shutdown the stream (somewhere around line 264)?
[GitHub] [kafka] vvcephei merged pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
vvcephei merged pull request #9239: URL: https://github.com/apache/kafka/pull/9239
[GitHub] [kafka] vvcephei commented on pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
vvcephei commented on pull request #9239: URL: https://github.com/apache/kafka/pull/9239#issuecomment-691322853 These test failures look unrelated: ``` Build / JDK 11 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true] Build / JDK 11 / org.apache.kafka.streams.integration.StandbyTaskEOSIntegrationTest.shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing[exactly_once] Build / JDK 8 / kafka.network.ConnectionQuotasTest.testNoConnectionLimitsByDefault ```
[jira] [Commented] (KAFKA-10477) Sink Connector fails with DataException when trying to convert Kafka record with empty key to Connect Record
[ https://issues.apache.org/jira/browse/KAFKA-10477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194533#comment-17194533 ] Shaik Zakir Hussain commented on KAFKA-10477: - Observed that the issue doesn't occur in Kafka *v2.3.0* (where jackson dependency resolves to *v2.9.9*). > Sink Connector fails with DataException when trying to convert Kafka record > with empty key to Connect Record > > > Key: KAFKA-10477 > URL: https://issues.apache.org/jira/browse/KAFKA-10477 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0 >Reporter: Shaik Zakir Hussain >Priority: Major > > Sink connector is facing a DataException when trying to convert a kafka > record with empty key to Connect data format. > Kafka's trunk branch currently depends on *jackson v2.10.5* > A short unit test (shared below) in > `org.apache.kafka.connect.json.JsonConverterTest` class reproduces the issue. > > {code:java} > @Test > public void testToConnectDataEmptyKey() throws IOException { > Map props = > Collections.singletonMap("schemas.enable", false); > converter.configure(props, true); > String str = ""; > SchemaAndValue schemaAndValue = converter.toConnectData("testTopic", > str.getBytes()); > System.out.println(schemaAndValue); > } > {code} > This test code snippet fails with the following exception: > {noformat} > org.apache.kafka.connect.errors.DataException: Unknown schema type: null > at > org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:764) > at > org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:385) > at > org.apache.kafka.connect.json.JsonConverterTest.testToConnectDataEmptyKey(JsonConverterTest.java:792) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at 
java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) > at > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) > {noformat} > > This seems related to the issue > [https://github.com/FasterXML/jackson-databind/issues/2211] , where jackson > lib started returning `MissingNode` for empty input in > 
`ObjectMapper.readTree(input)` method invocation. The precise code change can be observed here: > [https://github.com/FasterXML/jackson-databind/commit/f0abe41b54b36f43f96f05ab224f6e6f364fbe7a#diff-0d472011dea2aac97f0381097cd1a0bfR4094] > > > This causes an exception to be thrown in our JsonConverter class: > [https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L764] > > > In my opinion, when the `jsonValue.getNod
[jira] [Updated] (KAFKA-10477) Sink Connector fails with DataException when trying to convert Kafka record with empty key to Connect Record
[ https://issues.apache.org/jira/browse/KAFKA-10477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaik Zakir Hussain updated KAFKA-10477: Affects Version/s: 2.4.0 > Sink Connector fails with DataException when trying to convert Kafka record > with empty key to Connect Record -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] MicahRam edited a comment on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException
MicahRam edited a comment on pull request #8181: URL: https://github.com/apache/kafka/pull/8181#issuecomment-691315131 @mjsax it's been a while since I looked at the code, but from what I remember from looking through the commit history, it used to throw exceptions, and there was a commit whose entire purpose was to use a static object with defaults because the exceptions were not user friendly. So I guess there needs to be some clarification on what the actual behavior should be. We've figured out how to work around the issues I brought up in KAFKA-9584 and still add headers to the output messages during punctuate processing. I personally would prefer it stay as is until producing messages with headers during punctuate processing is officially supported. I can imagine a lot of people would be unhappy if `context.offset` and `context.partition` started throwing exceptions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10477) Sink Connector fails with DataException when trying to convert Kafka record with empty key to Connect Record
Shaik Zakir Hussain created KAFKA-10477: --- Summary: Sink Connector fails with DataException when trying to convert Kafka record with empty key to Connect Record Key: KAFKA-10477 URL: https://issues.apache.org/jira/browse/KAFKA-10477 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Shaik Zakir Hussain

The sink connector fails with a DataException when trying to convert a Kafka record with an empty key to the Connect data format. Kafka's trunk branch currently depends on *jackson v2.10.5*. A short unit test (shared below) in the `org.apache.kafka.connect.json.JsonConverterTest` class reproduces the issue.

{code:java}
@Test
public void testToConnectDataEmptyKey() throws IOException {
    Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
    converter.configure(props, true);
    String str = "";
    SchemaAndValue schemaAndValue = converter.toConnectData("testTopic", str.getBytes());
    System.out.println(schemaAndValue);
}
{code}

This test code snippet fails with the following exception:

{noformat}
org.apache.kafka.connect.errors.DataException: Unknown schema type: null
    at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:764)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:385)
    at org.apache.kafka.connect.json.JsonConverterTest.testToConnectDataEmptyKey(JsonConverterTest.java:792)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
{noformat}

This seems related to the issue [https://github.com/FasterXML/jackson-databind/issues/2211], where the jackson library started returning `MissingNode` for empty input from the `ObjectMapper.readTree(input)` method. The precise code change can be observed here: [https://github.com/FasterXML/jackson-databind/commit/f0abe41b54b36f43f96f05ab224f6e6f364fbe7a#diff-0d472011dea2aac97f0381097cd1a0bfR4094]

This causes an exception to be thrown in our JsonConverter class: [https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L764]

In my opinion, when `jsonValue.getNodeType()` is `MISSING` ([https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L754]), we need to fall back to the behaviour of the `NULL` case (i.e. return null), although I'm not sure of any further repercussions this might bring. Things were working fine when the *jackson* dependency was at version *v2.9.10.3* or earlier, as `ObjectMapper` returned null in that case. Thanks, Zakir
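The proposed fallback can be sketched outside of Connect with a minimal node-type dispatch. This is an illustrative model only: `NodeType` stands in for Jackson's `JsonNodeType`, and `convertToConnect` is a simplified stand-in for the real method in `JsonConverter`.

```java
// Minimal sketch of the proposed fix, assuming a simplified stand-in for
// Jackson's JsonNodeType. The real change would go in JsonConverter.convertToConnect.
public class MissingNodeFallback {
    // Stand-in for com.fasterxml.jackson.databind.node.JsonNodeType
    enum NodeType { NULL, MISSING, STRING }

    // Since jackson 2.10, readTree("") yields a MissingNode, so MISSING must be
    // handled like NULL instead of falling through to the "Unknown schema type" error.
    static String convertToConnect(NodeType type, String value) {
        switch (type) {
            case NULL:
            case MISSING: // proposed: treat missing input exactly like an explicit null
                return null;
            case STRING:
                return value;
            default:
                throw new RuntimeException("Unknown schema type: " + type);
        }
    }

    public static void main(String[] args) {
        // An empty Kafka record key parses to a MISSING node and now maps to null.
        System.out.println(convertToConnect(NodeType.MISSING, null)); // prints "null"
        System.out.println(convertToConnect(NodeType.STRING, "k1"));  // prints "k1"
    }
}
```

With this fallback, the empty-key test above would print a null `SchemaAndValue` instead of raising the DataException.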
[jira] [Commented] (KAFKA-10467) kafka-topic --describe fails for topic created by "produce"
[ https://issues.apache.org/jira/browse/KAFKA-10467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194483#comment-17194483 ] Swayam Raina commented on KAFKA-10467: -- > did it happen in the latest version of Kafka? > kafka-topics --version 2.3.1 (Commit:18a913733fb71c01) I was running Kafka from sources in debug mode > kafka-topic --describe fails for topic created by "produce" > --- > > Key: KAFKA-10467 > URL: https://issues.apache.org/jira/browse/KAFKA-10467 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 2.3.1 > Environment: MacOS >Reporter: Swayam Raina >Priority: Minor > > {code:java} > > kafka-topics --version > 2.3.1 (Commit:18a913733fb71c01){code} > > While producing to a topic that does not already exist > {code:java} > producer.send("does-not-exists", "msg-1") > {code} > > broker creates the topic > {code:java} > // partition file > > ls /tmp/kafka-logs/ > does-not-exists-0{code} > > If I try to list the topics, it also shows this new topic > {code:java} > > kafka-topics --bootstrap-server localhost:9092 --list > does-not-exists-0 > {code} > Now while trying to describe the topic that was auto-created the following > error is thrown > > {code:java} > > kafka-topics --bootstrap-server localhost:9092 --topic does-not-exists > >--describe > Error while executing topic command : > org.apache.kafka.common.errors.UnknownServerException: The server experienced > an unexpected error when processing the request.Error while executing topic > command : org.apache.kafka.common.errors.UnknownServerException: The server > experienced an unexpected error when processing the request.[2020-09-08 > 00:21:30,890] ERROR java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.UnknownServerException: The server experienced > an unexpected error when processing the request. 
at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > kafka.admin.TopicCommand$AdminClientTopicService.$anonfun$describeTopic$3(TopicCommand.scala:228) > at > kafka.admin.TopicCommand$AdminClientTopicService.$anonfun$describeTopic$3$adapted(TopicCommand.scala:225) > at scala.collection.Iterator.foreach(Iterator.scala:941) at > scala.collection.Iterator.foreach$(Iterator.scala:941) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at > scala.collection.IterableLike.foreach(IterableLike.scala:74) at > scala.collection.IterableLike.foreach$(IterableLike.scala:73) at > scala.collection.AbstractIterable.foreach(Iterable.scala:56) at > kafka.admin.TopicCommand$AdminClientTopicService.describeTopic(TopicCommand.scala:225) > at kafka.admin.TopicCommand$.main(TopicCommand.scala:66) at > kafka.admin.TopicCommand.main(TopicCommand.scala)Caused by: > org.apache.kafka.common.errors.UnknownServerException: The server experienced > an unexpected error when processing the request. (kafka.admin.TopicCommand$) > > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10455) Probing rebalances are not guaranteed to be triggered by non-leader members
[ https://issues.apache.org/jira/browse/KAFKA-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194469#comment-17194469 ] Sophie Blee-Goldman commented on KAFKA-10455: - Yeah, I think you're touching on a related issue – fast detection of source topic deletion – which I agree can/should be solved along the same lines as the probing rebalance bug in this ticket. Just for some context, to avoid accidental data corruption/loss, we of course want to react as fast as possible on source topic deletion. Currently we detect the absence of source topics during a rebalance and send an error code to all members to shut down. The problem is that there may be a gap of up to the metadata.max.age (default 5min) between the topic deletion and the reaction, ie triggering a rebalance and informing all members to shut down. Since only the leader is guaranteed to trigger a rebalance upon sending a JoinGroup, unless the leader happens to be assigned one of the partitions of the deleted tasks, it will not notice the topic deletion until it refreshes its metadata. If non-leaders are assigned to these deleted partitions and notice the topic deletion, they may not be able to trigger a rebalance even if they rejoin the group. Both problems could be solved by modifying the userdata to ensure any member's JoinGroup results in a rebalance. We could just add a single byte to the SubscriptionInfo and bump it when rejoining. This actually seems like a better all-around solution, since members of Streams should not be haphazardly sending JoinGroups for no reason – if they do, it must be because they want a rebalance. This way we don't have to worry about changing any broker side code and finding a workaround for older brokers. We could also take the approach of making sure the leader is responsible for triggering the rebalance, but this doesn't solve the source topic deletion problem. 
It also wouldn't help us in any new feature we wanted to add that required arbitrary members to trigger a rebalance. So I think we should just go with bumping a byte in the SubscriptionInfo > Probing rebalances are not guaranteed to be triggered by non-leader members > --- > > Key: KAFKA-10455 > URL: https://issues.apache.org/jira/browse/KAFKA-10455 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Sophie Blee-Goldman >Priority: Major > > Apparently, if a consumer rejoins the group with the same subscription > userdata that it previously sent, it will not trigger a rebalance. The one > exception here is that the group leader will always trigger a rebalance when > it rejoins the group. > This has implications for KIP-441, where we rely on asking an arbitrary > thread to enforce the followup probing rebalances. Technically we do ask a > thread living on the same instance as the leader, so the odds that the leader > will be chosen aren't completely abysmal, but for any multithreaded > application they are still at best only 50%. > Of course in general the userdata will have changed within a span of 10 > minutes, so the actual likelihood of hitting this is much lower – it can > only happen if the member's task offset sums remained unchanged. > Realistically, this probably requires that the member only have > fully-restored active tasks (encoded with the constant sentinel -2) and that > no tasks be added or removed. > > One solution would be to make sure the leader is responsible for the probing > rebalance. To do this, we would need to somehow expose the memberId of the > thread's main consumer to the partition assignor. I'm actually not sure if > that's currently possible to figure out or not. If not, we could just assign > the probing rebalance to every thread on the leader's instance. This > shouldn't result in multiple followup rebalances as the rebalance schedule > will be updated/reset on the first followup rebalance. 
> Another solution would be to make sure the userdata is always different. We > could encode an extra bit that flip-flops, but then we'd have to persist the > latest value somewhere/somehow. Alternatively we could just encode the next > probing rebalance time in the subscription userdata, since that is guaranteed > to always be different from the previous rebalance. This might get tricky > though, and certainly wastes space in the subscription userdata. Also, this > would only solve the problem for KIP-441 probing rebalances, meaning we'd > have to individually ensure the userdata has changed for every type of > followup rebalance (see related issue below). So the first proposal, > requiring the leader trigger the rebalance, would be preferable. > Note that, imho, we should just allow a
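The "bump a byte" idea discussed above can be sketched as a counter embedded in the subscription userdata, so that a member that deliberately rejoins always sends different bytes and is guaranteed to trigger a rebalance. This is an illustrative model only: `SubscriptionBump`, `rebalanceEpoch`, and the encoding layout are hypothetical, not Kafka's actual `SubscriptionInfo` wire format.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrative sketch of the proposal: embed a counter in the subscription
// userdata and bump it whenever a member deliberately rejoins, so its
// JoinGroup bytes always differ and the group coordinator must rebalance.
public class SubscriptionBump {
    private int rebalanceEpoch = 0;   // hypothetical extra field in the userdata
    private final long taskOffsetSum; // stands in for the rest of the encoded metadata

    SubscriptionBump(long taskOffsetSum) { this.taskOffsetSum = taskOffsetSum; }

    // Encode the (unchanged) task metadata plus the bump counter.
    byte[] encodeSubscription() {
        return ByteBuffer.allocate(Long.BYTES + Integer.BYTES)
                .putLong(taskOffsetSum)
                .putInt(rebalanceEpoch)
                .array();
    }

    // Called when this member wants to force a rebalance, e.g. a probing
    // rebalance or on detecting source-topic deletion.
    byte[] encodeForcedRejoin() {
        rebalanceEpoch++;
        return encodeSubscription();
    }

    public static void main(String[] args) {
        SubscriptionBump member = new SubscriptionBump(-2L); // fully-restored sentinel
        byte[] first = member.encodeSubscription();
        byte[] second = member.encodeForcedRejoin();
        // Identical task metadata, but the userdata bytes now differ,
        // so the rejoin is guaranteed to trigger a rebalance.
        System.out.println(Arrays.equals(first, second)); // prints "false"
    }
}
```

This works even when the member's task offset sums are unchanged, which is exactly the case where today's rejoin would be silently ignored.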
[GitHub] [kafka] vvcephei commented on pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
vvcephei commented on pull request #9239: URL: https://github.com/apache/kafka/pull/9239#issuecomment-691224736 Oops: ``` 17:31:33 [ant:checkstyle] [ERROR] /home/jenkins/workspace/Kafka_kafka-pr_PR-9239/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java:215:5: Method length is 153 lines (max allowed is 150). [MethodLength] ```
[jira] [Commented] (KAFKA-9331) Add option to terminate application when StreamThread(s) die
[ https://issues.apache.org/jira/browse/KAFKA-9331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194449#comment-17194449 ] Walker Carlson commented on KAFKA-9331: --- [https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown] > Add option to terminate application when StreamThread(s) die > > > Key: KAFKA-9331 > URL: https://issues.apache.org/jira/browse/KAFKA-9331 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.4.0 >Reporter: Michael Bingham >Priority: Minor > Labels: needs-kip > > Currently, if a {{StreamThread}} dies due to an unexpected exception, the > Streams application continues running. Even if all {{StreamThread}}(s) die, > the application will continue running, but will be in an {{ERROR}} state. > Many users want or expect the application to terminate in the event of a > fatal exception that kills one or more {{StreamThread}}(s). Currently, this > requires extra work from the developer to register an uncaught exception > handler on the {{KafkaStreams}} object and trigger a shutdown as needed. > It would be useful to provide a configurable option for the Streams > application to have it automatically terminate with an exception if one or > more {{StreamThread}}(s) die.
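The manual workaround the ticket describes (registering an uncaught exception handler and triggering a shutdown) can be modeled with plain threads. This is a stand-alone sketch: the thread stands in for a StreamThread, the latch stands in for calling `streams.close()`, and `runAndAwaitShutdown` is an illustrative name, not Kafka Streams API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Minimal model of the workaround: a "stream thread" dies with an unexpected
// exception, and an uncaught-exception handler triggers application shutdown.
public class ShutdownOnDeadThread {
    // Returns true once the handler has observed the thread's death.
    static boolean runAndAwaitShutdown() throws InterruptedException {
        CountDownLatch shutdown = new CountDownLatch(1);
        Thread streamThread =
            new Thread(() -> { throw new IllegalStateException("simulated fatal error"); });
        // Analogous to registering a handler on the KafkaStreams object.
        streamThread.setUncaughtExceptionHandler((t, e) -> shutdown.countDown());
        streamThread.start();
        return shutdown.await(5, TimeUnit.SECONDS); // in a real app: streams.close() here
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndAwaitShutdown() ? "terminated" : "timed out");
    }
}
```

KIP-671 (linked above) proposes making this reaction a built-in, configurable behavior instead of per-application boilerplate.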
[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
vvcephei commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r487174400 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java ## @@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() { inputTopic.pipeInput("k1", "v1", 7L); // final record to advance stream time and flush windows inputTopic.pipeInput("k1", "v1", 90L); +final Comparator> comparator = Review comment: Yeah, this sounds right. Either the current record's timestamp is less than the event time for some of the windows, in which case it doesn't advance the partition's stream time, or it is more advanced than the (prior) event time for all the windows, in which case it does advance the stream time, but all the updated windows' event times are equal to the current record's timestamp, which is also equal to the new stream time, which should also be ok for suppression.
[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
mjsax commented on a change in pull request #9156: URL: https://github.com/apache/kafka/pull/9156#discussion_r487172402 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ## @@ -832,18 +834,25 @@ public String queryableStoreName() { } @SuppressWarnings("unchecked") -public void enableSendingOldValues() { +public boolean enableSendingOldValues(final boolean onlyIfMaterialized) { if (!sendOldValues) { if (processorSupplier instanceof KTableSource) { final KTableSource source = (KTableSource) processorSupplier; +if (onlyIfMaterialized && !source.materialized()) { +return false; +} source.enableSendingOldValues(); Review comment: Ah. I thought you changed `ProcessorSupplier#enableSendingOldValues`, but that was incorrect. You changed `KTableProcessorSupplier#enableSendingOldValues`.
[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
vvcephei commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r487171555 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ## @@ -131,14 +157,15 @@ public void testAggregateSmallInput() { public void testReduceSmallInput() { final StreamsBuilder builder = new StreamsBuilder(); final String topic = "topic"; +final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(5), Duration.ofMillis(10), false); Review comment: But I wouldn't be afraid to just use a full if/else block, either.
```suggestion
final WindowBytesStoreSupplier storeSupplier;
if (inOrderIterator) {
    storeSupplier = new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false);
} else {
    storeSupplier = Stores.inMemoryWindowStore("Reverse", ofMillis(5), ofMillis(10), false);
}
```
[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
vvcephei commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r487170646 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ## @@ -131,14 +157,15 @@ public void testAggregateSmallInput() { public void testReduceSmallInput() { final StreamsBuilder builder = new StreamsBuilder(); final String topic = "topic"; +final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(5), Duration.ofMillis(10), false); Review comment: This is how I typically break up ternaries.
```suggestion
final WindowBytesStoreSupplier storeSupplier = inOrderIterator
    ? new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false)
    : Stores.inMemoryWindowStore("Reverse", ofMillis(5), ofMillis(10), false);
```
[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
mjsax commented on a change in pull request #9156: URL: https://github.com/apache/kafka/pull/9156#discussion_r487169320 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java ## @@ -39,9 +39,15 @@ } @Override -public final void enableSendingOldValues() { -table1.enableSendingOldValues(); -table2.enableSendingOldValues(); +public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) { +if (!table1.enableSendingOldValues(onlyIfMaterialized)) { Review comment: My comment was about your pattern... (if we flip the logic, we would need to pass in `true` to force a materialization). My point is, shouldn't we pass in a constant? For KTableKTableAbstractJoin we always want the upstream to send us old values.
[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
mjsax commented on a change in pull request #9156: URL: https://github.com/apache/kafka/pull/9156#discussion_r487167789 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ## @@ -182,6 +182,8 @@ public String queryableStoreName() { final KTableProcessorSupplier processorSupplier = new KTableFilter<>(this, predicate, filterNot, queryableStoreName); +processorSupplier.enableSendingOldValues(true); Review comment: SGTM. IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if the upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.
[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
mjsax commented on a change in pull request #9156: URL: https://github.com/apache/kafka/pull/9156#discussion_r487166164 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java ## @@ -47,8 +47,10 @@ } @Override -public void enableSendingOldValues() { +public boolean enableSendingOldValues(final boolean onlyIfMaterialized) { Review comment: I guess it's subjective. Personally, I would prefer to flip it.
[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
lct45 commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r487110043 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ## @@ -131,14 +157,15 @@ public void testAggregateSmallInput() { public void testReduceSmallInput() { final StreamsBuilder builder = new StreamsBuilder(); final String topic = "topic"; +final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(5), Duration.ofMillis(10), false); Review comment: These lines ended up being pretty long but I wasn't sure how to best split them up. WDYT @ableegoldman ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
big-andy-coates commented on a change in pull request #9156: URL: https://github.com/apache/kafka/pull/9156#discussion_r487057805 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ## @@ -182,6 +182,8 @@ public String queryableStoreName() { final KTableProcessorSupplier processorSupplier = new KTableFilter<>(this, predicate, filterNot, queryableStoreName); +processorSupplier.enableSendingOldValues(true); Review comment: `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values (existing code). So if we don't call `enableSendingOldValues` on it, it won't swallow the output when things haven't changed. To put it another way, the `sendOldValues` field of `KTableFilter` is used both to signify that the upstream is sending old values and to control whether the filter should forward old values. I'll split these into two variables. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
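The split proposed above can be sketched as two independent flags. This is a purely illustrative model, assuming hypothetical field and method names; it is not `KTableFilter`'s actual implementation:

```java
// Hypothetical sketch of splitting KTableFilter's single sendOldValues flag
// into the two roles described above. Names are illustrative, not Kafka's.
public class FilterFlags {
    private boolean upstreamSendsOldValues; // upstream emits <old, new> pairs to us
    private boolean forwardOldValues;       // we forward old values downstream

    // Called when the upstream is known to send old values; this lets the
    // filter swallow records whose value did not actually change.
    public void markUpstreamSendsOldValues() {
        upstreamSendsOldValues = true;
    }

    // Called when a downstream node asks this filter for old values.
    public void enableForwardingOldValues() {
        forwardOldValues = true;
    }

    public boolean canSwallowUnchanged() {
        return upstreamSendsOldValues;
    }

    public boolean forwardsOldValues() {
        return forwardOldValues;
    }
}
```

With two flags, the "suppress unchanged output" behavior no longer depends on whether anyone downstream asked for old values.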
[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
big-andy-coates commented on a change in pull request #9156: URL: https://github.com/apache/kafka/pull/9156#discussion_r487060524 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java ## @@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() { doTestNotSendingOldValue(builder, table1, table2, topic1); } +@Test +public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() { +final StreamsBuilder builder = new StreamsBuilder(); +final String topic1 = "topic1"; + +final KTableImpl table1 = +(KTableImpl) builder.table(topic1, consumed); +final KTableImpl table2 = (KTableImpl) table1.filter(predicate); + +table2.enableSendingOldValues(true); Review comment: See https://github.com/apache/kafka/pull/9156#discussion_r487057805 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
big-andy-coates commented on a change in pull request #9156: URL: https://github.com/apache/kafka/pull/9156#discussion_r487060171 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java ## @@ -77,10 +77,16 @@ public String getQueryableName() { } @Override -public void enableSendingOldValues() { -parent1.enableSendingOldValues(); -parent2.enableSendingOldValues(); +public boolean enableSendingOldValues(final boolean onlyIfMaterialized) { +if (!parent1.enableSendingOldValues(onlyIfMaterialized)) { Review comment: as above. ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java ## @@ -77,10 +77,16 @@ public String getQueryableName() { } @Override -public void enableSendingOldValues() { -parent1.enableSendingOldValues(); -parent2.enableSendingOldValues(); +public boolean enableSendingOldValues(final boolean onlyIfMaterialized) { +if (!parent1.enableSendingOldValues(onlyIfMaterialized)) { +throw new IllegalStateException("Table-table joins should always be materialized"); +} + +if (!parent2.enableSendingOldValues(onlyIfMaterialized)) { Review comment: as above. :p This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
big-andy-coates commented on a change in pull request #9156: URL: https://github.com/apache/kafka/pull/9156#discussion_r487059154 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java ## @@ -39,9 +39,15 @@ } @Override -public final void enableSendingOldValues() { -table1.enableSendingOldValues(); -table2.enableSendingOldValues(); +public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) { +if (!table1.enableSendingOldValues(onlyIfMaterialized)) { Review comment: My view, [as outlined above](https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
big-andy-coates commented on a change in pull request #9156: URL: https://github.com/apache/kafka/pull/9156#discussion_r487060078 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java ## @@ -39,9 +39,15 @@ } @Override -public final void enableSendingOldValues() { -table1.enableSendingOldValues(); -table2.enableSendingOldValues(); +public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) { +if (!table1.enableSendingOldValues(onlyIfMaterialized)) { +throw new IllegalStateException("Table-table joins should always be materialized"); +} + +if (!table2.enableSendingOldValues(onlyIfMaterialized)) { Review comment: as above. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
big-andy-coates commented on a change in pull request #9156: URL: https://github.com/apache/kafka/pull/9156#discussion_r487057186 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java ## @@ -47,8 +47,10 @@ } @Override -public void enableSendingOldValues() { +public boolean enableSendingOldValues(final boolean onlyIfMaterialized) { Review comment: I did initially go with this. However, it seems really unintuitive. Given a param such as `forceMaterialization`, it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enableSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values at all if the param is `false` and it's not already materialized. Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)`, I would still argue the semantics are not intuitive from the names alone. (Though of course JavaDocs would help.) However, `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive: `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, much like the previous `enableSendingOldValues()` method did. Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)`, I don't think we need to include a `maybe` or `IfPossible` in the name, as this is implied already by the `onlyIf`. That said, this is not code I have to look at every day. If there's a consensus we should flip, then happy enough to do it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
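The `onlyIfMaterialized` semantics being argued for can be sketched in a few lines. This is a toy model with hypothetical names, not Kafka's actual `KTable` internals; the return value indicates whether old values will actually be sent after the call:

```java
// Hypothetical sketch of enableSendingOldValues(onlyIfMaterialized):
// illustrative only, not Kafka's real implementation.
public class OldValuesNode {
    private boolean materialized;
    private boolean sendOldValues;

    public OldValuesNode(boolean materialized) {
        this.materialized = materialized;
    }

    // Returns true iff old values will be sent after this call.
    public boolean enableSendingOldValues(boolean onlyIfMaterialized) {
        if (onlyIfMaterialized && !materialized) {
            return false; // caller asked us not to force materialization
        }
        materialized = true; // materialize if not already
        sendOldValues = true;
        return true;
    }

    public boolean isSendingOldValues() {
        return sendOldValues;
    }
}
```

So a `true` argument is a best-effort request, while `false` reproduces the old unconditional `enableSendingOldValues()` behavior.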
[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
big-andy-coates commented on a change in pull request #9156: URL: https://github.com/apache/kafka/pull/9156#discussion_r487058548 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ## @@ -832,18 +834,25 @@ public String queryableStoreName() { } @SuppressWarnings("unchecked") -public void enableSendingOldValues() { +public boolean enableSendingOldValues(final boolean onlyIfMaterialized) { if (!sendOldValues) { if (processorSupplier instanceof KTableSource) { final KTableSource source = (KTableSource) processorSupplier; +if (onlyIfMaterialized && !source.materialized()) { +return false; +} source.enableSendingOldValues(); Review comment: Nope. The `if` above is handling the `boolean`, there is no need for the `source` to be aware of this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
big-andy-coates commented on a change in pull request #9156: URL: https://github.com/apache/kafka/pull/9156#discussion_r487049443 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java ## @@ -17,11 +17,12 @@ package org.apache.kafka.server.authorizer; -import java.net.InetAddress; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.SecurityProtocol; +import java.net.InetAddress; Review comment: Done. (Must sort out Kafka code style settings at some point) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10476) Error when calling (kafka-configs.sh --add-config): org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient..... Caused by: javax.crypto.BadPadding
AbdulRahman Mahmoud created KAFKA-10476: --- Summary: Error when calling (kafka-configs.sh --add-config): org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient. Caused by: javax.crypto.BadPaddingException: Given final block Key: KAFKA-10476 URL: https://issues.apache.org/jira/browse/KAFKA-10476 Project: Kafka Issue Type: Bug Components: config Affects Versions: 2.2.1 Reporter: AbdulRahman Mahmoud Attachments: Error.txt, Script.txt We created a cluster on AWS with the MSK service and can connect to it through TLS with our certificates; creating topics, producing, and consuming all work with no problems. However, when we use the same configuration for "kafka-configs.sh --add-config", as in the attached [^Script.txt] snippet, it fails. I have attached a file ([^Error.txt]) with the error log. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
nym3r0s commented on pull request #9280: URL: https://github.com/apache/kafka/pull/9280#issuecomment-691058386 @mjsax @ableegoldman @hachikuji - tagging for awareness. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeqo commented on pull request #9139: KAFKA-9929: Support backward iterator on SessionStore
jeqo commented on pull request #9139: URL: https://github.com/apache/kafka/pull/9139#issuecomment-691045209 cc @ableegoldman and team, this should be ready to review. Would also be great to look #9228 as part of KIP-666, to get the APIs aligned. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9795) Add throttleTimeMs in logs
[ https://issues.apache.org/jira/browse/KAFKA-9795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194209#comment-17194209 ] Rishika Modi commented on KAFKA-9795: - Thank you :) > Add throttleTimeMs in logs > -- > > Key: KAFKA-9795 > URL: https://issues.apache.org/jira/browse/KAFKA-9795 > Project: Kafka > Issue Type: Improvement > Components: log, logging >Reporter: Igor Vavrzhin >Assignee: Rishika Modi >Priority: Minor > > if throttling is triggered on a broker, then the client receives something like > this: > {code:java} > org.apache.kafka.clients.FetchSessionHandler.handleResponse > FetchResponse > throttleTimeMs = 2583 > error = {Errors} "NONE" > sessionId = 0 > responseData = {LinkedHashMap} size = 0 {code} > which in the logs looks like: > {code:java} > [INFO ] o.a.k.c.FetchSessionHandler [Consumer clientId=dev, groupId=null] > Node 18 sent an invalid full fetch response with extra=(topicName-11, > response=( {code} > It would be great to see the reason and parameter *throttleTimeMs* in the > logs! -- This message was sent by Atlassian Jira (v8.3.4#803005)
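A minimal sketch of the kind of log line the ticket asks for, with `throttleTimeMs` included alongside the node id. The wording and helper are hypothetical, not the actual patch:

```java
// Illustrative only: a log-line formatter that surfaces throttleTimeMs
// from the fetch response, as requested in KAFKA-9795.
public class ThrottleLog {
    public static String format(int nodeId, int throttleTimeMs) {
        return String.format(
            "Node %d fetch response throttled: throttleTimeMs=%d",
            nodeId, throttleTimeMs);
    }
}
```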
[jira] [Assigned] (KAFKA-9795) Add throttleTimeMs in logs
[ https://issues.apache.org/jira/browse/KAFKA-9795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rishika Modi reassigned KAFKA-9795: --- Assignee: Rishika Modi > Add throttleTimeMs in logs > -- > > Key: KAFKA-9795 > URL: https://issues.apache.org/jira/browse/KAFKA-9795 > Project: Kafka > Issue Type: Improvement > Components: log, logging >Reporter: Igor Vavrzhin >Assignee: Rishika Modi >Priority: Minor > > if throttling is triggered on a broker, then the client receives something like > this: > {code:java} > org.apache.kafka.clients.FetchSessionHandler.handleResponse > FetchResponse > throttleTimeMs = 2583 > error = {Errors} "NONE" > sessionId = 0 > responseData = {LinkedHashMap} size = 0 {code} > which in the logs looks like: > {code:java} > [INFO ] o.a.k.c.FetchSessionHandler [Consumer clientId=dev, groupId=null] > Node 18 sent an invalid full fetch response with extra=(topicName-11, > response=( {code} > It would be great to see the reason and parameter *throttleTimeMs* in the > logs! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] nym3r0s opened a new pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
nym3r0s opened a new pull request #9280: URL: https://github.com/apache/kafka/pull/9280 KAFKA-10186: Abort transaction with pending data with TransactionAbortedException If a transaction is aborted with no underlying exception, throw a new kind of exception - `TransactionAbortedException` to distinguish this from other fatal exceptions. This is part of KIP-654 and resolves KAFKA-10186 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically
tombentley commented on pull request #9266: URL: https://github.com/apache/kafka/pull/9266#issuecomment-690954007 @ijuma @hachikuji @guozhangwang any chance one of you could review this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10465) Potential Bug/Doc update in Transactional Producer and Isolation Level
[ https://issues.apache.org/jira/browse/KAFKA-10465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194093#comment-17194093 ] M. Manna commented on KAFKA-10465: -- [~guozhang] - Thanks for your comments. I am not sure whether you have let the producer run continuously, but it doesn't really make a difference either way. In our test environments, where things run perpetually, we see the same issue, and we don't pause anything. The code I have shown you here mimics what we have. The issue is very specific to when you assign (vs. subscribe to) a TopicPartition. If you check endOffsets(), pausing or not, you never seem to get the last committed offset at all. Also, if there is a transaction marker associated with it, it shouldn't be sent to the consumer as per the documentation. Could you kindly comment on the above? > Potential Bug/Doc update in Transactional Producer and Isolation Level > -- > > Key: KAFKA-10465 > URL: https://issues.apache.org/jira/browse/KAFKA-10465 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.4.1 >Reporter: M. Manna >Priority: Critical > Attachments: ConsumerTestApp.java, Consumer_endOffsets_return_84.jpg, > ProducerTestApp.java, Producer_Committed_Successfully_to_82.jpg > > > *Issue* > The difference between LSO and High Watermark offsets for a consumer using > "read_committed" is probably not explained in the right place. > *Expected Behaviour* > According to the documentation, the offset returned should be the one committed > last (and successfully). > *Observed (with steps)* > 1. Start a local or test kafka cluster (2.4.1 or above) > 2. Create a topic (I have used 3 replication-factor with 1 partition, but 1 > and 1 is good) > 3. Use the attached producer app file and set a debug pointer to be able to > pause on print > 4. Use the attached consumer app file to start a consumer and debug through > steps.
> It can be seen that the consumer is actually able to fetch an offset that's > not committed by the producer yet. > Just trying to raise this ticket to confirm whether: > 1) this is well-documented anywhere (which I have missed) - Please refer to > this documentation as a resolution > 2) This is a bug - please confirm and provide a timeline when this can be > fixed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
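For context, the isolation level under discussion is a consumer setting. A minimal sketch of the relevant configuration follows; the broker address and group id are placeholders, not taken from the ticket:

```java
import java.util.Properties;

public class ReadCommittedProps {
    // Consumer settings relevant to KAFKA-10465. Placeholder values.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "lso-demo");                // hypothetical
        // Hide records from aborted or still-open transactions. With this
        // setting, endOffsets() reports the last stable offset (LSO),
        // which can trail the high watermark while a transaction is open.
        props.put("isolation.level", "read_committed");
        return props;
    }
}
```

The LSO-vs-high-watermark distinction is exactly where the reporter's observed gap between produced and visible offsets comes from.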
[jira] [Commented] (KAFKA-9752) Consumer rebalance can be stuck after new member timeout with old JoinGroup version
[ https://issues.apache.org/jira/browse/KAFKA-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194082#comment-17194082 ] Javier Holguera commented on KAFKA-9752: What version of the Kafka clients (consumer/producer/streams) use JoinGroup v4 that isn’t affected by this bug? Thx > Consumer rebalance can be stuck after new member timeout with old JoinGroup > version > --- > > Key: KAFKA-9752 > URL: https://issues.apache.org/jira/browse/KAFKA-9752 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.2.2, 2.3.1, 2.4.1 >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.2 > > > For older versions of the JoinGroup protocol (v3 and below), there was no way > for new consumer group members to get their memberId until the first > rebalance completed. If the JoinGroup request timed out and the client > disconnected, the member would nevertheless be left in the group until the > rebalance completed and the session timeout expired. > In order to prevent this situation from causing the group size to grow > unboundedly, we added logic in KAFKA-7610 to limit the maximum time a new > member will be left in the group before it would be kicked out (in spite of > rebalance state). > In KAFKA-9232, we addressed one issue with this solution. Basically the new > member expiration logic did not properly get cancelled after the rebalance > completed which means that in certain cases, a successfully joined member > might get kicked out of the group unnecessarily. > Unfortunately, this patch introduced a regression in the normal session > expiration logic following completion of the initial rebalance. Basically the > expiration task fails to get scheduled properly. 
The issue is in this > function: > {code} > def shouldKeepAlive(deadlineMs: Long): Boolean = { > if (isNew) { > // New members are expired after the static join timeout > latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs > } else if (isAwaitingJoin || isAwaitingSync) { > // Don't remove members as long as they have a request in purgatory > true > } else { > // Otherwise check for session expiration > latestHeartbeat + sessionTimeoutMs > deadlineMs > } > } > {code} > We use this logic in order to check for session expiration. On the surface, > there is nothing wrong with it, but it has an odd interaction with the > purgatory. When the heartbeat is first scheduled with `tryCompleteElseWatch`, > the code relies on `shouldKeepAlive` returning false so that the heartbeat > task is not immediately completed. This only works because we update > `latestHeartbeat` just prior to calling `tryCompleteElseWatch`, which means > that the first or third checks will fail, `shouldKeepAlive` will return > false, and the heartbeat expiration task will not be immediately completed. > The bug in this case has to do with the case when `isNew` is true. When we > schedule the session expiration task, the `isNew` flag is still set to true, > which means we will hit the first check above. Since in most cases, the > session timeout is less than the new member timeout of 5 minutes, the check > is very likely to return true. This seems like what we would want, but as > noted above, we rely on this function returning false when the expiration > task is passed to `tryCompleteElseWatch`. Since it returns true instead, the > task completes immediately, which means we cannot rely on its expiration. > The impact of this bug in the worst case is that a consumer group can be left > in the `PreparingRebalance` state indefinitely. This state will persist until > there is a coordinator change (e.g. as a result of restarting the broker). 
> Note that this is only possible if 1) we have a consumer using an old > JoinGroup version, 2) the consumer times out and disconnects from its initial > JoinGroup request. -- This message was sent by Atlassian Jira (v8.3.4#803005)
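The immediate-completion interaction described in the quoted analysis can be modeled with a toy sketch. This is not Kafka code; the purgatory and the keep-alive predicate are reduced to their essentials, and the constants are illustrative:

```java
public class KeepAliveModel {
    // Toy model of tryCompleteElseWatch: if the keep-alive predicate is
    // already true when the task is handed over, the expiration task
    // completes immediately and never fires later.
    public static String tryCompleteElseWatch(boolean shouldKeepAlive) {
        return shouldKeepAlive ? "completed-immediately" : "watched";
    }

    public static boolean shouldKeepAlive(boolean isNew,
                                          long latestHeartbeatMs,
                                          long newMemberJoinTimeoutMs,
                                          long sessionTimeoutMs,
                                          long deadlineMs) {
        if (isNew) {
            // Buggy branch: new members are judged against the (large)
            // join timeout even when checking session expiration.
            return latestHeartbeatMs + newMemberJoinTimeoutMs > deadlineMs;
        }
        return latestHeartbeatMs + sessionTimeoutMs > deadlineMs;
    }
}
```

With a 10 s session timeout and the 5-minute new-member timeout, a just-joined member's check returns true at scheduling time, so the expiration task completes immediately instead of being watched, matching the stuck-rebalance symptom described above.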