[FLINK-377] Generic Interface
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af9248c3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af9248c3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af9248c3 Branch: refs/heads/master Commit: af9248c35a5a138d311073b54f6abd4260ab7fd9 Parents: 877043f Author: zentol <s.mo...@web.de> Authored: Tue Mar 3 20:50:41 2015 +0100 Committer: zentol <s.mo...@web.de> Committed: Tue Apr 21 13:37:29 2015 +0200 ---------------------------------------------------------------------- .../operators/base/CoGroupRawOperatorBase.java | 270 ++++++++ flink-dist/pom.xml | 5 + .../api/java/operators/CoGroupRawOperator.java | 118 ++++ .../org/apache/flink/api/java/tuple/Tuple0.java | 39 ++ .../flink/optimizer/costs/CostEstimator.java | 1 + .../flink/optimizer/dag/CoGroupRawNode.java | 82 +++ .../operators/CoGroupRawDescriptor.java | 171 +++++ .../traversals/GraphCreatingVisitor.java | 5 + .../runtime/operators/CoGroupRawDriver.java | 150 +++++ .../flink/runtime/operators/DriverStrategy.java | 3 + .../flink-language-binding-generic/pom.xml | 61 ++ .../api/java/common/OperationInfo.java | 57 ++ .../api/java/common/PlanBinder.java | 656 +++++++++++++++++++ .../api/java/common/streaming/Receiver.java | 410 ++++++++++++ .../api/java/common/streaming/Sender.java | 415 ++++++++++++ .../java/common/streaming/StreamPrinter.java | 55 ++ .../api/java/common/streaming/Streamer.java | 276 ++++++++ flink-staging/flink-language-binding/pom.xml | 39 ++ flink-staging/pom.xml | 1 + 19 files changed, 2814 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupRawOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupRawOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupRawOperatorBase.java new file mode 100644 index 0000000..2c81e02 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupRawOperatorBase.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.common.operators.base; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.CopyingListCollector; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.operators.BinaryOperatorInformation; +import org.apache.flink.api.common.operators.DualInputOperator; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; +import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; +import org.apache.flink.api.common.operators.util.UserCodeWrapper; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.util.Collector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; + +/** + * @see org.apache.flink.api.common.functions.CoGroupFunction + */ +public class CoGroupRawOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> { + + /** + * The ordering for the order inside a group from input one. + */ + private Ordering groupOrder1; + + /** + * The ordering for the order inside a group from input two. + */ + private Ordering groupOrder2; + + // -------------------------------------------------------------------------------------------- + private boolean combinableFirst; + + private boolean combinableSecond; + + public CoGroupRawOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) { + super(udf, operatorInfo, keyPositions1, keyPositions2, name); + this.combinableFirst = false; + this.combinableSecond = false; + } + + public CoGroupRawOperatorBase(FT udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) { + this(new UserCodeObjectWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name); + } + + public CoGroupRawOperatorBase(Class<? extends FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) { + this(new UserCodeClassWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name); + } + + // -------------------------------------------------------------------------------------------- + /** + * Sets the order of the elements within a group for the given input. + * + * @param inputNum The number of the input (here either <i>0</i> or <i>1</i>). + * @param order The order for the elements in a group. + */ + public void setGroupOrder(int inputNum, Ordering order) { + if (inputNum == 0) { + this.groupOrder1 = order; + } else if (inputNum == 1) { + this.groupOrder2 = order; + } else { + throw new IndexOutOfBoundsException(); + } + } + + /** + * Sets the order of the elements within a group for the first input. + * + * @param order The order for the elements in a group. 
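+ * @see #setGroupOrder(int, Ordering)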
+ */ + public void setGroupOrderForInputOne(Ordering order) { + setGroupOrder(0, order); + } + + /** + * Sets the order of the elements within a group for the second input. + * + * @param order The order for the elements in a group. + */ + public void setGroupOrderForInputTwo(Ordering order) { + setGroupOrder(1, order); + } + + /** + * Gets the value order for an input, i.e. the order of elements within a group. + * If no such order has been set, this method returns null. + * + * @param inputNum The number of the input (here either <i>0</i> or <i>1</i>). + * @return The group order. + */ + public Ordering getGroupOrder(int inputNum) { + if (inputNum == 0) { + return this.groupOrder1; + } else if (inputNum == 1) { + return this.groupOrder2; + } else { + throw new IndexOutOfBoundsException(); + } + } + + /** + * Gets the order of elements within a group for the first input. + * If no such order has been set, this method returns null. + * + * @return The group order for the first input. + */ + public Ordering getGroupOrderForInputOne() { + return getGroupOrder(0); + } + + /** + * Gets the order of elements within a group for the second input. + * If no such order has been set, this method returns null. + * + * @return The group order for the second input. + */ + public Ordering getGroupOrderForInputTwo() { + return getGroupOrder(1); + } + + // -------------------------------------------------------------------------------------------- + public boolean isCombinableFirst() { + return this.combinableFirst; + } + + public void setCombinableFirst(boolean combinableFirst) { + this.combinableFirst = combinableFirst; + } + + public boolean isCombinableSecond() { + return this.combinableSecond; + } + + public void setCombinableSecond(boolean combinableSecond) { + this.combinableSecond = combinableSecond; + } + + // ------------------------------------------------------------------------ + @Override + protected List<OUT> executeOnCollections(List<IN1> input1, List<IN2> input2, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception { + // -------------------------------------------------------------------- + // Setup + // -------------------------------------------------------------------- + TypeInformation<IN1> inputType1 = getOperatorInfo().getFirstInputType(); + TypeInformation<IN2> inputType2 = getOperatorInfo().getSecondInputType(); + + int[] inputKeys1 = getKeyColumns(0); + int[] inputKeys2 = getKeyColumns(1); + + boolean[] inputSortDirections1 = new boolean[inputKeys1.length]; + boolean[] inputSortDirections2 = new boolean[inputKeys2.length]; + + Arrays.fill(inputSortDirections1, true); + Arrays.fill(inputSortDirections2, true); + + final TypeSerializer<IN1> inputSerializer1 = inputType1.createSerializer(executionConfig); + final TypeSerializer<IN2> inputSerializer2 = inputType2.createSerializer(executionConfig); + + final TypeComparator<IN1> inputComparator1 = getTypeComparator(executionConfig, inputType1, inputKeys1, inputSortDirections1); + final TypeComparator<IN2> inputComparator2 = getTypeComparator(executionConfig, inputType2, inputKeys2, inputSortDirections2); + + SimpleListIterable<IN1> iterator1 = new SimpleListIterable<IN1>(input1, inputComparator1, inputSerializer1); + SimpleListIterable<IN2> iterator2 = new SimpleListIterable<IN2>(input2, inputComparator2, inputSerializer2); + + // -------------------------------------------------------------------- + // Run UDF + // -------------------------------------------------------------------- + CoGroupFunction<IN1, IN2, OUT> 
function = userFunction.getUserCodeObject(); + + FunctionUtils.setFunctionRuntimeContext(function, ctx); + FunctionUtils.openFunction(function, parameters); + + List<OUT> result = new ArrayList<OUT>(); + Collector<OUT> resultCollector = new CopyingListCollector<OUT>(result, getOperatorInfo().getOutputType().createSerializer(executionConfig)); + + function.coGroup(iterator1, iterator2, resultCollector); + + FunctionUtils.closeFunction(function); + + return result; + } + + private <T> TypeComparator<T> getTypeComparator(ExecutionConfig executionConfig, TypeInformation<T> inputType, int[] inputKeys, boolean[] inputSortDirections) { + if (!(inputType instanceof CompositeType)) { + throw new InvalidProgramException("Input types of coGroup must be composite types."); + } + + return ((CompositeType<T>) inputType).createComparator(inputKeys, inputSortDirections, 0, executionConfig); + } + + public static class SimpleListIterable<IN> implements Iterable<IN> { + private List<IN> values; + private TypeSerializer<IN> serializer; + private boolean copy; + + public SimpleListIterable(List<IN> values, final TypeComparator<IN> comparator, TypeSerializer<IN> serializer) throws IOException { + this.values = values; + this.serializer = serializer; + + Collections.sort(values, new Comparator<IN>() { + @Override + public int compare(IN o1, IN o2) { + return comparator.compare(o1, o2); + } + }); + } + + @Override + public Iterator<IN> iterator() { + return new SimpleListIterator<IN>(values, serializer); + } + + protected class SimpleListIterator<IN> implements Iterator<IN> { + private final List<IN> values; + private final TypeSerializer<IN> serializer; + private int pos = 0; + + public SimpleListIterator(List<IN> values, TypeSerializer<IN> serializer) { + this.values = values; + this.serializer = serializer; + } + + @Override + public boolean hasNext() { + return pos < values.size(); + } + + @Override + public IN next() { + IN current = values.get(pos++); + return serializer.copy(current); + } + + @Override + public void remove() { //unused + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-dist/pom.xml ---------------------------------------------------------------------- diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index b826c45..8f20648 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -107,6 +107,11 @@ under the License. <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-language-binding-generic</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <!-- See main pom.xml for explanation of profiles --> http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java new file mode 100644 index 0000000..38326bd --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.operators; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.operators.BinaryOperatorInformation; +import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.base.CoGroupRawOperatorBase; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException; + +/** + * A {@link DataSet} that is the result of a CoGroup transformation. + * + * @param <I1> The type of the first input DataSet of the CoGroup transformation. + * @param <I2> The type of the second input DataSet of the CoGroup transformation. + * @param <OUT> The type of the result of the CoGroup transformation. + * + * @see DataSet + */ +public class CoGroupRawOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT, CoGroupRawOperator<I1, I2, OUT>> { + + private final CoGroupFunction<I1, I2, OUT> function; + + private final Keys<I1> keys1; + private final Keys<I2> keys2; + + private final String defaultName; + + public CoGroupRawOperator(DataSet<I1> input1, DataSet<I2> input2, + Keys<I1> keys1, Keys<I2> keys2, + CoGroupFunction<I1, I2, OUT> function, + TypeInformation<OUT> returnType, + String defaultName) { + super(input1, input2, returnType); + this.function = function; + this.defaultName = defaultName; + this.name = defaultName; + + if (keys1 == null || keys2 == null) { + throw new NullPointerException(); + } + + this.keys1 = keys1; + this.keys2 = keys2; + + extractSemanticAnnotationsFromUdf(function.getClass()); + } + + protected Keys<I1> getKeys1() { + return this.keys1; + } + + protected Keys<I2> getKeys2() { + return this.keys2; + } + + @Override + protected org.apache.flink.api.common.operators.base.CoGroupRawOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) { + String name = getName() != null ? 
getName() : "CoGroup at " + defaultName; + try { + keys1.areCompatible(keys2); + } catch (IncompatibleKeysException e) { + throw new InvalidProgramException("The types of the key fields do not match.", e); + } + + if (keys1 instanceof Keys.ExpressionKeys && keys2 instanceof Keys.ExpressionKeys) { + try { + keys1.areCompatible(keys2); + } catch (IncompatibleKeysException e) { + throw new InvalidProgramException("The types of the key fields do not match.", e); + } + + int[] logicalKeyPositions1 = keys1.computeLogicalKeyPositions(); + int[] logicalKeyPositions2 = keys2.computeLogicalKeyPositions(); + + CoGroupRawOperatorBase<I1, I2, OUT, CoGroupFunction<I1, I2, OUT>> po + = new CoGroupRawOperatorBase<I1, I2, OUT, CoGroupFunction<I1, I2, OUT>>( + function, new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), + getResultType()), logicalKeyPositions1, logicalKeyPositions2, name); + + // set inputs + po.setFirstInput(input1); + po.setSecondInput(input2); + + // set dop + po.setDegreeOfParallelism(this.getParallelism()); + + return po; + + } else { + throw new UnsupportedOperationException("Unrecognized or incompatible key types."); + } + } + + @Override + protected Function getFunction() { + return function; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple0.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple0.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple0.java new file mode 100644 index 0000000..82209bf --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple0.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.java.tuple; + +public class Tuple0 extends Tuple { + + private static final long serialVersionUID = 1L; + + public Tuple0() {} + + @Override + public <T> T getField(int pos) { + return null; + } + + @Override + public <T> void setField(T value, int pos) { + } + + @Override + public int getArity() { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java index 7880734..3a02735 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java @@ -180,6 +180,7 @@ public abstract class CostEstimator { // this operations does not do any actual grouping, since every element is in the same single group case CO_GROUP: + case CO_GROUP_RAW: case SORTED_GROUP_REDUCE: case SORTED_REDUCE: // grouping or co-grouping over sorted streams for free http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupRawNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupRawNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupRawNode.java new file mode 100644 index 0000000..971d244 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupRawNode.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.optimizer.dag; + +import java.util.Collections; +import java.util.List; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.base.CoGroupRawOperatorBase; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.operators.CoGroupRawDescriptor; +import org.apache.flink.optimizer.operators.OperatorDescriptorDual; + +/** + * The Optimizer representation of a <i>CoGroupRaw</i> operator. + */ +public class CoGroupRawNode extends TwoInputNode { + private List<OperatorDescriptorDual> dataProperties; + + public CoGroupRawNode(CoGroupRawOperatorBase<?, ?, ?, ?> pactContract) { + super(pactContract); + this.dataProperties = initializeDataProperties(); + } + + // -------------------------------------------------------------------------------------------- + /** + * Gets the operator for this CoGroup node. 
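+ * The operator stored by the superclass is cast to the CoGroupRaw-specific type.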
+ * + * @return The CoGroup operator. + */ + @Override + public CoGroupRawOperatorBase<?, ?, ?, ?> getOperator() { + return (CoGroupRawOperatorBase<?, ?, ?, ?>) super.getOperator(); + } + + @Override + public String getName() { + return "CoGroup"; + } + + @Override + protected List<OperatorDescriptorDual> getPossibleProperties() { + return this.dataProperties; + } + + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + // for CoGroup, we currently make no reasonable default estimates + } + + private List<OperatorDescriptorDual> initializeDataProperties() { + Ordering groupOrder1 = null; + Ordering groupOrder2 = null; + + CoGroupRawOperatorBase<?, ?, ?, ?> cgc = getOperator(); + groupOrder1 = cgc.getGroupOrderForInputOne(); + groupOrder2 = cgc.getGroupOrderForInputTwo(); + + if (groupOrder1 != null && groupOrder1.getNumberOfFields() == 0) { + groupOrder1 = null; + } + if (groupOrder2 != null && groupOrder2.getNumberOfFields() == 0) { + groupOrder2 = null; + } + + return Collections.<OperatorDescriptorDual>singletonList(new CoGroupRawDescriptor(this.keys1, this.keys2, groupOrder1, groupOrder2)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupRawDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupRawDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupRawDescriptor.java new file mode 100644 index 0000000..61561a4 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupRawDescriptor.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.optimizer.operators; + +import java.util.Collections; +import java.util.List; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.dag.TwoInputNode; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.PartitioningProperty; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.optimizer.util.Utils; +import org.apache.flink.runtime.operators.DriverStrategy; + +/** + * The optimizer descriptor for the CO_GROUP_RAW strategy: both inputs are hash-partitioned on their keys and sorted on the requested group order. + */ +public class CoGroupRawDescriptor extends OperatorDescriptorDual { + + private final Ordering ordering1; // ordering on the first input + private final Ordering ordering2; // ordering on the second input + + public CoGroupRawDescriptor(FieldList keys1, FieldList keys2) { + this(keys1, keys2, null, null); + } + + public CoGroupRawDescriptor(FieldList keys1, FieldList keys2, Ordering additionalOrdering1, Ordering additionalOrdering2) { + super(keys1, keys2); + + // if we have an additional ordering, construct the ordering to have primarily the grouping fields + if (additionalOrdering1 != null) { + this.ordering1 = new Ordering(); + for (Integer key : this.keys1) { + this.ordering1.appendOrdering(key, null, Order.ANY); + } + + // and next the additional order fields + for (int i = 0; i < additionalOrdering1.getNumberOfFields(); i++) { + Integer field = additionalOrdering1.getFieldNumber(i); + Order order = additionalOrdering1.getOrder(i); + this.ordering1.appendOrdering(field, additionalOrdering1.getType(i), order); + } + } else { + this.ordering1 = Utils.createOrdering(this.keys1); + } + + // if we have an additional ordering, construct the ordering to have primarily the grouping fields + if (additionalOrdering2 != null) { + this.ordering2 = new Ordering(); + for (Integer key : this.keys2) { + this.ordering2.appendOrdering(key, null, Order.ANY); + } + + // and next the additional order fields + for (int i = 0; i < additionalOrdering2.getNumberOfFields(); i++) { + Integer field = additionalOrdering2.getFieldNumber(i); + Order order = additionalOrdering2.getOrder(i); + this.ordering2.appendOrdering(field, additionalOrdering2.getType(i), order); + } + } else { + this.ordering2 = Utils.createOrdering(this.keys2); + } + } + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.CO_GROUP_RAW; + } + + @Override + protected List<OperatorDescriptorDual.GlobalPropertiesPair> createPossibleGlobalProperties() { + RequestedGlobalProperties partitioned1 = new RequestedGlobalProperties(); + partitioned1.setHashPartitioned(this.keys1); + RequestedGlobalProperties partitioned2 = new RequestedGlobalProperties(); + partitioned2.setHashPartitioned(this.keys2); + return Collections.singletonList(new OperatorDescriptorDual.GlobalPropertiesPair(partitioned1, partitioned2)); + } + + @Override + protected List<OperatorDescriptorDual.LocalPropertiesPair> createPossibleLocalProperties() { + RequestedLocalProperties sort1 = new RequestedLocalProperties(this.ordering1); + RequestedLocalProperties sort2 = new 
RequestedLocalProperties(this.ordering2); + return Collections.singletonList(new OperatorDescriptorDual.LocalPropertiesPair(sort1, sort2)); + } + + @Override + public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, + LocalProperties produced1, LocalProperties produced2) { + int numRelevantFields = this.keys1.size(); + + Ordering prod1 = produced1.getOrdering(); + Ordering prod2 = produced2.getOrdering(); + + if (prod1 == null || prod2 == null || prod1.getNumberOfFields() < numRelevantFields + || prod2.getNumberOfFields() < numRelevantFields) { + throw new CompilerException("The given properties do not meet this operator's requirements."); + } + + for (int i = 0; i < numRelevantFields; i++) { + if (prod1.getOrder(i) != prod2.getOrder(i)) { + return false; + } + } + return true; + } + + @Override + public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { + boolean[] inputOrders = in1.getLocalProperties().getOrdering() == null ? null : in1.getLocalProperties().getOrdering().getFieldSortDirections(); + + if (inputOrders == null || inputOrders.length < this.keys1.size()) { + throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a CoGroup operator."); + } else if (inputOrders.length > this.keys1.size()) { + boolean[] tmp = new boolean[this.keys1.size()]; + System.arraycopy(inputOrders, 0, tmp, 0, tmp.length); + inputOrders = tmp; + } + + return new DualInputPlanNode(node, "CoGroup (" + node.getOperator().getName() + ")", in1, in2, + DriverStrategy.CO_GROUP_RAW, this.keys1, this.keys2, inputOrders); + } + + @Override + public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) { + GlobalProperties gp = GlobalProperties.combine(in1, in2); + if (gp.getUniqueFieldCombination() != null && gp.getUniqueFieldCombination().size() > 0 + && gp.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) { + gp.setAnyPartitioning(gp.getUniqueFieldCombination().iterator().next().toFieldList()); + } + gp.clearUniqueFieldCombinations(); + return gp; + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { + LocalProperties comb = LocalProperties.combine(in1, in2); + return comb.clearUniqueFieldSets(); + } + + @Override + public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2, + GlobalProperties produced1, GlobalProperties produced2) + { + return produced1.getPartitioning() == produced2.getPartitioning() && + (produced1.getCustomPartitioner() == null ? 
+ produced2.getCustomPartitioner() == null : + produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner())); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java index 37cffce..7fbdf81 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java @@ -69,6 +69,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.flink.api.common.operators.base.CoGroupRawOperatorBase; +import org.apache.flink.optimizer.dag.CoGroupRawNode; /** * This traversal creates the optimizer DAG from a program. @@ -164,6 +166,9 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> { else if (c instanceof CoGroupOperatorBase) { n = new CoGroupNode((CoGroupOperatorBase<?, ?, ?, ?>) c); } + else if (c instanceof CoGroupRawOperatorBase) { + n = new CoGroupRawNode((CoGroupRawOperatorBase<?, ?, ?, ?>) c); + } else if (c instanceof CrossOperatorBase) { n = new CrossNode((CrossOperatorBase<?, ?, ?, ?>) c); } http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupRawDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupRawDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupRawDriver.java new file mode 100644 index 0000000..7abad5d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupRawDriver.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.operators; + +import java.io.IOException; +import java.util.Iterator; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CoGroupRawDriver<IT1, IT2, OT> implements PactDriver<CoGroupFunction<IT1, IT2, OT>, OT> { + + private static final Logger LOG = LoggerFactory.getLogger(CoGroupRawDriver.class); + + private PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext; + + private SimpleIterable<IT1> coGroupIterator1; + private SimpleIterable<IT2> coGroupIterator2; + + @Override + public void setup(PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) { + this.taskContext = context; + } + + @Override + public int getNumberOfInputs() { + return 2; + } + + @Override + public int getNumberOfDriverComparators() { + return 0; + } + + @Override + public Class<CoGroupFunction<IT1, IT2, OT>> getStubType() { + @SuppressWarnings("unchecked") + final Class<CoGroupFunction<IT1, IT2, OT>> clazz = (Class<CoGroupFunction<IT1, IT2, OT>>) (Class<?>) CoGroupFunction.class; + return clazz; + } + + @Override + public void prepare() throws Exception { + final TaskConfig config = this.taskContext.getTaskConfig(); + if (config.getDriverStrategy() != DriverStrategy.CO_GROUP_RAW) { + throw new Exception("Unrecognized driver strategy for CoGroup Python driver: " + config.getDriverStrategy().name()); + } + + final MutableObjectIterator<IT1> in1 = this.taskContext.getInput(0); + final MutableObjectIterator<IT2> in2 = this.taskContext.getInput(1); + + IT1 reuse1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer().createInstance(); + IT2 reuse2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer().createInstance(); + + this.coGroupIterator1 = new SimpleIterable<IT1>(reuse1, in1); + this.coGroupIterator2 = new SimpleIterable<IT2>(reuse2, in2); + + if (LOG.isDebugEnabled()) { + LOG.debug(this.taskContext.formatLogString("CoGroup task iterator ready.")); + } + } + + @Override + public void run() throws Exception { + final CoGroupFunction<IT1, IT2, OT> coGroupStub = this.taskContext.getStub(); + final Collector<OT> collector = this.taskContext.getOutputCollector(); + final SimpleIterable<IT1> i1 = this.coGroupIterator1; + final SimpleIterable<IT2> i2 = this.coGroupIterator2; + + coGroupStub.coGroup(i1, i2, collector); + } + + @Override + public void cleanup() throws Exception { + } + + @Override + public void cancel() throws Exception { + cleanup(); + } + + public static class SimpleIterable<IN> implements Iterable<IN> { + private IN reuse; + private final MutableObjectIterator<IN> iterator; + + public SimpleIterable(IN reuse, MutableObjectIterator<IN> iterator) throws IOException { + this.iterator = iterator; + this.reuse = reuse; + } + + @Override + public Iterator<IN> iterator() { + return new SimpleIterator<IN>(reuse, iterator); + } + + protected class SimpleIterator<IN> implements Iterator<IN> { + private IN reuse; + private final MutableObjectIterator<IN> iterator; + private boolean consumed = true; + + public SimpleIterator(IN reuse, MutableObjectIterator<IN> iterator) { + this.iterator = iterator; + this.reuse = reuse; + } + + @Override + public boolean hasNext() { + try { + if (!consumed) { + return true; + } + IN result = iterator.next(reuse); + consumed = result == null; + return !consumed; + } catch (IOException 
ioex) { + throw new RuntimeException("An error occurred while reading the next record: " + + ioex.getMessage(), ioex); + } + } + + @Override + public IN next() { + consumed = true; + return reuse; + } + + @Override + public void remove() { //unused + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java index d5b131e..7942b3b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java @@ -75,6 +75,9 @@ public enum DriverStrategy { // co-grouping inputs CO_GROUP(CoGroupDriver.class, null, PIPELINED, PIPELINED, 2), + // python-cogroup + CO_GROUP_RAW(CoGroupRawDriver.class, null, PIPELINED, PIPELINED, 0), + // the first input is build side, the second side is probe side of a hybrid hash table HYBRIDHASH_BUILD_FIRST(MatchDriver.class, null, FULL_DAM, MATERIALIZING, 2), http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml b/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml new file mode 100644 index 0000000..a37f82a --- /dev/null +++ b/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml @@ -0,0 +1,61 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flink-language-binding-parent</artifactId> + <groupId>org.apache.flink</groupId> + <version>0.9-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-language-binding-generic</artifactId> + <name>flink-language-binding-generic</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-compiler</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java new file mode 100644 index 0000000..bd059b3 --- /dev/null +++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.flink.languagebinding.api.java.common; + +import java.util.Arrays; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +/** + * Container for all generic information related to operations. This class contains the absolute minimum fields that are + * required for all operations. This class should be extended to contain any additional fields required on a + * per-language basis. 
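+ * <p> + * A language-specific binding would typically add its own payload fields, e.g. (hypothetical sketch, not part of this commit): {@code class MyLanguageOperationInfo extends OperationInfo { byte[] serializedFunction; }} where {@code serializedFunction} carries the UDF shipped by the external process.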
+ */ +public abstract class OperationInfo { + public int parentID; //DataSet that an operation is applied on + public int otherID; //secondary DataSet + public int setID; //ID for new DataSet + public int[] keys1; //join/cogroup keys + public int[] keys2; //join/cogroup keys + public TypeInformation<?> types; //typeinformation about output type + public ProjectionEntry[] projections; //projectFirst/projectSecond + + public class ProjectionEntry { + public ProjectionSide side; + public int[] keys; + + public ProjectionEntry(ProjectionSide side, int[] keys) { + this.side = side; + this.keys = keys; + } + + @Override + public String toString() { + return side + " - " + Arrays.toString(keys); + } + } + + public enum ProjectionSide { + FIRST, + SECOND + } + + public enum DatasizeHint { + NONE, + TINY, + HUGE + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java new file mode 100644 index 0000000..f701ab7 --- /dev/null +++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java @@ -0,0 +1,656 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.flink.languagebinding.api.java.common; + +import java.io.IOException; +import java.util.HashMap; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.aggregation.Aggregations; +import org.apache.flink.api.java.io.CsvInputFormat; +import org.apache.flink.api.java.operators.AggregateOperator; +import org.apache.flink.api.java.operators.CrossOperator.DefaultCross; +import org.apache.flink.api.java.operators.CrossOperator.ProjectCross; +import org.apache.flink.api.java.operators.Grouping; +import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin; +import org.apache.flink.api.java.operators.JoinOperator.ProjectJoin; +import org.apache.flink.api.java.operators.SortedGrouping; +import org.apache.flink.api.java.operators.UdfOperator; +import org.apache.flink.api.java.operators.UnsortedGrouping; +import org.apache.flink.api.java.tuple.Tuple; +import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem.WriteMode; +import org.apache.flink.core.fs.Path; +import org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint; +import static org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint.HUGE; +import static org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint.NONE; +import static org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint.TINY; +import org.apache.flink.languagebinding.api.java.common.OperationInfo.ProjectionEntry; +import org.apache.flink.languagebinding.api.java.common.streaming.Receiver; + +/** + * Generic class to construct a Flink plan based on external data. + * + * @param <INFO> The OperationInfo implementation used by the concrete language binding. + */ +public abstract class PlanBinder<INFO extends OperationInfo> { + public static final String PLANBINDER_CONFIG_BCVAR_COUNT = "PLANBINDER_BCVAR_COUNT"; + public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_"; + + protected static String FLINK_HDFS_PATH = "hdfs:/tmp"; + public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + "/flink_data"; + + public static boolean DEBUG = false; + + public static void setLocalMode() { + FLINK_HDFS_PATH = System.getProperty("java.io.tmpdir") + "/flink"; + } + + protected HashMap<Integer, Object> sets; + public static ExecutionEnvironment env; + protected Receiver receiver; + + public static final int MAPPED_FILE_SIZE = 1024 * 1024 * 64; + + //====Plan========================================================================================================== + protected void receivePlan() throws IOException { + receiveParameters(); + receiveOperations(); + } + + //====Environment=================================================================================================== + /** + * This enum contains the identifiers for all supported environment parameters. 
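+ * DOP sets the degree of parallelism, MODE switches the exchange path between local and HDFS, RETRY sets the number of execution retries, and DEBUG enables debug mode.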
+ */ + private enum Parameters { + DOP, + MODE, + RETRY, + DEBUG + } + + private void receiveParameters() throws IOException { + Integer parameterCount = (Integer) receiver.getRecord(true); + + for (int x = 0; x < parameterCount; x++) { + Tuple value = (Tuple) receiver.getRecord(true); + switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) { + case DOP: + Integer dop = (Integer) value.getField(1); + env.setDegreeOfParallelism(dop); + break; + case MODE: + FLINK_HDFS_PATH = (Boolean) value.getField(1) ? "file:/tmp/flink" : "hdfs:/tmp/flink"; + break; + case RETRY: + int retry = (Integer) value.getField(1); + env.setNumberOfExecutionRetries(retry); + break; + case DEBUG: + DEBUG = (Boolean) value.getField(1); + break; + } + } + if (env.getDegreeOfParallelism() < 0) { + env.setDegreeOfParallelism(1); + } + } + + //====Operations==================================================================================================== + /** + * This enum contains the identifiers for all supported non-UDF DataSet operations. + */ + private enum Operation { + SOURCE_CSV, SOURCE_TEXT, SOURCE_VALUE, SOURCE_SEQ, SINK_CSV, SINK_TEXT, SINK_PRINT, + PROJECTION, SORT, UNION, FIRST, DISTINCT, GROUPBY, AGGREGATE, + REBALANCE, PARTITION_HASH, + BROADCAST + } + + /** + * This enum contains the identifiers for all supported UDF DataSet operations. + */ + protected enum AbstractOperation { + COGROUP, CROSS, CROSS_H, CROSS_T, FILTER, FLATMAP, GROUPREDUCE, JOIN, JOIN_H, JOIN_T, MAP, REDUCE, MAPPARTITION, + } + + protected void receiveOperations() throws IOException { + Integer operationCount = (Integer) receiver.getRecord(true); + for (int x = 0; x < operationCount; x++) { + String identifier = (String) receiver.getRecord(); + Operation op = null; + AbstractOperation aop = null; + try { + op = Operation.valueOf(identifier.toUpperCase()); + } catch (IllegalArgumentException iae) { + try { + aop = AbstractOperation.valueOf(identifier.toUpperCase()); + } catch (IllegalArgumentException iae2) { + throw new IllegalArgumentException("Invalid operation specified: " + identifier); + } + } + if (op != null) { + switch (op) { + case SOURCE_CSV: + createCsvSource(); + break; + case SOURCE_TEXT: + createTextSource(); + break; + case SOURCE_VALUE: + createValueSource(); + break; + case SOURCE_SEQ: + createSequenceSource(); + break; + case SINK_CSV: + createCsvSink(); + break; + case SINK_TEXT: + createTextSink(); + break; + case SINK_PRINT: + createPrintSink(); + break; + case BROADCAST: + createBroadcastVariable(); + break; + case AGGREGATE: + createAggregationOperation(); + break; + case DISTINCT: + createDistinctOperation(); + break; + case FIRST: + createFirstOperation(); + break; + case PARTITION_HASH: + createHashPartitionOperation(); + break; + case PROJECTION: + createProjectOperation(); + break; + case REBALANCE: + createRebalanceOperation(); + break; + case GROUPBY: + createGroupOperation(); + break; + case SORT: + createSortOperation(); + break; + case UNION: + createUnionOperation(); + break; + } + } + if (aop != null) { + switch (aop) { + case COGROUP: + createCoGroupOperation(createOperationInfo(aop)); + break; + case CROSS: + createCrossOperation(NONE, createOperationInfo(aop)); + break; + case CROSS_H: + createCrossOperation(HUGE, createOperationInfo(aop)); + break; + case CROSS_T: + createCrossOperation(TINY, createOperationInfo(aop)); + break; + case FILTER: + createFilterOperation(createOperationInfo(aop)); + break; + case FLATMAP: + createFlatMapOperation(createOperationInfo(aop)); + break; 
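+ // each UDF case first reads its operation info from the external process, then defers to the language-specific subclass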
+ case GROUPREDUCE: + createGroupReduceOperation(createOperationInfo(aop)); + break; + case JOIN: + createJoinOperation(NONE, createOperationInfo(aop)); + break; + case JOIN_H: + createJoinOperation(HUGE, createOperationInfo(aop)); + break; + case JOIN_T: + createJoinOperation(TINY, createOperationInfo(aop)); + break; + case MAP: + createMapOperation(createOperationInfo(aop)); + break; + case MAPPARTITION: + createMapPartitionOperation(createOperationInfo(aop)); + break; + case REDUCE: + createReduceOperation(createOperationInfo(aop)); + break; + } + } + } + } + + private void createCsvSource() throws IOException { + int id = (Integer) receiver.getRecord(true); + String path = (String) receiver.getRecord(); + String fieldDelimiter = (String) receiver.getRecord(); + String lineDelimiter = (String) receiver.getRecord(); + Tuple types = (Tuple) receiver.getRecord(); + sets.put(id, env.createInput(new CsvInputFormat(new Path(path), lineDelimiter, fieldDelimiter, getForObject(types)), getForObject(types)).name("CsvSource")); + } + + private void createTextSource() throws IOException { + int id = (Integer) receiver.getRecord(true); + String path = (String) receiver.getRecord(); + sets.put(id, env.readTextFile(path).name("TextSource")); + } + + private void createValueSource() throws IOException { + int id = (Integer) receiver.getRecord(true); + int valueCount = (Integer) receiver.getRecord(true); + Object[] values = new Object[valueCount]; + for (int x = 0; x < valueCount; x++) { + values[x] = receiver.getRecord(); + } + sets.put(id, env.fromElements(values).name("ValueSource")); + } + + private void createSequenceSource() throws IOException { + int id = (Integer) receiver.getRecord(true); + long from = (Long) receiver.getRecord(); + long to = (Long) receiver.getRecord(); + sets.put(id, env.generateSequence(from, to).name("SequenceSource")); + } + + private void createCsvSink() throws IOException { + int parentID = (Integer) receiver.getRecord(true); + String path = (String) receiver.getRecord(); + String fieldDelimiter = (String) receiver.getRecord(); + String lineDelimiter = (String) receiver.getRecord(); + WriteMode writeMode = ((Integer) receiver.getRecord(true)) == 1 + ? WriteMode.OVERWRITE + : WriteMode.NO_OVERWRITE; + DataSet parent = (DataSet) sets.get(parentID); + parent.writeAsCsv(path, lineDelimiter, fieldDelimiter, writeMode).name("CsvSink"); + } + + private void createTextSink() throws IOException { + int parentID = (Integer) receiver.getRecord(true); + String path = (String) receiver.getRecord(); + WriteMode writeMode = ((Integer) receiver.getRecord(true)) == 1 + ? WriteMode.OVERWRITE + : WriteMode.NO_OVERWRITE; + DataSet parent = (DataSet) sets.get(parentID); + parent.writeAsText(path, writeMode).name("TextSink"); + } + + private void createPrintSink() throws IOException { + int parentID = (Integer) receiver.getRecord(true); + DataSet parent = (DataSet) sets.get(parentID); + boolean toError = (Boolean) receiver.getRecord(); + (toError ? 
parent.printToErr() : parent.print()).name("PrintSink"); + } + + private void createBroadcastVariable() throws IOException { + int parentID = (Integer) receiver.getRecord(true); + int otherID = (Integer) receiver.getRecord(true); + String name = (String) receiver.getRecord(); + UdfOperator op1 = (UdfOperator) sets.get(parentID); + DataSet op2 = (DataSet) sets.get(otherID); + + op1.withBroadcastSet(op2, name); + Configuration c = ((UdfOperator) op1).getParameters(); + + if (c == null) { + c = new Configuration(); + } + + int count = c.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0); + c.setInteger(PLANBINDER_CONFIG_BCVAR_COUNT, count + 1); + c.setString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + count, name); + + op1.withParameters(c); + } + + /** + * This method creates an OperationInfo object based on the operation-identifier passed. + * + * @param operationIdentifier + * @return + * @throws IOException + */ + protected abstract INFO createOperationInfo(AbstractOperation operationIdentifier) throws IOException; + + private void createAggregationOperation() throws IOException { + int setID = (Integer) receiver.getRecord(true); + int parentID = (Integer) receiver.getRecord(true); + int count = (Integer) receiver.getRecord(true); + + int encodedAgg = (Integer) receiver.getRecord(true); + int field = (Integer) receiver.getRecord(true); + + Aggregations agg = null; + switch (encodedAgg) { + case 0: + agg = Aggregations.MAX; + break; + case 1: + agg = Aggregations.MIN; + break; + case 2: + agg = Aggregations.SUM; + break; + } + DataSet op = (DataSet) sets.get(parentID); + AggregateOperator ao = op.aggregate(agg, field); + + for (int x = 1; x < count; x++) { + encodedAgg = (Integer) receiver.getRecord(true); + field = (Integer) receiver.getRecord(true); + switch (encodedAgg) { + case 0: + ao = ao.andMax(field); + break; + case 1: + ao = ao.andMin(field); + break; + case 2: + ao = ao.andSum(field); + break; + } + } + + sets.put(setID, ao.name("Aggregation")); + } + + private void createCoGroupOperation(INFO info) { + DataSet op1 = (DataSet) sets.get(info.parentID); + DataSet op2 = (DataSet) sets.get(info.otherID); + sets.put(info.setID, applyCoGroupOperation(op1, op2, info.keys1, info.keys2, info)); + } + + protected abstract DataSet applyCoGroupOperation(DataSet op1, DataSet op2, int[] firstKeys, int[] secondKeys, INFO info); + + private void createCrossOperation(DatasizeHint mode, INFO info) { + DataSet op1 = (DataSet) sets.get(info.parentID); + DataSet op2 = (DataSet) sets.get(info.otherID); + + if (info.types != null && (info.projections == null || info.projections.length == 0)) { + sets.put(info.setID, applyCrossOperation(op1, op2, mode, info)); + } else { + DefaultCross defaultResult; + switch (mode) { + case NONE: + defaultResult = op1.cross(op2); + break; + case HUGE: + defaultResult = op1.crossWithHuge(op2); + break; + case TINY: + defaultResult = op1.crossWithTiny(op2); + break; + default: + throw new IllegalArgumentException("Invalid Cross mode specified: " + mode); + } + if (info.projections.length == 0) { + sets.put(info.setID, defaultResult.name("DefaultCross")); + } else { + ProjectCross project = null; + for (ProjectionEntry pe : info.projections) { + switch (pe.side) { + case FIRST: + project = project == null ? defaultResult.projectFirst(pe.keys) : project.projectFirst(pe.keys); + break; + case SECOND: + project = project == null ? 
defaultResult.projectSecond(pe.keys) : project.projectSecond(pe.keys); + break; + } + } + sets.put(info.setID, project.name("ProjectCross")); + } + } + } + + protected abstract DataSet applyCrossOperation(DataSet op1, DataSet op2, DatasizeHint mode, INFO info); + + private void createDistinctOperation() throws IOException { + int setID = (Integer) receiver.getRecord(true); + int parentID = (Integer) receiver.getRecord(true); + Object keysArrayOrTuple = receiver.getRecord(true); + int[] keys; + if (keysArrayOrTuple instanceof Tuple) { + keys = tupleToIntArray((Tuple) keysArrayOrTuple); + } else { + keys = (int[]) keysArrayOrTuple; + } + DataSet op = (DataSet) sets.get(parentID); + sets.put(setID, (keys.length == 0 ? op.distinct() : op.distinct(keys)).name("Distinct")); + } + + private void createFilterOperation(INFO info) { + DataSet op1 = (DataSet) sets.get(info.parentID); + sets.put(info.setID, applyFilterOperation(op1, info)); + } + + protected abstract DataSet applyFilterOperation(DataSet op1, INFO info); + + private void createFlatMapOperation(INFO info) { + DataSet op1 = (DataSet) sets.get(info.parentID); + sets.put(info.setID, applyFlatMapOperation(op1, info)); + } + + protected abstract DataSet applyFlatMapOperation(DataSet op1, INFO info); + + private void createFirstOperation() throws IOException { + int setID = (Integer) receiver.getRecord(true); + int parentID = (Integer) receiver.getRecord(true); + int count = (Integer) receiver.getRecord(true); + DataSet op = (DataSet) sets.get(parentID); + sets.put(setID, op.first(count).name("First")); + } + + private void createGroupOperation() throws IOException { + int setID = (Integer) receiver.getRecord(true); + int parentID = (Integer) receiver.getRecord(true); + Object keysArrayOrTuple = receiver.getRecord(true); + int[] keys; + if (keysArrayOrTuple instanceof Tuple) { + keys = tupleToIntArray((Tuple) keysArrayOrTuple); + } else { + keys = (int[]) keysArrayOrTuple; + } + DataSet op1 = (DataSet) sets.get(parentID); + sets.put(setID, op1.groupBy(keys)); + } + + private void createGroupReduceOperation(INFO info) { + Object op1 = sets.get(info.parentID); + if (op1 instanceof DataSet) { + sets.put(info.setID, applyGroupReduceOperation((DataSet) op1, info)); + return; + } + if (op1 instanceof UnsortedGrouping) { + sets.put(info.setID, applyGroupReduceOperation((UnsortedGrouping) op1, info)); + return; + } + if (op1 instanceof SortedGrouping) { + sets.put(info.setID, applyGroupReduceOperation((SortedGrouping) op1, info)); + } + } + + protected abstract DataSet applyGroupReduceOperation(DataSet op1, INFO info); + + protected abstract DataSet applyGroupReduceOperation(UnsortedGrouping op1, INFO info); + + protected abstract DataSet applyGroupReduceOperation(SortedGrouping op1, INFO info); + + private void createHashPartitionOperation() throws IOException { + int setID = (Integer) receiver.getRecord(true); + int parentID = (Integer) receiver.getRecord(true); + Object keysArrayOrTuple = receiver.getRecord(true); + int[] keys; + if (keysArrayOrTuple instanceof Tuple) { + keys = tupleToIntArray((Tuple) keysArrayOrTuple); + } else { + keys = (int[]) keysArrayOrTuple; + } + DataSet op1 = (DataSet) sets.get(parentID); + sets.put(setID, op1.partitionByHash(keys)); + + } + + private void createJoinOperation(DatasizeHint mode, INFO info) { + DataSet op1 = (DataSet) sets.get(info.parentID); + DataSet op2 = (DataSet) sets.get(info.otherID); + + if (info.types != null && (info.projections == null || info.projections.length == 0)) { + sets.put(info.setID, 
applyJoinOperation(op1, op2, info.keys1, info.keys2, mode, info)); + } else { + DefaultJoin defaultResult; + switch (mode) { + case NONE: + defaultResult = op1.join(op2).where(info.keys1).equalTo(info.keys2); + break; + case HUGE: + defaultResult = op1.joinWithHuge(op2).where(info.keys1).equalTo(info.keys2); + break; + case TINY: + defaultResult = op1.joinWithTiny(op2).where(info.keys1).equalTo(info.keys2); + break; + default: + throw new IllegalArgumentException("Invalid join mode specified: " + mode); + } + if (info.projections == null || info.projections.length == 0) { + sets.put(info.setID, defaultResult.name("DefaultJoin")); + } else { + ProjectJoin project = null; + for (ProjectionEntry pe : info.projections) { + switch (pe.side) { + case FIRST: + project = project == null ? defaultResult.projectFirst(pe.keys) : project.projectFirst(pe.keys); + break; + case SECOND: + project = project == null ? defaultResult.projectSecond(pe.keys) : project.projectSecond(pe.keys); + break; + } + } + sets.put(info.setID, project.name("ProjectJoin")); + } + } + } + + protected abstract DataSet applyJoinOperation(DataSet op1, DataSet op2, int[] firstKeys, int[] secondKeys, DatasizeHint mode, INFO info); + + private void createMapOperation(INFO info) { + DataSet op1 = (DataSet) sets.get(info.parentID); + sets.put(info.setID, applyMapOperation(op1, info)); + } + + protected abstract DataSet applyMapOperation(DataSet op1, INFO info); + + private void createMapPartitionOperation(INFO info) { + DataSet op1 = (DataSet) sets.get(info.parentID); + sets.put(info.setID, applyMapPartitionOperation(op1, info)); + } + + protected abstract DataSet applyMapPartitionOperation(DataSet op1, INFO info); + + protected void createProjectOperation() throws IOException { + int setID = (Integer) receiver.getRecord(true); + int parentID = (Integer) receiver.getRecord(true); + Object keysArrayOrTuple = receiver.getRecord(true); + int[] keys; + if (keysArrayOrTuple instanceof Tuple) { + keys = tupleToIntArray((Tuple) keysArrayOrTuple); + } else { + keys = (int[]) keysArrayOrTuple; + } + DataSet op1 = (DataSet) sets.get(parentID); + sets.put(setID, op1.project(keys).name("Projection")); + } + + private void createRebalanceOperation() throws IOException { + int setID = (Integer) receiver.getRecord(true); + int parentID = (Integer) receiver.getRecord(true); + DataSet op = (DataSet) sets.get(parentID); + sets.put(setID, op.rebalance().name("Rebalance")); + } + + private void createReduceOperation(INFO info) { + Object op1 = sets.get(info.parentID); + if (op1 instanceof DataSet) { + sets.put(info.setID, applyReduceOperation((DataSet) op1, info)); + return; + } + if (op1 instanceof UnsortedGrouping) { + sets.put(info.setID, applyReduceOperation((UnsortedGrouping) op1, info)); + } + } + + protected abstract DataSet applyReduceOperation(DataSet op1, INFO info); + + protected abstract DataSet applyReduceOperation(UnsortedGrouping op1, INFO info); + + protected void createSortOperation() throws IOException { + int setID = (Integer) receiver.getRecord(true); + int parentID = (Integer) receiver.getRecord(true); + int field = (Integer) receiver.getRecord(true); + int encodedOrder = (Integer) receiver.getRecord(true); + Order order; + switch (encodedOrder) { + case 0: + order = Order.NONE; + break; + case 1: + order = Order.ASCENDING; + break; + case 2: + order = Order.DESCENDING; + break; + case 3: + order = Order.ANY; + break; + default: + order = Order.NONE; + break; + } + Grouping op1 = (Grouping) sets.get(parentID); + if (op1 instanceof UnsortedGrouping) { + 
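// sortGroup() is defined on the concrete grouping types rather than on Grouping itself, hence the instanceof dispatch + 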
sets.put(setID, ((UnsortedGrouping) op1).sortGroup(field, order)); + return; + } + if (op1 instanceof SortedGrouping) { + sets.put(setID, ((SortedGrouping) op1).sortGroup(field, order)); + } + } + + protected void createUnionOperation() throws IOException { + int setID = (Integer) receiver.getRecord(true); + int parentID = (Integer) receiver.getRecord(true); + int otherID = (Integer) receiver.getRecord(true); + DataSet op1 = (DataSet) sets.get(parentID); + DataSet op2 = (DataSet) sets.get(otherID); + sets.put(setID, op1.union(op2).name("Union")); + } + + //====Utility======================================================================================================= + protected int[] tupleToIntArray(Tuple tuple) { + int[] keys = new int[tuple.getArity()]; + for (int y = 0; y < tuple.getArity(); y++) { + keys[y] = (Integer) tuple.getField(y); + } + return keys; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java new file mode 100644 index 0000000..2741714 --- /dev/null +++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java @@ -0,0 +1,410 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.flink.languagebinding.api.java.common.streaming; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.io.Serializable; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import org.apache.flink.api.common.functions.AbstractRichFunction; +//CHECKSTYLE.OFF: AvoidStarImport - tuple imports +import org.apache.flink.api.java.tuple.*; +import static org.apache.flink.languagebinding.api.java.common.streaming.Sender.*; +//CHECKSTYLE.ON: AvoidStarImport +import static org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_TMP_DATA_DIR; +import static org.apache.flink.languagebinding.api.java.common.PlanBinder.MAPPED_FILE_SIZE; +import org.apache.flink.util.Collector; + +/** + * General-purpose class to read data from memory-mapped files. 
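+ * Data is exchanged with the external process through a file of MAPPED_FILE_SIZE bytes that is created in + * FLINK_TMP_DATA_DIR and accessed as a MappedByteBuffer.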
+ */ +public class Receiver implements Serializable { + private final AbstractRichFunction function; + + private File inputFile; + private RandomAccessFile inputRAF; + private FileChannel inputChannel; + private MappedByteBuffer fileBuffer; + + private Deserializer deserializer = null; + + public Receiver(AbstractRichFunction function) { + this.function = function; + } + + //=====Setup======================================================================================================== + public void open(String path) throws IOException { + setupMappedFile(path); + } + + private void setupMappedFile(String path) throws FileNotFoundException, IOException { + String inputFilePath = function == null + ? FLINK_TMP_DATA_DIR + "/" + "output" + : path; + + File x = new File(FLINK_TMP_DATA_DIR); + x.mkdirs(); + + inputFile = new File(inputFilePath); + if (inputFile.exists()) { + inputFile.delete(); + } + inputFile.createNewFile(); + inputRAF = new RandomAccessFile(inputFilePath, "rw"); + inputRAF.setLength(MAPPED_FILE_SIZE); + inputRAF.seek(MAPPED_FILE_SIZE - 1); + inputRAF.writeByte(0); + inputRAF.seek(0); + inputChannel = inputRAF.getChannel(); + fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE); + } + + public void close() throws IOException { + closeMappedFile(); + } + + private void closeMappedFile() throws IOException { + inputChannel.close(); + inputRAF.close(); + } + + //=====Record-API=================================================================================================== + /** + * Loads a buffer from the memory-mapped file. The records contained within the buffer can be accessed using + * getRecord(). These records do not necessarily have to be of the same type. This method requires external + * synchronization. + * + * @throws IOException + */ + private void loadBuffer() throws IOException { + int count = 0; + while (fileBuffer.get(0) == 0 && count < 10) { + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + // ignore the interruption and check the buffer again + } + fileBuffer.load(); + count++; + } + if (fileBuffer.get(0) == 0) { + throw new RuntimeException("External process not responding."); + } + fileBuffer.position(1); + } + + /** + * Returns a record from the buffer. Note: This method cannot be replaced with specific methods like readInt() or + * similar. The PlanBinder requires a method that can return any kind of object. + * + * @return read record + * @throws IOException + */ + public Object getRecord() throws IOException { + return getRecord(false); + } + + /** + * Returns a record from the buffer. Note: This method cannot be replaced with specific methods like readInt() or + * similar. The PlanBinder requires a method that can return any kind of object. + * + * @param normalized flag indicating whether certain types should be normalized + * @return read record + * @throws IOException + */ + public Object getRecord(boolean normalized) throws IOException { + if (fileBuffer.position() == 0) { + loadBuffer(); + } + return receiveField(normalized); + } + + /** + * Reads a single primitive value or tuple from the buffer. 
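+ * Each value is preceded by a one-byte type tag (TYPE_*); tuples are encoded recursively as their arity followed + * by their fields.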
+ * + * @return primitive value or tuple + * @throws IOException + */ + private Object receiveField(boolean normalized) throws IOException { + byte type = fileBuffer.get(); + switch (type) { + case TYPE_TUPLE: + int tupleSize = fileBuffer.get(); + Tuple tuple = createTuple(tupleSize); + for (int x = 0; x < tupleSize; x++) { + tuple.setField(receiveField(normalized), x); + } + return tuple; + case TYPE_BOOLEAN: + return fileBuffer.get() == 1; + case TYPE_BYTE: + return fileBuffer.get(); + case TYPE_SHORT: + if (normalized) { + return (int) fileBuffer.getShort(); + } else { + return fileBuffer.getShort(); + } + case TYPE_INTEGER: + return fileBuffer.getInt(); + case TYPE_LONG: + if (normalized) { + return (int) fileBuffer.getLong(); + } else { + return fileBuffer.getLong(); + } + case TYPE_FLOAT: + if (normalized) { + return (double) fileBuffer.getFloat(); + } else { + return fileBuffer.getFloat(); + } + case TYPE_DOUBLE: + return fileBuffer.getDouble(); + case TYPE_STRING: + int stringSize = fileBuffer.getInt(); + byte[] buffer = new byte[stringSize]; + fileBuffer.get(buffer); + return new String(buffer); + case TYPE_BYTES: + int bytesSize = fileBuffer.getInt(); + byte[] bytesBuffer = new byte[bytesSize]; + fileBuffer.get(bytesBuffer); + return bytesBuffer; + case TYPE_NULL: + return null; + default: + throw new IllegalArgumentException("Unknown TypeID encountered: " + type); + } + } + + //=====Buffered-API================================================================================================= + /** + * Reads a buffer of the given size from the memory-mapped file, and collects all records contained. This method + * assumes that all values in the buffer are of the same type. This method does NOT take care of synchronization. + * The user must guarantee that the buffer was completely written before calling this method. 
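+ * The first byte of the buffer identifies the type of all records it contains; the matching deserializer is + * created on the first call and reused for subsequent buffers.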
+ * + * @param c Collector to collect records + * @param bufferSize size of the buffer + * @throws IOException + */ + public void collectBuffer(Collector c, int bufferSize) throws IOException { + fileBuffer.position(0); + + if (deserializer == null) { + byte type = fileBuffer.get(); + deserializer = getDeserializer(type); + } + while (fileBuffer.position() < bufferSize) { + c.collect(deserializer.deserialize()); + } + } + + //=====Deserializer================================================================================================= + private Deserializer getDeserializer(byte type) { + switch (type) { + case TYPE_TUPLE: + return new TupleDeserializer(); + case TYPE_BOOLEAN: + return new BooleanDeserializer(); + case TYPE_BYTE: + return new ByteDeserializer(); + case TYPE_BYTES: + return new BytesDeserializer(); + case TYPE_SHORT: + return new ShortDeserializer(); + case TYPE_INTEGER: + return new IntDeserializer(); + case TYPE_LONG: + return new LongDeserializer(); + case TYPE_STRING: + return new StringDeserializer(); + case TYPE_FLOAT: + return new FloatDeserializer(); + case TYPE_DOUBLE: + return new DoubleDeserializer(); + case TYPE_NULL: + return new NullDeserializer(); + default: + throw new IllegalArgumentException("Unknown TypeID encountered: " + type); + + } + } + + private interface Deserializer<T> { + public T deserialize(); + + } + + private class BooleanDeserializer implements Deserializer<Boolean> { + @Override + public Boolean deserialize() { + return fileBuffer.get() == 1; + } + } + + private class ByteDeserializer implements Deserializer<Byte> { + @Override + public Byte deserialize() { + return fileBuffer.get(); + } + } + + private class ShortDeserializer implements Deserializer<Short> { + @Override + public Short deserialize() { + return fileBuffer.getShort(); + } + } + + private class IntDeserializer implements Deserializer<Integer> { + @Override + public Integer deserialize() { + return fileBuffer.getInt(); + } + } + + private class LongDeserializer implements Deserializer<Long> { + @Override + public Long deserialize() { + return fileBuffer.getLong(); + } + } + + private class FloatDeserializer implements Deserializer<Float> { + @Override + public Float deserialize() { + return fileBuffer.getFloat(); + } + } + + private class DoubleDeserializer implements Deserializer<Double> { + @Override + public Double deserialize() { + return fileBuffer.getDouble(); + } + } + + private class StringDeserializer implements Deserializer<String> { + private int size; + + @Override + public String deserialize() { + size = fileBuffer.getInt(); + byte[] buffer = new byte[size]; + fileBuffer.get(buffer); + return new String(buffer); + } + } + + private class NullDeserializer implements Deserializer<Object> { + @Override + public Object deserialize() { + return null; + } + } + + private class BytesDeserializer implements Deserializer<byte[]> { + @Override + public byte[] deserialize() { + int length = fileBuffer.getInt(); + byte[] result = new byte[length]; + fileBuffer.get(result); + return result; + } + + } + + private class TupleDeserializer implements Deserializer<Tuple> { + Deserializer[] deserializer = null; + Tuple reuse; + + public TupleDeserializer() { + int size = fileBuffer.getInt(); + reuse = createTuple(size); + deserializer = new Deserializer[size]; + for (int x = 0; x < deserializer.length; x++) { + deserializer[x] = getDeserializer(fileBuffer.get()); + } + } + + @Override + public Tuple deserialize() { + for (int x = 0; x < deserializer.length; x++) { + 
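// decode each field with the deserializer created for its position and write it into the reused tuple + 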
reuse.setField(deserializer[x].deserialize(), x); + } + return reuse; + } + } + + public static Tuple createTuple(int size) { + switch (size) { + case 0: + return new Tuple0(); + case 1: + return new Tuple1(); + case 2: + return new Tuple2(); + case 3: + return new Tuple3(); + case 4: + return new Tuple4(); + case 5: + return new Tuple5(); + case 6: + return new Tuple6(); + case 7: + return new Tuple7(); + case 8: + return new Tuple8(); + case 9: + return new Tuple9(); + case 10: + return new Tuple10(); + case 11: + return new Tuple11(); + case 12: + return new Tuple12(); + case 13: + return new Tuple13(); + case 14: + return new Tuple14(); + case 15: + return new Tuple15(); + case 16: + return new Tuple16(); + case 17: + return new Tuple17(); + case 18: + return new Tuple18(); + case 19: + return new Tuple19(); + case 20: + return new Tuple20(); + case 21: + return new Tuple21(); + case 22: + return new Tuple22(); + case 23: + return new Tuple23(); + case 24: + return new Tuple24(); + case 25: + return new Tuple25(); + default: + throw new IllegalArgumentException("Tuple size not supported: " + size); + } + } +}
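For reference, the wire protocol implemented above can be exercised through the public pieces of Receiver. The following minimal sketch shows how receiveField() assembles a tuple: the arity arrives first, an empty TupleN is obtained from createTuple(), and the fields are set by position. The Demo class and its sample values are hypothetical and for illustration only; only createTuple(int) and Tuple.setField(value, pos) come from the code above.

    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.languagebinding.api.java.common.streaming.Receiver;

    public class Demo {
        public static void main(String[] args) {
            // A TYPE_TUPLE record carries its arity first; createTuple(3) yields an empty Tuple3.
            Tuple t = Receiver.createTuple(3);
            // Fields are filled by position, mirroring the loop in receiveField().
            t.setField(42, 0);       // e.g. a TYPE_INTEGER field
            t.setField("flink", 1);  // e.g. a TYPE_STRING field
            t.setField(3.14, 2);     // e.g. a TYPE_DOUBLE field
            System.out.println(t);   // prints (42,flink,3.14)
        }
    }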