[jira] [Commented] (KAFKA-12549) Allow state stores to opt-in transactional support
[ https://issues.apache.org/jira/browse/KAFKA-12549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812756#comment-17812756 ]

highluck commented on KAFKA-12549:
----------------------------------

[~guozhang] May I take over this work?

> Allow state stores to opt-in transactional support
> --------------------------------------------------
>
>                 Key: KAFKA-12549
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12549
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Guozhang Wang
>            Priority: Major
>
> Right now Kafka Streams' EOS implementation does not make any assumptions
> about the state store's transactional support. Allowing state stores to
> optionally provide transactional support can have multiple benefits. E.g.,
> we could add APIs such as {{beginTxn}}, {{commitTxn}} and {{abortTxn}} to
> the {{StateStore}} interface. The Streams library could determine whether
> these are supported via an additional {{boolean transactional()}} API; if
> so, these APIs could be used under both ALOS and EOS as follows (otherwise
> Streams just falls back to the normal processing logic).
> Within the thread's processing loop:
> 1. store.beginTxn
> 2. store.put // during processing
> 3. streams commit // either through the EOS protocol or not
> 4. store.commitTxn
> 5. start the next txn via store.beginTxn
> If the state stores allow Streams to operate like this, we gain the
> following benefits:
> * Reduced duplicate records upon crashes for ALOS (note this is still not
> EOS, but a middle ground where uncommitted data within a state store is not
> retained if store.commitTxn fails).
> * No need to wipe the state store and re-bootstrap from scratch upon crashes
> for EOS. E.g., if a crash happens between the completion of the Streams
> commit and store.commitTxn, we can instead roll the transaction forward by
> replaying the changelog from the second-most-recent committed offset up to
> the most recent committed offset.
> * Remote stores that support transactions would not need to support wiping
> (https://issues.apache.org/jira/browse/KAFKA-12475).
> * We could fix the known issues of emit-on-change
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams).
> * We could support "query committed data only" for interactive queries (see
> below for the reasons).
> As for the implementation of these APIs, there are several options:
> * The state store itself has native transaction support (e.g. RocksDB).
> * Use an in-memory buffer for all puts within a transaction; upon
> {{commitTxn}} write the whole buffer as a batch to the underlying state
> store, or simply drop the buffer upon aborting. For interactive queries, one
> can then optionally query only the underlying store for committed data.
> * Use a separate store as a transient persistent buffer. Upon {{beginTxn}}
> create a new empty transient store, and upon {{commitTxn}} merge that store
> into the underlying store. The same applies for interactively querying
> committed-only data. Compared with the option above, this avoids memory
> pressure even with long transactions, but incurs more complexity and
> performance overhead due to the separate persistent store.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
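The in-memory-buffer option from the ticket above can be sketched as follows. This is a toy model, not Kafka code: only the method names beginTxn/put/commitTxn/abortTxn come from the ticket, while the class itself and its string-keyed map store are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the second implementation option: buffer all puts in memory
// during a transaction, flush them as one batch on commitTxn, drop them on
// abortTxn. Reads only see the underlying (committed) store, which is the
// "query committed data only" behavior the ticket mentions.
class BufferedTransactionalStore {
    private final Map<String, String> committed = new HashMap<>(); // underlying store
    private final Map<String, String> txnBuffer = new HashMap<>(); // uncommitted puts
    private boolean inTxn = false;

    void beginTxn() {
        if (inTxn) throw new IllegalStateException("transaction already open");
        inTxn = true;
    }

    void put(String key, String value) {
        if (!inTxn) throw new IllegalStateException("no open transaction");
        txnBuffer.put(key, value);
    }

    void commitTxn() {
        committed.putAll(txnBuffer); // write the whole buffer as one batch
        txnBuffer.clear();
        inTxn = false;
    }

    void abortTxn() {
        txnBuffer.clear(); // uncommitted data is simply dropped
        inTxn = false;
    }

    // Interactive queries see committed data only.
    String get(String key) {
        return committed.get(key);
    }
}
```

As the ticket notes, this variant is simple but holds the whole transaction in memory, which is why the separate-transient-store option is offered as an alternative for long transactions.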
[jira] [Assigned] (KAFKA-15744) KRaft support in CustomQuotaCallbackTest
[ https://issues.apache.org/jira/browse/KAFKA-15744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck reassigned KAFKA-15744:
--------------------------------

    Assignee: highluck

> KRaft support in CustomQuotaCallbackTest
> ----------------------------------------
>
>                 Key: KAFKA-15744
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15744
>             Project: Kafka
>          Issue Type: Task
>          Components: core
>            Reporter: Sameer Tejani
>            Assignee: highluck
>            Priority: Minor
>              Labels: kraft, kraft-test, newbie
>
> The following tests in CustomQuotaCallbackTest in
> core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala need
> to be updated to support KRaft:
> 90 : def testCustomQuotaCallback(): Unit = {
> Scanned 468 lines. Found 0 KRaft tests out of 1 tests.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Assigned] (KAFKA-15716) KRaft support in EpochDrivenReplicationProtocolAcceptanceTest
[ https://issues.apache.org/jira/browse/KAFKA-15716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck reassigned KAFKA-15716:
--------------------------------

    Assignee: highluck

> KRaft support in EpochDrivenReplicationProtocolAcceptanceTest
> -------------------------------------------------------------
>
>                 Key: KAFKA-15716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15716
>             Project: Kafka
>          Issue Type: Task
>          Components: core
>            Reporter: Sameer Tejani
>            Assignee: highluck
>            Priority: Minor
>              Labels: kraft, kraft-test, newbie
>
> The following tests in EpochDrivenReplicationProtocolAcceptanceTest in
> core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
> need to be updated to support KRaft:
> 74 : def shouldFollowLeaderEpochBasicWorkflow(): Unit = {
> 135 : def shouldNotAllowDivergentLogs(): Unit = {
> 183 : def offsetsShouldNotGoBackwards(): Unit = {
> 257 : def shouldSurviveFastLeaderChange(): Unit = {
> 298 : def logsShouldNotDivergeOnUncleanLeaderElections(): Unit = {
> Scanned 473 lines. Found 0 KRaft tests out of 5 tests.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Assigned] (KAFKA-15715) KRaft support in UpdateFeaturesTest
[ https://issues.apache.org/jira/browse/KAFKA-15715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck reassigned KAFKA-15715:
--------------------------------

    Assignee: (was: highluck)

> KRaft support in UpdateFeaturesTest
> -----------------------------------
>
>                 Key: KAFKA-15715
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15715
>             Project: Kafka
>          Issue Type: Task
>          Components: core
>            Reporter: Sameer Tejani
>            Priority: Minor
>              Labels: kraft, kraft-test, newbie
>
> The following tests in UpdateFeaturesTest in
> core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala need to be
> updated to support KRaft:
> 176 : def testShouldFailRequestIfNotController(): Unit = {
> 210 : def testShouldFailRequestWhenDowngradeFlagIsNotSetDuringDowngrade(): Unit = {
> 223 : def testShouldFailRequestWhenDowngradeToHigherVersionLevelIsAttempted(): Unit = {
> 236 : def testShouldFailRequestInServerWhenDowngradeFlagIsNotSetDuringDeletion(): Unit = {
> 280 : def testShouldFailRequestDuringDeletionOfNonExistingFeature(): Unit = {
> 292 : def testShouldFailRequestWhenUpgradingToSameVersionLevel(): Unit = {
> 354 : def testShouldFailRequestDuringBrokerMaxVersionLevelIncompatibilityForExistingFinalizedFeature(): Unit = {
> 368 : def testShouldFailRequestDuringBrokerMaxVersionLevelIncompatibilityWithNoExistingFinalizedFeature(): Unit = {
> 381 : def testSuccessfulFeatureUpgradeAndWithNoExistingFinalizedFeatures(): Unit = {
> 417 : def testSuccessfulFeatureUpgradeAndDowngrade(): Unit = {
> 459 : def testPartialSuccessDuringValidFeatureUpgradeAndInvalidDowngrade(): Unit = {
> 509 : def testPartialSuccessDuringInvalidFeatureUpgradeAndValidDowngrade(): Unit = {
> Scanned 567 lines. Found 0 KRaft tests out of 12 tests.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Assigned] (KAFKA-15715) KRaft support in UpdateFeaturesTest
[ https://issues.apache.org/jira/browse/KAFKA-15715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck reassigned KAFKA-15715:
--------------------------------

    Assignee: highluck

> KRaft support in UpdateFeaturesTest
> -----------------------------------
>
>                 Key: KAFKA-15715
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15715
>             Project: Kafka
>          Issue Type: Task
>          Components: core
>            Reporter: Sameer Tejani
>            Assignee: highluck
>            Priority: Minor
>              Labels: kraft, kraft-test, newbie
>
> The following tests in UpdateFeaturesTest in
> core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala need to be
> updated to support KRaft:
> 176 : def testShouldFailRequestIfNotController(): Unit = {
> 210 : def testShouldFailRequestWhenDowngradeFlagIsNotSetDuringDowngrade(): Unit = {
> 223 : def testShouldFailRequestWhenDowngradeToHigherVersionLevelIsAttempted(): Unit = {
> 236 : def testShouldFailRequestInServerWhenDowngradeFlagIsNotSetDuringDeletion(): Unit = {
> 280 : def testShouldFailRequestDuringDeletionOfNonExistingFeature(): Unit = {
> 292 : def testShouldFailRequestWhenUpgradingToSameVersionLevel(): Unit = {
> 354 : def testShouldFailRequestDuringBrokerMaxVersionLevelIncompatibilityForExistingFinalizedFeature(): Unit = {
> 368 : def testShouldFailRequestDuringBrokerMaxVersionLevelIncompatibilityWithNoExistingFinalizedFeature(): Unit = {
> 381 : def testSuccessfulFeatureUpgradeAndWithNoExistingFinalizedFeatures(): Unit = {
> 417 : def testSuccessfulFeatureUpgradeAndDowngrade(): Unit = {
> 459 : def testPartialSuccessDuringValidFeatureUpgradeAndInvalidDowngrade(): Unit = {
> 509 : def testPartialSuccessDuringInvalidFeatureUpgradeAndValidDowngrade(): Unit = {
> Scanned 567 lines. Found 0 KRaft tests out of 12 tests.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (KAFKA-16066) Upgrade apacheds to 2.0.0.AM27
[ https://issues.apache.org/jira/browse/KAFKA-16066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811467#comment-17811467 ]

highluck commented on KAFKA-16066:
----------------------------------

Thanks [~divijvaidya] :)

> Upgrade apacheds to 2.0.0.AM27
> ------------------------------
>
>                 Key: KAFKA-16066
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16066
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Divij Vaidya
>            Assignee: highluck
>            Priority: Major
>              Labels: newbie, newbie++
>
> We are currently using a very old dependency. Notably, apacheds is only used
> for testing when we use MiniKdc; hence, there is nothing stopping us from
> upgrading it.
> Note that apacheds has removed the component
> org.apache.directory.server:apacheds-protocol-kerberos in favour of using
> Apache Kerby; hence, we need to make changes in MiniKdc.scala for this
> upgrade to work correctly.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Assigned] (KAFKA-16066) Upgrade apacheds to 2.0.0.AM27
[ https://issues.apache.org/jira/browse/KAFKA-16066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck reassigned KAFKA-16066:
--------------------------------

    Assignee: highluck

> Upgrade apacheds to 2.0.0.AM27
> ------------------------------
>
>                 Key: KAFKA-16066
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16066
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Divij Vaidya
>            Assignee: highluck
>            Priority: Major
>              Labels: newbie, newbie++
>
> We are currently using a very old dependency. Notably, apacheds is only used
> for testing when we use MiniKdc; hence, there is nothing stopping us from
> upgrading it.
> Note that apacheds has removed the component
> org.apache.directory.server:apacheds-protocol-kerberos in favour of using
> Apache Kerby; hence, we need to make changes in MiniKdc.scala for this
> upgrade to work correctly.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (KAFKA-16066) Upgrade apacheds to 2.0.0.AM27
[ https://issues.apache.org/jira/browse/KAFKA-16066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809731#comment-17809731 ]

highluck commented on KAFKA-16066:
----------------------------------

If no one else is working on it, may I take it?

> Upgrade apacheds to 2.0.0.AM27
> ------------------------------
>
>                 Key: KAFKA-16066
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16066
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Divij Vaidya
>            Priority: Major
>              Labels: newbie, newbie++
>
> We are currently using a very old dependency. Notably, apacheds is only used
> for testing when we use MiniKdc; hence, there is nothing stopping us from
> upgrading it.
> Note that apacheds has removed the component
> org.apache.directory.server:apacheds-protocol-kerberos in favour of using
> Apache Kerby; hence, we need to make changes in MiniKdc.scala for this
> upgrade to work correctly.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (KAFKA-16084) Simplify and deduplicate StandaloneHerderTest mocking
[ https://issues.apache.org/jira/browse/KAFKA-16084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809693#comment-17809693 ]

highluck commented on KAFKA-16084:
----------------------------------

[~gharris1727] May I work on this?

> Simplify and deduplicate StandaloneHerderTest mocking
> -----------------------------------------------------
>
>                 Key: KAFKA-16084
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16084
>             Project: Kafka
>          Issue Type: Test
>          Components: connect
>            Reporter: Greg Harris
>            Priority: Minor
>              Labels: newbie++
>
> The StandaloneHerderTest has some cruft that can be cleaned up. What I've
> found:
> * The `connector` field is written in nearly every test, but only read by
> one test, and looks to be nearly irrelevant.
> * `expectConfigValidation` has two ways of specifying consecutive
> validations: 1. the boolean shouldCreateConnector, which is true in the
> first invocation and false in subsequent invocations; 2. passing multiple
> configurations via varargs.
> * The class uses a mix of @Mock annotations and mock(Class) invocations.
> * The test doesn't stop the thread pool created inside the herder and might
> leak threads.
> * The mocking for Worker#startConnector is 6 lines which are duplicated 8
> times throughout the test.
> * Some waits are 1000 ms and others are 1000 s; these could be pulled out
> into constants or a util method.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Resolved] (KAFKA-10283) Consolidate client-level and consumer-level assignment within ClientState
[ https://issues.apache.org/jira/browse/KAFKA-10283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck resolved KAFKA-10283.
------------------------------

    Resolution: Fixed

> Consolidate client-level and consumer-level assignment within ClientState
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-10283
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10283
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: highluck
>            Priority: Major
>              Labels: newbie++
>
> In StreamsPartitionAssignor, we do a two-level assignment: one on the
> client level, and after that assignment is done we further decide, within
> each client, how to distribute the tasks among its consumers if there is
> more than one.
> The {{ClientState}} class is used for book-keeping the assigned tasks;
> however, it is only used for the first level, while the second level is
> handled outside of the class and we only keep track of the results in a few
> maps for logging purposes. This leaves us with a bunch of hierarchical maps,
> some on the client level and some on the consumer level.
> We would like to consolidate some of these maps into a single data structure
> to better keep track of the assignment information, and to reduce the risk
> of bugs that leave the assignment information inconsistent.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
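The consolidation the ticket asks for can be sketched as a single nested structure in which the client-level view is derived from the consumer-level one, so the two can never disagree. All names here are illustrative, not Kafka's actual classes.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch: instead of a client-level ClientState plus separate
// consumer-level maps, each client keeps its consumers' task assignments
// nested inside one structure.
class ClientAssignment {
    // consumer id -> tasks assigned to that consumer of this client
    private final Map<String, Set<String>> tasksPerConsumer = new TreeMap<>();

    void assign(String consumer, String taskId) {
        tasksPerConsumer.computeIfAbsent(consumer, c -> new TreeSet<>()).add(taskId);
    }

    // The client-level view is computed from the consumer-level view, so the
    // hierarchical maps cannot drift apart -- the inconsistency risk the
    // ticket mentions.
    Set<String> clientTasks() {
        Set<String> all = new TreeSet<>();
        tasksPerConsumer.values().forEach(all::addAll);
        return all;
    }

    Set<String> consumerTasks(String consumer) {
        return tasksPerConsumer.getOrDefault(consumer, Collections.emptySet());
    }
}
```

The design point is that only one level is stored and the other is derived, which is one way to satisfy "a single data structure" from the description.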
[jira] [Commented] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered
[ https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17308411#comment-17308411 ]

highluck commented on KAFKA-10575:
----------------------------------

[~ableegoldman] Yes, I am willing to work on it, but I am not sure how to proceed. Could you share the direction you have in mind for this ticket?

> StateRestoreListener#onRestoreEnd should always be triggered
> ------------------------------------------------------------
>
>                 Key: KAFKA-10575
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10575
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: highluck
>            Priority: Major
>
> Today we only trigger {{StateRestoreListener#onRestoreEnd}} when we complete
> the restoration of an active task and transition it to the running state.
> However, the restoration can also be stopped when the restoring task gets
> closed (because it gets migrated to another client, for example). We should
> also trigger the callback, indicating the restoration's progress, whenever
> the restoration stops, in any scenario.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
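The fix the ticket asks for can be modeled in a few lines: the end-of-restore callback must fire on both exit paths, completion and early close. The signatures below are simplified stand-ins, not Kafka's actual StateRestoreListener API (which also carries the TopicPartition and offsets).

```java
// Simplified model: onRestoreEnd fires whenever restoration stops, whether
// the task completed and transitioned to RUNNING or was closed mid-restore
// (e.g. migrated to another client). Previously the close path skipped it.
interface RestoreListener {
    void onRestoreEnd(String storeName, long restoredRecords);
}

class Restorer {
    private final RestoreListener listener;
    private long restored = 0;
    private boolean ended = false;

    Restorer(RestoreListener listener) { this.listener = listener; }

    void restoreBatch(long records) { restored += records; }

    void complete() { end(); } // restoration finished; task goes to RUNNING

    void close() { end(); }    // task closed mid-restore; callback must still fire

    private void end() {
        if (!ended) {          // trigger exactly once on either path
            ended = true;
            listener.onRestoreEnd("store", restored);
        }
    }
}
```

Routing both lifecycle exits through a single `end()` helper is one simple way to guarantee the "always triggered" property.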
[jira] [Assigned] (KAFKA-7785) Remove PartitionGrouper interface and its config and move DefaultPartitionGrouper to internal package
[ https://issues.apache.org/jira/browse/KAFKA-7785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck reassigned KAFKA-7785:
-------------------------------

    Assignee: highluck

> Remove PartitionGrouper interface and its config and move
> DefaultPartitionGrouper to internal package
> ---------------------------------------------------------
>
>                 Key: KAFKA-7785
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7785
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.1.0
>            Reporter: Jacek Laskowski
>            Assignee: highluck
>            Priority: Blocker
>             Fix For: 3.0.0
>
> Since {{DefaultPartitionGrouper}} exists only for the purpose of the
> internal {{StreamsPartitionAssignor}}, it would make sense to have it in
> the {{org.apache.kafka.streams.processor.internals}} package.
> I would also vote to move {{PartitionGrouper}}.
> Via KAFKA-8927 we deprecated the {{PartitionGrouper}} interface in the 2.4
> release; this allows us to remove the public interface and its corresponding
> config in the next major release (i.e., 3.0.0). {{DefaultPartitionGrouper}}
> was implicitly deprecated via KAFKA-8927.
> Hence, we can move the interface as well as the default implementation into
> an internal package (or maybe just remove the interface completely, as there
> are no plans to support multiple implementations at the moment).

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (KAFKA-7785) Remove PartitionGrouper interface and its config and move DefaultPartitionGrouper to internal package
[ https://issues.apache.org/jira/browse/KAFKA-7785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17297768#comment-17297768 ]

highluck commented on KAFKA-7785:
---------------------------------

Oh, thanks [~mjsax]!

> Remove PartitionGrouper interface and its config and move
> DefaultPartitionGrouper to internal package
> ---------------------------------------------------------
>
>                 Key: KAFKA-7785
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7785
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.1.0
>            Reporter: Jacek Laskowski
>            Priority: Blocker
>             Fix For: 3.0.0
>
> Since {{DefaultPartitionGrouper}} exists only for the purpose of the
> internal {{StreamsPartitionAssignor}}, it would make sense to have it in
> the {{org.apache.kafka.streams.processor.internals}} package.
> I would also vote to move {{PartitionGrouper}}.
> Via KAFKA-8927 we deprecated the {{PartitionGrouper}} interface in the 2.4
> release; this allows us to remove the public interface and its corresponding
> config in the next major release (i.e., 3.0.0). {{DefaultPartitionGrouper}}
> was implicitly deprecated via KAFKA-8927.
> Hence, we can move the interface as well as the default implementation into
> an internal package (or maybe just remove the interface completely, as there
> are no plans to support multiple implementations at the moment).

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (KAFKA-4928) Add integration test for DumpLogSegments
[ https://issues.apache.org/jira/browse/KAFKA-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17262609#comment-17262609 ]

highluck commented on KAFKA-4928:
---------------------------------

[~ijuma] Is it okay for me to take this? Thanks!

> Add integration test for DumpLogSegments
> ----------------------------------------
>
>                 Key: KAFKA-4928
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4928
>             Project: Kafka
>          Issue Type: Test
>          Components: log, tools
>            Reporter: Ismael Juma
>            Priority: Major
>              Labels: newbie
>
> DumpLogSegments is an important tool for analysing log files, but we have no
> JUnit tests for it. It would be good to have some tests that verify that the
> output is sane for a populated log.
> Our system tests call DumpLogSegments, but we should be able to detect
> regressions via the JUnit test suite.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (KAFKA-10283) Consolidate client-level and consumer-level assignment within ClientState
[ https://issues.apache.org/jira/browse/KAFKA-10283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17262605#comment-17262605 ]

highluck commented on KAFKA-10283:
----------------------------------

[~ableegoldman] Please take a look when you can! Thanks!

> Consolidate client-level and consumer-level assignment within ClientState
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-10283
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10283
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: highluck
>            Priority: Major
>              Labels: newbie++
>
> In StreamsPartitionAssignor, we do a two-level assignment: one on the
> client level, and after that assignment is done we further decide, within
> each client, how to distribute the tasks among its consumers if there is
> more than one.
> The {{ClientState}} class is used for book-keeping the assigned tasks;
> however, it is only used for the first level, while the second level is
> handled outside of the class and we only keep track of the results in a few
> maps for logging purposes. This leaves us with a bunch of hierarchical maps,
> some on the client level and some on the consumer level.
> We would like to consolidate some of these maps into a single data structure
> to better keep track of the assignment information, and to reduce the risk
> of bugs that leave the assignment information inconsistent.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Assigned] (KAFKA-10769) Remove JoinGroupRequest#containsValidPattern as it is duplicate to Topic#containsValidPattern
[ https://issues.apache.org/jira/browse/KAFKA-10769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck reassigned KAFKA-10769:
--------------------------------

    Assignee: highluck (was: Arun Parthiban)

> Remove JoinGroupRequest#containsValidPattern as it is duplicate to
> Topic#containsValidPattern
> ------------------------------------------------------------------
>
>                 Key: KAFKA-10769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10769
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Chia-Ping Tsai
>            Assignee: highluck
>            Priority: Minor
>              Labels: newbie
>
> As the title says: remove the redundant code.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (KAFKA-10769) Remove JoinGroupRequest#containsValidPattern as it is duplicate to Topic#containsValidPattern
[ https://issues.apache.org/jira/browse/KAFKA-10769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17257896#comment-17257896 ]

highluck commented on KAFKA-10769:
----------------------------------

[~chia7712] Thanks :)

> Remove JoinGroupRequest#containsValidPattern as it is duplicate to
> Topic#containsValidPattern
> ------------------------------------------------------------------
>
>                 Key: KAFKA-10769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10769
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Chia-Ping Tsai
>            Assignee: Arun Parthiban
>            Priority: Minor
>              Labels: newbie
>
> As the title says: remove the redundant code.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Assigned] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered
[ https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck reassigned KAFKA-10575:
--------------------------------

    Assignee: highluck

> StateRestoreListener#onRestoreEnd should always be triggered
> ------------------------------------------------------------
>
>                 Key: KAFKA-10575
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10575
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: highluck
>            Priority: Major
>
> Today we only trigger {{StateRestoreListener#onRestoreEnd}} when we complete
> the restoration of an active task and transition it to the running state.
> However, the restoration can also be stopped when the restoring task gets
> closed (because it gets migrated to another client, for example). We should
> also trigger the callback, indicating the restoration's progress, whenever
> the restoration stops, in any scenario.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered
[ https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17245318#comment-17245318 ]

highluck commented on KAFKA-10575:
----------------------------------

[~Yohan123] Are you working on this task?

> StateRestoreListener#onRestoreEnd should always be triggered
> ------------------------------------------------------------
>
>                 Key: KAFKA-10575
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10575
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Priority: Major
>
> Today we only trigger {{StateRestoreListener#onRestoreEnd}} when we complete
> the restoration of an active task and transition it to the running state.
> However, the restoration can also be stopped when the restoring task gets
> closed (because it gets migrated to another client, for example). We should
> also trigger the callback, indicating the restoration's progress, whenever
> the restoration stops, in any scenario.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (KAFKA-10769) Remove JoinGroupRequest#containsValidPattern as it is duplicate to Topic#containsValidPattern
[ https://issues.apache.org/jira/browse/KAFKA-10769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17245313#comment-17245313 ]

highluck commented on KAFKA-10769:
----------------------------------

[~parth017] Are you working on this task?

> Remove JoinGroupRequest#containsValidPattern as it is duplicate to
> Topic#containsValidPattern
> ------------------------------------------------------------------
>
>                 Key: KAFKA-10769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10769
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Chia-Ping Tsai
>            Assignee: Arun Parthiban
>            Priority: Minor
>              Labels: newbie
>
> As the title says: remove the redundant code.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (KAFKA-9259) suppress() for windowed-Serdes does not work with default serdes
[ https://issues.apache.org/jira/browse/KAFKA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17245308#comment-17245308 ]

highluck commented on KAFKA-9259:
---------------------------------

[~omanges] Are you working on this task?

> suppress() for windowed-Serdes does not work with default serdes
> ----------------------------------------------------------------
>
>                 Key: KAFKA-9259
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9259
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.0
>            Reporter: Matthias J. Sax
>            Assignee: Omkar Mestry
>            Priority: Major
>              Labels: newbie
>
> The suppress() operator either inherits serdes from its upstream operator or
> falls back to the default serdes from the config.
> If the upstream operator is a windowed aggregation, the window-aggregation
> operator wraps the user-supplied serde with a window serde and pushes it
> into suppress(). However, if default serdes are used, the window-aggregation
> operator cannot push anything into suppress(); at runtime, it just creates a
> default serde and wraps it accordingly. In this case, suppress() also falls
> back to the default serdes, but it does not wrap the serde, and thus a
> ClassCastException is thrown when the serde is used later.
> suppress() already knows whether the upstream aggregation is time/session
> windowed or not, and should thus use this information to wrap default serdes
> accordingly.
> The current workaround for windowed suppress is to overwrite the default
> serde upstream of suppress(), so that suppress() inherits serdes and does
> not fall back to the default serdes.
>
> For the search engines: when using Avro, this results in a ClassCastException:
> {{Caused by: java.lang.ClassCastException: class
> org.apache.kafka.streams.kstream.Windowed cannot be cast to class
> org.apache.avro.specific.SpecificRecord
> (org.apache.kafka.streams.kstream.Windowed and
> org.apache.avro.specific.SpecificRecord are in unnamed module of loader
> 'app') at
> io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)}}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
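The mechanism of the bug above can be shown with a self-contained toy model: a windowed key must go through a window-aware serializer that unwraps it first, and a plain default serializer casting to the inner type fails exactly as in the stack trace. The classes below only mimic the shape of Kafka's Windowed keys; they are not the real API.

```java
// Toy model of the serde-wrapping issue: StringSer stands in for a default
// key serde, WindowedSer for the window-aware wrapper that the windowed
// aggregation pushes into suppress() when explicit serdes are supplied.
interface Ser {
    byte[] serialize(Object data);
}

class Windowed { // stand-in for org.apache.kafka.streams.kstream.Windowed<K>
    final String key;
    Windowed(String key) { this.key = key; }
}

class StringSer implements Ser { // default serde: expects the inner type
    public byte[] serialize(Object data) {
        return ((String) data).getBytes(); // ClassCastException for a Windowed key
    }
}

class WindowedSer implements Ser { // window-aware wrapper: unwraps first
    private final Ser inner;
    WindowedSer(Ser inner) { this.inner = inner; }
    public byte[] serialize(Object data) {
        return inner.serialize(((Windowed) data).key);
    }
}
```

In real Streams code, the workaround described in the ticket amounts to passing explicit serdes to the windowed aggregation (e.g. via Materialized) so that suppress() inherits an already-wrapped windowed serde instead of falling back to the unwrapped config defaults.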
[jira] [Commented] (KAFKA-1043) Time-consuming FetchRequest could block other request in the response queue
[ https://issues.apache.org/jira/browse/KAFKA-1043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17245298#comment-17245298 ]

highluck commented on KAFKA-1043:
---------------------------------

[~guozhang] Is this still a problem?

> Time-consuming FetchRequest could block other request in the response queue
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-1043
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1043
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 0.8.1
>            Reporter: Guozhang Wang
>            Priority: Major
>              Labels: newbie++
>
> Since in SocketServer the processor that takes a request is also responsible
> for writing the response for that request, each processor owns its own
> response queue. If a FetchRequest takes an irregularly long time to write to
> the channel buffer, it blocks all other responses in that queue.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
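The head-of-line blocking described in the ticket above can be illustrated with a tiny simulation: a processor drains its response queue in order, so one expensive response delays everything queued behind it. The costs here are abstract time units, not real Kafka I/O.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy illustration of per-processor response-queue head-of-line blocking:
// responses are written serially, so a slow write pushes back the
// completion time of every response behind it.
class ResponseQueueSim {
    static List<Long> completionTimes(long[] writeCosts) {
        Queue<Long> queue = new ArrayDeque<>();
        for (long c : writeCosts) queue.add(c);
        List<Long> done = new ArrayList<>();
        long clock = 0;
        while (!queue.isEmpty()) {
            clock += queue.poll(); // the processor writes responses one at a time
            done.add(clock);
        }
        return done;
    }
}
```

With costs {1000, 1, 1}, the two cheap responses complete at times 1001 and 1002 instead of 1 and 2, which is the problem statement in miniature.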
[jira] [Commented] (KAFKA-10283) Consolidate client-level and consumer-level assignment within ClientState
[ https://issues.apache.org/jira/browse/KAFKA-10283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17245005#comment-17245005 ]

highluck commented on KAFKA-10283:
----------------------------------

[~guozhang] Please take a look when you can!

> Consolidate client-level and consumer-level assignment within ClientState
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-10283
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10283
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: highluck
>            Priority: Major
>              Labels: newbie++
>
> In StreamsPartitionAssignor, we do a two-level assignment: one on the
> client level, and after that assignment is done we further decide, within
> each client, how to distribute the tasks among its consumers if there is
> more than one.
> The {{ClientState}} class is used for book-keeping the assigned tasks;
> however, it is only used for the first level, while the second level is
> handled outside of the class and we only keep track of the results in a few
> maps for logging purposes. This leaves us with a bunch of hierarchical maps,
> some on the client level and some on the consumer level.
> We would like to consolidate some of these maps into a single data structure
> to better keep track of the assignment information, and to reduce the risk
> of bugs that leave the assignment information inconsistent.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Assigned] (KAFKA-10283) Consolidate client-level and consumer-level assignment within ClientState
[ https://issues.apache.org/jira/browse/KAFKA-10283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck reassigned KAFKA-10283:
--------------------------------

    Assignee: highluck

> Consolidate client-level and consumer-level assignment within ClientState
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-10283
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10283
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: highluck
>            Priority: Major
>              Labels: newbie++
>
> In StreamsPartitionAssignor, we do a two-level assignment: one on the
> client level, and after that assignment is done we further decide, within
> each client, how to distribute the tasks among its consumers if there is
> more than one.
> The {{ClientState}} class is used for book-keeping the assigned tasks;
> however, it is only used for the first level, while the second level is
> handled outside of the class and we only keep track of the results in a few
> maps for logging purposes. This leaves us with a bunch of hierarchical maps,
> some on the client level and some on the consumer level.
> We would like to consolidate some of these maps into a single data structure
> to better keep track of the assignment information, and to reduce the risk
> of bugs that leave the assignment information inconsistent.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (KAFKA-10362) When resuming Streams active task with EOS, the checkpoint file should be deleted
[ https://issues.apache.org/jira/browse/KAFKA-10362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183075#comment-17183075 ] highluck commented on KAFKA-10362: -- [~ipasynkov] Are you working on it? If not, can I do PR? > When resuming Streams active task with EOS, the checkpoint file should be > deleted > - > > Key: KAFKA-10362 > URL: https://issues.apache.org/jira/browse/KAFKA-10362 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Guozhang Wang >Priority: Major > Labels: newbie++ > > Today when we suspend a task we commit and along with the commit we always > write checkpoint file even if we are eosEnabled (since the state is already > SUSPENDED). But the suspended task may later be resumed and in that case the > checkpoint file should be deleted since it should only be written when it is > cleanly closed. > With our latest rebalance protocol in KIP-429, resume would not be called > since all suspended tasks would be closed, but with the old eager protocol it > may still be called — I think that may be the reason we did not get it often. -- This message was sent by Atlassian Jira (v8.3.4#803005)
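[Editor's note] The fix described above — delete the suspend-time checkpoint when an EOS task is resumed — can be sketched with plain `java.nio` file operations. This is not Streams' actual task code; the class and method names are hypothetical, though `.checkpoint` is the file name Streams uses in the task state directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative sketch only: on resume of a suspended task under EOS, the
// checkpoint written at suspend time must be removed, because a checkpoint
// should only survive a *clean* close.
class TaskResumeSketch {
    static void resume(Path stateDir, boolean eosEnabled) throws IOException {
        Path checkpoint = stateDir.resolve(".checkpoint");
        if (eosEnabled) {
            // A lingering checkpoint would wrongly suggest the on-disk state
            // is complete up to the recorded offsets.
            Files.deleteIfExists(checkpoint);
        }
    }
}
```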
[jira] [Commented] (KAFKA-10283) Consolidate client-level and consumer-level assignment within ClientState
[ https://issues.apache.org/jira/browse/KAFKA-10283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168029#comment-17168029 ] highluck commented on KAFKA-10283: -- [~guozhang] Can I pick this issue? Is there any structure you think of? > Consolidate client-level and consumer-level assignment within ClientState > - > > Key: KAFKA-10283 > URL: https://issues.apache.org/jira/browse/KAFKA-10283 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: newbie++ > > In StreamsPartitionAssignor, we do a two-level assignment, one on the > client-level, and then after the assignment is done we further decide within > the client how to distributed among consumers if there are more. > The {{ClientState}} class is used for book-keeping the assigned tasks, > however it is only used for the first level, while for the second level it is > done outside of the class and we only keep track of the results in a few maps > for logging purposes. This leaves us with a bunch of hierarchical maps, e.g. > some on the client level and some on the consumer level. > We would like to consolidate some of these maps into a single data structure > for better keeping track of the assignment information, and also for less bug > vulnerability causing the assignment information to be inconsistent. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index
[ https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17159285#comment-17159285 ] highluck commented on KAFKA-5488: - @Ivan Ponomarev thanks for comment! Call me whenever you need help :) > KStream.branch should not return a Array of streams we have to access by > known index > > > Key: KAFKA-5488 > URL: https://issues.apache.org/jira/browse/KAFKA-5488 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Marcel "childNo͡.de" Trautwein >Assignee: Ivan Ponomarev >Priority: Major > Labels: kip > > KIP-418: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream] > long story short: it's a mess to get a {{KStream<>[]}} out from > {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces > bad code which is not that good to maintain since you have to know the right > index for an unnamed branching stream. > Example > {code:java} > import org.apache.kafka.streams.kstream.KStreamBuilder; > import org.apache.kafka.streams.kstream.KStream; > public class StreamAppWithBranches { > public static void main(String... args) { > KStream[] branchedStreams= new KStreamBuilder() > .stream("eventTopic") > .branch( > (k, v) -> EventType::validData > (k, v) -> true > ); > > branchedStreams[0] > .to("topicValidData"); > > branchedStreams[1] > .to("topicInvalidData"); > } > } > {code} > Quick idea, s.th. like {{void branch(final BranchDefinition, > Consumer>>... 
branchPredicatesAndHandlers);}} where you can write > branches/streams code nested where it belongs to > so it would be possible to write code like > {code:java} > new KStreamBuilder() > .stream("eventTopic") > .branch( > Branch.create( > (k, v) -> EventType::validData, > stream -> stream.to("topicValidData") > ), > Branch.create( > (k, v) -> true, > stream -> stream.to("topicInvalidData") > ) > ); > {code} > I'll go forward to evaluate some ideas: > [https://gitlab.com/childno.de/apache_kafka/snippets/1665655] -- This message was sent by Atlassian Jira (v8.3.4#803005)
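[Editor's note] The nested-branching idea in the ticket (pair each predicate with the code that handles its branch, so no positional array indexing is needed) can be shown self-contained with plain JDK functional types. `Branch` and `route` below are hypothetical names modeled on the ticket's `Branch.create(...)` snippet, not Kafka's API; the implemented KIP-418 API in Kafka 2.8+ is `KStream.split()` returning a `BranchedKStream`.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Sketch of the ticket's idea: each branch bundles a predicate with the
// handler for matching records, restoring a fluent, index-free style.
class Branch<V> {
    final Predicate<V> predicate;
    final Consumer<V> handler;

    private Branch(Predicate<V> p, Consumer<V> h) { predicate = p; handler = h; }

    static <V> Branch<V> create(Predicate<V> p, Consumer<V> h) { return new Branch<>(p, h); }

    // Route each record to the first branch whose predicate matches.
    @SafeVarargs
    static <V> void route(List<V> records, Branch<V>... branches) {
        for (V record : records) {
            for (Branch<V> b : branches) {
                if (b.predicate.test(record)) {
                    b.handler.accept(record);
                    break;
                }
            }
        }
    }
}
```

A catch-all `(v) -> true` branch at the end plays the role of `defaultBranch`.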
[jira] [Commented] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index
[ https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17158842#comment-17158842 ] highluck commented on KAFKA-5488: - ok thank you! :) > KStream.branch should not return a Array of streams we have to access by > known index > > > Key: KAFKA-5488 > URL: https://issues.apache.org/jira/browse/KAFKA-5488 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Marcel "childNo͡.de" Trautwein >Assignee: highluck >Priority: Major > Labels: kip > > KIP-418: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream] > long story short: it's a mess to get a {{KStream<>[]}} out from > {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces > bad code which is not that good to maintain since you have to know the right > index for an unnamed branching stream. > Example > {code:java} > import org.apache.kafka.streams.kstream.KStreamBuilder; > import org.apache.kafka.streams.kstream.KStream; > public class StreamAppWithBranches { > public static void main(String... args) { > KStream[] branchedStreams= new KStreamBuilder() > .stream("eventTopic") > .branch( > (k, v) -> EventType::validData > (k, v) -> true > ); > > branchedStreams[0] > .to("topicValidData"); > > branchedStreams[1] > .to("topicInvalidData"); > } > } > {code} > Quick idea, s.th. like {{void branch(final BranchDefinition, > Consumer>>... 
branchPredicatesAndHandlers);}} where you can write > branches/streams code nested where it belongs to > so it would be possible to write code like > {code:java} > new KStreamBuilder() > .stream("eventTopic") > .branch( > Branch.create( > (k, v) -> EventType::validData, > stream -> stream.to("topicValidData") > ), > Branch.create( > (k, v) -> true, > stream -> stream.to("topicInvalidData") > ) > ); > {code} > I'll go forward to evaluate some ideas: > [https://gitlab.com/childno.de/apache_kafka/snippets/1665655] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index
[ https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17158839#comment-17158839 ] highluck commented on KAFKA-5488: - [~mjsax] thankyou! I didn't have any people assigned to the ticket, so I took it. Can I proceed? > KStream.branch should not return a Array of streams we have to access by > known index > > > Key: KAFKA-5488 > URL: https://issues.apache.org/jira/browse/KAFKA-5488 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Marcel "childNo͡.de" Trautwein >Priority: Major > Labels: kip > > KIP-418: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream] > long story short: it's a mess to get a {{KStream<>[]}} out from > {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces > bad code which is not that good to maintain since you have to know the right > index for an unnamed branching stream. > Example > {code:java} > import org.apache.kafka.streams.kstream.KStreamBuilder; > import org.apache.kafka.streams.kstream.KStream; > public class StreamAppWithBranches { > public static void main(String... args) { > KStream[] branchedStreams= new KStreamBuilder() > .stream("eventTopic") > .branch( > (k, v) -> EventType::validData > (k, v) -> true > ); > > branchedStreams[0] > .to("topicValidData"); > > branchedStreams[1] > .to("topicInvalidData"); > } > } > {code} > Quick idea, s.th. like {{void branch(final BranchDefinition, > Consumer>>... 
branchPredicatesAndHandlers);}} where you can write > branches/streams code nested where it belongs to > so it would be possible to write code like > {code:java} > new KStreamBuilder() > .stream("eventTopic") > .branch( > Branch.create( > (k, v) -> EventType::validData, > stream -> stream.to("topicValidData") > ), > Branch.create( > (k, v) -> true, > stream -> stream.to("topicInvalidData") > ) > ); > {code} > I'll go forward to evaluate some ideas: > [https://gitlab.com/childno.de/apache_kafka/snippets/1665655] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index
[ https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck reassigned KAFKA-5488: --- Assignee: (was: highluck) > KStream.branch should not return a Array of streams we have to access by > known index > > > Key: KAFKA-5488 > URL: https://issues.apache.org/jira/browse/KAFKA-5488 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Marcel "childNo͡.de" Trautwein >Priority: Major > Labels: kip > > KIP-418: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream] > long story short: it's a mess to get a {{KStream<>[]}} out from > {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces > bad code which is not that good to maintain since you have to know the right > index for an unnamed branching stream. > Example > {code:java} > import org.apache.kafka.streams.kstream.KStreamBuilder; > import org.apache.kafka.streams.kstream.KStream; > public class StreamAppWithBranches { > public static void main(String... args) { > KStream[] branchedStreams= new KStreamBuilder() > .stream("eventTopic") > .branch( > (k, v) -> EventType::validData > (k, v) -> true > ); > > branchedStreams[0] > .to("topicValidData"); > > branchedStreams[1] > .to("topicInvalidData"); > } > } > {code} > Quick idea, s.th. like {{void branch(final BranchDefinition, > Consumer>>... branchPredicatesAndHandlers);}} where you can write > branches/streams code nested where it belongs to > so it would be possible to write code like > {code:java} > new KStreamBuilder() > .stream("eventTopic") > .branch( > Branch.create( > (k, v) -> EventType::validData, > stream -> stream.to("topicValidData") > ), > Branch.create( > (k, v) -> true, > stream -> stream.to("topicInvalidData") > ) > ); > {code} > I'll go forward to evaluate some ideas: > [https://gitlab.com/childno.de/apache_kafka/snippets/1665655] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index
[ https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17158505#comment-17158505 ] highluck edited comment on KAFKA-5488 at 7/15/20, 4:20 PM: --- I think there are some unnecessary interfaces {code:java} Branched with(Function, ? extends KStream> chain) {code} I don't think there is a need to take a function as a parameter, That's enough for the consumer {code:java} Map> defaultBranch();{code} I don't think the return type needs to be a Map, Wouldn't it be better to unify the return type to BranchedKStream? [~mjsax] {{What do you think?!}} was (Author: high.lee): I think there are some unnecessary interfaces ``` {{Branched with(Function, ? }}{{extends}} {{KStream> chain)}} {{}} {{```}} I don't think there is a need to take a function as a parameter, That's enough for the consumer {{}} {{```}}{{}} {{ }}{{Map> defaultBranch();}}{{}} ``` I don't think the return type needs to be a Map, Wouldn't it be better to unify the return type to BranchedKStream? {{}} {{[~mjsax] }} {{What do you think?!}} {{}}{{}} {{}} > KStream.branch should not return a Array of streams we have to access by > known index > > > Key: KAFKA-5488 > URL: https://issues.apache.org/jira/browse/KAFKA-5488 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Marcel "childNo͡.de" Trautwein >Assignee: highluck >Priority: Major > Labels: kip > > KIP-418: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream] > long story short: it's a mess to get a {{KStream<>[]}} out from > {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces > bad code which is not that good to maintain since you have to know the right > index for an unnamed branching stream. 
> Example > {code:java} > import org.apache.kafka.streams.kstream.KStreamBuilder; > import org.apache.kafka.streams.kstream.KStream; > public class StreamAppWithBranches { > public static void main(String... args) { > KStream[] branchedStreams= new KStreamBuilder() > .stream("eventTopic") > .branch( > (k, v) -> EventType::validData > (k, v) -> true > ); > > branchedStreams[0] > .to("topicValidData"); > > branchedStreams[1] > .to("topicInvalidData"); > } > } > {code} > Quick idea, s.th. like {{void branch(final BranchDefinition, > Consumer>>... branchPredicatesAndHandlers);}} where you can write > branches/streams code nested where it belongs to > so it would be possible to write code like > {code:java} > new KStreamBuilder() > .stream("eventTopic") > .branch( > Branch.create( > (k, v) -> EventType::validData, > stream -> stream.to("topicValidData") > ), > Branch.create( > (k, v) -> true, > stream -> stream.to("topicInvalidData") > ) > ); > {code} > I'll go forward to evaluate some ideas: > [https://gitlab.com/childno.de/apache_kafka/snippets/1665655] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index
[ https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17158505#comment-17158505 ] highluck edited comment on KAFKA-5488 at 7/15/20, 4:20 PM: --- I think there are some unnecessary interfaces {code:java} Branched with(Function, ? extends KStream> chain) {code} I don't think there is a need to take a function as a parameter, That's enough for the consumer {code:java} Map> defaultBranch();{code} I don't think the return type needs to be a Map, Wouldn't it be better to unify the return type to BranchedKStream? [~mjsax] {{What do you think?!}} was (Author: high.lee): I think there are some unnecessary interfaces {code:java} Branched with(Function, ? extends KStream> chain) {code} I don't think there is a need to take a function as a parameter, That's enough for the consumer {code:java} Map> defaultBranch();{code} I don't think the return type needs to be a Map, Wouldn't it be better to unify the return type to BranchedKStream? [~mjsax] {{What do you think?!}} > KStream.branch should not return a Array of streams we have to access by > known index > > > Key: KAFKA-5488 > URL: https://issues.apache.org/jira/browse/KAFKA-5488 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Marcel "childNo͡.de" Trautwein >Assignee: highluck >Priority: Major > Labels: kip > > KIP-418: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream] > long story short: it's a mess to get a {{KStream<>[]}} out from > {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces > bad code which is not that good to maintain since you have to know the right > index for an unnamed branching stream. > Example > {code:java} > import org.apache.kafka.streams.kstream.KStreamBuilder; > import org.apache.kafka.streams.kstream.KStream; > public class StreamAppWithBranches { > public static void main(String... 
args) { > KStream[] branchedStreams= new KStreamBuilder() > .stream("eventTopic") > .branch( > (k, v) -> EventType::validData > (k, v) -> true > ); > > branchedStreams[0] > .to("topicValidData"); > > branchedStreams[1] > .to("topicInvalidData"); > } > } > {code} > Quick idea, s.th. like {{void branch(final BranchDefinition, > Consumer>>... branchPredicatesAndHandlers);}} where you can write > branches/streams code nested where it belongs to > so it would be possible to write code like > {code:java} > new KStreamBuilder() > .stream("eventTopic") > .branch( > Branch.create( > (k, v) -> EventType::validData, > stream -> stream.to("topicValidData") > ), > Branch.create( > (k, v) -> true, > stream -> stream.to("topicInvalidData") > ) > ); > {code} > I'll go forward to evaluate some ideas: > [https://gitlab.com/childno.de/apache_kafka/snippets/1665655] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index
[ https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17158505#comment-17158505 ] highluck commented on KAFKA-5488: - I think there are some unnecessary interfaces ``` {{Branched with(Function, ? }}{{extends}} {{KStream> chain)}} {{}} {{```}} I don't think there is a need to take a function as a parameter, That's enough for the consumer {{}} {{```}}{{}} {{ }}{{Map> defaultBranch();}}{{}} ``` I don't think the return type needs to be a Map, Wouldn't it be better to unify the return type to BranchedKStream? {{}} {{[~mjsax] }} {{What do you think?!}} {{}}{{}} {{}} > KStream.branch should not return a Array of streams we have to access by > known index > > > Key: KAFKA-5488 > URL: https://issues.apache.org/jira/browse/KAFKA-5488 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Marcel "childNo͡.de" Trautwein >Assignee: highluck >Priority: Major > Labels: kip > > KIP-418: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream] > long story short: it's a mess to get a {{KStream<>[]}} out from > {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces > bad code which is not that good to maintain since you have to know the right > index for an unnamed branching stream. > Example > {code:java} > import org.apache.kafka.streams.kstream.KStreamBuilder; > import org.apache.kafka.streams.kstream.KStream; > public class StreamAppWithBranches { > public static void main(String... args) { > KStream[] branchedStreams= new KStreamBuilder() > .stream("eventTopic") > .branch( > (k, v) -> EventType::validData > (k, v) -> true > ); > > branchedStreams[0] > .to("topicValidData"); > > branchedStreams[1] > .to("topicInvalidData"); > } > } > {code} > Quick idea, s.th. like {{void branch(final BranchDefinition, > Consumer>>... 
branchPredicatesAndHandlers);}} where you can write > branches/streams code nested where it belongs to > so it would be possible to write code like > {code:java} > new KStreamBuilder() > .stream("eventTopic") > .branch( > Branch.create( > (k, v) -> EventType::validData, > stream -> stream.to("topicValidData") > ), > Branch.create( > (k, v) -> true, > stream -> stream.to("topicInvalidData") > ) > ); > {code} > I'll go forward to evaluate some ideas: > [https://gitlab.com/childno.de/apache_kafka/snippets/1665655] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index
[ https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck reassigned KAFKA-5488: --- Assignee: highluck > KStream.branch should not return a Array of streams we have to access by > known index > > > Key: KAFKA-5488 > URL: https://issues.apache.org/jira/browse/KAFKA-5488 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Marcel "childNo͡.de" Trautwein >Assignee: highluck >Priority: Major > Labels: kip > > KIP-418: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream] > long story short: it's a mess to get a {{KStream<>[]}} out from > {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces > bad code which is not that good to maintain since you have to know the right > index for an unnamed branching stream. > Example > {code:java} > import org.apache.kafka.streams.kstream.KStreamBuilder; > import org.apache.kafka.streams.kstream.KStream; > public class StreamAppWithBranches { > public static void main(String... args) { > KStream[] branchedStreams= new KStreamBuilder() > .stream("eventTopic") > .branch( > (k, v) -> EventType::validData > (k, v) -> true > ); > > branchedStreams[0] > .to("topicValidData"); > > branchedStreams[1] > .to("topicInvalidData"); > } > } > {code} > Quick idea, s.th. like {{void branch(final BranchDefinition, > Consumer>>... 
branchPredicatesAndHandlers);}} where you can write > branches/streams code nested where it belongs to > so it would be possible to write code like > {code:java} > new KStreamBuilder() > .stream("eventTopic") > .branch( > Branch.create( > (k, v) -> EventType::validData, > stream -> stream.to("topicValidData") > ), > Branch.create( > (k, v) -> true, > stream -> stream.to("topicInvalidData") > ) > ); > {code} > I'll go forward to evaluate some ideas: > [https://gitlab.com/childno.de/apache_kafka/snippets/1665655] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-8666) Improve Documentation on usage of Materialized config object
[ https://issues.apache.org/jira/browse/KAFKA-8666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck reassigned KAFKA-8666: --- Assignee: (was: highluck) > Improve Documentation on usage of Materialized config object > > > Key: KAFKA-8666 > URL: https://issues.apache.org/jira/browse/KAFKA-8666 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Reporter: Bill Bejeck >Priority: Major > Labels: newbie > > When using the Materialized object if the user wants to name the statestore > with > {code:java} > Materialized.as("MyStoreName"){code} > then subsequently provide the key and value serde the calls to do so must > take the form of > {code:java} > Materialized.as("MyStoreName").withKeySerde(keySerde).withValueSerde(valueSerde) > {code} > If users do the following > {code:java} > Materialized.as("MyStoreName").with(keySerde, valueSerde) > {code} > the Materialized instance created by the "as(storeName)" call is replaced by > a new Materialized instance resulting from the "with(...)" call and any > configuration on the first Materialized instance is lost. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-8666) Improve Documentation on usage of Materialized config object
[ https://issues.apache.org/jira/browse/KAFKA-8666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck reassigned KAFKA-8666: --- Assignee: highluck > Improve Documentation on usage of Materialized config object > > > Key: KAFKA-8666 > URL: https://issues.apache.org/jira/browse/KAFKA-8666 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Reporter: Bill Bejeck >Assignee: highluck >Priority: Major > Labels: newbie > > When using the Materialized object if the user wants to name the statestore > with > {code:java} > Materialized.as("MyStoreName"){code} > then subsequently provide the key and value serde the calls to do so must > take the form of > {code:java} > Materialized.as("MyStoreName").withKeySerde(keySerde).withValueSerde(valueSerde) > {code} > If users do the following > {code:java} > Materialized.as("MyStoreName").with(keySerde, valueSerde) > {code} > the Materialized instance created by the "as(storeName)" call is replaced by > a new Materialized instance resulting from the "with(...)" call and any > configuration on the first Materialized instance is lost. -- This message was sent by Atlassian Jira (v8.3.4#803005)
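[Editor's note] The `Materialized` pitfall above is a general static-factory-vs-instance-method trap, and it can be demonstrated without Kafka. The `Config` class below is a hypothetical stand-in for `Materialized`: `as(...)` and `with(...)` are static factories that each build a fresh object (so chaining them silently discards earlier settings), while `withKeySerde(...)` is an instance method that preserves them.

```java
// Pure-JDK analogy of the Materialized pitfall; names are illustrative.
class Config {
    String storeName;
    String keySerde;

    // Static factory: a fresh Config carrying only the store name.
    static Config as(String name) {
        Config c = new Config();
        c.storeName = name;
        return c;
    }

    // Also a static factory: Java lets you call it *through an instance*,
    // which compiles but returns a brand-new Config, dropping the name.
    static Config with(String keySerde) {
        Config c = new Config();
        c.keySerde = keySerde;
        return c;
    }

    // Instance method: keeps the store name set earlier.
    Config withKeySerde(String keySerde) {
        this.keySerde = keySerde;
        return this;
    }
}
```

`Config.as("MyStore").with("...")` compiles without error, which is exactly why the documentation improvement in this ticket matters.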
[jira] [Assigned] (KAFKA-7711) Add a bounded flush() API to Kafka Producer
[ https://issues.apache.org/jira/browse/KAFKA-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck reassigned KAFKA-7711: --- Assignee: (was: highluck) > Add a bounded flush() API to Kafka Producer > > > Key: KAFKA-7711 > URL: https://issues.apache.org/jira/browse/KAFKA-7711 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: kun du >Priority: Minor > Labels: needs-kip > > Currently the call to Producer.flush() can be hang there for indeterminate > time. > It is a good idea to add a bounded flush() API and timeout if producer is > unable to flush all the batch records in a limited time. In this way the > caller of flush() has a chance to decide what to do next instead of just wait > forever. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9455) Consider using TreeMap for in-memory stores of Streams
[ https://issues.apache.org/jira/browse/KAFKA-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17044126#comment-17044126 ] highluck commented on KAFKA-9455: - [~guozhang] thank you! `JoinWindowStore` is my mistake > Consider using TreeMap for in-memory stores of Streams > -- > > Key: KAFKA-9455 > URL: https://issues.apache.org/jira/browse/KAFKA-9455 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: highluck >Priority: Major > Labels: newbie++ > > From [~ableegoldman]: It's worth noting that it might be a good idea to > switch to TreeMap for different reasons. Right now the ConcurrentSkipListMap > allows us to safely perform range queries without copying over the entire > keyset, but the performance on point queries seems to scale noticeably worse > with the number of unique keys. Point queries are used by aggregations while > range queries are used by windowed joins, but of course both are available > within the PAPI and for interactive queries so it's hard to say which we > should prefer. Maybe rather than make that tradeoff we should have one > version for efficient range queries (a "JoinWindowStore") and one for > efficient point queries ("AggWindowStore") - or something. I know we've had > similar thoughts for a different RocksDB store layout for Joins (although I > can't find that ticket anywhere..), it seems like the in-memory stores could > benefit from a special "Join" version as well cc/ Guozhang Wang > Here are some random thoughts: > 1. For kafka streams processing logic (i.e. without IQ), it's better to make > all processing logic relying on point queries rather than range queries. > Right now the only processor that use range queries are, as mentioned above, > windowed stream-stream joins. 
I think we should consider using a different > window implementation for this (and as a result also get rid of the > retainDuplicate flags) to refactor the windowed stream-stream join operation. > 2. With 1), range queries would only be exposed as IQ. Depending on its usage > frequency I think it makes lots of sense to optimize for single-point queries. > Of course, even without step 1) we should still consider using tree-map for > windowed in-memory stores to have a better scaling effect. -- This message was sent by Atlassian Jira (v8.3.4#803005)
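[Editor's note] The TreeMap-vs-ConcurrentSkipListMap discussion above turns on point-lookup scaling, not API shape: both implement `NavigableMap`, so the range fetch a windowed join needs looks identical on either. A minimal sketch (the `range` helper is a hypothetical name, not a Streams method):

```java
import java.util.NavigableMap;
import java.util.SortedMap;

// Both TreeMap and ConcurrentSkipListMap support this range operation via
// NavigableMap, so swapping the backing map would not change fetch code.
class StoreMapsDemo {
    static SortedMap<String, Long> range(NavigableMap<String, Long> store,
                                         String from, String to) {
        // Inclusive range fetch, as a windowed fetch would use.
        return store.subMap(from, true, to, true);
    }
}
```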
[jira] [Issue Comment Deleted] (KAFKA-9455) Consider using TreeMap for in-memory stores of Streams
[ https://issues.apache.org/jira/browse/KAFKA-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck updated KAFKA-9455: Comment: was deleted (was: @Guozhang Thanks! it was helpful!) > Consider using TreeMap for in-memory stores of Streams > -- > > Key: KAFKA-9455 > URL: https://issues.apache.org/jira/browse/KAFKA-9455 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: highluck >Priority: Major > Labels: newbie++ > > From [~ableegoldman]: It's worth noting that it might be a good idea to > switch to TreeMap for different reasons. Right now the ConcurrentSkipListMap > allows us to safely perform range queries without copying over the entire > keyset, but the performance on point queries seems to scale noticeably worse > with the number of unique keys. Point queries are used by aggregations while > range queries are used by windowed joins, but of course both are available > within the PAPI and for interactive queries so it's hard to say which we > should prefer. Maybe rather than make that tradeoff we should have one > version for efficient range queries (a "JoinWindowStore") and one for > efficient point queries ("AggWindowStore") - or something. I know we've had > similar thoughts for a different RocksDB store layout for Joins (although I > can't find that ticket anywhere..), it seems like the in-memory stores could > benefit from a special "Join" version as well cc/ Guozhang Wang > Here are some random thoughts: > 1. For kafka streams processing logic (i.e. without IQ), it's better to make > all processing logic relying on point queries rather than range queries. > Right now the only processor that use range queries are, as mentioned above, > windowed stream-stream joins. I think we should consider using a different > window implementation for this (and as a result also get rid of the > retainDuplicate flags) to refactor the windowed stream-stream join operation. > 2. 
With 1), range queries would only be exposed as IQ. Depending on its usage > frequency I think it makes lots of sense to optimize for single-point queries. > Of course, even without step 1) we should still consider using tree-map for > windowed in-memory stores to have a better scaling effect. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-7711) Add a bounded flush() API to Kafka Producer
[ https://issues.apache.org/jira/browse/KAFKA-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck reassigned KAFKA-7711: --- Assignee: highluck > Add a bounded flush() API to Kafka Producer > > > Key: KAFKA-7711 > URL: https://issues.apache.org/jira/browse/KAFKA-7711 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: kun du >Assignee: highluck >Priority: Minor > Labels: needs-kip > > Currently the call to Producer.flush() can be hang there for indeterminate > time. > It is a good idea to add a bounded flush() API and timeout if producer is > unable to flush all the batch records in a limited time. In this way the > caller of flush() has a chance to decide what to do next instead of just wait > forever. -- This message was sent by Atlassian Jira (v8.3.4#803005)
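[Editor's note] Until a bounded `flush()` exists, the behavior the ticket asks for can be approximated caller-side by running the flush on an executor and bounding the wait. This is a workaround sketch, not an official producer API; the `Runnable` stands in for a real `KafkaProducer`'s `flush()` call.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Caller-side bounded flush: returns true if the flush completed within
// timeoutMs, false on timeout so the caller can decide what to do next.
class BoundedFlush {
    static boolean flushWithTimeout(Runnable flushable, long timeoutMs)
            throws InterruptedException {
        ExecutorService ex = Executors.newSingleThreadExecutor();
        try {
            Future<?> f = ex.submit(flushable);
            f.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;                  // flushed within the bound
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } catch (TimeoutException e) {
            return false;                 // bound exceeded
        } finally {
            // Note: this only stops waiting; a real producer's flush keeps
            // running internally, which is why a true bounded API needs a KIP.
            ex.shutdownNow();
        }
    }
}
```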
[jira] [Commented] (KAFKA-9559) Change the default "default serde" from ByteArraySerde to null
[ https://issues.apache.org/jira/browse/KAFKA-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17038032#comment-17038032 ] highluck commented on KAFKA-9559: - [~mjsax] oh thank you, for your care! i'm understand! > Change the default "default serde" from ByteArraySerde to null > -- > > Key: KAFKA-9559 > URL: https://issues.apache.org/jira/browse/KAFKA-9559 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sophie Blee-Goldman >Assignee: highluck >Priority: Blocker > Fix For: 3.0.0 > > > The current default "default serde" is not particularly useful, and in almost > all cases is intended to be overwritten either by setting the config > explicitly or by specifying serdes directly in the topology. If a user does > not set the config and misses specifying a serde they will get a runtime > ClassCastException once they start processing unless they are in fact > processing only bytes. > We should change the default default to null, so that an exception will > instead be thrown immediately on startup if a user failed to specify a serde > somewhere in their topology and it falls back to the unset default. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
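[Editor's note] With the change described above, an application that relies on the unset default serde fails fast at startup, so serdes should be set explicitly. The property names below (`default.key.serde`, `default.value.serde`) are Kafka Streams' real config keys; the sketch builds them as plain strings so it stays self-contained, where a real app would pass a `Properties` object to `KafkaStreams`.

```java
import java.util.HashMap;
import java.util.Map;

// Explicitly configuring default serdes so no topology step falls back to
// an unset (null) default. Values shown are example serde class names.
class StreamsConfigSketch {
    static Map<String, String> explicitSerdes() {
        Map<String, String> props = new HashMap<>();
        props.put("default.key.serde",
                  "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde",
                  "org.apache.kafka.common.serialization.Serdes$StringSerde");
        return props;
    }
}
```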
[jira] [Assigned] (KAFKA-9559) Change the default "default serde" from ByteArraySerde to null
[ https://issues.apache.org/jira/browse/KAFKA-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck reassigned KAFKA-9559:
-------------------------------

    Assignee: (was: highluck)
[jira] [Assigned] (KAFKA-9559) Change the default "default serde" from ByteArraySerde to null
[ https://issues.apache.org/jira/browse/KAFKA-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck reassigned KAFKA-9559:
-------------------------------

    Assignee: highluck
[jira] [Commented] (KAFKA-7499) Extend ProductionExceptionHandler to cover serialization exceptions
[ https://issues.apache.org/jira/browse/KAFKA-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036321#comment-17036321 ]

highluck commented on KAFKA-7499:
---------------------------------

[~jbfletch], [~mjsax] Is this ticket in progress? If not, can I proceed?

> Extend ProductionExceptionHandler to cover serialization exceptions
> -------------------------------------------------------------------
>
>                 Key: KAFKA-7499
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7499
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: jbfletch
>            Priority: Major
>              Labels: beginner, kip, newbie
>
> In [KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce],
> an exception handler for the write path was introduced. This exception
> handler covers exceptions that are raised in the producer callback.
> However, serialization happens before the data is handed to the producer,
> within Kafka Streams itself, and the producer uses `byte[]/byte[]`
> key-value-pair types.
> Thus, we might want to extend the ProductionExceptionHandler to cover
> serialization exceptions, too, to skip over corrupted output messages. An
> example could be a "String" message that contains invalid JSON and should be
> serialized as JSON.
> KIP-399 (not voted yet; feel free to pick it up):
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions]
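A pure-JDK sketch of the handler pattern the ticket proposes (the real `ProductionExceptionHandler` lives in Kafka Streams; all names below are illustrative, and the stand-in "serializer" just rejects non-String records): on a serialization failure the handler decides whether to skip the corrupted record or fail.

```java
// Hypothetical sketch of a serialization exception handler: the handler
// returns CONTINUE to drop the corrupted record or FAIL to stop processing.
public class SerializationHandlerSketch {
    public enum Response { CONTINUE, FAIL }

    public interface SerializationExceptionHandler {
        Response handle(Object record, Exception serializationError);
    }

    /** Returns true if the record was sent, false if it was skipped. */
    public static boolean trySend(Object record, SerializationExceptionHandler handler) {
        try {
            serialize(record);
            return true;
        } catch (IllegalArgumentException e) {
            if (handler.handle(record, e) == Response.FAIL) {
                throw new RuntimeException("Failed to serialize record", e);
            }
            return false; // CONTINUE: skip the corrupted record
        }
    }

    // Stand-in serializer: rejects anything that is not a String.
    static byte[] serialize(Object record) {
        if (!(record instanceof String)) {
            throw new IllegalArgumentException("cannot serialize " + record);
        }
        return ((String) record).getBytes();
    }
}
```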
[jira] [Commented] (KAFKA-9455) Consider using TreeMap for in-memory stores of Streams
[ https://issues.apache.org/jira/browse/KAFKA-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036300#comment-17036300 ]

highluck commented on KAFKA-9455:
---------------------------------

[~guozhang] I'm not sure I understand. I'm trying to replace JoinWindowStore and the existing InMemoryWindowStore with TreeMap. What do you think?

single-point query -> WindowStore with TreeMap
range query -> JoinWindowStore

If that's not what you had in mind, please give me a hint.

> Consider using TreeMap for in-memory stores of Streams
> ------------------------------------------------------
>
>                 Key: KAFKA-9455
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9455
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: highluck
>            Priority: Major
>              Labels: newbie++
>
> From [~ableegoldman]: It's worth noting that it might be a good idea to
> switch to TreeMap for different reasons. Right now the ConcurrentSkipListMap
> allows us to safely perform range queries without copying over the entire
> keyset, but the performance of point queries seems to scale noticeably worse
> with the number of unique keys. Point queries are used by aggregations while
> range queries are used by windowed joins, but of course both are available
> within the PAPI and for interactive queries, so it's hard to say which we
> should prefer. Maybe rather than make that tradeoff we should have one
> version for efficient range queries (a "JoinWindowStore") and one for
> efficient point queries ("AggWindowStore") - or something. I know we've had
> similar thoughts about a different RocksDB store layout for joins (although I
> can't find that ticket anywhere..); it seems like the in-memory stores could
> benefit from a special "Join" version as well. cc/ Guozhang Wang
> Here are some random thoughts:
> 1. For Kafka Streams processing logic (i.e. without IQ), it's better to make
> all processing logic rely on point queries rather than range queries. Right
> now the only processors that use range queries are, as mentioned above, the
> windowed stream-stream joins. I think we should consider using a different
> window implementation for this (and as a result also get rid of the
> retainDuplicates flag) to refactor the windowed stream-stream join operation.
> 2. With 1), range queries would only be exposed via IQ. Depending on their
> usage frequency I think it makes a lot of sense to optimize for single-point
> queries.
> Of course, even without step 1) we should still consider using a TreeMap for
> windowed in-memory stores to get a better scaling effect.
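To make the trade-off in the ticket concrete: both TreeMap and ConcurrentSkipListMap implement NavigableMap, so the same point-query and range-query code (hypothetical helper below, not Streams code) runs against either; the question the ticket raises is purely one of performance and thread safety.

```java
import java.util.NavigableMap;

// Sketch of the two access patterns discussed in the ticket, written
// against NavigableMap so TreeMap and ConcurrentSkipListMap both work.
public class WindowMapDemo {
    /** Point query, as used by aggregations: look up a single key. */
    public static Long pointQuery(NavigableMap<Long, Long> store, long key) {
        return store.get(key);
    }

    /** Range query, as used by windowed joins: scan an inclusive key range. */
    public static long rangeSum(NavigableMap<Long, Long> store, long from, long to) {
        long sum = 0;
        for (long v : store.subMap(from, true, to, true).values()) {
            sum += v;
        }
        return sum;
    }
}
```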
[jira] [Issue Comment Deleted] (KAFKA-9455) Consider using TreeMap for in-memory stores of Streams
[ https://issues.apache.org/jira/browse/KAFKA-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck updated KAFKA-9455:
----------------------------

    Comment: was deleted

(was: [~guozhang] I have one more question What do you think about splitting a WindowStore into two stores?)
[jira] [Commented] (KAFKA-9455) Consider using TreeMap for in-memory stores of Streams
[ https://issues.apache.org/jira/browse/KAFKA-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035117#comment-17035117 ]

highluck commented on KAFKA-9455:
---------------------------------

[~guozhang] I have one more question. What do you think about splitting a WindowStore into two stores?
[jira] [Commented] (KAFKA-9455) Consider using TreeMap for in-memory stores of Streams
[ https://issues.apache.org/jira/browse/KAFKA-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17034927#comment-17034927 ]

highluck commented on KAFKA-9455:
---------------------------------

@Guozhang Thanks! It was helpful!
[jira] [Commented] (KAFKA-9455) Consider using TreeMap for in-memory stores of Streams
[ https://issues.apache.org/jira/browse/KAFKA-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17034913#comment-17034913 ]

highluck commented on KAFKA-9455:
---------------------------------

@Guozhang Wang Thanks! Do you want to add a new point-query API?
[jira] [Commented] (KAFKA-9455) Consider using TreeMap for in-memory stores of Streams
[ https://issues.apache.org/jira/browse/KAFKA-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17034478#comment-17034478 ]

highluck commented on KAFKA-9455:
---------------------------------

[~guozhang] I have a question. Are you referring to the following form of point queries?

"WindowStoreIterator fetch(final Bytes key)"

Thank you!
[jira] [Updated] (KAFKA-9483) Add Scala KStream#toTable to the Streams DSL
[ https://issues.apache.org/jira/browse/KAFKA-9483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck updated KAFKA-9483:
----------------------------

    Description:
[KIP-523: Add KStream#toTable to the Streams DSL|https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL?src=jira]
I am trying to add the same function to Scala

  was:
[KIP-523: Add KStream#toTable to the Streams DSL|https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL?src=jira]
I am trying to add the same function to scalar

> Add Scala KStream#toTable to the Streams DSL
> --------------------------------------------
>
>                 Key: KAFKA-9483
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9483
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: highluck
>            Assignee: highluck
>            Priority: Minor
>
> [KIP-523: Add KStream#toTable to the Streams DSL|https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL?src=jira]
>
> I am trying to add the same function to Scala
[jira] [Commented] (KAFKA-9455) Consider using TreeMap for in-memory stores of Streams
[ https://issues.apache.org/jira/browse/KAFKA-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029848#comment-17029848 ]

highluck commented on KAFKA-9455:
---------------------------------

[~ableegoldman] Thanks for the explanation!
[~guozhang] Do you mind if I try? Thanks!
[jira] [Assigned] (KAFKA-9290) Update IQ related JavaDocs
[ https://issues.apache.org/jira/browse/KAFKA-9290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck reassigned KAFKA-9290:
-------------------------------

    Assignee: highluck

> Update IQ related JavaDocs
> --------------------------
>
>                 Key: KAFKA-9290
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9290
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: highluck
>            Priority: Minor
>              Labels: beginner, newbie
>
> In Kafka 2.1.0 we deprecated a couple of methods (KAFKA-7277) in favor of
> passing timestamps to the IQ API via Duration/Instant parameters instead of
> plain longs.
> In Kafka 2.3.0 we introduced TimestampedXxxStores (KAFKA-3522) and allowed
> IQ to return the stored timestamp.
> However, we never updated our JavaDocs that contain code snippets to
> illustrate how a local store can be queried. For example,
> `KGroupedStream#count(Materialized)`
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java#L116-L122]):
>
> {code:java}
> KafkaStreams streams = ... // counting words
> String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
> ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
> String key = "some-word";
> Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
> {code}
> We should update all JavaDocs to use `TimestampedXxxStore` and the new
> Duration/Instant methods in all those code snippets.
[jira] [Commented] (KAFKA-9455) Consider using TreeMap for in-memory stores of Streams
[ https://issues.apache.org/jira/browse/KAFKA-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17027988#comment-17027988 ]

highluck commented on KAFKA-9455:
---------------------------------

[~guozhang] Do I need a KIP?
[jira] [Resolved] (KAFKA-9403) Migrate BaseConsumer to Consumer
[ https://issues.apache.org/jira/browse/KAFKA-9403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck resolved KAFKA-9403.
-----------------------------
    Resolution: Not A Bug

> Migrate BaseConsumer to Consumer
> --------------------------------
>
>                 Key: KAFKA-9403
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9403
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: highluck
>            Assignee: highluck
>            Priority: Minor
>
> BaseConsumerRecord is deprecated, but MirrorMaker still uses
> BaseConsumerRecord.
>
> Remove BaseConsumer from MirrorMaker.
[jira] [Comment Edited] (KAFKA-8382) Add TimestampedSessionStore
[ https://issues.apache.org/jira/browse/KAFKA-8382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17027072#comment-17027072 ]

highluck edited comment on KAFKA-8382 at 1/30/20 11:22 PM:
-----------------------------------------------------------

[~mjsax] Can I ask you to review it?! Thank you!!
[https://github.com/apache/kafka/pull/8022]

was (Author: high.lee):
[https://github.com/apache/kafka/pull/8022]

> Add TimestampedSessionStore
> ---------------------------
>
>                 Key: KAFKA-8382
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8382
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: highluck
>            Priority: Minor
>              Labels: kip
>
> Follow-up to KIP-258, to complete the KIP by adding TimestampedSessionStores.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB]
[jira] [Commented] (KAFKA-8382) Add TimestampedSessionStore
[ https://issues.apache.org/jira/browse/KAFKA-8382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17027072#comment-17027072 ]

highluck commented on KAFKA-8382:
---------------------------------

[https://github.com/apache/kafka/pull/8022]
[jira] [Issue Comment Deleted] (KAFKA-8382) Add TimestampedSessionStore
[ https://issues.apache.org/jira/browse/KAFKA-8382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck updated KAFKA-8382:
----------------------------

    Comment: was deleted

(was: [https://github.com/apache/kafka/pull/8022])
[jira] [Commented] (KAFKA-8382) Add TimestampedSessionStore
[ https://issues.apache.org/jira/browse/KAFKA-8382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17027069#comment-17027069 ]

highluck commented on KAFKA-8382:
---------------------------------

[https://github.com/apache/kafka/pull/8022]
[jira] [Created] (KAFKA-9483) Add Scala KStream#toTable to the Streams DSL
highluck created KAFKA-9483:
----------------------------

             Summary: Add Scala KStream#toTable to the Streams DSL
                 Key: KAFKA-9483
                 URL: https://issues.apache.org/jira/browse/KAFKA-9483
             Project: Kafka
          Issue Type: Improvement
            Reporter: highluck
            Assignee: highluck

[KIP-523: Add KStream#toTable to the Streams DSL|https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL?src=jira]

I am trying to add the same function to Scala
[jira] [Assigned] (KAFKA-8147) Add changelog topic configuration to KTable suppress
[ https://issues.apache.org/jira/browse/KAFKA-8147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck reassigned KAFKA-8147:
-------------------------------

    Assignee: highluck

> Add changelog topic configuration to KTable suppress
> ----------------------------------------------------
>
>                 Key: KAFKA-8147
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8147
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.1.1
>            Reporter: Maarten
>            Assignee: highluck
>            Priority: Minor
>              Labels: kip
>
> The Streams DSL does not provide a way to configure the changelog topic
> created by KTable.suppress.
> From the perspective of an external user this could be implemented similarly
> to the configuration of aggregate + materialized, i.e.,
> {code:java}
> changelogTopicConfigs = // Configs
> materialized = Materialized.as(..).withLoggingEnabled(changelogTopicConfigs)
> ..
> KGroupedStream.aggregate(.., materialized)
> {code}
> [KIP-446: https://cwiki.apache.org/confluence/display/KAFKA/KIP-446%3A+Add+changelog+topic+configuration+to+KTable+suppress|https://cwiki.apache.org/confluence/display/KAFKA/KIP-446%3A+Add+changelog+topic+configuration+to+KTable+suppress]
[jira] [Assigned] (KAFKA-8382) Add TimestampedSessionStore
[ https://issues.apache.org/jira/browse/KAFKA-8382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck reassigned KAFKA-8382: --- Assignee: highluck > Add TimestampedSessionStore > --- > > Key: KAFKA-8382 > URL: https://issues.apache.org/jira/browse/KAFKA-8382 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Matthias J. Sax >Assignee: highluck >Priority: Minor > Labels: kip > > Follow up to KIP-258, to complete the KIP by adding TimestampedSessionStores. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9290) Update IQ related JavaDocs
[ https://issues.apache.org/jira/browse/KAFKA-9290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck reassigned KAFKA-9290: --- Assignee: (was: highluck) > Update IQ related JavaDocs > -- > > Key: KAFKA-9290 > URL: https://issues.apache.org/jira/browse/KAFKA-9290 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > Labels: beginner, newbie > > In Kafka 2.1.0 we deprecated a couple of methods (KAFKA-7277) so that timestamps are passed into the IQ API via Duration/Instant parameters instead of plain longs. > In Kafka 2.3.0 we introduced TimestampedXxxStores (KAFKA-3522) and allowed IQ to return the stored timestamp. > However, we never updated our JavaDocs that contain code snippets to illustrate how a local store can be queried. For example `KGroupedStream#count(Materialized)` ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java#L116-L122]): > > {code:java} > * {@code > * KafkaStreams streams = ... // counting words > * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance > * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); > * String key = "some-word"; > * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) > * } > {code} > We should update all JavaDocs to use `TimestampedXxxStore` and the new Duration/Instant methods in all those code snippets. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9290) Update IQ related JavaDocs
[ https://issues.apache.org/jira/browse/KAFKA-9290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck reassigned KAFKA-9290: --- Assignee: highluck > Update IQ related JavaDocs > -- > > Key: KAFKA-9290 > URL: https://issues.apache.org/jira/browse/KAFKA-9290 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: highluck >Priority: Minor > Labels: beginner, newbie > > In Kafka 2.1.0 we deprecated a couple of methods (KAFKA-7277) so that timestamps are passed into the IQ API via Duration/Instant parameters instead of plain longs. > In Kafka 2.3.0 we introduced TimestampedXxxStores (KAFKA-3522) and allowed IQ to return the stored timestamp. > However, we never updated our JavaDocs that contain code snippets to illustrate how a local store can be queried. For example `KGroupedStream#count(Materialized)` ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java#L116-L122]): > > {code:java} > * {@code > * KafkaStreams streams = ... // counting words > * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance > * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); > * String key = "some-word"; > * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) > * } > {code} > We should update all JavaDocs to use `TimestampedXxxStore` and the new Duration/Instant methods in all those code snippets. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
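To make the requested JavaDoc update concrete, here is a rough standalone sketch of what a timestamped lookup returns, using a plain map and a minimal value-and-timestamp holder as stand-ins for the real `ReadOnlyKeyValueStore` and `ValueAndTimestamp` classes — the names and types below are illustrative, not the actual Kafka Streams API:

```java
import java.util.HashMap;
import java.util.Map;

public class TimestampedLookup {
    // Minimal stand-in for Kafka's ValueAndTimestamp<V>: a timestamped store
    // keeps the record timestamp alongside the value.
    record ValueWithTs<V>(V value, long timestamp) {}

    // Stand-in for querying a ReadOnlyKeyValueStore<String, ValueAndTimestamp<Long>>.
    static ValueWithTs<Long> countForWord(Map<String, ValueWithTs<Long>> localStore, String key) {
        return localStore.get(key); // key must be local, as in the original snippet
    }

    public static void main(String[] args) {
        Map<String, ValueWithTs<Long>> localStore = new HashMap<>();
        localStore.put("some-word", new ValueWithTs<>(42L, 1_577_836_800_000L));

        ValueWithTs<Long> result = countForWord(localStore, "some-word");
        // Unlike the plain Long of the old snippet, the result also carries
        // the timestamp of the last update for that key.
        System.out.println(result.value() + " @ " + result.timestamp());
    }
}
```

The point of the updated snippets would be exactly this shape change: callers receive the value together with its stored timestamp instead of the bare value.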
[jira] [Assigned] (KAFKA-7658) Add KStream#toTable to the Streams DSL
[ https://issues.apache.org/jira/browse/KAFKA-7658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck reassigned KAFKA-7658: --- Assignee: highluck (was: Aishwarya Pradeep Kumar) > Add KStream#toTable to the Streams DSL > -- > > Key: KAFKA-7658 > URL: https://issues.apache.org/jira/browse/KAFKA-7658 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: highluck >Priority: Major > Labels: kip, newbie > > KIP-523: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL] > > We'd like to add a new API to the KStream object of the Streams DSL: > {code:java} > KTable<K, V> KStream.toTable() > KTable<K, V> KStream.toTable(Materialized) > {code} > The function re-interprets the event stream {{KStream}} as a changelog stream {{KTable}}. Note that this should NOT be treated as syntactic sugar for a dummy {{KStream.reduce()}} function which always takes the new value, as it has the following differences: > 1) an aggregation operator of {{KStream}} is for aggregating an event stream into an evolving table, which will drop null-values from the input event stream; whereas a {{toTable}} function will completely change the semantics of the input stream from event stream to changelog stream: null-values will still be serialized, and if the resulting bytes are also null they will be interpreted as "deletes" to the materialized KTable (i.e. tombstones in the changelog stream). > 2) the aggregation result {{KTable}} will always be materialized, whereas the KTable resulting from {{toTable}} may only be materialized if the overload with Materialized is used (and if optimization is turned on it may still be only logically materialized if the queryable name is not set). 
> Therefore, users who want to turn an event stream into a changelog stream (whatever the reason they cannot read the source topic as a changelog stream {{KTable}} in the first place) should use this new API instead of the dummy reduction function. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7658) Add KStream#toTable to the Streams DSL
[ https://issues.apache.org/jira/browse/KAFKA-7658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018432#comment-17018432 ] highluck commented on KAFKA-7658: - I wanted to ask: is it okay if I open a PR? > Add KStream#toTable to the Streams DSL > -- > > Key: KAFKA-7658 > URL: https://issues.apache.org/jira/browse/KAFKA-7658 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Aishwarya Pradeep Kumar >Priority: Major > Labels: kip, newbie > > KIP-523: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL] > > We'd like to add a new API to the KStream object of the Streams DSL: > {code:java} > KTable<K, V> KStream.toTable() > KTable<K, V> KStream.toTable(Materialized) > {code} > The function re-interprets the event stream {{KStream}} as a changelog stream {{KTable}}. Note that this should NOT be treated as syntactic sugar for a dummy {{KStream.reduce()}} function which always takes the new value, as it has the following differences: > 1) an aggregation operator of {{KStream}} is for aggregating an event stream into an evolving table, which will drop null-values from the input event stream; whereas a {{toTable}} function will completely change the semantics of the input stream from event stream to changelog stream: null-values will still be serialized, and if the resulting bytes are also null they will be interpreted as "deletes" to the materialized KTable (i.e. tombstones in the changelog stream). > 2) the aggregation result {{KTable}} will always be materialized, whereas the KTable resulting from {{toTable}} may only be materialized if the overload with Materialized is used (and if optimization is turned on it may still be only logically materialized if the queryable name is not set). 
> Therefore, users who want to turn an event stream into a changelog stream (whatever the reason they cannot read the source topic as a changelog stream {{KTable}} in the first place) should use this new API instead of the dummy reduction function. -- This message was sent by Atlassian Jira (v8.3.4#803005)
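Difference 1) above — null-values becoming tombstones — can be illustrated with a small standalone sketch that replays an event stream into a map. Plain Java collections stand in for the DSL here, so this shows the described semantics rather than actual Kafka code:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ToTableSemantics {
    // toTable semantics: interpret the event stream as a changelog,
    // where a null value is a tombstone that deletes the key.
    static Map<String, String> asChangelog(List<SimpleEntry<String, String>> events) {
        Map<String, String> table = new LinkedHashMap<>();
        for (SimpleEntry<String, String> e : events) {
            if (e.getValue() == null) table.remove(e.getKey());
            else table.put(e.getKey(), e.getValue());
        }
        return table;
    }

    // Dummy reduce(takeNewValue) semantics: nulls are dropped from the
    // input event stream up front, so no delete ever happens.
    static Map<String, String> asReduced(List<SimpleEntry<String, String>> events) {
        Map<String, String> table = new LinkedHashMap<>();
        for (SimpleEntry<String, String> e : events) {
            if (e.getValue() != null) table.put(e.getKey(), e.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<SimpleEntry<String, String>> events = List.of(
                new SimpleEntry<>("k1", "v1"),
                new SimpleEntry<>("k2", "v2"),
                new SimpleEntry<>("k1", null)); // tombstone for k1
        System.out.println(asChangelog(events)); // k1 deleted by the tombstone
        System.out.println(asReduced(events));   // k1 still present: null was dropped
    }
}
```

The same input stream yields different tables under the two interpretations, which is why {{toTable}} is not syntactic sugar for a reduce.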
[jira] [Commented] (KAFKA-9042) Auto infer external topic partitions in stream reset tool
[ https://issues.apache.org/jira/browse/KAFKA-9042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017571#comment-17017571 ] highluck commented on KAFKA-9042: - [~mjsax] Thank you for your reply, and sorry. It was exciting to think that I could contribute to such an attractive project, so I got a little carried away. I will be more careful. Thank you! > Auto infer external topic partitions in stream reset tool > - > > Key: KAFKA-9042 > URL: https://issues.apache.org/jira/browse/KAFKA-9042 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Reporter: Boyang Chen >Assignee: highluck >Priority: Major > Labels: needs-kip, newbie, newbie++ > > As of today, user has to specify `--input-topic` in the stream reset to be > able to reset offset to a specific position. For a stream job with multiple > external topics that needs to be purged, users usually don't want to name all > the topics in order to reset the offsets. It's really painful to look through > the entire topology to make sure we purge all the committed offsets. > We could add a config `--reset-all-external-topics` to the reset tool such > that when enabled, we could delete offsets for all involved topics. The topic > metadata could be acquired by issuing a `DescribeGroup` request from admin > client, which is stored in the member subscription information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (KAFKA-9440) Add ConsumerGroupCommand to delete static members
[ https://issues.apache.org/jira/browse/KAFKA-9440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck updated KAFKA-9440: Comment: was deleted (was: [~bchen225242] Can I take this ticket with me?) > Add ConsumerGroupCommand to delete static members > - > > Key: KAFKA-9440 > URL: https://issues.apache.org/jira/browse/KAFKA-9440 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Priority: Major > Labels: help-wanted, kip, newbie, newbie++ > > We introduced a new AdminClient API removeMembersFromConsumerGroup in 2.4. It > would be good to instantiate the API as part of the ConsumerGroupCommand for > easy command line usage. > This change requires a new KIP, and just posting out here in case anyone who > uses static membership to pick it up, if they would like to use. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9440) Add ConsumerGroupCommand to delete static members
[ https://issues.apache.org/jira/browse/KAFKA-9440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017080#comment-17017080 ] highluck commented on KAFKA-9440: - [~bchen225242] Can I take this ticket? > Add ConsumerGroupCommand to delete static members > - > > Key: KAFKA-9440 > URL: https://issues.apache.org/jira/browse/KAFKA-9440 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Priority: Major > Labels: help-wanted, kip, newbie, newbie++ > > We introduced a new AdminClient API removeMembersFromConsumerGroup in 2.4. It > would be good to instantiate the API as part of the ConsumerGroupCommand for > easy command line usage. > This change requires a new KIP, and just posting out here in case anyone who > uses static membership to pick it up, if they would like to use. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-9042) Auto infer external topic partitions in stream reset tool
[ https://issues.apache.org/jira/browse/KAFKA-9042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017077#comment-17017077 ] highluck edited comment on KAFKA-9042 at 1/16/20 4:18 PM: -- [~mjsax] [~bchen225242] I create a KIP, I modified the code by adding TestCode. Can I ask you to review again? [GitHub Pull Request #7948|https://github.com/apache/kafka/pull/7948] [KIP-560: Auto infer external topic partitions in stream reset tool|https://cwiki.apache.org/confluence/display/KAFKA/KIP-560%3A+Auto+infer+external+topic+partitions+in+stream+reset+tool?src=jira] was (Author: high.lee): [~mjsax] [~bchen225242] I create a KIP, I modified the code by adding TestCode. Can I ask you to review again? [KIP-560: Auto infer external topic partitions in stream reset tool|https://cwiki.apache.org/confluence/display/KAFKA/KIP-560%3A+Auto+infer+external+topic+partitions+in+stream+reset+tool?src=jira] > Auto infer external topic partitions in stream reset tool > - > > Key: KAFKA-9042 > URL: https://issues.apache.org/jira/browse/KAFKA-9042 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Reporter: Boyang Chen >Assignee: highluck >Priority: Major > Labels: needs-kip, newbie, newbie++ > > As of today, user has to specify `--input-topic` in the stream reset to be > able to reset offset to a specific position. For a stream job with multiple > external topics that needs to be purged, users usually don't want to name all > the topics in order to reset the offsets. It's really painful to look through > the entire topology to make sure we purge all the committed offsets. > We could add a config `--reset-all-external-topics` to the reset tool such > that when enabled, we could delete offsets for all involved topics. The topic > metadata could be acquired by issuing a `DescribeGroup` request from admin > client, which is stored in the member subscription information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9042) Auto infer external topic partitions in stream reset tool
[ https://issues.apache.org/jira/browse/KAFKA-9042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017077#comment-17017077 ] highluck commented on KAFKA-9042: - [~mjsax] [~bchen225242] I create a KIP, I modified the code by adding TestCode. Can I ask you to review again? [KIP-560: Auto infer external topic partitions in stream reset tool|https://cwiki.apache.org/confluence/display/KAFKA/KIP-560%3A+Auto+infer+external+topic+partitions+in+stream+reset+tool?src=jira] > Auto infer external topic partitions in stream reset tool > - > > Key: KAFKA-9042 > URL: https://issues.apache.org/jira/browse/KAFKA-9042 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Reporter: Boyang Chen >Assignee: highluck >Priority: Major > Labels: needs-kip, newbie, newbie++ > > As of today, user has to specify `--input-topic` in the stream reset to be > able to reset offset to a specific position. For a stream job with multiple > external topics that needs to be purged, users usually don't want to name all > the topics in order to reset the offsets. It's really painful to look through > the entire topology to make sure we purge all the committed offsets. > We could add a config `--reset-all-external-topics` to the reset tool such > that when enabled, we could delete offsets for all involved topics. The topic > metadata could be acquired by issuing a `DescribeGroup` request from admin > client, which is stored in the member subscription information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-9042) Auto infer external topic partitions in stream reset tool
[ https://issues.apache.org/jira/browse/KAFKA-9042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17015511#comment-17015511 ] highluck edited comment on KAFKA-9042 at 1/15/20 4:15 AM: -- [~mjsax] Thank you !! Can you give me WIKI write permission? my id : high.lee :) thank you! was (Author: high.lee): [~mjsax] Thank you !! Can you give me write permission? my id is high.lee :) thank you! > Auto infer external topic partitions in stream reset tool > - > > Key: KAFKA-9042 > URL: https://issues.apache.org/jira/browse/KAFKA-9042 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Reporter: Boyang Chen >Assignee: highluck >Priority: Major > Labels: needs-kip, newbie, newbie++ > > As of today, user has to specify `--input-topic` in the stream reset to be > able to reset offset to a specific position. For a stream job with multiple > external topics that needs to be purged, users usually don't want to name all > the topics in order to reset the offsets. It's really painful to look through > the entire topology to make sure we purge all the committed offsets. > We could add a config `--reset-all-external-topics` to the reset tool such > that when enabled, we could delete offsets for all involved topics. The topic > metadata could be acquired by issuing a `DescribeGroup` request from admin > client, which is stored in the member subscription information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9042) Auto infer external topic partitions in stream reset tool
[ https://issues.apache.org/jira/browse/KAFKA-9042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17015511#comment-17015511 ] highluck commented on KAFKA-9042: - [~mjsax] Thank you !! Can you give me write permission? my id is high.lee :) thank you! > Auto infer external topic partitions in stream reset tool > - > > Key: KAFKA-9042 > URL: https://issues.apache.org/jira/browse/KAFKA-9042 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Reporter: Boyang Chen >Assignee: highluck >Priority: Major > Labels: needs-kip, newbie, newbie++ > > As of today, user has to specify `--input-topic` in the stream reset to be > able to reset offset to a specific position. For a stream job with multiple > external topics that needs to be purged, users usually don't want to name all > the topics in order to reset the offsets. It's really painful to look through > the entire topology to make sure we purge all the committed offsets. > We could add a config `--reset-all-external-topics` to the reset tool such > that when enabled, we could delete offsets for all involved topics. The topic > metadata could be acquired by issuing a `DescribeGroup` request from admin > client, which is stored in the member subscription information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9042) Auto infer external topic partitions in stream reset tool
[ https://issues.apache.org/jira/browse/KAFKA-9042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17015214#comment-17015214 ] highluck commented on KAFKA-9042: - [~bchen225242] [~mjsax] May I ask for a review? > Auto infer external topic partitions in stream reset tool > - > > Key: KAFKA-9042 > URL: https://issues.apache.org/jira/browse/KAFKA-9042 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Reporter: Boyang Chen >Assignee: highluck >Priority: Major > Labels: needs-kip, newbie, newbie++ > > As of today, user has to specify `--input-topic` in the stream reset to be > able to reset offset to a specific position. For a stream job with multiple > external topics that needs to be purged, users usually don't want to name all > the topics in order to reset the offsets. It's really painful to look through > the entire topology to make sure we purge all the committed offsets. > We could add a config `--reset-all-external-topics` to the reset tool such > that when enabled, we could delete offsets for all involved topics. The topic > metadata could be acquired by issuing a `DescribeGroup` request from admin > client, which is stored in the member subscription information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (KAFKA-9042) Auto infer external topic partitions in stream reset tool
[ https://issues.apache.org/jira/browse/KAFKA-9042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck updated KAFKA-9042: Comment: was deleted (was: [~bchen225242] I don't know if I understand well, but please review it!) > Auto infer external topic partitions in stream reset tool > - > > Key: KAFKA-9042 > URL: https://issues.apache.org/jira/browse/KAFKA-9042 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Reporter: Boyang Chen >Assignee: highluck >Priority: Major > Labels: needs-kip, newbie, newbie++ > > As of today, user has to specify `--input-topic` in the stream reset to be > able to reset offset to a specific position. For a stream job with multiple > external topics that needs to be purged, users usually don't want to name all > the topics in order to reset the offsets. It's really painful to look through > the entire topology to make sure we purge all the committed offsets. > We could add a config `--reset-all-external-topics` to the reset tool such > that when enabled, we could delete offsets for all involved topics. The topic > metadata could be acquired by issuing a `DescribeGroup` request from admin > client, which is stored in the member subscription information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9403) Migrate BaseConsumer to Consumer
[ https://issues.apache.org/jira/browse/KAFKA-9403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck updated KAFKA-9403: Summary: Migrate BaseConsumer to Consumer (was: Remove BaseConsumer from Mirrormaker) > Migrate BaseConsumer to Consumer > > > Key: KAFKA-9403 > URL: https://issues.apache.org/jira/browse/KAFKA-9403 > Project: Kafka > Issue Type: Improvement >Reporter: highluck >Assignee: highluck >Priority: Minor > > BaseConsumerRecord is deprecated > but MirrorMaker using BaseConsumerRecord > > Remove BaseConsumer from Mirrormaker -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9042) Auto infer external topic partitions in stream reset tool
[ https://issues.apache.org/jira/browse/KAFKA-9042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014457#comment-17014457 ] highluck commented on KAFKA-9042: - [~bchen225242] I don't know if I understand well, but please review it! > Auto infer external topic partitions in stream reset tool > - > > Key: KAFKA-9042 > URL: https://issues.apache.org/jira/browse/KAFKA-9042 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Reporter: Boyang Chen >Assignee: highluck >Priority: Major > Labels: needs-kip, newbie, newbie++ > > As of today, user has to specify `--input-topic` in the stream reset to be > able to reset offset to a specific position. For a stream job with multiple > external topics that needs to be purged, users usually don't want to name all > the topics in order to reset the offsets. It's really painful to look through > the entire topology to make sure we purge all the committed offsets. > We could add a config `--reset-all-external-topics` to the reset tool such > that when enabled, we could delete offsets for all involved topics. The topic > metadata could be acquired by issuing a `DescribeGroup` request from admin > client, which is stored in the member subscription information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (KAFKA-9042) Auto infer external topic partitions in stream reset tool
[ https://issues.apache.org/jira/browse/KAFKA-9042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck updated KAFKA-9042: Comment: was deleted (was: [~bchen225242] Is it okay if I take this ticket?) > Auto infer external topic partitions in stream reset tool > - > > Key: KAFKA-9042 > URL: https://issues.apache.org/jira/browse/KAFKA-9042 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Reporter: Boyang Chen >Assignee: highluck >Priority: Major > Labels: needs-kip, newbie, newbie++ > > As of today, user has to specify `--input-topic` in the stream reset to be > able to reset offset to a specific position. For a stream job with multiple > external topics that needs to be purged, users usually don't want to name all > the topics in order to reset the offsets. It's really painful to look through > the entire topology to make sure we purge all the committed offsets. > We could add a config `--reset-all-external-topics` to the reset tool such > that when enabled, we could delete offsets for all involved topics. The topic > metadata could be acquired by issuing a `DescribeGroup` request from admin > client, which is stored in the member subscription information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9042) Auto infer external topic partitions in stream reset tool
[ https://issues.apache.org/jira/browse/KAFKA-9042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck reassigned KAFKA-9042: --- Assignee: highluck > Auto infer external topic partitions in stream reset tool > - > > Key: KAFKA-9042 > URL: https://issues.apache.org/jira/browse/KAFKA-9042 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Reporter: Boyang Chen >Assignee: highluck >Priority: Major > Labels: needs-kip, newbie, newbie++ > > As of today, user has to specify `--input-topic` in the stream reset to be > able to reset offset to a specific position. For a stream job with multiple > external topics that needs to be purged, users usually don't want to name all > the topics in order to reset the offsets. It's really painful to look through > the entire topology to make sure we purge all the committed offsets. > We could add a config `--reset-all-external-topics` to the reset tool such > that when enabled, we could delete offsets for all involved topics. The topic > metadata could be acquired by issuing a `DescribeGroup` request from admin > client, which is stored in the member subscription information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9042) Auto infer external topic partitions in stream reset tool
[ https://issues.apache.org/jira/browse/KAFKA-9042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014404#comment-17014404 ] highluck commented on KAFKA-9042: - [~bchen225242] Is it okay if I take this ticket? > Auto infer external topic partitions in stream reset tool > - > > Key: KAFKA-9042 > URL: https://issues.apache.org/jira/browse/KAFKA-9042 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Reporter: Boyang Chen >Priority: Major > Labels: needs-kip, newbie, newbie++ > > As of today, user has to specify `--input-topic` in the stream reset to be > able to reset offset to a specific position. For a stream job with multiple > external topics that needs to be purged, users usually don't want to name all > the topics in order to reset the offsets. It's really painful to look through > the entire topology to make sure we purge all the committed offsets. > We could add a config `--reset-all-external-topics` to the reset tool such > that when enabled, we could delete offsets for all involved topics. The topic > metadata could be acquired by issuing a `DescribeGroup` request from admin > client, which is stored in the member subscription information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9152) Improve Sensor Retrieval
[ https://issues.apache.org/jira/browse/KAFKA-9152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014380#comment-17014380 ] highluck commented on KAFKA-9152: - [~cadonna] [https://github.com/apache/kafka/pull/7928] I fixed it! Can you review it again? Thank you :) > Improve Sensor Retrieval > - > > Key: KAFKA-9152 > URL: https://issues.apache.org/jira/browse/KAFKA-9152 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Assignee: highluck >Priority: Minor > Labels: newbie, tech-debt > > This ticket shall improve two aspects of the retrieval of sensors: > 1. Currently, when a sensor is retrieved with {{*Metrics.*Sensor()}} (e.g. {{ThreadMetrics.createTaskSensor()}}) after it was created with the same method {{*Metrics.*Sensor()}}, the sensor is added again to the corresponding queue in {{*Sensors}} (e.g. {{threadLevelSensors}}) in {{StreamsMetricsImpl}}. Those queues are used to remove the sensors when {{removeAll*LevelSensors()}} is called. Having the same sensor multiple times in those queues is not an issue from a correctness point of view. However, storing a sensor only once in those queues would reduce the footprint. > 2. When a sensor is retrieved, the current code attempts to create a new sensor and to add the corresponding metrics to it again. This could be avoided. > > Both aspects could be improved by checking whether a sensor already exists by calling {{getSensor()}} on the {{Metrics}} object and checking the return value. -- This message was sent by Atlassian Jira (v8.3.4#803005)
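The proposed retrieval pattern — look the sensor up first and only create, enqueue, and populate it when the lookup misses — can be sketched standalone, with a plain map and deque standing in for the {{Metrics}} registry and the {{threadLevelSensors}} queue. The names below are illustrative stand-ins, not the real {{StreamsMetricsImpl}} code:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class SensorRegistry {
    static final class Sensor {
        final String name;
        Sensor(String name) { this.name = name; }
    }

    final Map<String, Sensor> sensors = new HashMap<>();        // stands in for Metrics
    final Deque<Sensor> threadLevelSensors = new ArrayDeque<>(); // removal queue

    // Retrieve-or-create: an existing sensor is returned as-is, so it is
    // neither re-registered in the removal queue nor has metrics re-added.
    Sensor taskSensor(String name) {
        Sensor existing = sensors.get(name); // the proposed getSensor() check
        if (existing != null) {
            return existing;
        }
        Sensor created = new Sensor(name);
        sensors.put(name, created);
        threadLevelSensors.push(created); // enqueued exactly once
        // ... metrics would be added to the fresh sensor here ...
        return created;
    }

    public static void main(String[] args) {
        SensorRegistry registry = new SensorRegistry();
        Sensor a = registry.taskSensor("process");
        Sensor b = registry.taskSensor("process"); // second retrieval, same sensor
        System.out.println(a == b);                            // same instance
        System.out.println(registry.threadLevelSensors.size()); // queued once, not twice
    }
}
```

This addresses both aspects of the ticket at once: the queue holds each sensor a single time, and the create-and-re-add path is skipped entirely on a hit.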
[jira] [Comment Edited] (KAFKA-9403) Remove BaseConsumer from Mirrormaker
[ https://issues.apache.org/jira/browse/KAFKA-9403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014373#comment-17014373 ] highluck edited comment on KAFKA-9403 at 1/13/20 2:36 PM: -- [~ijuma] thank you :) [https://github.com/apache/kafka/pull/7935] First of all I removed all the available parts was (Author: high.lee): [~ijuma] [https://github.com/apache/kafka/pull/7935] First of all I removed all the available parts > Remove BaseConsumer from Mirrormaker > > > Key: KAFKA-9403 > URL: https://issues.apache.org/jira/browse/KAFKA-9403 > Project: Kafka > Issue Type: Improvement >Reporter: highluck >Assignee: highluck >Priority: Minor > > BaseConsumerRecord is deprecated > but MirrorMaker using BaseConsumerRecord > > Remove BaseConsumer from Mirrormaker -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9403) Remove BaseConsumer from Mirrormaker
[ https://issues.apache.org/jira/browse/KAFKA-9403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014373#comment-17014373 ] highluck commented on KAFKA-9403: - [~ijuma] [https://github.com/apache/kafka/pull/7935] First of all I removed all the available parts > Remove BaseConsumer from Mirrormaker > > > Key: KAFKA-9403 > URL: https://issues.apache.org/jira/browse/KAFKA-9403 > Project: Kafka > Issue Type: Improvement >Reporter: highluck >Assignee: highluck >Priority: Minor > > BaseConsumerRecord is deprecated > but MirrorMaker using BaseConsumerRecord > > Remove BaseConsumer from Mirrormaker -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9403) Remove BaseConsumer from Mirrormaker
[ https://issues.apache.org/jira/browse/KAFKA-9403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014366#comment-17014366 ] highluck commented on KAFKA-9403: - [~ijuma] Can I ask for a review? > Remove BaseConsumer from Mirrormaker > > > Key: KAFKA-9403 > URL: https://issues.apache.org/jira/browse/KAFKA-9403 > Project: Kafka > Issue Type: Improvement >Reporter: highluck >Assignee: highluck >Priority: Minor > > BaseConsumerRecord is deprecated > but MirrorMaker using BaseConsumerRecord > > Remove BaseConsumer from Mirrormaker -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9129) Add Thread ID to the InternalProcessorContext
[ https://issues.apache.org/jira/browse/KAFKA-9129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck reassigned KAFKA-9129: --- Assignee: (was: highluck) > Add Thread ID to the InternalProcessorContext > - > > Key: KAFKA-9129 > URL: https://issues.apache.org/jira/browse/KAFKA-9129 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Priority: Major > > When we added client metrics we had to move the {{StreamsMetricsImpl}} object > to the client level. That means that now instead of having one > {{StreamsMetricsImpl}} object per thread, we have now one per client. That > also means that we cannot store the thread ID in the {{StreamsMetricsImpl}} > anymore. Currently, we get the thread ID from > {{Thread.currentThread().getName()}} when we need to create a sensor. > However, that is not robust against code refactoring because we need to > ensure that the thread that creates the sensor is also the one that records > the metrics. To be more flexible, we should expose the ID of the thread that > executes a processor in the {{InternalProcessorContext}} like it already > exposes the task ID. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9403) Remove BaseConsumer from Mirrormaker
highluck created KAFKA-9403:
----------------------------

             Summary: Remove BaseConsumer from Mirrormaker
                 Key: KAFKA-9403
                 URL: https://issues.apache.org/jira/browse/KAFKA-9403
             Project: Kafka
          Issue Type: Improvement
            Reporter: highluck
            Assignee: highluck


BaseConsumerRecord is deprecated
but MirrorMaker using BaseConsumerRecord

Remove BaseConsumer from Mirrormaker
[jira] [Resolved] (KAFKA-9402) PartitionStates update Improvement Opinion
[ https://issues.apache.org/jira/browse/KAFKA-9402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck resolved KAFKA-9402.
-----------------------------
    Resolution: Not A Problem

> PartitionStates update Improvement Opinion
> ------------------------------------------
>
>                 Key: KAFKA-9402
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9402
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 2.4.0
>            Reporter: highluck
>            Assignee: highluck
>            Priority: Minor
>              Labels: client
>
> PartitionStates update Improvement Opinion
>
> PartitionStates#update
>
> {code:java}
> LinkedHashMap<String, List<TopicPartition>> topicToPartitions = new LinkedHashMap<>();
> for (TopicPartition tp : partitionToState.keySet()) {
>     List<TopicPartition> partitions = topicToPartitions.computeIfAbsent(tp.topic(), k -> new ArrayList<>());
>     partitions.add(tp);
> }
> for (Map.Entry<String, List<TopicPartition>> entry : topicToPartitions.entrySet()) {
>     for (TopicPartition tp : entry.getValue()) {
>         S state = partitionToState.get(tp);
>         map.put(tp, state);
>     }
> }
> {code}
> I think it's complicated by order.
>
> It is a part that can be improved more simply
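The quoted code reorders partitions so that all partitions of the same topic end up adjacent, while keeping the first-seen order of topics. As a hedged sketch of the simplification the reporter hints at, the two loops can collapse into one grouping pass. `TopicPartition` below is a minimal stand-in written for this example, not Kafka's real class:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionGrouping {
    // Minimal stand-in for Kafka's TopicPartition (illustrative only).
    public static final class TopicPartition {
        final String topic;
        final int partition;
        public TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
        @Override public String toString() { return topic + "-" + partition; }
    }

    // Group keys by topic (LinkedHashMap keeps first-seen topic order),
    // then flatten back into a map with topic-adjacent iteration order.
    public static <S> LinkedHashMap<TopicPartition, S> regroup(Map<TopicPartition, S> partitionToState) {
        LinkedHashMap<TopicPartition, S> out = new LinkedHashMap<>();
        partitionToState.keySet().stream()
                .collect(Collectors.groupingBy(tp -> tp.topic,
                                               LinkedHashMap::new,
                                               Collectors.toList()))
                .values()
                .forEach(tps -> tps.forEach(tp -> out.put(tp, partitionToState.get(tp))));
        return out;
    }
}
```

Whether this reads as "simpler" than the original two-loop version is the judgment call the ticket was ultimately resolved over ("Not A Problem").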
[jira] [Created] (KAFKA-9402) PartitionStates update Improvement Opinion
highluck created KAFKA-9402:
----------------------------

             Summary: PartitionStates update Improvement Opinion
                 Key: KAFKA-9402
                 URL: https://issues.apache.org/jira/browse/KAFKA-9402
             Project: Kafka
          Issue Type: Improvement
    Affects Versions: 2.4.0
            Reporter: highluck
            Assignee: highluck


PartitionStates update Improvement Opinion

PartitionStates#update

{code:java}
LinkedHashMap<String, List<TopicPartition>> topicToPartitions = new LinkedHashMap<>();
for (TopicPartition tp : partitionToState.keySet()) {
    List<TopicPartition> partitions = topicToPartitions.computeIfAbsent(tp.topic(), k -> new ArrayList<>());
    partitions.add(tp);
}
for (Map.Entry<String, List<TopicPartition>> entry : topicToPartitions.entrySet()) {
    for (TopicPartition tp : entry.getValue()) {
        S state = partitionToState.get(tp);
        map.put(tp, state);
    }
}
{code}

I think it's complicated by order.

It is a part that can be improved more simply
[jira] [Commented] (KAFKA-9090) LeaveGroup admin request should enforce minimum version
[ https://issues.apache.org/jira/browse/KAFKA-9090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17013659#comment-17013659 ]

highluck commented on KAFKA-9090:
---------------------------------

[~bchen225242] Is this ticket valid?

> LeaveGroup admin request should enforce minimum version
> -------------------------------------------------------
>
>                 Key: KAFKA-9090
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9090
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Boyang Chen
>            Priority: Major
>
> The current removeFromGroup command in AdminClient will only take effect
> when broker recognizes `group.instance.id`. We should enforce the request
> builder version and fail the request with exception when broker is on older
> version.
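Kafka's actual request-builder plumbing is not shown in the ticket, so as a hedged illustration only, the guard it asks for could look like the following. The minimum version constant, method names, and exception type are all invented for this sketch:

```java
// Hedged sketch of a fail-fast version guard. The real minimum version for
// group.instance.id support and Kafka's exception types may differ; this only
// demonstrates the "fail the request early" behavior the ticket describes.
public class LeaveGroupVersionCheck {
    // Hypothetical: the first LeaveGroup version that carries group.instance.id.
    static final short MIN_VERSION_WITH_INSTANCE_ID = 3;

    public static void ensureSupported(short negotiatedVersion, String groupInstanceId) {
        if (groupInstanceId != null && negotiatedVersion < MIN_VERSION_WITH_INSTANCE_ID) {
            // Fail loudly instead of silently dropping the instance id on old brokers.
            throw new UnsupportedOperationException(
                "Broker only supports LeaveGroup v" + negotiatedVersion
                + ", but group.instance.id requires v" + MIN_VERSION_WITH_INSTANCE_ID);
        }
    }
}
```

The point of the guard is that a caller passing `group.instance.id` to an old broker gets an immediate exception rather than a request that "takes no effect".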
[jira] [Commented] (KAFKA-3416) Update Fetcher to return Future for Metadata requests
[ https://issues.apache.org/jira/browse/KAFKA-3416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17013649#comment-17013649 ]

highluck commented on KAFKA-3416:
---------------------------------

[~john.warner] Is this ticket valid?
If so, can I do it?

> Update Fetcher to return Future for Metadata requests
> -----------------------------------------------------
>
>                 Key: KAFKA-3416
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3416
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: John Warner
>            Priority: Critical
>
> Update Fetcher so that requests to get metadata return the Future, rather
> than waiting for the Future to finish. This enables non-blocking code to
> use these requests.
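The shape the ticket asks for maps naturally onto `java.util.concurrent.CompletableFuture`: return the future to the caller instead of joining on it internally. `MetadataFetcher` and `fetchTopicsAsync` below are hypothetical names for illustration, not Kafka's real `Fetcher` API:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hedged sketch: contrast the blocking style the ticket wants to move away
// from with the future-returning style it proposes. Names are hypothetical.
public class MetadataFetcher {
    // Blocking style: the caller is stuck until the response arrives.
    public List<String> topicsBlocking() {
        return fetchTopicsAsync().join();
    }

    // Non-blocking style: hand the CompletableFuture to the caller, who can
    // compose it (thenApply, thenCombine, ...) without ever blocking.
    public CompletableFuture<List<String>> fetchTopicsAsync() {
        // Stand-in for a real network round trip to the broker.
        return CompletableFuture.supplyAsync(() -> List.of("topic-a", "topic-b"));
    }
}
```

A non-blocking caller would then write, e.g., `fetchTopicsAsync().thenApply(List::size)` and keep doing other work while the metadata request is in flight.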
[jira] [Assigned] (KAFKA-3416) Update Fetcher to return Future for Metadata requests
[ https://issues.apache.org/jira/browse/KAFKA-3416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck reassigned KAFKA-3416:
-------------------------------

    Assignee:     (was: highluck)

> Update Fetcher to return Future for Metadata requests
> -----------------------------------------------------
>
>                 Key: KAFKA-3416
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3416
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: John Warner
>            Priority: Critical
>
> Update Fetcher so that requests to get metadata return the Future, rather
> than waiting for the Future to finish. This enables non-blocking code to
> use these requests.
[jira] [Issue Comment Deleted] (KAFKA-3416) Update Fetcher to return Future for Metadata requests
[ https://issues.apache.org/jira/browse/KAFKA-3416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck updated KAFKA-3416:
----------------------------
    Comment: was deleted

(was: [~john.warner] Can you assign me a ticket? try it)

> Update Fetcher to return Future for Metadata requests
> -----------------------------------------------------
>
>                 Key: KAFKA-3416
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3416
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: John Warner
>            Priority: Critical
>
> Update Fetcher so that requests to get metadata return the Future, rather
> than waiting for the Future to finish. This enables non-blocking code to
> use these requests.
[jira] [Assigned] (KAFKA-3416) Update Fetcher to return Future for Metadata requests
[ https://issues.apache.org/jira/browse/KAFKA-3416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck reassigned KAFKA-3416:
-------------------------------

    Assignee: highluck

> Update Fetcher to return Future for Metadata requests
> -----------------------------------------------------
>
>                 Key: KAFKA-3416
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3416
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: John Warner
>            Assignee: highluck
>            Priority: Critical
>
> Update Fetcher so that requests to get metadata return the Future, rather
> than waiting for the Future to finish. This enables non-blocking code to
> use these requests.
[jira] [Commented] (KAFKA-2758) Improve Offset Commit Behavior
[ https://issues.apache.org/jira/browse/KAFKA-2758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17012672#comment-17012672 ]

highluck commented on KAFKA-2758:
---------------------------------

[~guozhang] Is this issue still open

> Improve Offset Commit Behavior
> ------------------------------
>
>                 Key: KAFKA-2758
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2758
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: Guozhang Wang
>            Priority: Major
>              Labels: newbie, reliability
>
> There are two scenarios of offset committing that we can improve:
> 1) we can filter the partitions whose committed offset is equal to the
> consumed offset, meaning there is no new consumed messages from this
> partition and hence we do not need to include this partition in the commit
> request.
> 2) we can make a commit request right after resetting to a fetch / consume
> position either according to the reset policy (e.g. on consumer starting
> up, or handling of out of range offset, etc), or through the {code}seek{code}
> so that if the consumer fails right after these event, upon recovery it can
> restarts from the reset position instead of resetting again: this can lead
> to, for example, data loss if we use "largest" as reset policy while there
> are new messages coming to the fetching partitions.
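Scenario 1 above is essentially a map filter. As a simplified sketch, offsets are modeled here as a `Map<String, Long>` keyed by a partition name, which glosses over Kafka's real `TopicPartition`/`OffsetAndMetadata` types:

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of scenario 1: drop partitions whose committed offset already
// equals the consumed position, so the commit request only carries progress.
public class CommitFilter {
    public static Map<String, Long> offsetsToCommit(Map<String, Long> consumed,
                                                    Map<String, Long> committed) {
        Map<String, Long> out = new HashMap<>();
        for (Map.Entry<String, Long> e : consumed.entrySet()) {
            Long alreadyCommitted = committed.get(e.getKey());
            if (!e.getValue().equals(alreadyCommitted)) {
                // Only partitions with new consumed messages go into the request.
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }
}
```

With consumed `{p0=5, p1=7}` and committed `{p0=5, p1=3}`, only `p1` needs to appear in the commit request, shrinking it on mostly-idle partition sets.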
[jira] [Comment Edited] (KAFKA-2758) Improve Offset Commit Behavior
[ https://issues.apache.org/jira/browse/KAFKA-2758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17012672#comment-17012672 ]

highluck edited comment on KAFKA-2758 at 1/10/20 10:16 AM:
-----------------------------------------------------------

[~guozhang] Is this issue still open?


was (Author: high.lee):
[~guozhang] Is this issue still open

> Improve Offset Commit Behavior
> ------------------------------
>
>                 Key: KAFKA-2758
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2758
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: Guozhang Wang
>            Priority: Major
>              Labels: newbie, reliability
>
> There are two scenarios of offset committing that we can improve:
> 1) we can filter the partitions whose committed offset is equal to the
> consumed offset, meaning there is no new consumed messages from this
> partition and hence we do not need to include this partition in the commit
> request.
> 2) we can make a commit request right after resetting to a fetch / consume
> position either according to the reset policy (e.g. on consumer starting
> up, or handling of out of range offset, etc), or through the {code}seek{code}
> so that if the consumer fails right after these event, upon recovery it can
> restarts from the reset position instead of resetting again: this can lead
> to, for example, data loss if we use "largest" as reset policy while there
> are new messages coming to the fetching partitions.
[jira] [Commented] (KAFKA-9152) Improve Sensor Retrieval
[ https://issues.apache.org/jira/browse/KAFKA-9152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17012572#comment-17012572 ]

highluck commented on KAFKA-9152:
---------------------------------

[~cadonna] [https://github.com/apache/kafka/pull/7914]
I don't know if I understand
Please confirm pull

> Improve Sensor Retrieval
> ------------------------
>
>                 Key: KAFKA-9152
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9152
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Bruno Cadonna
>            Assignee: highluck
>            Priority: Minor
>              Labels: newbie, tech-debt
>
> This ticket shall improve two aspects of the retrieval of sensors:
> 1. Currently, when a sensor is retrieved with {{*Metrics.*Sensor()}} (e.g.
> {{ThreadMetrics.createTaskSensor()}}) after it was created with the same
> method {{*Metrics.*Sensor()}}, the sensor is added again to the
> corresponding queue in {{*Sensors}} (e.g. {{threadLevelSensors}}) in
> {{StreamsMetricsImpl}}. Those queues are used to remove the sensors when
> {{removeAll*LevelSensors()}} is called. Having multiple times the same
> sensors in this queue is not an issue from a correctness point of view.
> However, it would reduce the footprint to only store a sensor once in those
> queues.
> 2. When a sensor is retrieved, the current code attempts to create a new
> sensor and to add to it again the corresponding metrics. This could be
> avoided.
>
> Both aspects could be improved by checking whether a sensor already exists
> by calling {{getSensor()}} on the {{Metrics}} object and checking the
> return value.
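The get-or-create pattern the ticket suggests can be sketched against a toy registry. This is not the real `StreamsMetricsImpl`/`Metrics` API; sensors are modeled as plain `Object`s to keep the sketch self-contained:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of the suggested fix: consult the registry (getSensor) first,
// and only build the sensor and enqueue it for later removal on first creation.
public class SensorRegistry {
    private final Map<String, Object> sensors = new HashMap<>();
    private final Deque<String> threadLevelSensors = new ArrayDeque<>();

    // Analogue of Metrics.getSensor(): null means "not created yet".
    public Object getSensor(String name) {
        return sensors.get(name);
    }

    public Object getOrCreateSensor(String name) {
        Object existing = getSensor(name);
        if (existing != null) {
            // Aspect 2: skip rebuilding the sensor and re-adding its metrics.
            return existing;
        }
        Object created = new Object(); // stand-in for sensor + metrics setup
        sensors.put(name, created);
        // Aspect 1: the removal queue records the sensor exactly once.
        threadLevelSensors.push(name);
        return created;
    }

    public int queuedForRemoval() {
        return threadLevelSensors.size();
    }
}
```

Retrieving the same sensor twice now returns the same instance and leaves a single entry in the removal queue, which is exactly the footprint reduction the ticket describes.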
[jira] [Comment Edited] (KAFKA-9152) Improve Sensor Retrieval
[ https://issues.apache.org/jira/browse/KAFKA-9152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17012572#comment-17012572 ]

highluck edited comment on KAFKA-9152 at 1/10/20 8:40 AM:
----------------------------------------------------------

[~cadonna] [https://github.com/apache/kafka/pull/7914]
I don't know if I understand
Please confirm pull


was (Author: high.lee):
[~cadonna] [https://github.com/apache/kafka/pull/7914]
I don't know if I understand
Please confirm pull

> Improve Sensor Retrieval
> ------------------------
>
>                 Key: KAFKA-9152
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9152
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Bruno Cadonna
>            Assignee: highluck
>            Priority: Minor
>              Labels: newbie, tech-debt
>
> This ticket shall improve two aspects of the retrieval of sensors:
> 1. Currently, when a sensor is retrieved with {{*Metrics.*Sensor()}} (e.g.
> {{ThreadMetrics.createTaskSensor()}}) after it was created with the same
> method {{*Metrics.*Sensor()}}, the sensor is added again to the
> corresponding queue in {{*Sensors}} (e.g. {{threadLevelSensors}}) in
> {{StreamsMetricsImpl}}. Those queues are used to remove the sensors when
> {{removeAll*LevelSensors()}} is called. Having multiple times the same
> sensors in this queue is not an issue from a correctness point of view.
> However, it would reduce the footprint to only store a sensor once in those
> queues.
> 2. When a sensor is retrieved, the current code attempts to create a new
> sensor and to add to it again the corresponding metrics. This could be
> avoided.
>
> Both aspects could be improved by checking whether a sensor already exists
> by calling {{getSensor()}} on the {{Metrics}} object and checking the
> return value.
[jira] [Assigned] (KAFKA-9129) Add Thread ID to the InternalProcessorContext
[ https://issues.apache.org/jira/browse/KAFKA-9129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck reassigned KAFKA-9129:
-------------------------------

    Assignee: highluck

> Add Thread ID to the InternalProcessorContext
> ---------------------------------------------
>
>                 Key: KAFKA-9129
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9129
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Bruno Cadonna
>            Assignee: highluck
>            Priority: Major
>
> When we added client metrics we had to move the {{StreamsMetricsImpl}}
> object to the client level. That means that now instead of having one
> {{StreamsMetricsImpl}} object per thread, we have now one per client. That
> also means that we cannot store the thread ID in the {{StreamsMetricsImpl}}
> anymore. Currently, we get the thread ID from
> {{Thread.currentThread().getName()}} when we need to create a sensor.
> However, that is not robust against code refactoring because we need to
> ensure that the thread that creates the sensor is also the one that records
> the metrics. To be more flexible, we should expose the ID of the thread
> that executes a processor in the {{InternalProcessorContext}} like it
> already exposes the task ID.
[jira] [Issue Comment Deleted] (KAFKA-9342) Consider making all Kafka Streams DSL configuration classes immutable
[ https://issues.apache.org/jira/browse/KAFKA-9342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck updated KAFKA-9342:
----------------------------
    Comment: was deleted

(was: [~vvcephei] Can you assign me a ticket? i'm try it)

> Consider making all Kafka Streams DSL configuration classes immutable
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-9342
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9342
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Levani Kokhreidze
>            Priority: Major
>
> Currently, Kafka Streams DSL config classes are mix of *mutable*
> _org.apache.kafka.streams.kstream.Consumed,
> org.apache.kafka.streams.kstream.Materialized_ and *immutable*
> _org.apache.kafka.streams.kstream.Joined,
> org.apache.kafka.streams.kstream.Grouped_ classes.
> Consider unifying all config classes of the DSL operations and make them
> immutable. Backward compatibility should be taken into account when making
> config classes immutable. For example, things may break if user has code
> similar to this:
>
> {code:java}
> final Materialized materialized = Materialized.as("my-store");
> if (someCondition()) {
>     materialized.withCachingDisabled();
> }
> {code}
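The compatibility hazard in the quoted snippet can be made concrete with a hypothetical immutable stand-in for `Materialized`. `MaterializedSketch` below is written for this example only, not Kafka's real class:

```java
// Hedged sketch: once a config class becomes immutable, with*() methods return
// a modified copy instead of mutating 'this', so calling them purely for their
// side effect (as in the ticket's snippet) silently does nothing.
public class MaterializedSketch {
    private final String storeName;
    private final boolean cachingEnabled;

    private MaterializedSketch(String storeName, boolean cachingEnabled) {
        this.storeName = storeName;
        this.cachingEnabled = cachingEnabled;
    }

    public static MaterializedSketch as(String storeName) {
        return new MaterializedSketch(storeName, true);
    }

    // Immutable style: returns a new instance, leaving 'this' unchanged.
    public MaterializedSketch withCachingDisabled() {
        return new MaterializedSketch(storeName, false);
    }

    public boolean cachingEnabled() {
        return cachingEnabled;
    }
}
```

Under this design, the ticket's `materialized.withCachingDisabled();` (with the return value discarded) would no longer disable caching; user code would have to be rewritten as `materialized = materialized.withCachingDisabled();`, which is exactly the backward-compatibility concern the description raises.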
[jira] [Assigned] (KAFKA-9075) Extend documentation for usage of GlobalKTable vs KTable
[ https://issues.apache.org/jira/browse/KAFKA-9075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

highluck reassigned KAFKA-9075:
-------------------------------

    Assignee:     (was: highluck)

> Extend documentation for usage of GlobalKTable vs KTable
> --------------------------------------------------------
>
>                 Key: KAFKA-9075
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9075
>             Project: Kafka
>          Issue Type: Improvement
>          Components: documentation, streams
>            Reporter: Boyang Chen
>            Priority: Minor
>              Labels: newbie, newbie++
>
> We have a KIP which implements global KTable and explains its design
> reasoning and comparison with general KTable.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams]
> The part missing is on the official documentation to port this information,
> and let user make this choice easier.