[jira] [Created] (KAFKA-10478) advertised.listeners should allow duplicated ports

2020-09-11 Thread Andre Araujo (Jira)
Andre Araujo created KAFKA-10478:


 Summary: advertised.listeners should allow duplicated ports
 Key: KAFKA-10478
 URL: https://issues.apache.org/jira/browse/KAFKA-10478
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Andre Araujo


The same 
[validations|https://github.com/apache/kafka/blob/391ad90112fb2e9a85bf76250d57863bbf33b383/core/src/main/scala/kafka/utils/CoreUtils.scala#L259-L260]
 performed for {{listeners}} endpoints are also applied to 
[{{advertised.listeners}}|https://github.com/apache/kafka/blob/e8b2dcdee6f25e9344d52b84e86328ec616bf819/core/src/main/scala/kafka/server/KafkaConfig.scala#L1689-L1691].

It makes sense that neither parameter should allow duplicated listener names. 
The port number restriction is different though.

It makes sense that we only allow one listener per port, since two listeners 
cannot bind to the same port at the same time (considering a single network 
interface).

For advertised listeners, though this doesn't apply since Kafka doesn't 
actually bind to the advertised listener ports. A practical application of 
relaxing this restriction for {{advertised.listeners}} is the following:

When configuring Kafka using Kerberos authentication and a Load Balancer we 
need to have two SASL_SSL listeners: (A) one running with the 
{{kafka/hostname}} principal and (B) another using {{kafka/lb_name}}, which is 
necessary for proper authentication when using the LB FQDN. After bootstrap, 
though, the client receives the brokers' addresses with the actual host FQDNs 
advertised by the brokers. To connect to the brokerd using the hostnames the 
client must connect to the listener A to be able to authenticate successfully 
with Kerberos.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #9271: MINOR: correct package of LinuxIoMetricsCollector

2020-09-11 Thread GitBox


chia7712 commented on pull request #9271:
URL: https://github.com/apache/kafka/pull/9271#issuecomment-691397979


   ```
   失敗
   Build / JDK 8 / 
org.apache.kafka.clients.admin.KafkaAdminClientTest.testMetadataRetries
   ```
   it is already fixed by 
https://github.com/apache/kafka/commit/e4eab377e1489fc0cc90edec3eeb382b1192a442



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9149: KAFKA-10340: improve the logging to help user know what is going on

2020-09-11 Thread GitBox


guozhangwang commented on pull request #9149:
URL: https://github.com/apache/kafka/pull/9149#issuecomment-691389142


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-6127) Streams should never block infinitely

2020-09-11 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194590#comment-17194590
 ] 

Guozhang Wang commented on KAFKA-6127:
--

I think we can close this ticket now, since:

* All blocking client calls should be covered with KIP-572 now.
* Exceptional cases which would make producer / consumer to not be able to 
proceed should be covered with  other created tickets now.

> Streams should never block infinitely
> -
>
> Key: KAFKA-6127
> URL: https://issues.apache.org/jira/browse/KAFKA-6127
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: exactly-once
>
> Streams uses three consumer APIs that can block infinite: {{commitSync()}}, 
> {{committed()}}, and {{position()}}. Also {{KafkaProducer#send()}} can block. 
> If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block 
> (fixed in KAFKA-6446) and we should double check the code if we handle this 
> case correctly.
> If we block within one operation, the whole {{StreamThread}} would block, and 
> the instance does not make any progress, becomes unresponsive (for example, 
> {{KafkaStreams#close()}} suffers), and we also might drop out of the consumer 
> group.
> Thanks to 
> [KIP-266|[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886],]
>  the Consumer now has non-blocking variants that we can use, but the same is 
> not true of Producer. We can add non-blocking variants to Producer as well, 
> or set the appropriate config options to set the max timeout.
> Of course, we'd also need to be sure the catch the appropriate timeout 
> exceptions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

2020-09-11 Thread GitBox


showuon commented on pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#issuecomment-691365862


   @junrao , No, the failing tests are not related to my change.
   
   **tests/Build/JDK 11**: PASS
   **tests/Build/JDK 15**: 
DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota 
failed, which is not related to my change
   **tests/Build/JDK 11**: KafkaConsumerTest.testReturnRecordsDuringRebalance 
failed, which is not related to my change
   
   Thank you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] MicahRam commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-11 Thread GitBox


MicahRam commented on pull request #8181:
URL: https://github.com/apache/kafka/pull/8181#issuecomment-691360304


   @mjsax that sounds like it will work for the majority of the use cases I've 
come across. One thing thats not clear to me is how kstream operations that are 
down stream of a process would handle headers. For instance:
   
   ```
   from("in")
   .transform(ForwardsFromTransformAndPunctuate::new) 
   .transform(ModifyHeadersInTransform::new) // Should this type check the 
context before accessing headers?
   .setHeaders() // will this exception when processing driven by punctuate 
reaches here?
   .to("out") 
   ```
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster

2020-09-11 Thread GitBox


ableegoldman commented on pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#issuecomment-691358663


   Alright this should finally be ready for a final pass and merge @vvcephei  
@cadonna -- sorry for leaving this hanging for so long.
   
   After running a few different tests, it seems like the HATA assignor may 
actually be faster than the old StickyTaskAssignor in most cases. The HATA 
seems to scale slightly worse with increasing number of clients, but 
significantly better with partition count and num standbys
   
   The `testLargeNumConsumers` with 1,000 clients and 1,000 partitions (and 1 
standby) took the HATA 20s for the full test, but only ~1-2s for the STA and 
FPTA. 
   
   The `testManyStandbys` with 100 clients,  1000 partitions, and 50 standbys 
took the HATA roughly 10s, and the STA/FPTA just under a full minute.
   
   The `testLargePartitionCount` with 1 client, 6,000 partitions, and 1 standby 
took the HATA under 1s. The STA and FPTA both ran out of time, surprisingly on 
the first assignment alone (taking just over 1.5 minutes)
   
   Decided to reduce the number of partitions in the `testLargePartitionCount` 
test to 3,000 rather than increasing the timeout for all tests, as it's already 
pretty high. Maybe we can drop the STA sooner or later and then tighten it up



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-09-11 Thread GitBox


ableegoldman commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-691357934


   Also, looks like there were checkstyle issues. Probably you just need to add 
the license header to the new file



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-09-11 Thread GitBox


ableegoldman commented on a change in pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#discussion_r487336115



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##
@@ -418,10 +419,14 @@ private boolean maybeSendAndPollTransactionalRequest() {
 
 if (transactionManager.hasAbortableError() || 
transactionManager.isAborting()) {
 if (accumulator.hasIncomplete()) {
+// Attempt to get the last error that caused this abort.
 RuntimeException exception = transactionManager.lastError();
+// If there was no error, but we are still aborting,
+// then this is most likely a case where there was no fatal 
error.
 if (exception == null) {
-exception = new KafkaException("Failing batch since 
transaction was aborted");
+exception = new TransactionAbortedException();
 }
+// Since the transaction is aborted / being aborted, abort all 
the undrained batches.

Review comment:
   This comment might be unnecessary, the code is pretty self-explanatory 
in this case 🙂 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-6127) Streams should never block infinitely

2020-09-11 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194565#comment-17194565
 ] 

Sophie Blee-Goldman edited comment on KAFKA-6127 at 9/11/20, 11:47 PM:
---

[~guozhang] [~vvcephei]  any thoughts on this? You probably touched on a lot of 
these calls back in "The Refactor" – I remember seeing that we started to catch 
TimeoutException in the Consumer#position call, for example.

I know there are some related issues with infinite blocking in exceptional 
cases, for example when a topic is deleted out from under a Producer, but we 
already have a separate ticket for that. If that sort of thing is the only one 
remaining we should maybe close this as a duplicate instead


was (Author: ableegoldman):
[~guozhang] any thoughts on this? You probably touched on a lot of these calls 
back in "The Refactor" – I remember seeing that we started to catch 
TimeoutException in the Consumer#position call, for example.

I know there are some related issues with infinite blocking in exceptional 
cases, for example when a topic is deleted out from under a Producer, but we 
already have a separate ticket for that. If that sort of thing is the only one 
remaining we should maybe close this as a duplicate instead

> Streams should never block infinitely
> -
>
> Key: KAFKA-6127
> URL: https://issues.apache.org/jira/browse/KAFKA-6127
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: exactly-once
>
> Streams uses three consumer APIs that can block infinite: {{commitSync()}}, 
> {{committed()}}, and {{position()}}. Also {{KafkaProducer#send()}} can block. 
> If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block 
> (fixed in KAFKA-6446) and we should double check the code if we handle this 
> case correctly.
> If we block within one operation, the whole {{StreamThread}} would block, and 
> the instance does not make any progress, becomes unresponsive (for example, 
> {{KafkaStreams#close()}} suffers), and we also might drop out of the consumer 
> group.
> Thanks to 
> [KIP-266|[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886],]
>  the Consumer now has non-blocking variants that we can use, but the same is 
> not true of Producer. We can add non-blocking variants to Producer as well, 
> or set the appropriate config options to set the max timeout.
> Of course, we'd also need to be sure the catch the appropriate timeout 
> exceptions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6127) Streams should never block infinitely

2020-09-11 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194565#comment-17194565
 ] 

Sophie Blee-Goldman commented on KAFKA-6127:


[~guozhang] any thoughts on this? You probably touched on a lot of these calls 
back in "The Refactor" – I remember seeing that we started to catch 
TimeoutException in the Consumer#position call, for example.

I know there are some related issues with infinite blocking in exceptional 
cases, for example when a topic is deleted out from under a Producer, but we 
already have a separate ticket for that. If that sort of thing is the only one 
remaining we should maybe close this as a duplicate instead

> Streams should never block infinitely
> -
>
> Key: KAFKA-6127
> URL: https://issues.apache.org/jira/browse/KAFKA-6127
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: exactly-once
>
> Streams uses three consumer APIs that can block infinite: {{commitSync()}}, 
> {{committed()}}, and {{position()}}. Also {{KafkaProducer#send()}} can block. 
> If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block 
> (fixed in KAFKA-6446) and we should double check the code if we handle this 
> case correctly.
> If we block within one operation, the whole {{StreamThread}} would block, and 
> the instance does not make any progress, becomes unresponsive (for example, 
> {{KafkaStreams#close()}} suffers), and we also might drop out of the consumer 
> group.
> Thanks to 
> [KIP-266|[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886],]
>  the Consumer now has non-blocking variants that we can use, but the same is 
> not true of Producer. We can add non-blocking variants to Producer as well, 
> or set the appropriate config options to set the max timeout.
> Of course, we'd also need to be sure the catch the appropriate timeout 
> exceptions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #9255: MINOR: Consolidate duplicated logic on reset tools

2020-09-11 Thread GitBox


mjsax commented on a change in pull request #9255:
URL: https://github.com/apache/kafka/pull/9255#discussion_r487321249



##
File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
##
@@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
 assertEquals(msg, exception.get().getMessage());
 assertEquals(1, count.get());
 }
+
+@Test
+public void shouldAcceptValidDateFormats() throws ParseException {
+//check valid formats
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS"));
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSSZ"));
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSSX"));
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSSXX"));
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSSXXX"));
+}
+
+@Test
+public void shouldThrowOnInvalidDateFormat() {
+//check some invalid formats
+try {
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss"));
+fail("Call to getDateTime should fail");
+} catch (final Exception e) {
+e.printStackTrace();

Review comment:
   We should not print the stacktrace imho, but verify the exception 
message using `assertThat`.

##
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##
@@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
 }
 return map;
 }
+
+/**
+ * Convert an ISO8601 based timestamp to an epoch value

Review comment:
   nit: I don't know from the top of my head what the standard dictates. 
Might be worth adding to the comment?

##
File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
##
@@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
 assertEquals(msg, exception.get().getMessage());
 assertEquals(1, count.get());
 }
+
+@Test
+public void shouldAcceptValidDateFormats() throws ParseException {
+//check valid formats
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS"));
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSSZ"));
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSSX"));
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSSXX"));
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSSXXX"));
+}
+
+@Test
+public void shouldThrowOnInvalidDateFormat() {
+//check some invalid formats
+try {
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss"));

Review comment:
   We should update this test and use `assertThrows` instead of 
`try-fail-catch`.

##
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##
@@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
 }
 return map;
 }
+
+/**
+ * Convert an ISO8601 based timestamp to an epoch value
+ * @param timestamp to be converted
+ * @return epoch value of a given timestamp
+ * @throws ParseException for timestamp that doesn't follow ISO8601 format
+ */
+public static long getDateTime(String timestamp) throws ParseException {
+final String[] timestampParts = timestamp.split("T");
+if (timestampParts.length < 2) {
+throw new ParseException("Error parsing timestamp. It does not 
contain a 'T' according to ISO8601 format", timestamp.length());
+}
+
+final String secondPart = timestampParts[1];
+if (secondPart == null || secondPart.isEmpty()) {
+throw new ParseException("Error parsing timestamp. Time part after 
'T' is null or empty", timestamp.length());
+}
+
+if (!(secondPart.contains("+") || secondPart.contains("-") || 
secondPart.contains("Z"))) {

Review comment:
   I know that this is exiting logic that was just moved, but I am not sure 
if I understand how it works?

##
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##
@@ -823,7 +809,7 @@ object ConsumerGroupCommand extends Logging {
   case (topicPartition, newOffset) => (topicPartition, new 
OffsetAndMetadata(newOffset))
 }
   } else if (opts.options.has(opts.resetToDatetimeOpt)) {
-val timestamp = 
convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
+val timestamp = 
Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt))

Review comment:
   I am must wondering if we can have a dependency from `core` to `clients` 
module? \cc @ijuma 





This is an automated message from the Apache Git Service.
To respond to the message, pl

[GitHub] [kafka] mjsax commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-11 Thread GitBox


mjsax commented on pull request #8181:
URL: https://github.com/apache/kafka/pull/8181#issuecomment-691339633


   @MicahRam I completely agree with what you are saying, and there are 
actually two relevant KIPs in-flight atm the might help. For the Processor API, 
we could use the KIP John mentioned above (or start a new KIP) to extend the 
Processor API for better headers support (I just proposed this on the 
corresponding DISCUSS thread on the mailing list).
   
   Furthermore, there is KIP-478 that proposed to split the current 
`ProcessorContext` into multiple parts, including a `RecordContext` and we 
could change the API that the `RecordContext` is not available within 
`punctuate` to begin with and thus `offset/partition` etc cannot be called to 
begin with.
   
   Overall, both KIPs might resolve the issue you are facing. Bottom line still 
seems to be, that there is not really a bug, but "just" a missing feature: it's 
not possible to add headers to records that are send downstream from a 
`punctuation` call. (And you kinda "hack" around the missing feature to make it 
work -- btw: what workaround do you use?)
   
   Would you agree to this assessment?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-6127) Streams should never block infinitely

2020-09-11 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194548#comment-17194548
 ] 

Matthias J. Sax commented on KAFKA-6127:


Not 100% sure either :) – this ticket is almost 3 years old. If you have time, 
feel free to double check the usage of both consumer and producer calls. My gut 
feeling is, that this ticket might be void now (also considering the work on 
KIP-572), but we should double check before we close it.

> Streams should never block infinitely
> -
>
> Key: KAFKA-6127
> URL: https://issues.apache.org/jira/browse/KAFKA-6127
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: exactly-once
>
> Streams uses three consumer APIs that can block infinite: {{commitSync()}}, 
> {{committed()}}, and {{position()}}. Also {{KafkaProducer#send()}} can block. 
> If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block 
> (fixed in KAFKA-6446) and we should double check the code if we handle this 
> case correctly.
> If we block within one operation, the whole {{StreamThread}} would block, and 
> the instance does not make any progress, becomes unresponsive (for example, 
> {{KafkaStreams#close()}} suffers), and we also might drop out of the consumer 
> group.
> Thanks to 
> [KIP-266|[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886],]
>  the Consumer now has non-blocking variants that we can use, but the same is 
> not true of Producer. We can add non-blocking variants to Producer as well, 
> or set the appropriate config options to set the max timeout.
> Of course, we'd also need to be sure the catch the appropriate timeout 
> exceptions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6127) Streams should never block infinitely

2020-09-11 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194543#comment-17194543
 ] 

Sophie Blee-Goldman commented on KAFKA-6127:


[~mjsax] is this ticket still relevant? I didn't think any of the Consumer APIs 
we use block indefinitely anymore, as the default.api.timeout.ms config takes 
effect. Less sure about the Producer APIs

> Streams should never block infinitely
> -
>
> Key: KAFKA-6127
> URL: https://issues.apache.org/jira/browse/KAFKA-6127
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: exactly-once
>
> Streams uses three consumer APIs that can block infinite: {{commitSync()}}, 
> {{committed()}}, and {{position()}}. Also {{KafkaProducer#send()}} can block. 
> If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block 
> (fixed in KAFKA-6446) and we should double check the code if we handle this 
> case correctly.
> If we block within one operation, the whole {{StreamThread}} would block, and 
> the instance does not make any progress, becomes unresponsive (for example, 
> {{KafkaStreams#close()}} suffers), and we also might drop out of the consumer 
> group.
> Thanks to 
> [KIP-266|[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886],]
>  the Consumer now has non-blocking variants that we can use, but the same is 
> not true of Producer. We can add non-blocking variants to Producer as well, 
> or set the appropriate config options to set the max timeout.
> Of course, we'd also need to be sure the catch the appropriate timeout 
> exceptions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7970) Missing topic causes service shutdown without exception

2020-09-11 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-7970.

Fix Version/s: 2.7.0
 Assignee: Bruno Cadonna
   Resolution: Fixed

> Missing topic causes service shutdown without exception
> ---
>
> Key: KAFKA-7970
> URL: https://issues.apache.org/jira/browse/KAFKA-7970
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Jonny Heavey
>Assignee: Bruno Cadonna
>Priority: Minor
> Fix For: 2.7.0
>
>
> When launching a KafkaStreams application that depends on a topic that 
> doesn't exist, the streams application correctly logs an error such as:
> " is unknown yet during rebalance, please make sure they have 
> been pre-created before starting the Streams application."
> The stream is then shutdown, however, no exception is thrown indicating that 
> an error has occurred.
> In our circumstances, we run our streams app inside a container. The streams 
> service is shutdown, but the process is not exited, meaning that the 
> container does not crash (reducing visibility of the issue).
> As no exception is thrown in the missing topic scenario described above, our 
> application code has no way to determine that something is wrong that would 
> then allow it to terminate the process.
>  
> Could the onPartitionsAssigned method in StreamThread.java throw an exception 
> when it decides to shutdown the stream (somewhere around line 264)?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei merged pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-11 Thread GitBox


vvcephei merged pull request #9239:
URL: https://github.com/apache/kafka/pull/9239


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-11 Thread GitBox


vvcephei commented on pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#issuecomment-691322853


   These test failures look unrelated:
   ```
   Build / JDK 11 / 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   Build / JDK 11 / 
org.apache.kafka.streams.integration.StandbyTaskEOSIntegrationTest.shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing[exactly_once]
   Build / JDK 8 / 
kafka.network.ConnectionQuotasTest.testNoConnectionLimitsByDefault
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10477) Sink Connector fails with DataException when trying to convert Kafka record with empty key to Connect Record

2020-09-11 Thread Shaik Zakir Hussain (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194533#comment-17194533
 ] 

Shaik Zakir Hussain commented on KAFKA-10477:
-

Observed that the issue doesn't occur in Kafka *v2.3.0* (where jackson 
dependency resolves to *v2.9.9*).

> Sink Connector fails with DataException when trying to convert Kafka record 
> with empty key to Connect Record
> 
>
> Key: KAFKA-10477
> URL: https://issues.apache.org/jira/browse/KAFKA-10477
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0
>Reporter: Shaik Zakir Hussain
>Priority: Major
>
> Sink connector is facing a DataException when trying to convert a kafka 
> record with empty key to Connect data format. 
> Kafka's trunk branch currently depends on *jackson v2.10.5* 
> A short unit test (shared below) in 
> `org.apache.kafka.connect.json.JsonConverterTest` class reproduces the issue. 
>  
> {code:java}
> @Test
> public void testToConnectDataEmptyKey() throws IOException {
> Map props = 
> Collections.singletonMap("schemas.enable", false);
> converter.configure(props, true);
> String str = "";
> SchemaAndValue schemaAndValue = converter.toConnectData("testTopic", 
> str.getBytes());
> System.out.println(schemaAndValue);
> }
> {code}
> This test code snippet fails with the following exception:
> {noformat}
> org.apache.kafka.connect.errors.DataException: Unknown schema type: null
>   at 
> org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:764)
>   at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:385)
>   at 
> org.apache.kafka.connect.json.JsonConverterTest.testToConnectDataEmptyKey(JsonConverterTest.java:792)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>   at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>   at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {noformat}
>  
> This seems related to the issue 
> [https://github.com/FasterXML/jackson-databind/issues/2211] , where jackson 
> lib started returning `MissingNode` for empty input in 
> `ObjectMapper.readTree(input)` method invocation. Precise code change can be 
> observed here: 
> [https://github.com/FasterXML/jackson-databind/commit/f0abe41b54b36f43f96f05ab224f6e6f364fbe7a#diff-0d472011dea2aac97f0381097cd1a0bfR4094]
>  
>  
> This causes an exception to throw up in our JsonConverter class : 
> [https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L764]
>  
>  
> In my opinion, when the `jsonValue.getNod

[jira] [Updated] (KAFKA-10477) Sink Connector fails with DataException when trying to convert Kafka record with empty key to Connect Record

2020-09-11 Thread Shaik Zakir Hussain (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shaik Zakir Hussain updated KAFKA-10477:

Affects Version/s: 2.4.0

> Sink Connector fails with DataException when trying to convert Kafka record 
> with empty key to Connect Record
> 
>
> Key: KAFKA-10477
> URL: https://issues.apache.org/jira/browse/KAFKA-10477
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0
>Reporter: Shaik Zakir Hussain
>Priority: Major
>
> Sink connector is facing a DataException when trying to convert a kafka 
> record with empty key to Connect data format. 
> Kafka's trunk branch currently depends on *jackson v2.10.5* 
> A short unit test (shared below) in 
> `org.apache.kafka.connect.json.JsonConverterTest` class reproduces the issue. 
>  
> {code:java}
> @Test
> public void testToConnectDataEmptyKey() throws IOException {
> Map props = 
> Collections.singletonMap("schemas.enable", false);
> converter.configure(props, true);
> String str = "";
> SchemaAndValue schemaAndValue = converter.toConnectData("testTopic", 
> str.getBytes());
> System.out.println(schemaAndValue);
> }
> {code}
> This test code snippet fails with the following exception:
> {noformat}
> org.apache.kafka.connect.errors.DataException: Unknown schema type: null
>   at 
> org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:764)
>   at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:385)
>   at 
> org.apache.kafka.connect.json.JsonConverterTest.testToConnectDataEmptyKey(JsonConverterTest.java:792)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>   at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>   at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {noformat}
>  
> This seems related to the issue 
> [https://github.com/FasterXML/jackson-databind/issues/2211] , where jackson 
> lib started returning `MissingNode` for empty input in 
> `ObjectMapper.readTree(input)` method invocation. Precise code change can be 
> observed here: 
> [https://github.com/FasterXML/jackson-databind/commit/f0abe41b54b36f43f96f05ab224f6e6f364fbe7a#diff-0d472011dea2aac97f0381097cd1a0bfR4094]
>  
>  
> This causes an exception to throw up in our JsonConverter class : 
> [https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L764]
>  
>  
> In my opinion, when the `jsonValue.getNodeType()` is `MISSING` 
> ([https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org

[GitHub] [kafka] MicahRam edited a comment on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-11 Thread GitBox


MicahRam edited a comment on pull request #8181:
URL: https://github.com/apache/kafka/pull/8181#issuecomment-691315131


   @mjsax its been a while since I looked at the code but from what I remember 
from looking through the commit history it used to throw exceptions and there 
was a commit thats entire purpose was to use static object with defaults 
because the exceptions were not user friendly. So I guess there needs to be 
some clarification on what the actually behavior should be. We've figured out 
how to work around the issues I brought up in the KAFKA-9584 and still add 
headers to the output messages during punctuate processing. I personally would 
prefer it stay as is until producing messages with headers during punctuate 
processing is officially supported. 
   
   I can imagine a lot of people would be unhappy if `context.offset` and 
`context.partition` started throwing exceptions



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] MicahRam edited a comment on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-11 Thread GitBox


MicahRam edited a comment on pull request #8181:
URL: https://github.com/apache/kafka/pull/8181#issuecomment-691315131


   @mjsax its been a while since I looked at the code but from what I remember 
from looking through the commit history it used to throw exceptions and there 
was a commit thats entire purpose was to use static object with defaults 
because the exceptions were not user friendly. So I guess there needs to be 
some clarification on what the actually behavior should be. We've figured out 
how to work around the issues I brought up in the KAFKA-9584 and still add 
headers to the output messages during punctuate processing. I personally would 
prefer it stay as is until producing messages with headers during punctuate 
processing is officially supported. 
   
   I can imagine a lot of people would be unhappy if context.offset and 
context.partition started throwing exceptions



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] MicahRam commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-11 Thread GitBox


MicahRam commented on pull request #8181:
URL: https://github.com/apache/kafka/pull/8181#issuecomment-691315131


   @mjsax its been a while since I looked at the code but from what I remember 
from looking through the commit history it used to throw exceptions and there 
was a commit thats entire purpose was to use static object with defaults 
because the exceptions were not user friendly. So I guess there needs to be 
some clarification on what the actually behavior should be. We've figured out 
how to work around the issues I brought up in the KAFKA-9584 and still add 
headers to the output messages during punctuate processing. I personally would 
prefer it stay as is until producing messages with headers during punctuate 
processing is officially supported. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10477) Sink Connector fails with DataException when trying to convert Kafka record with empty key to Connect Record

2020-09-11 Thread Shaik Zakir Hussain (Jira)
Shaik Zakir Hussain created KAFKA-10477:
---

 Summary: Sink Connector fails with DataException when trying to 
convert Kafka record with empty key to Connect Record
 Key: KAFKA-10477
 URL: https://issues.apache.org/jira/browse/KAFKA-10477
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Shaik Zakir Hussain


Sink connector is facing a DataException when trying to convert a kafka record 
with empty key to Connect data format. 

Kafka's trunk branch currently depends on *jackson v2.10.5* 

A short unit test (shared below) in 
`org.apache.kafka.connect.json.JsonConverterTest` class reproduces the issue.  
{code:java}
@Test
public void testToConnectDataEmptyKey() throws IOException {
Map props = Collections.singletonMap("schemas.enable", 
false);
converter.configure(props, true);
String str = "";
SchemaAndValue schemaAndValue = converter.toConnectData("testTopic", 
str.getBytes());
System.out.println(schemaAndValue);
}
{code}
This test code snippet fails with the following exception:
{noformat}
org.apache.kafka.connect.errors.DataException: Unknown schema type: null

at 
org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:764)
at 
org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:385)
at 
org.apache.kafka.connect.json.JsonConverterTest.testToConnectDataEmptyKey(JsonConverterTest.java:792)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
{noformat}
 

This seems related to the issue 
[https://github.com/FasterXML/jackson-databind/issues/2211] , where jackson lib 
started returning `MissingNode` for empty input in 
`ObjectMapper.readTree(input)` method invocation. Precise code change can be 
observed here: 
[https://github.com/FasterXML/jackson-databind/commit/f0abe41b54b36f43f96f05ab224f6e6f364fbe7a#diff-0d472011dea2aac97f0381097cd1a0bfR4094]
 

 

This causes an exception to throw up in our JsonConverter class : 
[https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L764]
 

 

In my opinion, when the `jsonValue.getNodeType()` is `MISSING` 
([https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L754]
 ), we need to fall back to the behaviour of the case `NULL` (i.e. return 
null), although not sure of any further repercussions this might bring in.

 

Things were working fine when the dependency on *jackson* lib was of version  
*v2.9.10.3* or lesser as the `ObjectMapper` returned null in that case.

 

Thanks,

Zakir



--
This message was sent by Atlassi

[jira] [Commented] (KAFKA-10467) kafka-topic --describe fails for topic created by "produce"

2020-09-11 Thread Swayam Raina (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194483#comment-17194483
 ] 

Swayam Raina commented on KAFKA-10467:
--

> did it happen in the latest version of Kafka?
> kafka-topics --version

2.3.1 (Commit:18a913733fb71c01)
I was running Kafka from sources in debug mode

 

 

> kafka-topic --describe fails for topic created by "produce"
> ---
>
> Key: KAFKA-10467
> URL: https://issues.apache.org/jira/browse/KAFKA-10467
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.3.1
> Environment: MacOS 
>Reporter: Swayam Raina
>Priority: Minor
>
> {code:java}
> > kafka-topics --version
> 2.3.1 (Commit:18a913733fb71c01){code}
>  
> While producing to a topic that does not already exists
> {code:java}
> producer.send("does-not-exists", "msg-1")
> {code}
>  
> broker creates the topic
> {code:java}
> // partition file
> > ls /tmp/kafka-logs/
> does-not-exists-0{code}
>  
> If I try to list the topics, it shows also shows this new topic
> {code:java}
> > kafka-topics --bootstrap-server localhost:9092 --list
> does-not-exists-0
> {code}
> Now while trying to describe the topic that was auto-created the following 
> error is thrown
>  
> {code:java}
> > kafka-topics --bootstrap-server localhost:9092 --topic does-not-exists 
> >--describe
> Error while executing topic command : 
> org.apache.kafka.common.errors.UnknownServerException: The server experienced 
> an unexpected error when processing the request.Error while executing topic 
> command : org.apache.kafka.common.errors.UnknownServerException: The server 
> experienced an unexpected error when processing the request.[2020-09-08 
> 00:21:30,890] ERROR java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnknownServerException: The server experienced 
> an unexpected error when processing the request. at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>  at 
> kafka.admin.TopicCommand$AdminClientTopicService.$anonfun$describeTopic$3(TopicCommand.scala:228)
>  at 
> kafka.admin.TopicCommand$AdminClientTopicService.$anonfun$describeTopic$3$adapted(TopicCommand.scala:225)
>  at scala.collection.Iterator.foreach(Iterator.scala:941) at 
> scala.collection.Iterator.foreach$(Iterator.scala:941) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at 
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
> kafka.admin.TopicCommand$AdminClientTopicService.describeTopic(TopicCommand.scala:225)
>  at kafka.admin.TopicCommand$.main(TopicCommand.scala:66) at 
> kafka.admin.TopicCommand.main(TopicCommand.scala)Caused by: 
> org.apache.kafka.common.errors.UnknownServerException: The server experienced 
> an unexpected error when processing the request. (kafka.admin.TopicCommand$)
>  
> {code}
> ```
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10455) Probing rebalances are not guaranteed to be triggered by non-leader members

2020-09-11 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194469#comment-17194469
 ] 

Sophie Blee-Goldman commented on KAFKA-10455:
-

Yeah, I think you're touching on a related issue – fast detection of source 
topic deletion – which I agree can/should be solved along the same lines as the 
probing rebalance bug in this ticket.

Just for some context, to avoid accidental data corruption/loss, we of course 
want to react as fast as possible on source topic deletion. Currently we detect 
the absence of source topics during a rebalance and send an error code to all 
members to shut down. The problem is that there may be a gap of up to the 
metadata.max.age (default 5min) between the topic deletion and the reaction, ie 
triggering a rebalance and informing all members to shut down. Since only the 
leader is guaranteed to trigger a rebalance upon sending a JoinGroup, unless 
the leader happens to be assigned one of the partitions of the deleted tasks, 
it will not notice the topic deletion until it refreshes its metadata. If 
non-leaders are assigned to these deleted partitions and notice the topic 
deletion, they may not be able to trigger a rebalance even if they rejoin the 
group.

Both problems could be solved by modifying the userdata to ensure any member's 
JoinGroup results in a rebalance. We could just add a single byte to the 
SubscriptionInfo and bump it when rejoining. This actually seems like a better 
all-around solution, since members of Streams should not be haphazardly sending 
JoinGroups for no reason – if they do, it must be because they want a 
rebalance. This way we don't have to worry about changing any broker side code 
and finding a workaround for older brokers. 

We could also take the approach of making sure the leader is responsible for 
triggering the rebalance, but this doesn't solve the source topic deletion 
problem. It also wouldn't help us in any new feature we wanted to add that 
required arbitrary members to trigger a rebalance. So I think we should just go 
with bumping a byte in the SubscriptionInfo

> Probing rebalances are not guaranteed to be triggered by non-leader members
> ---
>
> Key: KAFKA-10455
> URL: https://issues.apache.org/jira/browse/KAFKA-10455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> Apparently, if a consumer rejoins the group with the same subscription 
> userdata that it previously sent, it will not trigger a rebalance. The one 
> exception here is that the group leader will always trigger a rebalance when 
> it rejoins the group.
> This has implications for KIP-441, where we rely on asking an arbitrary 
> thread to enforce the followup probing rebalances. Technically we do ask a 
> thread living on the same instance as the leader, so the odds that the leader 
> will be chosen aren't completely abysmal, but for any multithreaded 
> application they are still at best only 50%.
> Of course in general the userdata will have changed within a span of 10 
> minutes, so the actual likelihood of hitting this is much lower –  it can 
> only happen if the member's task offset sums remained unchanged. 
> Realistically, this probably requires that the member only have 
> fully-restored active tasks (encoded with the constant sentinel -2) and that 
> no tasks be added or removed.
>  
> One solution would be to make sure the leader is responsible for the probing 
> rebalance. To do this, we would need to somehow expose the memberId of the 
> thread's main consumer to the partition assignor. I'm actually not sure if 
> that's currently possible to figure out or not. If not, we could just assign 
> the probing rebalance to every thread on the leader's instance. This 
> shouldn't result in multiple followup rebalances as the rebalance schedule 
> will be updated/reset on the first followup rebalance.
> Another solution would be to make sure the userdata is always different. We 
> could encode an extra bit that flip-flops, but then we'd have to persist the 
> latest value somewhere/somehow. Alternatively we could just encode the next 
> probing rebalance time in the subscription userdata, since that is guaranteed 
> to always be different from the previous rebalance. This might get tricky 
> though, and certainly wastes space in the subscription userdata. Also, this 
> would only solve the problem for KIP-441 probing rebalances, meaning we'd 
> have to individually ensure the userdata has changed for every type of 
> followup rebalance (see related issue below). So the first proposal, 
> requiring the leader trigger the rebalance, would be preferable.
> Note that, imho, we should just allow a

[GitHub] [kafka] vvcephei commented on pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-11 Thread GitBox


vvcephei commented on pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#issuecomment-691224736


   Oops:
   ```
   17:31:33  [ant:checkstyle] [ERROR] 
/home/jenkins/workspace/Kafka_kafka-pr_PR-9239/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java:215:5:
 Method length is 153 lines (max allowed is 150). [MethodLength]
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9331) Add option to terminate application when StreamThread(s) die

2020-09-11 Thread Walker Carlson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194449#comment-17194449
 ] 

Walker Carlson commented on KAFKA-9331:
---

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown]

> Add option to terminate application when StreamThread(s) die
> 
>
> Key: KAFKA-9331
> URL: https://issues.apache.org/jira/browse/KAFKA-9331
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Bingham
>Priority: Minor
>  Labels: needs-kip
>
> Currently, if a {{StreamThread}} dies due to an unexpected exception, the 
> Streams application continues running. Even if all {{StreamThread}}(s) die, 
> the application will continue running, but will be in an {{ERROR}} state. 
> Many users want or expect the application to terminate in the event of a 
> fatal exception that kills one or more {{StreamThread}}(s). Currently, this 
> requires extra work from the developer to register an uncaught exception 
> handler on the {{KafkaStreams}} object and trigger a shutdown as needed.
> It would be useful to provide a configurable option for the Streams 
> application to have it automatically terminate with an exception if one or 
> more {{StreamThread}}(s) die.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-11 Thread GitBox


vvcephei commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r487174400



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
 inputTopic.pipeInput("k1", "v1", 7L);
 // final record to advance stream time and flush windows
 inputTopic.pipeInput("k1", "v1", 90L);
+final Comparator> comparator =

Review comment:
   Yeah, this sounds right. Either the current record's timestamp is less 
than the event time for some of the windows, in which case it doesn't advance 
the partition's stream time, or it is more advanced than the (prior) event time 
for all the windows, in which case it does advance the stream time, but all the 
updated windows' event times are equal to the current record's timestamp, which 
is also equal to the new stream time, which should also be ok for suppression.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-11 Thread GitBox


mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487172402



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -832,18 +834,25 @@ public String queryableStoreName() {
 }
 
 @SuppressWarnings("unchecked")
-public void enableSendingOldValues() {
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
 if (!sendOldValues) {
 if (processorSupplier instanceof KTableSource) {
 final KTableSource source = (KTableSource) 
processorSupplier;
+if (onlyIfMaterialized && !source.materialized()) {
+return false;
+}
 source.enableSendingOldValues();

Review comment:
   Ah. I though you changes `ProcessorSuppler#enableSendOldValues` but that 
was incorrect. You change `KTableProcessorSuppler#enableSendOldValues`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-11 Thread GitBox


vvcephei commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r487171555



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
 public void testReduceSmallInput() {
 final StreamsBuilder builder = new StreamsBuilder();
 final String topic = "topic";
+final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new 
InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false) : 
Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(5), 
Duration.ofMillis(10), false);

Review comment:
   But I wouldn't be afraid to just use a full if/else block, either.
   
   ```suggestion
   final WindowBytesStoreSupplier storeSupplier;
   if (inOrderIterator) {
   storeSupplier = new InOrderMemoryWindowStoreSupplier("InOrder", 
5L, 10L, false);
   } else {
   storeSupplier = Stores.inMemoryWindowStore("Reverse", 
ofMillis(5), ofMillis(10), false);
   }
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-11 Thread GitBox


vvcephei commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r487170646



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
 public void testReduceSmallInput() {
 final StreamsBuilder builder = new StreamsBuilder();
 final String topic = "topic";
+final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new 
InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false) : 
Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(5), 
Duration.ofMillis(10), false);

Review comment:
   This is how I typically break up ternaries. 
   
   ```suggestion
   final WindowBytesStoreSupplier storeSupplier = 
   inOrderIterator 
   ? new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 
10L, false) 
   : Stores.inMemoryWindowStore("Reverse", ofMillis(5), 
ofMillis(10), false);
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-11 Thread GitBox


mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487169320



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##
@@ -39,9 +39,15 @@
 }
 
 @Override
-public final void enableSendingOldValues() {
-table1.enableSendingOldValues();
-table2.enableSendingOldValues();
+public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
   My comment was about your pattern... (if we flip the logic, we would 
need to pass in `true` to force a materialization). My point is, shouldn't we 
pass in a constant? For KTableKTableAbstractJoin we always want that the 
upstream is sending us old values.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-11 Thread GitBox


mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487167789



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -182,6 +182,8 @@ public String queryableStoreName() {
 final KTableProcessorSupplier processorSupplier =
 new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+processorSupplier.enableSendingOldValues(true);

Review comment:
   SGMT.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do 
send old values downstream. If the filter result is materialized, we don't care 
if he upstream is sending old values or not. However, if the filter is 
stateless, as a side effect, we also need to tell upstream to send old values.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -182,6 +182,8 @@ public String queryableStoreName() {
 final KTableProcessorSupplier processorSupplier =
 new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+processorSupplier.enableSendingOldValues(true);

Review comment:
   SGTM.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do 
send old values downstream. If the filter result is materialized, we don't care 
if he upstream is sending old values or not. However, if the filter is 
stateless, as a side effect, we also need to tell upstream to send old values.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-11 Thread GitBox


mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487166164



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##
@@ -47,8 +47,10 @@
 }
 
 @Override
-public void enableSendingOldValues() {
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
   I guess it's subjective. Personally, I would prefer to flip it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-11 Thread GitBox


lct45 commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r487110043



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
 public void testReduceSmallInput() {
 final StreamsBuilder builder = new StreamsBuilder();
 final String topic = "topic";
+final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new 
InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false) : 
Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(5), 
Duration.ofMillis(10), false);

Review comment:
   These lines ended up being pretty long but I wasn't sure how to best 
split them up. WDYT @ableegoldman ?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-11 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487057805



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -182,6 +182,8 @@ public String queryableStoreName() {
 final KTableProcessorSupplier processorSupplier =
 new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+processorSupplier.enableSendingOldValues(true);

Review comment:
   `KTableFilter` has a boolean flag internally that also needs to be set 
if it is to correctly handle old values, (existing code).  So if we don't call 
`enableSendingOldValues` on it, it won't swallow the output when things haven't 
changed.
   
   to put it another way, the `sendOldValues` field of `KTableFilter` is used 
to both signify that the upstream is sending old values, and to control if the 
filter should forward old values. I'll split these into two variables.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-11 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487060524



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
 doTestNotSendingOldValue(builder, table1, table2, topic1);
 }
 
+@Test
+public void 
shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+final StreamsBuilder builder = new StreamsBuilder();
+final String topic1 = "topic1";
+
+final KTableImpl table1 =
+(KTableImpl) builder.table(topic1, 
consumed);
+final KTableImpl table2 = 
(KTableImpl) table1.filter(predicate);
+
+table2.enableSendingOldValues(true);

Review comment:
   See https://github.com/apache/kafka/pull/9156#discussion_r487057805





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-11 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487060171



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##
@@ -77,10 +77,16 @@ public String getQueryableName() {
 }
 
 @Override
-public void enableSendingOldValues() {
-parent1.enableSendingOldValues();
-parent2.enableSendingOldValues();
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
   as above.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##
@@ -77,10 +77,16 @@ public String getQueryableName() {
 }
 
 @Override
-public void enableSendingOldValues() {
-parent1.enableSendingOldValues();
-parent2.enableSendingOldValues();
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+throw new IllegalStateException("Table-table joins should always 
be materialized");
+}
+
+if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
   as above. :p





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-11 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487060524



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
 doTestNotSendingOldValue(builder, table1, table2, topic1);
 }
 
+@Test
+public void 
shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+final StreamsBuilder builder = new StreamsBuilder();
+final String topic1 = "topic1";
+
+final KTableImpl table1 =
+(KTableImpl) builder.table(topic1, 
consumed);
+final KTableImpl table2 = 
(KTableImpl) table1.filter(predicate);
+
+table2.enableSendingOldValues(true);

Review comment:
   See [my comment 
above[(https://github.com/apache/kafka/pull/9156#discussion_r487057805)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-11 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487059154



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##
@@ -39,9 +39,15 @@
 }
 
 @Override
-public final void enableSendingOldValues() {
-table1.enableSendingOldValues();
-table2.enableSendingOldValues();
+public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
   My view, [as outlined 
above](|https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to 
stick with the pattern that's here.
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-11 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487059154



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##
@@ -39,9 +39,15 @@
 }
 
 @Override
-public final void enableSendingOldValues() {
-table1.enableSendingOldValues();
-table2.enableSendingOldValues();
+public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
   My view, [as outlined 
above](https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to 
stick with the pattern that's here.
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-11 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487060078



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##
@@ -39,9 +39,15 @@
 }
 
 @Override
-public final void enableSendingOldValues() {
-table1.enableSendingOldValues();
-table2.enableSendingOldValues();
+public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+throw new IllegalStateException("Table-table joins should always 
be materialized");
+}
+
+if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
   as above.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-11 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487057186



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##
@@ -47,8 +47,10 @@
 }
 
 @Override
-public void enableSendingOldValues() {
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
   I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that 
`enableSendingOldValues(true)` will force materialization, but what does 
`enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, 
but actually the method won't enable sending old values if the param is `false` 
and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I 
still would argue the semantics are not intuitive from the names alone.  
(Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very 
intuitive. `enableSendingOldValues(true)` will only enable sending old values 
if already materialized, whereas `enableSendingOldValues(false)` will always 
enable sending old values, materializing as necessary, must like the previous 
`enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then 
I don't think we need to include a `maybe` or `IfPossible` in the name as this 
is implied already by the `onlyIf`.
   
   That said, this is not code I have to look at every day. If there's a 
consensus we should flip, then happy enough to do it.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-11 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487059154



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##
@@ -39,9 +39,15 @@
 }
 
 @Override
-public final void enableSendingOldValues() {
-table1.enableSendingOldValues();
-table2.enableSendingOldValues();
+public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
   My view, as outlined above, is to stick with the pattern that's here.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-11 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487058548



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -832,18 +834,25 @@ public String queryableStoreName() {
 }
 
 @SuppressWarnings("unchecked")
-public void enableSendingOldValues() {
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
 if (!sendOldValues) {
 if (processorSupplier instanceof KTableSource) {
 final KTableSource source = (KTableSource) 
processorSupplier;
+if (onlyIfMaterialized && !source.materialized()) {
+return false;
+}
 source.enableSendingOldValues();

Review comment:
   Nope.  The `if` above is handling the `boolean`, there is no need for 
the `source` to be aware of this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-11 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487057805



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -182,6 +182,8 @@ public String queryableStoreName() {
 final KTableProcessorSupplier processorSupplier =
 new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+processorSupplier.enableSendingOldValues(true);

Review comment:
   `KTableFilter` has a boolean flag internally that also needs to be set 
if it is to correctly handle old values, (existing code).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-11 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487057186



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##
@@ -47,8 +47,10 @@
 }
 
 @Override
-public void enableSendingOldValues() {
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
   I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that 
`enableSendingOldValues(true)` will force materialization, but what does 
`enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, 
but actually the method won't enable sending old values if the param is `false` 
and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I 
still would argue the semantics are not intuitive from the names alone.  
(Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very 
intuitive. `enableSendingOldValues(true)` will only enable sending old values 
if already materialized, whereas `enableSendingOldValues(false)` will always 
enable sending old values, materializing as necessary, must like the previous 
`enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then 
I don't think we need to include a `maybe` or `IfPossible` in the name as this 
is implied already by the `onlyIf`.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-11 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487049443



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
   Done. (Must sort out Kafka code style settings at some point)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10476) Error when calling (kafka-configs.sh --add-config): org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient..... Caused by: javax.crypto.BadPadding

2020-09-11 Thread AbdulRahman Mahmoud (Jira)
AbdulRahman Mahmoud created KAFKA-10476:
---

 Summary: Error when calling (kafka-configs.sh --add-config): 
org.apache.kafka.common.KafkaException: Failed to create new 
KafkaAdminClient. Caused by: javax.crypto.BadPaddingException: Given final 
block 
 Key: KAFKA-10476
 URL: https://issues.apache.org/jira/browse/KAFKA-10476
 Project: Kafka
  Issue Type: Bug
  Components: config
Affects Versions: 2.2.1
Reporter: AbdulRahman Mahmoud
 Attachments: Error.txt, Script.txt

For now We created a Cluster on AWS with MSK service, we can connetc to it 
through TLS by our ceritifcates for now we can create topics, produce and 
cosume with no problems.

however   when we use the same configuration for the "kafka-config-sh 
--add-config", like this attached  [^Script.txt] snippet for example it fails, 
i have attached a file([^Error.txt]) for the log of the error.   

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-09-11 Thread GitBox


nym3r0s commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-691058386


   @mjsax @ableegoldman @hachikuji - tagging for awareness. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeqo commented on pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

2020-09-11 Thread GitBox


jeqo commented on pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#issuecomment-691045209


   cc @ableegoldman and team, this should be ready to review. 
   Would also be great to look #9228 as part of KIP-666, to get the APIs 
aligned. 
   
   Thanks!  



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9795) Add throttleTimeMs in logs

2020-09-11 Thread Rishika Modi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194209#comment-17194209
 ] 

Rishika Modi commented on KAFKA-9795:
-

Thank you :)

> Add throttleTimeMs in logs
> --
>
> Key: KAFKA-9795
> URL: https://issues.apache.org/jira/browse/KAFKA-9795
> Project: Kafka
>  Issue Type: Improvement
>  Components: log, logging
>Reporter: Igor Vavrzhin
>Assignee: Rishika Modi
>Priority: Minor
>
> if trotting is triggered on a broker, then the client receives something like 
> this:
> {code:java}
> org.apache.kafka.clients.FetchSessionHandler.handleResponse
> FetchResponse 
>  throttleTimeMs = 2583
>  error = {Errors} "NONE"
>  sessionId = 0
>  responseData = {LinkedHashMap}  size = 0 {code}
> which in the logs looks like:
> {code:java}
> [INFO ] o.a.k.c.FetchSessionHandler  [Consumer clientId=dev, groupId=null] 
> Node 18 sent an invalid full fetch response with extra=(topicName-11, 
> response=( {code}
> It would be great to see the reason and parameter *throttleTimeMs* in the 
> logs!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9795) Add throttleTimeMs in logs

2020-09-11 Thread Rishika Modi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rishika Modi reassigned KAFKA-9795:
---

Assignee: Rishika Modi

> Add throttleTimeMs in logs
> --
>
> Key: KAFKA-9795
> URL: https://issues.apache.org/jira/browse/KAFKA-9795
> Project: Kafka
>  Issue Type: Improvement
>  Components: log, logging
>Reporter: Igor Vavrzhin
>Assignee: Rishika Modi
>Priority: Minor
>
> if trotting is triggered on a broker, then the client receives something like 
> this:
> {code:java}
> org.apache.kafka.clients.FetchSessionHandler.handleResponse
> FetchResponse 
>  throttleTimeMs = 2583
>  error = {Errors} "NONE"
>  sessionId = 0
>  responseData = {LinkedHashMap}  size = 0 {code}
> which in the logs looks like:
> {code:java}
> [INFO ] o.a.k.c.FetchSessionHandler  [Consumer clientId=dev, groupId=null] 
> Node 18 sent an invalid full fetch response with extra=(topicName-11, 
> response=( {code}
> It would be great to see the reason and parameter *throttleTimeMs* in the 
> logs!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] nym3r0s opened a new pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-09-11 Thread GitBox


nym3r0s opened a new pull request #9280:
URL: https://github.com/apache/kafka/pull/9280


   KAFKA-10186: Abort transaction with pending data with 
TransactionAbortedException
   
   If a transaction is aborted with no underlying exception, throw
   a new kind of exception - `TransactionAbortedException` to
   distinguish this from other fatal exceptions.
   
   This is part of KIP-654 and resolves KAFKA-10186
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-11 Thread GitBox


tombentley commented on pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#issuecomment-690954007


   @ijuma @hachikuji @guozhangwang any chance one of you could review this?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10465) Potential Bug/Doc update in Transactional Producer and Isolation Level

2020-09-11 Thread M. Manna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194093#comment-17194093
 ] 

M. Manna commented on KAFKA-10465:
--

[~guozhang] - Thanks for your comments. I am not sure whether you have let the 
producer run continuously, but it doesn't really make a difference either way. 
In our test environments where things are running perpetually, we see the same 
issue. And we don't pause anything. And the code i have shown you here mimics 
what we have. 

The issue is very specific when you assign (vs. Subscribe to) TopicPartition. 
If you check for endOffsets(), pausing or not pausing, you never seem to get 
the last committed offset at all. Also, if there is a transaction marker 
associated to it, this shouldn't be sent to the consumer as per documentation. 

Could you kindly comment on the above?

> Potential Bug/Doc update in Transactional Producer and Isolation Level
> --
>
> Key: KAFKA-10465
> URL: https://issues.apache.org/jira/browse/KAFKA-10465
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.1
>Reporter: M. Manna
>Priority: Critical
> Attachments: ConsumerTestApp.java, Consumer_endOffsets_return_84.jpg, 
> ProducerTestApp.java, Producer_Committed_Successfully_to_82.jpg
>
>
> *Issue*
> Difference between LSO and High Watermark offsets when a consumer with 
> "read_committed" aren't probably explained in the correct place.
> *Expected Behaviour*
> According to documentation, the offset returned should be the one committed 
> last (and successfully). 
> *Observed (with steps)*
> 1. Start a local or test kafka cluster (2.4.1 or above)
> 2. Create a topic (I have used 3 replication-factor with 1 partition, but 1 
> and 1 is good)
> 3. Use the attached producer app file and set debug pointer to be able to 
> pause on print
> 4. Use the attached consumer app file to start a consumer and debug through 
> steps.
> It can be seen that the consumer is actually able to fetch an offset that's 
> not committed by the producer yet. 
> Just trying to raise this ticket to confirm whether:
> 1) this is well-documented anywhere (which I have missed) - Please refer to 
> this documentation as a resolution
> 2) This is a bug - please confirm and provide a timeline when this can be 
> fixed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9752) Consumer rebalance can be stuck after new member timeout with old JoinGroup version

2020-09-11 Thread Javier Holguera (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194082#comment-17194082
 ] 

Javier Holguera commented on KAFKA-9752:


What version of the Kafka clients (consumer/producer/streams) use JoinGroup v4 
that isn’t affected by this bug?

Thx

> Consumer rebalance can be stuck after new member timeout with old JoinGroup 
> version
> ---
>
> Key: KAFKA-9752
> URL: https://issues.apache.org/jira/browse/KAFKA-9752
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.2, 2.3.1, 2.4.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.2
>
>
> For older versions of the JoinGroup protocol (v3 and below), there was no way 
> for new consumer group members to get their memberId until the first 
> rebalance completed. If the JoinGroup request timed out and the client 
> disconnected, the member would nevertheless be left in the group until the 
> rebalance completed and the session timeout expired. 
> In order to prevent this situation from causing the group size to grow 
> unboundedly, we added logic in KAFKA-7610 to limit the maximum time a new 
> member will be left in the group before it would be kicked out (in spite of 
> rebalance state). 
> In KAFKA-9232, we addressed one issue with this solution. Basically the new 
> member expiration logic did not properly get cancelled after the rebalance 
> completed which means that in certain cases, a successfully joined member 
> might get kicked out of the group unnecessarily. 
> Unfortunately, this patch introduced a regression in the normal session 
> expiration logic following completion of the initial rebalance. Basically the 
> expiration task fails to get scheduled properly. The issue is in this 
> function:
> {code}
>   def shouldKeepAlive(deadlineMs: Long): Boolean = {
> if (isNew) {
>   // New members are expired after the static join timeout
>   latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
> } else if (isAwaitingJoin || isAwaitingSync) {
>   // Don't remove members as long as they have a request in purgatory
>   true
> } else {
>   // Otherwise check for session expiration
>   latestHeartbeat + sessionTimeoutMs > deadlineMs
> }
>   }
> {code}
> We use this logic in order to check for session expiration. On the surface, 
> there is nothing wrong with it, but it has an odd interaction with the 
> purgatory. When the heartbeat is first scheduled with `tryCompleteElseWatch`, 
> the code relies on `shouldKeepAlive` returning false so that the heartbeat 
> task is not immediately completed. This only works because we update 
> `latestHeartbeat` just prior to calling `tryCompleteElseWatch`, which means 
> that the first or third checks will fail, `shouldKeepAlive` will return 
> false, and the heartbeat expiration task will not be immediately completed. 
> The bug in this case has to do with the case when `isNew` is true. When we 
> schedule the session expiration task, the `isNew` flag is still set to true, 
> which means we will hit the first check above. Since in most cases, the 
> session timeout is less than the new member timeout of 5 minutes, the check 
> is very likely to return true. This seems like what we would want, but as 
> noted above, we rely on this function returning false when the expiration 
> task is passed to `tryCompleteElseWatch`. Since it returns true instead, the 
> task completes immediately, which means we cannot rely on its expiration.
> The impact of this bug in the worst case is that a consumer group can be left 
> in the `PreparingRebalance` state indefinitely. This state will persist until 
> there is a coordinator change (e.g. as a result of restarting the broker). 
> Note that this is only possible if 1) we have a consumer using an old 
> JoinGroup version, 2) the consumer times out and disconnects from its initial 
> JoinGroup request. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)