[jira] [Commented] (KAFKA-9244) Update of old FK reference on RHS should not trigger join result
[ https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984184#comment-16984184 ] ASF GitHub Bot commented on KAFKA-9244: --- mjsax commented on pull request #7758: KAFKA-9244: Update FK reference should unsubscribe old FK URL: https://github.com/apache/kafka/pull/7758 Call for review @vvcephei @bellemare This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Update of old FK reference on RHS should not trigger join result > > > Key: KAFKA-9244 > URL: https://issues.apache.org/jira/browse/KAFKA-9244 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Kin Siu >Assignee: Matthias J. Sax >Priority: Major > > Perform a KTable-KTable foreign key join, after changing LHS FK reference > from FK1 -> FK2, populating update on RHS with FK1 should not produce join > result. > Below test case failed at the point of last "is(emptyMap())", after we > published data on RHS with previous associated foreign key. > > {code:java} > @Test > public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { > final Topology topology = getTopology(streamsConfig, "store", > leftJoin); > try (final TopologyTestDriver driver = new > TopologyTestDriver(topology, streamsConfig)) { > final TestInputTopic right = > driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestInputTopic left = > driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestOutputTopic outputTopic = > driver.createOutputTopic(OUTPUT, new StringDeserializer(), new > StringDeserializer()); > final KeyValueStore store = > driver.getKeyValueStore("store"); > // Pre-populate the RHS records. This test is all about what > happens when we change LHS records foreign key reference > // then populate update on RHS > right.pipeInput("rhs1", "rhsValue1"); > right.pipeInput("rhs2", "rhsValue2"); > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(emptyMap()) > ); > left.pipeInput("lhs1", "lhsValue1|rhs1"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Change LHS foreign key reference > left.pipeInput("lhs1", "lhsValue1|rhs2"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Populate RHS update on old LHS foreign key ref > right.pipeInput("rhs1", "rhsValue1Delta"); > { > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > )) > ); > } > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
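The pull request title above ("Update FK reference should unsubscribe old FK") summarizes the intended fix: when a left-hand record's foreign key changes, the subscription registered on the right-hand side under the old key has to be removed, otherwise later updates to that old key keep producing join results, which is the bug shown in the test in the issue description. The following is only a hedged illustration of that idea; the names (SubscriptionStore, handleLhsUpdate) are made up and are not the code from the PR or Kafka internals.

{code:java}
// Illustrative sketch only -- not Kafka internals and not the code from PR #7758.
interface SubscriptionStore {
    void put(String foreignKey, String lhsKey);    // lhsKey wants updates for foreignKey
    void delete(String foreignKey, String lhsKey); // lhsKey no longer cares about foreignKey
}

final class FkUnsubscribeSketch {
    static void handleLhsUpdate(final String lhsKey,
                                final String oldFk,
                                final String newFk,
                                final SubscriptionStore subscriptions) {
        if (oldFk != null && !oldFk.equals(newFk)) {
            // Without this step, a later RHS update for oldFk (e.g. "rhs1" ->
            // "rhsValue1Delta" in the test) would still join against lhsKey.
            subscriptions.delete(oldFk, lhsKey);
        }
        if (newFk != null) {
            subscriptions.put(newFk, lhsKey);
        }
    }
}
{code}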
[jira] [Commented] (KAFKA-9244) Update of old FK reference on RHS should not trigger join result
[ https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984174#comment-16984174 ] Matthias J. Sax commented on KAFKA-9244: Ah sorry. I mixed up something when reading your test case. I was able to reproduce the issue. Thank for reporting it. > Update of old FK reference on RHS should not trigger join result > > > Key: KAFKA-9244 > URL: https://issues.apache.org/jira/browse/KAFKA-9244 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Kin Siu >Priority: Major > > Perform a KTable-KTable foreign key join, after changing LHS FK reference > from FK1 -> FK2, populating update on RHS with FK1 should not produce join > result. > Below test case failed at the point of last "is(emptyMap())", after we > published data on RHS with previous associated foreign key. > > {code:java} > @Test > public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { > final Topology topology = getTopology(streamsConfig, "store", > leftJoin); > try (final TopologyTestDriver driver = new > TopologyTestDriver(topology, streamsConfig)) { > final TestInputTopic right = > driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestInputTopic left = > driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestOutputTopic outputTopic = > driver.createOutputTopic(OUTPUT, new StringDeserializer(), new > StringDeserializer()); > final KeyValueStore store = > driver.getKeyValueStore("store"); > // Pre-populate the RHS records. This test is all about what > happens when we change LHS records foreign key reference > // then populate update on RHS > right.pipeInput("rhs1", "rhsValue1"); > right.pipeInput("rhs2", "rhsValue2"); > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(emptyMap()) > ); > left.pipeInput("lhs1", "lhsValue1|rhs1"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Change LHS foreign key reference > left.pipeInput("lhs1", "lhsValue1|rhs2"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Populate RHS update on old LHS foreign key ref > right.pipeInput("rhs1", "rhsValue1Delta"); > { > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > )) > ); > } > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9244) Update of old FK reference on RHS should not trigger join result
[ https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-9244: -- Assignee: Matthias J. Sax > Update of old FK reference on RHS should not trigger join result > > > Key: KAFKA-9244 > URL: https://issues.apache.org/jira/browse/KAFKA-9244 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Kin Siu >Assignee: Matthias J. Sax >Priority: Major > > Perform a KTable-KTable foreign key join, after changing LHS FK reference > from FK1 -> FK2, populating update on RHS with FK1 should not produce join > result. > Below test case failed at the point of last "is(emptyMap())", after we > published data on RHS with previous associated foreign key. > > {code:java} > @Test > public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { > final Topology topology = getTopology(streamsConfig, "store", > leftJoin); > try (final TopologyTestDriver driver = new > TopologyTestDriver(topology, streamsConfig)) { > final TestInputTopic right = > driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestInputTopic left = > driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestOutputTopic outputTopic = > driver.createOutputTopic(OUTPUT, new StringDeserializer(), new > StringDeserializer()); > final KeyValueStore store = > driver.getKeyValueStore("store"); > // Pre-populate the RHS records. This test is all about what > happens when we change LHS records foreign key reference > // then populate update on RHS > right.pipeInput("rhs1", "rhsValue1"); > right.pipeInput("rhs2", "rhsValue2"); > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(emptyMap()) > ); > left.pipeInput("lhs1", "lhsValue1|rhs1"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Change LHS foreign key reference > left.pipeInput("lhs1", "lhsValue1|rhs2"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Populate RHS update on old LHS foreign key ref > right.pipeInput("rhs1", "rhsValue1Delta"); > { > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > )) > ); > } > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9244) Update of old FK reference on RHS should not trigger join result
[ https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kin Siu updated KAFKA-9244: --- Description: Perform a KTable-KTable foreign key join, after changing LHS FK reference from FK1 -> FK2, populating update on RHS with FK1 should not produce join result. Below test case failed at the point of last "is(emptyMap())", after we published data on RHS with previous associated foreign key. {code:java} @Test public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { final Topology topology = getTopology(streamsConfig, "store", leftJoin); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final KeyValueStore store = driver.getKeyValueStore("store"); // Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference // then populate update on RHS right.pipeInput("rhs1", "rhsValue1"); right.pipeInput("rhs2", "rhsValue2"); assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(emptyMap()) ); left.pipeInput("lhs1", "lhsValue1|rhs1"); { final Map expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") ); assertThat( outputTopic.readKeyValuesToMap(), is(expected) ); assertThat( asMap(store), is(expected) ); } // Change LHS foreign key reference left.pipeInput("lhs1", "lhsValue1|rhs2"); { final Map expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") ); assertThat( outputTopic.readKeyValuesToMap(), is(expected) ); assertThat( asMap(store), is(expected) ); } // Populate RHS update on old LHS foreign key ref right.pipeInput("rhs1", "rhsValue1Delta"); { assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(mkMap( mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") )) ); } } } {code} was: Perform a KTable-KTable foreign key join, after changing LHS FK reference from FK1 -> FK2, populating update on RHS with FK1 should not produce join result. Below test case failed at the point of last "is(emptyMap())". {code:java|linenumbers=true} @Test public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { final Topology topology = getTopology(streamsConfig, "store", leftJoin); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final KeyValueStore store = driver.getKeyValueStore("store"); // Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference // then populate update on RHS right.pipeInput("rhs1", "rhsValue1"); right.pipeInput("rhs2", "rhsValue2"); assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(emptyMap()) ); left.pipeInput("lhs1", "lhsValue1|rhs1"); { final Map expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") );
[jira] [Commented] (KAFKA-9244) Update of old FK reference on RHS should not trigger join result
[ https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984139#comment-16984139 ] Matthias J. Sax commented on KAFKA-9244: {quote}mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") <-- this is a line to check the State store of join result, {quote} As I said in my comment above, `outputTopic.readKeyValuesToMap()` does *not* return the _content_ of the state store. Each _update_ to the state store will write one output record into the output topic, and `outputTopic.readKeyValuesToMap()` will give you those _update_ records. Because the last input record does not trigger an update to the store, there won't be an output record. If you want to check the _content_ of the state store, you would need to access the state store via {quote}{{TopologyTestDriver#getTimestampedKeyValueStore()}} {{// or if you don't care about the timestamp}} {{TopologyTestDriver#getKeyValueStore()}}{quote} > Update of old FK reference on RHS should not trigger join result > > > Key: KAFKA-9244 > URL: https://issues.apache.org/jira/browse/KAFKA-9244 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Kin Siu >Priority: Major > > Perform a KTable-KTable foreign key join, after changing LHS FK reference > from FK1 -> FK2, populating update on RHS with FK1 should not produce join > result. > Below test case failed at the point of last "is(emptyMap())". > > {code:java|linenumbers=true} > @Test > public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { > final Topology topology = getTopology(streamsConfig, "store", > leftJoin); > try (final TopologyTestDriver driver = new > TopologyTestDriver(topology, streamsConfig)) { > final TestInputTopic right = > driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestInputTopic left = > driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestOutputTopic outputTopic = > driver.createOutputTopic(OUTPUT, new StringDeserializer(), new > StringDeserializer()); > final KeyValueStore store = > driver.getKeyValueStore("store"); > // Pre-populate the RHS records. This test is all about what > happens when we change LHS records foreign key reference > // then populate update on RHS > right.pipeInput("rhs1", "rhsValue1"); > right.pipeInput("rhs2", "rhsValue2"); > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(emptyMap()) > ); > left.pipeInput("lhs1", "lhsValue1|rhs1"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Change LHS foreign key reference > left.pipeInput("lhs1", "lhsValue1|rhs2"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Populate RHS update on old LHS foreign key ref > right.pipeInput("rhs1", "rhsValue1Delta"); > { > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > )) > ); > } > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
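To make the distinction above concrete, here is a hedged sketch reusing the driver and outputTopic variables from the quoted test, and assuming String serdes as in that test: the output topic yields only the updates emitted since the last read, while the store has to be queried directly for the current table contents.

{code:java}
// Updates emitted since the last read -- not the current table contents.
final Map<String, String> updates = outputTopic.readKeyValuesToMap();

// Current contents of the materialized store, ignoring timestamps.
final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
final String current = store.get("lhs1");

// Or, if record timestamps matter, use the timestamped view of the same store.
final KeyValueStore<String, ValueAndTimestamp<String>> tsStore =
    driver.getTimestampedKeyValueStore("store");
final ValueAndTimestamp<String> vat = tsStore.get("lhs1");
if (vat != null) {
    System.out.println(vat.value() + " @ " + vat.timestamp());
}
{code}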
[jira] [Issue Comment Deleted] (KAFKA-9244) Update of old FK reference on RHS should not trigger join result
[ https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9244: --- Comment: was deleted (was: {quote}mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") <-- this is a line to check the State store of join result, {quote} As I said in my comment above, `outputTopic.readKeyValuesToMap()` does *not* return the _content_ of the state store. Each _update_ to the state store, will write one output record into the output topic, and `outputTopic.readKeyValuesToMap()` will give you those _update_ records. Because the last input record does not trigger an update to the store, the won't be an output record. If you want to check the _content_ of the state store, you would need to access the state store via {quote}{{ToplogyTestDriver#getTimestampedKeyValueStore()}} {{// or if you don't care about the timestamp}} {{ToplogyTestDriver#getKeyValueStore()}}{{}}{quote}) > Update of old FK reference on RHS should not trigger join result > > > Key: KAFKA-9244 > URL: https://issues.apache.org/jira/browse/KAFKA-9244 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Kin Siu >Priority: Major > > Perform a KTable-KTable foreign key join, after changing LHS FK reference > from FK1 -> FK2, populating update on RHS with FK1 should not produce join > result. > Below test case failed at the point of last "is(emptyMap())". > > {code:java|linenumbers=true} > @Test > public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { > final Topology topology = getTopology(streamsConfig, "store", > leftJoin); > try (final TopologyTestDriver driver = new > TopologyTestDriver(topology, streamsConfig)) { > final TestInputTopic right = > driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestInputTopic left = > driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestOutputTopic outputTopic = > driver.createOutputTopic(OUTPUT, new StringDeserializer(), new > StringDeserializer()); > final KeyValueStore store = > driver.getKeyValueStore("store"); > // Pre-populate the RHS records. This test is all about what > happens when we change LHS records foreign key reference > // then populate update on RHS > right.pipeInput("rhs1", "rhsValue1"); > right.pipeInput("rhs2", "rhsValue2"); > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(emptyMap()) > ); > left.pipeInput("lhs1", "lhsValue1|rhs1"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Change LHS foreign key reference > left.pipeInput("lhs1", "lhsValue1|rhs2"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Populate RHS update on old LHS foreign key ref > right.pipeInput("rhs1", "rhsValue1Delta"); > { > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > )) > ); > } > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-9244) Update of old FK reference on RHS should not trigger join result
[ https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984045#comment-16984045 ] Matthias J. Sax edited comment on KAFKA-9244 at 11/28/19 4:19 AM: -- In you last test case, {quote}{{right.pipeInput("rhs1", "rhsValue1Delta");}} {quote} Why do you expect: {quote}{{mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")}} {quote} As you say not update should be produced, and thus the result should be empty. Note that each time you call {quote}{{outputTopic.readKeyValuesToMap()}} {quote} output records are received and than "purged", hence, you won't receive them twice. `readKeyValuesToMap()` does _not_ access the KTable state store, but it only consumer the table output/changelog topic and does not re-read the whole topic each time, but reads it incrementally. was (Author: mjsax): In you last test case, right.pipeInput("rhs1", "rhsValue1Delta"); Why do you expect: mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") As you say not update should be produced, and thus the result should be empty. Note that each time you call outputTopic.readKeyValuesToMap() output records are received and than "purged", hence, you won't receive them twice. `readKeyValuesToMap()` does _not_ access the KTable state store, but it only consumer the table output/changelog topic and does not re-read the whole topic each time, but reads it incrementally. > Update of old FK reference on RHS should not trigger join result > > > Key: KAFKA-9244 > URL: https://issues.apache.org/jira/browse/KAFKA-9244 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Kin Siu >Priority: Major > > Perform a KTable-KTable foreign key join, after changing LHS FK reference > from FK1 -> FK2, populating update on RHS with FK1 should not produce join > result. > Below test case failed at the point of last "is(emptyMap())". > > {code:java|linenumbers=true} > @Test > public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { > final Topology topology = getTopology(streamsConfig, "store", > leftJoin); > try (final TopologyTestDriver driver = new > TopologyTestDriver(topology, streamsConfig)) { > final TestInputTopic right = > driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestInputTopic left = > driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestOutputTopic outputTopic = > driver.createOutputTopic(OUTPUT, new StringDeserializer(), new > StringDeserializer()); > final KeyValueStore store = > driver.getKeyValueStore("store"); > // Pre-populate the RHS records. 
This test is all about what > happens when we change LHS records foreign key reference > // then populate update on RHS > right.pipeInput("rhs1", "rhsValue1"); > right.pipeInput("rhs2", "rhsValue2"); > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(emptyMap()) > ); > left.pipeInput("lhs1", "lhsValue1|rhs1"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Change LHS foreign key reference > left.pipeInput("lhs1", "lhsValue1|rhs2"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Populate RHS update on old LHS foreign key ref > right.pipeInput("rhs1", "rhsValue1Delta"); > { > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(mkMap( > mkEntry("lhs1", "(lhsValu
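The incremental-read behaviour described in the comment above can be seen with two back-to-back reads; a short hedged sketch reusing the left and outputTopic variables from the quoted test:

{code:java}
// readKeyValuesToMap() only returns records produced since the previous call.
left.pipeInput("lhs1", "lhsValue1|rhs1");
final Map<String, String> firstRead = outputTopic.readKeyValuesToMap();  // contains the join update for "lhs1"
final Map<String, String> secondRead = outputTopic.readKeyValuesToMap(); // empty: nothing new was emitted,
                                                                         // and earlier records are not re-read
{code}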
[jira] [Comment Edited] (KAFKA-9244) Update of old FK reference on RHS should not trigger join result
[ https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984048#comment-16984048 ] Kin Siu edited comment on KAFKA-9244 at 11/28/19 2:37 AM: -- mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") <-- this is a line to check the State store of join result, since my change of "rhs1" shouldn't generate value, we should have same state store value as before. And above test case failed on the point of last "is(emptyMap())" check, failure message of JUnit test as below : {code:java} java.lang.AssertionError: Expected: is <{}> but: was <{lhs1=(lhsValue1|rhs2,rhsValue1Delta)}> Expected :is <{}> Actual :<{lhs1=(lhsValue1|rhs2,rhsValue1Delta)}> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinIntegrationTest.doJoinFromLeftThenChangeForeignKeyMappingOfLeft(KTableKTableForeignKeyJoinIntegrationTest.java:142) {code} was (Author: nikuis): mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") <-- this is a line to check the State store of join result, since my change of "rhs1" shouldn't generate value, we should have same state store value as before. And above test case failed on the point of last "is(emptyMap())" check, below the failure message of JUnit test {code:java} java.lang.AssertionError: Expected: is <{}> but: was <{lhs1=(lhsValue1|rhs2,rhsValue1Delta)}> Expected :is <{}> Actual :<{lhs1=(lhsValue1|rhs2,rhsValue1Delta)}> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinIntegrationTest.doJoinFromLeftThenChangeForeignKeyMappingOfLeft(KTableKTableForeignKeyJoinIntegrationTest.java:142) {code} > Update of old FK reference on RHS should not trigger join result > > > Key: KAFKA-9244 > URL: https://issues.apache.org/jira/browse/KAFKA-9244 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Kin Siu >Priority: Major > > Perform a KTable-KTable foreign key join, after changing LHS FK reference > from FK1 -> FK2, populating update on RHS with FK1 should not produce join > result. > Below test case failed at the point of last "is(emptyMap())". > > {code:java|linenumbers=true} > @Test > public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { > final Topology topology = getTopology(streamsConfig, "store", > leftJoin); > try (final TopologyTestDriver driver = new > TopologyTestDriver(topology, streamsConfig)) { > final TestInputTopic right = > driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestInputTopic left = > driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestOutputTopic outputTopic = > driver.createOutputTopic(OUTPUT, new StringDeserializer(), new > StringDeserializer()); > final KeyValueStore store = > driver.getKeyValueStore("store"); > // Pre-populate the RHS records. 
This test is all about what > happens when we change LHS records foreign key reference > // then populate update on RHS > right.pipeInput("rhs1", "rhsValue1"); > right.pipeInput("rhs2", "rhsValue2"); > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(emptyMap()) > ); > left.pipeInput("lhs1", "lhsValue1|rhs1"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Change LHS foreign key reference > left.pipeInput("lhs1", "lhsValue1|rhs2"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), >
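The asMap(store) helper used in the assertions and in the failure message above is not shown in the ticket; a plausible version (an assumption, not necessarily the helper from KTableKTableForeignKeyJoinIntegrationTest) simply drains the store iterator into a map:

{code:java}
// Hypothetical stand-in for the asMap(store) helper referenced by the test.
private static Map<String, String> asMap(final KeyValueStore<String, String> store) {
    final Map<String, String> result = new HashMap<>();
    try (final KeyValueIterator<String, String> it = store.all()) {
        while (it.hasNext()) {
            final KeyValue<String, String> next = it.next();
            result.put(next.key, next.value);
        }
    }
    return result;
}
{code}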
[jira] [Commented] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId
[ https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984073#comment-16984073 ] Boyang Chen commented on KAFKA-8803: To be more specific, quoted from [~hachikuji]: ``` we could look at request metrics to see if the delay is present for example, if there is a big request queue delay, then that probably means the broker is overloaded ``` > Stream will not start due to TimeoutException: Timeout expired after > 6milliseconds while awaiting InitProducerId > > > Key: KAFKA-8803 > URL: https://issues.apache.org/jira/browse/KAFKA-8803 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Raman Gupta >Assignee: Boyang Chen >Priority: Major > Attachments: logs.txt.gz, screenshot-1.png > > > One streams app is consistently failing at startup with the following > exception: > {code} > 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] > org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout > exception caught when initializing transactions for task 0_36. This might > happen if the broker is slow to respond, if the network connection to the > broker was interrupted, or if similar circumstances arise. You can increase > producer parameter `max.block.ms` to increase this timeout. > org.apache.kafka.common.errors.TimeoutException: Timeout expired after > 6milliseconds while awaiting InitProducerId > {code} > These same brokers are used by many other streams without any issue, > including some in the very same processes for the stream which consistently > throws this exception. > *UPDATE 08/16:* > The very first instance of this error is August 13th 2019, 17:03:36.754 and > it happened for 4 different streams. For 3 of these streams, the error only > happened once, and then the stream recovered. For the 4th stream, the error > has continued to happen, and continues to happen now. > I looked up the broker logs for this time, and see that at August 13th 2019, > 16:47:43, two of four brokers started reporting messages like this, for > multiple partitions: > [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, > fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader > reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread) > The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, > here is a view of the count of these messages over time: > !screenshot-1.png! > However, as noted, the stream task timeout error continues to happen. > I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 > broker. The broker has a patch for KAFKA-8773. -- This message was sent by Atlassian Jira (v8.3.4#803005)
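The "request queue delay" mentioned in the quote is visible through the broker's request metrics. As a hedged example, the MBean name below follows Kafka's usual RequestMetrics naming and the JMX port 9999 is an assumption; adjust both for your broker version and deployment:

{code:java}
import java.util.Collections;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class RequestQueueDelayCheck {
    public static void main(final String[] args) throws Exception {
        final JMXServiceURL url =
            new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url, Collections.emptyMap())) {
            final MBeanServerConnection conn = connector.getMBeanServerConnection();
            final ObjectName queueTime = new ObjectName(
                "kafka.network:type=RequestMetrics,name=RequestQueueTimeMs,request=Produce");
            // Consistently high values here suggest an overloaded broker,
            // matching the "big request queue delay" observation above.
            System.out.println("Mean           = " + conn.getAttribute(queueTime, "Mean"));
            System.out.println("99thPercentile = " + conn.getAttribute(queueTime, "99thPercentile"));
        }
    }
}
{code}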
[jira] [Commented] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId
[ https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984069#comment-16984069 ] Boyang Chen commented on KAFKA-8803: [~rocketraman] Thanks, so the issue happens spontaneously without any server/client side change? Do you see any workload change on broker during that time, or leadership change? > Stream will not start due to TimeoutException: Timeout expired after > 6milliseconds while awaiting InitProducerId > > > Key: KAFKA-8803 > URL: https://issues.apache.org/jira/browse/KAFKA-8803 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Raman Gupta >Assignee: Boyang Chen >Priority: Major > Attachments: logs.txt.gz, screenshot-1.png > > > One streams app is consistently failing at startup with the following > exception: > {code} > 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] > org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout > exception caught when initializing transactions for task 0_36. This might > happen if the broker is slow to respond, if the network connection to the > broker was interrupted, or if similar circumstances arise. You can increase > producer parameter `max.block.ms` to increase this timeout. > org.apache.kafka.common.errors.TimeoutException: Timeout expired after > 6milliseconds while awaiting InitProducerId > {code} > These same brokers are used by many other streams without any issue, > including some in the very same processes for the stream which consistently > throws this exception. > *UPDATE 08/16:* > The very first instance of this error is August 13th 2019, 17:03:36.754 and > it happened for 4 different streams. For 3 of these streams, the error only > happened once, and then the stream recovered. For the 4th stream, the error > has continued to happen, and continues to happen now. > I looked up the broker logs for this time, and see that at August 13th 2019, > 16:47:43, two of four brokers started reporting messages like this, for > multiple partitions: > [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, > fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader > reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread) > The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, > here is a view of the count of these messages over time: > !screenshot-1.png! > However, as noted, the stream task timeout error continues to happen. > I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 > broker. The broker has a patch for KAFKA-8773. -- This message was sent by Atlassian Jira (v8.3.4#803005)
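The quoted error message advises raising the producer's max.block.ms; in a Streams application that is done through the producer prefix on the Streams configuration. A minimal sketch follows, where the application id, bootstrap server, and the 120000 ms value are placeholders:

{code:java}
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");     // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
// Give the internal producer more than the default 60000 ms to complete
// InitProducerId before the TimeoutException above is thrown.
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 120000);
{code}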
[jira] [Comment Edited] (KAFKA-9244) Update of old FK reference on RHS should not trigger join result
[ https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984048#comment-16984048 ] Kin Siu edited comment on KAFKA-9244 at 11/28/19 12:49 AM: --- mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") <-- this is a line to check the State store of join result, since my change of "rhs1" shouldn't generate value, we should have same state store value as before. And above test case failed on the point of last "is(emptyMap())" check, below the failure message of JUnit test {code:java} java.lang.AssertionError: Expected: is <{}> but: was <{lhs1=(lhsValue1|rhs2,rhsValue1Delta)}> Expected :is <{}> Actual :<{lhs1=(lhsValue1|rhs2,rhsValue1Delta)}> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinIntegrationTest.doJoinFromLeftThenChangeForeignKeyMappingOfLeft(KTableKTableForeignKeyJoinIntegrationTest.java:142) {code} was (Author: nikuis): mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") <-- this is a line to check the State store of join result, since my change of "rhs1" shouldn't generate value, we should have some state store value as before. And above test case failed on the point of "is(emptyMap())" check, below the failure message of JUnit test {code:java} java.lang.AssertionError: Expected: is <{}> but: was <{lhs1=(lhsValue1|rhs2,rhsValue1Delta)}> Expected :is <{}> Actual :<{lhs1=(lhsValue1|rhs2,rhsValue1Delta)}> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinIntegrationTest.doJoinFromLeftThenChangeForeignKeyMappingOfLeft(KTableKTableForeignKeyJoinIntegrationTest.java:142) {code} > Update of old FK reference on RHS should not trigger join result > > > Key: KAFKA-9244 > URL: https://issues.apache.org/jira/browse/KAFKA-9244 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Kin Siu >Priority: Major > > Perform a KTable-KTable foreign key join, after changing LHS FK reference > from FK1 -> FK2, populating update on RHS with FK1 should not produce join > result. > Below test case failed at the point of last "is(emptyMap())". > > {code:java|linenumbers=true} > @Test > public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { > final Topology topology = getTopology(streamsConfig, "store", > leftJoin); > try (final TopologyTestDriver driver = new > TopologyTestDriver(topology, streamsConfig)) { > final TestInputTopic right = > driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestInputTopic left = > driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestOutputTopic outputTopic = > driver.createOutputTopic(OUTPUT, new StringDeserializer(), new > StringDeserializer()); > final KeyValueStore store = > driver.getKeyValueStore("store"); > // Pre-populate the RHS records. 
This test is all about what > happens when we change LHS records foreign key reference > // then populate update on RHS > right.pipeInput("rhs1", "rhsValue1"); > right.pipeInput("rhs2", "rhsValue2"); > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(emptyMap()) > ); > left.pipeInput("lhs1", "lhsValue1|rhs1"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Change LHS foreign key reference > left.pipeInput("lhs1", "lhsValue1|rhs2"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), >
[jira] [Updated] (KAFKA-9244) Update of old FK reference on RHS should not trigger join result
[ https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kin Siu updated KAFKA-9244: --- Description: Perform a KTable-KTable foreign key join, after changing LHS FK reference from FK1 -> FK2, populating update on RHS with FK1 should not produce join result. Below test case failed at the point of last "is(emptyMap())". {code:java|linenumbers=true} @Test public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { final Topology topology = getTopology(streamsConfig, "store", leftJoin); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final KeyValueStore store = driver.getKeyValueStore("store"); // Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference // then populate update on RHS right.pipeInput("rhs1", "rhsValue1"); right.pipeInput("rhs2", "rhsValue2"); assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(emptyMap()) ); left.pipeInput("lhs1", "lhsValue1|rhs1"); { final Map expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") ); assertThat( outputTopic.readKeyValuesToMap(), is(expected) ); assertThat( asMap(store), is(expected) ); } // Change LHS foreign key reference left.pipeInput("lhs1", "lhsValue1|rhs2"); { final Map expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") ); assertThat( outputTopic.readKeyValuesToMap(), is(expected) ); assertThat( asMap(store), is(expected) ); } // Populate RHS update on old LHS foreign key ref right.pipeInput("rhs1", "rhsValue1Delta"); { assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(mkMap( mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") )) ); } } } {code} was: Perform a KTable-KTable foreign key join, after changing LHS FK reference from FK1 -> FK2, populating update on RHS with FK1 should not produce join result. Below test case failed at the point of last "is(emptyMap())". {code:java} @Test public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { final Topology topology = getTopology(streamsConfig, "store", leftJoin); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final KeyValueStore store = driver.getKeyValueStore("store"); // Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference // then populate update on RHS right.pipeInput("rhs1", "rhsValue1"); right.pipeInput("rhs2", "rhsValue2"); assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(emptyMap()) ); left.pipeInput("lhs1", "lhsValue1|rhs1"); { final Map expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") ); assertThat( outputTopic.readKeyValue
[jira] [Updated] (KAFKA-9244) Update of old FK reference on RHS should not trigger join result
[ https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kin Siu updated KAFKA-9244: --- Description: Perform a KTable-KTable foreign key join, after changing LHS FK reference from FK1 -> FK2, populating update on RHS with FK1 should not produce join result. Below test case failed at the point of last "is(emptyMap())". {code:java} @Test public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { final Topology topology = getTopology(streamsConfig, "store", leftJoin); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final KeyValueStore store = driver.getKeyValueStore("store"); // Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference // then populate update on RHS right.pipeInput("rhs1", "rhsValue1"); right.pipeInput("rhs2", "rhsValue2"); assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(emptyMap()) ); left.pipeInput("lhs1", "lhsValue1|rhs1"); { final Map expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") ); assertThat( outputTopic.readKeyValuesToMap(), is(expected) ); assertThat( asMap(store), is(expected) ); } // Change LHS foreign key reference left.pipeInput("lhs1", "lhsValue1|rhs2"); { final Map expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") ); assertThat( outputTopic.readKeyValuesToMap(), is(expected) ); assertThat( asMap(store), is(expected) ); } // Populate RHS update on old LHS foreign key ref right.pipeInput("rhs1", "rhsValue1Delta"); { assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(mkMap( mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") )) ); } } } {code} was: Perform a KTable-KTable foreign key join, after changing LHS FK reference from FK1 -> FK2, populating update on RHS with FK1 should not produce join result. Below test case failed though. {code:java} @Test public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { final Topology topology = getTopology(streamsConfig, "store", leftJoin); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final KeyValueStore store = driver.getKeyValueStore("store"); // Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference // then populate update on RHS right.pipeInput("rhs1", "rhsValue1"); right.pipeInput("rhs2", "rhsValue2"); assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(emptyMap()) ); left.pipeInput("lhs1", "lhsValue1|rhs1"); { final Map expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") ); assertThat( outputTopic.readKeyValuesToMap(), is(expected)
[jira] [Commented] (KAFKA-9244) Update of old FK reference on RHS should not trigger join result
[ https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984048#comment-16984048 ] Kin Siu commented on KAFKA-9244: mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") <-- this is a line to check the State store of join result, since my change of "rhs1" shouldn't generate value, we should have some state store value as before. And above test case failed on the point of "is(emptyMap())" check, below the failure message of JUnit test {code:java} java.lang.AssertionError: Expected: is <{}> but: was <{lhs1=(lhsValue1|rhs2,rhsValue1Delta)}> Expected :is <{}> Actual :<{lhs1=(lhsValue1|rhs2,rhsValue1Delta)}> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinIntegrationTest.doJoinFromLeftThenChangeForeignKeyMappingOfLeft(KTableKTableForeignKeyJoinIntegrationTest.java:142) {code} > Update of old FK reference on RHS should not trigger join result > > > Key: KAFKA-9244 > URL: https://issues.apache.org/jira/browse/KAFKA-9244 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Kin Siu >Priority: Major > > Perform a KTable-KTable foreign key join, after changing LHS FK reference > from FK1 -> FK2, populating update on RHS with FK1 should not produce join > result. > Below test case failed though. > > {code:java} > @Test > public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { > final Topology topology = getTopology(streamsConfig, "store", > leftJoin); > try (final TopologyTestDriver driver = new > TopologyTestDriver(topology, streamsConfig)) { > final TestInputTopic right = > driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestInputTopic left = > driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestOutputTopic outputTopic = > driver.createOutputTopic(OUTPUT, new StringDeserializer(), new > StringDeserializer()); > final KeyValueStore store = > driver.getKeyValueStore("store"); > // Pre-populate the RHS records. This test is all about what > happens when we change LHS records foreign key reference > // then populate update on RHS > right.pipeInput("rhs1", "rhsValue1"); > right.pipeInput("rhs2", "rhsValue2"); > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(emptyMap()) > ); > left.pipeInput("lhs1", "lhsValue1|rhs1"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Change LHS foreign key reference > left.pipeInput("lhs1", "lhsValue1|rhs2"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Populate RHS update on old LHS foreign key ref > right.pipeInput("rhs1", "rhsValue1Delta"); > { > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > )) > ); > } > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9244) Update of old FK reference on RHS should not trigger join result
[ https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984045#comment-16984045 ] Matthias J. Sax commented on KAFKA-9244: In you last test case, right.pipeInput("rhs1", "rhsValue1Delta"); Why do you expect: mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") As you say not update should be produced, and thus the result should be empty. Note that each time you call outputTopic.readKeyValuesToMap() output records are received and than "purged", hence, you won't receive them twice. `readKeyValuesToMap()` does _not_ access the KTable state store, but it only consumer the table output/changelog topic and does not re-read the whole topic each time, but reads it incrementally. > Update of old FK reference on RHS should not trigger join result > > > Key: KAFKA-9244 > URL: https://issues.apache.org/jira/browse/KAFKA-9244 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Kin Siu >Priority: Major > > Perform a KTable-KTable foreign key join, after changing LHS FK reference > from FK1 -> FK2, populating update on RHS with FK1 should not produce join > result. > Below test case failed though. > > {code:java} > @Test > public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { > final Topology topology = getTopology(streamsConfig, "store", > leftJoin); > try (final TopologyTestDriver driver = new > TopologyTestDriver(topology, streamsConfig)) { > final TestInputTopic right = > driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestInputTopic left = > driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new > StringSerializer()); > final TestOutputTopic outputTopic = > driver.createOutputTopic(OUTPUT, new StringDeserializer(), new > StringDeserializer()); > final KeyValueStore store = > driver.getKeyValueStore("store"); > // Pre-populate the RHS records. This test is all about what > happens when we change LHS records foreign key reference > // then populate update on RHS > right.pipeInput("rhs1", "rhsValue1"); > right.pipeInput("rhs2", "rhsValue2"); > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(emptyMap()) > ); > left.pipeInput("lhs1", "lhsValue1|rhs1"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Change LHS foreign key reference > left.pipeInput("lhs1", "lhsValue1|rhs2"); > { > final Map expected = mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > ); > assertThat( > outputTopic.readKeyValuesToMap(), > is(expected) > ); > assertThat( > asMap(store), > is(expected) > ); > } > // Populate RHS update on old LHS foreign key ref > right.pipeInput("rhs1", "rhsValue1Delta"); > { > assertThat( > outputTopic.readKeyValuesToMap(), > is(emptyMap()) > ); > assertThat( > asMap(store), > is(mkMap( > mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") > )) > ); > } > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9243) Update the javadocs from KeyValueStore to TimestampKeyValueStore
[ https://issues.apache.org/jira/browse/KAFKA-9243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9243: --- Affects Version/s: 2.3.0 > Update the javadocs from KeyValueStore to TimestampKeyValueStore > > > Key: KAFKA-9243 > URL: https://issues.apache.org/jira/browse/KAFKA-9243 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.3.0 >Reporter: Walker Carlson >Priority: Minor > Labels: beginner, newbie > > As of version 2.3, the DSL uses `TimestampedStores` to represent KTables. > However, the JavaDocs of all table-related operators still refer to plain > `KeyValueStores` etc instead of `TimestampedKeyValueStore` etc. Hence, all > those JavaDocs should be updated (the JavaDocs are technically not incorrect, > because one can access a TimestampedKeyValueStore as a KeyValueStore, too – > hence this ticket is not a "bug" but an improvement. -- This message was sent by Atlassian Jira (v8.3.4#803005)
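On the remark that "one can access a TimestampedKeyValueStore as a KeyValueStore, too": given a running KafkaStreams instance (streams) whose topology materializes a table under a queryable store name, the same store can be queried either way. A hedged sketch; the store name "table-store" and the Long value type are made up:

{code:java}
// Plain view: values only, even though the DSL materializes a timestamped store.
final ReadOnlyKeyValueStore<String, Long> plain =
    streams.store("table-store", QueryableStoreTypes.keyValueStore());
final Long value = plain.get("some-key");

// Timestamped view of the same store: value plus the record timestamp.
final ReadOnlyKeyValueStore<String, ValueAndTimestamp<Long>> timestamped =
    streams.store("table-store", QueryableStoreTypes.timestampedKeyValueStore());
final ValueAndTimestamp<Long> vat = timestamped.get("some-key");
if (vat != null) {
    System.out.println(vat.value() + " @ " + vat.timestamp());
}
{code}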
[jira] [Updated] (KAFKA-9243) Update the javadocs from KeyValueStore to TimestampKeyValueStore
[ https://issues.apache.org/jira/browse/KAFKA-9243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9243: --- Description: As of version 2.3, the DSL uses `TimestampedStores` to represent KTables. However, the JavaDocs of all table-related operators still refer to plain `KeyValueStores` etc instead of `TimestampedKeyValueStore` etc. Hence, all those JavaDocs should be updated (the JavaDocs are technically not incorrect, because one can access a TimestampedKeyValueStore as a KeyValueStore, too – hence this ticket is not a "bug" but an improvement.) (was: Materialized objects use KeyValueStore but the docs should be for TimestampedKeyValueStore because of changes to Materialized. This ticket should be broken down into a series of smaller PRs to keep the scope of each PR contained, allowing for more effective reviews.) > Update the javadocs from KeyValueStore to TimestampKeyValueStore > > > Key: KAFKA-9243 > URL: https://issues.apache.org/jira/browse/KAFKA-9243 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Walker Carlson >Priority: Minor > Labels: beginner, newbie > > As of version 2.3, the DSL uses `TimestampedStores` to represent KTables. > However, the JavaDocs of all table-related operators still refer to plain > `KeyValueStores` etc instead of `TimestampedKeyValueStore` etc. Hence, all > those JavaDocs should be updated (the JavaDocs are technically not incorrect, > because one can access a TimestampedKeyValueStore as a KeyValueStore, too – > hence this ticket is not a "bug" but an improvement.) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9243) Update the javadocs from KeyValueStore to TimestampKeyValueStore
[ https://issues.apache.org/jira/browse/KAFKA-9243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9243: --- Component/s: (was: streams-test-utils) (was: producer ) (was: KafkaConnect) (was: consumer) (was: clients) (was: admin) > Update the javadocs from KeyValueStore to TimestampKeyValueStore > > > Key: KAFKA-9243 > URL: https://issues.apache.org/jira/browse/KAFKA-9243 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Walker Carlson >Priority: Minor > Labels: beginner, newbie > > Materialized objects use KayValueStore but the docs should be for > TimestampedKeyValueStore because of changes to Materialized. > This tickets should be broken down in a series of smaller PRs to keep the > scope of each PR contained, allowing for more effective reviews. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9244) Update of old FK reference on RHS should not trigger join result
[ https://issues.apache.org/jira/browse/KAFKA-9244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kin Siu updated KAFKA-9244: --- Description: Perform a KTable-KTable foreign key join, after changing LHS FK reference from FK1 -> FK2, populating update on RHS with FK1 should not produce join result. Below test case failed though. {code:java} @Test public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { final Topology topology = getTopology(streamsConfig, "store", leftJoin); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final KeyValueStore store = driver.getKeyValueStore("store"); // Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference // then populate update on RHS right.pipeInput("rhs1", "rhsValue1"); right.pipeInput("rhs2", "rhsValue2"); assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(emptyMap()) ); left.pipeInput("lhs1", "lhsValue1|rhs1"); { final Map expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") ); assertThat( outputTopic.readKeyValuesToMap(), is(expected) ); assertThat( asMap(store), is(expected) ); } // Change LHS foreign key reference left.pipeInput("lhs1", "lhsValue1|rhs2"); { final Map expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") ); assertThat( outputTopic.readKeyValuesToMap(), is(expected) ); assertThat( asMap(store), is(expected) ); } // Populate RHS update on old LHS foreign key ref right.pipeInput("rhs1", "rhsValue1Delta"); { assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(mkMap( mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") )) ); } } } {code} was: Perform a KTable-KTable foreign key join, after changing LHS FK reference from FK1 -> FK2, populating update on RHS with FK1 should not produce join result. {code:java} @Test public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { final Topology topology = getTopology(streamsConfig, "store", leftJoin); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final KeyValueStore store = driver.getKeyValueStore("store"); // Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference // then populate update on RHS right.pipeInput("rhs1", "rhsValue1"); right.pipeInput("rhs2", "rhsValue2"); assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(emptyMap()) ); left.pipeInput("lhs1", "lhsValue1|rhs1"); { final Map expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") ); assertThat( outputTopic.readKeyValuesToMap(), is(expected) ); assertThat(
[jira] [Created] (KAFKA-9244) Update of old FK reference on RHS should not trigger join result
Kin Siu created KAFKA-9244: -- Summary: Update of old FK reference on RHS should not trigger join result Key: KAFKA-9244 URL: https://issues.apache.org/jira/browse/KAFKA-9244 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.4.0 Reporter: Kin Siu Perform a KTable-KTable foreign key join, after changing LHS FK reference from FK1 -> FK2, populating update on RHS with FK1 should not produce join result. {code:java} @Test public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() { final Topology topology = getTopology(streamsConfig, "store", leftJoin); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final KeyValueStore store = driver.getKeyValueStore("store"); // Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference // then populate update on RHS right.pipeInput("rhs1", "rhsValue1"); right.pipeInput("rhs2", "rhsValue2"); assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(emptyMap()) ); left.pipeInput("lhs1", "lhsValue1|rhs1"); { final Map expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") ); assertThat( outputTopic.readKeyValuesToMap(), is(expected) ); assertThat( asMap(store), is(expected) ); } // Change LHS foreign key reference left.pipeInput("lhs1", "lhsValue1|rhs2"); { final Map expected = mkMap( mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") ); assertThat( outputTopic.readKeyValuesToMap(), is(expected) ); assertThat( asMap(store), is(expected) ); } // Populate RHS update on old LHS foreign key ref right.pipeInput("rhs1", "rhsValue1Delta"); { assertThat( outputTopic.readKeyValuesToMap(), is(emptyMap()) ); assertThat( asMap(store), is(mkMap( mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") )) ); } } } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7016) Reconsider the "avoid the expensive and useless stack trace for api exceptions" practice
[ https://issues.apache.org/jira/browse/KAFKA-7016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983916#comment-16983916 ] ASF GitHub Bot commented on KAFKA-7016: --- ableegoldman commented on pull request #7756: KAFKA-7016: Don't fill in stack traces URL: https://github.com/apache/kafka/pull/7756 Stack traces are useful This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Reconsider the "avoid the expensive and useless stack trace for api > exceptions" practice > > > Key: KAFKA-7016 > URL: https://issues.apache.org/jira/browse/KAFKA-7016 > Project: Kafka > Issue Type: Bug >Reporter: Martin Vysny >Priority: Major > > I am trying to write a Kafka Consumer; upon running it only prints out: > {\{ org.apache.kafka.common.errors.InvalidGroupIdException: The configured > groupId is invalid}} > Note that the stack trace is missing, so that I have no information which > part of my code is bad and need fixing; I also have no information which > Kafka Client method has been called. Upon closer examination I found this in > ApiException: > > {{/* avoid the expensive and useless stack trace for api exceptions */}} > {{@Override}} > {{public Throwable fillInStackTrace() {}} > \{{ return this;}} > {{}}} > > I think it is a bad practice to hide all useful debugging info and trade it > for dubious performance gains. Exceptions are for exceptional code flow which > are allowed to be slow. > > This applies to kafka-clients 1.1.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
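The effect described in this report is easier to see in isolation. Below is a minimal, self-contained sketch (not Kafka's actual ApiException class) using the same fillInStackTrace() override as the quoted snippet: returning this without calling super leaves the stack-trace array empty, so printStackTrace() can only show the message and nothing points back at the calling code.
{code:java}
// Minimal sketch, not Kafka code: an exception with the same fillInStackTrace()
// override as the quoted ApiException snippet.
public class NoTraceDemo {

    static class QuietException extends RuntimeException {
        QuietException(final String message) {
            super(message);
        }

        // Returning 'this' without calling super.fillInStackTrace() means no stack
        // frames are ever captured for this exception.
        @Override
        public Throwable fillInStackTrace() {
            return this;
        }
    }

    public static void main(final String[] args) {
        try {
            throw new QuietException("The configured groupId is invalid");
        } catch (final QuietException e) {
            e.printStackTrace();                          // prints only "QuietException: ..." with no frames
            System.out.println(e.getStackTrace().length); // 0 -- nothing identifies the caller
        }
    }
}
{code}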
[jira] [Commented] (KAFKA-9213) BufferOverflowException on rolling new segment after upgrading Kafka from 1.1.0 to 2.3.1
[ https://issues.apache.org/jira/browse/KAFKA-9213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983909#comment-16983909 ] Ismael Juma commented on KAFKA-9213: Thanks for the thorough investigation! Any thoughts [~junrao]? > BufferOverflowException on rolling new segment after upgrading Kafka from > 1.1.0 to 2.3.1 > > > Key: KAFKA-9213 > URL: https://issues.apache.org/jira/browse/KAFKA-9213 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 2.3.1 > Environment: Ubuntu 16.04, AWS instance d2.8xlarge. > JAVA Options: > -Xms16G > -Xmx16G > -XX:G1HeapRegionSize=16M > -XX:MetaspaceSize=96m > -XX:MinMetaspaceFreeRatio=50 >Reporter: Daniyar >Priority: Blocker > > We updated our Kafka cluster from 1.1.0 version to 2.3.1. We followed up to > step 2 of the [update > instruction|[https://kafka.apache.org/documentation/#upgrade]]. > Message format and inter-broker protocol versions were left the same: > inter.broker.protocol.version=1.1 > log.message.format.version=1.1 > > After upgrading, we started to get some occasional exceptions: > {code:java} > 2019/11/19 05:30:53 INFO [ProducerStateManager > partition=matchmaker_retry_clicks_15m-2] Writing producer snapshot at > offset 788532 (kafka.log.ProducerStateManager) > 2019/11/19 05:30:53 INFO [Log partition=matchmaker_retry_clicks_15m-2, > dir=/mnt/kafka] Rolled new log segment at offset 788532 in 1 ms. > (kafka.log.Log) > 2019/11/19 05:31:01 ERROR [ReplicaManager broker=0] Error processing append > operation on partition matchmaker_retry_clicks_15m-2 > (kafka.server.ReplicaManager) > 2019/11/19 05:31:01 java.nio.BufferOverflowException > 2019/11/19 05:31:01 at java.nio.Buffer.nextPutIndex(Buffer.java:527) > 2019/11/19 05:31:01 at > java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:797) > 2019/11/19 05:31:01 at > kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:134) > 2019/11/19 05:31:01 at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > 2019/11/19 05:31:01 at > kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253) > 2019/11/19 05:31:01 at > kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114) > 2019/11/19 05:31:01 at > kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:520) > 2019/11/19 05:31:01 at kafka.log.Log.$anonfun$roll$8(Log.scala:1690) > 2019/11/19 05:31:01 at > kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:1690) > 2019/11/19 05:31:01 at scala.Option.foreach(Option.scala:407) > 2019/11/19 05:31:01 at kafka.log.Log.$anonfun$roll$2(Log.scala:1690) > 2019/11/19 05:31:01 at > kafka.log.Log.maybeHandleIOException(Log.scala:2085) > 2019/11/19 05:31:01 at kafka.log.Log.roll(Log.scala:1654) > 2019/11/19 05:31:01 at kafka.log.Log.maybeRoll(Log.scala:1639) > 2019/11/19 05:31:01 at kafka.log.Log.$anonfun$append$2(Log.scala:966) > 2019/11/19 05:31:01 at > kafka.log.Log.maybeHandleIOException(Log.scala:2085) > 2019/11/19 05:31:01 at kafka.log.Log.append(Log.scala:850) > 2019/11/19 05:31:01 at kafka.log.Log.appendAsLeader(Log.scala:819) > 2019/11/19 05:31:01 at > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:772) > 2019/11/19 05:31:01 at > kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253) > 2019/11/19 05:31:01 at > kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:259) > 2019/11/19 05:31:01 at > kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:759) > 2019/11/19 05:31:01 at > kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:763) > 2019/11/19 05:31:01 at > 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) > 2019/11/19 05:31:01 at > scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) > 2019/11/19 05:31:01 at > scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) > 2019/11/19 05:31:01 at > scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) > 2019/11/19 05:31:01 at > scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44) > 2019/11/19 05:31:01 at > scala.collection.mutable.HashMap.foreach(HashMap.scala:149) > 2019/11/19 05:31:01 at > scala.collection.TraversableLike.map(TraversableLike.scala:238) > 2019/11/19 05:31:01 at > scala.collection.TraversableLike.map$(TraversableLike.scala:231) > 2019/11/19 05:31:01 at > scala.collection.AbstractTraversable.map(Traversable.scala:108) > 2019/11/19 05:31:01 at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:751) > 2019/11/19 05:31:01 at > kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:492) > 2019/11/19 05:31:01 at > kafka.server.KafkaApis.handleProduceReque
[jira] [Commented] (KAFKA-9213) BufferOverflowException on rolling new segment after upgrading Kafka from 1.1.0 to 2.3.1
[ https://issues.apache.org/jira/browse/KAFKA-9213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983866#comment-16983866 ] Daniyar commented on KAFKA-9213: UPDATE: I've investigated another occurrence of this exception. For analyzes, I used: 1) a memory dump that was taken from the broker around 6.37 pm 2) kafka log file 3) kafka state-change log 4) log, index and time-index files of a failed segment 5) Kafka source code, version 2.3.1 and 1.1.0 Here's how the exception looks like in the kafka log: ``` 2019/11/19 16:03:00 INFO [ProducerStateManager partition=ad_group_metrics-62] Writing producer snapshot at offset 13886052 (kafka.log.ProducerStateManager) 2019/11/19 16:03:00 INFO [Log partition=ad_group_metrics-62, dir=/mnt/kafka] Rolled new log segment at offset 13886052 in 1 ms. (kafka.log.Log) 2019/11/19 16:03:00 ERROR [ReplicaManager broker=17] Error processing append operation on partition ad_group_metrics-62 (kafka.server.ReplicaManager) 2019/11/19 16:03:00 java.nio.BufferOverflowException 2019/11/19 16:03:00 at java.nio.Buffer.nextPutIndex(Buffer.java:527) 2019/11/19 16:03:00 at java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:797) 2019/11/19 16:03:00 at kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:134) 2019/11/19 16:03:00 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 2019/11/19 16:03:00 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253) 2019/11/19 16:03:00 at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114) 2019/11/19 16:03:00 at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:520) 2019/11/19 16:03:00 at kafka.log.Log.$anonfun$roll$8(Log.scala:1690) 2019/11/19 16:03:00 at kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:1690) 2019/11/19 16:03:00 at scala.Option.foreach(Option.scala:407) 2019/11/19 16:03:00 at kafka.log.Log.$anonfun$roll$2(Log.scala:1690) 2019/11/19 16:03:00 at kafka.log.Log.maybeHandleIOException(Log.scala:2085) 2019/11/19 16:03:00 at kafka.log.Log.roll(Log.scala:1654) 2019/11/19 16:03:00 at kafka.log.Log.maybeRoll(Log.scala:1639) 2019/11/19 16:03:00 at kafka.log.Log.$anonfun$append$2(Log.scala:966) 2019/11/19 16:03:00 at kafka.log.Log.maybeHandleIOException(Log.scala:2085) 2019/11/19 16:03:00 at kafka.log.Log.append(Log.scala:850) 2019/11/19 16:03:00 at kafka.log.Log.appendAsLeader(Log.scala:819) 2019/11/19 16:03:00 at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:772) 2019/11/19 16:03:00 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253) 2019/11/19 16:03:00 at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:259) 2019/11/19 16:03:00 at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:759) 2019/11/19 16:03:00 at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:763) 2019/11/19 16:03:00 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) 2019/11/19 16:03:00 at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) 2019/11/19 16:03:00 at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) 2019/11/19 16:03:00 at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) 2019/11/19 16:03:00 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44) 2019/11/19 16:03:00 at scala.collection.mutable.HashMap.foreach(HashMap.scala:149) 2019/11/19 16:03:00 at scala.collection.TraversableLike.map(TraversableLike.scala:238) 2019/11/19 16:03:00 at scala.collection.TraversableLike.map$(TraversableLike.scala:231) 2019/11/19 
16:03:00 at scala.collection.AbstractTraversable.map(Traversable.scala:108) 2019/11/19 16:03:00 at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:751) 2019/11/19 16:03:00 at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:492) 2019/11/19 16:03:00 at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:544) 2019/11/19 16:03:00 at kafka.server.KafkaApis.handle(KafkaApis.scala:113) 2019/11/19 16:03:00 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) 2019/11/19 16:03:00 at java.lang.Thread.run(Thread.java:748) ... ``` What we see here, is that a new segment was rolled out at the offset 13886052 and then an exception happened while trying to make _some_ segment as inactive ({{`onBecomeInactiveSegment`}}) on appending new messages to the Log. The timing of the rolling out of a new segment and appending new messages doesn't play a role. There are many other similar exceptions where this occurs a few seconds after rolling out of a new segment. I managed to find the {{`LogSegment`}} object for the offset 13886052 in the memory dump. I followed the source code logic, checking the LogSegment state and Kafka logs, and found that the `TimeIndex` object somehow went into the state with 0 entries and 0 max possible entries (and an empty memory map). Having
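The JDK frame at the top of that stack is easy to reproduce in isolation: BufferOverflowException is what java.nio throws when a relative put is attempted on a buffer with no remaining capacity, which is consistent with a TimeIndex whose memory map has room for zero entries. A standalone plain-JDK illustration (not Kafka's TimeIndex code):
{code:java}
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

// Plain-JDK illustration, not Kafka code: a relative putLong() on a buffer with no
// remaining capacity fails the same way as the TimeIndex.maybeAppend frame above,
// where the 8-byte timestamp is the first thing written.
public class PutLongOverflowDemo {
    public static void main(final String[] args) {
        final ByteBuffer emptyMmapLike = ByteBuffer.allocateDirect(0); // stands in for a zero-entry index mmap
        try {
            emptyMmapLike.putLong(System.currentTimeMillis());
        } catch (final BufferOverflowException e) {
            System.out.println("BufferOverflowException, as in the broker log");
        }
    }
}
{code}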
[jira] [Commented] (KAFKA-9233) Kafka consumer throws undocumented IllegalStateException
[ https://issues.apache.org/jira/browse/KAFKA-9233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983858#comment-16983858 ] ASF GitHub Bot commented on KAFKA-9233: --- noslowerdna commented on pull request #7755: KAFKA-9233: Fix IllegalStateException in Fetcher retrieval of beginni… URL: https://github.com/apache/kafka/pull/7755 …ng or end offsets for duplicate TopicPartition values Minor bug fix. The issue was introduced in Kafka 2.3.0, likely by [KAFKA-7831](https://issues.apache.org/jira/browse/KAFKA-7831). Tested by, `./gradlew clients:test --tests FetcherTest` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka consumer throws undocumented IllegalStateException > > > Key: KAFKA-9233 > URL: https://issues.apache.org/jira/browse/KAFKA-9233 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.3.0 >Reporter: Andrew Olson >Priority: Minor > > If the provided collection of TopicPartition instances contains any > duplicates, an IllegalStateException not documented in the javadoc is thrown > by internal Java stream code when calling KafkaConsumer#beginningOffsets or > KafkaConsumer#endOffsets. > The stack trace looks like this, > {noformat} > java.lang.IllegalStateException: Duplicate key -2 > at > java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133) > at java.util.HashMap.merge(HashMap.java:1254) > at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320) > at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) > at > org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:555) > at > org.apache.kafka.clients.consumer.internals.Fetcher.beginningOffsets(Fetcher.java:542) > at > org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2054) > at > org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2031) > {noformat} > {noformat} > java.lang.IllegalStateException: Duplicate key -1 > at > java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133) > at java.util.HashMap.merge(HashMap.java:1254) > at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320) > at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) > at > org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:559) > at > org.apache.kafka.clients.consumer.internals.Fetcher.endOffsets(Fetcher.java:550) > at > org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2109) > at > org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2081) > {noformat} > Looking at the code, it appears this may likely have been introduced by > KAFKA-7831. The exception is not thrown in Kafka 2.2.1, with the duplicated > TopicPartition values silently ignored. Either we should document this > exception possibility (probably wrapping it with a Kafka exception class) > indicating invalid client API usage, or restore the previous behavior where > the duplicates were harmless. -- This message was sent by Atlassian Jira (v8.3.4#80
[jira] [Assigned] (KAFKA-9233) Kafka consumer throws undocumented IllegalStateException
[ https://issues.apache.org/jira/browse/KAFKA-9233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Olson reassigned KAFKA-9233: --- Assignee: Andrew Olson > Kafka consumer throws undocumented IllegalStateException > > > Key: KAFKA-9233 > URL: https://issues.apache.org/jira/browse/KAFKA-9233 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.3.0 >Reporter: Andrew Olson >Assignee: Andrew Olson >Priority: Minor > > If the provided collection of TopicPartition instances contains any > duplicates, an IllegalStateException not documented in the javadoc is thrown > by internal Java stream code when calling KafkaConsumer#beginningOffsets or > KafkaConsumer#endOffsets. > The stack trace looks like this, > {noformat} > java.lang.IllegalStateException: Duplicate key -2 > at > java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133) > at java.util.HashMap.merge(HashMap.java:1254) > at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320) > at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) > at > org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:555) > at > org.apache.kafka.clients.consumer.internals.Fetcher.beginningOffsets(Fetcher.java:542) > at > org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2054) > at > org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2031) > {noformat} > {noformat} > java.lang.IllegalStateException: Duplicate key -1 > at > java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133) > at java.util.HashMap.merge(HashMap.java:1254) > at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320) > at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) > at > org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:559) > at > org.apache.kafka.clients.consumer.internals.Fetcher.endOffsets(Fetcher.java:550) > at > org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2109) > at > org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2081) > {noformat} > Looking at the code, it appears this may likely have been introduced by > KAFKA-7831. The exception is not thrown in Kafka 2.2.1, with the duplicated > TopicPartition values silently ignored. 
Either we should document this > exception possibility (probably wrapping it with a Kafka exception class) > indicating invalid client API usage, or restore the previous behavior where > the duplicates were harmless. -- This message was sent by Atlassian Jira (v8.3.4#803005)
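To make the failure mode concrete, here is a small sketch (hypothetical topic name; this shows the stock Collectors.toMap behavior, not the Fetcher code itself) that triggers the same kind of IllegalStateException when the same TopicPartition is listed twice; the exact "Duplicate key" message text differs between JDK 8 and later JDKs.
{code:java}
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicPartition;

// Sketch only: duplicate TopicPartition entries make Collectors.toMap() throw
// IllegalStateException("Duplicate key ..."), which is what surfaces from
// beginningOffsets()/endOffsets() in the report above.
public class DuplicateTopicPartitionDemo {
    public static void main(final String[] args) {
        final List<TopicPartition> partitions = List.of(
                new TopicPartition("my-topic", 0),
                new TopicPartition("my-topic", 0)); // same partition listed twice

        // -2L mirrors the value in the reported "Duplicate key -2" message (JDK 8 reports the mapped value).
        final Map<TopicPartition, Long> request = partitions.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> -2L)); // throws IllegalStateException here
        System.out.println(request);
    }
}
{code}
Until the javadoc is updated or the old behavior restored, callers can sidestep it by de-duplicating the collection, for example by copying it into a HashSet, before calling beginningOffsets or endOffsets.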
[jira] [Commented] (KAFKA-7663) Custom Processor supplied on addGlobalStore is not used when restoring state from topic
[ https://issues.apache.org/jira/browse/KAFKA-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983835#comment-16983835 ] Sophie Blee-Goldman commented on KAFKA-7663: [~twbecker] Those javadocs were added _because_ this bug was opened :) That said, I agree that this seems to be a valuable feature for a number of people (and currently a large source of confusion) since the only workaround is either unnecessarily costly for what *should* be a relatively simple operation. I also agree that the changelog seems like overkill, and kind of a regression. It also seems conceptually dissonant – for a global store, restoration and normal processing are essentially the same thing. That's currently broken since we pass through the processor during processing only, raising the question of why we even need (or are allowed to supply) a custom processor at all. I think it makes sense to just wrap the global store in a "processor layer" which can apply transformations before inserting and is passed through similarly for restoration as for normal processing. > Custom Processor supplied on addGlobalStore is not used when restoring state > from topic > --- > > Key: KAFKA-7663 > URL: https://issues.apache.org/jira/browse/KAFKA-7663 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Frederic Tardif >Priority: Major > Attachments: image-2018-11-20-11-42-14-697.png > > > I have implemented a StreamBuilder#{{addGlobalStore}} supplying a custom > processor responsible to transform a K,V record from the input stream into a > V,K records. It works fine and my {{store.all()}} does print the correct > persisted V,K records. However, if I clean the local store and restart the > stream app, the global table is reloaded but without going through the > processor supplied; instead, it calls {{GlobalStateManagerImp#restoreState}} > which simply stores the input topic K,V records into rocksDB (hence bypassing > the mapping function of my custom processor). I believe this must not be the > expected result? > This is a follow up on stackoverflow discussion around storing a K,V topic > as a global table with some stateless transformations based on a "custom" > processor added on the global store: > [https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729] > If we address this issue, we should also apply > `default.deserialization.exception.handler` during restore (cf. KAFKA-8037) > -- This message was sent by Atlassian Jira (v8.3.4#803005)
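For context on the "processor layer" being discussed, here is a rough sketch of the pattern (all topic, store, and class names are made up for illustration). With the current behavior the re-keying processor below only runs for live records; after wiping local state, restoration copies the source topic's key/value pairs into RocksDB directly and the mapping is lost.
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

// Illustrative sketch of a global store fed through a custom re-keying processor.
public class GlobalStoreRekeyExample {

    static class RekeyingProcessor implements Processor<String, String> {
        private KeyValueStore<String, String> store;

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            store = (KeyValueStore<String, String>) context.getStateStore("rekeyed-global-store");
        }

        @Override
        public void process(final String key, final String value) {
            store.put(value, key); // store value->key; this is the mapping that restore bypasses
        }

        @Override
        public void close() { }
    }

    public static StreamsBuilder topology() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.addGlobalStore(
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("rekeyed-global-store"),
                        Serdes.String(),
                        Serdes.String()).withLoggingDisabled(), // global stores may not have a changelog
                "source-topic",
                Consumed.with(Serdes.String(), Serdes.String()),
                RekeyingProcessor::new);
        return builder;
    }
}
{code}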
[jira] [Commented] (KAFKA-7016) Reconsider the "avoid the expensive and useless stack trace for api exceptions" practice
[ https://issues.apache.org/jira/browse/KAFKA-7016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983829#comment-16983829 ] Gary Russell commented on KAFKA-7016: - +1 - it's a total pain to have to do a text search to figure out where an exception occurred. Why is it taking so long to get this fixed? > Reconsider the "avoid the expensive and useless stack trace for api > exceptions" practice > > > Key: KAFKA-7016 > URL: https://issues.apache.org/jira/browse/KAFKA-7016 > Project: Kafka > Issue Type: Bug >Reporter: Martin Vysny >Priority: Major > > I am trying to write a Kafka Consumer; upon running it only prints out: > {\{ org.apache.kafka.common.errors.InvalidGroupIdException: The configured > groupId is invalid}} > Note that the stack trace is missing, so that I have no information which > part of my code is bad and need fixing; I also have no information which > Kafka Client method has been called. Upon closer examination I found this in > ApiException: > > {{/* avoid the expensive and useless stack trace for api exceptions */}} > {{@Override}} > {{public Throwable fillInStackTrace() {}} > \{{ return this;}} > {{}}} > > I think it is a bad practice to hide all useful debugging info and trade it > for dubious performance gains. Exceptions are for exceptional code flow which > are allowed to be slow. > > This applies to kafka-clients 1.1.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9243) Update the javadocs from KeyValueStore to TimestampKeyValueStore
Walker Carlson created KAFKA-9243: - Summary: Update the javadocs from KeyValueStore to TimestampKeyValueStore Key: KAFKA-9243 URL: https://issues.apache.org/jira/browse/KAFKA-9243 Project: Kafka Issue Type: Improvement Components: admin, clients, consumer, KafkaConnect, producer , streams, streams-test-utils Reporter: Walker Carlson Materialized objects use KeyValueStore but the docs should be for TimestampedKeyValueStore because of changes to Materialized. This ticket should be broken down into a series of smaller PRs to keep the scope of each PR contained, allowing for more effective reviews. -- This message was sent by Atlassian Jira (v8.3.4#803005)
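To illustrate the mismatch with a concrete (made-up) example: since the Materialized changes, a table materialized with Materialized.as(...) is backed by a timestamped store, so interactive queries see ValueAndTimestamp values rather than the plain values the current KeyValueStore-oriented javadocs suggest. A rough sketch, with store and topic names assumed for illustration:
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

// Sketch only: the store behind Materialized.as("latest-values") is queried as a
// timestamped store, i.e. values come back wrapped in ValueAndTimestamp.
public class TimestampedStoreQueryExample {

    public static StreamsBuilder topology() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.table("input-topic",
                Consumed.with(Serdes.String(), Serdes.String()),
                Materialized.as("latest-values"));
        return builder;
    }

    public static ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> query(final KafkaStreams streams) {
        return streams.store("latest-values", QueryableStoreTypes.timestampedKeyValueStore());
    }
}
{code}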
[jira] [Comment Edited] (KAFKA-7663) Custom Processor supplied on addGlobalStore is not used when restoring state from topic
[ https://issues.apache.org/jira/browse/KAFKA-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983797#comment-16983797 ] Tommy Becker edited comment on KAFKA-7663 at 11/27/19 6:25 PM: --- Thanks for the comment [~vvcephei] . In the reporter's defense, I'm pretty sure that javadoc was added after this bug was opened ;) This is a pretty important use-case for us. The topic we build the global store from is heterogeneous; we only need some types of records and need to rekey others prior to joining to our event stream. Personally, I feel like your second option (i.e. maintaining a real changelog topic) is probably overkill and simply running the records through the processors during restoration is adequate. Seems like we could even keep the current "fast path" restoration that bypasses serde when there is no custom processor configured. With respect to your recommended work-around, the problem with that is that if I'm not mistaken you lose the critical property of the global store being fully populated when the rest of the topology starts up. Because although the store will wait for "my-global-changelog" topic to be caught up, that doesn't mean much since that topic is itself fed from the main topology. was (Author: twbecker): Thanks for the comment [~vvcephei] . In the reporter's defense, I'm pretty sure that javadoc was added before this bug was opened ;) This is a pretty important use-case for us. The topic we build the global store from is heterogeneous; we only need some types of records and need to rekey others prior to joining to our event stream. Personally, I feel like your second option (i.e. maintaining a real changelog topic) is probably overkill and simply running the records through the processors during restoration is adequate. Seems like we could even keep the current "fast path" restoration that bypasses serde when there is no custom processor configured. With respect to your recommended work-around, the problem with that is that if I'm not mistaken you lose the critical property of the global store being fully populated when the rest of the topology starts up. Because although the store will wait for "my-global-changelog" topic to be caught up, that doesn't mean much since that topic is itself fed from the main topology. > Custom Processor supplied on addGlobalStore is not used when restoring state > from topic > --- > > Key: KAFKA-7663 > URL: https://issues.apache.org/jira/browse/KAFKA-7663 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Frederic Tardif >Priority: Major > Attachments: image-2018-11-20-11-42-14-697.png > > > I have implemented a StreamBuilder#{{addGlobalStore}} supplying a custom > processor responsible to transform a K,V record from the input stream into a > V,K records. It works fine and my {{store.all()}} does print the correct > persisted V,K records. However, if I clean the local store and restart the > stream app, the global table is reloaded but without going through the > processor supplied; instead, it calls {{GlobalStateManagerImp#restoreState}} > which simply stores the input topic K,V records into rocksDB (hence bypassing > the mapping function of my custom processor). I believe this must not be the > expected result? 
> This is a follow up on stackoverflow discussion around storing a K,V topic > as a global table with some stateless transformations based on a "custom" > processor added on the global store: > [https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729] > If we address this issue, we should also apply > `default.deserialization.exception.handler` during restore (cf. KAFKA-8037) > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7663) Custom Processor supplied on addGlobalStore is not used when restoring state from topic
[ https://issues.apache.org/jira/browse/KAFKA-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983797#comment-16983797 ] Tommy Becker commented on KAFKA-7663: - Thanks for the comment [~vvcephei] . In the reporter's defense, I'm pretty sure that javadoc was added before this bug was opened ;) This is a pretty important use-case for us. The topic we build the global store from is heterogeneous; we only need some types of records and need to rekey others prior to joining to our event stream. Personally, I feel like your second option (i.e. maintaining a real changelog topic) is probably overkill and simply running the records through the processors during restoration is adequate. Seems like we could even keep the current "fast path" restoration that bypasses serde when there is no custom processor configured. With respect to your recommended work-around, the problem with that is that if I'm not mistaken you lose the critical property of the global store being fully populated when the rest of the topology starts up. Because although the store will wait for "my-global-changelog" topic to be caught up, that doesn't mean much since that topic is itself fed from the main topology. > Custom Processor supplied on addGlobalStore is not used when restoring state > from topic > --- > > Key: KAFKA-7663 > URL: https://issues.apache.org/jira/browse/KAFKA-7663 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Frederic Tardif >Priority: Major > Attachments: image-2018-11-20-11-42-14-697.png > > > I have implemented a StreamBuilder#{{addGlobalStore}} supplying a custom > processor responsible to transform a K,V record from the input stream into a > V,K records. It works fine and my {{store.all()}} does print the correct > persisted V,K records. However, if I clean the local store and restart the > stream app, the global table is reloaded but without going through the > processor supplied; instead, it calls {{GlobalStateManagerImp#restoreState}} > which simply stores the input topic K,V records into rocksDB (hence bypassing > the mapping function of my custom processor). I believe this must not be the > expected result? > This is a follow up on stackoverflow discussion around storing a K,V topic > as a global table with some stateless transformations based on a "custom" > processor added on the global store: > [https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729] > If we address this issue, we should also apply > `default.deserialization.exception.handler` during restore (cf. KAFKA-8037) > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9087) ReplicaAlterLogDirs stuck and restart fails with java.lang.IllegalStateException: Offset mismatch for the future replica
[ https://issues.apache.org/jira/browse/KAFKA-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983768#comment-16983768 ] Vijay commented on KAFKA-9087: -- Is the above issue resolved? Am also facing same problem. [2019-11-27 17:07:24,201] INFO [Partition -1 broker=3] XXX-1 starts at Leader Epoch 103 from offset 10681297828. Previous Leader Epoch was: 102 (kafka.cluster.Partition) [2019-11-27 17:07:24,207] INFO [ReplicaAlterLogDirsThread-1]: Partition -1 has an older epoch (102) than the current leader. Will await the new LeaderAndIsr state before resuming fetching. (kafka.server.ReplicaAlterLogDirsThread) > ReplicaAlterLogDirs stuck and restart fails with > java.lang.IllegalStateException: Offset mismatch for the future replica > > > Key: KAFKA-9087 > URL: https://issues.apache.org/jira/browse/KAFKA-9087 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0 >Reporter: Gregory Koshelev >Priority: Major > > I've started multiple replica movements between log directories and some > partitions were stuck. After the restart of the broker I've got exception in > server.log: > {noformat} > [2019-06-11 17:58:46,304] ERROR [ReplicaAlterLogDirsThread-1]: Error due to > (kafka.server.ReplicaAlterLogDirsThread) > org.apache.kafka.common.KafkaException: Error processing data for partition > metrics_timers-35 offset 4224887 > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:342) > at scala.Option.foreach(Option.scala:274) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:300) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:299) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:299) > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:299) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) > at scala.Option.foreach(Option.scala:274) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Caused by: java.lang.IllegalStateException: Offset mismatch for the future > replica metrics_timers-35: fetched offset = 4224887, log end offset = 0. > at > kafka.server.ReplicaAlterLogDirsThread.processPartitionData(ReplicaAlterLogDirsThread.scala:107) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:311) > ... 16 more > [2019-06-11 17:58:46,305] INFO [ReplicaAlterLogDirsThread-1]: Stopped > (kafka.server.ReplicaAlterLogDirsThread) > {noformat} > Also, ReplicaAlterLogDirsThread has been stopped. Further restarts do not fix > the problem. To fix it I've stopped the broker and remove all the stuck > future partitions. 
> Detailed log below > {noformat} > [2019-06-11 12:09:52,833] INFO [Log partition=metrics_timers-35, > dir=/storage2/kafka/data] Truncating to 4224887 has no effect as the largest > offset in the log is 4224886 (kafka.log.Log) > [2019-06-11 12:21:34,979] INFO [Log partition=metrics_timers-35, > dir=/storage2/kafka/data] Loading producer state till offset 4224887 with > message format version 2 (kafka.log.Log) > [2019-06-11 12:21:34,980] INFO [ProducerStateManager > partition=metrics_timers-35] Loading producer state from snapshot file > '/storage2/kafka/data/metrics_timers-35/04224887.snapshot' > (kafka.log.ProducerStateManager) > [2019-06-11 12:21:34,980] INFO [Log partition=metrics_timers-35, > dir=/storage2/kafka/data] Completed load of log with 1 segments, log start > offset 4120720 and log end offset 4224887 in 70 ms (kafka.log.Log) > [2019-06-11 12:21:45,307] INFO Replica loaded for partition metrics_timers-35 > with initial high watermark 0 (kafka.cluster.Replica) > [2019-06-11 12:21:45,307] INFO Replica loaded for partition metrics_timers-35 > with initial high watermark 0 (kafka.cluster.Replica) > [2019-06-11 12:21:45,307] INFO Replica loaded for
[jira] [Created] (KAFKA-9242) Document MirrorMaker2 usage and use cases
Mickael Maison created KAFKA-9242: - Summary: Document MirrorMaker2 usage and use cases Key: KAFKA-9242 URL: https://issues.apache.org/jira/browse/KAFKA-9242 Project: Kafka Issue Type: Improvement Reporter: Mickael Maison The documentation about cluster mirroring/geo-replication is pretty poor. Now with MirrorMaker 2, it could be improved a lot to cover how to handle active/passive, active/active, and hub/spoke environments. Details about offset translation and failover should also be included. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9225) kafka fail to run on linux-aarch64
[ https://issues.apache.org/jira/browse/KAFKA-9225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983665#comment-16983665 ] Bill Bejeck commented on KAFKA-9225: I don't recall the conversation either, but it makes sense to wait until 3.0 to move to RocksDB v6+ > kafka fail to run on linux-aarch64 > -- > > Key: KAFKA-9225 > URL: https://issues.apache.org/jira/browse/KAFKA-9225 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: jiamei xie >Priority: Major > Labels: incompatible > Attachments: compat_report.html > > > *Steps to reproduce:* > 1. Download Kafka latest source code > 2. Build it with gradle > 3. Run > [streamDemo|[https://kafka.apache.org/23/documentation/streams/quickstart]] > when running bin/kafka-run-class.sh > org.apache.kafka.streams.examples.wordcount.WordCountDemo, it crashed with > the following error message > {code:java} > xjm@ubuntu-arm01:~/kafka$bin/kafka-run-class.sh > org.apache.kafka.streams.examples.wordcount.WordCountDemo > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/core/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/tools/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/mirror/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/mirror-client/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/basic-auth-extension/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > [2019-11-19 15:42:23,277] WARN The configuration 'admin.retries' was supplied > but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig) > [2019-11-19 15:42:23,278] WARN The configuration 'admin.retry.backoff.ms' was > supplied but isn't a known config. > (org.apache.kafka.clients.consumer.ConsumerConfig) > [2019-11-19 15:42:24,278] ERROR stream-client > [streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48] All stream threads > have died. The instance will be in error state and should be closed. 
> (org.apache.kafka.streams.KafkaStreams) > Exception in thread > "streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48-StreamThread-1" > java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni1377754636857652484.so: > /tmp/librocksdbjni1377754636857652484.so: > cannot open shared object file: No such file or directory (Possible cause: > can't load AMD 64-bit .so on a AARCH64-bit platform) > {code} > *Analysis:* > This issue is caused by rocksdbjni-5.18.3.jar, which doesn't come with aarch64 > native support. Replacing rocksdbjni-5.18.3.jar with rocksdbjni-6.3.6.jar from > [https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/6.3.6] fixes > this problem. > Attached is the binary compatibility report of rocksdbjni.jar between 5.18.3 > and 6.3.6. The result is 81.8%. So is it possible to upgrade rocksdbjni to > 6.3.6 in upstream? If there are any tests I should run, please > kindly point me to them. Thanks a lot. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9190) Server leaves connections with expired authentication sessions open
[ https://issues.apache.org/jira/browse/KAFKA-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino updated KAFKA-9190: - Affects Version/s: 2.2.0 2.3.0 2.2.1 > Server leaves connections with expired authentication sessions open > > > Key: KAFKA-9190 > URL: https://issues.apache.org/jira/browse/KAFKA-9190 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.2.0, 2.3.0, 2.2.1 >Reporter: Jason Gustafson >Assignee: Ron Dagostino >Priority: Major > > SocketServer implements some logic to disconnect connections which have > expired authentication sessions. At the moment, we just call > `SelectionKey.cancel` in order to trigger this disconnect. I think the > expectation is that this causes the channel to be closed on the next `poll`, > but as far as I can tell, all it does is disassociate the selection key from > the selector. This means that the key never gets selected again and we never > close the connection until the client times out. > This was found when debugging the flaky test failure > `EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl`. > I modified the code to call `Selector.close` instead of > `TransportLayer.disconnect`. I was able to reproduce the session > authentication expiration, but the connection properly closes and the test > does no longer times out. -- This message was sent by Atlassian Jira (v8.3.4#803005)
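The core observation in this issue is plain java.nio behavior and is easy to verify outside Kafka: cancelling a SelectionKey only deregisters it from the Selector, it does not close the socket. A self-contained JDK demo (nothing Kafka-specific):
{code:java}
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Plain-JDK demo: SelectionKey.cancel() leaves the underlying connection open.
public class CancelDoesNotCloseDemo {
    public static void main(final String[] args) throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()
                     .bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
             SocketChannel client = SocketChannel.open(server.getLocalAddress());
             Selector selector = Selector.open()) {

            client.configureBlocking(false);
            final SelectionKey key = client.register(selector, SelectionKey.OP_READ);

            key.cancel();         // roughly what the expired-session path did
            selector.selectNow(); // processes the cancelled-key set

            System.out.println("key valid:    " + key.isValid());   // false -- never selected again
            System.out.println("channel open: " + client.isOpen()); // true  -- connection still alive
        }
    }
}
{code}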
[jira] [Created] (KAFKA-9241) SASL Clients are not forced to re-authenticate if they don't leverage SaslAuthenticateRequest
Ron Dagostino created KAFKA-9241: Summary: SASL Clients are not forced to re-authenticate if they don't leverage SaslAuthenticateRequest Key: KAFKA-9241 URL: https://issues.apache.org/jira/browse/KAFKA-9241 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.2.1, 2.3.0, 2.2.0 Reporter: Ron Dagostino Assignee: Ron Dagostino Brokers are supposed to force SASL clients to re-authenticate (and kill such connections in the absence of a timely and successful re-authentication) when SASL Re-Authentication [(KIP-368)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-368%3A+Allow+SASL+Connections+to+Periodically+Re-Authenticate] is enabled via a positive `connections.max.reauth.ms` configuration value. There is a flaw in the logic that causes connections to not be killed in the absence of a timely and successful re-authentication _if the client does not leverage the SaslAuthenticateRequest API_ (which was defined in [KIP-152|https://cwiki.apache.org/confluence/display/KAFKA/KIP-152+-+Improve+diagnostics+for+SASL+authentication+failures]). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId
[ https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983584#comment-16983584 ] Raman Gupta commented on KAFKA-8803: [~bchen225242] actually only one eos app is affected (but seemingly random ones, not the same one every time). The rest of the EOS and non-EOS apps are fine. > Stream will not start due to TimeoutException: Timeout expired after > 6milliseconds while awaiting InitProducerId > > > Key: KAFKA-8803 > URL: https://issues.apache.org/jira/browse/KAFKA-8803 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Raman Gupta >Assignee: Boyang Chen >Priority: Major > Attachments: logs.txt.gz, screenshot-1.png > > > One streams app is consistently failing at startup with the following > exception: > {code} > 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] > org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout > exception caught when initializing transactions for task 0_36. This might > happen if the broker is slow to respond, if the network connection to the > broker was interrupted, or if similar circumstances arise. You can increase > producer parameter `max.block.ms` to increase this timeout. > org.apache.kafka.common.errors.TimeoutException: Timeout expired after > 6milliseconds while awaiting InitProducerId > {code} > These same brokers are used by many other streams without any issue, > including some in the very same processes for the stream which consistently > throws this exception. > *UPDATE 08/16:* > The very first instance of this error is August 13th 2019, 17:03:36.754 and > it happened for 4 different streams. For 3 of these streams, the error only > happened once, and then the stream recovered. For the 4th stream, the error > has continued to happen, and continues to happen now. > I looked up the broker logs for this time, and see that at August 13th 2019, > 16:47:43, two of four brokers started reporting messages like this, for > multiple partitions: > [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, > fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader > reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread) > The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, > here is a view of the count of these messages over time: > !screenshot-1.png! > However, as noted, the stream task timeout error continues to happen. > I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 > broker. The broker has a patch for KAFKA-8773. -- This message was sent by Atlassian Jira (v8.3.4#803005)
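Separate from the broker-side UNKNOWN_LEADER_EPOCH question, the error text itself suggests raising max.block.ms, which for a Streams app goes through the producer prefix. A hedged sketch only (application id, bootstrap servers, and the 2-minute value are placeholders); this widens the client-side timeout but does not address the underlying broker behavior:
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Sketch only: raise max.block.ms for the producers an EOS Streams app uses.
public class MaxBlockConfigExample {
    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");       // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 120_000);
        return props;
    }
}
{code}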
[jira] [Updated] (KAFKA-9240) Flaky test kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker
[ https://issues.apache.org/jira/browse/KAFKA-9240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Levani Kokhreidze updated KAFKA-9240: - Labels: flaky-test (was: ) > Flaky test > kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker > -- > > Key: KAFKA-9240 > URL: https://issues.apache.org/jira/browse/KAFKA-9240 > Project: Kafka > Issue Type: Bug >Reporter: Levani Kokhreidze >Priority: Major > Labels: flaky-test > > {code:java} > Error Messageorg.scalatest.exceptions.TestFailedException: Partition should > have been moved to the expected log > directoryStacktraceorg.scalatest.exceptions.TestFailedException: Partition > should have been moved to the expected log directory at > org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at > org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) > at > org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) > at org.scalatest.Assertions.fail(Assertions.scala:1091) at > org.scalatest.Assertions.fail$(Assertions.scala:1087) at > org.scalatest.Assertions$.fail(Assertions.scala:1389) at > kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker(ReassignPartitionsClusterTest.scala:176) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at > org.junit.runners.ParentRunner.run(ParentRunner.java:412) at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at > org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118) > at jdk.internal.reflect.GeneratedMethodAccessor2.invoke(Unkn
[jira] [Commented] (KAFKA-9240) Flaky test kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker
[ https://issues.apache.org/jira/browse/KAFKA-9240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983577#comment-16983577 ] Levani Kokhreidze commented on KAFKA-9240: -- [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/9621/testReport/junit/kafka.admin/ReassignPartitionsClusterTest/shouldMoveSinglePartitionWithinBroker/] > Flaky test > kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker > -- > > Key: KAFKA-9240 > URL: https://issues.apache.org/jira/browse/KAFKA-9240 > Project: Kafka > Issue Type: Bug >Reporter: Levani Kokhreidze >Priority: Major > > {code:java} > Error Messageorg.scalatest.exceptions.TestFailedException: Partition should > have been moved to the expected log > directoryStacktraceorg.scalatest.exceptions.TestFailedException: Partition > should have been moved to the expected log directory at > org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at > org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) > at > org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) > at org.scalatest.Assertions.fail(Assertions.scala:1091) at > org.scalatest.Assertions.fail$(Assertions.scala:1087) at > org.scalatest.Assertions$.fail(Assertions.scala:1389) at > kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker(ReassignPartitionsClusterTest.scala:176) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at > org.junit.runners.ParentRunner.run(ParentRunner.java:412) at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at > org.g
[jira] [Created] (KAFKA-9240) Flaky test kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker
Levani Kokhreidze created KAFKA-9240: Summary: Flaky test kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker Key: KAFKA-9240 URL: https://issues.apache.org/jira/browse/KAFKA-9240 Project: Kafka Issue Type: Bug Reporter: Levani Kokhreidze {code:java} Error Messageorg.scalatest.exceptions.TestFailedException: Partition should have been moved to the expected log directoryStacktraceorg.scalatest.exceptions.TestFailedException: Partition should have been moved to the expected log directory at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) at org.scalatest.Assertions.fail(Assertions.scala:1091) at org.scalatest.Assertions.fail$(Assertions.scala:1087) at org.scalatest.Assertions$.fail(Assertions.scala:1389) at kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker(ReassignPartitionsClusterTest.scala:176) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at org.junit.runners.ParentRunner.run(ParentRunner.java:412) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at 
jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118) at jdk.internal.reflect.GeneratedMethodAccessor2.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$D
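The assertion at ReassignPartitionsClusterTest.scala:176 likely races the broker's intra-broker replica mover: alterReplicaLogDirs only schedules the move, so a single check of the log directory can fail on a slow CI machine. As a rough illustration of why verification needs to poll (this is not the test's actual code), the sketch below requests a move via the admin client and waits until describeReplicaLogDirs reports the target directory. The topic name, partition, broker id, target directory and bootstrap address are all hypothetical.

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
import org.apache.kafka.common.TopicPartitionReplica;

public class MoveReplicaWithinBroker {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        try (AdminClient admin = AdminClient.create(props)) {
            // Topic, partition, broker id and directory below are illustrative only.
            TopicPartitionReplica replica = new TopicPartitionReplica("my-topic", 0, 100);
            String targetDir = "/var/kafka-logs-2";

            // Request the intra-broker move; the broker performs it asynchronously.
            admin.alterReplicaLogDirs(Collections.singletonMap(replica, targetDir)).all().get();

            // Poll until the replica reports the target dir, or give up after ~30s.
            long deadline = System.currentTimeMillis() + 30_000;
            while (System.currentTimeMillis() < deadline) {
                Map<TopicPartitionReplica, ReplicaLogDirInfo> infos =
                        admin.describeReplicaLogDirs(Collections.singleton(replica)).all().get();
                ReplicaLogDirInfo info = infos.get(replica);
                if (info != null && targetDir.equals(info.getCurrentReplicaLogDir())
                        && info.getFutureReplicaLogDir() == null) {
                    System.out.println("Replica now lives in " + targetDir);
                    return;
                }
                TimeUnit.MILLISECONDS.sleep(200);
            }
            throw new AssertionError("Replica was not moved to " + targetDir + " in time");
        }
    }
}
{code}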
[jira] [Commented] (KAFKA-9013) Flaky Test MirrorConnectorsIntegrationTest#testReplication
[ https://issues.apache.org/jira/browse/KAFKA-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983574#comment-16983574 ] Levani Kokhreidze commented on KAFKA-9013: -- [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/26856/console] > Flaky Test MirrorConnectorsIntegrationTest#testReplication > -- > > Key: KAFKA-9013 > URL: https://issues.apache.org/jira/browse/KAFKA-9013 > Project: Kafka > Issue Type: Bug >Reporter: Bruno Cadonna >Priority: Major > Labels: flaky-test > > h1. Stacktrace: > {code:java} > java.lang.AssertionError: Condition not met within timeout 2. Offsets not > translated downstream to primary cluster. > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:354) > at > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:239) > {code} > h1. Standard Error > {code} > Standard Error > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > registered in SERVER runtime does not implement any provider interfaces > applicable in the SERVER runtime. Due to constraint configuration problems > the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will > be ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in > SERVER runtime does not implement any provider interfaces applicable in the > SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be > ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered > in SERVER runtime does not implement any provider interfaces applicable in > the SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be > ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.RootResource registered in > SERVER runtime does not implement any provider interfaces applicable in the > SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. > Oct 09, 2019 11:32:01 PM org.glassfish.jersey.internal.Errors logErrors > WARNING: The following warnings have been detected: WARNING: The > (sub)resource method listLoggers in > org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains > empty path annotation. > WARNING: The (sub)resource method listConnectors in > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains > empty path annotation. > WARNING: The (sub)resource method createConnector in > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains > empty path annotation. > WARNING: The (sub)resource method listConnectorPlugins in > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > contains empty path annotation. 
> WARNING: The (sub)resource method serverInfo in > org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty > path annotation. > Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > registered in SERVER runtime does not implement any provider interfaces > applicable in the SERVER runtime. Due to constraint configuration problems > the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will > be ignored. > Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered > in SERVER runtime does not implement any provider interfaces applicable in > the SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be > ignored. > Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.
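The failing condition waits for MirrorMaker 2 checkpoints to arrive on the target cluster so that consumer offsets become translatable there. As a minimal sketch of what "offsets translated downstream" means (not the integration test itself), the snippet below queries translated offsets with RemoteClusterUtils; the bootstrap address, cluster alias "backup" and group id are assumptions for illustration.

{code:java}
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class CheckOffsetTranslation {
    public static void main(String[] args) throws Exception {
        // Connection details, cluster alias and group id are illustrative only.
        Map<String, Object> targetClusterProps = new HashMap<>();
        targetClusterProps.put("bootstrap.servers", "primary-kafka:9092");

        // Asks the target ("primary") cluster for the offsets that MM2 checkpointed
        // from the remote cluster aliased "backup" for the given consumer group.
        Map<TopicPartition, OffsetAndMetadata> translated =
                RemoteClusterUtils.translateOffsets(targetClusterProps, "backup",
                        "my-consumer-group", Duration.ofSeconds(30));

        // An empty result is what the test's waitForCondition keeps polling against:
        // the checkpoints simply have not been emitted or replicated yet.
        translated.forEach((tp, offset) ->
                System.out.println(tp + " -> " + offset.offset()));
    }
}
{code}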
[jira] [Commented] (KAFKA-8773) Static membership protocol borks on re-used group id
[ https://issues.apache.org/jira/browse/KAFKA-8773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983506#comment-16983506 ] Boyang Chen commented on KAFKA-8773: Hey Raman, wondering whether you got a fix for this issue already? > Static membership protocol borks on re-used group id > > > Key: KAFKA-8773 > URL: https://issues.apache.org/jira/browse/KAFKA-8773 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.3.0 >Reporter: Raman Gupta >Priority: Major > > I am using the new static group membership protocol in 2.3.0. I have a > situation in which an application defines multiple consumers, lets call them: > consumer-1 > consumer-2 > Each consumer uses the same group id "x", as they all belong to the same > application "x". With dynamic group membership, this is no problem at all. > However, with static membership starting a single instance of this > application (and therefore both consumers have the same instance.id) fails > consistently with errors like: > {code:java} > 2019-08-08 16:56:47,223 ERROR — org.apa.kaf.cli.con.int.AbstractCoordinator : > [Consumer instanceId=x-1, clientId=consumer-2, groupId=x] Received fatal > exception: group.instance.id gets fenced > 2019-08-08 16:56:47,229 ERROR — org.apa.kaf.cli.con.int.AbstractCoordinator : > [Consumer instanceId=x-1, clientId=consumer-1, groupId=x] Received fatal > exception: group.instance.id gets fenced > 2019-08-08 16:56:47,234 ERROR > ---red.mic.kaf.AbstractKafkaAutoCommitConsumerService: Exception in polling > thread. Will die for safety. [[EXCEPTION: > org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected > this static consumer since another consumer with the same group.instance.id > has registered with a different member.id. > ]] > 2019-08-08 16:56:47,229 ERROR — > red.mic.kaf.AbstractKafkaAutoCommitConsumerService: Exception in polling > thread. Will die for safety. [[EXCEPTION: > org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected > this static consumer since another consumer with the same group.instance.id > has registered with a different member.id. > ]]{code} > and to top it off, I also get this obviously incorrect error: > {code:java} > 2019-08-08 16:56:47,235 ERROR — > red.mic.kaf.AbstractKafkaAutoCommitConsumerService: Exception in polling > thread. Will die for safety. [[EXCEPTION: > org.apache.kafka.common.protocol.types.SchemaException: Error reading field > 'version': java.nio.BufferUnderflowException > at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110) > ~[kafka-clients-2.3.0.jar:?] > at > org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:106) > ~[kafka-clients-2.3.0.jar:?] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:262) > ~[kafka-clients-2.3.0.jar:?] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424) > ~[kafka-clients-2.3.0.jar:?] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) > ~[kafka-clients-2.3.0.jar:?] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353) > ~[kafka-clients-2.3.0.jar:?] > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251) > ~[kafka-clients-2.3.0.jar:?] 
> at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > ~[kafka-clients-2.3.0.jar:?] > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) > ~[kafka-clients-2.3.0.jar:?] > at > com.redock.microservice.kafka.BasicCommitAfterProcessingConsumer.run(BasicCommitAfterProcessingConsumer.kt:51) > ~[classes/:?] > at > com.redock.microservice.kafka.AbstractKafkaAutoCommitConsumerService$start$2.invokeSuspend(AbstractKafkaAutoCommitConsumerService.kt:44) > [classes/:?] > ... suppressed 2 lines > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > [?:?] > at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) [?:?] > at java.util.concurrent.FutureTask.run(FutureTask.java) [?:?] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) > [?:?] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > [?:?] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > [?:?] > at java.lang.Thread.run(Thread.java:834) [?:?] > ]]{code} > > The broker logs contain this error: > {code:java} > ERROR given member.id x-1-1565298855983 is identified as a know
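The fencing is consistent with both consumers in the process being started with the same group.instance.id: with static membership, every member of a consumer group needs its own static id, even when the members live in one application instance. Below is a minimal sketch of one way to derive distinct ids per consumer; the group id, instance id suffixes and bootstrap address are illustrative, not taken from the report.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticMembershipConfig {
    // Builds a consumer whose group.instance.id is unique per consumer *and* per
    // application instance, e.g. "x-1-consumer-1". Names are illustrative.
    static KafkaConsumer<String, String> buildConsumer(String groupId,
                                                       String appInstanceId,
                                                       String consumerName) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Each member of the group needs its own static id; reusing one id across
        // two consumers in the same process triggers FencedInstanceIdException.
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, appInstanceId + "-" + consumerName);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }

    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer1 = buildConsumer("x", "x-1", "consumer-1");
             KafkaConsumer<String, String> consumer2 = buildConsumer("x", "x-1", "consumer-2")) {
            // subscribe and poll as usual ...
        }
    }
}
{code}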
[jira] [Commented] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000 milliseconds while awaiting InitProducerId
[ https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983504#comment-16983504 ] Boyang Chen commented on KAFKA-8803: [~rocketraman] Hey Raman, I'm currently investigating this issue. Just to confirm, all the EOS apps are affected but non-EOS ones are all doing fine during this time? > Stream will not start due to TimeoutException: Timeout expired after > 6milliseconds while awaiting InitProducerId > > > Key: KAFKA-8803 > URL: https://issues.apache.org/jira/browse/KAFKA-8803 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Raman Gupta >Assignee: Boyang Chen >Priority: Major > Attachments: logs.txt.gz, screenshot-1.png > > > One streams app is consistently failing at startup with the following > exception: > {code} > 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] > org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout > exception caught when initializing transactions for task 0_36. This might > happen if the broker is slow to respond, if the network connection to the > broker was interrupted, or if similar circumstances arise. You can increase > producer parameter `max.block.ms` to increase this timeout. > org.apache.kafka.common.errors.TimeoutException: Timeout expired after > 6milliseconds while awaiting InitProducerId > {code} > These same brokers are used by many other streams without any issue, > including some in the very same processes for the stream which consistently > throws this exception. > *UPDATE 08/16:* > The very first instance of this error is August 13th 2019, 17:03:36.754 and > it happened for 4 different streams. For 3 of these streams, the error only > happened once, and then the stream recovered. For the 4th stream, the error > has continued to happen, and continues to happen now. > I looked up the broker logs for this time, and see that at August 13th 2019, > 16:47:43, two of four brokers started reporting messages like this, for > multiple partitions: > [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, > fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader > reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread) > The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, > here is a view of the count of these messages over time: > !screenshot-1.png! > However, as noted, the stream task timeout error continues to happen. > I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 > broker. The broker has a patch for KAFKA-8773. -- This message was sent by Atlassian Jira (v8.3.4#803005)
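As the error text itself suggests, one mitigation while the broker-side cause is investigated is to raise the producer max.block.ms that Kafka Streams uses when initializing transactions. A hedged sketch, assuming a hypothetical application id and bootstrap address; this only widens the timeout, it does not address the UNKNOWN_LEADER_EPOCH churn seen on the brokers.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class EosStreamsTimeoutConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");        // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        // Give the task producers more time to obtain a producer id from the
        // transaction coordinator before InitProducerId times out (default 60s).
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 180_000);

        // new KafkaStreams(topology, props) ...
    }
}
{code}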
[jira] [Updated] (KAFKA-9239) Extreme amounts of logging done by unauthorized Kafka clients
[ https://issues.apache.org/jira/browse/KAFKA-9239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anders Eknert updated KAFKA-9239: - Description: Having experimented some with custom authorization options for Kafka on the broker side, we have a bunch of clients that are no longer authorized. While that's expected and fine, we did not anticipate the level of logging that these unauthorized clients would spew out - putting our whole logging subsystem under heavy stress. The message log is similar to the one below: {code:java} 2019-11-25 10:08:10.262 WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=sdp-ee-miami-0, groupId=sdp-ee-miami] Not authorized to read from topic sdp.ee-miami. {code} In just 4 hours this same message was repeated about a hundred million times ( ! ) in the worst offending client, 74 million times in the next one and 72 million times in the third. We will roll out customized burst filters to suppress this on the client loggers, but it would of course be best if this was fixed in the client. was: Having experimented some with custom authorization options for Kafka on the broker side, we have a bunch of clients that are no longer authorized. While that's expected and fine, we did not anticipate the level of logging that these unauthorized clients would spew out - putting our whole logging subsystem under heavy stress. The message log is similar to the one below: {code:java} 2019-11-25 10:08:10.262 WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=sdp-ee-miami-0, groupId=sdp-ee-miami] Not authorized to read from topic sdp.ee-miami. {code} In just 4 hours this same message was repeated about a hundred million times(!) in the worst offending client, 74 million times in the next one and 72 million times in the third. We will roll out customized burst filters to suppress this on the client loggers, but it would of course be best if this was fixed in the client. > Extreme amounts of logging done by unauthorized Kafka clients > - > > Key: KAFKA-9239 > URL: https://issues.apache.org/jira/browse/KAFKA-9239 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Anders Eknert >Priority: Major > Attachments: Screenshot 2019-11-27 at 11.32.38.png > > > Having experimented some with custom authorization options for Kafka on the > broker side, we have a bunch of clients that are no longer authorized. While > that's expected and fine, we did not anticipate the level of logging that > these unauthorized clients would spew out - putting our whole logging > subsystem under heavy stress. > The message log is similar to the one below: > {code:java} > 2019-11-25 10:08:10.262 WARN 1 --- [ntainer#0-0-C-1] > o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=sdp-ee-miami-0, > groupId=sdp-ee-miami] Not authorized to read from topic sdp.ee-miami. > {code} > In just 4 hours this same message was repeated about a hundred million times > ( ! ) in the worst offending client, 74 million times in the next one and 72 > million times in the third. > We will roll out customized burst filters to suppress this on the client > loggers, but it would of course be best if this was fixed in the client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9239) Extreme amounts of logging done by unauthorized Kafka clients
Anders Eknert created KAFKA-9239: Summary: Extreme amounts of logging done by unauthorized Kafka clients Key: KAFKA-9239 URL: https://issues.apache.org/jira/browse/KAFKA-9239 Project: Kafka Issue Type: Bug Components: clients Reporter: Anders Eknert Attachments: Screenshot 2019-11-27 at 11.32.38.png Having experimented some with custom authorization options for Kafka on the broker side, we have a bunch of clients that are no longer authorized. While that's expected and fine, we did not anticipate the level of logging that these unauthorized clients would spew out - putting our whole logging subsystem under heavy stress. The message log is similar to the one below: {code:java} 2019-11-25 10:08:10.262 WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=sdp-ee-miami-0, groupId=sdp-ee-miami] Not authorized to read from topic sdp.ee-miami. {code} In just 4 hours this same message was repeated about a hundred million times(!) in the worst offending client, 74 million times in the next one and 72 million times in the third. We will roll out customized burst filters to suppress this on the client loggers, but it would of course be best if this was fixed in the client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
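Until the client itself rate-limits this WARN, one client-side workaround is to raise the level of the Fetcher logger (or attach a rate-limiting filter) in the application's logging configuration. Below is a minimal programmatic sketch using the log4j 1.x API that the default binding shipped with the Kafka distribution uses; whether this applies depends on the application's actual logging backend, and note it also suppresses every other WARN from that class.

{code:java}
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;

public class QuietUnauthorizedFetcher {
    public static void main(String[] args) {
        // Raise the consumer Fetcher logger to ERROR so the per-fetch
        // "Not authorized to read from topic ..." WARN is no longer emitted.
        LogManager.getLogger("org.apache.kafka.clients.consumer.internals.Fetcher")
                  .setLevel(Level.ERROR);

        // ... create and poll the consumer as usual ...
    }
}
{code}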
[jira] [Resolved] (KAFKA-9226) Section on deletion of segment files is out of date
[ https://issues.apache.org/jira/browse/KAFKA-9226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-9226. --- Fix Version/s: 2.5.0 Assignee: Sönke Liebau Resolution: Fixed > Section on deletion of segment files is out of date > --- > > Key: KAFKA-9226 > URL: https://issues.apache.org/jira/browse/KAFKA-9226 > Project: Kafka > Issue Type: Improvement > Components: documentation >Affects Versions: 2.3.0 >Reporter: Sönke Liebau >Assignee: Sönke Liebau >Priority: Trivial > Fix For: 2.5.0 > > > The section on segment deletion in the documentation seems to be a bit out of > date. > https://kafka.apache.org/documentation/#impl_deletes > I noticed: > * pluggable deletion policies - can't find those > * deletion of segment by file access time - that's changed to record timestamp > * future mentions of size based cleanup policies - those have been implemented -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9226) Section on deletion of segment files is out of date
[ https://issues.apache.org/jira/browse/KAFKA-9226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983460#comment-16983460 ] ASF GitHub Bot commented on KAFKA-9226: --- mimaison commented on pull request #7738: KAFKA-9226: Updated documentation section on log deletion policies URL: https://github.com/apache/kafka/pull/7738 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Section on deletion of segment files is out of date > --- > > Key: KAFKA-9226 > URL: https://issues.apache.org/jira/browse/KAFKA-9226 > Project: Kafka > Issue Type: Improvement > Components: documentation >Affects Versions: 2.3.0 >Reporter: Sönke Liebau >Priority: Trivial > > The section on segment deletion in the documentation seems to be a bit out of > date. > https://kafka.apache.org/documentation/#impl_deletes > I noticed: > * pluggable deletion policies - can't find those > * deletion of segment by file access time - that's changed to record timestamp > * future mentions of size based cleanup policies - those have been implemented -- This message was sent by Atlassian Jira (v8.3.4#803005)
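For reference, both deletion triggers mentioned above, record-timestamp based retention and size based retention, are exposed as per-topic configs today. A hedged sketch of setting them with the admin client; the topic name, bootstrap address and the concrete values are illustrative only.

{code:java}
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TopicRetentionExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            Collection<AlterConfigOp> ops = Arrays.asList(
                    // Delete segments whose newest record timestamp is older than 7 days ...
                    new AlterConfigOp(new ConfigEntry("retention.ms", "604800000"),
                            AlterConfigOp.OpType.SET),
                    // ... or once the partition grows beyond roughly 1 GiB.
                    new AlterConfigOp(new ConfigEntry("retention.bytes", "1073741824"),
                            AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get();
        }
    }
}
{code}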
[jira] [Resolved] (KAFKA-9219) NullPointerException when polling metrics from Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-9219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-9219. --- Fix Version/s: 2.5.0 Resolution: Fixed > NullPointerException when polling metrics from Kafka Connect > > > Key: KAFKA-9219 > URL: https://issues.apache.org/jira/browse/KAFKA-9219 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0 >Reporter: Mickael Maison >Priority: Major > Fix For: 2.5.0 > > > The following stack trace appears: > > {code:java} > [2019-11-05 23:56:57,909] WARN Error getting JMX attribute 'assigned-tasks' > (org.apache.kafka.common.metrics.JmxReporter:202) > java.lang.NullPointerException > at > org.apache.kafka.connect.runtime.distributed.WorkerCoordinator$WorkerCoordinatorMetrics$2.measure(WorkerCoordinator.java:316) > at > org.apache.kafka.common.metrics.KafkaMetric.metricValue(KafkaMetric.java:66) > at > org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:190) > at > org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:200) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709) > at > com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705) > at > javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1449) > at > javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76) > at > javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309) > at > javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401) > at > javax.management.remote.rmi.RMIConnectionImpl.getAttributes(RMIConnectionImpl.java:675) > at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357) > at sun.rmi.transport.Transport$1.run(Transport.java:200) > at sun.rmi.transport.Transport$1.run(Transport.java:197) > at java.security.AccessController.doPrivileged(Native Method) > at sun.rmi.transport.Transport.serviceCall(Transport.java:196) > at > sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:573) > at > sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:835) > at > sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688) > at java.security.AccessController.doPrivileged(Native Method) > at > sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > [2019-11-05 23:57:02,821] INFO [Worker clientId=connect-1, > groupId=backup-mm2] Herder stopped > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:629) > [2019-11-05 23:57:02,821] INFO [Worker clientId=connect-2, groupId=cv-mm2] > Herder stopping > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:609) > [2019-11-05 23:57:07,822] INFO [Worker clientId=connect-2, groupId=cv-mm2] > Herder stopped > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:629) > [2019-11-05 23:57:07,822] INFO Kafka MirrorMaker stopped. 
> (org.apache.kafka.connect.mirror.MirrorMaker:191) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9219) NullPointerException when polling metrics from Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-9219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983449#comment-16983449 ] ASF GitHub Bot commented on KAFKA-9219: --- mimaison commented on pull request #7652: KAFKA-9219: prevent NullPointerException when polling metrics from Kafka Connect URL: https://github.com/apache/kafka/pull/7652 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > NullPointerException when polling metrics from Kafka Connect > > > Key: KAFKA-9219 > URL: https://issues.apache.org/jira/browse/KAFKA-9219 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0 >Reporter: Mickael Maison >Priority: Major > > The following stack trace appears: > > {code:java} > [2019-11-05 23:56:57,909] WARN Error getting JMX attribute 'assigned-tasks' > (org.apache.kafka.common.metrics.JmxReporter:202) > java.lang.NullPointerException > at > org.apache.kafka.connect.runtime.distributed.WorkerCoordinator$WorkerCoordinatorMetrics$2.measure(WorkerCoordinator.java:316) > at > org.apache.kafka.common.metrics.KafkaMetric.metricValue(KafkaMetric.java:66) > at > org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:190) > at > org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:200) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709) > at > com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705) > at > javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1449) > at > javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76) > at > javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309) > at > javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401) > at > javax.management.remote.rmi.RMIConnectionImpl.getAttributes(RMIConnectionImpl.java:675) > at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357) > at sun.rmi.transport.Transport$1.run(Transport.java:200) > at sun.rmi.transport.Transport$1.run(Transport.java:197) > at java.security.AccessController.doPrivileged(Native Method) > at sun.rmi.transport.Transport.serviceCall(Transport.java:196) > at > sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:573) > at > sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:835) > at > sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688) > at java.security.AccessController.doPrivileged(Native Method) > at > sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > [2019-11-05 23:57:02,821] INFO [Worker clientId=connect-1, > groupId=backup-mm2] Herder stopped > 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:629) > [2019-11-05 23:57:02,821] INFO [Worker clientId=connect-2, groupId=cv-mm2] > Herder stopping > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:609) > [2019-11-05 23:57:07,822] INFO [Worker clientId=connect-2, groupId=cv-mm2] > Herder stopped > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:629) > [2019-11-05 23:57:07,822] INFO Kafka MirrorMaker stopped. > (org.apache.kafka.connect.mirror.MirrorMaker:191) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
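The NPE indicates the metric callback dereferences state that can legitimately be null while the worker is rebalancing or shutting down. The sketch below shows the general defensive pattern for such gauge-style metrics (it is not the actual patch from the linked PR): read the nullable reference once inside measure() and fall back to a sentinel. The Assignment class, metric name and group are hypothetical.

{code:java}
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;

public class NullSafeGaugeExample {
    // Stands in for coordinator state that may be null between rebalances; hypothetical.
    private volatile Assignment assignment;

    static class Assignment {
        int taskCount() { return 3; }
    }

    void register(Metrics metrics) {
        MetricName name = metrics.metricName(
                "assigned-tasks", "example-group", "Number of tasks currently assigned");
        // Guard the nullable reference inside measure() so a JMX poll that races a
        // rebalance reports a sentinel instead of throwing NullPointerException.
        Measurable assignedTasks = (config, now) -> {
            Assignment current = assignment;
            return current == null ? 0.0 : current.taskCount();
        };
        metrics.addMetric(name, assignedTasks);
    }
}
{code}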
[jira] [Commented] (KAFKA-9225) kafka fail to run on linux-aarch64
[ https://issues.apache.org/jira/browse/KAFKA-9225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983311#comment-16983311 ] Bruno Cadonna commented on KAFKA-9225: -- I also do not remember that we agreed on waiting for 3.0. > kafka fail to run on linux-aarch64 > -- > > Key: KAFKA-9225 > URL: https://issues.apache.org/jira/browse/KAFKA-9225 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: jiamei xie >Priority: Major > Labels: incompatible > Attachments: compat_report.html > > > *Steps to reproduce:* > 1. Download Kafka latest source code > 2. Build it with gradle > 3. Run > [streamDemo|[https://kafka.apache.org/23/documentation/streams/quickstart]] > when running bin/kafka-run-class.sh > org.apache.kafka.streams.examples.wordcount.WordCountDemo, it crashed with > the following error message > {code:java} > xjm@ubuntu-arm01:~/kafka$bin/kafka-run-class.sh > org.apache.kafka.streams.examples.wordcount.WordCountDemo > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/core/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/tools/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/mirror/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/mirror-client/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/basic-auth-extension/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > [2019-11-19 15:42:23,277] WARN The configuration 'admin.retries' was supplied > but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig) > [2019-11-19 15:42:23,278] WARN The configuration 'admin.retry.backoff.ms' was > supplied but isn't a known config. > (org.apache.kafka.clients.consumer.ConsumerConfig) > [2019-11-19 15:42:24,278] ERROR stream-client > [streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48] All stream threads > have died. The instance will be in error state and should be closed. 
> (org.apache.kafka.streams.KafkaStreams) > Exception in thread > "streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48-StreamThread-1" > java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni1377754636857652484.so: > /tmp/librocksdbjni1377754636857652484.so: > cannot open shared object file: No such file or directory (Possible cause: > can't load AMD 64-bit .so on a AARCH64-bit platform) > {code} > *Analysis:* > This issue is caused by rocksdbjni-5.18.3.jar, which doesn't come with aarch64 > native support. Replacing rocksdbjni-5.18.3.jar with rocksdbjni-6.3.6.jar from > [https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/6.3.6] can fix > this problem. > Attached is the binary compatibility report of rocksdbjni.jar between 5.18.3 > and 6.3.6. The result is 81.8%. So is it possible to upgrade rocksdbjni to > 6.3.6 in upstream? Should there be any kind of tests to execute, please > kindly point me to them. Thanks a lot. -- This message was sent by Atlassian Jira (v8.3.4#803005)
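Independent of Kafka Streams, a quick way to check whether a given rocksdbjni jar bundles a native library for the current OS and architecture is to load and exercise it directly. A hedged sketch; the temporary directory handling and key/value are incidental.

{code:java}
import java.nio.file.Files;

import org.rocksdb.Options;
import org.rocksdb.RocksDB;

public class RocksDbPlatformCheck {
    public static void main(String[] args) throws Exception {
        // Extracts and loads the bundled native library for this platform; on an
        // unsupported one (e.g. a rocksdbjni build without aarch64 support) this is
        // where the UnsatisfiedLinkError surfaces.
        RocksDB.loadLibrary();

        String dir = Files.createTempDirectory("rocksdb-check").toString();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, dir)) {
            db.put("k".getBytes(), "v".getBytes());
            System.out.println(new String(db.get("k".getBytes())));
        }
    }
}
{code}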