Repository: flink
Updated Branches:
  refs/heads/master 24408e190 -> 03889ae1f


[FLINK-6041] [streaming api] Move StreamingFunctionUtils to 
'org.apache.flink.streaming.util'

This closes #3532


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/03889ae1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/03889ae1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/03889ae1

Branch: refs/heads/master
Commit: 03889ae1f25bab0fc42ae695bdf45cf45658eab6
Parents: 40a156e
Author: liuyuzhong7 <liuyuzho...@gmail.com>
Authored: Tue Mar 14 20:23:25 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:27 2017 +0100

----------------------------------------------------------------------
 .../functions/util/StreamingFunctionUtils.java  | 215 -------------------
 .../operators/AbstractUdfStreamOperator.java    |   2 +-
 .../util/functions/StreamingFunctionUtils.java  | 215 +++++++++++++++++++
 .../functions/InternalWindowFunctionTest.java   |   2 +-
 4 files changed, 217 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/03889ae1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java
deleted file mode 100644
index 679ef0b..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.util;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
-import org.apache.flink.runtime.state.OperatorStateBackend;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Utility class that contains helper methods to work with Flink Streaming
- * {@link Function Functions}. This is similar to
- * {@link org.apache.flink.api.common.functions.util.FunctionUtils} but has 
additional methods
- * for invoking interfaces that only exist in the streaming API.
- */
-@Internal
-public final class StreamingFunctionUtils {
-
-       @SuppressWarnings("unchecked")
-       public static <T> void setOutputType(
-                       Function userFunction,
-                       TypeInformation<T> outTypeInfo,
-                       ExecutionConfig executionConfig) {
-
-               Preconditions.checkNotNull(outTypeInfo);
-               Preconditions.checkNotNull(executionConfig);
-
-               while (true) {
-                       if (trySetOutputType(userFunction, outTypeInfo, 
executionConfig)) {
-                               break;
-                       }
-
-                       // inspect if the user function is wrapped, then unwrap 
and try again if we can snapshot the inner function
-                       if (userFunction instanceof WrappingFunction) {
-                               userFunction = ((WrappingFunction<?>) 
userFunction).getWrappedFunction();
-                       } else {
-                               break;
-                       }
-
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       private static  <T> boolean trySetOutputType(
-                       Function userFunction,
-                       TypeInformation<T> outTypeInfo,
-                       ExecutionConfig executionConfig) {
-
-               Preconditions.checkNotNull(outTypeInfo);
-               Preconditions.checkNotNull(executionConfig);
-
-               if 
(OutputTypeConfigurable.class.isAssignableFrom(userFunction.getClass())) {
-                       ((OutputTypeConfigurable<T>) 
userFunction).setOutputType(outTypeInfo, executionConfig);
-                       return true;
-               }
-               return false;
-       }
-
-       public static void snapshotFunctionState(
-                       StateSnapshotContext context,
-                       OperatorStateBackend backend,
-                       Function userFunction) throws Exception {
-
-               Preconditions.checkNotNull(context);
-               Preconditions.checkNotNull(backend);
-
-               while (true) {
-
-                       if (trySnapshotFunctionState(context, backend, 
userFunction)) {
-                               break;
-                       }
-
-                       // inspect if the user function is wrapped, then unwrap 
and try again if we can snapshot the inner function
-                       if (userFunction instanceof WrappingFunction) {
-                               userFunction = ((WrappingFunction<?>) 
userFunction).getWrappedFunction();
-                       } else {
-                               break;
-                       }
-               }
-       }
-
-       private static boolean trySnapshotFunctionState(
-                       StateSnapshotContext context,
-                       OperatorStateBackend backend,
-                       Function userFunction) throws Exception {
-
-               if (userFunction instanceof CheckpointedFunction) {
-                       ((CheckpointedFunction) 
userFunction).snapshotState(context);
-
-                       return true;
-               }
-
-               if (userFunction instanceof ListCheckpointed) {
-                       @SuppressWarnings("unchecked")
-                       List<Serializable> partitionableState = 
((ListCheckpointed<Serializable>) userFunction).
-                                       
snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
-
-                       ListState<Serializable> listState = backend.
-                                       
getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
-
-                       listState.clear();
-
-                       if (null != partitionableState) {
-                               try {
-                                       for (Serializable statePartition : 
partitionableState) {
-                                               listState.add(statePartition);
-                                       }
-                               } catch (Exception e) {
-                                       listState.clear();
-
-                                       throw new Exception("Could not write 
partitionable state to operator " +
-                                               "state backend.", e);
-                               }
-                       }
-
-                       return true;
-               }
-
-               return false;
-       }
-
-       public static void restoreFunctionState(
-                       StateInitializationContext context,
-                       Function userFunction) throws Exception {
-
-               Preconditions.checkNotNull(context);
-
-               while (true) {
-
-                       if (tryRestoreFunction(context, userFunction)) {
-                               break;
-                       }
-
-                       // inspect if the user function is wrapped, then unwrap 
and try again if we can restore the inner function
-                       if (userFunction instanceof WrappingFunction) {
-                               userFunction = ((WrappingFunction<?>) 
userFunction).getWrappedFunction();
-                       } else {
-                               break;
-                       }
-               }
-       }
-
-       private static boolean tryRestoreFunction(
-                       StateInitializationContext context,
-                       Function userFunction) throws Exception {
-
-               if (userFunction instanceof CheckpointedFunction) {
-                       ((CheckpointedFunction) 
userFunction).initializeState(context);
-
-                       return true;
-               }
-
-               if (context.isRestored() && userFunction instanceof 
ListCheckpointed) {
-                       @SuppressWarnings("unchecked")
-                       ListCheckpointed<Serializable> listCheckpointedFun = 
(ListCheckpointed<Serializable>) userFunction;
-
-                       ListState<Serializable> listState = 
context.getOperatorStateStore().
-                                       
getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
-
-                       List<Serializable> list = new ArrayList<>();
-
-                       for (Serializable serializable : listState.get()) {
-                               list.add(serializable);
-                       }
-
-                       try {
-                               listCheckpointedFun.restoreState(list);
-                       } catch (Exception e) {
-
-                               throw new Exception("Failed to restore state to 
function: " + e.getMessage(), e);
-                       }
-
-                       return true;
-               }
-
-               return false;
-       }
-
-       /**
-        * Private constructor to prevent instantiation.
-        */
-       private StreamingFunctionUtils() {
-               throw new RuntimeException();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/03889ae1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 166287b..19559e1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
-import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils;
+import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;

http://git-wip-us.apache.org/repos/asf/flink/blob/03889ae1/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
new file mode 100644
index 0000000..4482431
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility class that contains helper methods to work with Flink Streaming
+ * {@link Function Functions}. This is similar to
+ * {@link org.apache.flink.api.common.functions.util.FunctionUtils} but has additional methods
+ * for invoking interfaces that only exist in the streaming API.
+ */
+@Internal
+public final class StreamingFunctionUtils {
+
+       @SuppressWarnings("unchecked")
+       public static <T> void setOutputType(
+                       Function userFunction,
+                       TypeInformation<T> outTypeInfo,
+                       ExecutionConfig executionConfig) {
+
+               Preconditions.checkNotNull(outTypeInfo);
+               Preconditions.checkNotNull(executionConfig);
+
+               while (true) {
+                       if (trySetOutputType(userFunction, outTypeInfo, 
executionConfig)) {
+                               break;
+                       }
+
+                       // inspect if the user function is wrapped, then unwrap and try again if we can set the output type on the inner function
+                       if (userFunction instanceof WrappingFunction) {
+                               userFunction = ((WrappingFunction<?>) 
userFunction).getWrappedFunction();
+                       } else {
+                               break;
+                       }
+
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       private static  <T> boolean trySetOutputType(
+                       Function userFunction,
+                       TypeInformation<T> outTypeInfo,
+                       ExecutionConfig executionConfig) {
+
+               Preconditions.checkNotNull(outTypeInfo);
+               Preconditions.checkNotNull(executionConfig);
+
+               if 
(OutputTypeConfigurable.class.isAssignableFrom(userFunction.getClass())) {
+                       ((OutputTypeConfigurable<T>) 
userFunction).setOutputType(outTypeInfo, executionConfig);
+                       return true;
+               }
+               return false;
+       }
+
+       public static void snapshotFunctionState(
+                       StateSnapshotContext context,
+                       OperatorStateBackend backend,
+                       Function userFunction) throws Exception {
+
+               Preconditions.checkNotNull(context);
+               Preconditions.checkNotNull(backend);
+
+               while (true) {
+
+                       if (trySnapshotFunctionState(context, backend, 
userFunction)) {
+                               break;
+                       }
+
+                       // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
+                       if (userFunction instanceof WrappingFunction) {
+                               userFunction = ((WrappingFunction<?>) 
userFunction).getWrappedFunction();
+                       } else {
+                               break;
+                       }
+               }
+       }
+
+       private static boolean trySnapshotFunctionState(
+                       StateSnapshotContext context,
+                       OperatorStateBackend backend,
+                       Function userFunction) throws Exception {
+
+               if (userFunction instanceof CheckpointedFunction) {
+                       ((CheckpointedFunction) 
userFunction).snapshotState(context);
+
+                       return true;
+               }
+
+               if (userFunction instanceof ListCheckpointed) {
+                       @SuppressWarnings("unchecked")
+                       List<Serializable> partitionableState = 
((ListCheckpointed<Serializable>) userFunction).
+                                       
snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
+
+                       ListState<Serializable> listState = backend.
+                                       
getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
+
+                       listState.clear();
+
+                       if (null != partitionableState) {
+                               try {
+                                       for (Serializable statePartition : 
partitionableState) {
+                                               listState.add(statePartition);
+                                       }
+                               } catch (Exception e) {
+                                       listState.clear();
+
+                                       throw new Exception("Could not write 
partitionable state to operator " +
+                                               "state backend.", e);
+                               }
+                       }
+
+                       return true;
+               }
+
+               return false;
+       }
+
+       public static void restoreFunctionState(
+                       StateInitializationContext context,
+                       Function userFunction) throws Exception {
+
+               Preconditions.checkNotNull(context);
+
+               while (true) {
+
+                       if (tryRestoreFunction(context, userFunction)) {
+                               break;
+                       }
+
+                       // inspect if the user function is wrapped, then unwrap and try again if we can restore the inner function
+                       if (userFunction instanceof WrappingFunction) {
+                               userFunction = ((WrappingFunction<?>) 
userFunction).getWrappedFunction();
+                       } else {
+                               break;
+                       }
+               }
+       }
+
+       private static boolean tryRestoreFunction(
+                       StateInitializationContext context,
+                       Function userFunction) throws Exception {
+
+               if (userFunction instanceof CheckpointedFunction) {
+                       ((CheckpointedFunction) 
userFunction).initializeState(context);
+
+                       return true;
+               }
+
+               if (context.isRestored() && userFunction instanceof 
ListCheckpointed) {
+                       @SuppressWarnings("unchecked")
+                       ListCheckpointed<Serializable> listCheckpointedFun = 
(ListCheckpointed<Serializable>) userFunction;
+
+                       ListState<Serializable> listState = 
context.getOperatorStateStore().
+                                       
getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
+
+                       List<Serializable> list = new ArrayList<>();
+
+                       for (Serializable serializable : listState.get()) {
+                               list.add(serializable);
+                       }
+
+                       try {
+                               listCheckpointedFun.restoreState(list);
+                       } catch (Exception e) {
+
+                               throw new Exception("Failed to restore state to 
function: " + e.getMessage(), e);
+                       }
+
+                       return true;
+               }
+
+               return false;
+       }
+
+       /**
+        * Private constructor to prevent instantiation.
+        */
+       private StreamingFunctionUtils() {
+               throw new RuntimeException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03889ae1/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index 8f795e9..d4fefa2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils;
+import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
 import 
org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.RichProcessAllWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;

Reply via email to