[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors
[ https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16778821#comment-16778821 ]

Matthias J. Sax commented on KAFKA-6970:

Created https://issues.apache.org/jira/browse/KAFKA-8006 as a follow-up because it will get a new "fix version" number.

> Kafka streams lets the user call init() and close() on a state store, when inside Processors
>
>                 Key: KAFKA-6970
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6970
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: James Cheng
>            Assignee: Nikolay Izhikov
>            Priority: Major
>             Fix For: 2.2.0
>
> When using a state store within Transform (and Processor and TransformValues), the user is able to call init() and close() on the state stores. Those APIs should only be called by kafka streams itself.
> If possible, it would be good to guard those APIs so that the user cannot call them.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
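The guard requested in the issue description can be sketched roughly as a decorator that passes reads and writes through and rejects lifecycle calls. This is a minimal illustration only: SimpleStore, InMemoryStore and ReadWriteGuard are hypothetical names and not the Kafka Streams classes, and throwing UnsupportedOperationException is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

final class GuardedStoreSketch {

    /** Hypothetical, simplified stand-in for a state store; NOT the Kafka Streams interface. */
    interface SimpleStore {
        void init();
        void close();
        void put(String key, String value);
        String get(String key);
    }

    /** In-memory store whose lifecycle is meant to be driven by the framework only. */
    static final class InMemoryStore implements SimpleStore {
        private final Map<String, String> data = new HashMap<>();
        private boolean open;

        @Override public void init() { open = true; }
        @Override public void close() { open = false; }
        @Override public void put(String key, String value) { data.put(key, value); }
        @Override public String get(String key) { return data.get(key); }
        boolean isOpen() { return open; }
    }

    /** View handed to user code: data access is delegated, init() and close() are rejected. */
    static final class ReadWriteGuard implements SimpleStore {
        private final SimpleStore inner;

        ReadWriteGuard(SimpleStore inner) { this.inner = inner; }

        @Override public void init() {
            // Assumption of this sketch: lifecycle calls fail fast with an unchecked exception.
            throw new UnsupportedOperationException("init() may only be called by the framework");
        }
        @Override public void close() {
            throw new UnsupportedOperationException("close() may only be called by the framework");
        }
        @Override public void put(String key, String value) { inner.put(key, value); }
        @Override public String get(String key) { return inner.get(key); }
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        store.init();                                   // done by the framework, not the user
        SimpleStore userView = new ReadWriteGuard(store);

        userView.put("k", "v");
        System.out.println("value: " + userView.get("k"));
        try {
            userView.close();                           // user code must not close the store
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println("still open: " + store.isOpen());
    }
}
```

Because the framework keeps the unwrapped store, it can still call init() and close() itself; only the view returned to processors is restricted.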
[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors
[ https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16746883#comment-16746883 ]

Matthias J. Sax commented on KAFKA-6970:

[~NIzhikov] I just realized that we should do the same thing for global stores, within {{GlobalProcessorContextImpl}}. Do you want to do a follow-up PR?
[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors
[ https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716687#comment-16716687 ]

ASF GitHub Bot commented on KAFKA-6970:
---

mjsax closed pull request #6016: KAFKA-6970: All standard state stores guarded with read only wrapper
URL: https://github.com/apache/kafka/pull/6016

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
index ff3ef44894b..99ba0f6ce06 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
@@ -50,9 +50,18 @@ private CachedStateStore cachedStateStore(final StateStore store) {
         if (store instanceof CachedStateStore) {
             return (CachedStateStore) store;
-        } else if (store instanceof WrappedStateStore
-            && ((WrappedStateStore) store).wrappedStore() instanceof CachedStateStore) {
-            return (CachedStateStore) ((WrappedStateStore) store).wrappedStore();
+        } else if (store instanceof WrappedStateStore) {
+            StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+
+            while (wrapped instanceof WrappedStateStore && !(wrapped instanceof CachedStateStore)) {
+                wrapped = ((WrappedStateStore) wrapped).wrappedStore();
+            }
+
+            if (!(wrapped instanceof CachedStateStore)) {
+                return null;
+            }
+
+            return (CachedStateStore) wrapped;
         }
         return null;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index c79ec35328a..e7dd4dbc42a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.internals.ApiUtils;
@@ -37,6 +38,7 @@
 import java.time.Duration;
 import java.util.List;
+import org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore;
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
@@ -102,7 +104,16 @@ public StateStore getStateStore(final String name) {
                 "please file a bug report at https://issues.apache.org/jira/projects/KAFKA.");
         }
-        return stateManager.getStore(name);
+        final StateStore store = stateManager.getStore(name);
+        if (store instanceof KeyValueStore) {
+            return new KeyValueStoreReadWriteDecorator((KeyValueStore) store);
+        } else if (store instanceof WindowStore) {
+            return new WindowStoreReadWriteDecorator((WindowStore) store);
+        } else if (store instanceof SessionStore) {
+            return new SessionStoreReadWriteDecorator((SessionStore) store);
+        }
+
+        return store;
     }
     @SuppressWarnings("unchecked")
@@ -196,23 +207,16 @@ public long streamTime() {
         return streamTimeSupplier.get();
     }
-    private abstract static class StateStoreReadOnlyDecorator implements StateStore {
+    private abstract static class StateStoreReadOnlyDecorator extends AbstractStateStore {
         static final String ERROR_MESSAGE = "Global store is read only";
-        final T underlying;
-
-        StateStoreReadOnlyDecorator(final T underlying) {
-            this.underlying = underlying;
-        }
-
-        @Override
-        public String name() {
-            return underlying.name();
+        StateStoreReadOnlyDecorator(final T inner) {
+            super(inner);
         }
-        @Override
-        public void init(final ProcessorContext context, final StateStore root) {
-            underlying.init(context, root);
+        @SuppressWarnings("unchecked")
+        T getInner() {
+            return (T) wrappedStore();
         }
         @Override
@@ -221,44 +225,39 @@ public void flush() {
         }
         @Override
-        public void close() {
-            underlying.close();
-        }
-
-        @Override
-        public boolean persistent() {
-            return underlying.persistent();
+
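
The TupleForwarder change in the diff above replaces a single-level check with a loop, since a store may now sit behind more than one wrapper. A minimal standalone sketch of that unwrapping idea follows; Store, Cached, Wrapper and findCached are hypothetical names used only for illustration and are not the Kafka Streams types.

```java
final class UnwrapSketch {

    /** Hypothetical marker types standing in for a store and a caching store. */
    interface Store { }
    interface Cached extends Store { }

    /** Hypothetical wrapper that exposes the store it decorates. */
    static class Wrapper implements Store {
        private final Store inner;

        Wrapper(Store inner) { this.inner = inner; }

        Store wrapped() { return inner; }
    }

    static final class CachingStore implements Cached { }
    static final class PlainStore implements Store { }

    /**
     * Peels off wrappers one level at a time until a Cached store is found
     * or there is nothing left to unwrap. Returns null if no Cached store exists.
     */
    static Cached findCached(Store store) {
        Store current = store;
        while (!(current instanceof Cached) && current instanceof Wrapper) {
            current = ((Wrapper) current).wrapped();
        }
        return current instanceof Cached ? (Cached) current : null;
    }

    public static void main(String[] args) {
        CachingStore cache = new CachingStore();
        Store doublyWrapped = new Wrapper(new Wrapper(cache));
        Store noCache = new Wrapper(new PlainStore());

        System.out.println("found cache: " + (findCached(doublyWrapped) == cache));
        System.out.println("no cache: " + (findCached(noCache) == null));
    }
}
```

A one-level check would miss the cache in the doubly wrapped case, which is the situation that arises once an extra decorator is placed around every store.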
[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors
[ https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16713742#comment-16713742 ]

ASF GitHub Bot commented on KAFKA-6970:
---

nizhikov opened a new pull request #6016: KAFKA-6970: All standard state stores guarded with read only wrapper
URL: https://github.com/apache/kafka/pull/6016

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors
[ https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710580#comment-16710580 ]

Nikolay Izhikov commented on KAFKA-6970:

[~mjsax] Got it. Thanks for the answer.
[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors
[ https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710492#comment-16710492 ]

Matthias J. Sax commented on KAFKA-6970:

It's different. For this ticket, it's about _all_ stores – not just global stores.
[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors
[ https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710463#comment-16710463 ]

Nikolay Izhikov commented on KAFKA-6970:

[~mjsax] It seems all cases from this ticket will be covered in KAFKA-7420. So, when KAFKA-7420 is resolved, we can close this ticket as a duplicate. Am I missing something?
[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors
[ https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703896#comment-16703896 ]

Nikolay Izhikov commented on KAFKA-6970:

[~mjsax] Thank you. I will take care of this issue.
[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors
[ https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703885#comment-16703885 ]

Matthias J. Sax commented on KAFKA-6970:

[~NIzhikov]: this might be something for you to tackle :)