Repository: flink Updated Branches: refs/heads/master 6b402f43d -> 7407076d3
http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java index d2b2032..6c53b6b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java @@ -71,7 +71,7 @@ public class StreamJoinOperator<I1, I2> extends */ public JoinPredicate<I1, I2> where(int... fields) { return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys<I1>(fields, type1), type1)); + new Keys.ExpressionKeys<I1>(fields, type1), type1, op.input1.getExecutionEnvironment().getConfig())); } /** @@ -88,7 +88,7 @@ public class StreamJoinOperator<I1, I2> extends */ public JoinPredicate<I1, I2> where(String... fields) { return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys<I1>(fields, type1), type1)); + new Keys.ExpressionKeys<I1>(fields, type1), type1, op.input1.getExecutionEnvironment().getConfig())); } /** @@ -158,7 +158,7 @@ public class StreamJoinOperator<I1, I2> extends */ public JoinedStream<I1, I2> equalTo(int... fields) { keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields, type2), - type2); + type2, op.input1.getExecutionEnvironment().getConfig()); return createJoinOperator(); } @@ -177,7 +177,7 @@ public class StreamJoinOperator<I1, I2> extends */ public JoinedStream<I1, I2> equalTo(String... fields) { this.keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields, - type2), type2); + type2), type2, op.input1.getExecutionEnvironment().getConfig()); return createJoinOperator(); } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 45c14c1..65dde79 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.Collection; import java.util.List; +import com.esotericsoftware.kryo.Serializer; import org.apache.commons.lang3.Validate; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.InputFormat; @@ -30,6 +31,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.client.program.Client; import org.apache.flink.client.program.Client.OptimizerPlanEnvironment; @@ -78,15 +80,7 @@ public abstract class StreamExecutionEnvironment { * Constructor for creating StreamExecutionEnvironment */ protected StreamExecutionEnvironment() { - streamGraph = new StreamGraph(); - } - - /** - * Sets the config object. - */ - public void setConfig(ExecutionConfig config) { - Validate.notNull(config); - this.config = config; + streamGraph = new StreamGraph(config); } /** @@ -181,6 +175,57 @@ public abstract class StreamExecutionEnvironment { } // -------------------------------------------------------------------------------------------- + // Registry for types and serializers + // -------------------------------------------------------------------------------------------- + + /** + * Registers the given Serializer as a default serializer for the given type at the + * {@link org.apache.flink.api.java.typeutils.runtime.KryoSerializer}. + * + * Note that the serializer instance must be serializable (as defined by java.io.Serializable), + * because it may be distributed to the worker nodes by java serialization. + * + * @param type The class of the types serialized with the given serializer. + * @param serializer The serializer to use. + */ + public void registerKryoSerializer(Class<?> type, Serializer<?> serializer) { + config.registerKryoSerializer(type, serializer); + } + + /** + * Registers the given Serializer via its class as a serializer for the given type at the + * {@link org.apache.flink.api.java.typeutils.runtime.KryoSerializer}. + * + * @param type The class of the types serialized with the given serializer. + * @param serializerClass The class of the serializer to use. + */ + public void registerKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) { + config.registerKryoSerializer(type, serializerClass); + } + + /** + * Registers the given type with the serialization stack. If the type is eventually + * serialized as a POJO, then the type is registered with the POJO serializer. If the + * type ends up being serialized with Kryo, then it will be registered at Kryo to make + * sure that only tags are written. + * + * @param type The class of the type to register. + */ + public void registerType(Class<?> type) { + if (type == null) { + throw new NullPointerException("Cannot register null type class."); + } + + TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(type); + + if (typeInfo instanceof PojoTypeInfo) { + config.registerPojoType(type); + } else { + config.registerKryoType(type); + } + } + + // -------------------------------------------------------------------------------------------- // Data stream creations // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java index 1c273d3..3704e3b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java @@ -17,9 +17,9 @@ package org.apache.flink.streaming.api.function.aggregation; -import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichReduceFunction; -public abstract class AggregationFunction<T> implements ReduceFunction<T> { +public abstract class AggregationFunction<T> extends RichReduceFunction<T> { private static final long serialVersionUID = 1L; public int position; http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java index 226c45a..7f7cf0b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java @@ -193,7 +193,7 @@ public abstract class ComparableAggregator<T> extends AggregationFunction<T> { if (cType instanceof PojoTypeInfo) { pojoComparator = (PojoComparator<T>) cType.createComparator( - new int[] { logicalKeyPosition }, new boolean[] { false }, 0); + new int[] { logicalKeyPosition }, new boolean[] { false }, 0, getRuntimeContext().getExecutionConfig()); } else { throw new IllegalArgumentException( "Key expressions are only supported on POJO types. " http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java index 142028b..74e4597 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java @@ -146,7 +146,7 @@ public abstract class SumAggregator { if (cType instanceof PojoTypeInfo) { comparator = (PojoComparator<T>) cType.createComparator( - new int[] { logicalKeyPosition }, new boolean[] { false }, 0); + new int[] { logicalKeyPosition }, new boolean[] { false }, 0, getRuntimeContext().getExecutionConfig()); } else { throw new IllegalArgumentException( "Key expressions are only supported on POJO types. " http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java index a5ef3a7..6d1441a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java @@ -45,8 +45,8 @@ public class FileSourceFunction extends RichSourceFunction<String> { this.serializerFactory = createSerializer(typeInfo); } - private static TypeSerializerFactory<String> createSerializer(TypeInformation<String> typeInfo) { - TypeSerializer<String> serializer = typeInfo.createSerializer(); + private TypeSerializerFactory<String> createSerializer(TypeInformation<String> typeInfo) { + TypeSerializer<String> serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig()); return new RuntimeSerializerFactory<String>(serializer, typeInfo.getTypeClass()); } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java index 6cee5f2..793c952 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.invokable; import java.io.IOException; import java.io.Serializable; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.FunctionUtils; @@ -48,6 +49,8 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable { protected StreamTaskContext<OUT> taskContext; + protected ExecutionConfig executionConfig = null; + protected MutableObjectIterator<StreamRecord<IN>> recordIterator; protected StreamRecordSerializer<IN> inSerializer; protected TypeSerializer<IN> objectSerializer; @@ -67,11 +70,12 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable { /** * Initializes the {@link StreamInvokable} for input and output handling - * + * * @param taskContext * StreamTaskContext representing the vertex + * @param executionConfig */ - public void setup(StreamTaskContext<OUT> taskContext) { + public void setup(StreamTaskContext<OUT> taskContext, ExecutionConfig executionConfig) { this.collector = taskContext.getOutputCollector(); this.recordIterator = taskContext.getInput(0); this.inSerializer = taskContext.getInputSerializer(0); @@ -80,6 +84,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable { this.objectSerializer = inSerializer.getObjectSerializer(); } this.taskContext = taskContext; + this.executionConfig = executionConfig; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java index 997463c..df2edd2 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java @@ -354,7 +354,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { clonedDistributedEvictionPolicies); } - groupInvokable.setup(taskContext); + groupInvokable.setup(taskContext, executionConfig); groupInvokable.open(this.parameters); windowingGroups.put(keySelector.getKey(element.getObject()), groupInvokable); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java index c9d9e5a..69c7cee 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java @@ -35,7 +35,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN, super(null); this.fields = fields; this.numFields = this.fields.length; - this.outTypeSerializer = outTypeInformation.createSerializer(); + this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java index 604873e..9f98db3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.api.invokable.operator.co; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.streaming.api.invokable.StreamInvokable; @@ -46,7 +47,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU protected TypeSerializer<IN2> serializer2; @Override - public void setup(StreamTaskContext<OUT> taskContext) { + public void setup(StreamTaskContext<OUT> taskContext, ExecutionConfig executionConfig) { this.collector = taskContext.getOutputCollector(); this.recordIterator = taskContext.getCoReader(); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java index 98f12ec..cd68937 100755 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.streamrecord; import java.io.IOException; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputView; @@ -32,8 +33,8 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord private final TypeSerializer<T> typeSerializer; private final boolean isTuple; - public StreamRecordSerializer(TypeInformation<T> typeInfo) { - this.typeSerializer = typeInfo.createSerializer(); + public StreamRecordSerializer(TypeInformation<T> typeInfo, ExecutionConfig executionConfig) { + this.typeSerializer = typeInfo.createSerializer(executionConfig); this.isTuple = typeInfo.isTupleType(); } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java index 2b650be..83cdcd1 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java @@ -68,7 +68,7 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> { @Override protected void setInvokable() { userInvokable = configuration.getUserInvokable(userClassLoader); - userInvokable.setup(this); + userInvokable.setup(this, getExecutionConfig()); } protected void setConfigInputs() throws StreamVertexException { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java index 994b1fa..024e415 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java @@ -97,7 +97,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa protected void setInvokable() { userInvokable = configuration.getUserInvokable(userClassLoader); - userInvokable.setup(this); + userInvokable.setup(this, getExecutionConfig()); } public String getName() { @@ -111,7 +111,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa public StreamingRuntimeContext createRuntimeContext(String taskName, Map<String, OperatorState<?>> states) { Environment env = getEnvironment(); - return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(), states); + return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(), getExecutionConfig(), states); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java index a1a64e2..0daf3c2 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.streamvertex; import java.util.Map; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.configuration.Configuration; @@ -38,9 +39,9 @@ public class StreamingRuntimeContext extends RuntimeUDFContext { private final Map<String, OperatorState<?>> operatorStates; public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader, - Map<String, OperatorState<?>> operatorStates) { + ExecutionConfig executionConfig, Map<String, OperatorState<?>> operatorStates) { super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), - userCodeClassLoader, env.getCopyTask()); + userCodeClassLoader, executionConfig, env.getCopyTask()); this.env = env; this.operatorStates = operatorStates; } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java index 08afd0d..77467b5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.util.keys; import java.lang.reflect.Array; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -60,12 +61,13 @@ public class KeySelectorUtil { Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, Tuple25.class }; - public static <X> KeySelector<X, ?> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo) { + public static <X> KeySelector<X, ?> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) { int[] logicalKeyPositions = keys.computeLogicalKeyPositions(); int keyLength = logicalKeyPositions.length; boolean[] orders = new boolean[keyLength]; + // TODO: Fix using KeySelector everywhere TypeComparator<X> comparator = ((CompositeType<X>) typeInfo).createComparator( - logicalKeyPositions, orders, 0); + logicalKeyPositions, orders, 0, executionConfig); return new ComparableKeySelector<X>(comparator, keyLength); } http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java index 49cd497..115f614 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; @@ -118,7 +119,7 @@ public class AggregationFunctionTest { KeySelector<Tuple2<Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys( new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[] { 0 }, typeInfo), - typeInfo); + typeInfo, new ExecutionConfig()); List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute( new GroupedReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, keySelector), http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java index ea94f98..a6560ae 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -57,9 +58,9 @@ public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> { this.inputIterator2 = input2.iterator(); TypeInformation<IN1> inTypeInfo1 = TypeExtractor.getForObject(input1.iterator().next()); - inDeserializer1 = new StreamRecordSerializer<IN1>(inTypeInfo1); + inDeserializer1 = new StreamRecordSerializer<IN1>(inTypeInfo1, new ExecutionConfig()); TypeInformation<IN2> inTypeInfo2 = TypeExtractor.getForObject(input2.iterator().next()); - inDeserializer2 = new StreamRecordSerializer<IN2>(inTypeInfo2); + inDeserializer2 = new StreamRecordSerializer<IN2>(inTypeInfo2, new ExecutionConfig()); mockIterator = new MockCoReaderIterator(inDeserializer1, inDeserializer2); @@ -154,7 +155,7 @@ public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> { public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoInvokable<IN1, IN2, OUT> invokable, List<IN1> input1, List<IN2> input2) { MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2); - invokable.setup(mockContext); + invokable.setup(mockContext, new ExecutionConfig()); try { invokable.open(null); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java index 5537052..81467dc 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.StreamConfig; @@ -49,7 +50,7 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT> { } TypeInformation<IN> inTypeInfo = TypeExtractor.getForObject(inputs.iterator().next()); - inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo); + inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo, new ExecutionConfig()); iterator = new MockInputIterator(); outputs = new ArrayList<OUT>(); @@ -104,7 +105,7 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT> { public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, OUT> invokable, List<IN> inputs) { MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs); - invokable.setup(mockContext); + invokable.setup(mockContext, new ExecutionConfig()); try { invokable.open(null); invokable.invoke(); http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 177a9ee..23495a5 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.scala +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, SingleOutputStreamOperator, GroupedDataStream} import scala.reflect.ClassTag @@ -41,7 +42,6 @@ import org.apache.flink.streaming.api.collector.OutputSelector import scala.collection.JavaConversions._ import java.util.HashMap import org.apache.flink.streaming.api.function.aggregation.SumFunction -import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.streaming.api.function.aggregation.AggregationFunction import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo @@ -290,7 +290,9 @@ class DataStream[T](javaStream: JavaStream[T]) { val jStream = javaStream.asInstanceOf[JavaStream[Product]] val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]] - val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position) + val agg = new ScalaStreamingAggregator[Product]( + jStream.getType().createSerializer(javaStream.getExecutionEnvironment.getConfig), + position) val reducer = aggregationType match { case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position). http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala index a408ec0..06271fd 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala @@ -18,6 +18,8 @@ package org.apache.flink.streaming.api.scala +import org.apache.flink.api.common.ExecutionConfig + import scala.reflect.ClassTag import org.apache.commons.lang.Validate import org.apache.flink.api.common.functions.CrossFunction @@ -44,10 +46,10 @@ class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extend classOf[(I1, I2)], Seq(input1.getType, input2.getType), Array("_1", "_2")) { - override def createSerializer: TypeSerializer[(I1, I2)] = { + override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[(I1, I2)] = { val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity) for (i <- 0 until getArity) { - fieldSerializers(i) = types(i).createSerializer + fieldSerializers(i) = types(i).createSerializer(executionConfig) } new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 394673c..bc9b422 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -18,6 +18,9 @@ package org.apache.flink.streaming.api.scala +import com.esotericsoftware.kryo.Serializer +import org.apache.flink.api.java.typeutils.runtime.KryoSerializer + import scala.reflect.ClassTag import org.apache.commons.lang.Validate import org.apache.flink.api.common.typeinfo.TypeInformation @@ -73,6 +76,33 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { def getBufferTimout: Long = javaEnv.getBufferTimeout() /** + * Registers the given Serializer as a default serializer for the given class at the + * [[KryoSerializer]]. + */ + def registerKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = { + javaEnv.registerKryoSerializer(clazz, serializer) + } + + /** + * Registers the given Serializer as a default serializer for the given class at the + * [[KryoSerializer]] + */ + def registerKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]) { + javaEnv.registerKryoSerializer(clazz, serializer) + } + + /** + * Registers the given type with the serialization stack. If the type is eventually + * serialized as a POJO, then the type is registered with the POJO serializer. If the + * type ends up being serialized with Kryo, then it will be registered at Kryo to make + * sure that only tags are written. + * + */ + def registerType(typeClass: Class[_]) { + javaEnv.registerType(typeClass) + } + + /** * Creates a DataStream that represents the Strings produced by reading the * given file line wise. The file will be read with the system's default * character set. http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala index 35a94cd..67f7aae 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala @@ -18,6 +18,8 @@ package org.apache.flink.streaming.api.scala +import org.apache.flink.api.common.ExecutionConfig + import scala.Array.canBuildFrom import scala.reflect.ClassTag import org.apache.commons.lang.Validate @@ -46,7 +48,7 @@ TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) { object StreamJoinOperator { - class JoinWindow[I1, I2](private[flink]op: StreamJoinOperator[I1, I2]) extends + class JoinWindow[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2]) extends TemporalWindow[JoinWindow[I1, I2]] { private[flink] val type1 = op.input1.getType() @@ -59,7 +61,9 @@ object StreamJoinOperator { */ def where(fields: Int*) = { new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys(fields.toArray, type1), type1)) + new Keys.ExpressionKeys(fields.toArray, type1), + type1, + op.input1.getExecutionEnvironment.getConfig)) } /** @@ -70,7 +74,9 @@ object StreamJoinOperator { */ def where(firstField: String, otherFields: String*) = new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys(firstField +: otherFields.toArray, type1), type1)) + new Keys.ExpressionKeys(firstField +: otherFields.toArray, type1), + type1, + op.input1.getExecutionEnvironment.getConfig)) /** * Continues a temporal Join transformation by defining @@ -112,7 +118,9 @@ object StreamJoinOperator { */ def equalTo(fields: Int*): JoinedStream[I1, I2] = { finish(KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys(fields.toArray, type2), type2)) + new Keys.ExpressionKeys(fields.toArray, type2), + type2, + op.input1.getExecutionEnvironment.getConfig)) } /** @@ -123,7 +131,9 @@ object StreamJoinOperator { */ def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] = finish(KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys(firstField +: otherFields.toArray, type2), type2)) + new Keys.ExpressionKeys(firstField +: otherFields.toArray, type2), + type2, + op.input1.getExecutionEnvironment.getConfig)) /** * Creates a temporal join transformation by defining the second join key. @@ -151,10 +161,11 @@ object StreamJoinOperator { classOf[(I1, I2)], Seq(op.input1.getType, op.input2.getType), Array("_1", "_2")) { - override def createSerializer: TypeSerializer[(I1, I2)] = { + override def createSerializer( + executionConfig: ExecutionConfig): TypeSerializer[(I1, I2)] = { val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity) for (i <- 0 until getArity) { - fieldSerializers(i) = types(i).createSerializer + fieldSerializers(i) = types(i).createSerializer(executionConfig) } new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala index 5c734bf..33bbc67 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala @@ -18,6 +18,8 @@ package org.apache.flink.streaming.api.scala +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase + import scala.Array.canBuildFrom import scala.collection.JavaConversions.iterableAsScalaIterable import scala.reflect.ClassTag @@ -26,7 +28,6 @@ import org.apache.flink.api.common.functions.GroupReduceFunction import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector -import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator import org.apache.flink.streaming.api.datastream.{WindowedDataStream => JavaWStream} import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType @@ -234,7 +235,10 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) { val jStream = javaStream.asInstanceOf[JavaWStream[Product]] val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]] - val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position) + val agg = new ScalaStreamingAggregator[Product]( + jStream.getType().createSerializer( + javaStream.getDataStream.getExecutionEnvironment.getConfig), + position) val reducer = aggregationType match { case AggregationType.SUM => new agg.Sum(SumFunction.getForClass( http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSubclassInterfacePOJOITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSubclassInterfacePOJOITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSubclassInterfacePOJOITCase.java new file mode 100644 index 0000000..05ffc88 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSubclassInterfacePOJOITCase.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.exampleJavaPrograms; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.util.Collector; + +import java.io.Serializable; + +@SuppressWarnings("serial") +public class WordCountSubclassInterfacePOJOITCase extends JavaProgramTestBase implements Serializable { + private static final long serialVersionUID = 1L; + protected String textPath; + protected String resultPath; + + + @Override + protected void preSubmit() throws Exception { + textPath = createTempFile("text.txt", WordCountData.TEXT); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath); + } + + @Override + protected void testProgram() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<String> text = env.readTextFile(textPath); + + DataSet<WCBase> counts = text + .flatMap(new Tokenizer()) + .groupBy("word") + .reduce(new ReduceFunction<WCBase>() { + private static final long serialVersionUID = 1L; + public WCBase reduce(WCBase value1, WCBase value2) { + WC wc1 = (WC) value1; + WC wc2 = (WC) value2; + int c = wc1.secretCount.getCount() + wc2.secretCount.getCount(); + wc1.secretCount.setCount(c); + return wc1; + } + }) + .map(new MapFunction<WCBase, WCBase>() { + @Override + public WCBase map(WCBase value) throws Exception { + WC wc = (WC) value; + wc.count = wc.secretCount.getCount(); + return wc; + } + }); + + counts.writeAsText(resultPath); + + env.execute("WordCount with custom data types example"); + } + + public static final class Tokenizer implements FlatMapFunction<String, WCBase> { + + @Override + public void flatMap(String value, Collector<WCBase> out) { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new WC(token, 1)); + } + } + } + } + + public static abstract class WCBase { + public String word; + public int count; + + public WCBase(String w, int c) { + this.word = w; + this.count = c; + } + @Override + public String toString() { + return word+" "+count; + } + } + + public static interface CrazyCounter { + public int getCount(); + public void setCount(int c); + } + + public static class CrazyCounterImpl implements CrazyCounter { + public int countz; + + public CrazyCounterImpl() { + } + + public CrazyCounterImpl(int c) { + this.countz = c; + } + + @Override + public int getCount() { + return countz; + } + + @Override + public void setCount(int c) { + this.countz = c; + } + + } + + public static class WC extends WCBase { + public CrazyCounter secretCount; + + public WC() { + super(null, 0); + } + + public WC(String w, int c) { + super(w, 0); + this.secretCount = new CrazyCounterImpl(c); + } + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSubclassPOJOITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSubclassPOJOITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSubclassPOJOITCase.java new file mode 100644 index 0000000..f74ee16 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSubclassPOJOITCase.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.exampleJavaPrograms; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.util.Collector; + +import java.io.Serializable; + +@SuppressWarnings("serial") +public class WordCountSubclassPOJOITCase extends JavaProgramTestBase implements Serializable { + private static final long serialVersionUID = 1L; + protected String textPath; + protected String resultPath; + + + @Override + protected void preSubmit() throws Exception { + textPath = createTempFile("text.txt", WordCountData.TEXT); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath); + } + + @Override + protected void testProgram() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<String> text = env.readTextFile(textPath); + + DataSet<WCBase> counts = text + .flatMap(new Tokenizer()) + .groupBy("word") + .reduce(new ReduceFunction<WCBase>() { + private static final long serialVersionUID = 1L; + public WCBase reduce(WCBase value1, WCBase value2) { + WC wc1 = (WC) value1; + WC wc2 = (WC) value2; + return new WC(value1.word, wc1.secretCount + wc2.secretCount); + } + }) + .map(new MapFunction<WCBase, WCBase>() { + @Override + public WCBase map(WCBase value) throws Exception { + WC wc = (WC) value; + wc.count = wc.secretCount; + return wc; + } + }); + + counts.writeAsText(resultPath); + + env.execute("WordCount with custom data types example"); + } + + public static final class Tokenizer implements FlatMapFunction<String, WCBase> { + + @Override + public void flatMap(String value, Collector<WCBase> out) { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new WC(token, 1)); + } + } + } + } + + public static abstract class WCBase { + public String word; + public int count; + + public WCBase(String w, int c) { + this.word = w; + this.count = c; + } + @Override + public String toString() { + return word+" "+count; + } + } + + public static class WC extends WCBase { + + public int secretCount; + + public WC() { + super(null, 0); + } + + public WC(String w, int c) { + super(w, 0); + this.secretCount = c; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala index 0a0e50d..84a0032 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.io +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.java.io.CollectionInputFormat import org.junit.Assert.assertEquals import org.junit.Assert.assertNotNull @@ -55,8 +56,11 @@ class CollectionInputFormatTest { val inputCollection = Seq(new ElementType(1), new ElementType(2), new ElementType(3)) val info = createTypeInformation[ElementType] - val inputFormat: CollectionInputFormat[ElementType] = new - CollectionInputFormat[ElementType](inputCollection.asJava, info.createSerializer()) + val inputFormat: CollectionInputFormat[ElementType] = { + new CollectionInputFormat[ElementType]( + inputCollection.asJava, + info.createSerializer(new ExecutionConfig)) + } val buffer = new ByteArrayOutputStream val out = new ObjectOutputStream(buffer) @@ -107,7 +111,7 @@ class CollectionInputFormatTest { val inputFormat = new CollectionInputFormat[String]( data.asJava, - BasicTypeInfo.STRING_TYPE_INFO.createSerializer) + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig)) val baos = new ByteArrayOutputStream val oos = new ObjectOutputStream(baos) http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala index 4f8816f..157aa0d 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala @@ -22,6 +22,7 @@ import java.io.File import java.util.Random import java.io.BufferedWriter import java.io.FileWriter +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.scala._ import java.io.BufferedReader import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync @@ -81,8 +82,12 @@ class MassiveCaseClassSortingITCase { val typeInfo = implicitly[TypeInformation[StringTuple]] .asInstanceOf[CompositeType[StringTuple]] - val serializer = typeInfo.createSerializer() - val comparator = typeInfo.createComparator(Array(0, 1), Array(true, true), 0) + val serializer = typeInfo.createSerializer(new ExecutionConfig) + val comparator = typeInfo.createComparator( + Array(0, 1), + Array(true, true), + 0, + new ExecutionConfig) val mm = new DefaultMemoryManager(1024 * 1024, 1) val ioMan = new IOManagerAsync() http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala index 4718395..21c6581 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala.runtime +import org.apache.flink.api.common.ExecutionConfig import org.junit.Test import org.junit.Assert._ import org.apache.flink.api.scala._ @@ -48,9 +49,13 @@ class CaseClassComparatorTest { val typeInfo = implicitly[TypeInformation[CaseTestClass]] .asInstanceOf[CompositeType[CaseTestClass]] - val serializer = typeInfo.createSerializer() + val serializer = typeInfo.createSerializer(new ExecutionConfig) val comparator = new FailingCompareDeserializedWrapper( - typeInfo.createComparator(Array[Int](0, 2), Array[Boolean](true, true), 0)) + typeInfo.createComparator( + Array[Int](0, 2), + Array[Boolean](true, true), + 0, + new ExecutionConfig)) assertTrue(comparator.supportsNormalizedKey()) assertEquals(8, comparator.getNormalizeKeyLen()) http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala index c396f9f..ce4efb3 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala @@ -17,13 +17,15 @@ */ package org.apache.flink.api.scala.runtime +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeutils.SerializerTestInstance +import org.apache.flink.api.java.ExecutionEnvironment import org.apache.flink.api.java.typeutils.GenericTypeInfo +import org.apache.flink.api.java.typeutils.runtime.KryoSerializer import org.joda.time.DateTime import org.junit.Test import scala.reflect._ import org.joda.time.LocalDate -import org.apache.flink.api.java.typeutils.runtime.KryoSerializer import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.io.Output @@ -94,8 +96,6 @@ class KryoGenericTypeSerializerTest { def jodaSerialization: Unit = { val a = List(new LocalDate(1), new LocalDate(2)) - KryoSerializer.registerSerializer(classOf[LocalDate], new LocalDateSerializer()) - runTests(a) } @@ -191,8 +191,13 @@ class KryoGenericTypeSerializerTest { def runTests[T : ClassTag](objects: Seq[T]): Unit ={ val clsTag = classTag[T] + + + // Register the custom Kryo Serializer + val conf = new ExecutionConfig + conf.registerKryoSerializer(classOf[LocalDate], classOf[LocalDateSerializer]) val typeInfo = new GenericTypeInfo[T](clsTag.runtimeClass.asInstanceOf[Class[T]]) - val serializer = typeInfo.createSerializer() + val serializer = typeInfo.createSerializer(conf) val typeClass = typeInfo.getTypeClass val instance = new SerializerTestInstance[T](serializer, typeClass, -1, objects: _*) http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala index fc51c0c..c86fde0 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.runtime +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.java.typeutils.runtime.KryoSerializer import org.junit.Assert._ @@ -90,7 +91,7 @@ class ScalaSpecialTypesSerializerTest { private final def runTests[T : TypeInformation](instances: Array[T]) { try { val typeInfo = implicitly[TypeInformation[T]] - val serializer = typeInfo.createSerializer + val serializer = typeInfo.createSerializer(new ExecutionConfig) val typeClass = typeInfo.getTypeClass val test = new ScalaSpecialTypesSerializerTestInstance[T](serializer, typeClass, -1, instances) @@ -116,8 +117,9 @@ class ScalaSpecialTypesSerializerTestInstance[T]( override def testInstantiate(): Unit = { try { val serializer: TypeSerializer[T] = getSerializer - val instance: T = serializer.createInstance if (!serializer.isInstanceOf[KryoSerializer[_]]) { + // kryo serializer does return null, so only test for non-kryo-serializers + val instance: T = serializer.createInstance assertNotNull("The created instance must not be null.", instance) } val tpe: Class[T] = getTypeClass http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala index 84ff4a6..65648b6 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.runtime +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.functions.InvalidTypesException import org.junit.Assert._ @@ -104,7 +105,7 @@ class TraversableSerializerTest { val testData = Array(Array((1, "String"), (2, "Foo")), Array((4, "String"), (3, "Foo"))) runTests(testData) } -// + @Test def testWithCaseClass(): Unit = { val testData = Array(Seq((1, "String"), (2, "Foo")), Seq((4, "String"), (3, "Foo"))) @@ -132,7 +133,7 @@ class TraversableSerializerTest { private final def runTests[T : TypeInformation](instances: Array[T]) { try { val typeInfo = implicitly[TypeInformation[T]] - val serializer = typeInfo.createSerializer + val serializer = typeInfo.createSerializer(new ExecutionConfig) val typeClass = typeInfo.getTypeClass val test = new ScalaSpecialTypesSerializerTestInstance[T](serializer, typeClass, -1, instances) http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala index 8ccbc83..8b1a180 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala @@ -17,8 +17,8 @@ */ package org.apache.flink.api.scala.runtime -import org.apache.flink.api.common.typeutils.TypeComparator -import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer} import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase @@ -30,12 +30,12 @@ class TupleComparatorILD2Test extends TupleComparatorTestBase[(Int, Long, Double protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = { val ti = createTypeInformation[(Int, Long, Double)] ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]] - .createComparator(Array(0, 1), Array(ascending, ascending), 0) + .createComparator(Array(0, 1), Array(ascending, ascending), 0, new ExecutionConfig) } protected def createSerializer: TypeSerializer[(Int, Long, Double)] = { val ti = createTypeInformation[(Int, Long, Double)] - ti.createSerializer() + ti.createSerializer(new ExecutionConfig) } protected def getSortedTestData: Array[(Int, Long, Double)] = { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala index d329571..2fdc087 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.runtime +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer} import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.api.scala._ @@ -28,12 +29,16 @@ class TupleComparatorILD3Test extends TupleComparatorTestBase[(Int, Long, Double protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = { val ti = createTypeInformation[(Int, Long, Double)] ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]] - .createComparator(Array(0, 1, 2), Array(ascending, ascending, ascending), 0) + .createComparator( + Array(0, 1, 2), + Array(ascending, ascending, ascending), + 0, + new ExecutionConfig) } protected def createSerializer: TypeSerializer[(Int, Long, Double)] = { val ti = createTypeInformation[(Int, Long, Double)] - ti.createSerializer() + ti.createSerializer(new ExecutionConfig) } protected def getSortedTestData: Array[(Int, Long, Double)] = { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala index de7affd..34e0306 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.runtime +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator} import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.api.scala._ @@ -28,12 +29,16 @@ class TupleComparatorILDC3Test extends TupleComparatorTestBase[(Int, Long, Doubl protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = { val ti = createTypeInformation[(Int, Long, Double)] ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]] - .createComparator(Array(2, 0, 1), Array(ascending, ascending, ascending), 0) + .createComparator( + Array(2, 0, 1), + Array(ascending, ascending, ascending), + 0, + new ExecutionConfig) } protected def createSerializer: TypeSerializer[(Int, Long, Double)] = { val ti = createTypeInformation[(Int, Long, Double)] - ti.createSerializer() + ti.createSerializer(new ExecutionConfig) } protected def getSortedTestData: Array[(Int, Long, Double)] = { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala index eebd56f..27d8296 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.runtime +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator} import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase @@ -28,12 +29,12 @@ class TupleComparatorILDX1Test extends TupleComparatorTestBase[(Int, Long, Doubl protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = { val ti = createTypeInformation[(Int, Long, Double)] ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]] - .createComparator(Array(1), Array(ascending), 0) + .createComparator(Array(1), Array(ascending), 0, new ExecutionConfig) } protected def createSerializer: TypeSerializer[(Int, Long, Double)] = { val ti = createTypeInformation[(Int, Long, Double)] - ti.createSerializer() + ti.createSerializer(new ExecutionConfig) } protected def getSortedTestData: Array[(Int, Long, Double)] = { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala index c83ab8b..8231d46 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.runtime +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator} import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase @@ -28,12 +29,12 @@ class TupleComparatorILDXC2Test extends TupleComparatorTestBase[(Int, Long, Doub protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = { val ti = createTypeInformation[(Int, Long, Double)] ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]] - .createComparator(Array(2, 1), Array(ascending, ascending), 0) + .createComparator(Array(2, 1), Array(ascending, ascending), 0, new ExecutionConfig) } protected def createSerializer: TypeSerializer[(Int, Long, Double)] = { val ti = createTypeInformation[(Int, Long, Double)] - ti.createSerializer() + ti.createSerializer(new ExecutionConfig) } protected def getSortedTestData: Array[(Int, Long, Double)] = { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala index 03c9666..1e1399e 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.runtime +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator} import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.api.scala._ @@ -27,12 +28,12 @@ class TupleComparatorISD1Test extends TupleComparatorTestBase[(Int, String, Doub protected def createComparator(ascending: Boolean): TypeComparator[(Int, String, Double)] = { val ti = createTypeInformation[(Int, String, Double)] ti.asInstanceOf[TupleTypeInfoBase[(Int, String, Double)]] - .createComparator(Array(0), Array(ascending),0) + .createComparator(Array(0), Array(ascending), 0, new ExecutionConfig) } protected def createSerializer: TypeSerializer[(Int, String, Double)] = { val ti = createTypeInformation[(Int, String, Double)] - ti.createSerializer() + ti.createSerializer(new ExecutionConfig) } protected def getSortedTestData: Array[(Int, String, Double)] = { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD2Test.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD2Test.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD2Test.scala index 9a3b9f9..eb905bd 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD2Test.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD2Test.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.runtime +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer} import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.api.scala._ @@ -27,12 +28,12 @@ class TupleComparatorISD2Test extends TupleComparatorTestBase[(Int, String, Doub protected def createComparator(ascending: Boolean): TypeComparator[(Int, String, Double)] = { val ti = createTypeInformation[(Int, String, Double)] ti.asInstanceOf[TupleTypeInfoBase[(Int, String, Double)]] - .createComparator(Array(0, 1), Array(ascending, ascending), 0) + .createComparator(Array(0, 1), Array(ascending, ascending), 0, new ExecutionConfig) } protected def createSerializer: TypeSerializer[(Int, String, Double)] = { val ti = createTypeInformation[(Int, String, Double)] - ti.createSerializer() + ti.createSerializer(new ExecutionConfig) } protected def getSortedTestData: Array[(Int, String, Double)] = { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD3Test.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD3Test.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD3Test.scala index 01b4f3e..d7ff16a 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD3Test.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD3Test.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.runtime +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator} import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.api.scala._ @@ -27,12 +28,16 @@ class TupleComparatorISD3Test extends TupleComparatorTestBase[(Int, String, Doub protected def createComparator(ascending: Boolean): TypeComparator[(Int, String, Double)] = { val ti = createTypeInformation[(Int, String, Double)] ti.asInstanceOf[TupleTypeInfoBase[(Int, String, Double)]] - .createComparator(Array(0, 1, 2), Array(ascending, ascending, ascending), 0) + .createComparator( + Array(0, 1, 2), + Array(ascending, ascending, ascending), + 0, + new ExecutionConfig) } protected def createSerializer: TypeSerializer[(Int, String, Double)] = { val ti = createTypeInformation[(Int, String, Double)] - ti.createSerializer() + ti.createSerializer(new ExecutionConfig) } protected def getSortedTestData: Array[(Int, String, Double)] = { http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala index 29e13ec..9371604 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala @@ -18,6 +18,8 @@ package org.apache.flink.api.scala.runtime import java.util +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.java.ExecutionEnvironment import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest._ import org.apache.flink.api.common.typeinfo.TypeInformation @@ -28,7 +30,6 @@ import org.junit.Test import org.apache.flink.api.scala._ import scala.collection.JavaConverters._ import java.util.Random -import org.apache.flink.api.java.typeutils.runtime.KryoSerializer class TupleSerializerTest { @@ -102,8 +103,6 @@ class TupleSerializerTest { (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)), (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt))) - KryoSerializer.registerSerializer(classOf[LocalDate], new LocalDateSerializer()) - runTests(testTuples) } @@ -192,8 +191,11 @@ class TupleSerializerTest { private final def runTests[T <: Product : TypeInformation](instances: Array[T]) { try { + // Register the custom Kryo Serializer + val conf = new ExecutionConfig + conf.registerKryoSerializer(classOf[LocalDate], classOf[LocalDateSerializer]) val tupleTypeInfo = implicitly[TypeInformation[T]].asInstanceOf[TupleTypeInfoBase[T]] - val serializer = tupleTypeInfo.createSerializer + val serializer = tupleTypeInfo.createSerializer(conf) val tupleClass = tupleTypeInfo.getTypeClass val test = new TupleSerializerTestInstance[T](serializer, tupleClass, -1, instances) test.testAll() http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala index 89d7c5e..08ba49d 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala @@ -20,6 +20,7 @@ package org.apache.flink.api.scala.types import java.io.DataInput import java.io.DataOutput import org.apache.flink.api.common.typeinfo._ +import org.apache.flink.api.common.typeutils._ import org.apache.flink.api.java.typeutils._ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.types.{IntValue, StringValue}