[GitHub] [kafka] chia7712 commented on a change in pull request #10529: KAFKA-12650: fix NPE in InternalTopicManagerTest

2021-04-13 Thread GitBox


chia7712 commented on a change in pull request #10529:
URL: https://github.com/apache/kafka/pull/10529#discussion_r612974713



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##
@@ -153,9 +153,9 @@ public void shouldNotCreateTopicsWithEmptyInput() throws 
Exception {
 
 @Test
 public void shouldOnlyRetryNotSuccessfulFuturesDuringSetup() {
-final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+final AdminClient admin = EasyMock.createMock(AdminClient.class);

Review comment:
   @cadonna thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #10529: KAFKA-12650: fix NPE in InternalTopicManagerTest

2021-04-13 Thread GitBox


cadonna commented on a change in pull request #10529:
URL: https://github.com/apache/kafka/pull/10529#discussion_r612972443



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##
@@ -153,9 +153,9 @@ public void shouldNotCreateTopicsWithEmptyInput() throws 
Exception {
 
 @Test
 public void shouldOnlyRetryNotSuccessfulFuturesDuringSetup() {
-final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+final AdminClient admin = EasyMock.createMock(AdminClient.class);

Review comment:
   See https://github.com/apache/kafka/pull/10529#discussion_r612291115








[jira] [Commented] (KAFKA-12598) Remove deprecated --zookeeper in ConfigCommand

2021-04-13 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320733#comment-17320733
 ] 

Luke Chen commented on KAFKA-12598:
---

I saw that KIP-555 (KAFKA-9397) will address this after ZooKeeper is removed from 
Kafka, so I'm closing this ticket. Thanks.

> Remove deprecated --zookeeper in ConfigCommand
> --
>
> Key: KAFKA-12598
> URL: https://issues.apache.org/jira/browse/KAFKA-12598
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12598) Remove deprecated --zookeeper in ConfigCommand

2021-04-13 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-12598.
---
Fix Version/s: (was: 3.0.0)
   Resolution: Won't Fix

> Remove deprecated --zookeeper in ConfigCommand
> --
>
> Key: KAFKA-12598
> URL: https://issues.apache.org/jira/browse/KAFKA-12598
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>






[GitHub] [kafka] chia7712 commented on a change in pull request #10529: KAFKA-12650: fix NPE in InternalTopicManagerTest

2021-04-13 Thread GitBox


chia7712 commented on a change in pull request #10529:
URL: https://github.com/apache/kafka/pull/10529#discussion_r612970452



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##
@@ -153,9 +153,9 @@ public void shouldNotCreateTopicsWithEmptyInput() throws 
Exception {
 
 @Test
 public void shouldOnlyRetryNotSuccessfulFuturesDuringSetup() {
-final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+final AdminClient admin = EasyMock.createMock(AdminClient.class);

Review comment:
   Is this a necessary change? If so, should we change all similar code in 
this test class?
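
[Editorial note] For context on the change under review: an EasyMock nice mock answers unexpected calls with default values (null for object returns), deferring the failure to a later NullPointerException, while a strict mock (`createMock`) fails at the unexpected call itself. The difference can be sketched with a hand-rolled analogue using `java.lang.reflect.Proxy`; the `Admin` interface below is a simplified stand-in, not the real `AdminClient` or EasyMock itself:

```java
import java.lang.reflect.Proxy;
import java.util.List;

public class MockStyleDemo {
    interface Admin { List<String> listTopics(); }

    // A "nice" mock silently returns a default (null) for unexpected calls,
    // so the failure surfaces later as an NPE wherever the result is used.
    static Admin niceMock() {
        return (Admin) Proxy.newProxyInstance(
            Admin.class.getClassLoader(), new Class<?>[]{Admin.class},
            (proxy, method, args) -> null);
    }

    // A strict mock fails immediately on any call without a recorded
    // expectation, pointing straight at the missing test setup.
    static Admin strictMock() {
        return (Admin) Proxy.newProxyInstance(
            Admin.class.getClassLoader(), new Class<?>[]{Admin.class},
            (proxy, method, args) -> {
                throw new AssertionError("unexpected call: " + method.getName());
            });
    }

    public static void main(String[] args) {
        System.out.println(niceMock().listTopics()); // null, so a later NPE risk
        try {
            strictMock().listTopics();
        } catch (AssertionError e) {
            System.out.println(e.getMessage()); // unexpected call: listTopics
        }
    }
}
```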








[GitHub] [kafka] cmccabe commented on a change in pull request #10463: MINOR: KRaft support for unclean.leader.election.enable

2021-04-13 Thread GitBox


cmccabe commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r612969287



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##
@@ -374,6 +381,27 @@ void deleteTopicConfigs(String name) {
 configData.remove(new ConfigResource(Type.TOPIC, name));
 }
 
+private boolean getBoolean(ConfigResource configResource, String key) {
+TimelineHashMap<String, String> map = configData.get(configResource);
+if (map == null) return false;
+String value = map.getOrDefault(key, "false");
+return value.equalsIgnoreCase("true");
+}
+
+/**
+ * Check if the given topic should use an unclean leader election.
+ *
+ * @param topicName The topic name.
+ * @return  True if the controller or topic was configured to use unclean
+ *  leader election.
+ */
+boolean shouldUseUncleanLeaderElection(String topicName) {

Review comment:
   Thanks for finding this!  I implemented leader election triggered by 
setting the unclean leader election config.
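
[Editorial note] The `getBoolean` helper in the diff above boils down to a defaulted map lookup. A minimal stand-alone sketch, using a plain `HashMap` in place of the real `TimelineHashMap` (illustrative only):

```java
import java.util.HashMap;
import java.util.Map;

public class BooleanConfigDemo {
    // A missing resource or a missing key both fall back to false; only the
    // literal string "true" (case-insensitive) enables the flag.
    static boolean getBoolean(Map<String, Map<String, String>> configData,
                              String resource, String key) {
        Map<String, String> map = configData.get(resource);
        if (map == null) return false;
        return map.getOrDefault(key, "false").equalsIgnoreCase("true");
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> configs = new HashMap<>();
        String key = "unclean.leader.election.enable";
        System.out.println(getBoolean(configs, "topic-a", key)); // false: no config at all
        configs.put("topic-a", new HashMap<>());
        configs.get("topic-a").put(key, "True");
        System.out.println(getBoolean(configs, "topic-a", key)); // true: case-insensitive match
    }
}
```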








[GitHub] [kafka] chia7712 commented on a change in pull request #10446: KAFKA-12661 ConfigEntry#equal does not compare other fields when value is NOT null

2021-04-13 Thread GitBox


chia7712 commented on a change in pull request #10446:
URL: https://github.com/apache/kafka/pull/10446#discussion_r612961485



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
##
@@ -149,24 +149,28 @@ public boolean equals(Object o) {
 
 ConfigEntry that = (ConfigEntry) o;
 
-return this.name.equals(that.name) &&
-this.value != null ? this.value.equals(that.value) : 
that.value == null &&

Review comment:
   @ijuma @dajac I filed a Jira for this PR since it includes a bug fix. 
Personally, I think it should be backported to all active branches. WDYT?
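
[Editorial note] The bug behind this review thread (KAFKA-12661) is a Java operator-precedence mistake: `&&` binds tighter than the conditional operator, so the original expression stopped comparing fields once the value was non-null. A simplified stand-alone sketch; the names and the `flag` field are illustrative stand-ins, not the real `ConfigEntry` fields:

```java
public class EqualsPrecedenceDemo {
    // Mirrors the original expression: "a && v != null ? x : y" parses as
    // "(a && v != null) ? x : y", so when the value is non-null the remaining
    // field comparisons after the colon are never evaluated.
    static boolean buggyEquals(String name1, String value1, boolean flag1,
                               String name2, String value2, boolean flag2) {
        return name1.equals(name2) &&
            value1 != null ? value1.equals(value2)
                           : value2 == null && flag1 == flag2;
    }

    // Fixed version: compare every field, handling null values explicitly.
    static boolean fixedEquals(String name1, String value1, boolean flag1,
                               String name2, String value2, boolean flag2) {
        boolean valueEqual = (value1 == null) ? value2 == null : value1.equals(value2);
        return name1.equals(name2) && valueEqual && flag1 == flag2;
    }

    public static void main(String[] args) {
        // Same name and value, but different flag: the buggy version still says equal.
        System.out.println(buggyEquals("n", "v", true, "n", "v", false)); // true
        System.out.println(fixedEquals("n", "v", true, "n", "v", false)); // false
    }
}
```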








[GitHub] [kafka] dengziming commented on pull request #10400: MINOR: fix package name in integration test

2021-04-13 Thread GitBox


dengziming commented on pull request #10400:
URL: https://github.com/apache/kafka/pull/10400#issuecomment-819250654


   @chia7712, thank you for your advice; I have updated the description of 
this PR.






[GitHub] [kafka] chia7712 commented on pull request #10400: MINOR: fix package name in integration test

2021-04-13 Thread GitBox


chia7712 commented on pull request #10400:
URL: https://github.com/apache/kafka/pull/10400#issuecomment-819248523


   @dengziming thanks for your patch. Could you update the description to summarize 
the changes (yours and those @rondagostino suggested)? That would be nice 
information, and I could include it in the commit message.






[jira] [Comment Edited] (KAFKA-12313) Consider deprecating the default.windowed.serde.inner.class configs

2021-04-13 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320686#comment-17320686
 ] 

Sagar Rao edited comment on KAFKA-12313 at 4/14/21, 5:28 AM:
-

hey [~ableegoldman], I have started working on the KIP 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177047930] 
and one of the things I noticed is that the 2 deprecated configs in StreamsConfig 
are also used in SessionWindowedDeserializer. The KIP doesn't talk about that 
class though, and we didn't discuss it here or in the discuss thread. I believe 
I will need to include the SessionWindowedDeserializer class as well in my 
changes; is that correct? Should I modify the KIP accordingly in that case?

 

The other part missing in the KIP is that the 2 deprecated configs are 
also used in TimeWindowedSerializer and SessionWindowedSerializer. In 
that case, should we add a window.inner.class.serialiser config as well?


was (Author: sagarrao):
hey [~ableegoldman], i have started working on the KIP 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177047930] 
and one of the things noticed is that the 2 deprecated configs in StreamsConfig 
are also used in SessionWindowedDeserialiser. The KIP doesn't talk about that 
class though and we didn't discuss it here or in the discuss thread. I believe, 
I will need to include SessionWindowedDeserialiser class as well for my 
changes, is that correct? Should I modify the KIP accordingly in that case?

> Consider deprecating the default.windowed.serde.inner.class configs
> ---
>
> Key: KAFKA-12313
> URL: https://issues.apache.org/jira/browse/KAFKA-12313
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> During the discussion of KIP-659 we discussed whether it made sense to have a 
> "default" class for the serdes of windowed inner classes across Streams. 
> Using these configs instead of specifying an actual Serde object can lead to 
> subtle bugs, since the WindowedDeserializer requires a windowSize in addition 
> to the inner class. If the default constructor is invoked, as it will be when 
> falling back on the config, this windowSize defaults to MAX_VALUE. 
> If the downstream program doesn't care about the window end time in the 
> output, then this can go unnoticed and technically there is no problem. But 
> if anything does depend on the end time, or the user just wants to manually 
> read the output for testing purposes, then the MAX_VALUE will result in a 
> garbage timestamp.
> We should consider whether the convenience of specifying a config instead of 
> instantiating a Serde in each operator is really worth the risk of a user 
> accidentally failing to specify a windowSize
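
[Editorial note] The windowSize pitfall described in the issue above can be sketched numerically. This is assumed, simplified behavior: the real TimeWindowedDeserializer derives the window end from the deserialized start plus the configured size, which here falls back to MAX_VALUE:

```java
public class WindowSizeDemo {
    public static void main(String[] args) {
        long windowStart = 1_618_300_000_000L;   // deserialized window start (epoch ms)
        long defaultWindowSize = Long.MAX_VALUE; // fallback when no size is configured
        // With the MAX_VALUE default, start + size overflows into a
        // meaningless (here: negative) end timestamp.
        long windowEnd = windowStart + defaultWindowSize;
        System.out.println(windowEnd < 0); // true: garbage end timestamp
    }
}
```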





[GitHub] [kafka] tang7526 commented on pull request #10534: KAFKA-806: Index may not always observe log.index.interval.bytes

2021-04-13 Thread GitBox


tang7526 commented on pull request #10534:
URL: https://github.com/apache/kafka/pull/10534#issuecomment-819237139


   @junrao Could you help to review this PR?  Thanks.






[GitHub] [kafka] vitojeng edited a comment on pull request #9821: KAFKA-5876: Apply UnknownStateStoreException for Interactive Queries

2021-04-13 Thread GitBox


vitojeng edited a comment on pull request #9821:
URL: https://github.com/apache/kafka/pull/9821#issuecomment-819233511


   > Awesome. I can help get this current PR reviewed and merged from here, and 
probably find you someone else to review the next PR(s) since I'm pretty busy 🙂
   
   Great, thanks.
   
> Now, regarding your last question on this PR: am I reading it correctly 
that we just don't ever throw `InvalidStateStoreException` from 
`allMetadataForStore` or `queryMetadataForKey` at the moment, therefore it 
doesn't make sense to throw UnknownStateStoreException from these methods as 
part of this KIP?
   >
   > Personally, I think it's ok to just throw whatever exception makes sense 
from wherever in the code it makes sense to do so. You can send a quick update 
note to the KIP thread to say that you're making this amendment, and if anyone 
has a concern they can respond there.
   
   @ableegoldman I just feel that I may need to point this out (it will break the 
API for the new exception).
   Your suggestion is great. I'm happy to update the KIP and update the PR for 
this.
   






[GitHub] [kafka] vitojeng edited a comment on pull request #9821: KAFKA-5876: Apply UnknownStateStoreException for Interactive Queries

2021-04-13 Thread GitBox


vitojeng edited a comment on pull request #9821:
URL: https://github.com/apache/kafka/pull/9821#issuecomment-819233511


   > Awesome. I can help get this current PR reviewed and merged from here, and 
probably find you someone else to review the next PR(s) since I'm pretty busy 🙂
   
   Great, thanks.
   
> Now, regarding your last question on this PR: am I reading it correctly 
that we just don't ever throw `InvalidStateStoreException` from 
`allMetadataForStore` or `queryMetadataForKey` at the moment, therefore it 
doesn't make sense to throw UnknownStateStoreException from these methods as 
part of this KIP?
   >
   > Personally, I think it's ok to just throw whatever exception makes sense 
from wherever in the code it makes sense to do so. You can send a quick update 
note to the KIP thread to say that you're making this amendment, and if anyone 
has a concern they can respond there.
   
   @ableegoldman I just feel that I may need to point this out.
   Your suggestion is great. I'm happy to update the KIP and update the PR for 
this.
   






[GitHub] [kafka] dengziming commented on pull request #10500: MINOR: Move envelop body request serialize code to RequestContext

2021-04-13 Thread GitBox


dengziming commented on pull request #10500:
URL: https://github.com/apache/kafka/pull/10500#issuecomment-819233666


   Resolved conflicts and split this PR into two PRs, since some of the code is 
unrelated.






[GitHub] [kafka] vitojeng commented on pull request #9821: KAFKA-5876: Apply UnknownStateStoreException for Interactive Queries

2021-04-13 Thread GitBox


vitojeng commented on pull request #9821:
URL: https://github.com/apache/kafka/pull/9821#issuecomment-819233511


   > Awesome. I can help get this current PR reviewed and merged from here, and 
probably find you someone else to review the next PR(s) since I'm pretty busy 🙂
   
   Great, thanks.
   
> Now, regarding your last question on this PR: am I reading it correctly 
that we just don't ever throw `InvalidStateStoreException` from 
`allMetadataForStore` or `queryMetadataForKey` at the moment, therefore it 
doesn't make sense to throw UnknownStateStoreException from these methods as 
part of this KIP?
   >
   > Personally, I think it's ok to just throw whatever exception makes sense 
from wherever in the code it makes sense to do so. You can send a quick update 
note to the KIP thread to say that you're making this amendment, and if anyone 
has a concern they can respond there.
   
   @ableegoldman I just feel that I may need to point this out.
   Your suggestion is great. I'm happy to update the KIP and update the PR for 
this.
   






[GitHub] [kafka] dengziming opened a new pull request #10535: MINOR: Remove duplicate method in test classes

2021-04-13 Thread GitBox


dengziming opened a new pull request #10535:
URL: https://github.com/apache/kafka/pull/10535


   *More detailed description of your change*
   1. Remove duplicate serialization of auto-generated data in 
`RequestConvertToJsonTest`; this is inspired by #9964
   2. Remove `RequestTestUtils.serializeRequestWithHeader` since we added an 
`AbstractRequest.serializeWithHeader` in #10142
   
   *Summary of testing strategy (including rationale)*
   Unit test
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Comment Edited] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2021-04-13 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320704#comment-17320704
 ] 

Bruno Cadonna edited comment on KAFKA-9295 at 4/14/21, 5:07 AM:


[~ableegoldman] Makes sense! My bad, I looked at the wrong dates. Thank you for 
double checking!


was (Author: cadonna):
[~ableegoldman] Makes sense! My bad, I looked at the wrong dates. Thank you for 
double check!

> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> --
>
> Key: KAFKA-9295
> URL: https://issues.apache.org/jira/browse/KAFKA-9295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.4.0, 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/]
> {quote}java.lang.AssertionError: Did not receive all 1 records from topic 
> output- within 6 ms Expected: is a value equal to or greater than <1> 
> but: <0> was less than <1> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote}
>  





[jira] [Commented] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2021-04-13 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320704#comment-17320704
 ] 

Bruno Cadonna commented on KAFKA-9295:
--

[~ableegoldman] Makes sense! My bad, I looked at the wrong dates. Thank you for 
double check!

> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> --
>
> Key: KAFKA-9295
> URL: https://issues.apache.org/jira/browse/KAFKA-9295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.4.0, 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/]
> {quote}java.lang.AssertionError: Did not receive all 1 records from topic 
> output- within 6 ms Expected: is a value equal to or greater than <1> 
> but: <0> was less than <1> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote}
>  





[jira] [Created] (KAFKA-12666) Fix flaky kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic

2021-04-13 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-12666:
-

 Summary: Fix flaky 
kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic
 Key: KAFKA-12666
 URL: https://issues.apache.org/jira/browse/KAFKA-12666
 Project: Kafka
  Issue Type: Test
Reporter: Bruno Cadonna


Found two similar failures of this test on a PR that was unrelated:
{code:java}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, 
deadlineMs=1618341006330, tries=583, nextAllowedTryMs=1618341006437) timed out 
at 1618341006337 after 583 attempt(s)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at 
kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic(RaftClusterTest.scala:94)
 {code}
 
{code:java}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
assignment. Call: createTopics
at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at 
kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic(RaftClusterTest.scala:94)
 {code}
 

[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10529/4/testReport/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed]

 

Might be related to KAFKA-12561.





[jira] [Commented] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2021-04-13 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320699#comment-17320699
 ] 

A. Sophie Blee-Goldman commented on KAFKA-9295:
---

[~cadonna] Hm... GitHub says the commit that kicked off that build was merged 10 
hours ago, and the PR to improve this test was also merged 10 hours ago. But 
based on the line numbers in the stack trace, I think that build did not 
contain the fix -- line 187 in the test used to be verifyKTableKTableJoin(), 
but now it is just an empty line. So let's leave the ticket resolved but keep an 
eye out for further failures. 

Still, thanks for reporting this! Fingers crossed that this fix will be 
sufficient and we won't have to dig any further or mess with the session 
interval

> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> --
>
> Key: KAFKA-9295
> URL: https://issues.apache.org/jira/browse/KAFKA-9295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.4.0, 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/]
> {quote}java.lang.AssertionError: Did not receive all 1 records from topic 
> output- within 6 ms Expected: is a value equal to or greater than <1> 
> but: <0> was less than <1> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote}
>  





[GitHub] [kafka] yeralin commented on pull request #6592: KAFKA-8326: Introduce List Serde

2021-04-13 Thread GitBox


yeralin commented on pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#issuecomment-819226355


   I am! :) Routinely checking this PR.
   
   I totally understand, we all get busy sometimes 👍  






[GitHub] [kafka] cadonna commented on pull request #10529: KAFKA-12650: fix NPE in InternalTopicManagerTest

2021-04-13 Thread GitBox


cadonna commented on pull request #10529:
URL: https://github.com/apache/kafka/pull/10529#issuecomment-819226061


   3 unrelated test failures.
   
   ```
   Build / JDK 15 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
   Build / JDK 15 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
   Build / JDK 15 and Scala 2.13 / 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable
   ```






[jira] [Commented] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2021-04-13 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320696#comment-17320696
 ] 

Bruno Cadonna commented on KAFKA-9295:
--

Documenting that I found this test failure in an unrelated PR build after this 
ticket was resolved.

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10529/4/testReport/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/Build___JDK_15_and_Scala_2_13___shouldInnerJoinMultiPartitionQueryable_2/
{code:java}
java.lang.AssertionError: Did not receive all 1 records from topic output- 
within 6 ms,  currently accumulated data is []
Expected: is a value equal to or greater than <1>
 but: <0> was less than <1>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:610)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:336)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:606)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:579)
at 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:203)
at 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:187){code}

> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> --
>
> Key: KAFKA-9295
> URL: https://issues.apache.org/jira/browse/KAFKA-9295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.4.0, 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/]
> {quote}java.lang.AssertionError: Did not receive all 1 records from topic 
> output- within 6 ms Expected: is a value equal to or greater than <1> 
> but: <0> was less than <1> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dengziming commented on pull request #10400: MINOR: fix package name in integration test

2021-04-13 Thread GitBox


dengziming commented on pull request #10400:
URL: https://github.com/apache/kafka/pull/10400#issuecomment-819221716


   @chia7712, hello, PTAL, thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming removed a comment on pull request #10400: MINOR: fix package name in integration test

2021-04-13 Thread GitBox


dengziming removed a comment on pull request #10400:
URL: https://github.com/apache/kafka/pull/10400#issuecomment-810705993


   cc @mumrah since `IntegrationTestUtils` is created by you. 😉






[jira] [Updated] (KAFKA-8941) Add RocksDB Metrics that Could not be Added due to RocksDB Version

2021-04-13 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-8941:
-
Labels: newbie  (was: )

> Add RocksDB Metrics that Could not be Added due to RocksDB Version
> --
>
> Key: KAFKA-8941
> URL: https://issues.apache.org/jira/browse/KAFKA-8941
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: newbie
>
> So far not all RocksDB metrics specified in KIP-471 could be added because 
> the RocksDB version currently used in Streams does not support some required 
> features. Once the RocksDB version is increased, the remaining metrics can be 
> added.





[jira] [Assigned] (KAFKA-8941) Add RocksDB Metrics that Could not be Added due to RocksDB Version

2021-04-13 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-8941:


Assignee: (was: Bruno Cadonna)

> Add RocksDB Metrics that Could not be Added due to RocksDB Version
> --
>
> Key: KAFKA-8941
> URL: https://issues.apache.org/jira/browse/KAFKA-8941
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Major
>
> So far not all RocksDB metrics specified in KIP-471 could be added because 
> the RocksDB version currently used in Streams does not support some required 
> features. Once the RocksDB version is increased, the remaining metrics can be 
> added.





[jira] [Commented] (KAFKA-12313) Consider deprecating the default.windowed.serde.inner.class configs

2021-04-13 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320686#comment-17320686
 ] 

Sagar Rao commented on KAFKA-12313:
---

Hey [~ableegoldman], I have started working on the KIP 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177047930] 
and one of the things I noticed is that the two deprecated configs in StreamsConfig 
are also used in SessionWindowedDeserializer. The KIP doesn't mention that 
class, though, and we didn't discuss it here or in the discuss thread. I believe 
I will need to include the SessionWindowedDeserializer class as well in my 
changes; is that correct? Should I modify the KIP accordingly in that case?

> Consider deprecating the default.windowed.serde.inner.class configs
> ---
>
> Key: KAFKA-12313
> URL: https://issues.apache.org/jira/browse/KAFKA-12313
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> During the discussion of KIP-659 we discussed whether it made sense to have a 
> "default" class for the serdes of windowed inner classes across Streams. 
> Using these configs instead of specifying an actual Serde object can lead to 
> subtle bugs, since the WindowedDeserializer requires a windowSize in addition 
> to the inner class. If the default constructor is invoked, as it will be when 
> falling back on the config, this windowSize defaults to MAX_VALUE. 
> If the downstream program doesn't care about the window end time in the 
> output, then this can go unnoticed and technically there is no problem. But 
> if anything does depend on the end time, or the user just wants to manually 
> read the output for testing purposes, then the MAX_VALUE will result in a 
> garbage timestamp.
> We should consider whether the convenience of specifying a config instead of 
> instantiating a Serde in each operator is really worth the risk of a user 
> accidentally failing to specify a windowSize
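The MAX_VALUE pitfall described in the issue can be sketched in a few lines. This is a minimal stand-in, not the actual Streams deserializer code; the class and method names here are hypothetical:

```java
// Minimal sketch of the windowSize pitfall described above (hypothetical names,
// not the actual Streams code): when a windowed serde is created reflectively
// from the config, only the no-arg constructor can run, so windowSize falls
// back to Long.MAX_VALUE and the computed window end time is garbage.
public class WindowSizeDemo {
    static long windowEnd(long start, long windowSize) {
        long end = start + windowSize;
        return end < 0 ? Long.MAX_VALUE : end; // clamp on overflow
    }

    public static void main(String[] args) {
        long start = 1_600_000_000_000L;                  // window start (epoch ms)
        long explicit = windowEnd(start, 60_000L);        // serde built with a real size
        long fallback = windowEnd(start, Long.MAX_VALUE); // default-constructed serde
        System.out.println(explicit);  // start + 60s: a meaningful end time
        System.out.println(fallback);  // Long.MAX_VALUE: a garbage timestamp
    }
}
```

Instantiating the serde explicitly, e.g. via `WindowedSerdes.timeWindowedSerdeFrom(String.class, windowSizeMs)`, avoids the fallback entirely.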





[jira] [Assigned] (KAFKA-8295) Optimize count() using RocksDB merge operator

2021-04-13 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-8295:


Assignee: Sagar Rao

> Optimize count() using RocksDB merge operator
> -
>
> Key: KAFKA-8295
> URL: https://issues.apache.org/jira/browse/KAFKA-8295
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Major
>
> In addition to regular put/get/delete RocksDB provides a fourth operation, 
> merge. This essentially provides an optimized read/update/write path in a 
> single operation. One of the built-in (C++) merge operators exposed over the 
> Java API is a counter. We should be able to leverage this for a more 
> efficient implementation of count()
>  
> (Note: Unfortunately it seems unlikely we can use this to optimize general 
> aggregations, even if RocksJava allowed for a custom merge operator, unless 
> we provide a way for the user to specify and connect a C++ implemented 
> aggregator – otherwise we incur too much cost crossing the jni for a net 
> performance benefit)
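The read-modify-write collapse that merge provides can be illustrated with `java.util.Map#merge`, which has the same shape. This is a stdlib stand-in only; in RocksJava the equivalent would be configuring a built-in counter merge operator such as `UInt64AddOperator` and calling `db.merge(key, encodedDelta)` instead of get/put:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stdlib stand-in for the RocksDB merge-based count() idea above: Map#merge
// performs the read/update/write in one call, just as a RocksDB counter merge
// operator would replace an explicit get() + add + put() sequence.
public class MergeCountDemo {
    public static long count(List<String> keys, String target) {
        Map<String, Long> counts = new HashMap<>();
        for (String k : keys) {
            counts.merge(k, 1L, Long::sum); // single read-modify-write step
        }
        return counts.getOrDefault(target, 0L);
    }

    public static void main(String[] args) {
        System.out.println(count(List.of("a", "b", "a", "a"), "a")); // 3
    }
}
```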





[GitHub] [kafka] showuon commented on pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-13 Thread GitBox


showuon commented on pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#issuecomment-819214937


   @ableegoldman , haha, no problem. I thought you had missed it. Take your time 
and thank you a lot. :)






[GitHub] [kafka] tang7526 opened a new pull request #10534: KAFKA-806: Index may not always observe log.index.interval.bytes

2021-04-13 Thread GitBox


tang7526 opened a new pull request #10534:
URL: https://github.com/apache/kafka/pull/10534


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Assigned] (KAFKA-806) Index may not always observe log.index.interval.bytes

2021-04-13 Thread Chun-Hao Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chun-Hao Tang reassigned KAFKA-806:
---

Assignee: Chun-Hao Tang

> Index may not always observe log.index.interval.bytes
> -
>
> Key: KAFKA-806
> URL: https://issues.apache.org/jira/browse/KAFKA-806
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Reporter: Jun Rao
>Assignee: Chun-Hao Tang
>Priority: Major
>  Labels: newbie++
>
> Currently, each log.append() will add at most 1 index entry, even when the 
> appended data is larger than log.index.interval.bytes. One potential issue is 
> that if a follower restarts after being down for a long time, it may fetch 
> data much bigger than log.index.interval.bytes at a time. This means that 
> fewer index entries are created, which can increase the fetch time from the 
> consumers.
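The indexing gap described above can be shown with a toy model. This is an assumed simplification, not the actual `kafka.log.LogSegment` code:

```java
import java.util.Arrays;

// Toy model of the indexing gap described above (a simplification, not the
// actual LogSegment code): if each append adds at most one index entry, one
// large follower fetch yields a single entry where many small appends of the
// same total size would have yielded several.
public class IndexIntervalDemo {
    static int indexEntries(int[] appendSizes, int intervalBytes) {
        int entries = 0;
        int bytesSinceLastEntry = 0;
        for (int size : appendSizes) {
            bytesSinceLastEntry += size;
            if (bytesSinceLastEntry >= intervalBytes) {
                entries++;                 // at most one entry per append
                bytesSinceLastEntry = 0;
            }
        }
        return entries;
    }

    public static void main(String[] args) {
        int interval = 4096;               // stands in for log.index.interval.bytes
        int[] small = new int[64];
        Arrays.fill(small, 1024);          // 64 KiB written as 1 KiB appends
        int[] big = { 64 * 1024 };         // 64 KiB written as one follower fetch
        System.out.println(indexEntries(small, interval)); // 16 entries
        System.out.println(indexEntries(big, interval));   // 1 entry
    }
}
```

With fewer entries for the same bytes, index lookups for consumer fetches must scan larger gaps, which is the increased fetch time the issue describes.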





[GitHub] [kafka] ableegoldman commented on pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-13 Thread GitBox


ableegoldman commented on pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#issuecomment-819208838


   Hey @showuon , I'm pretty swamped at the moment but don't worry, it's on my 
list 🙂  Thanks






[GitHub] [kafka] ableegoldman edited a comment on pull request #9821: KAFKA-5876: Apply UnknownStateStoreException for Interactive Queries

2021-04-13 Thread GitBox


ableegoldman edited a comment on pull request #9821:
URL: https://github.com/apache/kafka/pull/9821#issuecomment-819206522


   Awesome. I can help get this current PR reviewed and merged from here, and 
probably find you someone else to review the next PR(s) since I'm pretty busy 🙂
   
   Now, regarding your last question on this PR: am I reading it correctly that 
we just don't ever throw `InvalidStateStoreException` from 
`allMetadataForStore` or `queryMetadataForKey` at the moment, therefore it 
doesn't make sense to throw UnknownStateStoreException from these methods as 
part of this KIP?
   
   Personally, I think it's ok to just throw whatever exception makes sense 
from wherever in the code it makes sense to do so. You can send a quick update 
note to the KIP thread to say that you're making this amendment, and if anyone 
has a concern they can respond there.
   
   For this specific case: it certainly seems to make sense that 
`allMetadataForStore` would throw an UnknownStateStoreException if a store name 
was passed in that's, well, unknown. I do see that we don't currently throw 
anything like that, and it looks like we would just return nothing if an 
invalid store was passed in, but that was probably just an oversight -- I don't 
see why it should behave any differently from `KafkaStreams#store`. Same goes 
for `queryMetadataForKey`
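The fail-fast behavior proposed here can be sketched as below. This is NOT the actual KafkaStreams implementation: a plain `IllegalArgumentException` stands in for Streams' `UnknownStateStoreException`, and the map stands in for the real metadata state:

```java
import java.util.Map;
import java.util.Set;

// Sketch of the proposal above: metadata lookups fail fast on unknown store
// names instead of silently returning an empty result. Hypothetical stand-in
// types, not the real KafkaStreams code.
public class MetadataLookupDemo {
    static Set<String> allMetadataForStore(Map<String, Set<String>> metadataByStore,
                                           String storeName) {
        Set<String> hosts = metadataByStore.get(storeName);
        if (hosts == null) {
            // Instead of returning nothing for an invalid store:
            throw new IllegalArgumentException(
                "Cannot get metadata for unknown state store: " + storeName);
        }
        return hosts;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> meta = Map.of("orders-store", Set.of("host-1:8080"));
        System.out.println(allMetadataForStore(meta, "orders-store"));
        try {
            allMetadataForStore(meta, "no-such-store");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```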






[GitHub] [kafka] ableegoldman commented on pull request #9821: KAFKA-5876: Apply UnknownStateStoreException for Interactive Queries

2021-04-13 Thread GitBox


ableegoldman commented on pull request #9821:
URL: https://github.com/apache/kafka/pull/9821#issuecomment-819206522


   Awesome. I can help get this current PR reviewed and merged from here, and 
find you someone else to review the next PR(s) since I'm pretty busy 🙂 
   
   Now, regarding your last question on this PR: am I reading it correctly that 
we just don't ever throw `InvalidStateStoreException` from 
`allMetadataForStore` or `queryMetadataForKey` at the moment, therefore it 
doesn't make sense to throw UnknownStateStoreException from these methods as 
part of this KIP?
   
   Personally, I think it's ok to just throw whatever exception makes sense 
from wherever in the code it makes sense to do so. You can send a quick update 
note to the KIP thread to say that you're making this amendment, and if anyone 
has a concern they can respond there.
   
   For this specific case: it certainly seems to make sense that 
`allMetadataForStore` would throw an UnknownStateStoreException if a store name 
was passed in that's, well, unknown. I do see that we don't currently throw 
anything like that, and it looks like we would just return nothing if an 
invalid store was passed in, but that was probably just an oversight -- I don't 
see why it should behave any differently from `KafkaStreams#store`. Same goes 
for `queryMetadataForKey`






[jira] [Commented] (KAFKA-8714) CLOSE_WAIT connections piling up on the broker

2021-04-13 Thread GeoffreyStark (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320671#comment-17320671
 ] 

GeoffreyStark commented on KAFKA-8714:
--

Maybe the same as the issue I created:

https://issues.apache.org/jira/browse/KAFKA-12665

> CLOSE_WAIT connections piling up on the broker
> --
>
> Key: KAFKA-8714
> URL: https://issues.apache.org/jira/browse/KAFKA-8714
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 2.3.0
> Environment: Linux 4.4.0-139-generic #165-Ubuntu SMP Wed Oct 24 
> 10:58:50 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: Rajdeep Mukherjee
>Priority: Major
> Attachments: Screenshot from 2019-07-25 11-53-24.png, 
> consumer_multiprocessing.py, producer_multiprocessing.py
>
>
> We are experiencing an issue where `CLOSE_WAIT` connections are piling up in 
> the brokers leading to a `Too many open files` error finally leading to a 
> crash of the corresponding broker. After some digging, we realized that this 
> is happening at instances when multiple clients(producers or consumers) are 
> closing their connections within a brief interval of time(when the frequency 
> of client connection closes is increasing). 
> The actual error that we had encountered was:
> {code:java}
> [2019-07-18 00:03:27,861] ERROR Error while accepting connection
> (kafka.network.Acceptor) java.io.IOException: Too many open files
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) 
> at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) 
> at kafka.network.Acceptor.accept(SocketServer.scala:326)
> at kafka.network.Acceptor.run(SocketServer.scala:269)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> When the error was encountered, the number of CLOSE_WAIT connections on the 
> broker was 200,000 and the number of ESTABLISHED connections was 
> approximately 15000.
> The attachment shows the issue, the sharp dip in the graph is the point where 
> the broker restarted.
> We had encountered this problem in both kafka version 0.10.1 and 2.3.0
> The client version we were using for reproducing was:
>  
> {code:java}
> confluent-kafka==1.1.0
> librdkafka v1.1.0
> {code}
>  
> Steps to reproduce:
> I have attached the scripts we used for reproducing the issue. 
> In our qa environment we were successfully able to reproduce the issue in the 
> following way:
>  * we spun a 5 node kafka v2.3.0 cluster
>  * we had prepared a python script that would spin in the order of 500+ 
> producer processes and the same number of consumer processes and we had 
> written in logic to randomly close the producer and consumer connections at a 
> high frequency in the order of 10 closes per second for 5 minutes.
>  * On the broker side, we were watching for CLOSE_WAIT connections using 
> `lsof` and we got sustained CLOSE_WAIT connections that persisted until we 
> restarted kafka on the corresponding broker.
> The command to be run for the producer and consumer scripts are:
> {code:java}
> python producer_multiprocessing.py true true
> python consumer_multiprocessing.py   0 
> true
> {code}
>  





[jira] [Commented] (KAFKA-12665) one of the brokers, which is also the controller, has too many CLOSE_WAIT connections

2021-04-13 Thread GeoffreyStark (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320670#comment-17320670
 ] 

GeoffreyStark commented on KAFKA-12665:
---

The same issue:

[KAFKA-8714|https://issues.apache.org/jira/browse/KAFKA-8714]

> one of the brokers, which is also the controller, has too many CLOSE_WAIT connections
> 
>
> Key: KAFKA-12665
> URL: https://issues.apache.org/jira/browse/KAFKA-12665
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, controller, core
>Affects Versions: 0.11.0.1
>Reporter: GeoffreyStark
>Priority: Major
> Attachments: image-2021-04-14-10-32-54-140.png, 
> image-2021-04-14-10-39-02-996.png, image-2021-04-14-11-26-03-346.png
>
>
> # *environment*
> apache- 0.11.0.1
> 5 nodes
> 3 replicas
> mean messages per sec: 4k
> Prometheus & jmxProt & grafana
> consumer: Spring Booot & Doris routine load
> producer: Spring Boot & log
>  
> # *what we encountered*
> We encountered a broker (id: 4), which is also the controller (epoch 90), 
> accumulating many CLOSE_WAIT connections at one point.
> controller.log
>  
> {code:java}
> Controller 4 epoch 90 fails to send request (type: UpdateMetadataRequest ...
> java.io.IOException: Connection to 4 was disconnected before the response was 
> read
> {code}
>  
> !image-2021-04-14-10-32-54-140.png!
> The request is retried many, many times, but the WARN message does not change.
>  
> At the same time
> another broker (id: 6) fetching messages from broker 4 also encountered the 
> problem:
> {code:java}
> [2021-04-13 16:35:06,942] WARN [ReplicaFetcherThread-0-4]: Error in fetch to 
> broker 4, request (type=FetchRequest, replicaId=6, maxWait=500, minBytes=1, 
> maxBytes=10485760,
> java.io.IOException: Connection to 4 was disconnected before the response was 
> read
> {code}
> !image-2021-04-14-10-39-02-996.png!
>  
> Doris routine load (consuming from Kafka) timed out:
>  
> {code:java}
> 2021-04-13 16:35:11,397 WARN (Routine load scheduler|42) 
> [KafkaUtil.getAllKafkaPartitions():91] failed to get partitions. 
> org.apache.doris.common.UserException: errCode = 2, detailMessage = failed to 
> get kafka partition info: [failed to get partition meta: Local: Timed out]
> {code}
>  
>  
> broker 4 (controller epoch 90) fs.file:
> !image-2021-04-14-11-26-03-346.png!
> Most of the CLOSE_WAIT connections are generated by the consumer application.
> At 16:49, the broker was restarted and returned to normal
>  
>  
> # *speculation*
> The TCP connections are closed passively, but the controller broker does not 
> process the close on its side.
> Are there any known bugs in this version?
>  
>  
>  





[GitHub] [kafka] vitojeng edited a comment on pull request #9821: KAFKA-5876: Apply UnknownStateStoreException for Interactive Queries

2021-04-13 Thread GitBox


vitojeng edited a comment on pull request #9821:
URL: https://github.com/apache/kafka/pull/9821#issuecomment-819199915


   > Hey @mjsax and @vitojeng , are you still interested in this? I haven't 
been following this KIP myself but I think the IQ exceptions are a valuable 
addition to Streams, and it would be nice to have this KIP finally pushed 
across the finish line 🙂
   
   @ableegoldman Yes. Personally, I hope to complete this KIP's work. 😄 






[GitHub] [kafka] showuon commented on pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-13 Thread GitBox


showuon commented on pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#issuecomment-819203341


   @ableegoldman , could you help check this PR? Thank you.






[jira] [Created] (KAFKA-12665) one of the brokers, which is also the controller, has too many CLOSE_WAIT connections

2021-04-13 Thread GeoffreyStark (Jira)
GeoffreyStark created KAFKA-12665:
-

 Summary: one of the brokers, which is also the controller, has too many 
CLOSE_WAIT connections
 Key: KAFKA-12665
 URL: https://issues.apache.org/jira/browse/KAFKA-12665
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, controller, core
Affects Versions: 0.11.0.1
Reporter: GeoffreyStark
 Attachments: image-2021-04-14-10-32-54-140.png, 
image-2021-04-14-10-39-02-996.png, image-2021-04-14-11-26-03-346.png

# *environment*

apache- 0.11.0.1

5 nodes

3 replicas

mean messages per sec: 4k

Prometheus & jmxProt & grafana

consumer: Spring Boot & Doris routine load

producer: Spring Boot & log

 

# *what we encountered*

We encountered a broker (id: 4), which is also the controller (epoch 90), 
accumulating many CLOSE_WAIT connections at one point.

controller.log

 
{code:java}
Controller 4 epoch 90 fails to send request (type: UpdateMetadataRequest ...
java.io.IOException: Connection to 4 was disconnected before the response was 
read
{code}
 

!image-2021-04-14-10-32-54-140.png!

The request is retried many, many times, but the WARN message does not change.

 

At the same time

another broker (id: 6) fetching messages from broker 4 also encountered the 
problem:
{code:java}
[2021-04-13 16:35:06,942] WARN [ReplicaFetcherThread-0-4]: Error in fetch to 
broker 4, request (type=FetchRequest, replicaId=6, maxWait=500, minBytes=1, 
maxBytes=10485760,
java.io.IOException: Connection to 4 was disconnected before the response was 
read
{code}
!image-2021-04-14-10-39-02-996.png!

 

Doris routine load (consuming from Kafka) timed out:

 
{code:java}
2021-04-13 16:35:11,397 WARN (Routine load scheduler|42) 
[KafkaUtil.getAllKafkaPartitions():91] failed to get partitions. 
org.apache.doris.common.UserException: errCode = 2, detailMessage = failed to 
get kafka partition info: [failed to get partition meta: Local: Timed out]
{code}
 

 

broker 4 (controller epoch 90) fs.file:

!image-2021-04-14-11-26-03-346.png!

Most of the CLOSE_WAIT connections are generated by the consumer application.

At 16:49, the broker was restarted and returned to normal

 

 

# *speculation*

The TCP connections are closed passively, but the controller broker does not 
process the close on its side.

Are there any known bugs in this version?

 

 

 





[GitHub] [kafka] vitojeng commented on pull request #9821: KAFKA-5876: Apply UnknownStateStoreException for Interactive Queries

2021-04-13 Thread GitBox


vitojeng commented on pull request #9821:
URL: https://github.com/apache/kafka/pull/9821#issuecomment-819199915


   > Hey @mjsax and @vitojeng , are you still interested in this? I haven't 
been following this KIP myself but I think the IQ exceptions are a valuable 
addition to Streams, and it would be nice to have this KIP finally pushed 
across the finish line 🙂
   
   @ableegoldman Yes. I personally hope to complete this KIP's work. 😄 






[GitHub] [kafka] ableegoldman commented on pull request #6592: KAFKA-8326: Introduce List Serde

2021-04-13 Thread GitBox


ableegoldman commented on pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#issuecomment-819183273


   Hey @yeralin , are you still working on this/looking for reviews? I know 
everyone is often busy but if you can just respond to let us know you're still 
active, we can try to pitch in to get this across the finish line.
   
   cc @mjsax @cadonna @lct45 @wcarlson5 for help reviewing






[jira] [Updated] (KAFKA-7499) Extend ProductionExceptionHandler to cover serialization exceptions

2021-04-13 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-7499:
--
Labels: beginner kip newbie newbie++  (was: beginner kip newbie)

> Extend ProductionExceptionHandler to cover serialization exceptions
> ---
>
> Key: KAFKA-7499
> URL: https://issues.apache.org/jira/browse/KAFKA-7499
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: beginner, kip, newbie, newbie++
>
> In 
> [KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce],
>  an exception handler for the write path was introduced. This exception 
> handler covers exception that are raised in the producer callback.
> However, serialization happens before the data is handed to the producer with 
> Kafka Streams itself and the producer uses `byte[]/byte[]` key-value-pair 
> types.
> Thus, we might want to extend the ProductionExceptionHandler to cover 
> serialization exception, too, to skip over corrupted output messages. An 
> example could be a "String" message that contains invalid JSON and should be 
> serialized as JSON.
> KIP-399 (not voted yet; feel free to pick it up): 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions]
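The skip-over-corrupted-messages idea can be sketched stdlib-only. The types below are hypothetical stand-ins; the real proposal extends Streams' `ProductionExceptionHandler` to the serialization step:

```java
// Stdlib-only sketch of the handler extension described above (hypothetical
// types, not the Streams API): on a serialization failure the handler decides
// whether to drop the record and CONTINUE, or FAIL the application.
public class SerializationHandlerDemo {
    enum Response { CONTINUE, FAIL }

    interface Serializer { byte[] serialize(String value); }

    // Returns the serialized bytes, or null when the handler chose to skip.
    static byte[] trySerialize(Serializer serializer, String value, Response onError) {
        try {
            return serializer.serialize(value);
        } catch (RuntimeException e) {
            if (onError == Response.CONTINUE) {
                return null; // skip the corrupted output message
            }
            throw e;         // surface the failure, as today
        }
    }

    public static void main(String[] args) {
        // A toy "JSON" serializer that rejects anything not starting with '{'.
        Serializer strictJson = v -> {
            if (!v.startsWith("{")) {
                throw new IllegalArgumentException("invalid JSON: " + v);
            }
            return v.getBytes();
        };
        System.out.println(trySerialize(strictJson, "not-json", Response.CONTINUE)); // null
    }
}
```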





[GitHub] [kafka] ableegoldman commented on pull request #9821: KAFKA-5876: Apply UnknownStateStoreException for Interactive Queries

2021-04-13 Thread GitBox


ableegoldman commented on pull request #9821:
URL: https://github.com/apache/kafka/pull/9821#issuecomment-819180159


   Hey @mjsax and @vitojeng , are you still interested in this? I haven't been 
following this KIP myself but I think the IQ exceptions are a valuable addition 
to Streams, and it would be nice to have this KIP finally pushed across the 
finish line 🙂 






[GitHub] [kafka] ableegoldman commented on pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

2021-04-13 Thread GitBox


ableegoldman commented on pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#issuecomment-819179530


   Hey @abbccdda , can you take another look at this? @JoelWee are you still 
interested in getting this KIP done? 
   
   Maybe one of @cadonna @mjsax @lct45 @wcarlson5 can help with the review if so






[jira] [Assigned] (KAFKA-12574) Deprecate eos-alpha

2021-04-13 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-12574:
--

Assignee: A. Sophie Blee-Goldman

> Deprecate eos-alpha
> ---
>
> Key: KAFKA-12574
> URL: https://issues.apache.org/jira/browse/KAFKA-12574
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> In KIP-447 we introduced a new thread-producer which is capable of 
> exactly-once semantics across multiple tasks. The new mode of EOS, called 
> eos-beta, is intended to eventually be the preferred processing mode for EOS 
> as it improves the performance and scaling of partitions/tasks. The only 
> downside is that it requires brokers to be on version 2.5+ in order to 
> understand the latest APIs that are necessary for this thread-producer.
> We should consider deprecating the eos-alpha config, ie 
> StreamsConfig.EXACTLY_ONCE, to encourage new & existing EOS users to migrate 
> to the new-and-improved processing mode, and upgrade their brokers if 
> necessary.
> Eventually we would like to be able to remove the eos-alpha code paths from 
> Streams as this will help to simplify the logic and reduce the processing 
> mode branching. But since this will break client-broker compatibility, and 
> 2.5 is still a relatively recent version, we probably can't actually remove 
> eos-alpha in the near future





[GitHub] [kafka] ableegoldman commented on pull request #10532: KAFKA-8531: Change default replication factor config

2021-04-13 Thread GitBox


ableegoldman commented on pull request #10532:
URL: https://github.com/apache/kafka/pull/10532#issuecomment-819119565


   Yikes -- no, I think we definitely need to handle that ourselves. I don't 
think users will have any idea what that means -- for one thing they'll wonder 
why they're suddenly seeing it now, and probably report it as a bug if we don't 
explicitly tell them that we changed the default replication factor. For 
another, how are they supposed to know how to interpret `Creating topics with 
default partitions/replication factor are only supported in CreateTopicRequest 
version 4+` -- it doesn't even clearly state that the problem is older brokers, 
much less what version specifically the brokers need to be
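One way to handle it ourselves is to translate the cryptic error before it reaches the user. The sketch below is hedged: the names and message text are hypothetical, not the actual Streams fix, and it only matches on the error string quoted in this thread:

```java
// Hedged sketch of the error-translation idea above (hypothetical names, not
// the actual Streams code): catch the cryptic broker-compatibility error and
// rethrow one that names the real constraint and the workaround.
public class DefaultRfErrorDemo {
    static RuntimeException translate(RuntimeException cause) {
        String msg = cause.getMessage();
        if (msg != null && msg.contains("CreateTopicRequest version 4+")) {
            return new IllegalStateException(
                "Could not create internal topics using the default replication "
                + "factor: the brokers are too old to support topic creation with "
                + "broker-side defaults. Either upgrade the brokers, or set an "
                + "explicit replication.factor in the Streams config.", cause);
        }
        return cause;
    }

    public static void main(String[] args) {
        RuntimeException raw = new RuntimeException(
            "Creating topics with default partitions/replication factor are only "
            + "supported in CreateTopicRequest version 4+");
        System.out.println(translate(raw).getMessage());
    }
}
```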


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on pull request #10520: KAFKA-10816: Use root resource as readiness and health probe for Connect distributed mode

2021-04-13 Thread GitBox


kpatelatwork commented on pull request #10520:
URL: https://github.com/apache/kafka/pull/10520#issuecomment-819107528


   > @tombentley that's fair. I was thinking that it's not really a part of the 
contract that `/` be implemented under the hood in a certain way (i.e., as a 
static resource or as one that's routed through the herder's request queue), 
but in order for this change to be useful (and protected against being undone 
in the future), it's important that this change be codified via KIP so that the 
endpoint is both documented and supported as a viable health check for 
distributed workers (and specifically, their herder).
   > 
   > @kpatelatwork I'll address the two scenarios you've outlined separately:
   > 
   > 1. If the probe is called every second, it's unlikely to have a 
significant impact on worker health. If nothing else is queued up (such as a 
rebalance, a different request, session key rotation, etc.), then most of the 
code path that gets traversed will consist of no-ops or trivial operations. I 
ran some local benchmarking and each iteration of the tick thread took less 
than two milliseconds; most of the time that was taken for each request to `GET 
/` was spent outside of the tick thread, but even then, I was able to issue 
about 325 requests/sec on my local machine with some sloppy, unoptimized 
benchmarking*. So if someone's using this endpoint reasonably for 
healthcheck/monitoring purposes and issuing at most one request every second, 
the additional performance hit should be fairly negligible.
   > 2. I can't envision a realistic scenario where this degrades worker health 
by competing with other herder requests, even if hundreds of healthcheck 
requests are issued per second. Yes, they will be handled on the same thread 
that takes care of "real" herder requests, but the actual work done in the 
herder thread to respond to healthcheck requests is tiny--just a lookup of a 
static object. If so many requests come in that a prior tick iteration hasn't 
completed before currently-active ones are in progress, they'll get grouped 
together and handled en masse within a single tick iteration, at which point, 
performance will be extremely high (likely on the order of thousands if not 
millions of iterations per second). The only circumstance I can envision where 
this is remotely possible is if the worker is busy doing other work that takes 
much longer, such as participating in an expensive rebalance, or trying and 
failing to read to the end of the config topic. At that point, if it continues 
to receive requests, they'll pile up and things might get out of hand. 
But in order for that to happen, each request will either have to originate 
from a different client, or there will have to be a client out there sending 
out simultaneous requests without waiting for still-active ones to complete. 
This only seems possible with some kind of malicious intent, but since it's 
already possible to lock down access to your worker using REST extensions, we 
probably don't have to worry about DDoS protection for something like this. If 
we still want to do some kind of caching, I think one second should be more 
than enough; the longer it gets, the harder we make it to detect bad 
worker health when things are going south.
   > 
   > * - `time bash -c "for _ in {0..999}; do curl -sS -o /dev/null 
localhost:8083/ & done && wait"`, if anyone's interested.
   
   Got it, I think for now no need for caching, we can revisit if we encounter 
this issue in the field.
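The ~1-second caching idea floated above could be sketched as follows. This is a hypothetical, self-contained sketch — the class and method names are illustrative, not Connect's actual API; in Connect the supplier would be the path that goes through the herder's tick thread:

```java
import java.util.function.Supplier;

// Hypothetical one-second response cache for a healthcheck endpoint: the first
// call computes the status via the supplier (the "expensive" herder path), and
// any call within the next TTL_MS is served from the cached value.
class CachedHealthCheck {
    private static final long TTL_MS = 1_000L;

    private final Supplier<String> statusSupplier;
    private String cachedStatus;
    private long lastComputedMs = Long.MIN_VALUE;

    CachedHealthCheck(Supplier<String> statusSupplier) {
        this.statusSupplier = statusSupplier;
    }

    synchronized String status() {
        long now = System.currentTimeMillis();
        if (cachedStatus == null || now - lastComputedMs >= TTL_MS) {
            cachedStatus = statusSupplier.get(); // expensive path, at most ~1/sec
            lastComputedMs = now;
        }
        return cachedStatus; // cheap cached path during request floods
    }
}
```

As the thread concludes, such a cache is probably unnecessary given the measured cost of the endpoint, but this shows how small the change would be if it is ever needed.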


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-13 Thread GitBox


guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r612827412



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSide.java
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Objects;
+
+/**
+ * Combines a key from a {@link KeyValue} with a boolean value referencing if 
the key is
+ * part of the left join (true) or right join (false). This class is only 
useful when a state
+ * store needs to be shared between left and right processors, and each 
processor needs to
+ * access the key of the other processor.
+ */
+public class KeyAndJoinSide<K> {
+private final K key;
+private final boolean leftJoin;

Review comment:
   nit: the term `leftJoin` may be a bit confusing, as it could also mean the 
join type; maybe calling it `leftSide` would be better? Ditto for the fields in 
LeftOrRightValue, and also `isLeftJoin` -> `isLeftSide` in the impl join 
classes.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination of {@link KeyAndJoinSide}. The {@link 
KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in 
the left side of a
+ * join or on the right side. This {@link LeftOrRightValue} object contains 
either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right 
topic.
+ */
+public class LeftOrRightValue<V1, V2> {
+private final V1 leftValue;
+private final V2 rightValue;
+
+private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
+this.leftValue = leftValue;

Review comment:
   I still feel it is better to add a check here, to make sure at most one 
of the left/rightValue is not null.
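The check suggested above could look like the following. This is a minimal, self-contained sketch based on the `LeftOrRightValue` shape shown in the diff — the factory method names are illustrative, not necessarily the PR's final code:

```java
// Sketch of the suggested constructor check: at most one of
// leftValue/rightValue may be non-null (construction with both set fails).
class LeftOrRightValue<V1, V2> {
    private final V1 leftValue;
    private final V2 rightValue;

    private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
        if (leftValue != null && rightValue != null) {
            throw new IllegalArgumentException(
                "At most one of leftValue and rightValue may be non-null");
        }
        this.leftValue = leftValue;
        this.rightValue = rightValue;
    }

    static <V1, V2> LeftOrRightValue<V1, V2> makeLeftValue(final V1 leftValue) {
        return new LeftOrRightValue<>(leftValue, null);
    }

    static <V1, V2> LeftOrRightValue<V1, V2> makeRightValue(final V2 rightValue) {
        return new LeftOrRightValue<>(null, rightValue);
    }

    V1 leftValue() { return leftValue; }
    V2 rightValue() { return rightValue; }
}
```

Keeping the constructor private and routing construction through the factories means the invalid both-non-null state can only arise from a bug inside the class itself.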








[GitHub] [kafka] mjsax commented on pull request #10532: KAFKA-8531: Change default replication factor config

2021-04-13 Thread GitBox


mjsax commented on pull request #10532:
URL: https://github.com/apache/kafka/pull/10532#issuecomment-819103401


   The logs show the following error:
   ```
   [2021-04-13 15:52:17,685] ERROR stream-thread [main] Unexpected error during 
topic creation for 
streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-03-repartition.
   Error message was: 
org.apache.kafka.common.errors.UnsupportedVersionException: Creating topics 
with default partitions/replication factor are only supported in 
CreateTopicRequest version 4+. The following topics need values for partitions 
and replicas: 
[streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-03-repartition] 
(org.apache.kafka.streams.processor.internals.InternalTopicManager:446)
   [2021-04-13 15:52:17,687] ERROR stream-client 
[streams-wordcount-4b9ea556-bc49-4671-83e8-4d2adaba3677] Encountered the 
following exception during processing and the registered exception handler 
opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.  
(org.apache.kafka.streams.KafkaStreams:484)
   org.apache.kafka.streams.errors.StreamsException: Could not create topic 
streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-03-repartition.
   ...
   Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: 
Creating topics with default partitions/replication factor are only supported 
in CreateTopicRequest version 4+. The following topics need values for 
partitions and replicas: 
[streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-03-repartition]
   
   ```
   
   Do we think this would be sufficient? Or should we try to handle this 
specific `UnsupportedVersionException` explicitly?
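If Streams does opt to handle this explicitly, one option is to intercept the exception during topic creation and rethrow it with a message that names the real problem (old brokers and the changed default), rather than surfacing the raw protocol error. A hedged sketch — the helper name is hypothetical, and a stand-in exception class is used so the snippet compiles without Kafka on the classpath:

```java
// Stand-in for org.apache.kafka.common.errors.UnsupportedVersionException so
// this sketch is self-contained.
class UnsupportedVersionException extends RuntimeException {
    UnsupportedVersionException(final String message) { super(message); }
}

class TopicCreationErrors {
    // Hypothetical helper: translate the opaque "CreateTopicRequest version 4+"
    // failure into a message that tells users the brokers are too old to
    // support creating topics with default partitions/replication factor.
    static RuntimeException mapCreationError(final String topic, final Throwable cause) {
        if (cause instanceof UnsupportedVersionException
                && cause.getMessage() != null
                && cause.getMessage().contains("CreateTopicRequest version 4+")) {
            return new IllegalStateException(
                "Could not create topic " + topic + ": the brokers are too old to "
                + "support creating topics with a default replication factor. "
                + "Either upgrade the brokers or set replication.factor explicitly.",
                cause);
        }
        return new RuntimeException("Could not create topic " + topic, cause);
    }
}
```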






[GitHub] [kafka] kpatelatwork commented on a change in pull request #10530: KAFKA-10231 fail bootstrap of Rest server if the host name in the adv…

2021-04-13 Thread GitBox


kpatelatwork commented on a change in pull request #10530:
URL: https://github.com/apache/kafka/pull/10530#discussion_r612800672



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
##
@@ -369,9 +370,31 @@ else if (serverConnector != null && 
serverConnector.getHost() != null && serverC
 else if (serverConnector != null && serverConnector.getPort() > 0)
 builder.port(serverConnector.getPort());
 
-log.info("Advertised URI: {}", builder.build());
+URI uri = builder.build();
+validateUriHost(uri);
+log.info("Advertised URI: {}", uri);
 
-return builder.build();
+return uri;
+}
+
+/**
+ * Parses the uri and throws a more definitive error
+ * when the internal node to node communication can't happen due to an 
invalid host name.
+ */
+private void validateUriHost(URI uri) {
+if (uri.getHost() == null) {
+String host = Utils.getHost(uri.getAuthority());
+String errorMsg = "Invalid host=" + host + ", in url=" + 
uri.toString();

Review comment:
   The new check in the code happens only when uri.getHost is null, so it's 
not worse than what we have in trunk, but I do agree Apache HttpClient may fix 
this. I had thought about using Apache HttpClient, but isn't this a big library 
change? Do we want to do that in a separate ticket? I wasn't sure about our 
library rules, as I didn't find Apache HttpClient being used anywhere in our 
code except in 1 test case.
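The failure mode `validateUriHost` guards against can be demonstrated directly: `java.net.URI` parses a URL such as `http://my_worker:8083/` without error, but `getHost()` returns null because `_` is not legal in a hostname, which is what later breaks node-to-node requests. A simplified sketch of the check (the real PR builds a richer error message from `Utils.getHost`):

```java
import java.net.URI;

// java.net.URI.getHost() returns null when the authority is not a valid
// hostname (e.g. contains an underscore), even though parsing succeeds.
class AdvertisedUriCheck {
    static void validateUriHost(final URI uri) {
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("Invalid host in advertised URL " + uri);
        }
    }

    public static void main(final String[] args) {
        System.out.println(URI.create("http://worker1:8083/").getHost());   // worker1
        System.out.println(URI.create("http://my_worker:8083/").getHost()); // null
        validateUriHost(URI.create("http://my_worker:8083/"));              // throws
    }
}
```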

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
##
@@ -369,9 +370,31 @@ else if (serverConnector != null && 
serverConnector.getHost() != null && serverC
 else if (serverConnector != null && serverConnector.getPort() > 0)
 builder.port(serverConnector.getPort());
 
-log.info("Advertised URI: {}", builder.build());
+URI uri = builder.build();
+validateUriHost(uri);
+log.info("Advertised URI: {}", uri);
 
-return builder.build();
+return uri;
+}
+
+/**
+ * Parses the uri and throws a more definitive error
+ * when the internal node to node communication can't happen due to an 
invalid host name.
+ */
+private void validateUriHost(URI uri) {
+if (uri.getHost() == null) {
+String host = Utils.getHost(uri.getAuthority());
+String errorMsg = "Invalid host=" + host + ", in url=" + 
uri.toString();

Review comment:
   @C0urante The new check in the code happens only when uri.getHost is null, 
so it's not worse than what we have in trunk, but I do agree Apache HttpClient 
may fix this. I had thought about using Apache HttpClient, but isn't this a big 
library change? Do we want to do that in a separate ticket? I wasn't sure about 
our library rules, as I didn't find Apache HttpClient being used anywhere in 
our code except in 1 test case.








[GitHub] [kafka] kpatelatwork commented on a change in pull request #10530: KAFKA-10231 fail bootstrap of Rest server if the host name in the adv…

2021-04-13 Thread GitBox


kpatelatwork commented on a change in pull request #10530:
URL: https://github.com/apache/kafka/pull/10530#discussion_r612798221



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
##
@@ -369,9 +370,31 @@ else if (serverConnector != null && 
serverConnector.getHost() != null && serverC
 else if (serverConnector != null && serverConnector.getPort() > 0)
 builder.port(serverConnector.getPort());
 
-log.info("Advertised URI: {}", builder.build());
+URI uri = builder.build();
+validateUriHost(uri);
+log.info("Advertised URI: {}", uri);
 
-return builder.build();
+return uri;
+}
+
+/**
+ * Parses the uri and throws a more definitive error
+ * when the internal node to node communication can't happen due to an 
invalid host name.
+ */
+private void validateUriHost(URI uri) {
+if (uri.getHost() == null) {
+String host = Utils.getHost(uri.getAuthority());
+String errorMsg = "Invalid host=" + host + ", in url=" + 
uri.toString();

Review comment:
   Fixed the error message as per your suggestions.








[GitHub] [kafka] cmccabe commented on a change in pull request #10463: MINOR: KRaft support for unclean.leader.election.enable

2021-04-13 Thread GitBox


cmccabe commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r612786365



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##
@@ -374,6 +381,27 @@ void deleteTopicConfigs(String name) {
 configData.remove(new ConfigResource(Type.TOPIC, name));
 }
 
+private boolean getBoolean(ConfigResource configResource, String key) {
+TimelineHashMap<String, String> map = configData.get(configResource);
+if (map == null) return false;
+String value = map.getOrDefault(key, "false");
+return value.equalsIgnoreCase("true");
+}
+
+/**
+ * Check if the given topic should use an unclean leader election.
+ *
+ * @param topicName The topic name.
+ * @return  True if the controller or topic was configured to 
use unclean
+ *  leader election.
+ */
+boolean shouldUseUncleanLeaderElection(String topicName) {
+// Check the node config, cluster config, and topic config.
+return getBoolean(currentNodeResource, 
UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG) ||

Review comment:
   Hmm.  So an individual topic could configure unclean leader election as 
false even if the node sets it to true?  I will implement that, then...








[GitHub] [kafka] cmccabe commented on a change in pull request #10463: MINOR: KRaft support for unclean.leader.election.enable

2021-04-13 Thread GitBox


cmccabe commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r612795358



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -832,13 +833,33 @@ void handleNodeActivated(int brokerId, 
List<ApiMessageAndVersion> records) {
 throw new RuntimeException("Partition " + topicIdPartition +
 " existed in isrMembers, but not in the partitions map.");
 }
-// TODO: if this partition is configured for unclean leader 
election,
-// check the replica set rather than the ISR.
-if (Replicas.contains(partition.isr, brokerId)) {
-records.add(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+boolean brokerInIsr = Replicas.contains(partition.isr, brokerId);
+boolean shouldBecomeLeader;
+if 
(configurationControl.shouldUseUncleanLeaderElection(topic.name)) {
+shouldBecomeLeader = Replicas.contains(partition.replicas, 
brokerId);
+} else {
+shouldBecomeLeader = brokerInIsr;
+}
+if (shouldBecomeLeader) {
+if (brokerInIsr) {
+if (log.isDebugEnabled()) {
+log.debug("The newly active node {} will be the leader 
for the " +
+"previously offline partition {}.",
+brokerId, topicIdPartition);
+}
+} else {
+log.info("The newly active node {} will be the leader for 
the " +
+"previously offline partition {}, after an UNCLEAN 
leader election.",
+brokerId, topicIdPartition);
+}
+PartitionChangeRecord record = new PartitionChangeRecord().
 setPartitionId(topicIdPartition.partitionId()).
 setTopicId(topic.id).
-setLeader(brokerId), (short) 0));
+setLeader(brokerId);
+if (!brokerInIsr) {
+record.setIsr(Replicas.toList(partition.isr, brokerId));

Review comment:
   Good catch








[GitHub] [kafka] junrao commented on a change in pull request #10463: MINOR: KRaft support for unclean.leader.election.enable

2021-04-13 Thread GitBox


junrao commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r612792056



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##
@@ -374,6 +381,27 @@ void deleteTopicConfigs(String name) {
 configData.remove(new ConfigResource(Type.TOPIC, name));
 }
 
+private boolean getBoolean(ConfigResource configResource, String key) {
+TimelineHashMap<String, String> map = configData.get(configResource);
+if (map == null) return false;
+String value = map.getOrDefault(key, "false");
+return value.equalsIgnoreCase("true");
+}
+
+/**
+ * Check if the given topic should use an unclean leader election.
+ *
+ * @param topicName The topic name.
+ * @return  True if the controller or topic was configured to 
use unclean
+ *  leader election.
+ */
+boolean shouldUseUncleanLeaderElection(String topicName) {
+// Check the node config, cluster config, and topic config.
+return getBoolean(currentNodeResource, 
UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG) ||

Review comment:
   Right.








[GitHub] [kafka] cmccabe commented on a change in pull request #10463: MINOR: KRaft support for unclean.leader.election.enable

2021-04-13 Thread GitBox


cmccabe commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r612786365



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##
@@ -374,6 +381,27 @@ void deleteTopicConfigs(String name) {
 configData.remove(new ConfigResource(Type.TOPIC, name));
 }
 
+private boolean getBoolean(ConfigResource configResource, String key) {
+TimelineHashMap<String, String> map = configData.get(configResource);
+if (map == null) return false;
+String value = map.getOrDefault(key, "false");
+return value.equalsIgnoreCase("true");
+}
+
+/**
+ * Check if the given topic should use an unclean leader election.
+ *
+ * @param topicName The topic name.
+ * @return  True if the controller or topic was configured to 
use unclean
+ *  leader election.
+ */
+boolean shouldUseUncleanLeaderElection(String topicName) {
+// Check the node config, cluster config, and topic config.
+return getBoolean(currentNodeResource, 
UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG) ||

Review comment:
   So an individual topic could configure unclean leader election as false 
even if the node sets it to true?








[GitHub] [kafka] junrao commented on a change in pull request #10463: MINOR: KRaft support for unclean.leader.election.enable

2021-04-13 Thread GitBox


junrao commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r612769595



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -832,13 +833,33 @@ void handleNodeActivated(int brokerId, 
List<ApiMessageAndVersion> records) {
 throw new RuntimeException("Partition " + topicIdPartition +
 " existed in isrMembers, but not in the partitions map.");
 }
-// TODO: if this partition is configured for unclean leader 
election,
-// check the replica set rather than the ISR.
-if (Replicas.contains(partition.isr, brokerId)) {
-records.add(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+boolean brokerInIsr = Replicas.contains(partition.isr, brokerId);
+boolean shouldBecomeLeader;
+if 
(configurationControl.shouldUseUncleanLeaderElection(topic.name)) {
+shouldBecomeLeader = Replicas.contains(partition.replicas, 
brokerId);
+} else {
+shouldBecomeLeader = brokerInIsr;
+}
+if (shouldBecomeLeader) {
+if (brokerInIsr) {
+if (log.isDebugEnabled()) {
+log.debug("The newly active node {} will be the leader 
for the " +
+"previously offline partition {}.",
+brokerId, topicIdPartition);
+}
+} else {
+log.info("The newly active node {} will be the leader for 
the " +
+"previously offline partition {}, after an UNCLEAN 
leader election.",
+brokerId, topicIdPartition);
+}
+PartitionChangeRecord record = new PartitionChangeRecord().
 setPartitionId(topicIdPartition.partitionId()).
 setTopicId(topic.id).
-setLeader(brokerId), (short) 0));
+setLeader(brokerId);
+if (!brokerInIsr) {
+record.setIsr(Replicas.toList(partition.isr, brokerId));

Review comment:
   Hmm, if we perform an unclean leader election, the only replica in ISR 
should just be the new leader, since the data in the existing ISR is not 
guaranteed to match with the new leader.
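The fix implied by this comment is that on the unclean path the ISR shrinks to just the new leader instead of being extended with it. A self-contained sketch — `PartitionChange` here is a stand-in for the real `PartitionChangeRecord`, so only the ISR-handling logic is real:

```java
import java.util.Collections;
import java.util.List;

// Stand-in for PartitionChangeRecord, just enough to show the ISR handling.
class PartitionChange {
    List<Integer> isr;
    int leader;

    PartitionChange setLeader(final int leader) { this.leader = leader; return this; }
    PartitionChange setIsr(final List<Integer> isr) { this.isr = isr; return this; }
}

class UncleanElection {
    static PartitionChange changeFor(final int brokerId, final boolean brokerInIsr) {
        PartitionChange record = new PartitionChange().setLeader(brokerId);
        if (!brokerInIsr) {
            // Unclean election: reset the ISR to the new leader only, since
            // the data on the previous ISR members may not match it.
            record.setIsr(Collections.singletonList(brokerId));
        }
        return record;
    }
}
```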

##
File path: 
metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
##
@@ -245,4 +246,32 @@ public void testLegacyAlterConfigs() {
 manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def", "901")))))
 );
 }
+
+@Test
+public void testShouldUseUncleanLeaderElection() throws Exception {
+SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
+ConfigurationControlManager manager =
+new ConfigurationControlManager(new LogContext(), 0, 
snapshotRegistry, CONFIGS);
+ControllerResult<Map<ConfigResource, ApiError>> result = manager.incrementalAlterConfigs(
+    toMap(entry(BROKER0, toMap(
+        entry(UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, entry(SET, "true")))),
+    entry(MYTOPIC, toMap(
+        entry(UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, entry(SET, "false"))))));
+Map<ConfigResource, ApiError> expectedResponse = new HashMap<>();
+expectedResponse.put(BROKER0, ApiError.NONE);
+expectedResponse.put(MYTOPIC, ApiError.NONE);
+assertEquals(expectedResponse, result.response());
+ControllerTestUtils.replayAll(manager, result.records());
+assertTrue(manager.shouldUseUncleanLeaderElection(MYTOPIC.name()));

Review comment:
   I think in this case, unclean leader election should be false for 
MYTOPIC since the topic level config takes precedence. 
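The precedence the reviewers converge on — explicit topic-level value wins over the node-level value, which wins over the cluster default — could be sketched like this. Plain maps stand in for the `TimelineHashMap`-backed config store; this is illustrative, not the controller's actual lookup code:

```java
import java.util.HashMap;
import java.util.Map;

// Topic config > node config > cluster default, as discussed in the review.
class UncleanElectionConfig {
    static final String KEY = "unclean.leader.election.enable";

    final Map<String, String> topicConfigs = new HashMap<>(); // keyed by topic name
    final Map<String, String> nodeConfigs = new HashMap<>();  // this controller node
    boolean clusterDefault = false;

    boolean shouldUseUncleanLeaderElection(final String topicName) {
        String topicValue = topicConfigs.get(topicName);
        if (topicValue != null) return Boolean.parseBoolean(topicValue); // topic wins
        String nodeValue = nodeConfigs.get(KEY);
        if (nodeValue != null) return Boolean.parseBoolean(nodeValue);   // then node
        return clusterDefault;                                           // then default
    }
}
```

Under this ordering, the test scenario above (node set to "true", MYTOPIC explicitly "false") yields false for MYTOPIC, which is the behavior the review asks for.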

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##
@@ -374,6 +381,27 @@ void deleteTopicConfigs(String name) {
 configData.remove(new ConfigResource(Type.TOPIC, name));
 }
 
+private boolean getBoolean(ConfigResource configResource, String key) {
+TimelineHashMap<String, String> map = configData.get(configResource);
+if (map == null) return false;
+String value = map.getOrDefault(key, "false");
+return value.equalsIgnoreCase("true");
+}
+
+/**
+ * Check if the given topic should use an unclean leader election.
+ *
+ * @param topicName The topic name.
+ * @return  True if the controller or topic was configured to 
use unclean
+ *  leader election.
+ */
+boolean shouldUseUncleanLeaderElection(String topicName) {
+// Check the node config, cluster config, and topic config.
+return getBoolean(currentNodeResource, 
UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG) ||

Review comment:
   Hmm, the priority of a config is topic, node, node d

[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-13 Thread GitBox


guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r612773044



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
 while (iter.hasNext()) {
 needOuterJoin = false;
 final KeyValue<Long, V2> otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
 context().forward(
 key,
 joiner.apply(key, value, otherRecord.value),
-To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+
+outerJoinWindowStore.ifPresent(store -> {
+// Delete the other joined key from the outer 
non-joined store now to prevent
+// further processing
+final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+if (store.fetch(otherJoinKey, otherRecordTimestamp) != 
null) {

Review comment:
   Ah nvm then --- let's just keep it out of the scope of this ticket for 
now.








[jira] [Assigned] (KAFKA-10543) Convert KTable joins to new PAPI

2021-04-13 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya reassigned KAFKA-10543:


Assignee: Jorge Esteban Quilcate Otoya  (was: John Roesler)

> Convert KTable joins to new PAPI
> 
>
> Key: KAFKA-10543
> URL: https://issues.apache.org/jira/browse/KAFKA-10543
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>






[jira] [Assigned] (KAFKA-10542) Convert KTable maps to new PAPI

2021-04-13 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya reassigned KAFKA-10542:


Assignee: Jorge Esteban Quilcate Otoya  (was: John Roesler)

> Convert KTable maps to new PAPI
> ---
>
> Key: KAFKA-10542
> URL: https://issues.apache.org/jira/browse/KAFKA-10542
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>






[jira] [Assigned] (KAFKA-10540) Convert KStream aggregations to new PAPI

2021-04-13 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya reassigned KAFKA-10540:


Assignee: Jorge Esteban Quilcate Otoya  (was: John Roesler)

> Convert KStream aggregations to new PAPI
> 
>
> Key: KAFKA-10540
> URL: https://issues.apache.org/jira/browse/KAFKA-10540
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>






[jira] [Assigned] (KAFKA-10541) Convert KTable filters to new PAPI

2021-04-13 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya reassigned KAFKA-10541:


Assignee: Jorge Esteban Quilcate Otoya  (was: John Roesler)

> Convert KTable filters to new PAPI
> --
>
> Key: KAFKA-10541
> URL: https://issues.apache.org/jira/browse/KAFKA-10541
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>






[jira] [Assigned] (KAFKA-10544) Convert KTable aggregations to new PAPI

2021-04-13 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya reassigned KAFKA-10544:


Assignee: Jorge Esteban Quilcate Otoya  (was: John Roesler)

> Convert KTable aggregations to new PAPI
> ---
>
> Key: KAFKA-10544
> URL: https://issues.apache.org/jira/browse/KAFKA-10544
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>






[jira] [Assigned] (KAFKA-10539) Convert KStreamImpl joins to new PAPI

2021-04-13 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya reassigned KAFKA-10539:


Assignee: Jorge Esteban Quilcate Otoya  (was: John Roesler)

> Convert KStreamImpl joins to new PAPI
> -
>
> Key: KAFKA-10539
> URL: https://issues.apache.org/jira/browse/KAFKA-10539
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>






[jira] [Commented] (KAFKA-12608) Simple identity pipeline sometimes loses data

2021-04-13 Thread Hans-Peter Grahsl (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320522#comment-17320522
 ] 

Hans-Peter Grahsl commented on KAFKA-12608:
---

THX [~jamii] for reporting back that it's fine now. Glad I could help you find 
the issue.

> Simple identity pipeline sometimes loses data
> -
>
> Key: KAFKA-12608
> URL: https://issues.apache.org/jira/browse/KAFKA-12608
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
> Environment: 
> https://github.com/jamii/streaming-consistency/blob/c1f504e73141405ee6cd0c7f217604d643babf81/pkgs.nix
> [nix-shell:~/streaming-consistency/kafka-streams]$ java -version
> openjdk version "1.8.0_265"
> OpenJDK Runtime Environment (build 1.8.0_265-ga)
> OpenJDK 64-Bit Server VM (build 25.265-bga, mixed mode)
> [nix-shell:~/streaming-consistency/kafka-streams]$ nix-info
> system: "x86_64-linux", multi-user?: yes, version: nix-env (Nix) 2.3.10, 
> channels(jamie): "", channels(root): "nixos-20.09.3554.f8929dce13e", nixpkgs: 
> /nix/var/nix/profiles/per-user/root/channels/nixos 
>Reporter: Jamie Brandon
>Priority: Major
>
> I'm running a very simple streams program that reads records from one topic 
> into a table and then writes the stream back into another topic. In about 1 
> in 5 runs, some of the output records are missing. They tend to form a single 
> contiguous range, as if a single batch was dropped somewhere.
> https://github.com/jamii/streaming-consistency/blob/main/kafka-streams/src/main/java/Demo.java#L49-L52
> {code:bash}
> $ wc -l tmp/*transactions
>  999514 tmp/accepted_transactions
>  100 tmp/transactions
>  1999514 total
> $ cat tmp/transactions | cut -d',' -f 1 | cut -d' ' -f 2 > in
> $ cat tmp/accepted_transactions | cut -d',' -f 1 | cut -d':' -f 2 > out
> $ diff in out | wc -l
>  487
> $ diff in out | head
>  25313,25798d25312
>  < 25312
>  < 25313
>  < 25314
>  < 25315
>  < 25316
>  < 25317
>  < 25318
>  < 25319
>  < 25320
>  
> $ diff in out | tail
>  < 25788
>  < 25789
>  < 25790
>  < 25791
>  < 25792
>  < 25793
>  < 25794
>  < 25795
>  < 25796
>  < 25797
> {code}
> I've checked running the consumer multiple times to make sure that the 
> records are actually missing from the topic and it wasn't just a hiccup in 
> the consumer. 
> The repo linked above has instructions in the readme on how to reproduce the 
> exact versions used. 
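[Editor's note] The gap analysis above (a single contiguous 25313–25798 range missing from the output) can be sketched in Java. This is an illustrative helper for reproducing the `diff in out` check, not code from the report; the class and method names are assumptions:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class MissingRangeFinder {
    /** Returns [start, end] pairs of ids present in 'in' but absent from 'out'. */
    static List<long[]> missingRanges(List<Long> in, Set<Long> out) {
        List<long[]> ranges = new ArrayList<>();
        long start = -1, prev = -1;
        for (long id : new TreeSet<>(in)) {   // iterate ids in sorted order
            if (!out.contains(id)) {
                if (start < 0) start = id;    // open a new gap
                prev = id;
            } else if (start >= 0) {
                ranges.add(new long[]{start, prev}); // close the current gap
                start = -1;
            }
        }
        if (start >= 0) ranges.add(new long[]{start, prev});
        return ranges;
    }
}
```

A single contiguous result from this helper would match the "single batch dropped somewhere" symptom described above.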





[jira] [Resolved] (KAFKA-7728) Add JoinReason to the join group request for better rebalance handling

2021-04-13 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-7728.

Resolution: Won't Fix

This is no longer a critical fix for static membership, as a leader rejoin 
with no member info won't trigger a rebalance in the original implementation.

> Add JoinReason to the join group request for better rebalance handling
> --
>
> Key: KAFKA-7728
> URL: https://issues.apache.org/jira/browse/KAFKA-7728
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Mayuresh Gharat
>Priority: Major
>  Labels: consumer, mirror-maker, needs-kip
>
> Recently [~mgharat] and I discussed the current rebalance logic for leader 
> join group request handling. So far we blindly trigger a rebalance when the 
> leader rejoins. The caveat is that KIP-345 does not cover this effort, and if 
> a consumer group is not using sticky assignment but another strategy such as 
> round robin, the redundant rebalance could still shuffle topic partitions 
> around consumers (for example, in a mirror maker application).
> I checked on broker side and here is what we currently do:
>  
> {code:java}
> if (group.isLeader(memberId) || !member.matches(protocols))
>   // force a rebalance if a member has changed metadata or if the leader
>   // sends JoinGroup. The latter allows the leader to trigger rebalances
>   // for changes affecting assignment which do not affect the member
>   // metadata (such as topic metadata changes for the consumer)
> {code}
> Based on the broker logic, we only need to trigger rebalance for leader 
> rejoin when the topic metadata change has happened. I also looked up the 
> ConsumerCoordinator code on client side, and found out the metadata 
> monitoring logic here:
> {code:java}
> public boolean rejoinNeededOrPending() {
>   ...
>   // we need to rejoin if we performed the assignment and metadata has changed
>   if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
>     return true;
> }
> {code}
>  Instead of just returning true, we could introduce a new enum field called 
> JoinReason to indicate the purpose of the rejoin, so that we don't need to do 
> a full rebalance when the leader is just in a rolling bounce. Concretely, add 
> a JoinReason field to the join group request so the broker knows whether the 
> leader is rejoining due to a topic metadata change: if yes, trigger a 
> rebalance; if no, don't.
>  
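[Editor's note] A minimal Java sketch of the proposed broker-side decision; the enum values and method here are illustrative assumptions, not the actual Kafka protocol or codebase:

```java
import java.util.Objects;

public class JoinReasonSketch {
    // Hypothetical reasons a member might include in its JoinGroup request.
    enum JoinReason { TOPIC_METADATA_CHANGED, ROLLING_BOUNCE, NEW_MEMBER }

    // Broker-side decision: a leader rejoin only forces a rebalance when it
    // reports a topic metadata change; a plain rolling bounce does not.
    static boolean shouldRebalance(boolean isLeader, JoinReason reason) {
        Objects.requireNonNull(reason);
        if (reason == JoinReason.NEW_MEMBER) {
            return true;                 // a newly joining member always rebalances
        }
        return isLeader && reason == JoinReason.TOPIC_METADATA_CHANGED;
    }
}
```

Under this sketch, a leader that rejoins with `ROLLING_BOUNCE` is ignored, which is exactly the redundant-rebalance case the ticket describes.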





[GitHub] [kafka] cmccabe commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

2021-04-13 Thread GitBox


cmccabe commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r612742924



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -517,25 +540,67 @@ class ControllerApis(val requestChannel: RequestChannel,
 
   def handleIncrementalAlterConfigs(request: RequestChannel.Request): Unit = {
 val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest]
-authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
-val configChanges = new util.HashMap[ConfigResource, util.Map[String, 
util.Map.Entry[AlterConfigOp.OpType, String]]]()
+val configChanges = new util.HashMap[ConfigResource,
+  util.Map[String, Entry[AlterConfigOp.OpType, String]]]()
 alterConfigsRequest.data.resources.forEach { resource =>
-  val configResource = new 
ConfigResource(ConfigResource.Type.forId(resource.resourceType), 
resource.resourceName())
-  val altersByName = new util.HashMap[String, 
util.Map.Entry[AlterConfigOp.OpType, String]]()
+  val configResource = new ConfigResource(
+ConfigResource.Type.forId(resource.resourceType), 
resource.resourceName())
+  val altersByName = new util.HashMap[String, Entry[AlterConfigOp.OpType, 
String]]()
   resource.configs.forEach { config =>
 altersByName.put(config.name, new 
util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
   AlterConfigOp.OpType.forId(config.configOperation), config.value))
   }
   configChanges.put(configResource, altersByName)
 }
+val results = new util.HashMap[ConfigResource, ApiError]
+val iterator = configChanges.keySet().iterator()
+while (iterator.hasNext) {
+  val resource = iterator.next()
+  val apiError = resource.`type` match {
+case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
+  if (authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, 
CLUSTER_NAME)) {
+new ApiError(NONE)
+  } else {
+new ApiError(CLUSTER_AUTHORIZATION_FAILED)

Review comment:
   Yes, we could be authorized to change some things but not others.
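[Editor's note] The per-resource authorization pattern under discussion can be sketched in Java (the real code is Scala in `ControllerApis.scala`; the names below are illustrative assumptions): each resource in the request gets its own error, instead of one cluster-level check failing the whole batch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class PerResourceAuth {
    enum ErrorCode { NONE, CLUSTER_AUTHORIZATION_FAILED }

    // Authorize each resource independently so a caller authorized for some
    // resources still gets per-resource results rather than a blanket failure.
    static Map<String, ErrorCode> authorizeEach(List<String> resources,
                                                Predicate<String> isAuthorized) {
        Map<String, ErrorCode> results = new LinkedHashMap<>();
        for (String r : resources) {
            results.put(r, isAuthorized.test(r)
                ? ErrorCode.NONE
                : ErrorCode.CLUSTER_AUTHORIZATION_FAILED);
        }
        return results;
    }
}
```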




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

2021-04-13 Thread GitBox


cmccabe commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r612742563



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -238,87 +249,99 @@ class ControllerApis(val requestChannel: RequestChannel,
 val toAuthenticate = new util.HashSet[String]
 toAuthenticate.addAll(providedNames)
 val idToName = new util.HashMap[Uuid, String]
-controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
-  if (nameOrError.isError) {
-appendResponse(null, id, nameOrError.error())
-  } else {
-toAuthenticate.add(nameOrError.result())
-idToName.put(id, nameOrError.result())
-  }
-}
-// Get the list of deletable topics (those we can delete) and the list of 
describeable
-// topics.  If a topic can't be deleted or described, we have to act like 
it doesn't
-// exist, even when it does.
-val topicsToAuthenticate = toAuthenticate.asScala
-val (describeable, deletable) = if (hasClusterAuth) {
-  (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
-} else {
-  (getDescribableTopics(topicsToAuthenticate), 
getDeletableTopics(topicsToAuthenticate))
-}
-// For each topic that was provided by ID, check if authentication failed.
-// If so, remove it from the idToName map and create an error response for 
it.
-val iterator = idToName.entrySet().iterator()
-while (iterator.hasNext) {
-  val entry = iterator.next()
-  val id = entry.getKey
-  val name = entry.getValue
-  if (!deletable.contains(name)) {
-if (describeable.contains(name)) {
-  appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+controller.findTopicNames(providedIds).thenCompose(topicNames => {
+  topicNames.forEach { (id, nameOrError) =>
+if (nameOrError.isError) {
+  appendResponse(null, id, nameOrError.error())
 } else {
-  appendResponse(null, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+  toAuthenticate.add(nameOrError.result())
+  idToName.put(id, nameOrError.result())
 }
-iterator.remove()
   }
-}
-// For each topic that was provided by name, check if authentication 
failed.
-// If so, create an error response for it.  Otherwise, add it to the 
idToName map.
-controller.findTopicIds(providedNames).get().forEach { (name, idOrError) =>
-  if (!describeable.contains(name)) {
-appendResponse(name, ZERO_UUID, new 
ApiError(TOPIC_AUTHORIZATION_FAILED))
-  } else if (idOrError.isError) {
-appendResponse(name, ZERO_UUID, idOrError.error)
-  } else if (deletable.contains(name)) {
-val id = idOrError.result()
-if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != 
null) {
-  // This is kind of a weird case: what if we supply topic ID X and 
also a name
-  // that maps to ID X?  In that case, _if authorization succeeds_, we 
end up
-  // here.  If authorization doesn't succeed, we refrain from 
commenting on the
-  // situation since it would reveal topic ID mappings.
-  duplicateProvidedIds.add(id)
-  idToName.remove(id)
-  appendResponse(name, id, new ApiError(INVALID_REQUEST,
-"The provided topic name maps to an ID that was already 
supplied."))
-}
+  // Get the list of deletable topics (those we can delete) and the list 
of describeable
+  // topics.
+  val topicsToAuthenticate = toAuthenticate.asScala
+  val (describeable, deletable) = if (hasClusterAuth) {
+(topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
   } else {
-appendResponse(name, ZERO_UUID, new 
ApiError(TOPIC_AUTHORIZATION_FAILED))
+(getDescribableTopics(topicsToAuthenticate), 
getDeletableTopics(topicsToAuthenticate))
   }
-}
-// Finally, the idToName map contains all the topics that we are 
authorized to delete.
-// Perform the deletion and create responses for each one.
-val idToError = controller.deleteTopics(idToName.keySet).get()
-idToError.forEach { (id, error) =>
-appendResponse(idToName.get(id), id, error)
-}
-// Shuffle the responses so that users can not use patterns in their 
positions to
-// distinguish between absent topics and topics we are not permitted to 
see.
-Collections.shuffle(responses)
-responses
+  // For each topic that was provided by ID, check if authentication 
failed.
+  // If so, remove it from the idToName map and create an error response 
for it.
+  val iterator = idToName.entrySet().iterator()
+  while (iterator.hasNext) {
+val entry = iterator.next()
+val id = entry.getKey
+val name = entry.getValue
+if (!deletable.contains(name)) {
+  if (describeable.contains(name)) {
+appendResponse(name, id, new ApiError(TOPIC_AUTH

[GitHub] [kafka] cmccabe commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

2021-04-13 Thread GitBox


cmccabe commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r612738586



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -35,8 +36,8 @@ import org.apache.kafka.common.internals.FatalExitError
 import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import 
org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, 
DeletableTopicResultCollection}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker
-import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, 
BrokerHeartbeatResponseData, BrokerRegistrationResponseData, 
CreateTopicsRequestData, CreateTopicsResponseData, DeleteTopicsRequestData, 
DeleteTopicsResponseData, DescribeQuorumResponseData, 
EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, 
SaslAuthenticateResponseData, SaslHandshakeResponseData, 
UnregisterBrokerResponseData, VoteResponseData}
-import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, 
TOPIC_AUTHORIZATION_FAILED}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors._

Review comment:
I think this makes it more readable. Long lines make reading and merging 
difficult.








[jira] [Commented] (KAFKA-12594) Various simple examples produce no ouput, and prevent other streams from producing output

2021-04-13 Thread Jamie Brandon (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320487#comment-17320487
 ] 

Jamie Brandon commented on KAFKA-12594:
---

I've set up the same calculation and data in ksqldb and I get the output I 
expect: https://github.com/jamii/streaming-consistency/tree/main/ksqldb. I'm 
not sure how to find out how the ksqldb docker image is built, in order to 
check whether it uses the same versions and configuration for kafka streams.

> Various simple examples produce no ouput, and prevent other streams from 
> producing output
> -
>
> Key: KAFKA-12594
> URL: https://issues.apache.org/jira/browse/KAFKA-12594
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
> Environment: 
> https://github.com/jamii/streaming-consistency/blob/c1f504e73141405ee6cd0c7f217604d643babf81/pkgs.nix
> [nix-shell:~/streaming-consistency/kafka-streams]$ java -version
> openjdk version "1.8.0_265"
> OpenJDK Runtime Environment (build 1.8.0_265-ga)
> OpenJDK 64-Bit Server VM (build 25.265-bga, mixed mode)
> [nix-shell:~/streaming-consistency/kafka-streams]$ nix-info
> system: "x86_64-linux", multi-user?: yes, version: nix-env (Nix) 2.3.10, 
> channels(jamie): "", channels(root): "nixos-20.09.3554.f8929dce13e", nixpkgs: 
> /nix/var/nix/profiles/per-user/root/channels/nixos
>Reporter: Jamie Brandon
>Priority: Major
> Attachments: logs.tar.gz, screenshot_2021-04-13_12-10-19.png, 
> screenshot_2021-04-13_12-10-31.png, screenshot_2021-04-13_12-13-04.png
>
>
> I have a simple kafka-streams example which just reads in some transactions 
> and writes them back out again.
> https://github.com/jamii/streaming-consistency/blob/c1f504e73141405ee6cd0c7f217604d643babf81/kafka-streams/src/main/java/Demo.java
> This works fine and I see the right output in 'accepted_transactions'.
> If I uncomment the left join at line 58, then not only do I not get any 
> output for the left join, but I don't get any output in 
> 'accepted_transactions' either.





[jira] [Commented] (KAFKA-12594) Various simple examples produce no ouput, and prevent other streams from producing output

2021-04-13 Thread Jamie Brandon (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320484#comment-17320484
 ] 

Jamie Brandon commented on KAFKA-12594:
---

Same behavior if I modify the timestamps in the data to be in the current hour 
rather than several months in the past.

> Various simple examples produce no ouput, and prevent other streams from 
> producing output
> -
>
> Key: KAFKA-12594





[jira] [Commented] (KAFKA-12594) Various simple examples produce no ouput, and prevent other streams from producing output

2021-04-13 Thread Jamie Brandon (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320481#comment-17320481
 ] 

Jamie Brandon commented on KAFKA-12594:
---

I verified in wireshark that no data is being sent to the accepted_transactions 
topic:

 !screenshot_2021-04-13_12-10-19.png! 

 !screenshot_2021-04-13_12-10-31.png! 

 !screenshot_2021-04-13_12-13-04.png! 

> Various simple examples produce no ouput, and prevent other streams from 
> producing output
> -
>
> Key: KAFKA-12594





[jira] [Updated] (KAFKA-12594) Various simple examples produce no ouput, and prevent other streams from producing output

2021-04-13 Thread Jamie Brandon (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Brandon updated KAFKA-12594:
--
Attachment: screenshot_2021-04-13_12-13-04.png

> Various simple examples produce no ouput, and prevent other streams from 
> producing output
> -
>
> Key: KAFKA-12594





[jira] [Updated] (KAFKA-12594) Various simple examples produce no ouput, and prevent other streams from producing output

2021-04-13 Thread Jamie Brandon (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Brandon updated KAFKA-12594:
--
Attachment: screenshot_2021-04-13_12-10-19.png

> Various simple examples produce no ouput, and prevent other streams from 
> producing output
> -
>
> Key: KAFKA-12594





[jira] [Updated] (KAFKA-12594) Various simple examples produce no ouput, and prevent other streams from producing output

2021-04-13 Thread Jamie Brandon (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Brandon updated KAFKA-12594:
--
Attachment: screenshot_2021-04-13_12-10-31.png

> Various simple examples produce no ouput, and prevent other streams from 
> producing output
> -
>
> Key: KAFKA-12594





[jira] [Commented] (KAFKA-12598) Remove deprecated --zookeeper in ConfigCommand

2021-04-13 Thread Rajini Sivaram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320471#comment-17320471
 ] 

Rajini Sivaram commented on KAFKA-12598:


[~cmccabe] [~hachikuji] Have we figured out how/when to remove ZK options for 
commands used to bootstrap brokers?

> Remove deprecated --zookeeper in ConfigCommand
> --
>
> Key: KAFKA-12598
> URL: https://issues.apache.org/jira/browse/KAFKA-12598
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.0.0
>
>






[jira] [Commented] (KAFKA-12594) Various simple examples produce no ouput, and prevent other streams from producing output

2021-04-13 Thread Jamie Brandon (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320464#comment-17320464
 ] 

Jamie Brandon commented on KAFKA-12594:
---

In light of https://issues.apache.org/jira/browse/KAFKA-12608 I tried setting 
`--config retention.ms=-1` here too, but this didn't change the behavior.

> Various simple examples produce no ouput, and prevent other streams from 
> producing output
> -
>
> Key: KAFKA-12594





[jira] [Resolved] (KAFKA-12608) Simple identity pipeline sometimes loses data

2021-04-13 Thread Jamie Brandon (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Brandon resolved KAFKA-12608.
---
Resolution: Invalid

> Simple identity pipeline sometimes loses data
> -
>
> Key: KAFKA-12608





[jira] [Commented] (KAFKA-12608) Simple identity pipeline sometimes loses data

2021-04-13 Thread Jamie Brandon (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320463#comment-17320463
 ] 

Jamie Brandon commented on KAFKA-12608:
---

I tried with `--config retention.ms=-1` and haven't been able to reproduce the 
problem, so this seems like the correct explanation. Thanks for your help and 
sorry for the false alarm :S

> Simple identity pipeline sometimes loses data
> -
>
> Key: KAFKA-12608
> URL: https://issues.apache.org/jira/browse/KAFKA-12608
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
> Environment: 
> https://github.com/jamii/streaming-consistency/blob/c1f504e73141405ee6cd0c7f217604d643babf81/pkgs.nix
> [nix-shell:~/streaming-consistency/kafka-streams]$ java -version
> openjdk version "1.8.0_265"
> OpenJDK Runtime Environment (build 1.8.0_265-ga)
> OpenJDK 64-Bit Server VM (build 25.265-bga, mixed mode)
> [nix-shell:~/streaming-consistency/kafka-streams]$ nix-info
> system: "x86_64-linux", multi-user?: yes, version: nix-env (Nix) 2.3.10, 
> channels(jamie): "", channels(root): "nixos-20.09.3554.f8929dce13e", nixpkgs: 
> /nix/var/nix/profiles/per-user/root/channels/nixos 
>Reporter: Jamie Brandon
>Priority: Major
>
> I'm running a very simple streams program that reads records from one topic 
> into a table and then writes the stream back into another topic. In about 1 
> in 5 runs, some of the output records are missing. They tend to form a single 
> contiguous range, as if a single batch was dropped somewhere.
> https://github.com/jamii/streaming-consistency/blob/main/kafka-streams/src/main/java/Demo.java#L49-L52
> {code:bash}
> $ wc -l tmp/*transactions
>  999514 tmp/accepted_transactions
>  1000000 tmp/transactions
>  1999514 total
> $ cat tmp/transactions | cut -d',' -f 1 | cut -d' ' -f 2 > in
> $ cat tmp/accepted_transactions | cut -d',' -f 1 | cut -d':' -f 2 > out
> $ diff in out | wc -l
>  487
> $ diff in out | head
>  25313,25798d25312
>  < 25312
>  < 25313
>  < 25314
>  < 25315
>  < 25316
>  < 25317
>  < 25318
>  < 25319
>  < 25320
>  
> $ diff in out | tail
>  < 25788
>  < 25789
>  < 25790
>  < 25791
>  < 25792
>  < 25793
>  < 25794
>  < 25795
>  < 25796
>  < 25797
> {code}
> I've checked running the consumer multiple times to make sure that the 
> records are actually missing from the topic and it wasn't just a hiccup in 
> the consumer. 
> The repo linked above has instructions in the readme on how to reproduce the 
> exact versions used. 
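[Editorial note: the `diff` output in the quoted report shows a single contiguous block of IDs (25312..25797) missing from the output topic. As an illustration only (not part of the original report), a small Python sketch that extracts such contiguous missing ranges from two ID lists:]

```python
def missing_ranges(produced, consumed):
    """Return contiguous (start, end) ranges of IDs that appear in
    `produced` but are absent from `consumed`."""
    missing = sorted(set(produced) - set(consumed))
    ranges = []
    for n in missing:
        if ranges and n == ranges[-1][1] + 1:
            ranges[-1] = (ranges[-1][0], n)   # extend the current range
        else:
            ranges.append((n, n))             # start a new range
    return ranges

# Mirrors the report: IDs 25312..25797 never reach the output topic.
produced = list(range(30000))
consumed = [n for n in produced if not 25312 <= n <= 25797]
print(missing_ranges(produced, consumed))  # [(25312, 25797)]
```

[Run against the `in` and `out` files from the report, this would print exactly one `(start, end)` pair per dropped batch.]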





[jira] [Updated] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2021-04-13 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-9295:
--
Fix Version/s: 3.0.0

> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> --
>
> Key: KAFKA-9295
> URL: https://issues.apache.org/jira/browse/KAFKA-9295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.4.0, 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/]
> {quote}java.lang.AssertionError: Did not receive all 1 records from topic 
> output- within 60000 ms Expected: is a value equal to or greater than <1> 
> but: <0> was less than <1> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote}
>  





[GitHub] [kafka] ableegoldman commented on pull request #10409: KAFKA-9295: improve KTableKTableForeignKeyInnerJoinMultiIntegrationTest

2021-04-13 Thread GitBox


ableegoldman commented on pull request #10409:
URL: https://github.com/apache/kafka/pull/10409#issuecomment-818970047


   Thanks @showuon , merged to trunk. Let's also close out the ticket for now 
so that if someone sees it fail again on a PR build they'll reopen it and we'll 
be notified


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10409: KAFKA-9295: improve KTableKTableForeignKeyInnerJoinMultiIntegrationTest

2021-04-13 Thread GitBox


ableegoldman merged pull request #10409:
URL: https://github.com/apache/kafka/pull/10409


   






[GitHub] [kafka] ableegoldman commented on pull request #10532: KAFKA-8531: Change default replication factor config

2021-04-13 Thread GitBox


ableegoldman commented on pull request #10532:
URL: https://github.com/apache/kafka/pull/10532#issuecomment-818967367


   > Was also wondering about a potential error message -- not sure atm what 
error message a user would get if they run against 2.3 brokers and if the error 
message would be clear. Should we do anything about it?
   
   Not sure either -- maybe you can use the soak test to spin up brokers on 2.2 
against this PR and check out the error message + stack trace? I definitely 
think we should try to catch the error and log a more helpful error message (eg 
`in 3.0 we changed the default replication factor to -1 which requires brokers 
to be on 2.3+, please update your brokers or manually set the replication 
factor config`)






[GitHub] [kafka] mjsax commented on pull request #10529: KAFKA-12650: fix NPE in InternalTopicManagerTest

2021-04-13 Thread GitBox


mjsax commented on pull request #10529:
URL: https://github.com/apache/kafka/pull/10529#issuecomment-818965852


   Updated the PR.






[GitHub] [kafka] dejan2609 commented on pull request #10466: MINOR: Gradle upgrade: 6.8.3 -->> 7.0-rc-2 [work in progress]

2021-04-13 Thread GitBox


dejan2609 commented on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-818929043


@ijuma I narrowed the gap and will post my findings here in a few days; 
some more digging will be required in order to solve this (I will try to 
squeeze it asap).






[jira] [Updated] (KAFKA-12664) Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 clusters in standalone mode

2021-04-13 Thread Edward Vaisman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edward Vaisman updated KAFKA-12664:
---
Description: 
Hi Folks, I came across this issue when trying to aggregate data from two 
separate data centres into one data centre.

In the configuration below, you can see I am trying to replicate a topic from 
dc1 (named test_topic_dc1) to dc3 as well as replicate a topic from dc2 
(test_topic_dc2) to dc3.

However, when I try to replicate both topics from those datacenters at the same 
time I notice that connect gets stuck in a rebalance loop (see attachment for 
logs)
 [^connect.log.tar.gz]

excerpt of connect.log
{code:java}
2021-04-13 17:03:06,360] INFO [Worker clientId=connect-3, groupId=mm2-dc2] 
Successfully synced group in generation Generation{generationId=347, 
memberId='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', 
protocol='sessioned'} 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)[2021-04-13
 17:03:06,360] INFO [Worker clientId=connect-4, groupId=mm2-dc2] Rebalance 
started 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13 
17:03:06,362] INFO [Worker clientId=connect-4, groupId=mm2-dc2] (Re-)joining 
group 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13
 17:03:06,368] INFO [Worker clientId=connect-2, groupId=mm2-dc3] Rebalance 
started 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13 
17:03:06,369] INFO [Worker clientId=connect-2, groupId=mm2-dc3] (Re-)joining 
group 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13
 17:03:06,370] INFO [Worker clientId=connect-3, groupId=mm2-dc2] Joined group 
at generation 347 with protocol version 2 and got assignment: 
Assignment{error=1, leader='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', 
leaderUrl='NOTUSED/dc1', offset=13, connectorIds=[MirrorSourceConnector], 
taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance 
delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1688)
{code}
To replicate the issue here is what I used:

[^mm2.properties]
{code:java}
clusters = dc1, dc2, dc3
dc1.bootstrap.servers = kafka-dc1:19092
dc2.bootstrap.servers = kafka-dc2:19093
dc3.bootstrap.servers = kafka-dc3:19094
dc1.group.id=mm2-dc1
dc2.group.id=mm2-dc2
dc3.group.id=mm2-dc3
replication.factor=1
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
dc1->dc3.enabled = true
dc1->dc3.topics = test_topic_dc1
dc2->dc3.enabled = true
dc2->dc3.topics = test_topic_dc2
dc3->dc2 = false
dc3->dc1 = false
{code}
This [^docker-compose-multi.yml] file to create local kafka clusters 
(dc1,dc2,dc3)
 (I set docker to use 6 cpus, 8gb mem, swap 2gb)

I then ran an interactive shell to run mirror maker within the same 
docker-compose network (change network to match yours)
{code:java}
docker run --network kafka-examples_default -it wurstmeister/kafka:latest bash

# Upload mm2 properties on server

/opt/kafka/bin/connect-mirror-maker.sh mm2.properties{code}
Kafkacat commands to produce to dc1, dc2
{code:java}
kafkacat -b localhost:9092 -t test_topic_dc1 -P
Hello World from DC1!{code}
{code:java}
kafkacat -b localhost:9093 -t test_topic_dc2 -P
Hello World from DC2{code}
I then tried to remove one of the datacenters to confirm if it was a 
configuration problem, however mirror maker ran successfully with the below 
configuration

mm2.properties
{code:java}
clusters = dc2, dc3
dc2.bootstrap.servers = kafka-dc2:19093
dc3.bootstrap.servers = kafka-dc3:19094
dc2.group.id=mm2-dc2
dc3.group.id=mm2-dc3
replication.factor=1
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
dc2->dc3.enabled = true
dc2->dc3.topics = test_topic_dc2
{code}
Any help would be appreciated!

  was:
Hi Folks, I came across this issue when trying to aggregate data from two 
separate data centres into one data centre.

In the configuration below, you can see I am trying to replicate a topic from 
dc1 (named test_topic_dc1) to dc3 as well as replicate a topic from dc2 
(test_topic_dc2) to dc3.

However, when I try to replicate both topics from those datacenters at the same 
time i notice that connect gets stuck in a rebalance loop (see attachment for 
logs)
[^connect.log.tar.gz]


exerpt of connect.log
{code:java}
2021-04-13 17:03:06,360] INFO [Worker clientId=connect-3, groupId=mm2-dc2] 
Successfully synced group in generation Generation{generationId=347, 
memberId='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', 
protocol='sessioned'} 
(org.apache.kafka.clients.consumer.internals.A

[jira] [Created] (KAFKA-12664) Mirrormaker 2.0 infinate rebalance loop when dealing with more than 2 clusters in standalone mode

2021-04-13 Thread Edward Vaisman (Jira)
Edward Vaisman created KAFKA-12664:
--

 Summary: Mirrormaker 2.0 infinate rebalance loop when dealing with 
more than 2 clusters in standalone mode
 Key: KAFKA-12664
 URL: https://issues.apache.org/jira/browse/KAFKA-12664
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.7.0, 2.6.0, 2.4.1, 2.5.0
Reporter: Edward Vaisman
 Attachments: connect.log.tar.gz, docker-compose-multi.yml, 
mm2.properties

Hi Folks, I came across this issue when trying to aggregate data from two 
separate data centres into one data centre.

In the configuration below, you can see I am trying to replicate a topic from 
dc1 (named test_topic_dc1) to dc3 as well as replicate a topic from dc2 
(test_topic_dc2) to dc3.

However, when I try to replicate both topics from those datacenters at the same 
time i notice that connect gets stuck in a rebalance loop (see attachment for 
logs)
[^connect.log.tar.gz]


exerpt of connect.log
{code:java}
2021-04-13 17:03:06,360] INFO [Worker clientId=connect-3, groupId=mm2-dc2] 
Successfully synced group in generation Generation{generationId=347, 
memberId='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', 
protocol='sessioned'} 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)[2021-04-13
 17:03:06,360] INFO [Worker clientId=connect-4, groupId=mm2-dc2] Rebalance 
started 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13 
17:03:06,362] INFO [Worker clientId=connect-4, groupId=mm2-dc2] (Re-)joining 
group 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13
 17:03:06,368] INFO [Worker clientId=connect-2, groupId=mm2-dc3] Rebalance 
started 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13 
17:03:06,369] INFO [Worker clientId=connect-2, groupId=mm2-dc3] (Re-)joining 
group 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13
 17:03:06,370] INFO [Worker clientId=connect-3, groupId=mm2-dc2] Joined group 
at generation 347 with protocol version 2 and got assignment: 
Assignment{error=1, leader='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', 
leaderUrl='NOTUSED/dc1', offset=13, connectorIds=[MirrorSourceConnector], 
taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance 
delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1688)
{code}


To replicate the issue here is what I used:

 

[^mm2.properties]

 
{code:java}
clusters = dc1, dc2, dc3
dc1.bootstrap.servers = kafka-dc1:19092
dc2.bootstrap.servers = kafka-dc2:19093
dc3.bootstrap.servers = kafka-dc3:19094
dc1.group.id=mm2-dc1
dc2.group.id=mm2-dc2
dc3.group.id=mm2-dc3
replication.factor=1
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
dc1->dc3.enabled = true
dc1->dc3.topics = test_topic_dc1
dc2->dc3.enabled = true
dc2->dc3.topics = test_topic_dc2
dc3->dc2 = false
dc3->dc1 = false
{code}
This [^docker-compose-multi.yml] file to create local kafka clusters 
(dc1,dc2,dc3)
(I set docker to use 6 cpus, 8gb mem, swap 2gb)

I then ran an interactive shell to run mirror maker within the same 
docker-compose network (change network to match yours)
{code:java}
docker run --network kafka-examples_default -it wurstmeister/kafka:latest bash

# Upload mm2 properties on server

/opt/kafka/bin/connect-mirror-maker.sh mm2.properties{code}

Kafkacat commands to produce to dc1, dc2
{code:java}
kafkacat -b localhost:9092 -t test_topic_dc1 -P
Hello World from DC1!{code}
{code:java}
kafkacat -b localhost:9093 -t test_topic_dc2 -P
Hello World from DC2{code}

I then tried to remove one of the datacenters to confirm if it was a 
configuration problem, however mirror maker ran successfully with the below 
configuration

mm2.properties

 
{code:java}
clusters = dc2, dc3
dc2.bootstrap.servers = kafka-dc2:19093
dc3.bootstrap.servers = kafka-dc3:19094
dc2.group.id=mm2-dc2
dc3.group.id=mm2-dc3
replication.factor=1
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
dc2->dc3.enabled = true
dc2->dc3.topics = test_topic_dc2
{code}
 

Any help would be appreciated!

 

 

 

 





[jira] [Updated] (KAFKA-12664) Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 clusters in standalone mode

2021-04-13 Thread Edward Vaisman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edward Vaisman updated KAFKA-12664:
---
Summary: Mirrormaker 2.0 infinite rebalance loop when dealing with more 
than 2 clusters in standalone mode  (was: Mirrormaker 2.0 infinate rebalance 
loop when dealing with more than 2 clusters in standalone mode)

> Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 
> clusters in standalone mode
> -
>
> Key: KAFKA-12664
> URL: https://issues.apache.org/jira/browse/KAFKA-12664
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.7.0
>Reporter: Edward Vaisman
>Priority: Major
> Attachments: connect.log.tar.gz, docker-compose-multi.yml, 
> mm2.properties
>
>
> Hi Folks, I came across this issue when trying to aggregate data from two 
> separate data centres into one data centre.
> In the configuration below, you can see I am trying to replicate a topic from 
> dc1 (named test_topic_dc1) to dc3 as well as replicate a topic from dc2 
> (test_topic_dc2) to dc3.
> However, when I try to replicate both topics from those datacenters at the 
> same time I notice that connect gets stuck in a rebalance loop (see 
> attachment for logs)
> [^connect.log.tar.gz]
> excerpt of connect.log
> {code:java}
> 2021-04-13 17:03:06,360] INFO [Worker clientId=connect-3, groupId=mm2-dc2] 
> Successfully synced group in generation Generation{generationId=347, 
> memberId='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', 
> protocol='sessioned'} 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)[2021-04-13
>  17:03:06,360] INFO [Worker clientId=connect-4, groupId=mm2-dc2] Rebalance 
> started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13
>  17:03:06,362] INFO [Worker clientId=connect-4, groupId=mm2-dc2] (Re-)joining 
> group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13
>  17:03:06,368] INFO [Worker clientId=connect-2, groupId=mm2-dc3] Rebalance 
> started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13
>  17:03:06,369] INFO [Worker clientId=connect-2, groupId=mm2-dc3] (Re-)joining 
> group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13
>  17:03:06,370] INFO [Worker clientId=connect-3, groupId=mm2-dc2] Joined group 
> at generation 347 with protocol version 2 and got assignment: 
> Assignment{error=1, leader='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', 
> leaderUrl='NOTUSED/dc1', offset=13, connectorIds=[MirrorSourceConnector], 
> taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1688)
> {code}
> To replicate the issue here is what I used:
>  
> [^mm2.properties]
>  
> {code:java}
> clusters = dc1, dc2, dc3
> dc1.bootstrap.servers = kafka-dc1:19092
> dc2.bootstrap.servers = kafka-dc2:19093
> dc3.bootstrap.servers = kafka-dc3:19094
> dc1.group.id=mm2-dc1
> dc2.group.id=mm2-dc2
> dc3.group.id=mm2-dc3
> replication.factor=1
> checkpoints.topic.replication.factor=1
> heartbeats.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> config.storage.replication.factor=1
> dc1->dc3.enabled = true
> dc1->dc3.topics = test_topic_dc1
> dc2->dc3.enabled = true
> dc2->dc3.topics = test_topic_dc2
> dc3->dc2 = false
> dc3->dc1 = false
> {code}
> This [^docker-compose-multi.yml] file to create local kafka clusters 
> (dc1,dc2,dc3)
> (I set docker to use 6 cpus, 8gb mem, swap 2gb)
> I then ran an interactive shell to run mirror maker within the same 
> docker-compose network (change network to match yours)
> {code:java}
> docker run --network kafka-examples_default -it wurstmeister/kafka:latest bash
> # Upload mm2 properties on server
> /opt/kafka/bin/connect-mirror-maker.sh mm2.properties{code}
> Kafkacat commands to produce to dc1, dc2
> {code:java}
> kafkacat -b localhost:9092 -t test_topic_dc1 -P
> Hello World from DC1!{code}
> {code:java}
> kafkacat -b localhost:9093 -t test_topic_dc2 -P
> Hello World from DC2{code}
> I then tried to remove one of the datacenters to confirm if it was a 
> configuration problem, however mirror maker ran successfully with the below 
> configuration
> mm2.properties
>  
> {code:java}
> clusters = dc2, dc3
> dc2.bootstrap.servers = kafka-dc2:19093
> dc3.bootstrap.servers = kafka-dc3:19094
> dc2.group.id=mm2-dc2
> dc3.group.id=mm2-dc3
> replication.factor=1
> checkpoints.topic.replication.factor=1
> heartbeats.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> offset.stora

[GitHub] [kafka] C0urante commented on pull request #10520: KAFKA-10816: Use root resource as readiness and health probe for Connect distributed mode

2021-04-13 Thread GitBox


C0urante commented on pull request #10520:
URL: https://github.com/apache/kafka/pull/10520#issuecomment-818931717


   @tombentley that's fair. I was thinking that it's not really a part of the 
contract that `/` be implemented under the hood in a certain way (i.e., as a 
static resource or as one that's routed through the herder's request queue), 
but in order for this change to be useful (and protected against being undone 
in the future), it's important that this change be codified via KIP so that the 
endpoint is both documented and supported as a viable health check for 
distributed workers (and specifically, their herder).
   
   @kpatelatwork I'll address the two scenarios you've outlined separately:
   1. If the probe is called every second, it's unlikely to have a significant 
impact on worker health. If nothing else is queued up (such as a rebalance, a 
different request, session key rotation, etc.), then most of the code path that 
gets traversed will consist of no-ops or trivial operations. I ran some local 
benchmarking and each iteration of the tick thread took less than two 
milliseconds; most of the time that was taken for each request to `GET /` was 
spent outside of the tick thread, but even then, I was able to issue about 325 
requests/sec on my local machine with some sloppy, unoptimized benchmarking*. 
So if someone's using this endpoint reasonably for healthcheck/monitoring 
purposes and issuing at most one request every second, the additional 
performance hit should be fairly negligible.
   2. I can't envision a realistic scenario where this degrades worker health 
by competing with other herder requests, even if hundreds of healthcheck 
requests are issued per second. Yes, they will be handled on the same thread 
that takes care of "real" herder requests, but the actual work done in the 
herder thread to respond to healthcheck requests is tiny--just a lookup of a 
static object. If so many requests come in that a prior tick iteration hasn't 
completed before currently-active ones are in progress, they'll get grouped 
together and handled en masse within a single tick iteration, at which point, 
performance will be extremely high (likely on the order of thousands if not 
millions of iterations per second). The only circumstance I can envision where 
this is remotely possible is if the worker is busy doing other work that takes 
much longer, such as participating in an expensive rebalance, or trying and 
failing to read to the end of the config topic. At that point, if it continues 
to receive requests, they'll pile up and things might get out of hand. But 
in order for that to happen, each request will either have to originate from a 
different client, or there will have to be a client out there sending out 
simultaneous requests without waiting for still-active ones to complete. This 
only seems possible with some kind of malicious intent, but since it's already 
possible to lock down access to your worker using REST extensions, we probably 
don't have to worry about DDos protection for something like this. If we still 
want to do some kind of caching, I think one second should be more than enough; 
the longer it gets, the harder we make it to detect bad worker health when 
things are going south.
   
   
   **\*** - `time bash -c "for _ in {0..999}; do curl -sS -o /dev/null 
localhost:8083/ & done && wait"`, if anyone's interested.
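
[Editorial note: the batching argument in point 2 above can be illustrated with a toy model (an illustration only, not Connect's actual `DistributedHerder` code): requests that arrive while a tick iteration is busy simply queue up and are all answered by constant-time lookups in the next iteration:]

```python
from collections import deque

class ToyHerder:
    """Queued healthcheck requests are drained en masse per tick;
    each one costs only a constant-time status lookup."""
    def __init__(self):
        self.queue = deque()
        self.status = "RUNNING"   # stands in for the static status object

    def healthcheck(self):
        self.queue.append(None)   # request enqueued, answered on next tick

    def tick(self):
        answered = 0
        while self.queue:         # drain everything queued so far
            self.queue.popleft()
            answered += 1         # each answer is just a cheap lookup
        return answered

herder = ToyHerder()
for _ in range(500):              # 500 requests pile up during one slow tick
    herder.healthcheck()
print(herder.tick())  # 500 -- all handled within a single iteration
print(herder.tick())  # 0
```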






[GitHub] [kafka] mjsax commented on a change in pull request #10529: KAFKA-12650: fix NPE in InternalTopicManagerTest

2021-04-13 Thread GitBox


mjsax commented on a change in pull request #10529:
URL: https://github.com/apache/kafka/pull/10529#discussion_r61271



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##
@@ -153,7 +156,8 @@ public void shouldNotCreateTopicsWithEmptyInput() throws 
Exception {
 
 @Test
 public void shouldOnlyRetryNotSuccessfulFuturesDuringSetup() {
-final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+final AdminClient admin = EasyMock.createStrictMock(AdminClient.class);
+config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10_000L);

Review comment:
   Cool. Works for me.








[GitHub] [kafka] cmccabe merged pull request #10343: KAFKA-12471: Implement createPartitions in KIP-500 mode

2021-04-13 Thread GitBox


cmccabe merged pull request #10343:
URL: https://github.com/apache/kafka/pull/10343


   






[GitHub] [kafka] C0urante commented on pull request #10520: KAFKA-10816: Use root resource as readiness and health probe for Connect distributed mode

2021-04-13 Thread GitBox


C0urante commented on pull request #10520:
URL: https://github.com/apache/kafka/pull/10520#issuecomment-818932302


   Converting this to DRAFT pending a KIP.






[GitHub] [kafka] cmccabe commented on a change in pull request #10343: KAFKA-12471: Implement createPartitions in KIP-500 mode

2021-04-13 Thread GitBox


cmccabe commented on a change in pull request #10343:
URL: https://github.com/apache/kafka/pull/10343#discussion_r612656387



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1007,6 +999,128 @@ int bestLeader(int[] replicas, int[] isr, boolean 
unclean) {
 return ControllerResult.of(records, null);
 }
 
+ControllerResult<List<CreatePartitionsTopicResult>>
+createPartitions(List<CreatePartitionsTopic> topics) {
+List<ApiMessageAndVersion> records = new ArrayList<>();
+List<CreatePartitionsTopicResult> results = new ArrayList<>();
+for (CreatePartitionsTopic topic : topics) {
+ApiError apiError = ApiError.NONE;
+try {
+createPartitions(topic, records);
+} catch (ApiException e) {
+apiError = ApiError.fromThrowable(e);
+} catch (Exception e) {
+log.error("Unexpected createPartitions error for {}", topic, 
e);
+apiError = ApiError.fromThrowable(e);
+}
+results.add(new CreatePartitionsTopicResult().
+setName(topic.name()).
+setErrorCode(apiError.error().code()).
+setErrorMessage(apiError.message()));
+}
+return new ControllerResult<>(records, results, true);
+}
+
+void createPartitions(CreatePartitionsTopic topic,
+  List<ApiMessageAndVersion> records) {
+Uuid topicId = topicsByName.get(topic.name());
+if (topicId == null) {
+throw new UnknownTopicOrPartitionException();
+}
+TopicControlInfo topicInfo = topics.get(topicId);
+if (topicInfo == null) {
+throw new UnknownTopicOrPartitionException();
+}
+if (topic.count() == topicInfo.parts.size()) {

Review comment:
   Yeah, for now I'm just being consistent with the current implementation. 
 I think eventually we will want a retransmission cache to solve all these 
problems at once.








[jira] [Commented] (KAFKA-12608) Simple identity pipeline sometimes loses data

2021-04-13 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320357#comment-17320357
 ] 

Matthias J. Sax commented on KAFKA-12608:
-

What was said above, and to be more concrete: you create data that is 3 months 
old: 
[https://github.com/jamii/streaming-consistency/blob/c43722cea1394b0812d9fdbb20f063b47c9bd645/transactions.py#L17]

Thus, with default retention time of 7 days, broker may delete the data before 
you count the result records. The issue is not on the input topic for which the 
record timestamp will be set to "now" – however, your TimestampExtractor gives 
the "old" timestamp to the KafkaStreams runtime, and KafkaStreams will set this 
"old" timestamp as record timestamp when writing the result records into the 
output topic.
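
[Editorial note: to make the effect concrete, the broker compares the record timestamp, not the append time, against the retention window. A toy Python model of that check; the 7-day default and the ~3-month-old generated timestamps come from the discussion above, and this simplifies the broker's segment-level retention logic:]

```python
from datetime import datetime, timedelta

RETENTION = timedelta(days=7)  # models the default retention.ms of ~7 days

def eligible_for_deletion(record_ts, now, retention=RETENTION):
    """Records older than the retention window may be deleted,
    regardless of when they were appended to the log."""
    if retention < timedelta(0):   # models retention.ms=-1: never delete
        return False
    return now - record_ts > retention

now = datetime(2021, 4, 13)
old_ts = now - timedelta(days=90)  # ~3-month-old timestamps from the generator
print(eligible_for_deletion(old_ts, now))                # True: may be deleted
print(eligible_for_deletion(old_ts, now,
                            retention=timedelta(milliseconds=-1)))  # False
```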

> Simple identity pipeline sometimes loses data
> -
>
> Key: KAFKA-12608
> URL: https://issues.apache.org/jira/browse/KAFKA-12608
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
> Environment: 
> https://github.com/jamii/streaming-consistency/blob/c1f504e73141405ee6cd0c7f217604d643babf81/pkgs.nix
> [nix-shell:~/streaming-consistency/kafka-streams]$ java -version
> openjdk version "1.8.0_265"
> OpenJDK Runtime Environment (build 1.8.0_265-ga)
> OpenJDK 64-Bit Server VM (build 25.265-bga, mixed mode)
> [nix-shell:~/streaming-consistency/kafka-streams]$ nix-info
> system: "x86_64-linux", multi-user?: yes, version: nix-env (Nix) 2.3.10, 
> channels(jamie): "", channels(root): "nixos-20.09.3554.f8929dce13e", nixpkgs: 
> /nix/var/nix/profiles/per-user/root/channels/nixos 
>Reporter: Jamie Brandon
>Priority: Major
>
> I'm running a very simple streams program that reads records from one topic 
> into a table and then writes the stream back into another topic. In about 1 
> in 5 runs, some of the output records are missing. They tend to form a single 
> contiguous range, as if a single batch was dropped somewhere.
> https://github.com/jamii/streaming-consistency/blob/main/kafka-streams/src/main/java/Demo.java#L49-L52
> {code:bash}
> $ wc -l tmp/*transactions
>  999514 tmp/accepted_transactions
>  100 tmp/transactions
>  1999514 total
> $ cat tmp/transactions | cut -d',' -f 1 | cut -d' ' -f 2 > in
> $ cat tmp/accepted_transactions | cut -d',' -f 1 | cut -d':' -f 2 > out
> $ diff in out | wc -l
>  487
> $ diff in out | head
>  25313,25798d25312
>  < 25312
>  < 25313
>  < 25314
>  < 25315
>  < 25316
>  < 25317
>  < 25318
>  < 25319
>  < 25320
>  
> $ diff in out | tail
>  < 25788
>  < 25789
>  < 25790
>  < 25791
>  < 25792
>  < 25793
>  < 25794
>  < 25795
>  < 25796
>  < 25797
> {code}
> I've checked running the consumer multiple times to make sure that the 
> records are actually missing from the topic and it wasn't just a hiccup in 
> the consumer. 
> The repo linked above has instructions in the readme on how to reproduce the 
> exact versions used. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12368) Inmemory implementation of RSM and RLMM.

2021-04-13 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-12368.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

merged the PR to trunk

> Inmemory implementation of RSM and RLMM. 
> -
>
> Key: KAFKA-12368
> URL: https://issues.apache.org/jira/browse/KAFKA-12368
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
> Fix For: 3.0.0
>
>






[GitHub] [kafka] junrao merged pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-13 Thread GitBox


junrao merged pull request #10218:
URL: https://github.com/apache/kafka/pull/10218


   






[GitHub] [kafka] C0urante commented on a change in pull request #10530: KAFKA-10231 fail bootstrap of Rest server if the host name in the adv…

2021-04-13 Thread GitBox


C0urante commented on a change in pull request #10530:
URL: https://github.com/apache/kafka/pull/10530#discussion_r612518884



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
##
@@ -369,9 +370,31 @@ else if (serverConnector != null && 
serverConnector.getHost() != null && serverC
 else if (serverConnector != null && serverConnector.getPort() > 0)
 builder.port(serverConnector.getPort());
 
-log.info("Advertised URI: {}", builder.build());
+URI uri = builder.build();
+validateUriHost(uri);
+log.info("Advertised URI: {}", uri);
 
-return builder.build();
+return uri;
+}
+
+/**
+ * Parses the uri and throws a more definitive error
+ * when the internal node to node communication can't happen due to an 
invalid host name.
+ */
+private void validateUriHost(URI uri) {
+if (uri.getHost() == null) {
+String host = Utils.getHost(uri.getAuthority());
+String errorMsg = "Invalid host=" + host + ", in url=" + 
uri.toString();

Review comment:
   If `host` is null (due to a parsing failure in `Utils::getHost`), won't 
this lead to a similarly-confusing error message for the user? I'm not sure 
`Invalid host=null, in url ...` would be easy to decipher, and although it's 
unclear if we could ever trigger that code path at the moment, it may be worth 
considering in case later refactoring of this and other classes makes it 
possible.
   
   Perhaps something like "Could not parse host from advertised URL " 
would help shed more light here?
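A minimal sketch of the wording suggested above, assuming a small helper is acceptable: report the parse failure explicitly instead of emitting "Invalid host=null, in url=...". The `describe` helper and its signature are hypothetical, not the actual RestServer code:

```java
public class HostErrorMessage {
    /** Build an error message for an advertised URI whose host failed
     *  validation; a null host means parsing itself failed. */
    static String describe(String host, String uri) {
        if (host == null) {
            // Avoid the confusing "Invalid host=null" case.
            return "Could not parse host from advertised URL " + uri;
        }
        return "Invalid host=" + host + ", in url=" + uri;
    }

    public static void main(String[] args) {
        System.out.println(describe(null, "http://bad_host:8083"));
        // prints: Could not parse host from advertised URL http://bad_host:8083
        System.out.println(describe("bad_host", "http://bad_host:8083"));
        // prints: Invalid host=bad_host, in url=http://bad_host:8083
    }
}
```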








[GitHub] [kafka] vvcephei commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-13 Thread GitBox


vvcephei commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r612591727



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -118,20 +132,40 @@
 final ProcessorGraphNode otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
 builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+Optional, 
ValueOrOtherValue>>> outerJoinWindowStore = Optional.empty();
+if (leftOuter || rightOuter) {
+final String outerJoinSuffix = "-shared-outer-join-store";
+final String outerJoinStoreGeneratedName = 
builder.newProcessorName(KStreamImpl.OUTERSHARED_NAME);
+final String outerJoinStoreName = userProvidedBaseStoreName == 
null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + 
outerJoinSuffix;
+
+outerJoinWindowStore = 
Optional.of(outerJoinWindowStoreBuilder(outerJoinStoreName, windows, 
streamJoinedInternal));
+}
+
+// Time shared between joins to keep track of the maximum stream time

Review comment:
   @mjsax is correct that there is a bug that the processor-local stream 
time gets reset on rebalance/restart. It would be good to fix it, but with the 
current architecture, the only correct solution is to persist the 
processor-local stream time. Another approach we've discussed is to remove the 
time-delay effect of the record cache.








[GitHub] [kafka] vvcephei commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-13 Thread GitBox


vvcephei commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r612589665



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -118,20 +132,40 @@
 final ProcessorGraphNode otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
 builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+Optional, 
ValueOrOtherValue>>> outerJoinWindowStore = Optional.empty();
+if (leftOuter || rightOuter) {
+final String outerJoinSuffix = "-shared-outer-join-store";
+final String outerJoinStoreGeneratedName = 
builder.newProcessorName(KStreamImpl.OUTERSHARED_NAME);
+final String outerJoinStoreName = userProvidedBaseStoreName == 
null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + 
outerJoinSuffix;
+
+outerJoinWindowStore = 
Optional.of(outerJoinWindowStoreBuilder(outerJoinStoreName, windows, 
streamJoinedInternal));
+}
+
+// Time shared between joins to keep track of the maximum stream time

Review comment:
   It's extremely subtle, but we cannot use `context.streamTime()` because 
of the time-delay effects of upstream record caches. This was the cause of a 
severe bug in `suppress` that went undetected until after it was released.
   
   For example: if we have a record cache upstream of this join, it will delay 
the propagation of records (and their accompanying timestamps) by time amount 
`D`. Say we ingest some record with timestamp `T`. If we reference the 
context's stream time, our processor will think it is at time `T`, when it is 
really at time `T - D`, leading it to behave wrongly, such as enforcing the 
grace period prematurely, which will manifest to users as data loss.
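A minimal sketch of the alternative described here: advance stream time only from timestamps the processor itself observes, rather than `context.streamTime()`, so an upstream record cache's delay cannot push the clock ahead of the records actually seen. The class and method names are illustrative, not the actual KStreamImplJoin code:

```java
public class ObservedStreamTime {
    private long maxObservedTime = Long.MIN_VALUE;

    /** Advance processor-local stream time from a record actually seen here. */
    long observe(long recordTimestamp) {
        maxObservedTime = Math.max(maxObservedTime, recordTimestamp);
        return maxObservedTime;
    }

    /** True once the record falls outside the grace period relative to
     *  the processor-local clock. */
    boolean pastGracePeriod(long recordTimestamp, long graceMs) {
        return recordTimestamp < maxObservedTime - graceMs;
    }

    public static void main(String[] args) {
        ObservedStreamTime time = new ObservedStreamTime();
        time.observe(100L);
        // Out-of-order record within grace: still eligible for the join.
        System.out.println(time.pastGracePeriod(95L, 10L)); // false
        time.observe(200L);
        // Same record once local stream time has advanced past its grace period.
        System.out.println(time.pastGracePeriod(95L, 10L)); // true
    }
}
```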








[GitHub] [kafka] kpatelatwork commented on pull request #10520: KAFKA-10816: Use root resource as readiness and health probe for Connect distributed mode

2021-04-13 Thread GitBox


kpatelatwork commented on pull request #10520:
URL: https://github.com/apache/kafka/pull/10520#issuecomment-818841491


   @C0urante the change looks good. 
   
   I am a newbie here, but correct me if I am wrong on the minor concerns below:
   
   1. Agree with Tom that a small KIP is necessary, as this is a change in 
behavior.
   2. If the probe is called every second via some monitoring request, or 
someone calls it hundreds of times in a second, won't it compete with other 
herder tasks in the queue? Should we cache the result of 
herder.kafkaClusterId() in RootResource for 1 min?
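A sketch of the caching idea raised in point 2, assuming a memoizing wrapper is acceptable: hold the herder lookup result for a short TTL so a flood of probe requests performs at most one herder call per interval. The `TtlCache` name and structure are hypothetical, not Connect code:

```java
import java.util.function.Supplier;

public class TtlCache<T> {
    private final Supplier<T> loader;
    private final long ttlNanos;
    private T cached;
    private long loadedAt = Long.MIN_VALUE;

    TtlCache(Supplier<T> loader, long ttlNanos) {
        this.loader = loader;
        this.ttlNanos = ttlNanos;
    }

    /** Return the cached value, reloading only when the TTL has expired. */
    synchronized T get() {
        long now = System.nanoTime();
        if (cached == null || now - loadedAt > ttlNanos) {
            cached = loader.get();   // e.g. herder.kafkaClusterId()
            loadedAt = now;
        }
        return cached;
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        TtlCache<String> cache = new TtlCache<>(
                () -> { calls[0]++; return "cluster-id"; },
                java.util.concurrent.TimeUnit.MINUTES.toNanos(1));
        // 100 probe requests within the TTL hit the loader only once.
        for (int i = 0; i < 100; i++) cache.get();
        System.out.println(calls[0]); // 1
    }
}
```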





