[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS
[ https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018781#comment-17018781 ] Guozhang Wang commented on KAFKA-9450:
--
I think John's idea is to use a reserved key to store the current offset of the changelog, so that each time we update a value we also update this key. By doing this we would not need a RocksDB#flush when checkpointing, nor would we need a checkpoint file. One tricky thing, however, is that the offset of the written changelog record is only updated and known after the batched produce response returns, which may not be immediately after the RocksDB value update.

> Decouple inner state flushing from committing with EOS
> --
>
> Key: KAFKA-9450
> URL: https://issues.apache.org/jira/browse/KAFKA-9450
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Sophie Blee-Goldman
> Priority: Major
>
> When EOS is turned on, the commit interval is set quite low (100ms) and all
> the store layers are flushed during a commit. This is necessary for
> forwarding records in the cache to the changelog, but unfortunately also
> forces rocksdb to flush the current memtable before it's full. The result is
> a large number of small writes to disk, losing the benefits of batching, and
> a large number of very small L0 files that are likely to slow compaction.
> Since we have to delete the stores to recreate from scratch anyways during an
> unclean shutdown with EOS, we may as well skip flushing the innermost
> StateStore during a commit and only do so during a graceful shutdown, before
> a rebalance, etc. This is currently blocked on a refactoring of the state
> store layers to allow decoupling the flush of the caching layer from the
> actual state store.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
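Guozhang's reserved-key idea can be sketched as a toy model. This is illustrative only, not Streams code: a plain in-memory Map stands in for RocksDB, and the reserved key name is made up. It also shows the tricky part he mentions: the offset can only be written from the produce callback, once the changelog append has been acked.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the idea: store the changelog offset under a reserved key inside
// the store itself, so no separate checkpoint file (and no explicit
// RocksDB#flush at commit time) is needed. A HashMap stands in for RocksDB;
// the reserved key name below is a hypothetical choice for illustration.
public class OffsetInStoreSketch {
    static final String OFFSET_KEY = "__changelog_offset__"; // hypothetical reserved key

    final Map<String, String> store = new HashMap<>();

    // Normal write path: just update the value. The offset cannot be written
    // here, because the appended changelog offset is not yet known.
    public void put(String key, String value) {
        store.put(key, value);
    }

    // Called from the producer callback once the changelog append is acked,
    // which may be noticeably later than the value update above.
    public void recordChangelogOffset(long offset) {
        store.put(OFFSET_KEY, Long.toString(offset));
    }

    // At restore time, read the offset back from the store itself instead of
    // from a checkpoint file.
    public long committedOffset() {
        String v = store.get(OFFSET_KEY);
        return v == null ? -1L : Long.parseLong(v);
    }
}
```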
[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS
[ https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018750#comment-17018750 ] Matthias J. Sax commented on KAFKA-9450:
[~vvcephei] Your comment seems to be an orthogonal concern (and one that I actually don't share – when we call RocksDB#flush(), it seems safe to assume that RocksDB persisted the data. Why do you doubt that RocksDB guarantees this? If it did not persist the data, that would be a RocksDB bug IMHO that should just get fixed.) Nevertheless, this ticket is about decoupling changelog flushing from local disk flushing – in contrast, your comment is about two aspects of local disk flushing, i.e., the data itself and the metadata (i.e., the checkpoint).
[jira] [Updated] (KAFKA-9453) Make transaction.id optional in group mode EOS
[ https://issues.apache.org/jira/browse/KAFKA-9453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-9453:
---
Description: After KIP-447, one of the big improvements is that we no longer require a single-writer guarantee, so users don't have to configure a unique transactional.id for transaction safety. In fact, absent security concerns, we could even avoid using the initTransactions API as well.

was: After KIP-447, one of the big improvements is that we no longer require a single-writer guarantee, so users don't have to configure a unique transactional.id for transaction safety.
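The direction described here – fencing by consumer group state rather than by a unique transactional.id per producer – can be modeled with a toy sketch. All class and method names below are illustrative assumptions, not Kafka APIs: a rebalance bumps the group's generation, and a transactional commit carrying a stale generation is rejected as a zombie.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of group-based fencing: the "broker" tracks the latest
// generation per consumer group, and only commits from the current
// generation are accepted. Purely illustrative; not Kafka's protocol code.
public class GroupFencingSketch {
    private final Map<String, Integer> latestGeneration = new HashMap<>();

    // A rebalance bumps the group's generation.
    public void onRebalance(String groupId, int generation) {
        latestGeneration.merge(groupId, generation, Math::max);
    }

    // A transactional offset commit is accepted only if it carries the
    // current generation; a zombie from an older generation is rejected.
    public boolean tryCommit(String groupId, int senderGeneration) {
        Integer current = latestGeneration.get(groupId);
        return current != null && senderGeneration == current;
    }
}
```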
[jira] [Created] (KAFKA-9454) Relax transaction.id security requirement
Boyang Chen created KAFKA-9454:
--
Summary: Relax transaction.id security requirement
Key: KAFKA-9454
URL: https://issues.apache.org/jira/browse/KAFKA-9454
Project: Kafka
Issue Type: Sub-task
Reporter: Boyang Chen

As we are no longer required to configure a transactional.id on the client, we could piggy-back the security check on the consumer group id instead.
[jira] [Created] (KAFKA-9453) Make transaction.id optional in group mode EOS
Boyang Chen created KAFKA-9453:
--
Summary: Make transaction.id optional in group mode EOS
Key: KAFKA-9453
URL: https://issues.apache.org/jira/browse/KAFKA-9453
Project: Kafka
Issue Type: Sub-task
Reporter: Boyang Chen

After KIP-447, one of the big improvements is that we no longer require a single-writer guarantee, so users don't have to configure a unique transactional.id for transaction safety.
[jira] [Commented] (KAFKA-9346) Consumer fetch offset back-off with pending transactions
[ https://issues.apache.org/jira/browse/KAFKA-9346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018730#comment-17018730 ] Travis Bischel commented on KAFKA-9346:
---
Part of KIP-447, in 2.5.0.

> Consumer fetch offset back-off with pending transactions
>
> Key: KAFKA-9346
> URL: https://issues.apache.org/jira/browse/KAFKA-9346
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Major
[jira] [Commented] (KAFKA-9365) Add consumer group information to txn commit
[ https://issues.apache.org/jira/browse/KAFKA-9365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018729#comment-17018729 ] Travis Bischel commented on KAFKA-9365:
---
Part of KIP-447, in 2.5.0.

> Add consumer group information to txn commit
> -
>
> Key: KAFKA-9365
> URL: https://issues.apache.org/jira/browse/KAFKA-9365
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Major
>
> This effort adds consumer group information to the txn commit protocol on the
> broker side.
[jira] [Commented] (KAFKA-6144) Allow serving interactive queries from in-sync Standbys
[ https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018670#comment-17018670 ] ASF GitHub Bot commented on KAFKA-6144:
---
vinothchandar commented on pull request #7868: KAFKA-6144: Allow state stores to serve stale reads during rebalance
URL: https://github.com/apache/kafka/pull/7868

> Allow serving interactive queries from in-sync Standbys
> ---
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Antony Stubbs
> Assignee: Navinder Brar
> Priority: Major
> Labels: kip-535
> Fix For: 2.5.0
>
> Attachments: image-2019-10-09-20-33-37-423.png,
> image-2019-10-09-20-47-38-096.png
>
> Currently when expanding the KS cluster, the new node's partitions will be
> unavailable during the rebalance, which for large state stores can take a very
> long time; even for small state stores, more than a few ms of unavailability
> can be a deal-breaker for micro-service use cases.
> One workaround is to allow stale data to be read from the state stores when the
> use case allows it. Adding the use case from KAFKA-8994 as it is more
> descriptive.
> "Consider the following scenario in a three-node Streams cluster with node A,
> node B and node R, executing a stateful sub-topology/topic group with 1
> partition and `_num.standby.replicas=1_`
> * *t0*: A is the active instance owning the partition, B is the standby that
> keeps replicating A's state to its local disk, R just routes Streams
> IQs to the active instance using StreamsMetadata
> * *t1*: IQs pick node R as the router, R forwards the query to A, A responds
> back to R, which forwards the results back to the caller.
> * *t2*: Active instance A is killed and a rebalance begins. IQs to A start
> failing
> * *t3*: The rebalance assignment happens and standby B is promoted to active
> instance. IQs continue to fail
> * *t4*: B fully catches up to the changelog tail and rewinds offsets to A's
> last commit position; IQs continue to fail
> * *t5*: IQs to R get routed to B, which is now ready to serve results. IQs
> start succeeding again
>
> Depending on the Kafka consumer group session/heartbeat timeouts, steps t2-t3
> can take a few seconds (~10 seconds based on default values). Depending on how
> laggy the standby B was prior to A being killed, t4 can take a few seconds to
> minutes.
> While this behavior favors consistency over availability at all times, the
> long unavailability window might be undesirable for certain classes of
> applications (e.g. simple caches or dashboards).
> This issue aims to also expose information about standby B to R during each
> rebalance, such that queries can be routed by an application to a standby
> to serve stale reads, choosing availability over consistency."
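The scenario above boils down to a routing decision at R. A minimal sketch of that decision, under the assumption that the application knows the active host and the standby hosts for a key (illustrative names only, not the KIP-535 API):

```java
import java.util.List;

// Minimal model of the routing choice described above: send the IQ to the
// active host; only when stale reads are allowed may the router fall back
// to a standby while the active is down. Names are illustrative.
public class IqRouterSketch {
    public static String route(String activeHost, boolean activeUp,
                               List<String> standbyHosts, boolean allowStale) {
        if (activeUp) {
            return activeHost;                 // normal path (t0, t1, t5)
        }
        if (allowStale && !standbyHosts.isEmpty()) {
            return standbyHosts.get(0);        // serve a stale read from a standby
        }
        return null;                           // query fails, as in steps t2-t4
    }
}
```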
[jira] [Assigned] (KAFKA-7658) Add KStream#toTable to the Streams DSL
[ https://issues.apache.org/jira/browse/KAFKA-7658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck reassigned KAFKA-7658:
---
Assignee: highluck (was: Aishwarya Pradeep Kumar)

> Add KStream#toTable to the Streams DSL
> --
>
> Key: KAFKA-7658
> URL: https://issues.apache.org/jira/browse/KAFKA-7658
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: highluck
> Priority: Major
> Labels: kip, newbie
>
> KIP-523:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL]
>
> We'd like to add a new API to the KStream object of the Streams DSL:
> {code:java}
> KTable KStream.toTable()
> KTable KStream.toTable(Materialized)
> {code}
> The function re-interprets the event stream {{KStream}} as a changelog stream
> {{KTable}}. Note that this should NOT be treated as syntactic sugar for a
> dummy {{KStream.reduce()}} that always takes the new value, as it has the
> following differences:
> 1) an aggregation operator on {{KStream}} aggregates an event stream into an
> evolving table, and drops null values from the input event stream; whereas
> {{toTable}} completely changes the semantics of the input stream from event
> stream to changelog stream: null values are still serialized, and if the
> resulting bytes are also null they are interpreted as "deletes" of the
> materialized KTable (i.e. tombstones in the changelog stream).
> 2) the aggregation result {{KTable}} is always materialized, whereas the
> {{toTable}} result KTable may only be materialized if the overload with
> Materialized is used (and if optimization is turned on it may still be only
> logically materialized if the queryable name is not set).
> Therefore, users who want to turn an event stream into a changelog stream
> (whatever the reason they cannot read the source topic as a changelog stream
> {{KTable}} in the first place) should use this new API instead of the dummy
> reduce function.
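The null-value difference in point 1) can be illustrated outside of Streams with plain Java maps. This is a semantic model, not the DSL implementation: a "reduce-style" aggregation drops nulls, while "toTable-style" interpretation treats a null value as a tombstone that deletes the key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Semantic model of the difference between aggregating an event stream and
// re-interpreting it as a changelog stream. Not Kafka Streams code.
public class ToTableSemantics {
    // "reduce-style": null values are dropped from the input event stream.
    public static Map<String, String> aggregateStyle(List<SimpleEntry<String, String>> events) {
        Map<String, String> table = new HashMap<>();
        for (var e : events) {
            if (e.getValue() != null) table.put(e.getKey(), e.getValue());
        }
        return table;
    }

    // "toTable-style": the stream is a changelog; a null value is a delete
    // (tombstone) for that key in the materialized table.
    public static Map<String, String> toTableStyle(List<SimpleEntry<String, String>> events) {
        Map<String, String> table = new HashMap<>();
        for (var e : events) {
            if (e.getValue() == null) table.remove(e.getKey());
            else table.put(e.getKey(), e.getValue());
        }
        return table;
    }
}
```

Feeding both methods the events ("k","v1") then ("k",null) shows the divergence: the aggregate keeps "v1", while the table view ends up with no entry for "k".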
[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS
[ https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018602#comment-17018602 ] Ted Yu commented on KAFKA-9450:
---
w.r.t. a separate column family: since the data in this family tends to be small compared to the data family, wouldn't we end up with small files, similar to the RocksDB memtable flush?
[jira] [Updated] (KAFKA-9424) Using AclCommand,avoid call the global method loadcache in SimpleAclAuthorizer
[ https://issues.apache.org/jira/browse/KAFKA-9424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Lu updated KAFKA-9424:
-
Reviewer: Rajini Sivaram (was: Manikumar)

> Using AclCommand,avoid call the global method loadcache in SimpleAclAuthorizer
> --
>
> Key: KAFKA-9424
> URL: https://issues.apache.org/jira/browse/KAFKA-9424
> Project: Kafka
> Issue Type: Improvement
> Components: admin, tools
> Affects Versions: 0.10.2.0, 2.4.0, 2.3.1
> Environment: Linux, JDK 7+
> Reporter: Steven Lu
> Priority: Major
> Labels: Solved
> Original Estimate: 3h
> Remaining Estimate: 3h
>
> The class AclCommand configures SimpleAclAuthorizer but has no need to call
> loadCache. We now have 20,000 topics in our Kafka cluster; every time I run
> AclCommand, the ACLs of all these topics are loaded, which is very slow.
> The purpose of this optimization is that we can choose not to load the ACLs
> of all topics into memory, mainly for adding and deleting permissions.
> PR available here: [https://github.com/apache/kafka/pull/7706]
> Mainly for adding and deleting permissions, we can choose not to load the
> ACLs of all topics into memory by passing the two args "--load-acl-cache"
> "false" to AclCommand.main; if these args are not given, the ACL cache is
> loaded by default.
> This can improve the running time from minutes to less than one second.
[jira] [Updated] (KAFKA-9424) Using AclCommand,avoid call the global method loadcache in SimpleAclAuthorizer
[ https://issues.apache.org/jira/browse/KAFKA-9424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Lu updated KAFKA-9424:
-
Reviewer: Manikumar (was: Ismael Juma)
[jira] [Created] (KAFKA-9452) Add new cached authorizer:change the dim of cache
Steven Lu created KAFKA-9452:
Summary: Add new cached authorizer:change the dim of cache
Key: KAFKA-9452
URL: https://issues.apache.org/jira/browse/KAFKA-9452
Project: Kafka
Issue Type: Improvement
Components: security
Reporter: Steven Lu

As in https://issues.apache.org/jira/browse/KAFKA-5261, we met the same performance issue described in PR [#3756|https://github.com/apache/kafka/pull/3756] in our production environment. Hence, we revised the authorization mechanism; our revision has the following optimizations:
1. Build a cache for authorization, which avoids recomputing authorization results. A result is fetched from the cache if it has already been computed, rather than computed again.
2. Unlike PR 3756, when we build the result cache we key it by resource. This way, authorization is recomputed only when the ACLs of that specific resource change. Compared to PR 3756, the frequency of recomputation can be reduced significantly.
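The resource-keyed cache in point 2 might look like the following sketch. Everything here is an illustrative assumption, not Kafka's Authorizer interface: results are cached per resource, and an ACL change on one resource invalidates only that resource's entries, leaving the rest of the cache warm.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of an authorization cache keyed by resource, so that an ACL change
// invalidates only that resource's cached results rather than the whole
// cache. All names are illustrative, not Kafka's Authorizer API.
public class ResourceKeyedAuthCache {
    // resource -> ("principal:operation" -> allowed)
    private final Map<String, Map<String, Boolean>> cache = new HashMap<>();

    // Returns the cached decision, or null on a cache miss.
    public Boolean cached(String resource, String principal, String op) {
        Map<String, Boolean> perResource = cache.get(resource);
        return perResource == null ? null : perResource.get(principal + ":" + op);
    }

    // Records a freshly computed decision.
    public void record(String resource, String principal, String op, boolean allowed) {
        cache.computeIfAbsent(resource, r -> new HashMap<>())
             .put(principal + ":" + op, allowed);
    }

    // Called when the ACLs of one resource change: only its entries are dropped.
    public void invalidate(String resource) {
        cache.remove(resource);
    }
}
```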