[jira] [Commented] (FLINK-5289) NPE when using value state on non-keyed stream

2016-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15748190#comment-15748190
 ] 

ASF GitHub Bot commented on FLINK-5289:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2969


> NPE when using value state on non-keyed stream
> --
>
> Key: FLINK-5289
> URL: https://issues.apache.org/jira/browse/FLINK-5289
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Timo Walther
>Assignee: Stefan Richter
>
> Using a {{ValueStateDescriptor}} and 
> {{getRuntimeContext().getState(descriptor)}} on a non-keyed stream leads to 
> {{NullPointerException}} which is not very helpful for users:
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:109)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5289) NPE when using value state on non-keyed stream

2016-12-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15746105#comment-15746105
 ] 

ASF GitHub Bot commented on FLINK-5289:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2969
  
+1 looks good. Will put this into my merge pipeline


> NPE when using value state on non-keyed stream
> --
>
> Key: FLINK-5289
> URL: https://issues.apache.org/jira/browse/FLINK-5289
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Timo Walther
>Assignee: Stefan Richter
>
> Using a {{ValueStateDescriptor}} and 
> {{getRuntimeContext().getState(descriptor)}} on a non-keyed stream leads to 
> {{NullPointerException}} which is not very helpful for users:
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:109)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5289) NPE when using value state on non-keyed stream

2016-12-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735415#comment-15735415
 ] 

ASF GitHub Bot commented on FLINK-5289:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2969
  
Updated w.r.t. comments.


> NPE when using value state on non-keyed stream
> --
>
> Key: FLINK-5289
> URL: https://issues.apache.org/jira/browse/FLINK-5289
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Timo Walther
>Assignee: Stefan Richter
>
> Using a {{ValueStateDescriptor}} and 
> {{getRuntimeContext().getState(descriptor)}} on a non-keyed stream leads to 
> {{NullPointerException}} which is not very helpful for users:
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:109)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5289) NPE when using value state on non-keyed stream

2016-12-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735126#comment-15735126
 ] 

ASF GitHub Bot commented on FLINK-5289:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2969#discussion_r91703632
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
 ---
@@ -106,17 +109,45 @@ public boolean hasBroadcastVariable(String name) {
 
@Override
public  ValueState getState(ValueStateDescriptor 
stateProperties) {
-   return operator.getKeyedStateStore().getState(stateProperties);
+   KeyedStateStore keyedStateStore = 
checkPreconditionsAndGetKeyedStateStore(stateProperties);
+
+   try {
+   
stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+   return keyedStateStore.getState(stateProperties);
+   } catch (Exception e) {
+   throw new RuntimeException("Error while getting state", 
e);
+   }
}
 
@Override
public  ListState getListState(ListStateDescriptor 
stateProperties) {
-   return 
operator.getKeyedStateStore().getListState(stateProperties);
+   KeyedStateStore keyedStateStore = 
checkPreconditionsAndGetKeyedStateStore(stateProperties);
+
+   try {
+   
stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+   return keyedStateStore.getListState(stateProperties);
+   } catch (Exception e) {
+   throw new RuntimeException("Error while getting state", 
e);
+   }
}
 
@Override
public  ReducingState getReducingState(ReducingStateDescriptor 
stateProperties) {
-   return 
operator.getKeyedStateStore().getReducingState(stateProperties);
+   KeyedStateStore keyedStateStore = 
checkPreconditionsAndGetKeyedStateStore(stateProperties);
+
+   try {
+   
stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+   return 
keyedStateStore.getReducingState(stateProperties);
+   } catch (Exception e) {
+   throw new RuntimeException("Error while getting state", 
e);
--- End diff --

Do we need to re-wrap the exceptions here? Can we simply let the original 
exception bubble up?


> NPE when using value state on non-keyed stream
> --
>
> Key: FLINK-5289
> URL: https://issues.apache.org/jira/browse/FLINK-5289
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Timo Walther
>Assignee: Stefan Richter
>
> Using a {{ValueStateDescriptor}} and 
> {{getRuntimeContext().getState(descriptor)}} on a non-keyed stream leads to 
> {{NullPointerException}} which is not very helpful for users:
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:109)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5289) NPE when using value state on non-keyed stream

2016-12-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735125#comment-15735125
 ] 

ASF GitHub Bot commented on FLINK-5289:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2969#discussion_r91703759
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
 ---
@@ -106,17 +109,45 @@ public boolean hasBroadcastVariable(String name) {
 
@Override
public  ValueState getState(ValueStateDescriptor 
stateProperties) {
-   return operator.getKeyedStateStore().getState(stateProperties);
+   KeyedStateStore keyedStateStore = 
checkPreconditionsAndGetKeyedStateStore(stateProperties);
+
+   try {
+   
stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+   return keyedStateStore.getState(stateProperties);
+   } catch (Exception e) {
+   throw new RuntimeException("Error while getting state", 
e);
+   }
}
 
@Override
public  ListState getListState(ListStateDescriptor 
stateProperties) {
-   return 
operator.getKeyedStateStore().getListState(stateProperties);
+   KeyedStateStore keyedStateStore = 
checkPreconditionsAndGetKeyedStateStore(stateProperties);
+
+   try {
+   
stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+   return keyedStateStore.getListState(stateProperties);
+   } catch (Exception e) {
+   throw new RuntimeException("Error while getting state", 
e);
+   }
}
 
@Override
public  ReducingState getReducingState(ReducingStateDescriptor 
stateProperties) {
-   return 
operator.getKeyedStateStore().getReducingState(stateProperties);
+   KeyedStateStore keyedStateStore = 
checkPreconditionsAndGetKeyedStateStore(stateProperties);
+
+   try {
+   
stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+   return 
keyedStateStore.getReducingState(stateProperties);
+   } catch (Exception e) {
+   throw new RuntimeException("Error while getting state", 
e);
+   }
+   }
+
+   private KeyedStateStore 
checkPreconditionsAndGetKeyedStateStore(StateDescriptor stateDescriptor) {
+   Preconditions.checkNotNull(stateDescriptor, "The state 
properties must not be null");
+   KeyedStateStore keyedStateStore = operator.getKeyedStateStore();
+   Preconditions.checkNotNull(keyedStateStore, "Keyed state store 
is null. This can only be called after a keyBy.");
--- End diff --

How about changing this message to
`Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' 
operation.`


> NPE when using value state on non-keyed stream
> --
>
> Key: FLINK-5289
> URL: https://issues.apache.org/jira/browse/FLINK-5289
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Timo Walther
>Assignee: Stefan Richter
>
> Using a {{ValueStateDescriptor}} and 
> {{getRuntimeContext().getState(descriptor)}} on a non-keyed stream leads to 
> {{NullPointerException}} which is not very helpful for users:
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:109)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5289) NPE when using value state on non-keyed stream

2016-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15732605#comment-15732605
 ] 

ASF GitHub Bot commented on FLINK-5289:
---

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/2969

[FLINK-5289] Meaningful exception when using value state on non-keyed…

This PR fixes [FLINK-5289] and introduces a meaningful exception when 
registering a keyed state on non-keyed stream.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink register-state-NPE

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2969.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2969


commit fba7f61ee6ff7155ee166ec01134f81f0b7f4457
Author: Stefan Richter 
Date:   2016-12-08T14:07:58Z

[FLINK-5289] Meaningful exception when using value state on non-keyed stream




> NPE when using value state on non-keyed stream
> --
>
> Key: FLINK-5289
> URL: https://issues.apache.org/jira/browse/FLINK-5289
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Timo Walther
>Assignee: Stefan Richter
>
> Using a {{ValueStateDescriptor}} and 
> {{getRuntimeContext().getState(descriptor)}} on a non-keyed stream leads to 
> {{NullPointerException}} which is not very helpful for users:
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:109)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5289) NPE when using value state on non-keyed stream

2016-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15732607#comment-15732607
 ] 

ASF GitHub Bot commented on FLINK-5289:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2969
  
CC @twalthr 


> NPE when using value state on non-keyed stream
> --
>
> Key: FLINK-5289
> URL: https://issues.apache.org/jira/browse/FLINK-5289
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Timo Walther
>Assignee: Stefan Richter
>
> Using a {{ValueStateDescriptor}} and 
> {{getRuntimeContext().getState(descriptor)}} on a non-keyed stream leads to 
> {{NullPointerException}} which is not very helpful for users:
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:109)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)