[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors

2019-02-26 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16778821#comment-16778821
 ] 

Matthias J. Sax commented on KAFKA-6970:


Created https://issues.apache.org/jira/browse/KAFKA-8006 as a follow-up because 
it will get a new "fix version" number.

> Kafka streams lets the user call init() and close() on a state store, when 
> inside Processors
> 
>
> Key: KAFKA-6970
> URL: https://issues.apache.org/jira/browse/KAFKA-6970
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: James Cheng
>Assignee: Nikolay Izhikov
>Priority: Major
> Fix For: 2.2.0
>
>
> When using a state store within Transform (and Processor and 
> TransformValues), the user is able to call init() and close() on the state 
> stores. Those APIs should only be called by Kafka Streams itself.
> If possible, it would be good to guard those APIs so that the user cannot 
> call them.
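
A minimal sketch of the kind of guard the issue asks for, assuming a decorator is handed to user code in place of the real store. The types below (Store, InMemoryStore, LifecycleGuard) are hypothetical stand-ins written for illustration; they are not Kafka's StateStore API and not necessarily how the fix was implemented.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch only: hypothetical stand-in types, not Kafka's StateStore API. */
final class GuardedStoreSketch {

    /** Minimal stand-in for a key-value state store with lifecycle methods. */
    interface Store {
        String name();
        void init();   // lifecycle: meant to be called by the framework only
        void close();  // lifecycle: meant to be called by the framework only
        void put(String key, Long value);
        Long get(String key);
    }

    /** Plain in-memory store owned by the "framework". */
    static final class InMemoryStore implements Store {
        private final String name;
        private final Map<String, Long> data = new HashMap<>();
        private boolean open;

        InMemoryStore(final String name) { this.name = name; }

        @Override public String name() { return name; }
        @Override public void init() { open = true; }
        @Override public void close() { open = false; }
        @Override public void put(final String key, final Long value) { data.put(key, value); }
        @Override public Long get(final String key) { return data.get(key); }
        boolean isOpen() { return open; }
    }

    /** Decorator given to user code: reads and writes pass through, lifecycle calls are rejected. */
    static final class LifecycleGuard implements Store {
        private final Store inner;

        LifecycleGuard(final Store inner) { this.inner = inner; }

        @Override public String name() { return inner.name(); }
        @Override public void init() {
            throw new UnsupportedOperationException("init() may only be called by the framework");
        }
        @Override public void close() {
            throw new UnsupportedOperationException("close() may only be called by the framework");
        }
        @Override public void put(final String key, final Long value) { inner.put(key, value); }
        @Override public Long get(final String key) { return inner.get(key); }
    }

    /** What a context's store lookup could return instead of the raw store. */
    static Store guard(final Store inner) { return new LifecycleGuard(inner); }
}
```

With this shape, the framework keeps the InMemoryStore reference and manages its lifecycle, while a processor only ever sees the result of guard(...), so an accidental close() fails fast instead of silently closing a store the framework still uses.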



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors

2019-01-18 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16746883#comment-16746883
 ] 

Matthias J. Sax commented on KAFKA-6970:


[~NIzhikov] I just realized that we should do the same thing for global 
stores, within {{GlobalProcessorContextImpl}}. Do you want to do a follow-up PR?



[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716687#comment-16716687
 ] 

ASF GitHub Bot commented on KAFKA-6970:
---

mjsax closed pull request #6016: KAFKA-6970: All standard state stores guarded 
with read only wrapper
URL: https://github.com/apache/kafka/pull/6016
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
index ff3ef44894b..99ba0f6ce06 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
@@ -50,9 +50,18 @@
     private CachedStateStore cachedStateStore(final StateStore store) {
         if (store instanceof CachedStateStore) {
             return (CachedStateStore) store;
-        } else if (store instanceof WrappedStateStore
-            && ((WrappedStateStore) store).wrappedStore() instanceof CachedStateStore) {
-            return (CachedStateStore) ((WrappedStateStore) store).wrappedStore();
+        } else if (store instanceof WrappedStateStore) {
+            StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+
+            while (wrapped instanceof WrappedStateStore && !(wrapped instanceof CachedStateStore)) {
+                wrapped = ((WrappedStateStore) wrapped).wrappedStore();
+            }
+
+            if (!(wrapped instanceof CachedStateStore)) {
+                return null;
+            }
+
+            return (CachedStateStore) wrapped;
         }
         return null;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index c79ec35328a..e7dd4dbc42a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.internals.ApiUtils;
@@ -37,6 +38,7 @@
 
 import java.time.Duration;
 import java.util.List;
+import org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore;
 
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 
@@ -102,7 +104,16 @@ public StateStore getStateStore(final String name) {
                 "please file a bug report at https://issues.apache.org/jira/projects/KAFKA.");
         }
 
-        return stateManager.getStore(name);
+        final StateStore store = stateManager.getStore(name);
+        if (store instanceof KeyValueStore) {
+            return new KeyValueStoreReadWriteDecorator((KeyValueStore) store);
+        } else if (store instanceof WindowStore) {
+            return new WindowStoreReadWriteDecorator((WindowStore) store);
+        } else if (store instanceof SessionStore) {
+            return new SessionStoreReadWriteDecorator((SessionStore) store);
+        }
+
+        return store;
     }
 
     @SuppressWarnings("unchecked")
@@ -196,23 +207,16 @@ public long streamTime() {
         return streamTimeSupplier.get();
     }
 
-    private abstract static class StateStoreReadOnlyDecorator<T extends StateStore> implements StateStore {
+    private abstract static class StateStoreReadOnlyDecorator<T extends StateStore> extends AbstractStateStore {
         static final String ERROR_MESSAGE = "Global store is read only";
 
-        final T underlying;
-
-        StateStoreReadOnlyDecorator(final T underlying) {
-            this.underlying = underlying;
-        }
-
-        @Override
-        public String name() {
-            return underlying.name();
+        StateStoreReadOnlyDecorator(final T inner) {
+            super(inner);
         }
 
-        @Override
-        public void init(final ProcessorContext context, final StateStore root) {
-            underlying.init(context, root);
+        @SuppressWarnings("unchecked")
+        T getInner() {
+            return (T) wrappedStore();
         }
 
         @Override
@@ -221,44 +225,39 @@ public void flush() {
         }
 
         @Override
-        public void close() {
-            underlying.close();
-        }
-
-        @Override
-        public boolean persistent() {
-            return underlying.persistent();
+ 
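
The TupleForwarder hunk in the diff above replaces a single-level check with a loop that walks through nested wrappers until it finds a caching store or runs out of wrappers. A self-contained sketch of that unwrapping idea follows; the interfaces Store, Cached and Wrapped are hypothetical stand-ins for illustration, not Kafka's StateStore, CachedStateStore or WrappedStateStore.

```java
/** Sketch only: hypothetical stand-in types that mirror the shape of the diff above. */
final class UnwrapSketch {

    interface Store {}

    /** Marker for a store that supports caching. */
    interface Cached extends Store {}

    /** A store that delegates to another store. */
    interface Wrapped extends Store {
        Store wrappedStore();
    }

    static final class Plain implements Store {}

    static final class CachingStore implements Cached {}

    static final class Wrapper implements Wrapped {
        private final Store inner;

        Wrapper(final Store inner) { this.inner = inner; }

        @Override public Store wrappedStore() { return inner; }
    }

    /**
     * Walks down the chain of wrappers and returns the first caching store found,
     * or null if the chain ends without one.
     */
    static Cached findCached(final Store store) {
        Store current = store;
        // Keep unwrapping while the current layer is a wrapper but not itself a caching store.
        while (!(current instanceof Cached) && current instanceof Wrapped) {
            current = ((Wrapped) current).wrappedStore();
        }
        return current instanceof Cached ? (Cached) current : null;
    }
}
```

The loop matters once a guard decorator is added around every store: the caching layer may then sit two or more levels deep, which a one-level wrappedStore() check would miss.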

[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors

2018-12-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16713742#comment-16713742
 ] 

ASF GitHub Bot commented on KAFKA-6970:
---

nizhikov opened a new pull request #6016: KAFKA-6970: All standard state stores 
guarded with read only wrapper
URL: https://github.com/apache/kafka/pull/6016
 
 



[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors

2018-12-05 Thread Nikolay Izhikov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710580#comment-16710580
 ] 

Nikolay Izhikov commented on KAFKA-6970:


[~mjsax] Got it. Thanks for the answer.



[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors

2018-12-05 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710492#comment-16710492
 ] 

Matthias J. Sax commented on KAFKA-6970:


It's different. For this ticket, it's about _all_ stores – not just global 
stores.



[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors

2018-12-05 Thread Nikolay Izhikov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710463#comment-16710463
 ] 

Nikolay Izhikov commented on KAFKA-6970:


[~mjsax] It seems all cases from this ticket will be covered by KAFKA-7420.

So, once KAFKA-7420 is resolved, we can close this ticket as a duplicate.
Am I missing something?



[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors

2018-11-29 Thread Nikolay Izhikov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703896#comment-16703896
 ] 

Nikolay Izhikov commented on KAFKA-6970:


[~mjsax] Thank you.
I will take care of this issue.



[jira] [Commented] (KAFKA-6970) Kafka streams lets the user call init() and close() on a state store, when inside Processors

2018-11-29 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703885#comment-16703885
 ] 

Matthias J. Sax commented on KAFKA-6970:


[~NIzhikov]: this might be something for you to tackle :) 
