[hotfix] [py] Fix PythonCoGroup useByteArray check

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f78e9d7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f78e9d7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f78e9d7

Branch: refs/heads/table-retraction
Commit: 8f78e9d72f494c4eb773211beb30a7116d048109
Parents: 89d9dec
Author: zentol <ches...@apache.org>
Authored: Thu Mar 30 23:13:50 2017 +0200
Committer: zentol <ches...@apache.org>
Committed: Thu Apr 6 10:57:10 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/python/api/functions/PythonCoGroup.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8f78e9d7/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
index 72d3361..ff5a8d4 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
@@ -12,6 +12,7 @@
  */
 package org.apache.flink.python.api.functions;
 
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.python.api.streaming.data.PythonDualInputStreamer;
@@ -36,7 +37,7 @@ public class PythonCoGroup<IN1, IN2, OUT> extends RichCoGroupFunction<IN1, IN2,
 
        public PythonCoGroup(int envID, int setID, TypeInformation<OUT> typeInformation) {
                this.typeInformation = typeInformation;
-               streamer = new PythonDualInputStreamer<>(this, envID, setID, true);
+               streamer = new PythonDualInputStreamer<>(this, envID, setID, typeInformation instanceof PrimitiveArrayTypeInfo);
        }
 
        /**

Reply via email to