Repository: flink
Updated Branches:
  refs/heads/master 717a7dc81 -> abc3e1c88


http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java
new file mode 100644
index 0000000..5f7032d
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.queryablestate.client.state.ImmutableValueState;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableValueState}.
+ */
+public class ImmutableValueStateTest {
+
+       private final ValueStateDescriptor<Long> valueStateDesc =
+                       new ValueStateDescriptor<>("test", 
BasicTypeInfo.LONG_TYPE_INFO);
+
+       private ImmutableValueState<Long> valueState;
+
+       @Before
+       public void setUp() throws Exception {
+               if (!valueStateDesc.isSerializerInitialized()) {
+                       valueStateDesc.initializeSerializerUnlessSet(new 
ExecutionConfig());
+               }
+
+               valueState = ImmutableValueState.createState(
+                               valueStateDesc,
+                               
ByteBuffer.allocate(Long.BYTES).putLong(42L).array()
+               );
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testUpdate() {
+               long value = valueState.value();
+               assertEquals(42L, value);
+
+               valueState.update(54L);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testClear() {
+               long value = valueState.value();
+               assertEquals(42L, value);
+
+               valueState.clear();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 51af430..ebcd7d5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -745,7 +745,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 
                return new QueryableStateStream<>(
                                queryableStateName,
-                               stateDescriptor.getSerializer(),
+                               stateDescriptor,
                                
getKeyType().createSerializer(getExecutionConfig()));
        }
 
@@ -772,7 +772,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 
                return new QueryableStateStream<>(
                                queryableStateName,
-                               stateDescriptor.getSerializer(),
+                               stateDescriptor,
                                
getKeyType().createSerializer(getExecutionConfig()));
        }
 
@@ -796,7 +796,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 
                return new QueryableStateStream<>(
                                queryableStateName,
-                               stateDescriptor.getSerializer(),
+                               stateDescriptor,
                                
getKeyType().createSerializer(getExecutionConfig()));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/QueryableStateStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/QueryableStateStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/QueryableStateStream.java
index d0de2ab..7f20fd6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/QueryableStateStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/QueryableStateStream.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.util.Preconditions;
 
@@ -37,23 +38,23 @@ public class QueryableStateStream<K, V> {
        /** Key serializer for the state instance. */
        private final TypeSerializer<K> keySerializer;
 
-       /** Value serializer for the state instance. */
-       private final TypeSerializer<V> valueSerializer;
+       /** State descriptor for the state instance. */
+       private final StateDescriptor<?, V> stateDescriptor;
 
        /**
         * Creates a queryable state stream.
         *
         * @param queryableStateName Name under which to publish the queryable 
state instance
-        * @param valueSerializer Value serializer for the state instance
+        * @param stateDescriptor The state descriptor for the state instance
         * @param keySerializer Key serializer for the state instance
         */
        public QueryableStateStream(
                        String queryableStateName,
-                       TypeSerializer<V> valueSerializer,
+                       StateDescriptor<?, V> stateDescriptor,
                        TypeSerializer<K> keySerializer) {
 
                this.queryableStateName = 
Preconditions.checkNotNull(queryableStateName, "Queryable state name");
-               this.valueSerializer = 
Preconditions.checkNotNull(valueSerializer, "Value serializer");
+               this.stateDescriptor = 
Preconditions.checkNotNull(stateDescriptor, "State Descriptor");
                this.keySerializer = Preconditions.checkNotNull(keySerializer, 
"Key serializer");
        }
 
@@ -67,15 +68,6 @@ public class QueryableStateStream<K, V> {
        }
 
        /**
-        * Returns the value serializer for the queryable state instance.
-        *
-        * @return Value serializer for the state instance
-        */
-       public TypeSerializer<V> getValueSerializer() {
-               return valueSerializer;
-       }
-
-       /**
         * Returns the key serializer for the queryable state instance.
         *
         * @return Key serializer for the state instance.
@@ -84,4 +76,12 @@ public class QueryableStateStream<K, V> {
                return keySerializer;
        }
 
+       /**
+        * Returns the state descriptor for the queryable state instance.
+        *
+        * @return State descriptor for the state instance
+        */
+       public StateDescriptor<?, V> getStateDescriptor() {
+               return stateDescriptor;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index aaeb1ec..49bdbd9 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -497,7 +497,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) 
extends DataStream[T]
 
     new QueryableStateStream(
       queryableStateName,
-      stateDescriptor.getSerializer,
+      stateDescriptor,
       getKeyType.createSerializer(executionConfig))
   }
 
@@ -522,7 +522,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) 
extends DataStream[T]
 
     new QueryableStateStream(
       queryableStateName,
-      stateDescriptor.getSerializer,
+      stateDescriptor,
       getKeyType.createSerializer(executionConfig))
   }
 
@@ -546,7 +546,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) 
extends DataStream[T]
 
     new QueryableStateStream(
       queryableStateName,
-      stateDescriptor.getSerializer,
+      stateDescriptor,
       getKeyType.createSerializer(executionConfig))
   }
   

Reply via email to