[hotfix] [py] Fix PythonCoGroup useByteArray check
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f78e9d7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f78e9d7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f78e9d7

Branch: refs/heads/table-retraction
Commit: 8f78e9d72f494c4eb773211beb30a7116d048109
Parents: 89d9dec
Author: zentol <ches...@apache.org>
Authored: Thu Mar 30 23:13:50 2017 +0200
Committer: zentol <ches...@apache.org>
Committed: Thu Apr 6 10:57:10 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/python/api/functions/PythonCoGroup.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8f78e9d7/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
index 72d3361..ff5a8d4 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
@@ -12,6 +12,7 @@
  */
 package org.apache.flink.python.api.functions;
 
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.python.api.streaming.data.PythonDualInputStreamer;
@@ -36,7 +37,7 @@ public class PythonCoGroup<IN1, IN2, OUT> extends RichCoGroupFunction<IN1, IN2,
 
 	public PythonCoGroup(int envID, int setID, TypeInformation<OUT> typeInformation) {
 		this.typeInformation = typeInformation;
-		streamer = new PythonDualInputStreamer<>(this, envID, setID, true);
+		streamer = new PythonDualInputStreamer<>(this, envID, setID, typeInformation instanceof PrimitiveArrayTypeInfo);
 	}
 
 	/**