http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java new file mode 100644 index 0000000..89e4642 --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java @@ -0,0 +1,578 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tez.runtime; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.distributions.DataDistribution; +import org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.functions.util.RuntimeUDFContext; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeComparatorFactory; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.Driver; +import org.apache.flink.runtime.operators.TaskContext; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger; +import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; +import org.apache.flink.runtime.operators.util.CloseableInputProvider; +import org.apache.flink.runtime.operators.util.LocalStrategy; +import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.tez.runtime.input.TezReaderIterator; +import org.apache.flink.tez.runtime.output.TezChannelSelector; +import org.apache.flink.tez.runtime.output.TezOutputEmitter; +import org.apache.flink.tez.runtime.output.TezOutputCollector; +import org.apache.flink.tez.util.DummyInvokable; +import org.apache.flink.util.Collector; 
+import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.MutableObjectIterator; + +import org.apache.tez.runtime.library.api.KeyValueReader; +import org.apache.tez.runtime.library.api.KeyValueWriter; + +import java.io.IOException; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + + +public class TezTask<S extends Function,OT> implements TaskContext<S, OT> { + + protected static final Log LOG = LogFactory.getLog(TezTask.class); + + DummyInvokable invokable = new DummyInvokable(); + + /** + * The driver that invokes the user code (the stub implementation). The central driver in this task + * (further drivers may be chained behind this driver). + */ + protected volatile Driver<S, OT> driver; + + /** + * The instantiated user code of this task's main operator (driver). May be null if the operator has no udf. + */ + protected S stub; + + /** + * The udf's runtime context. + */ + protected RuntimeUDFContext runtimeUdfContext; + + /** + * The collector that forwards the user code's results. May forward to a channel or to chained drivers within + * this task. + */ + protected Collector<OT> output; + + /** + * The inputs reader, wrapped in an iterator. Prior to the local strategies, etc... + */ + protected MutableObjectIterator<?>[] inputIterators; + + /** + * The local strategies that are applied on the inputs. + */ + protected volatile CloseableInputProvider<?>[] localStrategies; + + /** + * The inputs to the operator. Return the readers' data after the application of the local strategy + * and the temp-table barrier. + */ + protected MutableObjectIterator<?>[] inputs; + + /** + * The serializers for the input data type. + */ + protected TypeSerializerFactory<?>[] inputSerializers; + + /** + * The comparators for the central driver. + */ + protected TypeComparator<?>[] inputComparators; + + /** + * The task configuration with the setup parameters. 
+ */ + protected TezTaskConfig config; + + /** + * The class loader used to instantiate user code and user data types. + */ + protected ClassLoader userCodeClassLoader = ClassLoader.getSystemClassLoader(); + + /** + * For now, create a default ExecutionConfig + */ + protected ExecutionConfig executionConfig; + + /* + * Tez-specific variables given by the Processor + */ + protected TypeSerializer<OT> outSerializer; + + protected List<Integer> numberOfSubTasksInOutputs; + + protected String taskName; + + protected int numberOfSubtasks; + + protected int indexInSubtaskGroup; + + TezRuntimeEnvironment runtimeEnvironment; + + public TezTask(TezTaskConfig config, RuntimeUDFContext runtimeUdfContext, long availableMemory) { + this.config = config; + final Class<? extends Driver<S, OT>> driverClass = this.config.getDriver(); + this.driver = InstantiationUtil.instantiate(driverClass, Driver.class); + + LOG.info("ClassLoader URLs: " + Arrays.toString(((URLClassLoader) this.userCodeClassLoader).getURLs())); + + this.stub = this.config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(Function.class, this.userCodeClassLoader); //TODO get superclass properly + this.runtimeUdfContext = runtimeUdfContext; + this.outSerializer = (TypeSerializer<OT>) this.config.getOutputSerializer(getClass().getClassLoader()).getSerializer(); + this.numberOfSubTasksInOutputs = this.config.getNumberSubtasksInOutput(); + this.taskName = this.config.getTaskName(); + this.numberOfSubtasks = this.runtimeUdfContext.getNumberOfParallelSubtasks(); + this.indexInSubtaskGroup = this.runtimeUdfContext.getIndexOfThisSubtask(); + this.runtimeEnvironment = new TezRuntimeEnvironment((long) (0.7 * availableMemory)); + this.executionConfig = runtimeUdfContext.getExecutionConfig(); + this.invokable.setExecutionConfig(this.executionConfig); + } + + + //------------------------------------------------------------- + // Interface to FlinkProcessor + 
//------------------------------------------------------------- + + public void invoke(List<KeyValueReader> readers, List<KeyValueWriter> writers) throws Exception { + + // whatever happens in this scope, make sure that the local strategies are cleaned up! + // note that the initialization of the local strategies is in the try-finally block as well, + // so that the thread that creates them catches its own errors that may happen in that process. + // this is especially important, since there may be asynchronous closes (such as through canceling). + try { + // initialize the inputs and outputs + initInputsOutputs(readers, writers); + + // pre main-function initialization + initialize(); + + // the work goes here + run(); + } + finally { + // clean up in any case! + closeLocalStrategies(); + } + } + + + /* + * Initialize inputs, input serializers, input comparators, and collector + * Assumes that the config and userCodeClassLoader has been set + */ + private void initInputsOutputs (List<KeyValueReader> readers, List<KeyValueWriter> writers) throws Exception { + + int numInputs = readers.size(); + Preconditions.checkArgument(numInputs == driver.getNumberOfInputs()); + + // Prior to local strategies + this.inputIterators = new MutableObjectIterator[numInputs]; + //local strategies + this.localStrategies = new CloseableInputProvider[numInputs]; + // After local strategies + this.inputs = new MutableObjectIterator[numInputs]; + + int numComparators = driver.getNumberOfDriverComparators(); + initInputsSerializersAndComparators(numInputs, numComparators); + + int index = 0; + for (KeyValueReader reader : readers) { + this.inputIterators[index] = new TezReaderIterator<Object>(reader); + initInputLocalStrategy(index); + index++; + } + + int numOutputs = writers.size(); + ArrayList<TezChannelSelector<OT>> channelSelectors = new ArrayList<TezChannelSelector<OT>>(numOutputs); + //ArrayList<Integer> numStreamsInOutputs = new ArrayList<Integer>(numOutputs); + for (int i = 0; i < 
numOutputs; i++) { + final ShipStrategyType strategy = config.getOutputShipStrategy(i); + final TypeComparatorFactory<OT> compFactory = config.getOutputComparator(i, this.userCodeClassLoader); + final DataDistribution dataDist = config.getOutputDataDistribution(i, this.userCodeClassLoader); + if (compFactory == null) { + channelSelectors.add(i, new TezOutputEmitter<OT>(strategy)); + } else if (dataDist == null){ + final TypeComparator<OT> comparator = compFactory.createComparator(); + channelSelectors.add(i, new TezOutputEmitter<OT>(strategy, comparator)); + } else { + final TypeComparator<OT> comparator = compFactory.createComparator(); + channelSelectors.add(i,new TezOutputEmitter<OT>(strategy, comparator, dataDist)); + } + } + this.output = new TezOutputCollector<OT>(writers, channelSelectors, outSerializer, numberOfSubTasksInOutputs); + } + + + + // -------------------------------------------------------------------- + // TaskContext interface + // -------------------------------------------------------------------- + + @Override + public TaskConfig getTaskConfig() { + return (TaskConfig) this.config; + } + + @Override + public ExecutionConfig getExecutionConfig() { + return executionConfig; + } + + @Override + public ClassLoader getUserCodeClassLoader() { + return this.userCodeClassLoader; + } + + @Override + public MemoryManager getMemoryManager() { + return runtimeEnvironment.getMemoryManager(); + } + + @Override + public IOManager getIOManager() { + return runtimeEnvironment.getIOManager(); + } + + @Override + public TaskManagerRuntimeInfo getTaskManagerInfo() { + return new TaskManagerRuntimeInfo("localhost", new Configuration()); + } + + @Override + public <X> MutableObjectIterator<X> getInput(int index) { + if (index < 0 || index > this.driver.getNumberOfInputs()) { + throw new IndexOutOfBoundsException(); + } + // check for lazy assignment from input strategies + if (this.inputs[index] != null) { + @SuppressWarnings("unchecked") + 
MutableObjectIterator<X> in = (MutableObjectIterator<X>) this.inputs[index]; + return in; + } else { + final MutableObjectIterator<X> in; + try { + if (this.localStrategies[index] != null) { + @SuppressWarnings("unchecked") + MutableObjectIterator<X> iter = (MutableObjectIterator<X>) this.localStrategies[index].getIterator(); + in = iter; + } else { + throw new RuntimeException("Bug: null input iterator, null temp barrier, and null local strategy."); + } + this.inputs[index] = in; + return in; + } catch (InterruptedException iex) { + throw new RuntimeException("Interrupted while waiting for input " + index + " to become available."); + } catch (IOException ioex) { + throw new RuntimeException("An I/O Exception occurred whily obaining input " + index + "."); + } + } + } + + @Override + public <X> TypeSerializerFactory<X> getInputSerializer(int index) { + if (index < 0 || index >= this.driver.getNumberOfInputs()) { + throw new IndexOutOfBoundsException(); + } + + @SuppressWarnings("unchecked") + final TypeSerializerFactory<X> serializerFactory = (TypeSerializerFactory<X>) this.inputSerializers[index]; + return serializerFactory; + } + + @Override + public <X> TypeComparator<X> getDriverComparator(int index) { + if (this.inputComparators == null) { + throw new IllegalStateException("Comparators have not been created!"); + } + else if (index < 0 || index >= this.driver.getNumberOfDriverComparators()) { + throw new IndexOutOfBoundsException(); + } + + @SuppressWarnings("unchecked") + final TypeComparator<X> comparator = (TypeComparator<X>) this.inputComparators[index]; + return comparator; + } + + + + @Override + public S getStub() { + return this.stub; + } + + @Override + public Collector<OT> getOutputCollector() { + return this.output; + } + + @Override + public AbstractInvokable getOwningNepheleTask() { + return this.invokable; + } + + @Override + public String formatLogString(String message) { + return null; + } + + + // 
-------------------------------------------------------------------- + // Adapted from BatchTask + // -------------------------------------------------------------------- + + private void initInputLocalStrategy(int inputNum) throws Exception { + // check if there is already a strategy + if (this.localStrategies[inputNum] != null) { + throw new IllegalStateException(); + } + + // now set up the local strategy + final LocalStrategy localStrategy = this.config.getInputLocalStrategy(inputNum); + if (localStrategy != null) { + switch (localStrategy) { + case NONE: + // the input is as it is + this.inputs[inputNum] = this.inputIterators[inputNum]; + break; + case SORT: + @SuppressWarnings({ "rawtypes", "unchecked" }) + UnilateralSortMerger<?> sorter = new UnilateralSortMerger(getMemoryManager(), getIOManager(), + this.inputIterators[inputNum], this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum), + this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum), + this.config.getSpillingThresholdInput(inputNum), this.executionConfig.isObjectReuseEnabled()); + // set the input to null such that it will be lazily fetched from the input strategy + this.inputs[inputNum] = null; + this.localStrategies[inputNum] = sorter; + break; + case COMBININGSORT: + // sanity check this special case! + // this still breaks a bit of the abstraction! + // we should have nested configurations for the local strategies to solve that + if (inputNum != 0) { + throw new IllegalStateException("Performing combining sort outside a (group)reduce task!"); + } + + // instantiate ourselves a combiner. 
we should not use the stub, because the sort and the + // subsequent (group)reduce would otherwise share it multi-threaded + final Class<S> userCodeFunctionType = this.driver.getStubType(); + if (userCodeFunctionType == null) { + throw new IllegalStateException("Performing combining sort outside a reduce task!"); + } + final S localStub; + try { + localStub = initStub(userCodeFunctionType); + } catch (Exception e) { + throw new RuntimeException("Initializing the user code and the configuration failed" + + (e.getMessage() == null ? "." : ": " + e.getMessage()), e); + } + + if (!(localStub instanceof GroupCombineFunction)) { + throw new IllegalStateException("Performing combining sort outside a reduce task!"); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + CombiningUnilateralSortMerger<?> cSorter = new CombiningUnilateralSortMerger( + (GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum], + this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum), + this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum), + this.config.getSpillingThresholdInput(inputNum), this.executionConfig.isObjectReuseEnabled()); + cSorter.setUdfConfiguration(this.config.getStubParameters()); + + // set the input to null such that it will be lazily fetched from the input strategy + this.inputs[inputNum] = null; + this.localStrategies[inputNum] = cSorter; + break; + default: + throw new Exception("Unrecognized local strategy provided: " + localStrategy.name()); + } + } else { + // no local strategy in the config + this.inputs[inputNum] = this.inputIterators[inputNum]; + } + } + + private <T> TypeComparator<T> getLocalStrategyComparator(int inputNum) throws Exception { + TypeComparatorFactory<T> compFact = this.config.getInputComparator(inputNum, this.userCodeClassLoader); + if (compFact == null) { + throw new Exception("Missing comparator factory for local strategy on input " + 
inputNum); + } + return compFact.createComparator(); + } + + protected S initStub(Class<? super S> stubSuperClass) throws Exception { + try { + S stub = config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(stubSuperClass, this.userCodeClassLoader); + // check if the class is a subclass, if the check is required + if (stubSuperClass != null && !stubSuperClass.isAssignableFrom(stub.getClass())) { + throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" + + stubSuperClass.getName() + "' as is required."); + } + FunctionUtils.setFunctionRuntimeContext(stub, this.runtimeUdfContext); + return stub; + } + catch (ClassCastException ccex) { + throw new Exception("The stub class is not a proper subclass of " + stubSuperClass.getName(), ccex); + } + } + + /** + * Creates all the serializers and comparators. + */ + protected void initInputsSerializersAndComparators(int numInputs, int numComparators) throws Exception { + this.inputSerializers = new TypeSerializerFactory<?>[numInputs]; + this.inputComparators = numComparators > 0 ? new TypeComparator[numComparators] : null; + //this.inputComparators = this.driver.requiresComparatorOnInput() ? 
new TypeComparator[numInputs] : null; + this.inputIterators = new MutableObjectIterator[numInputs]; + + for (int i = 0; i < numInputs; i++) { + // ---------------- create the serializer first --------------------- + final TypeSerializerFactory<?> serializerFactory = this.config.getInputSerializer(i, this.userCodeClassLoader); + this.inputSerializers[i] = serializerFactory; + // this.inputIterators[i] = createInputIterator(this.inputReaders[i], this.inputSerializers[i]); + } + // ---------------- create the driver's comparators --------------------- + for (int i = 0; i < numComparators; i++) { + if (this.inputComparators != null) { + final TypeComparatorFactory<?> comparatorFactory = this.config.getDriverComparator(i, this.userCodeClassLoader); + this.inputComparators[i] = comparatorFactory.createComparator(); + } + } + } + + protected void initialize() throws Exception { + // create the operator + try { + this.driver.setup(this); + } + catch (Throwable t) { + throw new Exception("The driver setup for '" + //TODO put taks name here + "' , caused an error: " + t.getMessage(), t); + } + + //this.runtimeUdfContext = createRuntimeContext(); + + // instantiate the UDF + try { + final Class<? super S> userCodeFunctionType = this.driver.getStubType(); + // if the class is null, the driver has no user code + if (userCodeFunctionType != null) { + this.stub = initStub(userCodeFunctionType); + } + } catch (Exception e) { + throw new RuntimeException("Initializing the UDF" + + e.getMessage() == null ? "." 
: ": " + e.getMessage(), e); + } + } + + /* + public RuntimeUDFContext createRuntimeContext() { + return new RuntimeUDFContext(this.taskName, this.numberOfSubtasks, this.indexInSubtaskGroup, null); + } + */ + + protected void closeLocalStrategies() { + if (this.localStrategies != null) { + for (int i = 0; i < this.localStrategies.length; i++) { + if (this.localStrategies[i] != null) { + try { + this.localStrategies[i].close(); + } catch (Throwable t) { + LOG.error("Error closing local strategy for input " + i, t); + } + } + } + } + } + + protected void run() throws Exception { + // ---------------------------- Now, the actual processing starts ------------------------ + // check for asynchronous canceling + + boolean stubOpen = false; + + try { + // run the data preparation + try { + this.driver.prepare(); + } + catch (Throwable t) { + // if the preparation caused an error, clean up + // errors during clean-up are swallowed, because we have already a root exception + throw new Exception("The data preparation for task '" + this.taskName + + "' , caused an error: " + t.getMessage(), t); + } + + // open stub implementation + if (this.stub != null) { + try { + Configuration stubConfig = this.config.getStubParameters(); + FunctionUtils.openFunction(this.stub, stubConfig); + stubOpen = true; + } + catch (Throwable t) { + throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t); + } + } + + // run the user code + this.driver.run(); + + // close. We close here such that a regular close throwing an exception marks a task as failed. 
+ if (this.stub != null) { + FunctionUtils.closeFunction(this.stub); + stubOpen = false; + } + + this.output.close(); + + } + catch (Exception ex) { + // close the input, but do not report any exceptions, since we already have another root cause + ex.printStackTrace(); + throw new RuntimeException("Exception in TaskContext: " + ex.getMessage() + " "+ ex.getStackTrace()); + } + finally { + this.driver.cleanup(); + } + } + +}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java new file mode 100644 index 0000000..94a8315 --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tez.runtime; + +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; + +public class TezTaskConfig extends TaskConfig { + + public static final String TEZ_TASK_CONFIG = "tez.task.flink.processor.taskconfig"; + + private static final String NUMBER_SUBTASKS_IN_OUTPUTS = "tez.num_subtasks_in_output"; + + private static final String INPUT_SPLIT_PROVIDER = "tez.input_split_provider"; + + private static final String INPUT_POSITIONS = "tez.input_positions"; + + private static final String INPUT_FORMAT = "tez.input_format"; + + private static final String DATASOURCE_PROCESSOR_NAME = "tez.datasource_processor_name"; + + public TezTaskConfig(Configuration config) { + super(config); + } + + + public void setDatasourceProcessorName(String name) { + if (name != null) { + this.config.setString(DATASOURCE_PROCESSOR_NAME, name); + } + } + + public String getDatasourceProcessorName() { + return this.config.getString(DATASOURCE_PROCESSOR_NAME, null); + } + + public void setNumberSubtasksInOutput(ArrayList<Integer> numberSubtasksInOutputs) { + try { + InstantiationUtil.writeObjectToConfig(numberSubtasksInOutputs, this.config, NUMBER_SUBTASKS_IN_OUTPUTS); + } catch (IOException e) { + throw new RuntimeException("Error while writing the input split provider object to the task configuration."); + } + } + + public ArrayList<Integer> getNumberSubtasksInOutput() { + ArrayList<Integer> numberOfSubTasksInOutputs = null; + try { + numberOfSubTasksInOutputs = (ArrayList) InstantiationUtil.readObjectFromConfig(this.config, NUMBER_SUBTASKS_IN_OUTPUTS, getClass().getClassLoader()); + } + catch (IOException e) { + throw new RuntimeException("Error while 
reading the number of subtasks in outputs object from the task configuration."); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Error while reading the number of subtasks in outpurs object from the task configuration. " + + "class not found."); + } + if (numberOfSubTasksInOutputs == null) { + throw new NullPointerException(); + } + return numberOfSubTasksInOutputs; + + } + + + public void setInputSplitProvider (InputSplitProvider inputSplitProvider) { + try { + InstantiationUtil.writeObjectToConfig(inputSplitProvider, this.config, INPUT_SPLIT_PROVIDER); + } catch (IOException e) { + throw new RuntimeException("Error while writing the input split provider object to the task configuration."); + } + } + + public InputSplitProvider getInputSplitProvider () { + InputSplitProvider inputSplitProvider = null; + try { + inputSplitProvider = (InputSplitProvider) InstantiationUtil.readObjectFromConfig(this.config, INPUT_SPLIT_PROVIDER, getClass().getClassLoader()); + } + catch (IOException e) { + throw new RuntimeException("Error while reading the input split provider object from the task configuration."); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Error while reading the input split provider object from the task configuration. 
" + + "ChannelSelector class not found."); + } + if (inputSplitProvider == null) { + throw new NullPointerException(); + } + return inputSplitProvider; + } + + + public void setInputPositions(HashMap<String,ArrayList<Integer>> inputPositions) { + try { + InstantiationUtil.writeObjectToConfig(inputPositions, this.config, INPUT_POSITIONS); + } catch (IOException e) { + throw new RuntimeException("Error while writing the input positions object to the task configuration."); + } + } + + public HashMap<String,ArrayList<Integer>> getInputPositions () { + HashMap<String,ArrayList<Integer>> inputPositions = null; + try { + inputPositions = (HashMap) InstantiationUtil.readObjectFromConfig(this.config, INPUT_POSITIONS, getClass().getClassLoader()); + } + catch (IOException e) { + throw new RuntimeException("Error while reading the input positions object from the task configuration."); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Error while reading the input positions object from the task configuration. " + + "ChannelSelector class not found."); + } + if (inputPositions == null) { + throw new NullPointerException(); + } + return inputPositions; + } + + public void setInputFormat (InputFormat inputFormat) { + try { + InstantiationUtil.writeObjectToConfig(inputFormat, this.config, INPUT_FORMAT); + } catch (IOException e) { + throw new RuntimeException("Error while writing the input format object to the task configuration."); + } + } + + public InputFormat getInputFormat () { + InputFormat inputFormat = null; + try { + inputFormat = (InputFormat) InstantiationUtil.readObjectFromConfig(this.config, INPUT_FORMAT, getClass().getClassLoader()); + } + catch (IOException e) { + throw new RuntimeException("Error while reading the input split provider object from the task configuration."); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Error while reading the input split provider object from the task configuration. 
" + + "ChannelSelector class not found."); + } + if (inputFormat == null) { + throw new NullPointerException(); + } + return inputFormat; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java new file mode 100644 index 0000000..7ceeac8 --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tez.runtime; + + +import com.google.common.base.Preconditions; +import org.apache.flink.tez.util.EncodingUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.api.LogicalOutput; +import org.apache.tez.runtime.api.ProcessorContext; +import org.apache.tez.runtime.library.api.KeyValueReader; +import org.apache.tez.runtime.library.api.KeyValueWriter; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class UnionProcessor extends AbstractLogicalIOProcessor { + + private TezTaskConfig config; + protected Map<String, LogicalInput> inputs; + protected Map<String, LogicalOutput> outputs; + private List<KeyValueReader> readers; + private List<KeyValueWriter> writers; + private int numInputs; + private int numOutputs; + + public UnionProcessor(ProcessorContext context) { + super(context); + } + + @Override + public void initialize() throws Exception { + UserPayload payload = getContext().getUserPayload(); + Configuration conf = TezUtils.createConfFromUserPayload(payload); + + this.config = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader()); + config.setTaskName(getContext().getTaskVertexName()); + } + + @Override + public void handleEvents(List<Event> processorEvents) { + + } + + @Override + public void close() throws Exception { + + } + + @Override + public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception { + this.inputs = inputs; + this.outputs = outputs; + this.numInputs = inputs.size(); + this.numOutputs = outputs.size(); + + this.readers = new ArrayList<KeyValueReader>(numInputs); + if (this.inputs != null) { + for 
(LogicalInput input: this.inputs.values()) { + input.start(); + readers.add((KeyValueReader) input.getReader()); + } + } + + this.writers = new ArrayList<KeyValueWriter>(numOutputs); + if (this.outputs != null) { + for (LogicalOutput output : this.outputs.values()) { + output.start(); + writers.add((KeyValueWriter) output.getWriter()); + } + } + + Preconditions.checkArgument(writers.size() == 1); + KeyValueWriter writer = writers.get(0); + + for (KeyValueReader reader: this.readers) { + while (reader.next()) { + Object key = reader.getCurrentKey(); + Object value = reader.getCurrentValue(); + writer.write(key, value); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java new file mode 100644 index 0000000..ef59fd0 --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tez.runtime.input; + +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.util.InstantiationUtil; +import org.apache.tez.runtime.api.AbstractLogicalInput; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.InputContext; +import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + + +public class FlinkInput extends AbstractLogicalInput { + + private static final Log LOG = LogFactory.getLog(FlinkInput.class); + + private InputSplit split; + private boolean splitIsCreated; + private final ReentrantLock rrLock = new ReentrantLock(); + private final Condition rrInited = rrLock.newCondition(); + + public FlinkInput(InputContext inputContext, int numPhysicalInputs) { + super(inputContext, numPhysicalInputs); + getContext().requestInitialMemory(0l, null); // mandatory call + split = null; + } + + @Override + public void handleEvents(List<Event> inputEvents) throws Exception { + + LOG.info("Received " + inputEvents.size() + " events (should be = 1)"); + + Event event = inputEvents.iterator().next(); + + Preconditions.checkArgument(event instanceof InputDataInformationEvent, + getClass().getSimpleName() + + " can only handle a single event of type: " + + InputDataInformationEvent.class.getSimpleName()); + + initSplitFromEvent ((InputDataInformationEvent)event); + } + + private void initSplitFromEvent (InputDataInformationEvent e) throws Exception { + rrLock.lock(); + + try { + ByteString byteString = 
ByteString.copyFrom(e.getUserPayload()); + this.split = (InputSplit) InstantiationUtil.deserializeObject(byteString.toByteArray(), getClass().getClassLoader()); + this.splitIsCreated = true; + + LOG.info ("Initializing input split " + split.getSplitNumber() + ": " + split.toString() + " from event (" + e.getSourceIndex() + "," + e.getTargetIndex() + "): " + e.toString()); + + rrInited.signal(); + } + catch (Exception ex) { + throw new IOException( + "Interrupted waiting for InputSplit initialization"); + } + finally { + rrLock.unlock(); + } + } + + @Override + public List<Event> close() throws Exception { + return null; + } + + @Override + public void start() throws Exception { + } + + @Override + public Reader getReader() throws Exception { + throw new RuntimeException("FlinkInput does not contain a Reader. Should use getSplit instead"); + } + + @Override + public List<Event> initialize() throws Exception { + return null; + } + + public InputSplit getSplit () throws Exception { + + rrLock.lock(); + try { + if (!splitIsCreated) { + checkAndAwaitSplitInitialization(); + } + } + finally { + rrLock.unlock(); + } + if (split == null) { + LOG.info("Input split has not been created. This should not happen"); + throw new RuntimeException("Input split has not been created. 
This should not happen"); + } + return split; + } + + void checkAndAwaitSplitInitialization() throws IOException { + assert rrLock.getHoldCount() == 1; + rrLock.lock(); + try { + rrInited.await(); + } catch (Exception e) { + throw new IOException( + "Interrupted waiting for InputSplit initialization"); + } finally { + rrLock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java new file mode 100644 index 0000000..db1261c --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tez.runtime.input; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.tez.runtime.TezTaskConfig; +import org.apache.flink.tez.util.EncodingUtils; +import org.apache.flink.util.InstantiationUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.InputInitializer; +import org.apache.tez.runtime.api.InputInitializerContext; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; +import org.apache.tez.runtime.api.events.InputInitializerEvent; + +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; + +public class FlinkInputSplitGenerator extends InputInitializer { + + private static final Log LOG = LogFactory.getLog(FlinkInputSplitGenerator.class); + + InputFormat format; + + public FlinkInputSplitGenerator(InputInitializerContext initializerContext) { + super(initializerContext); + } + + @Override + public List<Event> initialize() throws Exception { + + Configuration tezConf = TezUtils.createConfFromUserPayload(this.getContext().getUserPayload()); + + TezTaskConfig taskConfig = (TezTaskConfig) EncodingUtils.decodeObjectFromString(tezConf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader()); + + this.format = taskConfig.getInputFormat(); + + int numTasks = this.getContext().getNumTasks(); + + LOG.info ("Creating splits for " + numTasks + " tasks for input format " + format); + + InputSplit[] splits = format.createInputSplits((numTasks > 0) ? 
numTasks : 1 ); + + LOG.info ("Created " + splits.length + " input splits" + " tasks for input format " + format); + + //LOG.info ("Created + " + splits.length + " input splits for input format " + format); + + LOG.info ("Sending input split events"); + LinkedList<Event> events = new LinkedList<Event>(); + for (int i = 0; i < splits.length; i++) { + byte [] bytes = InstantiationUtil.serializeObject(splits[i]); + ByteBuffer buf = ByteBuffer.wrap(bytes); + InputDataInformationEvent event = InputDataInformationEvent.createWithSerializedPayload(i % numTasks, buf); + event.setTargetIndex(i % numTasks); + events.add(event); + LOG.info ("Added event of index " + i + ": " + event); + } + return events; + } + + @Override + public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception { + + } + + @Override + public void onVertexStateUpdated(VertexStateUpdate stateUpdate) { + //super.onVertexStateUpdated(stateUpdate); + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java new file mode 100644 index 0000000..722f0a1 --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tez.runtime.input; + + +import org.apache.flink.util.MutableObjectIterator; +import org.apache.hadoop.io.IntWritable; +import org.apache.tez.runtime.library.api.KeyValueReader; + +import java.io.IOException; + +public class TezReaderIterator<T> implements MutableObjectIterator<T>{ + + private KeyValueReader kvReader; + + public TezReaderIterator(KeyValueReader kvReader) { + this.kvReader = kvReader; + } + + @Override + public T next(T reuse) throws IOException { + if (kvReader.next()) { + Object key = kvReader.getCurrentKey(); + Object value = kvReader.getCurrentValue(); + if (!(key instanceof IntWritable)) { + throw new IllegalStateException("Wrong key type"); + } + reuse = (T) value; + return reuse; + } + else { + return null; + } + } + + @Override + public T next() throws IOException { + if (kvReader.next()) { + Object key = kvReader.getCurrentKey(); + Object value = kvReader.getCurrentValue(); + if (!(key instanceof IntWritable)) { + throw new IllegalStateException("Wrong key type"); + } + return (T) value; + } + else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java new file mode 100644 index 0000000..2358f29 
--- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tez.runtime.output; + + +import org.apache.hadoop.io.IntWritable; +import org.apache.tez.runtime.library.api.Partitioner; + +public class SimplePartitioner implements Partitioner { + + @Override + public int getPartition(Object key, Object value, int numPartitions) { + if (!(key instanceof IntWritable)) { + throw new IllegalStateException("Partitioning key should be int"); + } + IntWritable channel = (IntWritable) key; + return channel.get(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java new file mode 100644 index 0000000..7e5cd55 --- /dev/null +++ 
/**
 * Serializable strategy that decides, per record, which output channels the record
 * is sent to.
 */
public interface TezChannelSelector<T> extends Serializable {

    /**
     * Called to determine to which attached {@link org.apache.flink.runtime.io.network.channels.OutputChannel} objects the given record shall be forwarded.
     *
     * @param record
     *        the record to determine the output channels for
     * @param numberOfOutputChannels
     *        the total number of output channels which are attached to the respective output gate
     * @return a (possibly empty) array of integer numbers which indicate the indices of the output channels through
     *         which the record shall be forwarded
     */
    int[] selectChannels(T record, int numberOfOutputChannels);
}
+ */ + +package org.apache.flink.tez.runtime.output; + + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.util.Collector; +import org.apache.hadoop.io.IntWritable; +import org.apache.tez.runtime.library.api.KeyValueWriter; + +import java.io.IOException; +import java.util.List; + +public class TezOutputCollector<T> implements Collector<T> { + + private List<KeyValueWriter> writers; + + private List<TezChannelSelector<T>> outputEmitters; + + private List<Integer> numberOfStreamsInOutputs; + + private int numOutputs; + + private TypeSerializer<T> serializer; + + public TezOutputCollector(List<KeyValueWriter> writers, List<TezChannelSelector<T>> outputEmitters, TypeSerializer<T> serializer, List<Integer> numberOfStreamsInOutputs) { + this.writers = writers; + this.outputEmitters = outputEmitters; + this.numberOfStreamsInOutputs = numberOfStreamsInOutputs; + this.serializer = serializer; + this.numOutputs = writers.size(); + } + + @Override + public void collect(T record) { + for (int i = 0; i < numOutputs; i++) { + KeyValueWriter writer = writers.get(i); + TezChannelSelector<T> outputEmitter = outputEmitters.get(i); + int numberOfStreamsInOutput = numberOfStreamsInOutputs.get(i); + try { + for (int channel : outputEmitter.selectChannels(record, numberOfStreamsInOutput)) { + IntWritable key = new IntWritable(channel); + writer.write(key, record); + } + } + catch (IOException e) { + throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e); + } + } + } + + @Override + public void close() { + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java new file mode 100644 index 0000000..6dcee0b --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tez.runtime.output; + +import org.apache.flink.api.common.distributions.DataDistribution; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; + +public class TezOutputEmitter<T> implements TezChannelSelector<T> { + + private final ShipStrategyType strategy; // the shipping strategy used by this output emitter + + private int[] channels; // the reused array defining target channels + + private int nextChannelToSendTo = 0; // counter to go over channels round robin + + private final TypeComparator<T> comparator; // the comparator for hashing / sorting + + // ------------------------------------------------------------------------ + // Constructors + // ------------------------------------------------------------------------ + + /** + * Creates a new channel selector that distributes data round robin. + */ + public TezOutputEmitter() { + this(ShipStrategyType.NONE); + } + + /** + * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...). + * + * @param strategy The distribution strategy to be used. + */ + public TezOutputEmitter(ShipStrategyType strategy) { + this(strategy, null); + } + + /** + * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...) + * and uses the supplied comparator to hash / compare records for partitioning them deterministically. + * + * @param strategy The distribution strategy to be used. + * @param comparator The comparator used to hash / compare the records. + */ + public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator) { + this(strategy, comparator, null); + } + + /** + * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...) + * and uses the supplied comparator to hash / compare records for partitioning them deterministically. + * + * @param strategy The distribution strategy to be used. 
+ * @param comparator The comparator used to hash / compare the records. + * @param distr The distribution pattern used in the case of a range partitioning. + */ + public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator, DataDistribution distr) { + if (strategy == null) { + throw new NullPointerException(); + } + + this.strategy = strategy; + this.comparator = comparator; + + switch (strategy) { + case FORWARD: + case PARTITION_HASH: + case PARTITION_RANGE: + case PARTITION_RANDOM: + case PARTITION_FORCED_REBALANCE: + case BROADCAST: + break; + default: + throw new IllegalArgumentException("Invalid shipping strategy for OutputEmitter: " + strategy.name()); + } + + if ((strategy == ShipStrategyType.PARTITION_RANGE) && distr == null) { + throw new NullPointerException("Data distribution must not be null when the ship strategy is range partitioning."); + } + } + + // ------------------------------------------------------------------------ + // Channel Selection + // ------------------------------------------------------------------------ + + @Override + public final int[] selectChannels(T record, int numberOfChannels) { + switch (strategy) { + case FORWARD: + case PARTITION_RANDOM: + case PARTITION_FORCED_REBALANCE: + return robin(numberOfChannels); + case PARTITION_HASH: + return hashPartitionDefault(record, numberOfChannels); + case PARTITION_RANGE: + return rangePartition(record, numberOfChannels); + case BROADCAST: + return broadcast(numberOfChannels); + default: + throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name()); + } + } + + // -------------------------------------------------------------------------------------------- + + private final int[] robin(int numberOfChannels) { + if (this.channels == null || this.channels.length != 1) { + this.channels = new int[1]; + } + + int nextChannel = nextChannelToSendTo + 1; + nextChannel = nextChannel < numberOfChannels ? 
nextChannel : 0; + + this.nextChannelToSendTo = nextChannel; + this.channels[0] = nextChannel; + return this.channels; + } + + private final int[] broadcast(int numberOfChannels) { + if (channels == null || channels.length != numberOfChannels) { + channels = new int[numberOfChannels]; + for (int i = 0; i < numberOfChannels; i++) { + channels[i] = i; + } + } + + return channels; + } + + private final int[] hashPartitionDefault(T record, int numberOfChannels) { + if (channels == null || channels.length != 1) { + channels = new int[1]; + } + + int hash = this.comparator.hash(record); + + hash = murmurHash(hash); + + if (hash >= 0) { + this.channels[0] = hash % numberOfChannels; + } + else if (hash != Integer.MIN_VALUE) { + this.channels[0] = -hash % numberOfChannels; + } + else { + this.channels[0] = 0; + } + + return this.channels; + } + + private final int murmurHash(int k) { + k *= 0xcc9e2d51; + k = Integer.rotateLeft(k, 15); + k *= 0x1b873593; + + k = Integer.rotateLeft(k, 13); + k *= 0xe6546b64; + + k ^= 4; + k ^= k >>> 16; + k *= 0x85ebca6b; + k ^= k >>> 13; + k *= 0xc2b2ae35; + k ^= k >>> 16; + + return k; + } + + private final int[] rangePartition(T record, int numberOfChannels) { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java new file mode 100644 index 0000000..39d247c --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tez.util; + + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; + +public class DummyInvokable extends AbstractInvokable { + + private ExecutionConfig executionConfig; + + public DummyInvokable() { + } + + public DummyInvokable(ExecutionConfig executionConfig) { + this.executionConfig = executionConfig; + } + + public void setExecutionConfig(ExecutionConfig executionConfig) { + this.executionConfig = executionConfig; + } + + @Override + public void registerInputOutput() {} + + + @Override + public void invoke() throws Exception {} + + @Override + public ExecutionConfig getExecutionConfig() { + return executionConfig; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java new file mode 100644 index 0000000..202cb24 --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java @@ -0,0 +1,64 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tez.util; + +import org.apache.flink.util.InstantiationUtil; +import org.apache.commons.codec.binary.Base64; + +import java.io.IOException; + +public class EncodingUtils { + + public static Object decodeObjectFromString(String encoded, ClassLoader cl) { + + try { + if (encoded == null) { + return null; + } + byte[] bytes = Base64.decodeBase64(encoded); + + return InstantiationUtil.deserializeObject(bytes, cl); + } + catch (IOException e) { + e.printStackTrace(); + System.exit(-1); + throw new RuntimeException(); + } + catch (ClassNotFoundException e) { + e.printStackTrace(); + System.exit(-1); + throw new RuntimeException(); + } + } + + public static String encodeObjectToString(Object o) { + + try { + byte[] bytes = InstantiationUtil.serializeObject(o); + + String encoded = Base64.encodeBase64String(bytes); + return encoded; + } + catch (IOException e) { + e.printStackTrace(); + System.exit(-1); + throw new RuntimeException(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java 
---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java new file mode 100644 index 0000000..07c5f97 --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tez.util; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.Serializer; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public class FlinkSerialization<T> extends Configured implements Serialization<T>{ + + @Override + public boolean accept(Class<?> c) { + TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader()); + T instance = typeSerializer.createInstance(); + return instance.getClass().isAssignableFrom(c); + } + + @Override + public Serializer<T> getSerializer(Class<T> c) { + TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader()); + return new FlinkSerializer<T>(typeSerializer); + } + + @Override + public Deserializer<T> getDeserializer(Class<T> c) { + TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader()); + return new FlinkDeserializer<T>(typeSerializer); + } + + public static class FlinkSerializer<T> implements Serializer<T> { + + private OutputStream dataOut; + private DataOutputViewOutputStreamWrapper dataOutputView; + private TypeSerializer<T> typeSerializer; + + public FlinkSerializer(TypeSerializer<T> typeSerializer) { + this.typeSerializer = typeSerializer; + } + + @Override + public void open(OutputStream out) throws IOException { + this.dataOut = out; + this.dataOutputView = new 
DataOutputViewOutputStreamWrapper(out); + } + + @Override + public void serialize(T t) throws IOException { + typeSerializer.serialize(t, dataOutputView); + } + + @Override + public void close() throws IOException { + this.dataOut.close(); + } + } + + public static class FlinkDeserializer<T> implements Deserializer<T> { + + private InputStream dataIn; + private TypeSerializer<T> typeSerializer; + private DataInputViewInputStreamWrapper dataInputView; + + public FlinkDeserializer(TypeSerializer<T> typeSerializer) { + this.typeSerializer = typeSerializer; + } + + @Override + public void open(InputStream in) throws IOException { + this.dataIn = in; + this.dataInputView = new DataInputViewInputStreamWrapper(in); + } + + @Override + public T deserialize(T t) throws IOException { + T reuse = t; + if (reuse == null) { + reuse = typeSerializer.createInstance(); + } + return typeSerializer.deserialize(reuse, dataInputView); + } + + @Override + public void close() throws IOException { + this.dataIn.close(); + } + } + + private static final class DataOutputViewOutputStreamWrapper implements DataOutputView { + + private final DataOutputStream dos; + + public DataOutputViewOutputStreamWrapper(OutputStream output) { + this.dos = new DataOutputStream(output); + } + + @Override + public void write(int b) throws IOException { + dos.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + dos.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + dos.write(b, off, len); + } + + @Override + public void writeBoolean(boolean v) throws IOException { + dos.writeBoolean(v); + } + + @Override + public void writeByte(int v) throws IOException { + dos.writeByte(v); + } + + @Override + public void writeShort(int v) throws IOException { + dos.writeShort(v); + } + + @Override + public void writeChar(int v) throws IOException { + dos.writeChar(v); + } + + @Override + public void writeInt(int v) throws IOException { + 
dos.writeInt(v); + } + + @Override + public void writeLong(long v) throws IOException { + dos.writeLong(v); + } + + @Override + public void writeFloat(float v) throws IOException { + dos.writeFloat(v); + } + + @Override + public void writeDouble(double v) throws IOException { + dos.writeDouble(v); + } + + @Override + public void writeBytes(String s) throws IOException { + dos.writeBytes(s); + } + + @Override + public void writeChars(String s) throws IOException { + dos.writeChars(s); + } + + @Override + public void writeUTF(String s) throws IOException { + dos.writeUTF(s); + } + + @Override + public void skipBytesToWrite(int num) throws IOException { + for (int i = 0; i < num; i++) { + dos.write(0); + } + } + + @Override + public void write(DataInputView inview, int num) throws IOException { + for (int i = 0; i < num; i++) { + dos.write(inview.readByte()); + } + } + } + + private static final class DataInputViewInputStreamWrapper implements DataInputView { + + private final DataInputStream dis; + + + public DataInputViewInputStreamWrapper(InputStream input) { + this.dis = new DataInputStream(input); + } + + @Override + public void readFully(byte[] b) throws IOException { + dis.readFully(b); + } + + @Override + public void readFully(byte[] b, int off, int len) throws IOException { + dis.readFully(b, off, len); + } + + @Override + public int skipBytes(int n) throws IOException { + return dis.skipBytes(n); + } + + @Override + public boolean readBoolean() throws IOException { + return dis.readBoolean(); + } + + @Override + public byte readByte() throws IOException { + return dis.readByte(); + } + + @Override + public int readUnsignedByte() throws IOException { + return dis.readUnsignedByte(); + } + + @Override + public short readShort() throws IOException { + return dis.readShort(); + } + + @Override + public int readUnsignedShort() throws IOException { + return dis.readUnsignedShort(); + } + + @Override + public char readChar() throws IOException { + return 
dis.readChar(); + } + + @Override + public int readInt() throws IOException { + return dis.readInt(); + } + + @Override + public long readLong() throws IOException { + return dis.readLong(); + } + + @Override + public float readFloat() throws IOException { + return dis.readFloat(); + } + + @Override + public double readDouble() throws IOException { + return dis.readDouble(); + } + + @Override + public String readLine() throws IOException { + return dis.readLine(); + } + + @Override + public String readUTF() throws IOException { + return dis.readUTF(); + } + + @Override + public void skipBytesToRead(int numBytes) throws IOException { + while (numBytes > 0) { + numBytes -= dis.skipBytes(numBytes); + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + dis.readFully(b, off, len); + return len; + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/resources/log4j.properties b/flink-contrib/flink-tez/src/main/resources/log4j.properties new file mode 100644 index 0000000..0845c81 --- /dev/null +++ b/flink-contrib/flink-tez/src/main/resources/log4j.properties @@ -0,0 +1,30 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# The root logger is set to INFO and writes to the testlogger appender. +# Set it manually to OFF to avoid flooding build logs. +log4j.rootLogger=INFO, testlogger + +# testlogger is set to be a ConsoleAppender that writes to System.err. +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java new file mode 100644 index 0000000..9124faa --- /dev/null +++ b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tez.test; + +import org.apache.flink.test.testdata.ConnectedComponentsData; +import org.apache.flink.tez.examples.ConnectedComponentsStep; +import org.junit.Assert; + +import java.io.BufferedReader; +import java.io.IOException; +import java.util.regex.Pattern; + +/* + * Note: This does not test whether the program computes one step of the + * Weakly Connected Components program correctly. It only tests whether + * the program assigns a wrong component to a vertex. 
+ */ + +public class ConnectedComponentsStepITCase extends TezProgramTestBase { + + private static final long SEED = 0xBADC0FFEEBEEFL; + + private static final int NUM_VERTICES = 1000; + + private static final int NUM_EDGES = 10000; + + + private String verticesPath; + private String edgesPath; + private String resultPath; + + + @Override + protected void preSubmit() throws Exception { + verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES)); + edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED)); + resultPath = getTempFilePath("results"); + } + + @Override + protected void testProgram() throws Exception { + ConnectedComponentsStep.main(verticesPath, edgesPath, resultPath, "100"); + } + + @Override + protected void postSubmit() throws Exception { + for (BufferedReader reader : getResultReader(resultPath)) { + checkOddEvenResult(reader); + } + } + + private static void checkOddEvenResult(BufferedReader result) throws IOException { + Pattern split = Pattern.compile(" "); + String line; + while ((line = result.readLine()) != null) { + String[] res = split.split(line); + Assert.assertEquals("Malformed result: Wrong number of tokens in line.", 2, res.length); + try { + int vertex = Integer.parseInt(res[0]); + int component = Integer.parseInt(res[1]); + Assert.assertTrue(((vertex % 2) == (component % 2))); + } catch (NumberFormatException e) { + Assert.fail("Malformed result."); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java new file mode 100644 index 0000000..9a203fe 
--- /dev/null +++ b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tez.test; + +import org.apache.flink.test.testdata.PageRankData; +import org.apache.flink.tez.examples.PageRankBasicStep; + +public class PageRankBasicStepITCase extends TezProgramTestBase { + + private String verticesPath; + private String edgesPath; + private String resultPath; + private String expectedResult; + + public static final String RANKS_AFTER_1_ITERATION = "1 0.2\n" + + "2 0.25666666666666665\n" + + "3 0.1716666666666667\n" + + "4 0.1716666666666667\n" + + "5 0.2"; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES); + edgesPath = createTempFile("edges.txt", PageRankData.EDGES); + } + + @Override + protected void testProgram() throws Exception { + PageRankBasicStep.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "-1"}); + expectedResult = RANKS_AFTER_1_ITERATION; + } + + @Override + protected void postSubmit() throws Exception { + 
compareKeyValuePairsWithDelta(expectedResult, resultPath, " ", 0.001); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java new file mode 100644 index 0000000..eda9d1a --- /dev/null +++ b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tez.test; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.tez.client.LocalTezEnvironment; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +public abstract class TezProgramTestBase extends AbstractTestBase { + + private static final int DEFAULT_DEGREE_OF_PARALLELISM = 4; + + private JobExecutionResult latestExecutionResult; + + private int degreeOfParallelism = DEFAULT_DEGREE_OF_PARALLELISM; + + + public TezProgramTestBase() { + this(new Configuration()); + } + + public TezProgramTestBase(Configuration config) { + super (config); + } + + + public void setParallelism(int degreeOfParallelism) { + this.degreeOfParallelism = degreeOfParallelism; + } + + public JobExecutionResult getLatestExecutionResult() { + return this.latestExecutionResult; + } + + + protected abstract void testProgram() throws Exception; + + protected void preSubmit() throws Exception {} + + protected void postSubmit() throws Exception {} + + // -------------------------------------------------------------------------------------------- + // Test entry point + // -------------------------------------------------------------------------------------------- + + // Ignored due to deadlocks in Tez 0.6.1 (https://s3.amazonaws.com/archive.travis-ci.org/jobs/67848151/log.txt) + // TODO Reactivate with future Tez versions + @Ignore + @Test + public void testJob() throws Exception { + // pre-submit + try { + preSubmit(); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + Assert.fail("Pre-submit work caused an error: " + e.getMessage()); + } + + // prepare the test environment + LocalTezEnvironment env = LocalTezEnvironment.create(); + env.setParallelism(degreeOfParallelism); + env.setAsContext(); + + // call the test program + try { + testProgram(); + } + catch 
(Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + Assert.fail("Error while calling the test program: " + e.getMessage()); + } + + // post-submit + try { + postSubmit(); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + Assert.fail("Post-submit work caused an error: " + e.getMessage()); + } + } + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java new file mode 100644 index 0000000..35aa54a --- /dev/null +++ b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tez.test; + + +import org.apache.flink.examples.java.relational.WebLogAnalysis; +import org.apache.flink.test.testdata.WebLogAnalysisData; + +public class WebLogAnalysisITCase extends TezProgramTestBase { + + private String docsPath; + private String ranksPath; + private String visitsPath; + private String resultPath; + + @Override + protected void preSubmit() throws Exception { + docsPath = createTempFile("docs", WebLogAnalysisData.DOCS); + ranksPath = createTempFile("ranks", WebLogAnalysisData.RANKS); + visitsPath = createTempFile("visits", WebLogAnalysisData.VISITS); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WebLogAnalysisData.EXCEPTED_RESULT, resultPath); + } + @Override + protected void testProgram() throws Exception { + WebLogAnalysis.main(new String[]{docsPath, ranksPath, visitsPath, resultPath}); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java new file mode 100644 index 0000000..d73aa8b --- /dev/null +++ b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tez.test; + +import org.apache.flink.examples.java.wordcount.WordCount; +import org.apache.flink.test.testdata.WordCountData; + +public class WordCountITCase extends TezProgramTestBase { + + protected String textPath; + protected String resultPath; + + public WordCountITCase(){ + } + + @Override + protected void preSubmit() throws Exception { + textPath = createTempFile("text.txt", WordCountData.TEXT); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath); + } + + @Override + protected void testProgram() throws Exception { + WordCount.main(new String[]{textPath, resultPath}); + } +}