[jira] [Commented] (KAFKA-15491) RackId doesn't exist error while running WordCountDemo
[ https://issues.apache.org/jira/browse/KAFKA-15491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17768739#comment-17768739 ] Hao Li commented on KAFKA-15491: Fixing it in https://github.com/apache/kafka/pull/14415. > RackId doesn't exist error while running WordCountDemo > -- > > Key: KAFKA-15491 > URL: https://issues.apache.org/jira/browse/KAFKA-15491 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Luke Chen >Priority: Major > > While running the WordCountDemo following the > [docs|https://kafka.apache.org/documentation/streams/quickstart], I saw the > following error logs in the stream application output. Though everything > still works fine, it would be better if there were no ERROR logs in the demo app. > {code:java} > [2023-09-24 14:15:11,723] ERROR RackId doesn't exist for process > e2391098-23e8-47eb-8d5e-ff6e697c33f5 and consumer > streams-wordcount-e2391098-23e8-47eb-8d5e-ff6e697c33f5-StreamThread-1-consumer-adae58be-f5f5-429b-a2b4-67bf732726e8 > > (org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor) > [2023-09-24 14:15:11,757] ERROR RackId doesn't exist for process > e2391098-23e8-47eb-8d5e-ff6e697c33f5 and consumer > streams-wordcount-e2391098-23e8-47eb-8d5e-ff6e697c33f5-StreamThread-1-consumer-adae58be-f5f5-429b-a2b4-67bf732726e8 > > (org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15026) Implement min-cost flow balancing tasks for same subtopology
[ https://issues.apache.org/jira/browse/KAFKA-15026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li reassigned KAFKA-15026: -- Assignee: Hao Li > Implement min-cost flow balancing tasks for same subtopology > > > Key: KAFKA-15026 > URL: https://issues.apache.org/jira/browse/KAFKA-15026 > Project: Kafka > Issue Type: Sub-task >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15054) Add configs and logic to decide if rack aware assignment should be enabled
[ https://issues.apache.org/jira/browse/KAFKA-15054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li reassigned KAFKA-15054: -- Assignee: Hao Li > Add configs and logic to decide if rack aware assignment should be enabled > -- > > Key: KAFKA-15054 > URL: https://issues.apache.org/jira/browse/KAFKA-15054 > Project: Kafka > Issue Type: Sub-task >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15024) Add cost function for task/client
[ https://issues.apache.org/jira/browse/KAFKA-15024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li reassigned KAFKA-15024: -- Assignee: Hao Li > Add cost function for task/client > - > > Key: KAFKA-15024 > URL: https://issues.apache.org/jira/browse/KAFKA-15024 > Project: Kafka > Issue Type: Sub-task >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-15025) Implement min-cost flow without balancing tasks for same subtopology
[ https://issues.apache.org/jira/browse/KAFKA-15025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li closed KAFKA-15025. -- > Implement min-cost flow without balancing tasks for same subtopology > > > Key: KAFKA-15025 > URL: https://issues.apache.org/jira/browse/KAFKA-15025 > Project: Kafka > Issue Type: Sub-task >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15025) Implement min-cost flow without balancing tasks for same subtopology
[ https://issues.apache.org/jira/browse/KAFKA-15025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li resolved KAFKA-15025. Fix Version/s: 3.6.0 Resolution: Fixed > Implement min-cost flow without balancing tasks for same subtopology > > > Key: KAFKA-15025 > URL: https://issues.apache.org/jira/browse/KAFKA-15025 > Project: Kafka > Issue Type: Sub-task >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-15054) Add configs and logic to decide if rack aware assignment should be enabled
[ https://issues.apache.org/jira/browse/KAFKA-15054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li closed KAFKA-15054. -- > Add configs and logic to decide if rack aware assignment should be enabled > -- > > Key: KAFKA-15054 > URL: https://issues.apache.org/jira/browse/KAFKA-15054 > Project: Kafka > Issue Type: Sub-task >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15054) Add configs and logic to decide if rack aware assignment should be enabled
[ https://issues.apache.org/jira/browse/KAFKA-15054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li resolved KAFKA-15054. Fix Version/s: 3.6.0 Resolution: Fixed > Add configs and logic to decide if rack aware assignment should be enabled > -- > > Key: KAFKA-15054 > URL: https://issues.apache.org/jira/browse/KAFKA-15054 > Project: Kafka > Issue Type: Sub-task >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15025) Implement min-cost flow without balancing tasks for same subtopology
[ https://issues.apache.org/jira/browse/KAFKA-15025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li reassigned KAFKA-15025: -- Assignee: Hao Li > Implement min-cost flow without balancing tasks for same subtopology > > > Key: KAFKA-15025 > URL: https://issues.apache.org/jira/browse/KAFKA-15025 > Project: Kafka > Issue Type: Sub-task >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-15024) Add cost function for task/client
[ https://issues.apache.org/jira/browse/KAFKA-15024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li closed KAFKA-15024. -- > Add cost function for task/client > - > > Key: KAFKA-15024 > URL: https://issues.apache.org/jira/browse/KAFKA-15024 > Project: Kafka > Issue Type: Sub-task >Reporter: Hao Li >Priority: Major > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15024) Add cost function for task/client
[ https://issues.apache.org/jira/browse/KAFKA-15024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li resolved KAFKA-15024. Fix Version/s: 3.6.0 Resolution: Fixed > Add cost function for task/client > - > > Key: KAFKA-15024 > URL: https://issues.apache.org/jira/browse/KAFKA-15024 > Project: Kafka > Issue Type: Sub-task >Reporter: Hao Li >Priority: Major > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-15027) Implement rack aware assignment for standby tasks
[ https://issues.apache.org/jira/browse/KAFKA-15027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li closed KAFKA-15027. -- > Implement rack aware assignment for standby tasks > - > > Key: KAFKA-15027 > URL: https://issues.apache.org/jira/browse/KAFKA-15027 > Project: Kafka > Issue Type: Sub-task >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15027) Implement rack aware assignment for standby tasks
[ https://issues.apache.org/jira/browse/KAFKA-15027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li resolved KAFKA-15027. Fix Version/s: 3.6.0 Resolution: Fixed > Implement rack aware assignment for standby tasks > - > > Key: KAFKA-15027 > URL: https://issues.apache.org/jira/browse/KAFKA-15027 > Project: Kafka > Issue Type: Sub-task >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15027) Implement rack aware assignment for standby tasks
[ https://issues.apache.org/jira/browse/KAFKA-15027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li reassigned KAFKA-15027: -- Assignee: Hao Li > Implement rack aware assignment for standby tasks > - > > Key: KAFKA-15027 > URL: https://issues.apache.org/jira/browse/KAFKA-15027 > Project: Kafka > Issue Type: Sub-task >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15023) Get rack information for source topic partitions for a task
[ https://issues.apache.org/jira/browse/KAFKA-15023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li resolved KAFKA-15023. Fix Version/s: 3.6.0 Resolution: Fixed > Get rack information for source topic partitions for a task > --- > > Key: KAFKA-15023 > URL: https://issues.apache.org/jira/browse/KAFKA-15023 > Project: Kafka > Issue Type: Sub-task >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-15023) Get rack information for source topic partitions for a task
[ https://issues.apache.org/jira/browse/KAFKA-15023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li closed KAFKA-15023. -- > Get rack information for source topic partitions for a task > --- > > Key: KAFKA-15023 > URL: https://issues.apache.org/jira/browse/KAFKA-15023 > Project: Kafka > Issue Type: Sub-task >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15023) Get rack information for source topic partitions for a task
[ https://issues.apache.org/jira/browse/KAFKA-15023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li reassigned KAFKA-15023: -- Assignee: Hao Li > Get rack information for source topic partitions for a task > --- > > Key: KAFKA-15023 > URL: https://issues.apache.org/jira/browse/KAFKA-15023 > Project: Kafka > Issue Type: Sub-task >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15054) Add configs and logic to decide if rack aware assignment should be enabled
Hao Li created KAFKA-15054: -- Summary: Add configs and logic to decide if rack aware assignment should be enabled Key: KAFKA-15054 URL: https://issues.apache.org/jira/browse/KAFKA-15054 Project: Kafka Issue Type: Sub-task Reporter: Hao Li -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15025) Implement min-cost flow without balancing tasks for same subtopology
Hao Li created KAFKA-15025: -- Summary: Implement min-cost flow without balancing tasks for same subtopology Key: KAFKA-15025 URL: https://issues.apache.org/jira/browse/KAFKA-15025 Project: Kafka Issue Type: Sub-task Reporter: Hao Li -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15027) Implement rack aware assignment for standby tasks
Hao Li created KAFKA-15027: -- Summary: Implement rack aware assignment for standby tasks Key: KAFKA-15027 URL: https://issues.apache.org/jira/browse/KAFKA-15027 Project: Kafka Issue Type: Sub-task Reporter: Hao Li -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15026) Implement min-cost flow balancing tasks for same subtopology
Hao Li created KAFKA-15026: -- Summary: Implement min-cost flow balancing tasks for same subtopology Key: KAFKA-15026 URL: https://issues.apache.org/jira/browse/KAFKA-15026 Project: Kafka Issue Type: Sub-task Reporter: Hao Li -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15024) Add cost function for task/client
Hao Li created KAFKA-15024: -- Summary: Add cost function for task/client Key: KAFKA-15024 URL: https://issues.apache.org/jira/browse/KAFKA-15024 Project: Kafka Issue Type: Sub-task Reporter: Hao Li -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15023) Get rack information for source topic partitions for a task
Hao Li created KAFKA-15023: -- Summary: Get rack information for source topic partitions for a task Key: KAFKA-15023 URL: https://issues.apache.org/jira/browse/KAFKA-15023 Project: Kafka Issue Type: Sub-task Reporter: Hao Li -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15022) Support rack aware task assignment in Kafka streams
[ https://issues.apache.org/jira/browse/KAFKA-15022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li updated KAFKA-15022: --- Labels: kip kip-925 (was: ) > Support rack aware task assignment in Kafka streams > > > Key: KAFKA-15022 > URL: https://issues.apache.org/jira/browse/KAFKA-15022 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > Labels: kip, kip-925 > > For KIP-925: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15022) Support rack aware task assignment in Kafka streams
Hao Li created KAFKA-15022: -- Summary: Support rack aware task assignment in Kafka streams Key: KAFKA-15022 URL: https://issues.apache.org/jira/browse/KAFKA-15022 Project: Kafka Issue Type: Improvement Reporter: Hao Li For KIP-925: https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15022) Support rack aware task assignment in Kafka streams
[ https://issues.apache.org/jira/browse/KAFKA-15022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li updated KAFKA-15022: --- Component/s: streams > Support rack aware task assignment in Kafka streams > > > Key: KAFKA-15022 > URL: https://issues.apache.org/jira/browse/KAFKA-15022 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > > For KIP-925: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15022) Support rack aware task assignment in Kafka streams
[ https://issues.apache.org/jira/browse/KAFKA-15022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li reassigned KAFKA-15022: -- Assignee: Hao Li > Support rack aware task assignment in Kafka streams > > > Key: KAFKA-15022 > URL: https://issues.apache.org/jira/browse/KAFKA-15022 > Project: Kafka > Issue Type: Improvement >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > > For KIP-925: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14981) Set `group.instance.id` in streams consumer so that rebalance will not happen if a instance is restarted
Hao Li created KAFKA-14981: -- Summary: Set `group.instance.id` in streams consumer so that rebalance will not happen if a instance is restarted Key: KAFKA-14981 URL: https://issues.apache.org/jira/browse/KAFKA-14981 Project: Kafka Issue Type: Improvement Components: streams Reporter: Hao Li `group.instance.id` enables static membership, so that if a consumer is restarted within `session.timeout.ms`, no rebalance is triggered and the original assignment can be returned directly from the broker. We can set this id in Kafka Streams using `threadId` so that no rebalance is triggered within `session.timeout.ms`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
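The idea in KAFKA-14981 can be sketched with plain consumer properties. This is a minimal illustration under stated assumptions, not the change the ticket ended up with: `group.instance.id`, `group.id` and `session.timeout.ms` are real consumer config keys, while the class name, the helper `instanceIdFor`, and the `<applicationId>-StreamThread-<n>` naming pattern are hypothetical choices made here only to show one stable id per stream thread.

```java
import java.util.Properties;

final class StaticMembershipSketch {

    // Hypothetical helper: derive one stable id per stream thread so that a
    // restarted thread rejoins the group under the same static member id.
    static String instanceIdFor(String applicationId, int threadIndex) {
        return applicationId + "-StreamThread-" + threadIndex;
    }

    // Build consumer properties with static membership enabled.
    // The keys are standard consumer config names; the values are illustrative.
    static Properties consumerProps(String applicationId, int threadIndex, int sessionTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("group.id", applicationId);
        props.setProperty("group.instance.id", instanceIdFor(applicationId, threadIndex));
        // A restart that completes within this window should not trigger a rebalance.
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }
}
```

The id has to be unique per consumer in the group and stable across restarts, which is why it is derived from the thread index here rather than from a random UUID.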
[jira] [Resolved] (KAFKA-14395) Add config to configure client supplier for KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-14395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li resolved KAFKA-14395. Resolution: Done > Add config to configure client supplier for KafkaStreams > > > Key: KAFKA-14395 > URL: https://issues.apache.org/jira/browse/KAFKA-14395 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > Labels: kip > > For KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-884%3A+Add+config+to+configure+KafkaClientSupplier+in+Kafka+Streams -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14395) Add config to configure client supplier for KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-14395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li reassigned KAFKA-14395: -- Assignee: Hao Li > Add config to configure client supplier for KafkaStreams > > > Key: KAFKA-14395 > URL: https://issues.apache.org/jira/browse/KAFKA-14395 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > Labels: kip > > For KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-884%3A+Add+config+to+configure+KafkaClientSupplier+in+Kafka+Streams -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-13785) Support emit final result for windowed aggregation
[ https://issues.apache.org/jira/browse/KAFKA-13785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li resolved KAFKA-13785. Resolution: Done > Support emit final result for windowed aggregation > -- > > Key: KAFKA-13785 > URL: https://issues.apache.org/jira/browse/KAFKA-13785 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > Labels: kip > > For KIP-825: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-13785) Support emit final result for windowed aggregation
[ https://issues.apache.org/jira/browse/KAFKA-13785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li closed KAFKA-13785. -- > Support emit final result for windowed aggregation > -- > > Key: KAFKA-13785 > URL: https://issues.apache.org/jira/browse/KAFKA-13785 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > Labels: kip > > For KIP-825: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14395) Add config to configure client supplier for KafkaStreams
Hao Li created KAFKA-14395: -- Summary: Add config to configure client supplier for KafkaStreams Key: KAFKA-14395 URL: https://issues.apache.org/jira/browse/KAFKA-14395 Project: Kafka Issue Type: Improvement Components: streams Reporter: Hao Li For KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-884%3A+Add+config+to+configure+KafkaClientSupplier+in+Kafka+Streams -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14117) Flaky Test DynamicBrokerReconfigurationTest.testKeyStoreAlter
Hao Li created KAFKA-14117: -- Summary: Flaky Test DynamicBrokerReconfigurationTest.testKeyStoreAlter Key: KAFKA-14117 URL: https://issues.apache.org/jira/browse/KAFKA-14117 Project: Kafka Issue Type: Test Components: unit tests Reporter: Hao Li This is a flaky test. Log: {code:java} [2022-07-27T11:44:23.102Z] DynamicBrokerReconfigurationTest > testKeyStoreAlter() FAILED [2022-07-27T11:44:23.102Z] org.opentest4j.AssertionFailedError: Duplicates not expected ==> expected: but was: [2022-07-27T11:44:23.102Z] at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) [2022-07-27T11:44:23.102Z] at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:40) [2022-07-27T11:44:23.103Z] at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:235) [2022-07-27T11:44:23.103Z] at kafka.server.DynamicBrokerReconfigurationTest.stopAndVerifyProduceConsume(DynamicBrokerReconfigurationTest.scala:1579) [2022-07-27T11:44:23.103Z] at kafka.server.DynamicBrokerReconfigurationTest.testKeyStoreAlter(DynamicBrokerReconfigurationTest.scala:399){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (KAFKA-12566) Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication
[ https://issues.apache.org/jira/browse/KAFKA-12566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li reopened KAFKA-12566: Somehow this is still happening: ``` [2022-07-27T11:32:04.461Z] MirrorConnectorsIntegrationSSLTest > testReplication() FAILED [2022-07-27T11:32:04.461Z] org.opentest4j.AssertionFailedError: Condition not met within timeout 2. Offsets not translated downstream to primary cluster. ==> expected: but was: [2022-07-27T11:32:04.461Z] at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) [2022-07-27T11:32:04.461Z] at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) [2022-07-27T11:32:04.461Z] at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) [2022-07-27T11:32:04.461Z] at org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:334) [2022-07-27T11:32:04.462Z] at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:382) [2022-07-27T11:32:04.462Z] at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:331) [2022-07-27T11:32:04.462Z] at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:315) [2022-07-27T11:32:04.462Z] at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:305) [2022-07-27T11:32:04.462Z] at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:319) [2022-07-27T11:32:04.462Z] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) [2022-07-27T11:32:04.462Z] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) [2022-07-27T11:32:04.462Z] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [2022-07-27T11:32:04.462Z] at java.lang.reflect.Method.invoke(Method.java:498) [2022-07-27T11:32:04.462Z] at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) [2022-07-27T11:32:04.462Z] at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) [2022-07-27T11:32:04.462Z] at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) [2022-07-27T11:32:04.463Z] at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) [2022-07-27T11:32:04.463Z] at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) [2022-07-27T11:32:04.463Z] at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) [2022-07-27T11:32:04.463Z] at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) [2022-07-27T11:32:04.463Z] at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) [2022-07-27T11:32:04.463Z] at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) [2022-07-27T11:32:04.463Z] at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) [2022-07-27T11:32:04.463Z] at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) [2022-07-27T11:32:04.463Z] at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) [2022-07-27T11:32:04.463Z] at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) [2022-07-27T11:32:04.463Z] at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) [2022-07-27T11:32:04.464Z] at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214) [2022-07-27T11:32:04.464Z] at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) [2022-07-27T11:32:04.464Z] at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210) [2022-07-27T11:32:04.465Z] at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135) [2022-07-27T11:32:04.465Z] at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66) [2022-07-27T11:32:04.465Z] at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) [2022-07-27T11:32:04.465Z] at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) [2022-07-27T11:32:04.465Z] at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) [2022-07-27T11:32:04.465Z] at org.junit.platform.engine.support.hierarchical.Node.aroun
[jira] [Created] (KAFKA-13824) Pass time object from constructor so that we can mock it if needed
Hao Li created KAFKA-13824: -- Summary: Pass time object from constructor so that we can mock it if needed Key: KAFKA-13824 URL: https://issues.apache.org/jira/browse/KAFKA-13824 Project: Kafka Issue Type: Improvement Reporter: Hao Li In https://github.com/apache/kafka/pull/11896, for streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java -- This message was sent by Atlassian Jira (v8.20.1#820001)
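What KAFKA-13824 asks for is ordinary constructor injection of a clock. The sketch below shows the pattern only; every name in it (`TimeInjectionSketch`, `TimeSource`, `WindowEmitter`, `shouldEmit`) is hypothetical and stands in for whatever the real class uses, and it makes no claim about how `KStreamWindowAggregate` is actually structured.

```java
final class TimeInjectionSketch {

    /** Hypothetical minimal clock abstraction, a stand-in for a real time utility. */
    interface TimeSource {
        long milliseconds();
    }

    /** Hypothetical component that reads time only through the injected source. */
    static final class WindowEmitter {
        private final TimeSource time;
        private final long emitIntervalMs;
        private long nextEmitMs;

        // The clock arrives through the constructor, so a test can pass a fake one.
        WindowEmitter(TimeSource time, long emitIntervalMs) {
            this.time = time;
            this.emitIntervalMs = emitIntervalMs;
            this.nextEmitMs = time.milliseconds() + emitIntervalMs;
        }

        // Returns true once the interval has elapsed, then schedules the next emit.
        boolean shouldEmit() {
            long now = time.milliseconds();
            if (now < nextEmitMs) {
                return false;
            }
            nextEmitMs = now + emitIntervalMs;
            return true;
        }
    }
}
```

In production code the caller would pass something like `System::currentTimeMillis`; in a test, a counter such as `AtomicLong::get` lets the test advance time deterministically without sleeping.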
[jira] [Created] (KAFKA-13817) Schedule nextTimeToEmit to system time every time instead of just once
Hao Li created KAFKA-13817: -- Summary: Schedule nextTimeToEmit to system time every time instead of just once Key: KAFKA-13817 URL: https://issues.apache.org/jira/browse/KAFKA-13817 Project: Kafka Issue Type: Improvement Reporter: Hao Li See https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L229-L231. If nextTimeToEmit is scheduled from the system time only once, a sudden large jump in system time can trigger an emit on every record. For example: 1. nextTimeToEmit is set to 1 and the step is 1. 2. If the system time then jumps to 100, we will emit on each of the next 100 records. -- This message was sent by Atlassian Jira (v8.20.1#820001)
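The two-step example in KAFKA-13817 can be reproduced with a small simulation. This is a simplified model under stated assumptions, not the code in `KStreamKStreamJoin`: the class and method names are hypothetical, the clock is held fixed at the value it jumped to, and each loop iteration stands for one processed record.

```java
final class EmitScheduleSketch {

    // Variant described as the problem: nextTimeToEmit is seeded once and then
    // only advanced by a fixed step, so it has to "catch up" after a time jump.
    static int emitsWhenIncrementing(long firstEmitTime, long step, long now, int records) {
        long nextTimeToEmit = firstEmitTime;
        int emits = 0;
        for (int i = 0; i < records; i++) {
            if (now >= nextTimeToEmit) {
                emits++;
                nextTimeToEmit += step;
            }
        }
        return emits;
    }

    // Variant the ticket proposes: reschedule relative to the current system
    // time on every emit, so a jump costs a single emit.
    static int emitsWhenResetting(long firstEmitTime, long step, long now, int records) {
        long nextTimeToEmit = firstEmitTime;
        int emits = 0;
        for (int i = 0; i < records; i++) {
            if (now >= nextTimeToEmit) {
                emits++;
                nextTimeToEmit = now + step;
            }
        }
        return emits;
    }
}
```

With a first emit time of 1, a step of 1, and the clock at 100, the incrementing variant emits on 100 consecutive records before it catches up, while the resetting variant emits once.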
[jira] [Assigned] (KAFKA-13800) Remove force cast of TimeWindowKStreamImpl in tests of https://github.com/apache/kafka/pull/11896
[ https://issues.apache.org/jira/browse/KAFKA-13800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li reassigned KAFKA-13800: -- Assignee: Hao Li > Remove force cast of TimeWindowKStreamImpl in tests of > https://github.com/apache/kafka/pull/11896 > - > > Key: KAFKA-13800 > URL: https://issues.apache.org/jira/browse/KAFKA-13800 > Project: Kafka > Issue Type: Improvement >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > > We can remove the cast after `emitStrategy` is added to public api -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13800) Remove force cast of TimeWindowKStreamImpl in tests of https://github.com/apache/kafka/pull/11896
Hao Li created KAFKA-13800: -- Summary: Remove force cast of TimeWindowKStreamImpl in tests of https://github.com/apache/kafka/pull/11896 Key: KAFKA-13800 URL: https://issues.apache.org/jira/browse/KAFKA-13800 Project: Kafka Issue Type: Improvement Reporter: Hao Li We can remove the cast after `emitStrategy` is added to public api -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13785) Support emit final result for windowed aggregation
Hao Li created KAFKA-13785: -- Summary: Support emit final result for windowed aggregation Key: KAFKA-13785 URL: https://issues.apache.org/jira/browse/KAFKA-13785 Project: Kafka Issue Type: Improvement Reporter: Hao Li For KIP-825: https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (KAFKA-13785) Support emit final result for windowed aggregation
[ https://issues.apache.org/jira/browse/KAFKA-13785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li reassigned KAFKA-13785: -- Assignee: Hao Li > Support emit final result for windowed aggregation > -- > > Key: KAFKA-13785 > URL: https://issues.apache.org/jira/browse/KAFKA-13785 > Project: Kafka > Issue Type: Improvement >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > > For KIP-825: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13542) Utilize the new Consumer#enforceRebalance(reason) API in Streams
[ https://issues.apache.org/jira/browse/KAFKA-13542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511960#comment-17511960 ] Hao Li commented on KAFKA-13542: Most likely I won't have the bandwidth to investigate this before March 30th, since this was originally planned as a trivial fix but turned out to need a more complex investigation of the Kafka API. I agree `enforceRebalance` is not a blocker for the 3.2 release, since we can use, and still are using, the old API without a reason. I can lower the priority from blocker. WDYT? > Utilize the new Consumer#enforceRebalance(reason) API in Streams > > > Key: KAFKA-13542 > URL: https://issues.apache.org/jira/browse/KAFKA-13542 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Hao Li >Priority: Blocker > Fix For: 3.2.0 > > > KIP-800 is adding a new "reason" parameter to the Consumer#enforceRebalance > API, which will be passed in to a new field of the JoinGroup protocol. We > invoke this API throughout Streams for various reasons, which are very useful > for debugging the cause of rebalancing. Passing in the reason to this new API > would make it possible to figure out why a Streams client triggered a > rebalance from the broker logs, which are often the only logs available when > the client logs cannot be retrieved for whatever reason -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13542) Utilize the new Consumer#enforceRebalance(reason) API in Streams
[ https://issues.apache.org/jira/browse/KAFKA-13542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511577#comment-17511577 ] Hao Li commented on KAFKA-13542: [~mjsax] , [~cadonna] , I haven't had time to figure out why this caused a perf regression, and hence to fix it again. When do we plan to release 3.2? > Utilize the new Consumer#enforceRebalance(reason) API in Streams > > > Key: KAFKA-13542 > URL: https://issues.apache.org/jira/browse/KAFKA-13542 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Hao Li >Priority: Blocker > Fix For: 3.2.0 > > > KIP-800 is adding a new "reason" parameter to the Consumer#enforceRebalance > API, which will be passed in to a new field of the JoinGroup protocol. We > invoke this API throughout Streams for various reasons, which are very useful > for debugging the cause of rebalancing. Passing in the reason to this new API > would make it possible to figure out why a Streams client triggered a > rebalance from the broker logs, which are often the only logs available when > the client logs cannot be retrieved for whatever reason -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (KAFKA-13542) Utilize the new Consumer#enforceRebalance(reason) API in Streams
[ https://issues.apache.org/jira/browse/KAFKA-13542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li reassigned KAFKA-13542: -- Assignee: Hao Li > Utilize the new Consumer#enforceRebalance(reason) API in Streams > > > Key: KAFKA-13542 > URL: https://issues.apache.org/jira/browse/KAFKA-13542 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Hao Li >Priority: Blocker > Fix For: 3.2.0 > > > KIP-800 is adding a new "reason" parameter to the Consumer#enforceRebalance > API, which will be passed in to a new field of the JoinGroup protocol. We > invoke this API throughout Streams for various reasons, which are very useful > for debugging the cause of rebalancing. Passing in the reason to this new API > would make it possible to figure out why a Streams client triggered a > rebalance from the broker logs, which are often the only logs available when > the client logs cannot be retrieved for whatever reason -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (KAFKA-13164) State store is attached to wrong node in the Kafka Streams topology
[ https://issues.apache.org/jira/browse/KAFKA-13164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li closed KAFKA-13164. -- > State store is attached to wrong node in the Kafka Streams topology > --- > > Key: KAFKA-13164 > URL: https://issues.apache.org/jira/browse/KAFKA-13164 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 > Environment: local development (MacOS Big Sur 11.4) >Reporter: Ralph Matthias Debusmann >Assignee: Hao Li >Priority: Major > Fix For: 3.0.1 > > Attachments: 1.jpg, 3.jpg > > > Hi, > mjsax and me noticed a bug where a state store is attached to the wrong node > in the Kafka Streams topology. > The issue arised when I tried to read a topic into a KTable, then continued > with a mapValues(), and then joined this KTable with a KStream, like so: > > var kTable = this.streamsBuilder.table().mapValues( function>); > > and then later: > > var joinedKStream = kstream.leftJoin(kTable, ); > > The join didn't work, and neither did it work when I added Materialized.as() > to mapValues(), like so: > var kTable = this.streamsBuilder.table().mapValues( function>, *Materialized.as()*); > > Interestingly, I could get the join to work, when I first read the topic > into a *KStream*, then continued with the mapValues(), then turned the > KStream into a KTable, and then joined the KTable with the other KStream, > like so: > > var kTable = this.streamsBuilder.stream().mapValues( function>).toTable(); > > (the join worked the same as above) > > When mjsax and me had a look on the topology, we could see that in the > former, not working code, the state store (required for the join) is attached > to the pre-final "KTABLE-SOURCE", and not the final "KTABLE-MAPVALUES" node > (see attachment "1.jpg"). In the working code, the state store is (correctly) > attached to the final "KSTREAM-TOTABLE" node (see attachment "3.jpg"). 
> > Best regards, > xdgrulez > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13164) State store is attached to wrong node in the Kafka Streams topology
[ https://issues.apache.org/jira/browse/KAFKA-13164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li resolved KAFKA-13164. Resolution: Cannot Reproduce Closing this as I can't reproduce the issue > State store is attached to wrong node in the Kafka Streams topology > --- > > Key: KAFKA-13164 > URL: https://issues.apache.org/jira/browse/KAFKA-13164 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 > Environment: local development (MacOS Big Sur 11.4) >Reporter: Ralph Matthias Debusmann >Assignee: Hao Li >Priority: Major > Fix For: 3.0.1 > > Attachments: 1.jpg, 3.jpg > > > Hi, > mjsax and me noticed a bug where a state store is attached to the wrong node > in the Kafka Streams topology. > The issue arised when I tried to read a topic into a KTable, then continued > with a mapValues(), and then joined this KTable with a KStream, like so: > > var kTable = this.streamsBuilder.table().mapValues( function>); > > and then later: > > var joinedKStream = kstream.leftJoin(kTable, ); > > The join didn't work, and neither did it work when I added Materialized.as() > to mapValues(), like so: > var kTable = this.streamsBuilder.table().mapValues( function>, *Materialized.as()*); > > Interestingly, I could get the join to work, when I first read the topic > into a *KStream*, then continued with the mapValues(), then turned the > KStream into a KTable, and then joined the KTable with the other KStream, > like so: > > var kTable = this.streamsBuilder.stream().mapValues( function>).toTable(); > > (the join worked the same as above) > > When mjsax and me had a look on the topology, we could see that in the > former, not working code, the state store (required for the join) is attached > to the pre-final "KTABLE-SOURCE", and not the final "KTABLE-MAPVALUES" node > (see attachment "1.jpg"). In the working code, the state store is (correctly) > attached to the final "KSTREAM-TOTABLE" node (see attachment "3.jpg"). 
> > Best regards, > xdgrulez > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13164) State store is attached to wrong node in the Kafka Streams topology
[ https://issues.apache.org/jira/browse/KAFKA-13164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17425673#comment-17425673 ] Hao Li commented on KAFKA-13164: Hi Ralph, Could you double-check whether the mapValues join works with my example PR? Are there any gaps between our examples? Thanks, Hao > State store is attached to wrong node in the Kafka Streams topology > --- > > Key: KAFKA-13164 > URL: https://issues.apache.org/jira/browse/KAFKA-13164 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 > Environment: local development (MacOS Big Sur 11.4) >Reporter: Ralph Matthias Debusmann >Assignee: Hao Li >Priority: Major > Fix For: 3.0.1 > > Attachments: 1.jpg, 3.jpg > > > Hi, > mjsax and me noticed a bug where a state store is attached to the wrong node > in the Kafka Streams topology. > The issue arised when I tried to read a topic into a KTable, then continued > with a mapValues(), and then joined this KTable with a KStream, like so: > > var kTable = this.streamsBuilder.table().mapValues( function>); > > and then later: > > var joinedKStream = kstream.leftJoin(kTable, ); > > The join didn't work, and neither did it work when I added Materialized.as() > to mapValues(), like so: > var kTable = this.streamsBuilder.table().mapValues( function>, *Materialized.as()*); > > Interestingly, I could get the join to work, when I first read the topic > into a *KStream*, then continued with the mapValues(), then turned the > KStream into a KTable, and then joined the KTable with the other KStream, > like so: > > var kTable = this.streamsBuilder.stream().mapValues( function>).toTable(); > > (the join worked the same as above) > > When mjsax and me had a look on the topology, we could see that in the > former, not working code, the state store (required for the join) is attached > to the pre-final "KTABLE-SOURCE", and not the final "KTABLE-MAPVALUES" node > (see attachment "1.jpg"). 
In the working code, the state store is (correctly) > attached to the final "KSTREAM-TOTABLE" node (see attachment "3.jpg"). > > Best regards, > xdgrulez > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13164) State store is attached to wrong node in the Kafka Streams topology
[ https://issues.apache.org/jira/browse/KAFKA-13164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17424590#comment-17424590 ] Hao Li commented on KAFKA-13164: Hi Ralph, I can actually see output for both cases now (I was tailing the wrong output topic before...). [https://github.com/lihaosky/kafka-streams-examples/pull/1] is the test file. It covers both cases: mapValues on a table followed by a join, and mapValues on a stream followed by toTable and then a join. Please take a look. Thanks, Hao > State store is attached to wrong node in the Kafka Streams topology > --- > > Key: KAFKA-13164 > URL: https://issues.apache.org/jira/browse/KAFKA-13164 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 > Environment: local development (MacOS Big Sur 11.4) >Reporter: Ralph Matthias Debusmann >Assignee: Hao Li >Priority: Major > Fix For: 3.0.1 > > Attachments: 1.jpg, 3.jpg > > > Hi, > mjsax and me noticed a bug where a state store is attached to the wrong node > in the Kafka Streams topology. 
> The issue arised when I tried to read a topic into a KTable, then continued > with a mapValues(), and then joined this KTable with a KStream, like so: > > var kTable = this.streamsBuilder.table().mapValues( function>); > > and then later: > > var joinedKStream = kstream.leftJoin(kTable, ); > > The join didn't work, and neither did it work when I added Materialized.as() > to mapValues(), like so: > var kTable = this.streamsBuilder.table().mapValues( function>, *Materialized.as()*); > > Interestingly, I could get the join to work, when I first read the topic > into a *KStream*, then continued with the mapValues(), then turned the > KStream into a KTable, and then joined the KTable with the other KStream, > like so: > > var kTable = this.streamsBuilder.stream().mapValues( function>).toTable(); > > (the join worked the same as above) > > When mjsax and me had a look on the topology, we could see that in the > former, not working code, the state store (required for the join) is attached > to the pre-final "KTABLE-SOURCE", and not the final "KTABLE-MAPVALUES" node > (see attachment "1.jpg"). In the working code, the state store is (correctly) > attached to the final "KSTREAM-TOTABLE" node (see attachment "3.jpg"). > > Best regards, > xdgrulez > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13164) State store is attached to wrong node in the Kafka Streams topology
[ https://issues.apache.org/jira/browse/KAFKA-13164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17424296#comment-17424296 ] Hao Li commented on KAFKA-13164: Hi Ralph, I was able to test it out with the following code snippet:
{code:java}
// Type parameters are assumed to be <String, String>; they are missing from the archived message.
static void createWordCountStream(final StreamsBuilder builder) {
    final KStream<String, String> textLines = builder.stream(joinInputTopic);
    final KTable<String, String> table = builder.<String, String>table(inputTopic)
        .mapValues(v -> {
            System.out.println("Table got " + v);
            return v + " suffix";
        });
    final KStream<String, String> output = textLines.leftJoin(table, (sv, tv) -> {
        System.out.println("Join function called for " + sv + ", " + tv);
        return sv + tv;
    });
    output.to(outputTopic);
}
{code}
I was able to see that the join function got called with the correct sv and tv. I also saw the table's mapValues function called before the join function. So, as [~guozhang] mentioned, the state store is attached to the source table, the mapValues function is called on the fly, and then the join happens. So that's expected. However, I wasn't able to see output in the output topic. That requires more digging, but attaching the state store to the source table seems to be expected. > State store is attached to wrong node in the Kafka Streams topology > --- > > Key: KAFKA-13164 > URL: https://issues.apache.org/jira/browse/KAFKA-13164 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 > Environment: local development (MacOS Big Sur 11.4) >Reporter: Ralph Matthias Debusmann >Assignee: Hao Li >Priority: Major > Fix For: 3.0.1 > > Attachments: 1.jpg, 3.jpg > > > Hi, > mjsax and me noticed a bug where a state store is attached to the wrong node > in the Kafka Streams topology. 
> The issue arised when I tried to read a topic into a KTable, then continued > with a mapValues(), and then joined this KTable with a KStream, like so: > > var kTable = this.streamsBuilder.table().mapValues( function>); > > and then later: > > var joinedKStream = kstream.leftJoin(kTable, ); > > The join didn't work, and neither did it work when I added Materialized.as() > to mapValues(), like so: > var kTable = this.streamsBuilder.table().mapValues( function>, *Materialized.as()*); > > Interestingly, I could get the join to work, when I first read the topic > into a *KStream*, then continued with the mapValues(), then turned the > KStream into a KTable, and then joined the KTable with the other KStream, > like so: > > var kTable = this.streamsBuilder.stream().mapValues( function>).toTable(); > > (the join worked the same as above) > > When mjsax and me had a look on the topology, we could see that in the > former, not working code, the state store (required for the join) is attached > to the pre-final "KTABLE-SOURCE", and not the final "KTABLE-MAPVALUES" node > (see attachment "1.jpg"). In the working code, the state store is (correctly) > attached to the final "KSTREAM-TOTABLE" node (see attachment "3.jpg"). > > Best regards, > xdgrulez > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
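The behaviour described in the comment above (state store on the source table, mapValues applied on the fly, then the join) can be pictured with a small model that uses no Kafka classes. This is only a sketch under assumptions: `LazyMapValuesJoinSketch`, `sourceStore`, `onTableRecord`, and `leftJoin` are hypothetical names, and a `HashMap` stands in for the state store attached to the source table node. It is not how Kafka Streams is implemented internally; it only illustrates that storing raw table values and mapping them at lookup time still gives the joiner the mapped value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical, Kafka-free model of a stream-table left join in which the
// result of the table's mapValues() is not materialized.
public class LazyMapValuesJoinSketch {

    // Stands in for the state store attached to the source table node:
    // it holds the raw (unmapped) table values.
    private final Map<String, String> sourceStore = new HashMap<>();

    // Stands in for the function passed to mapValues().
    private final Function<String, String> mapper;

    public LazyMapValuesJoinSketch(final Function<String, String> mapper) {
        this.mapper = mapper;
    }

    // A table record arrives: only the raw value is stored.
    public void onTableRecord(final String key, final String value) {
        sourceStore.put(key, value);
    }

    // A stream record arrives: look up the raw value, map it on the fly,
    // then call the joiner. A missing key yields null, as in a left join.
    public String leftJoin(final String key,
                           final String streamValue,
                           final BiFunction<String, String, String> joiner) {
        final String raw = sourceStore.get(key);
        final String tableValue = (raw == null) ? null : mapper.apply(raw);
        return joiner.apply(streamValue, tableValue);
    }

    public static void main(final String[] args) {
        final LazyMapValuesJoinSketch sketch =
            new LazyMapValuesJoinSketch(v -> v + " suffix");
        sketch.onTableRecord("k1", "table");
        // prints "stream-table suffix"
        System.out.println(sketch.leftJoin("k1", "stream-", (sv, tv) -> sv + tv));
        // prints "stream-null" (no table record for k2)
        System.out.println(sketch.leftJoin("k2", "stream-", (sv, tv) -> sv + tv));
    }
}
```

In this model the joiner sees the mapped value even though nothing is stored for the mapValues step, which matches the observation that the state store sitting on "KTABLE-SOURCE" does not by itself break the join.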
[jira] [Commented] (KAFKA-13164) State store is attached to wrong node in the Kafka Streams topology
[ https://issues.apache.org/jira/browse/KAFKA-13164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17423432#comment-17423432 ] Hao Li commented on KAFKA-13164: Hi [~xdgrulez], when you say "The join didn't work", what exactly do you mean? Is it producing no results, crashing, producing wrong results, or failing with some other error? > State store is attached to wrong node in the Kafka Streams topology > --- > > Key: KAFKA-13164 > URL: https://issues.apache.org/jira/browse/KAFKA-13164 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 > Environment: local development (MacOS Big Sur 11.4) >Reporter: Ralph Matthias Debusmann >Assignee: Hao Li >Priority: Major > Fix For: 3.0.1 > > Attachments: 1.jpg, 3.jpg > > > Hi, > mjsax and me noticed a bug where a state store is attached to the wrong node > in the Kafka Streams topology. > The issue arised when I tried to read a topic into a KTable, then continued > with a mapValues(), and then joined this KTable with a KStream, like so: > > var kTable = this.streamsBuilder.table().mapValues( function>); > > and then later: > > var joinedKStream = kstream.leftJoin(kTable, ); > > The join didn't work, and neither did it work when I added Materialized.as() > to mapValues(), like so: > var kTable = this.streamsBuilder.table().mapValues( function>, *Materialized.as()*); > > Interestingly, I could get the join to work, when I first read the topic > into a *KStream*, then continued with the mapValues(), then turned the > KStream into a KTable, and then joined the KTable with the other KStream, > like so: > > var kTable = this.streamsBuilder.stream().mapValues( function>).toTable(); > > (the join worked the same as above) > > When mjsax and me had a look on the topology, we could see that in the > former, not working code, the state store (required for the join) is attached > to the pre-final "KTABLE-SOURCE", and not the final "KTABLE-MAPVALUES" node > (see attachment "1.jpg"). 
In the working code, the state store is (correctly) > attached to the final "KSTREAM-TOTABLE" node (see attachment "3.jpg"). > > Best regards, > xdgrulez > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-13164) State store is attached to wrong node in the Kafka Streams topology
[ https://issues.apache.org/jira/browse/KAFKA-13164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li reassigned KAFKA-13164: -- Assignee: Hao Li > State store is attached to wrong node in the Kafka Streams topology > --- > > Key: KAFKA-13164 > URL: https://issues.apache.org/jira/browse/KAFKA-13164 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 > Environment: local development (MacOS Big Sur 11.4) >Reporter: Ralph Matthias Debusmann >Assignee: Hao Li >Priority: Major > Fix For: 2.8.1, 3.0.1 > > Attachments: 1.jpg, 3.jpg > > > Hi, > mjsax and me noticed a bug where a state store is attached to the wrong node > in the Kafka Streams topology. > The issue arised when I tried to read a topic into a KTable, then continued > with a mapValues(), and then joined this KTable with a KStream, like so: > > var kTable = this.streamsBuilder.table().mapValues( function>); > > and then later: > > var joinedKStream = kstream.leftJoin(kTable, ); > > The join didn't work, and neither did it work when I added Materialized.as() > to mapValues(), like so: > var kTable = this.streamsBuilder.table().mapValues( function>, *Materialized.as()*); > > Interestingly, I could get the join to work, when I first read the topic > into a *KStream*, then continued with the mapValues(), then turned the > KStream into a KTable, and then joined the KTable with the other KStream, > like so: > > var kTable = this.streamsBuilder.stream().mapValues( function>).toTable(); > > (the join worked the same as above) > > When mjsax and me had a look on the topology, we could see that in the > former, not working code, the state store (required for the join) is attached > to the pre-final "KTABLE-SOURCE", and not the final "KTABLE-MAPVALUES" node > (see attachment "1.jpg"). In the working code, the state store is (correctly) > attached to the final "KSTREAM-TOTABLE" node (see attachment "3.jpg"). 
> > Best regards, > xdgrulez > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)