http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointInFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointInFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointInFormat.java
deleted file mode 100644
index c5dd8ec..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointInFormat.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans.udfs;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-/**
- * Generates records with an id and a CoordVector.
- * The input format is line-based, i.e. one record is read from one line
- * which is terminated by '\n'. Within a line the first '|' character separates
- * the id from the CoordVector. The vector consists of a vector of decimals.
- * The decimals are separated by '|' as well. The id is the id of a data point or
- * cluster center and the CoordVector the corresponding position (coordinate
- * vector) of the data point or cluster center. Example line:
- * "42|23.23|52.57|74.43|" Id: 42 Coordinate vector: (23.23, 52.57, 74.43)
- */
-public class PointInFormat extends DelimitedInputFormat {
-       private static final long serialVersionUID = 1L;
-       
-       private final IntValue idInteger = new IntValue();
-       private final CoordVector point = new CoordVector();
-       
-       private final List<Double> dimensionValues = new ArrayList<Double>();
-       private double[] pointValues = new double[0];
-       
-       @Override
-       public Record readRecord(Record record, byte[] line, int offset, int numBytes) {
-               
-               final int limit = offset + numBytes;
-               
-               int id = -1;
-               int value = 0;
-               int fractionValue = 0;
-               int fractionChars = 0;
-               boolean negative = false;
-               
-               this.dimensionValues.clear();
-
-               for (int pos = offset; pos < limit; pos++) {
-                       if (line[pos] == '|') {
-                               // check if id was already set
-                               if (id == -1) {
-                                       id = value;
-                               }
-                               else {
-                               double v = value + ((double) fractionValue) * Math.pow(10, (-1 * (fractionChars - 1)));
-                               this.dimensionValues.add(negative ? -v : v);
-                               }
-                               // reset value
-                               value = 0;
-                               fractionValue = 0;
-                               fractionChars = 0;
-                               negative = false;
-                       } else if (line[pos] == '.') {
-                               fractionChars = 1;
-                       } else if (line[pos] == '-') {
-                               negative = true;
-                       } else {
-                               if (fractionChars == 0) {
-                                       value *= 10;
-                                       value += line[pos] - '0';
-                               } else {
-                                       fractionValue *= 10;
-                                       fractionValue += line[pos] - '0';
-                                       fractionChars++;
-                               }
-                       }
-               }
-
-               // set the ID
-               this.idInteger.setValue(id);
-               record.setField(0, this.idInteger);
-               
-               // set the data points
-               if (this.pointValues.length != this.dimensionValues.size()) {
-                       this.pointValues = new double[this.dimensionValues.size()];
-               }
-               for (int i = 0; i < this.pointValues.length; i++) {
-                       this.pointValues[i] = this.dimensionValues.get(i);
-               }
-               
-               this.point.setCoordinates(this.pointValues);
-               record.setField(1, this.point);
-               return record;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointOutFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointOutFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointOutFormat.java
deleted file mode 100644
index 410397e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointOutFormat.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans.udfs;
-
-import java.text.DecimalFormat;
-import java.text.DecimalFormatSymbols;
-
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-/**
- * Writes records that contain an id and a CoordVector.
- * The output format is line-based, i.e. one record is written to
- * a line and terminated with '\n'. Within a line the first '|' character
- * separates the id from the CoordVector. The vector consists of a vector of
- * decimals. The decimals are separated by '|'. The id is the id of a data
- * point or cluster center and the vector the corresponding position
- * (coordinate vector) of the data point or cluster center. Example line:
- * "42|23.23|52.57|74.43|" Id: 42 Coordinate vector: (23.23, 52.57, 74.43)
- */
-public class PointOutFormat extends DelimitedOutputFormat {
-       private static final long serialVersionUID = 1L;
-       
-       private final DecimalFormat df = new DecimalFormat("####0.00");
-       private final StringBuilder line = new StringBuilder();
-       
-       
-       public PointOutFormat() {
-               DecimalFormatSymbols dfSymbols = new DecimalFormatSymbols();
-               dfSymbols.setDecimalSeparator('.');
-               this.df.setDecimalFormatSymbols(dfSymbols);
-       }
-       
-       @Override
-       public int serializeRecord(Record record, byte[] target) {
-               
-               line.setLength(0);
-               
-               IntValue centerId = record.getField(0, IntValue.class);
-               CoordVector centerPos = record.getField(1, CoordVector.class);
-               
-               
-               line.append(centerId.getValue());
-
-               for (double coord : centerPos.getCoordinates()) {
-                       line.append('|');
-                       line.append(df.format(coord));
-               }
-               line.append('|');
-               
-               byte[] byteString = line.toString().getBytes();
-               
-               if (byteString.length <= target.length) {
-                       System.arraycopy(byteString, 0, target, 0, byteString.length);
-                       return byteString.length;
-               }
-               else {
-                       return -byteString.length;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/RecomputeClusterCenter.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/RecomputeClusterCenter.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/RecomputeClusterCenter.java
deleted file mode 100644
index 89e222b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/RecomputeClusterCenter.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.kmeans.udfs;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * Reduce PACT computes the new position (coordinate vector) of a cluster
- * center. This is an average computation. Hence, Combinable is annotated
- * and the combine method implemented. 
- * 
- * Output Format:
- * 0: clusterID
- * 1: clusterVector
- */
-@SuppressWarnings("deprecation")
-@Combinable
-@ConstantFields(0)
-public class RecomputeClusterCenter extends ReduceFunction implements Serializable {
-       private static final long serialVersionUID = 1L;
-       
-       private final IntValue count = new IntValue();
-       
-       /**
-        * Compute the new position (coordinate vector) of a cluster center.
-        */
-       @Override
-       public void reduce(Iterator<Record> dataPoints, Collector<Record> out) {
-               Record next = null;
-                       
-               // initialize coordinate vector sum and count
-               CoordVector coordinates = new CoordVector();
-               double[] coordinateSum = null;
-               int count = 0;  
-
-               // compute coordinate vector sum and count
-               while (dataPoints.hasNext()) {
-                       next = dataPoints.next();
-                       
-                       // get the coordinates and the count from the record
-                       double[] thisCoords = next.getField(1, CoordVector.class).getCoordinates();
-                       int thisCount = next.getField(2, IntValue.class).getValue();
-                       
-                       if (coordinateSum == null) {
-                               if (coordinates.getCoordinates() != null) {
-                               coordinateSum = coordinates.getCoordinates();
-                               }
-                               else {
-                               coordinateSum = new double[thisCoords.length];
-                               }
-                       }
-
-                       addToCoordVector(coordinateSum, thisCoords);
-                       count += thisCount;
-               }
-
-               // compute new coordinate vector (position) of cluster center
-               for (int i = 0; i < coordinateSum.length; i++) {
-                       coordinateSum[i] /= count;
-               }
-               
-               coordinates.setCoordinates(coordinateSum);
-               next.setField(1, coordinates);
-               next.setNull(2);
-
-               // emit new position of cluster center
-               out.collect(next);
-       }
-
-       /**
-        * Computes a pre-aggregated average value of a coordinate vector.
-        */
-       @Override
-       public void combine(Iterator<Record> dataPoints, Collector<Record> out) {
-               
-               Record next = null;
-               
-               // initialize coordinate vector sum and count
-               CoordVector coordinates = new CoordVector();
-               double[] coordinateSum = null;
-               int count = 0;  
-
-               // compute coordinate vector sum and count
-               while (dataPoints.hasNext()) {
-                       next = dataPoints.next();
-                       
-                       // get the coordinates and the count from the record
-                       double[] thisCoords = next.getField(1, CoordVector.class).getCoordinates();
-                       int thisCount = next.getField(2, IntValue.class).getValue();
-                       
-                       if (coordinateSum == null) {
-                               if (coordinates.getCoordinates() != null) {
-                               coordinateSum = coordinates.getCoordinates();
-                               }
-                               else {
-                               coordinateSum = new double[thisCoords.length];
-                               }
-                       }
-
-                       addToCoordVector(coordinateSum, thisCoords);
-                       count += thisCount;
-               }
-               
-               coordinates.setCoordinates(coordinateSum);
-               this.count.setValue(count);
-               next.setField(1, coordinates);
-               next.setField(2, this.count);
-               
-               // emit partial sum and partial count for average computation
-               out.collect(next);
-       }
-
-       /**
-        * Adds two coordinate vectors by summing up each of their coordinates.
-        * 
-        * @param cvToAddTo
-        *        The coordinate vector to which the other vector is added.
-        *        This vector is returned.
-        * @param cvToBeAdded
-        *        The coordinate vector which is added to the other vector.
-        *        This vector is not modified.
-        */
-       private void addToCoordVector(double[] cvToAddTo, double[] cvToBeAdded) {
-               // check if both vectors have same length
-               if (cvToAddTo.length != cvToBeAdded.length) {
-                       throw new IllegalArgumentException("The given coordinate vectors are not of equal length.");
-               }
-
-               // sum coordinate vectors coordinate-wise
-               for (int i = 0; i < cvToAddTo.length; i++) {
-                       cvToAddTo[i] += cvToBeAdded[i];
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/MergeOnlyJoin.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/MergeOnlyJoin.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/MergeOnlyJoin.java
deleted file mode 100644
index b948804..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/MergeOnlyJoin.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.relational;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirstExcept;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class MergeOnlyJoin implements Program {
-
-       private static final long serialVersionUID = 1L;
-
-       @ConstantFieldsFirstExcept(2)
-       public static class JoinInputs extends JoinFunction {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void join(Record input1, Record input2, Collector<Record> out) {
-                       input1.setField(2, input2.getField(1, IntValue.class));
-                       out.collect(input1);
-               }
-       }
-
-       @ConstantFieldsExcept({})
-       public static class DummyReduce extends ReduceFunction {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void reduce(Iterator<Record> values, Collector<Record> out) {
-                       while (values.hasNext()) {
-                               out.collect(values.next());
-                       }
-               }
-       }
-
-
-       @Override
-       public Plan getPlan(final String... args) {
-               // parse program parameters
-               int numSubtasks       = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-               String input1Path    = (args.length > 1 ? args[1] : "");
-               String input2Path    = (args.length > 2 ? args[2] : "");
-               String output        = (args.length > 3 ? args[3] : "");
-               int numSubtasksInput2 = (args.length > 4 ? Integer.parseInt(args[4]) : 1);
-
-               // create DataSourceContract for Orders input
-               @SuppressWarnings("unchecked")
-               CsvInputFormat format1 = new CsvInputFormat('|', IntValue.class, IntValue.class);
-               FileDataSource input1 = new FileDataSource(format1, input1Path, "Input 1");
-               
-               ReduceOperator aggInput1 = ReduceOperator.builder(DummyReduce.class, IntValue.class, 0)
-                       .input(input1)
-                       .name("AggOrders")
-                       .build();
-
-               
-               // create DataSourceContract for Orders input
-               @SuppressWarnings("unchecked")
-               CsvInputFormat format2 = new CsvInputFormat('|', IntValue.class, IntValue.class);
-               FileDataSource input2 = new FileDataSource(format2, input2Path, "Input 2");
-               input2.setParallelism(numSubtasksInput2);
-
-               ReduceOperator aggInput2 = ReduceOperator.builder(DummyReduce.class, IntValue.class, 0)
-                       .input(input2)
-                       .name("AggLines")
-                       .build();
-               aggInput2.setParallelism(numSubtasksInput2);
-               
-               // create JoinOperator for joining Orders and LineItems
-               JoinOperator joinLiO = JoinOperator.builder(JoinInputs.class, IntValue.class, 0, 0)
-                       .input1(aggInput1)
-                       .input2(aggInput2)
-                       .name("JoinLiO")
-                       .build();
-
-               // create DataSinkContract for writing the result
-               FileDataSink result = new FileDataSink(new CsvOutputFormat(), output, joinLiO, "Output");
-               CsvOutputFormat.configureRecordFormat(result)
-                       .recordDelimiter('\n')
-                       .fieldDelimiter('|')
-                       .lenient(true)
-                       .field(IntValue.class, 0)
-                       .field(IntValue.class, 1)
-                       .field(IntValue.class, 2);
-               
-               // assemble the PACT plan
-               Plan plan = new Plan(result, "Merge Only Join");
-               plan.setDefaultParallelism(numSubtasks);
-               return plan;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery1.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery1.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery1.java
deleted file mode 100644
index d805b92..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery1.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.relational;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.recordJobs.relational.query1Util.GroupByReturnFlag;
-import org.apache.flink.test.recordJobs.relational.query1Util.LineItemFilter;
-import org.apache.flink.test.recordJobs.util.IntTupleDataInFormat;
-import org.apache.flink.test.recordJobs.util.StringTupleDataOutFormat;
-import org.apache.flink.types.StringValue;
-
-@SuppressWarnings("deprecation")
-public class TPCHQuery1 implements Program, ProgramDescription {
-
-       private static final long serialVersionUID = 1L;
-
-       private int parallelism = 1;
-       private String lineItemInputPath;
-       private String outputPath;
-       
-       @Override
-       public Plan getPlan(String... args) throws IllegalArgumentException {
-               
-               
-               if (args.length != 3) {
-                       this.parallelism = 1;
-                       this.lineItemInputPath = "";
-                       this.outputPath = "";
-               } else {
-                       this.parallelism = Integer.parseInt(args[0]);
-                       this.lineItemInputPath = args[1];
-                       this.outputPath = args[2];
-               }
-               
-               FileDataSource lineItems =
-                       new FileDataSource(new IntTupleDataInFormat(), this.lineItemInputPath, "LineItems");
-               lineItems.setParallelism(this.parallelism);
-               
-               FileDataSink result = 
-                       new FileDataSink(new StringTupleDataOutFormat(), this.outputPath, "Output");
-               result.setParallelism(this.parallelism);
-               
-               MapOperator lineItemFilter = 
-                       MapOperator.builder(new LineItemFilter())
-                       .name("LineItem Filter")
-                       .build();
-               lineItemFilter.setParallelism(this.parallelism);
-               
-               ReduceOperator groupByReturnFlag = 
-                       ReduceOperator.builder(new GroupByReturnFlag(), StringValue.class, 0)
-                       .name("groupyBy")
-                       .build();
-               
-               lineItemFilter.setInput(lineItems);
-               groupByReturnFlag.setInput(lineItemFilter);
-               result.setInput(groupByReturnFlag);
-               
-               return new Plan(result, "TPC-H 1");
-       }
-
-       @Override
-       public String getDescription() {
-               return "Parameters: [parallelism] [lineitem-input] [output]";
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery10.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery10.java
deleted file mode 100644
index 4bb0cdf..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery10.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.relational;
-
-import java.io.IOException;
-import java.text.DecimalFormat;
-import java.text.DecimalFormatSymbols;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.recordJobs.util.IntTupleDataInFormat;
-import org.apache.flink.test.recordJobs.util.Tuple;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings({"serial", "deprecation"})
-public class TPCHQuery10 implements Program, ProgramDescription {
-       
-       // --------------------------------------------------------------------------------------------
-       //                         Local Filters and Projections
-       // --------------------------------------------------------------------------------------------
-       
-       /**
-        * Forwards (0 = orderkey, 1 = custkey).
-        */
-       public static class FilterO extends MapFunction
-       {
-               private static final int YEAR_FILTER = 1990;
-               
-               private final IntValue custKey = new IntValue();
-               
-               @Override
-               public void map(Record record, Collector<Record> out) throws Exception {
-                       
-                       Tuple t = record.getField(1, Tuple.class);
-                       if (Integer.parseInt(t.getStringValueAt(4).substring(0, 4)) > FilterO.YEAR_FILTER) {
-                               // project
-                               this.custKey.setValue((int) t.getLongValueAt(1));
-                               record.setField(1, this.custKey);
-                               out.collect(record);
-                       }
-                       
-               }
-       }
-
-       /**
-        * Forwards (0 = lineitem, 1 = tuple (extendedprice, discount) )
-        */
-       public static class FilterLI extends MapFunction
-       {
-               private final Tuple tuple = new Tuple();
-               
-               @Override
-               public void map(Record record, Collector<Record> out) throws Exception
-               {
-                       Tuple t = record.getField(1, this.tuple);
-                       if (t.getStringValueAt(8).equals("R")) {
-                               t.project(0x60); // l_extendedprice, l_discount
-                               record.setField(1, t);
-                               out.collect(record);
-                       }
-               }
-       }
-       
-       /**
-        * Returns (0 = custkey, 1 = custName, 2 = NULL, 3 = balance, 4 = nationkey, 5 = address, 6 = phone, 7 = comment)
-        */
-       public static class ProjectC extends MapFunction {
-
-               private final Tuple tuple = new Tuple();
-               
-               private final StringValue custName = new StringValue();
-               
-               private final StringValue balance = new StringValue();
-               private final IntValue nationKey = new IntValue();
-               private final StringValue address = new StringValue();
-               private final StringValue phone = new StringValue();
-               private final StringValue comment = new StringValue();
-               
-               @Override
-               public void map(Record record, Collector<Record> out) throws Exception
-               {
-                       final Tuple t = record.getField(1, this.tuple);
-                       
-                       this.custName.setValue(t.getStringValueAt(1));
-                       this.address.setValue(t.getStringValueAt(2));
-                       this.nationKey.setValue((int) t.getLongValueAt(3));
-                       this.phone.setValue(t.getStringValueAt(4));
-                       this.balance.setValue(t.getStringValueAt(5));
-                       this.comment.setValue(t.getStringValueAt(7));
-                       
-                       record.setField(1, this.custName);
-                       record.setField(3, this.balance);
-                       record.setField(4, this.nationKey);
-                       record.setField(5, this.address);
-                       record.setField(6, this.phone);
-                       record.setField(7, this.comment);
-                       
-                       out.collect(record);
-               }
-       }
-       
-       /**
-        * Returns (0 = nationkey, 1 = nation_name)
-        */
-       public static class ProjectN extends MapFunction
-       {
-               private final Tuple tuple = new Tuple();
-               private final StringValue nationName = new StringValue();
-               
-               @Override
-               public void map(Record record, Collector<Record> out) throws Exception
-               {
-                       final Tuple t = record.getField(1, this.tuple);
-                       
-                       this.nationName.setValue(t.getStringValueAt(1));
-                       record.setField(1, this.nationName);
-                       out.collect(record);
-               }
-       }
-       
-       // --------------------------------------------------------------------------------------------
-       //                                        Joins
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * Returns (0 = custKey, 1 = tuple (extendedprice, discount) )
-        */
-       public static class JoinOL extends JoinFunction
-       {
-               @Override
-               public void join(Record order, Record lineitem, Collector<Record> out) throws Exception {
-                       lineitem.setField(0, order.getField(1, IntValue.class));
-                       out.collect(lineitem);
-               }
-       }
-
-       /**
-        * Returns (0 = custkey, 1 = custName, 2 = extPrice * (1-discount), 3 = balance, 4 = nationkey, 5 = address, 6 = phone, 7 = comment)
-        */
-       public static class JoinCOL extends JoinFunction
-       {
-               private final DoubleValue d = new DoubleValue();
-               
-               @Override
-               public void join(Record custRecord, Record olRecord, Collector<Record> out) throws Exception
-               {
-                       final Tuple t = olRecord.getField(1, Tuple.class);
-                       final double extPrice = Double.parseDouble(t.getStringValueAt(0));
-                       final double discount = Double.parseDouble(t.getStringValueAt(1));
-                       
-                       this.d.setValue(extPrice * (1 - discount));
-                       custRecord.setField(2, this.d);
-                       out.collect(custRecord);
-               }
-
-       }
-       
-       /**
-        * Returns (0 = custkey, 1 = custName, 2 = extPrice * (1-discount), 3 = balance, 4 = nationName, 5 = address, 6 = phone, 7 = comment)
-        */
-       public static class JoinNCOL extends JoinFunction
-       {
-               @Override
-               public void join(Record colRecord, Record nation, Collector<Record> out) throws Exception {
-                       colRecord.setField(4, nation.getField(1, StringValue.class));
-                       out.collect(colRecord);
-               }
-       }
-       
-       @ReduceOperator.Combinable
-       public static class Sum extends ReduceFunction
-       {
-               private final DoubleValue d = new DoubleValue();
-               
-               @Override
-               public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception
-               {
-                       Record record = null;
-                       double sum = 0;
-                       while (records.hasNext()) {
-                               record = records.next();
-                               sum += record.getField(2, DoubleValue.class).getValue();
-                       }
-               
-                       this.d.setValue(sum);
-                       record.setField(2, this.d);
-                       out.collect(record);
-               }
-               
-               @Override
-               public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
-                       reduce(records,out);
-               }
-       }
-
-       public static class TupleOutputFormat extends FileOutputFormat {
-               private static final long serialVersionUID = 1L;
-               
-               private final DecimalFormat formatter;
-               private final StringBuilder buffer = new StringBuilder();
-               
-               public TupleOutputFormat() {
-                       DecimalFormatSymbols decimalFormatSymbol = new DecimalFormatSymbols();
-                       decimalFormatSymbol.setDecimalSeparator('.');
-                       
-                       this.formatter = new DecimalFormat("#.####");
-                       this.formatter.setDecimalFormatSymbols(decimalFormatSymbol);
-               }
-               
-               @Override
-               public void writeRecord(Record record) throws IOException
-               {
-                       this.buffer.setLength(0);
-                       this.buffer.append(record.getField(0, IntValue.class).toString()).append('|');
-                       this.buffer.append(record.getField(1, StringValue.class).toString()).append('|');
-                       
-                       this.buffer.append(this.formatter.format(record.getField(2, DoubleValue.class).getValue())).append('|');
-                       
-                       this.buffer.append(record.getField(3, StringValue.class).toString()).append('|');
-                       this.buffer.append(record.getField(4, StringValue.class).toString()).append('|');
-                       this.buffer.append(record.getField(5, StringValue.class).toString()).append('|');
-                       this.buffer.append(record.getField(6, StringValue.class).toString()).append('|');
-                       this.buffer.append(record.getField(7, StringValue.class).toString()).append('|');
-                       
-                       this.buffer.append('\n');
-                       
-                       final byte[] bytes = this.buffer.toString().getBytes();
-                       this.stream.write(bytes);
-               }
-       }
-
-       @Override
-       public String getDescription() {
-               return "TPC-H Query 10";
-       }
-
-       @Override
-       public Plan getPlan(String... args) throws IllegalArgumentException {
-               final String ordersPath;
-               final String lineitemsPath;
-               final String customersPath;
-               final String nationsPath;
-               final String resultPath;
-               
-               final int parallelism;
-
-               if (args.length < 6) {
-                       throw new IllegalArgumentException("Invalid number of parameters");
-               } else {
-                       parallelism = Integer.parseInt(args[0]);
-                       ordersPath = args[1];
-                       lineitemsPath = args[2];
-                       customersPath = args[3];
-                       nationsPath = args[4];
-                       resultPath = args[5];
-               }
-               
-               FileDataSource orders = new FileDataSource(new IntTupleDataInFormat(), ordersPath, "Orders");
-               // orders.setOutputContract(UniqueKey.class);
-               // orders.getCompilerHints().setAvgNumValuesPerKey(1);
-
-               FileDataSource lineitems = new FileDataSource(new IntTupleDataInFormat(), lineitemsPath, "LineItems");
-               // lineitems.getCompilerHints().setAvgNumValuesPerKey(4);
-
-               FileDataSource customers = new FileDataSource(new IntTupleDataInFormat(), customersPath, "Customers");
-
-               FileDataSource nations = new FileDataSource(new IntTupleDataInFormat(), nationsPath, "Nations");
-
-
-               MapOperator mapO = MapOperator.builder(FilterO.class)
-                       .name("FilterO")
-                       .build();
-
-               MapOperator mapLi = MapOperator.builder(FilterLI.class)
-                       .name("FilterLi")
-                       .build();
-
-               MapOperator projectC = MapOperator.builder(ProjectC.class)
-                       .name("ProjectC")
-                       .build();
-
-               MapOperator projectN = MapOperator.builder(ProjectN.class)
-                       .name("ProjectN")
-                       .build();
-
-               JoinOperator joinOL = JoinOperator.builder(JoinOL.class, IntValue.class, 0, 0)
-                       .name("JoinOL")
-                       .build();
-
-               JoinOperator joinCOL = JoinOperator.builder(JoinCOL.class, IntValue.class, 0, 0)
-                       .name("JoinCOL")
-                       .build();
-
-               JoinOperator joinNCOL = JoinOperator.builder(JoinNCOL.class, IntValue.class, 4, 0)
-                       .name("JoinNCOL")
-                       .build();
-
-               ReduceOperator reduce = ReduceOperator.builder(Sum.class)
-                       .keyField(IntValue.class, 0) 
-                       .keyField(StringValue.class, 1)
-                       .keyField(StringValue.class, 3)
-                       .keyField(StringValue.class, 4)
-                       .keyField(StringValue.class, 5)
-                       .keyField(StringValue.class, 6)
-                       .keyField(StringValue.class, 7)
-                       .name("Reduce")
-                       .build();
-
-               FileDataSink result = new FileDataSink(new TupleOutputFormat(), resultPath, "Output");
-
-               result.setInput(reduce);
-               
-               reduce.setInput(joinNCOL);
-               
-               joinNCOL.setFirstInput(joinCOL);
-               joinNCOL.setSecondInput(projectN);
-               
-               joinCOL.setFirstInput(projectC);
-               joinCOL.setSecondInput(joinOL);
-               
-               joinOL.setFirstInput(mapO);
-               joinOL.setSecondInput(mapLi);
-               
-               projectC.setInput(customers);
-               projectN.setInput(nations);
-               mapLi.setInput(lineitems);
-               mapO.setInput(orders);
-
-               // return the PACT plan
-               Plan p = new Plan(result, "TPCH Q10");
-               p.setDefaultParallelism(parallelism);
-               return p;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
deleted file mode 100644
index cebe6f9..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.relational;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-/**
- * The TPC-H is a decision support benchmark on relational data.
- * Its documentation and the data generator (DBGEN) can be found
- * on http://www.tpc.org/tpch/. This implementation is tested with
- * the DB2 data format.  
- * 
- * This program implements a modified version of the query 3 of 
- * the TPC-H benchmark including one join, some filtering and an
- * aggregation.
- * 
- * SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
- *   FROM orders, lineitem
- *   WHERE l_orderkey = o_orderkey
- *     AND o_orderstatus = "X" 
- *     AND YEAR(o_orderdate) > Y
- *     AND o_orderpriority LIKE "Z%"
- * GROUP BY l_orderkey, o_shippriority;
- */
-@SuppressWarnings("deprecation")
-public class TPCHQuery3 implements Program, ProgramDescription {
-
-       private static final long serialVersionUID = 1L;
-       
-       public static final String YEAR_FILTER = "parameter.YEAR_FILTER";
-       public static final String PRIO_FILTER = "parameter.PRIO_FILTER";
-
-       /**
-        * Map PACT implements the selection and projection on the orders table.
-        */
-       @ConstantFields({0,1})
-       public static class FilterO extends MapFunction implements Serializable {
-               private static final long serialVersionUID = 1L;
-               
-               private String prioFilter;              // filter literal for the order priority
-               private int yearFilter;                 // filter literal for the year
-               
-               // reusable objects for the fields touched in the mapper
-               private StringValue orderStatus;
-               private StringValue orderDate;
-               private StringValue orderPrio;
-               
-               /**
-                * Reads the filter literals from the configuration.
-                * 
-                * @see org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)
-                */
-               @Override
-               public void open(Configuration parameters) {
-                       this.yearFilter = parameters.getInteger(YEAR_FILTER, 1990);
-                       this.prioFilter = parameters.getString(PRIO_FILTER, "0");
-               }
-       
-               /**
-                * Filters the orders table by year, order status and order priority.
-                *
-                *  o_orderstatus = "X" 
-                *  AND YEAR(o_orderdate) > Y
-                *  AND o_orderpriority LIKE "Z"
-                *  
-                * Output Schema: 
-                *   0:ORDERKEY, 
-                *   1:SHIPPRIORITY
-                */
-               @Override
-               public void map(final Record record, final Collector<Record> out) {
-                       orderStatus = record.getField(2, StringValue.class);
-                       if (!orderStatus.getValue().equals("F")) {
-                               return;
-                       }
-                       
-                       orderPrio = record.getField(4, StringValue.class);
-                       if(!orderPrio.getValue().startsWith(this.prioFilter)) {
-                               return;
-                       }
-                       
-                       orderDate = record.getField(3, StringValue.class);
-                       if (!(Integer.parseInt(orderDate.getValue().substring(0, 4)) > this.yearFilter)) {
-                               return;
-                       }
-                       
-                       record.setNumFields(2);
-                       out.collect(record);
-               }
-       }
-
-       /**
-        * Match PACT realizes the join between LineItem and Order table.
-        *
-        */
-       @ConstantFieldsFirst({0,1})
-       public static class JoinLiO extends JoinFunction implements Serializable {
-               private static final long serialVersionUID = 1L;
-               
-               /**
-                * Implements the join between LineItem and Order table on the order key.
-                * 
-                * Output Schema:
-                *   0:ORDERKEY
-                *   1:SHIPPRIORITY
-                *   2:EXTENDEDPRICE
-                */
-               @Override
-               public void join(Record order, Record lineitem, Collector<Record> out) {
-                       order.setField(2, lineitem.getField(1, DoubleValue.class));
-                       out.collect(order);
-               }
-       }
-
-       /**
-        * Reduce PACT implements the sum aggregation. 
-        * The Combinable annotation is set as the partial sums can be calculated
-        * already in the combiner
-        *
-        */
-       @Combinable
-       @ConstantFields({0,1})
-       public static class AggLiO extends ReduceFunction implements Serializable {
-               private static final long serialVersionUID = 1L;
-               
-               private final DoubleValue extendedPrice = new DoubleValue();
-               
-               /**
-                * Implements the sum aggregation.
-                * 
-                * Output Schema:
-                *   0:ORDERKEY
-                *   1:SHIPPRIORITY
-                *   2:SUM(EXTENDEDPRICE)
-                */
-               @Override
-               public void reduce(Iterator<Record> values, Collector<Record> out) {
-                       Record rec = null;
-                       double partExtendedPriceSum = 0;
-
-                       while (values.hasNext()) {
-                               rec = values.next();
-                               partExtendedPriceSum += rec.getField(2, DoubleValue.class).getValue();
-                       }
-
-                       this.extendedPrice.setValue(partExtendedPriceSum);
-                       rec.setField(2, this.extendedPrice);
-                       out.collect(rec);
-               }
-
-               /**
-                * Creates partial sums on the price attribute for each data batch.
-                */
-               @Override
-               public void combine(Iterator<Record> values, Collector<Record> out) {
-                       reduce(values, out);
-               }
-       }
-
-
-       @Override
-       public Plan getPlan(final String... args) {
-               // parse program parameters
-               final int numSubtasks       = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-               final String ordersPath    = (args.length > 1 ? args[1] : "");
-               final String lineitemsPath = (args.length > 2 ? args[2] : "");
-               final String output        = (args.length > 3 ? args[3] : "");
-
-               // create DataSourceContract for Orders input
-               FileDataSource orders = new FileDataSource(new CsvInputFormat(), ordersPath, "Orders");
-               CsvInputFormat.configureRecordFormat(orders)
-                       .recordDelimiter('\n')
-                       .fieldDelimiter('|')
-                       .field(LongValue.class, 0)              // order id
-                       .field(IntValue.class, 7)               // ship prio
-                       .field(StringValue.class, 2, 2) // order status
-                       .field(StringValue.class, 4, 10)        // order date
-                       .field(StringValue.class, 5, 8);        // order prio
-
-               // create DataSourceContract for LineItems input
-               FileDataSource lineitems = new FileDataSource(new CsvInputFormat(), lineitemsPath, "LineItems");
-               CsvInputFormat.configureRecordFormat(lineitems)
-                       .recordDelimiter('\n')
-                       .fieldDelimiter('|')
-                       .field(LongValue.class, 0)              // order id
-                       .field(DoubleValue.class, 5);   // extended price
-
-               // create MapOperator for filtering Orders tuples
-               MapOperator filterO = MapOperator.builder(new FilterO())
-                       .input(orders)
-                       .name("FilterO")
-                       .build();
-               // filter configuration
-               filterO.setParameter(YEAR_FILTER, 1993);
-               filterO.setParameter(PRIO_FILTER, "5");
-               // compiler hints
-               filterO.getCompilerHints().setFilterFactor(0.05f);
-
-               // create JoinOperator for joining Orders and LineItems
-               JoinOperator joinLiO = JoinOperator.builder(new JoinLiO(), LongValue.class, 0, 0)
-                       .input1(filterO)
-                       .input2(lineitems)
-                       .name("JoinLiO")
-                       .build();
-
-               // create ReduceOperator for aggregating the result
-               // the reducer has a composite key, consisting of the fields 0 and 1
-               ReduceOperator aggLiO = ReduceOperator.builder(new AggLiO())
-                       .keyField(LongValue.class, 0)
-                       .keyField(StringValue.class, 1)
-                       .input(joinLiO)
-                       .name("AggLio")
-                       .build();
-
-               // create DataSinkContract for writing the result
-               FileDataSink result = new FileDataSink(new CsvOutputFormat(), output, aggLiO, "Output");
-               CsvOutputFormat.configureRecordFormat(result)
-                       .recordDelimiter('\n')
-                       .fieldDelimiter('|')
-                       .lenient(true)
-                       .field(LongValue.class, 0)
-                       .field(IntValue.class, 1)
-                       .field(DoubleValue.class, 2);
-               
-               // assemble the PACT plan
-               Plan plan = new Plan(result, "TPCH Q3");
-               plan.setDefaultParallelism(numSubtasks);
-               return plan;
-       }
-
-
-       @Override
-       public String getDescription() {
-               return "Parameters: [numSubtasks], [orders], [lineitem], [output]";
-       }
-}
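
For context, the removed job maps onto the DataSet API roughly as follows. This is an
illustrative sketch only, not code from this commit or from flink-examples: the field masks,
filter literals and path arguments are assumptions, and it presumes the
org.apache.flink.api.java CsvReader, join and groupBy/aggregate primitives.

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.aggregation.Aggregations;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.java.tuple.Tuple5;

    public class TPCHQuery3DataSetSketch {

        public static void main(String[] args) throws Exception {
            final String ordersPath = args[0], lineitemsPath = args[1], outputPath = args[2];

            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // orders: keep orderkey (0), orderstatus (2), orderdate (4), orderpriority (5), shippriority (7)
            DataSet<Tuple5<Long, String, String, String, Integer>> orders = env
                .readCsvFile(ordersPath).fieldDelimiter("|").includeFields("101011010")
                .types(Long.class, String.class, String.class, String.class, Integer.class);

            // lineitem: keep orderkey (0) and extendedprice (5)
            DataSet<Tuple2<Long, Double>> lineitems = env
                .readCsvFile(lineitemsPath).fieldDelimiter("|").includeFields("100001")
                .types(Long.class, Double.class);

            // filter orders (order year after 1993, priority prefix "5" as stand-ins for the
            // YEAR_FILTER / PRIO_FILTER parameters), join on orderkey,
            // keep (orderkey, shippriority, extendedprice)
            DataSet<Tuple3<Long, Integer, Double>> joined = orders
                .filter(new FilterFunction<Tuple5<Long, String, String, String, Integer>>() {
                    @Override
                    public boolean filter(Tuple5<Long, String, String, String, Integer> o) {
                        return o.f2.compareTo("1994-01-01") >= 0 && o.f3.startsWith("5");
                    }
                })
                .join(lineitems).where(0).equalTo(0)
                .projectFirst(0, 4).projectSecond(1);

            // sum the revenue per (orderkey, shippriority)
            DataSet<Tuple3<Long, Integer, Double>> result = joined
                .groupBy(0, 1)
                .aggregate(Aggregations.SUM, 2);

            result.writeAsCsv(outputPath, "\n", "|");
            env.execute("TPCH Q3 (DataSet API sketch)");
        }
    }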

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3Unioned.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3Unioned.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3Unioned.java
deleted file mode 100644
index 157e3cf..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3Unioned.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.relational;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3.AggLiO;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3.FilterO;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3.JoinLiO;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.StringValue;
-
-/**
- * The TPC-H is a decision support benchmark on relational data.
- * Its documentation and the data generator (DBGEN) can be found
- * on http://www.tpc.org/tpch/ . This implementation is tested with
- * the DB2 data format.
- * The PACT program implements a modified version of query 3 of
- * the TPC-H benchmark, including one join, some filtering, and an
- * aggregation.
- * 
- * SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
- *   FROM orders, lineitem
- *   WHERE l_orderkey = o_orderkey
- *     AND o_orderstatus = "X" 
- *     AND YEAR(o_orderdate) > Y
- *     AND o_orderpriority LIKE "Z%"
- * GROUP BY l_orderkey, o_shippriority;
- */
-@SuppressWarnings("deprecation")
-public class TPCHQuery3Unioned implements Program, ProgramDescription {
-
-       private static final long serialVersionUID = 1L;
-
-       @Override
-       public Plan getPlan(final String... args) {
-               // parse program parameters
-               final int numSubtasks       = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-               String orders1Path    = (args.length > 1 ? args[1] : "");
-               String orders2Path    = (args.length > 2 ? args[2] : "");
-               String partJoin1Path    = (args.length > 3 ? args[3] : "");
-               String partJoin2Path    = (args.length > 4 ? args[4] : "");
-               
-               String lineitemsPath = (args.length > 5 ? args[5] : "");
-               String output        = (args.length > 6 ? args[6] : "");
-
-               // create DataSourceContract for Orders input
-               FileDataSource orders1 = new FileDataSource(new CsvInputFormat(), orders1Path, "Orders 1");
-               CsvInputFormat.configureRecordFormat(orders1)
-                       .recordDelimiter('\n')
-                       .fieldDelimiter('|')
-                       .field(LongValue.class, 0)              // order id
-                       .field(IntValue.class, 7)               // ship prio
-                       .field(StringValue.class, 2, 2) // order status
-                       .field(StringValue.class, 4, 10)        // order date
-                       .field(StringValue.class, 5, 8);        // order prio
-               
-               FileDataSource orders2 = new FileDataSource(new CsvInputFormat(), orders2Path, "Orders 2");
-               CsvInputFormat.configureRecordFormat(orders2)
-                       .recordDelimiter('\n')
-                       .fieldDelimiter('|')
-                       .field(LongValue.class, 0)              // order id
-                       .field(IntValue.class, 7)               // ship prio
-                       .field(StringValue.class, 2, 2) // order status
-                       .field(StringValue.class, 4, 10)        // order date
-                       .field(StringValue.class, 5, 8);        // order prio
-               
-               // create DataSourceContract for LineItems input
-               FileDataSource lineitems = new FileDataSource(new CsvInputFormat(), lineitemsPath, "LineItems");
-               CsvInputFormat.configureRecordFormat(lineitems)
-                       .recordDelimiter('\n')
-                       .fieldDelimiter('|')
-                       .field(LongValue.class, 0)
-                       .field(DoubleValue.class, 5);
-
-               // create MapOperator for filtering Orders tuples
-               MapOperator filterO1 = MapOperator.builder(new FilterO())
-                       .name("FilterO")
-                       .input(orders1)
-                       .build();
-               // filter configuration
-               filterO1.setParameter(TPCHQuery3.YEAR_FILTER, 1993);
-               filterO1.setParameter(TPCHQuery3.PRIO_FILTER, "5");
-               filterO1.getCompilerHints().setFilterFactor(0.05f);
-               
-               // create MapOperator for filtering Orders tuples
-               MapOperator filterO2 = MapOperator.builder(new FilterO())
-                       .name("FilterO")
-                       .input(orders2)
-                       .build();
-               // filter configuration
-               filterO2.setParameter(TPCHQuery3.YEAR_FILTER, 1993);
-               filterO2.setParameter(TPCHQuery3.PRIO_FILTER, "5");
-
-               // create JoinOperator for joining Orders and LineItems
-               @SuppressWarnings("unchecked")
-               JoinOperator joinLiO = JoinOperator.builder(new JoinLiO(), LongValue.class, 0, 0)
-                       .input1(filterO2, filterO1)
-                       .input2(lineitems)
-                       .name("JoinLiO")
-                       .build();
-               
-               FileDataSource partJoin1 = new FileDataSource(new CsvInputFormat(), partJoin1Path, "Part Join 1");
-               CsvInputFormat.configureRecordFormat(partJoin1)
-                       .recordDelimiter('\n')
-                       .fieldDelimiter('|')
-                       .field(LongValue.class, 0)
-                       .field(IntValue.class, 1)
-                       .field(DoubleValue.class, 2);
-               
-               FileDataSource partJoin2 = new FileDataSource(new CsvInputFormat(), partJoin2Path, "Part Join 2");
-               CsvInputFormat.configureRecordFormat(partJoin2)
-                       .recordDelimiter('\n')
-                       .fieldDelimiter('|')
-                       .field(LongValue.class, 0)
-                       .field(IntValue.class, 1)
-                       .field(DoubleValue.class, 2);
-               
-               // create ReduceOperator for aggregating the result
-               // the reducer has a composite key, consisting of the fields 0 and 1
-               @SuppressWarnings("unchecked")
-               ReduceOperator aggLiO = ReduceOperator.builder(new AggLiO())
-                       .keyField(LongValue.class, 0)
-                       .keyField(StringValue.class, 1)
-                       .input(joinLiO, partJoin2, partJoin1)
-                       .name("AggLio")
-                       .build();
-
-               // create DataSinkContract for writing the result
-               FileDataSink result = new FileDataSink(new CsvOutputFormat(), output, aggLiO, "Output");
-               CsvOutputFormat.configureRecordFormat(result)
-                       .recordDelimiter('\n')
-                       .fieldDelimiter('|')
-                       .lenient(true)
-                       .field(LongValue.class, 0)
-                       .field(IntValue.class, 1)
-                       .field(DoubleValue.class, 2);
-               
-               // assemble the PACT plan
-               Plan plan = new Plan(result, "TPCH Q3 Unioned");
-               plan.setDefaultParallelism(numSubtasks);
-               return plan;
-       }
-
-       @Override
-       public String getDescription() {
-               return "Parameters: [numSubtasks], [orders1], [orders2], [partJoin1], [partJoin2], [lineitem], [output]";
-       }
-}
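
The "Unioned" variant relies on the Record API's implicit union: handing several operators to
input()/input1() unions their outputs. In the DataSet API the union is written out explicitly.
A minimal sketch, assuming the three inputs have already been read as DataSets of the same
tuple type (all names hypothetical, not part of this commit):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple3;

    public class UnionSketch {
        // Mirrors input(joinLiO, partJoin2, partJoin1): the aggregation consumes the
        // explicit union of the joined records and the two pre-joined partial results.
        public static DataSet<Tuple3<Long, Integer, Double>> aggregationInput(
                DataSet<Tuple3<Long, Integer, Double>> joined,
                DataSet<Tuple3<Long, Integer, Double>> partJoin1,
                DataSet<Tuple3<Long, Integer, Double>> partJoin2) {
            return joined.union(partJoin1).union(partJoin2);
        }
    }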

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery4.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery4.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery4.java
deleted file mode 100644
index ec3c5b4..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery4.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.relational;
-
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.recordJobs.util.IntTupleDataInFormat;
-import org.apache.flink.test.recordJobs.util.StringTupleDataOutFormat;
-import org.apache.flink.test.recordJobs.util.Tuple;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of the TPC-H Query 4 as a Flink program.
- */
-
-@SuppressWarnings({"serial", "deprecation"})
-public class TPCHQuery4 implements Program, ProgramDescription {
-
-       private static Logger LOG = LoggerFactory.getLogger(TPCHQuery4.class);
-       
-       private int parallelism = 1;
-       private String ordersInputPath;
-       private String lineItemInputPath;
-       private String outputPath;
-       
-       
-       /**
-        * Small {@link MapFunction} to filter out the irrelevant orders.
-        *
-        */
-       //@SameKey
-       public static class OFilter extends MapFunction {
-
-               private final String dateParamString = "1995-01-01";
-               private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
-               private final GregorianCalendar gregCal = new GregorianCalendar();
-               
-               private Date paramDate;
-               private Date plusThreeMonths;
-               
-               @Override
-               public void open(Configuration parameters) {
-                       try {
-                               this.paramDate = sdf.parse(this.dateParamString);
-                               this.plusThreeMonths = getPlusThreeMonths(paramDate);
-                               
-                       } catch (ParseException e) {
-                               throw new RuntimeException(e);
-                       }
-               }
-               
-               @Override
-               public void map(Record record, Collector<Record> out) throws Exception {
-                       Tuple tuple = record.getField(1, Tuple.class);
-                       Date orderDate;
-                       
-                       String orderStringDate = tuple.getStringValueAt(4);
-                       
-                       try {
-                               orderDate = sdf.parse(orderStringDate);
-                       } catch (ParseException e) {
-                               throw new RuntimeException(e);
-                       }
-                       
-                       if(paramDate.before(orderDate) && plusThreeMonths.after(orderDate)) {
-                               out.collect(record);
-                       }
-
-               }
-
-               /**
-                * Calculates the {@link Date} which is three months after the given one.
-                * @param paramDate of type {@link Date}.
-                * @return a {@link Date} three month later.
-                */
-               private Date getPlusThreeMonths(Date paramDate) {
-                       
-                       gregCal.setTime(paramDate);
-                       gregCal.add(Calendar.MONTH, 3);
-                       Date plusThreeMonths = gregCal.getTime();
-                       return plusThreeMonths;
-               }
-       }
-       
-       /**
-        * Simple filter for the line item selection. It filters all the tuples that do
-        * not satisfy the &quot;l_commitdate &lt; l_receiptdate&quot; condition.
-        * 
-        */
-       //@SameKey
-       public static class LiFilter extends MapFunction {
-
-               private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
-               
-               @Override
-               public void map(Record record, Collector<Record> out) throws Exception {
-                       Tuple tuple = record.getField(1, Tuple.class);
-                       String commitString = tuple.getStringValueAt(11);
-                       String receiptString = tuple.getStringValueAt(12);
-
-                       Date commitDate;
-                       Date receiptDate;
-                       
-                       try {
-                               commitDate = sdf.parse(commitString);
-                               receiptDate = sdf.parse(receiptString);
-                       } catch (ParseException e) {
-                               throw new RuntimeException(e);
-                       }
-
-                       if (commitDate.before(receiptDate)) {
-                               out.collect(record);
-                       }
-
-               }
-       }
-       
-       /**
-        * Implements the equijoin on the orderkey and performs the projection on
-        * the order priority as well.
-        *
-        */
-       public static class JoinLiO extends JoinFunction {
-               
-               @Override
-               public void join(Record order, Record line, Collector<Record> out)
-                               throws Exception {
-                       Tuple orderTuple = order.getField(1, Tuple.class);
-                       
-                       orderTuple.project(32);
-                       String newOrderKey = orderTuple.getStringValueAt(0);
-                       
-                       order.setField(0, new StringValue(newOrderKey));
-                       out.collect(order);
-               }
-       }
-       
-       /**
-        * Implements the count(*) part. 
-        *
-        */
-       //@SameKey
-       public static class CountAgg extends ReduceFunction {
-               
-               @Override
-               public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-                       long count = 0;
-                       Record rec = null;
-                       
-                       while(records.hasNext()) {
-                               rec = records.next();
-                               count++;
-                       }
-                       
-                       if(rec != null)
-                       {
-                               Tuple tuple = new Tuple();
-                               tuple.addAttribute("" + count);
-                               rec.setField(1, tuple);
-                       }
-                       
-                       out.collect(rec);
-               }
-       }
-       
-
-       @Override
-       public Plan getPlan(String... args) throws IllegalArgumentException {
-               
-               if(args == null || args.length != 4)
-               {
-                       LOG.warn("number of arguments does not match!");
-                       this.ordersInputPath = "";
-                       this.lineItemInputPath = "";
-                       this.outputPath = "";
-               }else
-               {
-                       setArgs(args);
-               }
-               
-               FileDataSource orders = 
-                       new FileDataSource(new IntTupleDataInFormat(), this.ordersInputPath, "Orders");
-               orders.setParallelism(this.parallelism);
-               //orders.setOutputContract(UniqueKey.class);
-               
-               FileDataSource lineItems =
-                       new FileDataSource(new IntTupleDataInFormat(), this.lineItemInputPath, "LineItems");
-               lineItems.setParallelism(this.parallelism);
-               
-               FileDataSink result = 
-                               new FileDataSink(new StringTupleDataOutFormat(), this.outputPath, "Output");
-               result.setParallelism(parallelism);
-               
-               MapOperator lineFilter = 
-                               MapOperator.builder(LiFilter.class)
-                       .name("LineItemFilter")
-                       .build();
-               lineFilter.setParallelism(parallelism);
-               
-               MapOperator ordersFilter = 
-                               MapOperator.builder(OFilter.class)
-                       .name("OrdersFilter")
-                       .build();
-               ordersFilter.setParallelism(parallelism);
-               
-               JoinOperator join = 
-                               JoinOperator.builder(JoinLiO.class, IntValue.class, 0, 0)
-                       .name("OrdersLineitemsJoin")
-                       .build();
-                       join.setParallelism(parallelism);
-               
-               ReduceOperator aggregation = 
-                               ReduceOperator.builder(CountAgg.class, StringValue.class, 0)
-                       .name("AggregateGroupBy")
-                       .build();
-               aggregation.setParallelism(this.parallelism);
-               
-               lineFilter.setInput(lineItems);
-               ordersFilter.setInput(orders);
-               join.setFirstInput(ordersFilter);
-               join.setSecondInput(lineFilter);
-               aggregation.setInput(join);
-               result.setInput(aggregation);
-               
-                       
-               return new Plan(result, "TPC-H 4");
-       }
-
-       /**
-        * Get the args into the members.
-        * @param args
-        */
-       private void setArgs(String[] args) {
-               this.parallelism = Integer.parseInt(args[0]);
-               this.ordersInputPath = args[1];
-               this.lineItemInputPath = args[2];
-               this.outputPath = args[3];
-       }
-
-
-       @Override
-       public String getDescription() {
-               return "Parameters: [parallelism] [orders-input] [lineitem-input] [output]";
-       }
-
-}
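
OFilter above parses its date bounds once in open() rather than once per record. The same
pattern carries over to a rich function in the DataSet API; the sketch below is illustrative
only and assumes a hypothetical (orderkey, orderdate) tuple layout.

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.GregorianCalendar;

    import org.apache.flink.api.common.functions.RichFilterFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;

    // Keeps orders whose date falls in [1995-01-01, 1995-01-01 + 3 months), as OFilter did.
    public class OrderDateFilter extends RichFilterFunction<Tuple2<Long, String>> {

        private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        private Date from;
        private Date until;

        @Override
        public void open(Configuration parameters) throws ParseException {
            // parse the bounds once per parallel task instead of once per record
            from = sdf.parse("1995-01-01");
            GregorianCalendar cal = new GregorianCalendar();
            cal.setTime(from);
            cal.add(Calendar.MONTH, 3);
            until = cal.getTime();
        }

        @Override
        public boolean filter(Tuple2<Long, String> order) throws ParseException {
            Date orderDate = sdf.parse(order.f1);
            return from.before(orderDate) && until.after(orderDate);
        }
    }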

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery9.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery9.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery9.java
deleted file mode 100644
index c00d231..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery9.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.relational;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.recordJobs.relational.query9Util.AmountAggregate;
-import org.apache.flink.test.recordJobs.relational.query9Util.FilteredPartsJoin;
-import org.apache.flink.test.recordJobs.relational.query9Util.IntPair;
-import org.apache.flink.test.recordJobs.relational.query9Util.LineItemMap;
-import org.apache.flink.test.recordJobs.relational.query9Util.OrderMap;
-import org.apache.flink.test.recordJobs.relational.query9Util.OrderedPartsJoin;
-import org.apache.flink.test.recordJobs.relational.query9Util.PartFilter;
-import org.apache.flink.test.recordJobs.relational.query9Util.PartJoin;
-import org.apache.flink.test.recordJobs.relational.query9Util.PartListJoin;
-import org.apache.flink.test.recordJobs.relational.query9Util.PartsuppMap;
-import org.apache.flink.test.recordJobs.relational.query9Util.StringIntPair;
-import org.apache.flink.test.recordJobs.relational.query9Util.StringIntPairStringDataOutFormat;
-import org.apache.flink.test.recordJobs.relational.query9Util.SupplierMap;
-import org.apache.flink.test.recordJobs.relational.query9Util.SuppliersJoin;
-import org.apache.flink.test.recordJobs.util.IntTupleDataInFormat;
-import org.apache.flink.types.IntValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Quote from the TPC-H homepage:
- * "The TPC-H is a decision support benchmark on relational data.
- * Its documentation and the data generator (DBGEN) can be found
- * on http://www.tpc.org/tpch/ . This implementation is tested with
- * the DB2 data format."
- * This PACT program implements the query 9 of the TPC-H benchmark:
- * 
- * <pre>
- * select nation, o_year, sum(amount) as sum_profit
- * from (
- *   select n_name as nation, extract(year from o_orderdate) as o_year,
- *          l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount
- *   from part, supplier, lineitem, partsupp, orders, nation
- *   where
- *     s_suppkey = l_suppkey and ps_suppkey = l_suppkey and ps_partkey = l_partkey
- *     and p_partkey = l_partkey and o_orderkey = l_orderkey and s_nationkey = n_nationkey
- *     and p_name like '%[COLOR]%'
- * ) as profit
- * group by nation, o_year
- * order by nation, o_year desc;
- * </pre>
- * 
- * Plan:<br>
- * Match "part" and "partsupp" on "partkey" -> "parts" with (partkey, suppkey) as key
- * Match "orders" and "lineitem" on "orderkey" -> "ordered_parts" with (partkey, suppkey) as key
- * Match "parts" and "ordered_parts" on (partkey, suppkey) -> "filtered_parts" with "suppkey" as key
- * Match "supplier" and "nation" on "nationkey" -> "suppliers" with "suppkey" as key
- * Match "filtered_parts" and "suppliers" on "suppkey" -> "partlist" with (nation, o_year) as key
- * Group "partlist" by (nation, o_year), calculate sum(amount)
- * 
- * <b>Attention:</b> The "order by" part is not implemented!
- * 
- */
-@SuppressWarnings({"serial", "deprecation"})
-public class TPCHQuery9 implements Program, ProgramDescription {
-       public final String ARGUMENTS = "parallelism partInputPath partSuppInputPath ordersInputPath lineItemInputPath supplierInputPath nationInputPath outputPath";
-
-       private static Logger LOG = LoggerFactory.getLogger(TPCHQuery9.class);
-
-       private int parallelism = 1;
-
-       private String partInputPath, partSuppInputPath, ordersInputPath, lineItemInputPath, supplierInputPath,
-                       nationInputPath;
-
-       private String outputPath;
-
-
-       @Override
-       public Plan getPlan(String... args) throws IllegalArgumentException {
-
-               if (args.length != 8)
-               {
-                       LOG.warn("number of arguments does not match!");
-                       
-                       this.parallelism = 1;
-                       this.partInputPath = "";
-                       this.partSuppInputPath = "";
-                       this.ordersInputPath = "";
-                       this.lineItemInputPath = "";
-                       this.supplierInputPath = "";
-                       this.nationInputPath = "";
-                       this.outputPath = "";
-               }else
-               {
-                       this.parallelism = Integer.parseInt(args[0]);
-                       this.partInputPath = args[1];
-                       this.partSuppInputPath = args[2];
-                       this.ordersInputPath = args[3];
-                       this.lineItemInputPath = args[4];
-                       this.supplierInputPath = args[5];
-                       this.nationInputPath = args[6];
-                       this.outputPath = args[7];
-               }
-               
-               /* Create the 6 data sources: */
-               /* part: (partkey | name, mfgr, brand, type, size, container, retailprice, comment) */
-               FileDataSource partInput = new FileDataSource(
-                       new IntTupleDataInFormat(), this.partInputPath, "\"part\" source");
-               //partInput.setOutputContract(UniqueKey.class);
-//             partInput.getCompilerHints().setAvgNumValuesPerKey(1);
-
-               /* partsupp: (partkey | suppkey, availqty, supplycost, comment) */
-               FileDataSource partSuppInput = new FileDataSource(
-                       new IntTupleDataInFormat(), this.partSuppInputPath, "\"partsupp\" source");
-
-               /* orders: (orderkey | custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment) */
-               FileDataSource ordersInput = new FileDataSource(
-                       new IntTupleDataInFormat(), this.ordersInputPath, "\"orders\" source");
-               //ordersInput.setOutputContract(UniqueKey.class);
-//             ordersInput.getCompilerHints().setAvgNumValuesPerKey(1);
-
-               /* lineitem: (orderkey | partkey, suppkey, linenumber, quantity, extendedprice, discount, tax, ...) */
-               FileDataSource lineItemInput = new FileDataSource(
-                       new IntTupleDataInFormat(), this.lineItemInputPath, "\"lineitem\" source");
-
-               /* supplier: (suppkey | name, address, nationkey, phone, acctbal, comment) */
-               FileDataSource supplierInput = new FileDataSource(
-                       new IntTupleDataInFormat(), this.supplierInputPath, "\"supplier\" source");
-               //supplierInput.setOutputContract(UniqueKey.class);
-//             supplierInput.getCompilerHints().setAvgNumValuesPerKey(1);
-
-               /* nation: (nationkey | name, regionkey, comment) */
-               FileDataSource nationInput = new FileDataSource(
-                       new IntTupleDataInFormat(), this.nationInputPath, "\"nation\" source");
-               //nationInput.setOutputContract(UniqueKey.class);
-//             nationInput.getCompilerHints().setAvgNumValuesPerKey(1);
-
-               /* Filter on part's name, project values to NULL: */
-               MapOperator filterPart = MapOperator.builder(PartFilter.class)
-                       .name("filterParts")
-                       .build();
-
-               /* Map to change the key element of partsupp, project value to (supplycost, suppkey): */
-               MapOperator mapPartsupp = MapOperator.builder(PartsuppMap.class)
-                       .name("mapPartsupp")
-                       .build();
-
-               /* Map to extract the year from order: */
-               MapOperator mapOrder = MapOperator.builder(OrderMap.class)
-                       .name("mapOrder")
-                       .build();
-
-               /* Project value to (partkey, suppkey, quantity, price = extendedprice*(1-discount)): */
-               MapOperator mapLineItem = MapOperator.builder(LineItemMap.class)
-                       .name("proj.Partsupp")
-                       .build();
-
-               /* - change the key of supplier to nationkey, project value to suppkey */
-               MapOperator mapSupplier = MapOperator.builder(SupplierMap.class)
-                       .name("proj.Partsupp")
-                       .build();
-
-               /* Equijoin on partkey of part and partsupp: */
-               JoinOperator partsJoin = JoinOperator.builder(PartJoin.class, IntValue.class, 0, 0)
-                       .name("partsJoin")
-                       .build();
-
-               /* Equijoin on orderkey of orders and lineitem: */
-               JoinOperator orderedPartsJoin =
-                       JoinOperator.builder(OrderedPartsJoin.class, IntValue.class, 0, 0)
-                       .name("orderedPartsJoin")
-                       .build();
-
-               /* Equijoin on nationkey of supplier and nation: */
-               JoinOperator suppliersJoin =
-                       JoinOperator.builder(SuppliersJoin.class, IntValue.class, 0, 0)
-                       .name("suppliersJoin")
-                       .build();
-
-               /* Equijoin on (partkey,suppkey) of parts and orderedParts: */
-               JoinOperator filteredPartsJoin =
-                       JoinOperator.builder(FilteredPartsJoin.class, IntPair.class, 0, 0)
-                       .name("filteredPartsJoin")
-                       .build();
-
-               /* Equijoin on suppkey of filteredParts and suppliers: */
-               JoinOperator partListJoin =
-                       JoinOperator.builder(PartListJoin.class, IntValue.class, 0, 0)
-                       .name("partlistJoin")
-                       .build();
-
-               /* Aggregate sum(amount) by (nation,year): */
-               ReduceOperator sumAmountAggregate =
-                       ReduceOperator.builder(AmountAggregate.class, StringIntPair.class, 0)
-                       .name("groupBy")
-                       .build();
-
-               /* Connect input filters: */
-               filterPart.setInput(partInput);
-               mapPartsupp.setInput(partSuppInput);
-               mapOrder.setInput(ordersInput);
-               mapLineItem.setInput(lineItemInput);
-               mapSupplier.setInput(supplierInput);
-
-               /* Connect equijoins: */
-               partsJoin.setFirstInput(filterPart);
-               partsJoin.setSecondInput(mapPartsupp);
-               orderedPartsJoin.setFirstInput(mapOrder);
-               orderedPartsJoin.setSecondInput(mapLineItem);
-               suppliersJoin.setFirstInput(mapSupplier);
-               suppliersJoin.setSecondInput(nationInput);
-               filteredPartsJoin.setFirstInput(partsJoin);
-               filteredPartsJoin.setSecondInput(orderedPartsJoin);
-               partListJoin.setFirstInput(filteredPartsJoin);
-               partListJoin.setSecondInput(suppliersJoin);
-
-               /* Connect aggregate: */
-               sumAmountAggregate.setInput(partListJoin);
-
-               /* Connect sink: */
-               FileDataSink result = new FileDataSink(new StringIntPairStringDataOutFormat(), this.outputPath, "Results sink");
-               result.setInput(sumAmountAggregate);
-
-               Plan p = new Plan(result, "TPC-H query 9");
-               p.setDefaultParallelism(this.parallelism);
-               return p;
-       }
-
-       @Override
-       public String getDescription() {
-               return "TPC-H query 9, parameters: " + this.ARGUMENTS;
-       }
-}
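
Q9's plan is a cascade of equijoins, two of them keyed on the composite (partkey, suppkey)
pair that the Record API modelled with the IntPair key type. In the DataSet API a composite
join key is expressed directly with where(0, 1).equalTo(0, 1). A minimal sketch under assumed
tuple layouts (all names hypothetical, not part of this commit):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.java.tuple.Tuple4;

    public class CompositeKeyJoinSketch {
        // parts:        (partkey, suppkey)
        // orderedParts: (partkey, suppkey, year, amount)
        // Joins on the composite (partkey, suppkey) key and keeps (suppkey, year, amount),
        // corresponding to the "filtered_parts" step of the plan above.
        public static DataSet<Tuple3<Integer, Integer, Double>> filteredParts(
                DataSet<Tuple2<Integer, Integer>> parts,
                DataSet<Tuple4<Integer, Integer, Integer, Double>> orderedParts) {
            return parts
                .join(orderedParts)
                .where(0, 1).equalTo(0, 1)
                .projectSecond(1, 2, 3);
        }
    }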

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQueryAsterix.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQueryAsterix.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQueryAsterix.java
deleted file mode 100644
index a681f64..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQueryAsterix.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.relational;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsSecondExcept;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-/**
- * The TPC-H is a decision support benchmark on relational data.
- * Its documentation and the data generator (DBGEN) can be found
- * on http://www.tpc.org/tpch/ . This implementation is tested with
- * the DB2 data format.
- *
- * This program implements a query on the TPC-H schema
- * including one join and an aggregation.
- * This query is used as an example in the Asterix project (http://asterix.ics.uci.edu/).
- * 
- * SELECT c_mktsegment, COUNT(o_orderkey)
- *   FROM orders, customer
- *   WHERE c_custkey = o_custkey
- * GROUP BY c_mktsegment;
- * 
- */
-@SuppressWarnings("deprecation")
-public class TPCHQueryAsterix implements Program, ProgramDescription {
-
-       private static final long serialVersionUID = 1L;
-
-
-       /**
-        * Realizes the join between Customers and Order table.
-        */
-       @ConstantFieldsSecondExcept(0)
-       public static class JoinCO extends JoinFunction implements Serializable {
-               private static final long serialVersionUID = 1L;
-
-               private final IntValue one = new IntValue(1);
-               
-               /**
-                * Output Schema:
-                *  0: PARTIAL_COUNT=1
-                *  1: C_MKTSEGMENT
-                */
-               @Override
-               public void join(Record order, Record cust, Collector<Record> out)
-                               throws Exception {
-                       cust.setField(0, one);
-                       out.collect(cust);
-               }
-       }
-
-       /**
-        * Reduce implements the aggregation of the results. The 
-        * Combinable annotation is set as the partial counts can be calculated
-        * already in the combiner.
-        *
-        */
-       @Combinable
-       @ConstantFields(1)
-       public static class AggCO extends ReduceFunction implements Serializable {
-               private static final long serialVersionUID = 1L;
-
-               private final IntValue integer = new IntValue();
-               private Record record = new Record();
-       
-               /**
-                * Output Schema:
-                * 0: COUNT
-                * 1: C_MKTSEGMENT
-                *
-                */
-               @Override
-               public void reduce(Iterator<Record> records, Collector<Record> out)
-                               throws Exception {
-
-                       int count = 0;
-
-                       while (records.hasNext()) {
-                               record = records.next();
-                               count+=record.getField(0, integer).getValue();
-                       }
-
-                       integer.setValue(count);
-                       record.setField(0, integer);
-                       out.collect(record);
-               }
-               
-               /**
-                * Computes partial counts
-                */
-               public void combine(Iterator<Record> records, Collector<Record> out)
-                               throws Exception {
-                       reduce(records, out);
-               }
-
-       }
-
-
-       @Override
-       public Plan getPlan(final String... args) {
-
-               // parse program parameters
-               int numSubtasks       = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-               String ordersPath    = (args.length > 1 ? args[1] : "");
-               String customerPath  = (args.length > 2 ? args[2] : "");
-               String output        = (args.length > 3 ? args[3] : "");
-
-               /*
-                * Output Schema:
-                * 0: CUSTOMER_ID
-                */
-               // create DataSourceContract for Orders input
-               FileDataSource orders = new FileDataSource(new CsvInputFormat(), ordersPath, "Orders");
-               orders.setParallelism(numSubtasks);
-               CsvInputFormat.configureRecordFormat(orders)
-                       .recordDelimiter('\n')
-                       .fieldDelimiter('|')
-                       .field(IntValue.class, 1);
-               
-               /*
-                * Output Schema:
-                * 0: CUSTOMER_ID
-                * 1: MKT_SEGMENT
-                */
-               // create DataSourceContract for Customer input
-               FileDataSource customers = new FileDataSource(new CsvInputFormat(), customerPath, "Customers");
-               customers.setParallelism(numSubtasks);
-               CsvInputFormat.configureRecordFormat(customers)
-                       .recordDelimiter('\n')
-                       .fieldDelimiter('|')
-                       .field(IntValue.class, 0)
-                       .field(StringValue.class, 6);
-               
-               // create JoinOperator for joining Orders and LineItems
-               JoinOperator joinCO = JoinOperator.builder(new JoinCO(), IntValue.class, 0, 0)
-                       .name("JoinCO")
-                       .build();
-               joinCO.setParallelism(numSubtasks);
-
-               // create ReduceOperator for aggregating the result
-               ReduceOperator aggCO = ReduceOperator.builder(new AggCO(), StringValue.class, 1)
-                       .name("AggCo")
-                       .build();
-               aggCO.setParallelism(numSubtasks);
-
-               // create DataSinkContract for writing the result
-               FileDataSink result = new FileDataSink(new CsvOutputFormat(), output, "Output");
-               result.setParallelism(numSubtasks);
-               CsvOutputFormat.configureRecordFormat(result)
-                       .recordDelimiter('\n')
-                       .fieldDelimiter('|')
-                       .field(IntValue.class, 0)
-                       .field(StringValue.class, 1);
-
-               // assemble the plan
-               result.setInput(aggCO);
-               aggCO.setInput(joinCO);
-               joinCO.setFirstInput(orders);
-               joinCO.setSecondInput(customers);
-
-               return new Plan(result, "TPCH Asterix");
-       }
-
-
-       @Override
-       public String getDescription() {
-               return "Parameters: [numSubtasks], [orders], [customer], [output]";
-       }
-}
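
The Asterix-style query is a join followed by a count per market segment; the @Combinable
reducer above pre-aggregates partial counts in the combiner. In the DataSet API the same
effect falls out of groupBy(...).sum(...), which is combinable by default. An illustrative
sketch under assumed tuple layouts (all names hypothetical, not part of this commit):

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple1;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class SegmentCountSketch {
        // orders:    (custkey)           as Tuple1<Integer>
        // customers: (custkey, segment)  as Tuple2<Integer, String>
        public static DataSet<Tuple2<String, Long>> ordersPerSegment(
                DataSet<Tuple1<Integer>> orders,
                DataSet<Tuple2<Integer, String>> customers) {
            return orders
                .join(customers).where(0).equalTo(0)
                .with(new SegmentWithOne())   // emit (segment, 1) per joined pair
                .groupBy(0)
                .sum(1);                      // partial sums are computed in the combiner
        }

        // JoinFunction emitting one (segment, 1L) record per matching order/customer pair
        public static class SegmentWithOne
                implements JoinFunction<Tuple1<Integer>, Tuple2<Integer, String>, Tuple2<String, Long>> {
            @Override
            public Tuple2<String, Long> join(Tuple1<Integer> order, Tuple2<Integer, String> cust) {
                return new Tuple2<String, Long>(cust.f1, 1L);
            }
        }
    }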
