http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointInFormat.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointInFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointInFormat.java deleted file mode 100644 index c5dd8ec..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointInFormat.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.recordJobs.kmeans.udfs; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.java.record.io.DelimitedInputFormat; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; - -/** - * Generates records with an id and a and CoordVector. - * The input format is line-based, i.e. one record is read from one line - * which is terminated by '\n'. Within a line the first '|' character separates - * the id from the CoordVector. The vector consists of a vector of decimals. 
- * The decimals are separated by '|' as well. The id is the id of a data point or - * cluster center and the CoordVector the corresponding position (coordinate - * vector) of the data point or cluster center. Example line: - * "42|23.23|52.57|74.43| Id: 42 Coordinate vector: (23.23, 52.57, 74.43) - */ -public class PointInFormat extends DelimitedInputFormat { - private static final long serialVersionUID = 1L; - - private final IntValue idInteger = new IntValue(); - private final CoordVector point = new CoordVector(); - - private final List<Double> dimensionValues = new ArrayList<Double>(); - private double[] pointValues = new double[0]; - - @Override - public Record readRecord(Record record, byte[] line, int offset, int numBytes) { - - final int limit = offset + numBytes; - - int id = -1; - int value = 0; - int fractionValue = 0; - int fractionChars = 0; - boolean negative = false; - - this.dimensionValues.clear(); - - for (int pos = offset; pos < limit; pos++) { - if (line[pos] == '|') { - // check if id was already set - if (id == -1) { - id = value; - } - else { - double v = value + ((double) fractionValue) * Math.pow(10, (-1 * (fractionChars - 1))); - this.dimensionValues.add(negative ? 
-v : v); - } - // reset value - value = 0; - fractionValue = 0; - fractionChars = 0; - negative = false; - } else if (line[pos] == '.') { - fractionChars = 1; - } else if (line[pos] == '-') { - negative = true; - } else { - if (fractionChars == 0) { - value *= 10; - value += line[pos] - '0'; - } else { - fractionValue *= 10; - fractionValue += line[pos] - '0'; - fractionChars++; - } - } - } - - // set the ID - this.idInteger.setValue(id); - record.setField(0, this.idInteger); - - // set the data points - if (this.pointValues.length != this.dimensionValues.size()) { - this.pointValues = new double[this.dimensionValues.size()]; - } - for (int i = 0; i < this.pointValues.length; i++) { - this.pointValues[i] = this.dimensionValues.get(i); - } - - this.point.setCoordinates(this.pointValues); - record.setField(1, this.point); - return record; - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointOutFormat.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointOutFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointOutFormat.java deleted file mode 100644 index 410397e..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/PointOutFormat.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.recordJobs.kmeans.udfs; - -import java.text.DecimalFormat; -import java.text.DecimalFormatSymbols; - -import org.apache.flink.api.java.record.io.DelimitedOutputFormat; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; - -/** - * Writes records that contain an id and a CoordVector. - * The output format is line-based, i.e. one record is written to - * a line and terminated with '\n'. Within a line the first '|' character - * separates the id from the CoordVector. The vector consists of a vector of - * decimals. 
The decimals are separated by '|'. The is is the id of a data - * point or cluster center and the vector the corresponding position - * (coordinate vector) of the data point or cluster center. Example line: - * "42|23.23|52.57|74.43| Id: 42 Coordinate vector: (23.23, 52.57, 74.43) - */ -public class PointOutFormat extends DelimitedOutputFormat { - private static final long serialVersionUID = 1L; - - private final DecimalFormat df = new DecimalFormat("####0.00"); - private final StringBuilder line = new StringBuilder(); - - - public PointOutFormat() { - DecimalFormatSymbols dfSymbols = new DecimalFormatSymbols(); - dfSymbols.setDecimalSeparator('.'); - this.df.setDecimalFormatSymbols(dfSymbols); - } - - @Override - public int serializeRecord(Record record, byte[] target) { - - line.setLength(0); - - IntValue centerId = record.getField(0, IntValue.class); - CoordVector centerPos = record.getField(1, CoordVector.class); - - - line.append(centerId.getValue()); - - for (double coord : centerPos.getCoordinates()) { - line.append('|'); - line.append(df.format(coord)); - } - line.append('|'); - - byte[] byteString = line.toString().getBytes(); - - if (byteString.length <= target.length) { - System.arraycopy(byteString, 0, target, 0, byteString.length); - return byteString.length; - } - else { - return -byteString.length; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/RecomputeClusterCenter.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/RecomputeClusterCenter.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/RecomputeClusterCenter.java deleted file mode 100644 index 89e222b..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/RecomputeClusterCenter.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.recordJobs.kmeans.udfs; - -import java.io.Serializable; -import java.util.Iterator; - -import org.apache.flink.api.java.record.functions.ReduceFunction; -import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields; -import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; - -/** - * Reduce PACT computes the new position (coordinate vector) of a cluster - * center. This is an average computation. Hence, Combinable is annotated - * and the combine method implemented. - * - * Output Format: - * 0: clusterID - * 1: clusterVector - */ -@SuppressWarnings("deprecation") -@Combinable -@ConstantFields(0) -public class RecomputeClusterCenter extends ReduceFunction implements Serializable { - private static final long serialVersionUID = 1L; - - private final IntValue count = new IntValue(); - - /** - * Compute the new position (coordinate vector) of a cluster center. 
- */ - @Override - public void reduce(Iterator<Record> dataPoints, Collector<Record> out) { - Record next = null; - - // initialize coordinate vector sum and count - CoordVector coordinates = new CoordVector(); - double[] coordinateSum = null; - int count = 0; - - // compute coordinate vector sum and count - while (dataPoints.hasNext()) { - next = dataPoints.next(); - - // get the coordinates and the count from the record - double[] thisCoords = next.getField(1, CoordVector.class).getCoordinates(); - int thisCount = next.getField(2, IntValue.class).getValue(); - - if (coordinateSum == null) { - if (coordinates.getCoordinates() != null) { - coordinateSum = coordinates.getCoordinates(); - } - else { - coordinateSum = new double[thisCoords.length]; - } - } - - addToCoordVector(coordinateSum, thisCoords); - count += thisCount; - } - - // compute new coordinate vector (position) of cluster center - for (int i = 0; i < coordinateSum.length; i++) { - coordinateSum[i] /= count; - } - - coordinates.setCoordinates(coordinateSum); - next.setField(1, coordinates); - next.setNull(2); - - // emit new position of cluster center - out.collect(next); - } - - /** - * Computes a pre-aggregated average value of a coordinate vector. 
- */ - @Override - public void combine(Iterator<Record> dataPoints, Collector<Record> out) { - - Record next = null; - - // initialize coordinate vector sum and count - CoordVector coordinates = new CoordVector(); - double[] coordinateSum = null; - int count = 0; - - // compute coordinate vector sum and count - while (dataPoints.hasNext()) { - next = dataPoints.next(); - - // get the coordinates and the count from the record - double[] thisCoords = next.getField(1, CoordVector.class).getCoordinates(); - int thisCount = next.getField(2, IntValue.class).getValue(); - - if (coordinateSum == null) { - if (coordinates.getCoordinates() != null) { - coordinateSum = coordinates.getCoordinates(); - } - else { - coordinateSum = new double[thisCoords.length]; - } - } - - addToCoordVector(coordinateSum, thisCoords); - count += thisCount; - } - - coordinates.setCoordinates(coordinateSum); - this.count.setValue(count); - next.setField(1, coordinates); - next.setField(2, this.count); - - // emit partial sum and partial count for average computation - out.collect(next); - } - - /** - * Adds two coordinate vectors by summing up each of their coordinates. - * - * @param cvToAddTo - * The coordinate vector to which the other vector is added. - * This vector is returned. - * @param cvToBeAdded - * The coordinate vector which is added to the other vector. - * This vector is not modified. 
- */ - private void addToCoordVector(double[] cvToAddTo, double[] cvToBeAdded) { - // check if both vectors have same length - if (cvToAddTo.length != cvToBeAdded.length) { - throw new IllegalArgumentException("The given coordinate vectors are not of equal length."); - } - - // sum coordinate vectors coordinate-wise - for (int i = 0; i < cvToAddTo.length; i++) { - cvToAddTo[i] += cvToBeAdded[i]; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/MergeOnlyJoin.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/MergeOnlyJoin.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/MergeOnlyJoin.java deleted file mode 100644 index b948804..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/MergeOnlyJoin.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.test.recordJobs.relational; - -import java.util.Iterator; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.Program; -import org.apache.flink.api.java.record.functions.JoinFunction; -import org.apache.flink.api.java.record.functions.ReduceFunction; -import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept; -import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirstExcept; -import org.apache.flink.api.java.record.io.CsvInputFormat; -import org.apache.flink.api.java.record.io.CsvOutputFormat; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.JoinOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; - -@SuppressWarnings("deprecation") -public class MergeOnlyJoin implements Program { - - private static final long serialVersionUID = 1L; - - @ConstantFieldsFirstExcept(2) - public static class JoinInputs extends JoinFunction { - private static final long serialVersionUID = 1L; - - @Override - public void join(Record input1, Record input2, Collector<Record> out) { - input1.setField(2, input2.getField(1, IntValue.class)); - out.collect(input1); - } - } - - @ConstantFieldsExcept({}) - public static class DummyReduce extends ReduceFunction { - private static final long serialVersionUID = 1L; - - @Override - public void reduce(Iterator<Record> values, Collector<Record> out) { - while (values.hasNext()) { - out.collect(values.next()); - } - } - } - - - @Override - public Plan getPlan(final String... args) { - // parse program parameters - int numSubtasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1); - String input1Path = (args.length > 1 ? 
args[1] : ""); - String input2Path = (args.length > 2 ? args[2] : ""); - String output = (args.length > 3 ? args[3] : ""); - int numSubtasksInput2 = (args.length > 4 ? Integer.parseInt(args[4]) : 1); - - // create DataSourceContract for Orders input - @SuppressWarnings("unchecked") - CsvInputFormat format1 = new CsvInputFormat('|', IntValue.class, IntValue.class); - FileDataSource input1 = new FileDataSource(format1, input1Path, "Input 1"); - - ReduceOperator aggInput1 = ReduceOperator.builder(DummyReduce.class, IntValue.class, 0) - .input(input1) - .name("AggOrders") - .build(); - - - // create DataSourceContract for Orders input - @SuppressWarnings("unchecked") - CsvInputFormat format2 = new CsvInputFormat('|', IntValue.class, IntValue.class); - FileDataSource input2 = new FileDataSource(format2, input2Path, "Input 2"); - input2.setParallelism(numSubtasksInput2); - - ReduceOperator aggInput2 = ReduceOperator.builder(DummyReduce.class, IntValue.class, 0) - .input(input2) - .name("AggLines") - .build(); - aggInput2.setParallelism(numSubtasksInput2); - - // create JoinOperator for joining Orders and LineItems - JoinOperator joinLiO = JoinOperator.builder(JoinInputs.class, IntValue.class, 0, 0) - .input1(aggInput1) - .input2(aggInput2) - .name("JoinLiO") - .build(); - - // create DataSinkContract for writing the result - FileDataSink result = new FileDataSink(new CsvOutputFormat(), output, joinLiO, "Output"); - CsvOutputFormat.configureRecordFormat(result) - .recordDelimiter('\n') - .fieldDelimiter('|') - .lenient(true) - .field(IntValue.class, 0) - .field(IntValue.class, 1) - .field(IntValue.class, 2); - - // assemble the PACT plan - Plan plan = new Plan(result, "Merge Only Join"); - plan.setDefaultParallelism(numSubtasks); - return plan; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery1.java ---------------------------------------------------------------------- diff 
--git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery1.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery1.java deleted file mode 100644 index d805b92..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery1.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.test.recordJobs.relational; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.Program; -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.test.recordJobs.relational.query1Util.GroupByReturnFlag; -import org.apache.flink.test.recordJobs.relational.query1Util.LineItemFilter; -import org.apache.flink.test.recordJobs.util.IntTupleDataInFormat; -import org.apache.flink.test.recordJobs.util.StringTupleDataOutFormat; -import org.apache.flink.types.StringValue; - -@SuppressWarnings("deprecation") -public class TPCHQuery1 implements Program, ProgramDescription { - - private static final long serialVersionUID = 1L; - - private int parallelism = 1; - private String lineItemInputPath; - private String outputPath; - - @Override - public Plan getPlan(String... 
args) throws IllegalArgumentException { - - - if (args.length != 3) { - this.parallelism = 1; - this.lineItemInputPath = ""; - this.outputPath = ""; - } else { - this.parallelism = Integer.parseInt(args[0]); - this.lineItemInputPath = args[1]; - this.outputPath = args[2]; - } - - FileDataSource lineItems = - new FileDataSource(new IntTupleDataInFormat(), this.lineItemInputPath, "LineItems"); - lineItems.setParallelism(this.parallelism); - - FileDataSink result = - new FileDataSink(new StringTupleDataOutFormat(), this.outputPath, "Output"); - result.setParallelism(this.parallelism); - - MapOperator lineItemFilter = - MapOperator.builder(new LineItemFilter()) - .name("LineItem Filter") - .build(); - lineItemFilter.setParallelism(this.parallelism); - - ReduceOperator groupByReturnFlag = - ReduceOperator.builder(new GroupByReturnFlag(), StringValue.class, 0) - .name("groupyBy") - .build(); - - lineItemFilter.setInput(lineItems); - groupByReturnFlag.setInput(lineItemFilter); - result.setInput(groupByReturnFlag); - - return new Plan(result, "TPC-H 1"); - } - - @Override - public String getDescription() { - return "Parameters: [parallelism] [lineitem-input] [output]"; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery10.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery10.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery10.java deleted file mode 100644 index 4bb0cdf..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery10.java +++ /dev/null @@ -1,365 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.recordJobs.relational; - -import java.io.IOException; -import java.text.DecimalFormat; -import java.text.DecimalFormatSymbols; -import java.util.Iterator; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.Program; -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.java.record.functions.JoinFunction; -import org.apache.flink.api.java.record.functions.MapFunction; -import org.apache.flink.api.java.record.functions.ReduceFunction; -import org.apache.flink.api.java.record.io.FileOutputFormat; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.JoinOperator; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.test.recordJobs.util.IntTupleDataInFormat; -import org.apache.flink.test.recordJobs.util.Tuple; -import org.apache.flink.types.DoubleValue; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; -import org.apache.flink.util.Collector; - -@SuppressWarnings({"serial", "deprecation"}) -public class TPCHQuery10 implements Program, ProgramDescription { - - // 
-------------------------------------------------------------------------------------------- - // Local Filters and Projections - // -------------------------------------------------------------------------------------------- - - /** - * Forwards (0 = orderkey, 1 = custkey). - */ - public static class FilterO extends MapFunction - { - private static final int YEAR_FILTER = 1990; - - private final IntValue custKey = new IntValue(); - - @Override - public void map(Record record, Collector<Record> out) throws Exception { - - Tuple t = record.getField(1, Tuple.class); - if (Integer.parseInt(t.getStringValueAt(4).substring(0, 4)) > FilterO.YEAR_FILTER) { - // project - this.custKey.setValue((int) t.getLongValueAt(1)); - record.setField(1, this.custKey); - out.collect(record); - } - - } - } - - /** - * Forwards (0 = lineitem, 1 = tuple (extendedprice, discount) ) - */ - public static class FilterLI extends MapFunction - { - private final Tuple tuple = new Tuple(); - - @Override - public void map(Record record, Collector<Record> out) throws Exception - { - Tuple t = record.getField(1, this.tuple); - if (t.getStringValueAt(8).equals("R")) { - t.project(0x60); // l_extendedprice, l_discount - record.setField(1, t); - out.collect(record); - } - } - } - - /** - * Returns (0 = custkey, 1 = custName, 2 = NULL, 3 = balance, 4 = nationkey, 5 = address, 6 = phone, 7 = comment) - */ - public static class ProjectC extends MapFunction { - - private final Tuple tuple = new Tuple(); - - private final StringValue custName = new StringValue(); - - private final StringValue balance = new StringValue(); - private final IntValue nationKey = new IntValue(); - private final StringValue address = new StringValue(); - private final StringValue phone = new StringValue(); - private final StringValue comment = new StringValue(); - - @Override - public void map(Record record, Collector<Record> out) throws Exception - { - final Tuple t = record.getField(1, this.tuple); - - 
this.custName.setValue(t.getStringValueAt(1)); - this.address.setValue(t.getStringValueAt(2)); - this.nationKey.setValue((int) t.getLongValueAt(3)); - this.phone.setValue(t.getStringValueAt(4)); - this.balance.setValue(t.getStringValueAt(5)); - this.comment.setValue(t.getStringValueAt(7)); - - record.setField(1, this.custName); - record.setField(3, this.balance); - record.setField(4, this.nationKey); - record.setField(5, this.address); - record.setField(6, this.phone); - record.setField(7, this.comment); - - out.collect(record); - } - } - - /** - * Returns (0 = nationkey, 1 = nation_name) - */ - public static class ProjectN extends MapFunction - { - private final Tuple tuple = new Tuple(); - private final StringValue nationName = new StringValue(); - - @Override - public void map(Record record, Collector<Record> out) throws Exception - { - final Tuple t = record.getField(1, this.tuple); - - this.nationName.setValue(t.getStringValueAt(1)); - record.setField(1, this.nationName); - out.collect(record); - } - } - - // -------------------------------------------------------------------------------------------- - // Joins - // -------------------------------------------------------------------------------------------- - - /** - * Returns (0 = custKey, 1 = tuple (extendedprice, discount) ) - */ - public static class JoinOL extends JoinFunction - { - @Override - public void join(Record order, Record lineitem, Collector<Record> out) throws Exception { - lineitem.setField(0, order.getField(1, IntValue.class)); - out.collect(lineitem); - } - } - - /** - * Returns (0 = custkey, 1 = custName, 2 = extPrice * (1-discount), 3 = balance, 4 = nationkey, 5 = address, 6 = phone, 7 = comment) - */ - public static class JoinCOL extends JoinFunction - { - private final DoubleValue d = new DoubleValue(); - - @Override - public void join(Record custRecord, Record olRecord, Collector<Record> out) throws Exception - { - final Tuple t = olRecord.getField(1, Tuple.class); - final double 
extPrice = Double.parseDouble(t.getStringValueAt(0)); - final double discount = Double.parseDouble(t.getStringValueAt(1)); - - this.d.setValue(extPrice * (1 - discount)); - custRecord.setField(2, this.d); - out.collect(custRecord); - } - - } - - /** - * Returns (0 = custkey, 1 = custName, 2 = extPrice * (1-discount), 3 = balance, 4 = nationName, 5 = address, 6 = phone, 7 = comment) - */ - public static class JoinNCOL extends JoinFunction - { - @Override - public void join(Record colRecord, Record nation, Collector<Record> out) throws Exception { - colRecord.setField(4, nation.getField(1, StringValue.class)); - out.collect(colRecord); - } - } - - @ReduceOperator.Combinable - public static class Sum extends ReduceFunction - { - private final DoubleValue d = new DoubleValue(); - - @Override - public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception - { - Record record = null; - double sum = 0; - while (records.hasNext()) { - record = records.next(); - sum += record.getField(2, DoubleValue.class).getValue(); - } - - this.d.setValue(sum); - record.setField(2, this.d); - out.collect(record); - } - - @Override - public void combine(Iterator<Record> records, Collector<Record> out) throws Exception { - reduce(records,out); - } - } - - public static class TupleOutputFormat extends FileOutputFormat { - private static final long serialVersionUID = 1L; - - private final DecimalFormat formatter; - private final StringBuilder buffer = new StringBuilder(); - - public TupleOutputFormat() { - DecimalFormatSymbols decimalFormatSymbol = new DecimalFormatSymbols(); - decimalFormatSymbol.setDecimalSeparator('.'); - - this.formatter = new DecimalFormat("#.####"); - this.formatter.setDecimalFormatSymbols(decimalFormatSymbol); - } - - @Override - public void writeRecord(Record record) throws IOException - { - this.buffer.setLength(0); - this.buffer.append(record.getField(0, IntValue.class).toString()).append('|'); - this.buffer.append(record.getField(1, 
StringValue.class).toString()).append('|'); - - this.buffer.append(this.formatter.format(record.getField(2, DoubleValue.class).getValue())).append('|'); - - this.buffer.append(record.getField(3, StringValue.class).toString()).append('|'); - this.buffer.append(record.getField(4, StringValue.class).toString()).append('|'); - this.buffer.append(record.getField(5, StringValue.class).toString()).append('|'); - this.buffer.append(record.getField(6, StringValue.class).toString()).append('|'); - this.buffer.append(record.getField(7, StringValue.class).toString()).append('|'); - - this.buffer.append('\n'); - - final byte[] bytes = this.buffer.toString().getBytes(); - this.stream.write(bytes); - } - } - - @Override - public String getDescription() { - return "TPC-H Query 10"; - } - - @Override - public Plan getPlan(String... args) throws IllegalArgumentException { - final String ordersPath; - final String lineitemsPath; - final String customersPath; - final String nationsPath; - final String resultPath; - - final int parallelism; - - if (args.length < 6) { - throw new IllegalArgumentException("Invalid number of parameters"); - } else { - parallelism = Integer.parseInt(args[0]); - ordersPath = args[1]; - lineitemsPath = args[2]; - customersPath = args[3]; - nationsPath = args[4]; - resultPath = args[5]; - } - - FileDataSource orders = new FileDataSource(new IntTupleDataInFormat(), ordersPath, "Orders"); - // orders.setOutputContract(UniqueKey.class); - // orders.getCompilerHints().setAvgNumValuesPerKey(1); - - FileDataSource lineitems = new FileDataSource(new IntTupleDataInFormat(), lineitemsPath, "LineItems"); - // lineitems.getCompilerHints().setAvgNumValuesPerKey(4); - - FileDataSource customers = new FileDataSource(new IntTupleDataInFormat(), customersPath, "Customers"); - - FileDataSource nations = new FileDataSource(new IntTupleDataInFormat(), nationsPath, "Nations"); - - - MapOperator mapO = MapOperator.builder(FilterO.class) - .name("FilterO") - .build(); - - 
MapOperator mapLi = MapOperator.builder(FilterLI.class) - .name("FilterLi") - .build(); - - MapOperator projectC = MapOperator.builder(ProjectC.class) - .name("ProjectC") - .build(); - - MapOperator projectN = MapOperator.builder(ProjectN.class) - .name("ProjectN") - .build(); - - JoinOperator joinOL = JoinOperator.builder(JoinOL.class, IntValue.class, 0, 0) - .name("JoinOL") - .build(); - - JoinOperator joinCOL = JoinOperator.builder(JoinCOL.class, IntValue.class, 0, 0) - .name("JoinCOL") - .build(); - - JoinOperator joinNCOL = JoinOperator.builder(JoinNCOL.class, IntValue.class, 4, 0) - .name("JoinNCOL") - .build(); - - ReduceOperator reduce = ReduceOperator.builder(Sum.class) - .keyField(IntValue.class, 0) - .keyField(StringValue.class, 1) - .keyField(StringValue.class, 3) - .keyField(StringValue.class, 4) - .keyField(StringValue.class, 5) - .keyField(StringValue.class, 6) - .keyField(StringValue.class, 7) - .name("Reduce") - .build(); - - FileDataSink result = new FileDataSink(new TupleOutputFormat(), resultPath, "Output"); - - result.setInput(reduce); - - reduce.setInput(joinNCOL); - - joinNCOL.setFirstInput(joinCOL); - joinNCOL.setSecondInput(projectN); - - joinCOL.setFirstInput(projectC); - joinCOL.setSecondInput(joinOL); - - joinOL.setFirstInput(mapO); - joinOL.setSecondInput(mapLi); - - projectC.setInput(customers); - projectN.setInput(nations); - mapLi.setInput(lineitems); - mapO.setInput(orders); - - // return the PACT plan - Plan p = new Plan(result, "TPCH Q10"); - p.setDefaultParallelism(parallelism); - return p; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java deleted file mode 100644 index 
cebe6f9..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.recordJobs.relational; - -import java.io.Serializable; -import java.util.Iterator; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.Program; -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.java.record.functions.JoinFunction; -import org.apache.flink.api.java.record.functions.MapFunction; -import org.apache.flink.api.java.record.functions.ReduceFunction; -import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields; -import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst; -import org.apache.flink.api.java.record.io.CsvInputFormat; -import org.apache.flink.api.java.record.io.CsvOutputFormat; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.JoinOperator; -import org.apache.flink.api.java.record.operators.MapOperator; -import 
org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.types.DoubleValue; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; -import org.apache.flink.util.Collector; - -/** - * The TPC-H is a decision support benchmark on relational data. - * Its documentation and the data generator (DBGEN) can be found - * on http://www.tpc.org/tpch/ .This implementation is tested with - * the DB2 data format. - * - * This program implements a modified version of the query 3 of - * the TPC-H benchmark including one join, some filtering and an - * aggregation. - * - * SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue - * FROM orders, lineitem - * WHERE l_orderkey = o_orderkey - * AND o_orderstatus = "X" - * AND YEAR(o_orderdate) > Y - * AND o_orderpriority LIKE "Z%" - * GROUP BY l_orderkey, o_shippriority; - */ -@SuppressWarnings("deprecation") -public class TPCHQuery3 implements Program, ProgramDescription { - - private static final long serialVersionUID = 1L; - - public static final String YEAR_FILTER = "parameter.YEAR_FILTER"; - public static final String PRIO_FILTER = "parameter.PRIO_FILTER"; - - /** - * Map PACT implements the selection and projection on the orders table. - */ - @ConstantFields({0,1}) - public static class FilterO extends MapFunction implements Serializable { - private static final long serialVersionUID = 1L; - - private String prioFilter; // filter literal for the order priority - private int yearFilter; // filter literal for the year - - // reusable objects for the fields touched in the mapper - private StringValue orderStatus; - private StringValue orderDate; - private StringValue orderPrio; - - /** - * Reads the filter literals from the configuration. 
- * - * @see org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration) - */ - @Override - public void open(Configuration parameters) { - this.yearFilter = parameters.getInteger(YEAR_FILTER, 1990); - this.prioFilter = parameters.getString(PRIO_FILTER, "0"); - } - - /** - * Filters the orders table by year, order status and order priority. - * - * o_orderstatus = "X" - * AND YEAR(o_orderdate) > Y - * AND o_orderpriority LIKE "Z" - * - * Output Schema: - * 0:ORDERKEY, - * 1:SHIPPRIORITY - */ - @Override - public void map(final Record record, final Collector<Record> out) { - orderStatus = record.getField(2, StringValue.class); - if (!orderStatus.getValue().equals("F")) { - return; - } - - orderPrio = record.getField(4, StringValue.class); - if(!orderPrio.getValue().startsWith(this.prioFilter)) { - return; - } - - orderDate = record.getField(3, StringValue.class); - if (!(Integer.parseInt(orderDate.getValue().substring(0, 4)) > this.yearFilter)) { - return; - } - - record.setNumFields(2); - out.collect(record); - } - } - - /** - * Match PACT realizes the join between LineItem and Order table. - * - */ - @ConstantFieldsFirst({0,1}) - public static class JoinLiO extends JoinFunction implements Serializable { - private static final long serialVersionUID = 1L; - - /** - * Implements the join between LineItem and Order table on the order key. - * - * Output Schema: - * 0:ORDERKEY - * 1:SHIPPRIORITY - * 2:EXTENDEDPRICE - */ - @Override - public void join(Record order, Record lineitem, Collector<Record> out) { - order.setField(2, lineitem.getField(1, DoubleValue.class)); - out.collect(order); - } - } - - /** - * Reduce PACT implements the sum aggregation. 
- * The Combinable annotation is set as the partial sums can be calculated - * already in the combiner - * - */ - @Combinable - @ConstantFields({0,1}) - public static class AggLiO extends ReduceFunction implements Serializable { - private static final long serialVersionUID = 1L; - - private final DoubleValue extendedPrice = new DoubleValue(); - - /** - * Implements the sum aggregation. - * - * Output Schema: - * 0:ORDERKEY - * 1:SHIPPRIORITY - * 2:SUM(EXTENDEDPRICE) - */ - @Override - public void reduce(Iterator<Record> values, Collector<Record> out) { - Record rec = null; - double partExtendedPriceSum = 0; - - while (values.hasNext()) { - rec = values.next(); - partExtendedPriceSum += rec.getField(2, DoubleValue.class).getValue(); - } - - this.extendedPrice.setValue(partExtendedPriceSum); - rec.setField(2, this.extendedPrice); - out.collect(rec); - } - - /** - * Creates partial sums on the price attribute for each data batch. - */ - @Override - public void combine(Iterator<Record> values, Collector<Record> out) { - reduce(values, out); - } - } - - - @Override - public Plan getPlan(final String... args) { - // parse program parameters - final int numSubtasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1); - final String ordersPath = (args.length > 1 ? args[1] : ""); - final String lineitemsPath = (args.length > 2 ? args[2] : ""); - final String output = (args.length > 3 ? 
args[3] : ""); - - // create DataSourceContract for Orders input - FileDataSource orders = new FileDataSource(new CsvInputFormat(), ordersPath, "Orders"); - CsvInputFormat.configureRecordFormat(orders) - .recordDelimiter('\n') - .fieldDelimiter('|') - .field(LongValue.class, 0) // order id - .field(IntValue.class, 7) // ship prio - .field(StringValue.class, 2, 2) // order status - .field(StringValue.class, 4, 10) // order date - .field(StringValue.class, 5, 8); // order prio - - // create DataSourceContract for LineItems input - FileDataSource lineitems = new FileDataSource(new CsvInputFormat(), lineitemsPath, "LineItems"); - CsvInputFormat.configureRecordFormat(lineitems) - .recordDelimiter('\n') - .fieldDelimiter('|') - .field(LongValue.class, 0) // order id - .field(DoubleValue.class, 5); // extended price - - // create MapOperator for filtering Orders tuples - MapOperator filterO = MapOperator.builder(new FilterO()) - .input(orders) - .name("FilterO") - .build(); - // filter configuration - filterO.setParameter(YEAR_FILTER, 1993); - filterO.setParameter(PRIO_FILTER, "5"); - // compiler hints - filterO.getCompilerHints().setFilterFactor(0.05f); - - // create JoinOperator for joining Orders and LineItems - JoinOperator joinLiO = JoinOperator.builder(new JoinLiO(), LongValue.class, 0, 0) - .input1(filterO) - .input2(lineitems) - .name("JoinLiO") - .build(); - - // create ReduceOperator for aggregating the result - // the reducer has a composite key, consisting of the fields 0 and 1 - ReduceOperator aggLiO = ReduceOperator.builder(new AggLiO()) - .keyField(LongValue.class, 0) - .keyField(StringValue.class, 1) - .input(joinLiO) - .name("AggLio") - .build(); - - // create DataSinkContract for writing the result - FileDataSink result = new FileDataSink(new CsvOutputFormat(), output, aggLiO, "Output"); - CsvOutputFormat.configureRecordFormat(result) - .recordDelimiter('\n') - .fieldDelimiter('|') - .lenient(true) - .field(LongValue.class, 0) - .field(IntValue.class, 1) 
- .field(DoubleValue.class, 2); - - // assemble the PACT plan - Plan plan = new Plan(result, "TPCH Q3"); - plan.setDefaultParallelism(numSubtasks); - return plan; - } - - - @Override - public String getDescription() { - return "Parameters: [numSubStasks], [orders], [lineitem], [output]"; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3Unioned.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3Unioned.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3Unioned.java deleted file mode 100644 index 157e3cf..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3Unioned.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.test.recordJobs.relational; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.Program; -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.java.record.io.CsvInputFormat; -import org.apache.flink.api.java.record.io.CsvOutputFormat; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.JoinOperator; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.test.recordJobs.relational.TPCHQuery3.AggLiO; -import org.apache.flink.test.recordJobs.relational.TPCHQuery3.FilterO; -import org.apache.flink.test.recordJobs.relational.TPCHQuery3.JoinLiO; -import org.apache.flink.types.DoubleValue; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.StringValue; - -/** - * The TPC-H is a decision support benchmark on relational data. - * Its documentation and the data generator (DBGEN) can be found - * on http://www.tpc.org/tpch/. This implementation is tested with - * the DB2 data format. - * The PACT program implements a modified version of the query 3 of - * the TPC-H benchmark including one join, some filtering and an - * aggregation. - * - * SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue - * FROM orders, lineitem - * WHERE l_orderkey = o_orderkey - * AND o_orderstatus = "X" - * AND YEAR(o_orderdate) > Y - * AND o_orderpriority LIKE "Z%" - * GROUP BY l_orderkey, o_shippriority; - */ -@SuppressWarnings("deprecation") -public class TPCHQuery3Unioned implements Program, ProgramDescription { - - private static final long serialVersionUID = 1L; - - @Override - public Plan getPlan(final String... args) { - // parse program parameters - final int numSubtasks = (args.length > 0 ? 
Integer.parseInt(args[0]) : 1); - String orders1Path = (args.length > 1 ? args[1] : ""); - String orders2Path = (args.length > 2 ? args[2] : ""); - String partJoin1Path = (args.length > 3 ? args[3] : ""); - String partJoin2Path = (args.length > 4 ? args[4] : ""); - - String lineitemsPath = (args.length > 5 ? args[5] : ""); - String output = (args.length > 6 ? args[6] : ""); - - // create DataSourceContract for Orders input - FileDataSource orders1 = new FileDataSource(new CsvInputFormat(), orders1Path, "Orders 1"); - CsvInputFormat.configureRecordFormat(orders1) - .recordDelimiter('\n') - .fieldDelimiter('|') - .field(LongValue.class, 0) // order id - .field(IntValue.class, 7) // ship prio - .field(StringValue.class, 2, 2) // order status - .field(StringValue.class, 4, 10) // order date - .field(StringValue.class, 5, 8); // order prio - - FileDataSource orders2 = new FileDataSource(new CsvInputFormat(), orders2Path, "Orders 2"); - CsvInputFormat.configureRecordFormat(orders2) - .recordDelimiter('\n') - .fieldDelimiter('|') - .field(LongValue.class, 0) // order id - .field(IntValue.class, 7) // ship prio - .field(StringValue.class, 2, 2) // order status - .field(StringValue.class, 4, 10) // order date - .field(StringValue.class, 5, 8); // order prio - - // create DataSourceContract for LineItems input - FileDataSource lineitems = new FileDataSource(new CsvInputFormat(), lineitemsPath, "LineItems"); - CsvInputFormat.configureRecordFormat(lineitems) - .recordDelimiter('\n') - .fieldDelimiter('|') - .field(LongValue.class, 0) - .field(DoubleValue.class, 5); - - // create MapOperator for filtering Orders tuples - MapOperator filterO1 = MapOperator.builder(new FilterO()) - .name("FilterO") - .input(orders1) - .build(); - // filter configuration - filterO1.setParameter(TPCHQuery3.YEAR_FILTER, 1993); - filterO1.setParameter(TPCHQuery3.PRIO_FILTER, "5"); - filterO1.getCompilerHints().setFilterFactor(0.05f); - - // create MapOperator for filtering Orders tuples - MapOperator 
filterO2 = MapOperator.builder(new FilterO()) - .name("FilterO") - .input(orders2) - .build(); - // filter configuration - filterO2.setParameter(TPCHQuery3.YEAR_FILTER, 1993); - filterO2.setParameter(TPCHQuery3.PRIO_FILTER, "5"); - - // create JoinOperator for joining Orders and LineItems - @SuppressWarnings("unchecked") - JoinOperator joinLiO = JoinOperator.builder(new JoinLiO(), LongValue.class, 0, 0) - .input1(filterO2, filterO1) - .input2(lineitems) - .name("JoinLiO") - .build(); - - FileDataSource partJoin1 = new FileDataSource(new CsvInputFormat(), partJoin1Path, "Part Join 1"); - CsvInputFormat.configureRecordFormat(partJoin1) - .recordDelimiter('\n') - .fieldDelimiter('|') - .field(LongValue.class, 0) - .field(IntValue.class, 1) - .field(DoubleValue.class, 2); - - FileDataSource partJoin2 = new FileDataSource(new CsvInputFormat(), partJoin2Path, "Part Join 2"); - CsvInputFormat.configureRecordFormat(partJoin2) - .recordDelimiter('\n') - .fieldDelimiter('|') - .field(LongValue.class, 0) - .field(IntValue.class, 1) - .field(DoubleValue.class, 2); - - // create ReduceOperator for aggregating the result - // the reducer has a composite key, consisting of the fields 0 and 1 - @SuppressWarnings("unchecked") - ReduceOperator aggLiO = ReduceOperator.builder(new AggLiO()) - .keyField(LongValue.class, 0) - .keyField(StringValue.class, 1) - .input(joinLiO, partJoin2, partJoin1) - .name("AggLio") - .build(); - - // create DataSinkContract for writing the result - FileDataSink result = new FileDataSink(new CsvOutputFormat(), output, aggLiO, "Output"); - CsvOutputFormat.configureRecordFormat(result) - .recordDelimiter('\n') - .fieldDelimiter('|') - .lenient(true) - .field(LongValue.class, 0) - .field(IntValue.class, 1) - .field(DoubleValue.class, 2); - - // assemble the PACT plan - Plan plan = new Plan(result, "TPCH Q3 Unioned"); - plan.setDefaultParallelism(numSubtasks); - return plan; - } - - @Override - public String getDescription() { - return "Parameters: 
[numSubStasks], [orders1], [orders2], [partJoin1], [partJoin2], [lineitem], [output]"; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery4.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery4.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery4.java deleted file mode 100644 index ec3c5b4..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery4.java +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.test.recordJobs.relational; - -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Calendar; -import java.util.Date; -import java.util.GregorianCalendar; -import java.util.Iterator; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.Program; -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.java.record.functions.JoinFunction; -import org.apache.flink.api.java.record.functions.MapFunction; -import org.apache.flink.api.java.record.functions.ReduceFunction; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.JoinOperator; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.test.recordJobs.util.IntTupleDataInFormat; -import org.apache.flink.test.recordJobs.util.StringTupleDataOutFormat; -import org.apache.flink.test.recordJobs.util.Tuple; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; -import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implementation of the TPC-H Query 4 as a Flink program. - */ - -@SuppressWarnings({"serial", "deprecation"}) -public class TPCHQuery4 implements Program, ProgramDescription { - - private static Logger LOG = LoggerFactory.getLogger(TPCHQuery4.class); - - private int parallelism = 1; - private String ordersInputPath; - private String lineItemInputPath; - private String outputPath; - - - /** - * Small {@link MapFunction} to filter out the irrelevant orders. 
- * - */ - //@SameKey - public static class OFilter extends MapFunction { - - private final String dateParamString = "1995-01-01"; - private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); - private final GregorianCalendar gregCal = new GregorianCalendar(); - - private Date paramDate; - private Date plusThreeMonths; - - @Override - public void open(Configuration parameters) { - try { - this.paramDate = sdf.parse(this.dateParamString); - this.plusThreeMonths = getPlusThreeMonths(paramDate); - - } catch (ParseException e) { - throw new RuntimeException(e); - } - } - - @Override - public void map(Record record, Collector<Record> out) throws Exception { - Tuple tuple = record.getField(1, Tuple.class); - Date orderDate; - - String orderStringDate = tuple.getStringValueAt(4); - - try { - orderDate = sdf.parse(orderStringDate); - } catch (ParseException e) { - throw new RuntimeException(e); - } - - if(paramDate.before(orderDate) && plusThreeMonths.after(orderDate)) { - out.collect(record); - } - - } - - /** - * Calculates the {@link Date} which is three months after the given one. - * @param paramDate of type {@link Date}. - * @return a {@link Date} three months later. - */ - private Date getPlusThreeMonths(Date paramDate) { - - gregCal.setTime(paramDate); - gregCal.add(Calendar.MONTH, 3); - Date plusThreeMonths = gregCal.getTime(); - return plusThreeMonths; - } - } - - /** - * Simple filter for the line item selection. It filters out all the tuples that do - * not satisfy the "l_commitdate < l_receiptdate" condition. 
- * - */ - //@SameKey - public static class LiFilter extends MapFunction { - - private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); - - @Override - public void map(Record record, Collector<Record> out) throws Exception { - Tuple tuple = record.getField(1, Tuple.class); - String commitString = tuple.getStringValueAt(11); - String receiptString = tuple.getStringValueAt(12); - - Date commitDate; - Date receiptDate; - - try { - commitDate = sdf.parse(commitString); - receiptDate = sdf.parse(receiptString); - } catch (ParseException e) { - throw new RuntimeException(e); - } - - if (commitDate.before(receiptDate)) { - out.collect(record); - } - - } - } - - /** - * Implements the equijoin on the orderkey and performs the projection on - * the order priority as well. - * - */ - public static class JoinLiO extends JoinFunction { - - @Override - public void join(Record order, Record line, Collector<Record> out) - throws Exception { - Tuple orderTuple = order.getField(1, Tuple.class); - - orderTuple.project(32); - String newOrderKey = orderTuple.getStringValueAt(0); - - order.setField(0, new StringValue(newOrderKey)); - out.collect(order); - } - } - - /** - * Implements the count(*) part. - * - */ - //@SameKey - public static class CountAgg extends ReduceFunction { - - @Override - public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception { - long count = 0; - Record rec = null; - - while(records.hasNext()) { - rec = records.next(); - count++; - } - - if(rec != null) - { - Tuple tuple = new Tuple(); - tuple.addAttribute("" + count); - rec.setField(1, tuple); - } - - out.collect(rec); - } - } - - - @Override - public Plan getPlan(String... 
args) throws IllegalArgumentException { - - if(args == null || args.length != 4) - { - LOG.warn("number of arguments do not match!"); - this.ordersInputPath = ""; - this.lineItemInputPath = ""; - this.outputPath = ""; - }else - { - setArgs(args); - } - - FileDataSource orders = - new FileDataSource(new IntTupleDataInFormat(), this.ordersInputPath, "Orders"); - orders.setParallelism(this.parallelism); - //orders.setOutputContract(UniqueKey.class); - - FileDataSource lineItems = - new FileDataSource(new IntTupleDataInFormat(), this.lineItemInputPath, "LineItems"); - lineItems.setParallelism(this.parallelism); - - FileDataSink result = - new FileDataSink(new StringTupleDataOutFormat(), this.outputPath, "Output"); - result.setParallelism(parallelism); - - MapOperator lineFilter = - MapOperator.builder(LiFilter.class) - .name("LineItemFilter") - .build(); - lineFilter.setParallelism(parallelism); - - MapOperator ordersFilter = - MapOperator.builder(OFilter.class) - .name("OrdersFilter") - .build(); - ordersFilter.setParallelism(parallelism); - - JoinOperator join = - JoinOperator.builder(JoinLiO.class, IntValue.class, 0, 0) - .name("OrdersLineitemsJoin") - .build(); - join.setParallelism(parallelism); - - ReduceOperator aggregation = - ReduceOperator.builder(CountAgg.class, StringValue.class, 0) - .name("AggregateGroupBy") - .build(); - aggregation.setParallelism(this.parallelism); - - lineFilter.setInput(lineItems); - ordersFilter.setInput(orders); - join.setFirstInput(ordersFilter); - join.setSecondInput(lineFilter); - aggregation.setInput(join); - result.setInput(aggregation); - - - return new Plan(result, "TPC-H 4"); - } - - /** - * Get the args into the members. 
- * @param args - */ - private void setArgs(String[] args) { - this.parallelism = Integer.parseInt(args[0]); - this.ordersInputPath = args[1]; - this.lineItemInputPath = args[2]; - this.outputPath = args[3]; - } - - - @Override - public String getDescription() { - return "Parameters: [parallelism] [orders-input] [lineitem-input] [output]"; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery9.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery9.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery9.java deleted file mode 100644 index c00d231..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery9.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.test.recordJobs.relational; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.Program; -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.JoinOperator; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.test.recordJobs.relational.query9Util.AmountAggregate; -import org.apache.flink.test.recordJobs.relational.query9Util.FilteredPartsJoin; -import org.apache.flink.test.recordJobs.relational.query9Util.IntPair; -import org.apache.flink.test.recordJobs.relational.query9Util.LineItemMap; -import org.apache.flink.test.recordJobs.relational.query9Util.OrderMap; -import org.apache.flink.test.recordJobs.relational.query9Util.OrderedPartsJoin; -import org.apache.flink.test.recordJobs.relational.query9Util.PartFilter; -import org.apache.flink.test.recordJobs.relational.query9Util.PartJoin; -import org.apache.flink.test.recordJobs.relational.query9Util.PartListJoin; -import org.apache.flink.test.recordJobs.relational.query9Util.PartsuppMap; -import org.apache.flink.test.recordJobs.relational.query9Util.StringIntPair; -import org.apache.flink.test.recordJobs.relational.query9Util.StringIntPairStringDataOutFormat; -import org.apache.flink.test.recordJobs.relational.query9Util.SupplierMap; -import org.apache.flink.test.recordJobs.relational.query9Util.SuppliersJoin; -import org.apache.flink.test.recordJobs.util.IntTupleDataInFormat; -import org.apache.flink.types.IntValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Quote from the TPC-H homepage: - * "The TPC-H is a decision support benchmark on relational data. 
- * Its documentation and the data generator (DBGEN) can be found - * on http://www.tpc.org/tpch/ .This implementation is tested with - * the DB2 data format." - * This PACT program implements the query 9 of the TPC-H benchmark: - * - * <pre> - * select nation, o_year, sum(amount) as sum_profit - * from ( - * select n_name as nation, extract(year from o_orderdate) as o_year, - * l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount - * from part, supplier, lineitem, partsupp, orders, nation - * where - * s_suppkey = l_suppkey and ps_suppkey = l_suppkey and ps_partkey = l_partkey - * and p_partkey = l_partkey and o_orderkey = l_orderkey and s_nationkey = n_nationkey - * and p_name like '%[COLOR]%' - * ) as profit - * group by nation, o_year - * order by nation, o_year desc; - * </pre> - * - * Plan:<br> - * Match "part" and "partsupp" on "partkey" -> "parts" with (partkey, suppkey) as key - * Match "orders" and "lineitem" on "orderkey" -> "ordered_parts" with (partkey, suppkey) as key - * Match "parts" and "ordered_parts" on (partkey, suppkey) -> "filtered_parts" with "suppkey" as key - * Match "supplier" and "nation" on "nationkey" -> "suppliers" with "suppkey" as key - * Match "filtered_parts" and "suppliers" on "suppkey" -> "partlist" with (nation, o_year) as key - * Group "partlist" by (nation, o_year), calculate sum(amount) - * - * <b>Attention:</b> The "order by" part is not implemented! 
- * - */ -@SuppressWarnings({"serial", "deprecation"}) -public class TPCHQuery9 implements Program, ProgramDescription { - public final String ARGUMENTS = "parallelism partInputPath partSuppInputPath ordersInputPath lineItemInputPath supplierInputPath nationInputPath outputPath"; - - private static Logger LOG = LoggerFactory.getLogger(TPCHQuery9.class); - - private int parallelism = 1; - - private String partInputPath, partSuppInputPath, ordersInputPath, lineItemInputPath, supplierInputPath, - nationInputPath; - - private String outputPath; - - - @Override - public Plan getPlan(String... args) throws IllegalArgumentException { - - if (args.length != 8) - { - LOG.warn("number of arguments do not match!"); - - this.parallelism = 1; - this.partInputPath = ""; - this.partSuppInputPath = ""; - this.ordersInputPath = ""; - this.lineItemInputPath = ""; - this.supplierInputPath = ""; - this.nationInputPath = ""; - this.outputPath = ""; - }else - { - this.parallelism = Integer.parseInt(args[0]); - this.partInputPath = args[1]; - this.partSuppInputPath = args[2]; - this.ordersInputPath = args[3]; - this.lineItemInputPath = args[4]; - this.supplierInputPath = args[5]; - this.nationInputPath = args[6]; - this.outputPath = args[7]; - } - - /* Create the 6 data sources: */ - /* part: (partkey | name, mfgr, brand, type, size, container, retailprice, comment) */ - FileDataSource partInput = new FileDataSource( - new IntTupleDataInFormat(), this.partInputPath, "\"part\" source"); - //partInput.setOutputContract(UniqueKey.class); -// partInput.getCompilerHints().setAvgNumValuesPerKey(1); - - /* partsupp: (partkey | suppkey, availqty, supplycost, comment) */ - FileDataSource partSuppInput = new FileDataSource( - new IntTupleDataInFormat(), this.partSuppInputPath, "\"partsupp\" source"); - - /* orders: (orderkey | custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment) */ - FileDataSource ordersInput = new FileDataSource( - new IntTupleDataInFormat(), 
this.ordersInputPath, "\"orders\" source"); - //ordersInput.setOutputContract(UniqueKey.class); -// ordersInput.getCompilerHints().setAvgNumValuesPerKey(1); - - /* lineitem: (orderkey | partkey, suppkey, linenumber, quantity, extendedprice, discount, tax, ...) */ - FileDataSource lineItemInput = new FileDataSource( - new IntTupleDataInFormat(), this.lineItemInputPath, "\"lineitem\" source"); - - /* supplier: (suppkey | name, address, nationkey, phone, acctbal, comment) */ - FileDataSource supplierInput = new FileDataSource( - new IntTupleDataInFormat(), this.supplierInputPath, "\"supplier\" source"); - //supplierInput.setOutputContract(UniqueKey.class); -// supplierInput.getCompilerHints().setAvgNumValuesPerKey(1); - - /* nation: (nationkey | name, regionkey, comment) */ - FileDataSource nationInput = new FileDataSource( - new IntTupleDataInFormat(), this.nationInputPath, "\"nation\" source"); - //nationInput.setOutputContract(UniqueKey.class); -// nationInput.getCompilerHints().setAvgNumValuesPerKey(1); - - /* Filter on part's name, project values to NULL: */ - MapOperator filterPart = MapOperator.builder(PartFilter.class) - .name("filterParts") - .build(); - - /* Map to change the key element of partsupp, project value to (supplycost, suppkey): */ - MapOperator mapPartsupp = MapOperator.builder(PartsuppMap.class) - .name("mapPartsupp") - .build(); - - /* Map to extract the year from order: */ - MapOperator mapOrder = MapOperator.builder(OrderMap.class) - .name("mapOrder") - .build(); - - /* Project value to (partkey, suppkey, quantity, price = extendedprice*(1-discount)): */ - MapOperator mapLineItem = MapOperator.builder(LineItemMap.class) - .name("proj.Partsupp") - .build(); - - /* - change the key of supplier to nationkey, project value to suppkey */ - MapOperator mapSupplier = MapOperator.builder(SupplierMap.class) - .name("proj.Partsupp") - .build(); - - /* Equijoin on partkey of part and partsupp: */ - JoinOperator partsJoin = 
JoinOperator.builder(PartJoin.class, IntValue.class, 0, 0) - .name("partsJoin") - .build(); - - /* Equijoin on orderkey of orders and lineitem: */ - JoinOperator orderedPartsJoin = - JoinOperator.builder(OrderedPartsJoin.class, IntValue.class, 0, 0) - .name("orderedPartsJoin") - .build(); - - /* Equijoin on nationkey of supplier and nation: */ - JoinOperator suppliersJoin = - JoinOperator.builder(SuppliersJoin.class, IntValue.class, 0, 0) - .name("suppliersJoin") - .build(); - - /* Equijoin on (partkey,suppkey) of parts and orderedParts: */ - JoinOperator filteredPartsJoin = - JoinOperator.builder(FilteredPartsJoin.class, IntPair.class, 0, 0) - .name("filteredPartsJoin") - .build(); - - /* Equijoin on suppkey of filteredParts and suppliers: */ - JoinOperator partListJoin = - JoinOperator.builder(PartListJoin.class, IntValue.class , 0, 0) - .name("partlistJoin") - .build(); - - /* Aggregate sum(amount) by (nation,year): */ - ReduceOperator sumAmountAggregate = - ReduceOperator.builder(AmountAggregate.class, StringIntPair.class, 0) - .name("groupyBy") - .build(); - - /* Connect input filters: */ - filterPart.setInput(partInput); - mapPartsupp.setInput(partSuppInput); - mapOrder.setInput(ordersInput); - mapLineItem.setInput(lineItemInput); - mapSupplier.setInput(supplierInput); - - /* Connect equijoins: */ - partsJoin.setFirstInput(filterPart); - partsJoin.setSecondInput(mapPartsupp); - orderedPartsJoin.setFirstInput(mapOrder); - orderedPartsJoin.setSecondInput(mapLineItem); - suppliersJoin.setFirstInput(mapSupplier); - suppliersJoin.setSecondInput(nationInput); - filteredPartsJoin.setFirstInput(partsJoin); - filteredPartsJoin.setSecondInput(orderedPartsJoin); - partListJoin.setFirstInput(filteredPartsJoin); - partListJoin.setSecondInput(suppliersJoin); - - /* Connect aggregate: */ - sumAmountAggregate.setInput(partListJoin); - - /* Connect sink: */ - FileDataSink result = new FileDataSink(new StringIntPairStringDataOutFormat(), this.outputPath, "Results sink"); - 
result.setInput(sumAmountAggregate); - - Plan p = new Plan(result, "TPC-H query 9"); - p.setDefaultParallelism(this.parallelism); - return p; - } - - @Override - public String getDescription() { - return "TPC-H query 9, parameters: " + this.ARGUMENTS; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQueryAsterix.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQueryAsterix.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQueryAsterix.java deleted file mode 100644 index a681f64..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQueryAsterix.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.test.recordJobs.relational; - -import java.io.Serializable; -import java.util.Iterator; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.Program; -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.java.record.functions.JoinFunction; -import org.apache.flink.api.java.record.functions.ReduceFunction; -import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields; -import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsSecondExcept; -import org.apache.flink.api.java.record.io.CsvInputFormat; -import org.apache.flink.api.java.record.io.CsvOutputFormat; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.JoinOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; -import org.apache.flink.util.Collector; - -/** - * The TPC-H is a decision support benchmark on relational data. - * Its documentation and the data generator (DBGEN) can be found - * on http://www.tpc.org/tpch/ .This implementation is tested with - * the DB2 data format. - * - * This program implements a query on the TPC-H schema - * including one join and an aggregation. - * This query is used as example in the Asterix project (http://asterix.ics.uci.edu/). 
- *
- * SELECT c_mktsegment, COUNT(o_orderkey)
- * FROM orders, customer
- * WHERE c_custkey = o_custkey
- * GROUP BY c_mktsegment;
- *
- */
@SuppressWarnings("deprecation")
public class TPCHQueryAsterix implements Program, ProgramDescription {

	private static final long serialVersionUID = 1L;


	/**
	 * Join function applied to each matching (order, customer) pair.
	 * It overwrites field 0 of the customer record with the constant 1 and
	 * emits that record; the order record is not used for the output.
	 */
	@ConstantFieldsSecondExcept(0)
	public static class JoinCO extends JoinFunction implements Serializable {
		private static final long serialVersionUID = 1L;

		private final IntValue one = new IntValue(1);

		/**
		 * Emits the customer record with its first field replaced by a partial count.
		 *
		 * Output Schema:
		 *  0: PARTIAL_COUNT=1
		 *  1: C_MKTSEGMENT
		 */
		@Override
		public void join(Record order, Record cust, Collector<Record> out) throws Exception {
			cust.setField(0, this.one);
			out.collect(cust);
		}
	}

	/**
	 * Sums the partial counts (field 0) of all records in a group and emits a single
	 * record carrying the total. Marked {@code Combinable}: {@link #combine} simply
	 * delegates to {@link #reduce}, so partial sums can be pre-aggregated.
	 */
	@Combinable
	@ConstantFields(1)
	public static class AggCO extends ReduceFunction implements Serializable {
		private static final long serialVersionUID = 1L;

		private final IntValue integer = new IntValue();
		private Record record = new Record();

		/**
		 * Adds up field 0 over the whole group, writes the sum into field 0 of the
		 * last record read and emits that record.
		 *
		 * Output Schema:
		 *  0: COUNT
		 *  1: C_MKTSEGMENT
		 */
		@Override
		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
			int total = 0;

			for (Iterator<Record> it = records; it.hasNext();) {
				this.record = it.next();
				total += this.record.getField(0, this.integer).getValue();
			}

			// The shared IntValue is reused as the output holder after the loop.
			this.integer.setValue(total);
			this.record.setField(0, this.integer);
			out.collect(this.record);
		}

		/**
		 * Computes partial counts; the logic is the same as in {@link #reduce}.
		 */
		public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
			reduce(records, out);
		}

	}

	/** Returns {@code args[position]} if that argument was supplied, otherwise the empty string. */
	private static String pathArg(String[] args, int position) {
		if (args.length > position) {
			return args[position];
		}
		return "";
	}

	/**
	 * Builds the plan: orders and customers are read from delimited files, joined on
	 * the customer id, and the join result is counted per market segment.
	 *
	 * @param args optional, in order: number of subtasks (default 1), orders path,
	 *             customer path, output path (each path defaults to the empty string)
	 * @return the plan rooted at the output sink, named "TPCH Asterix"
	 */
	@Override
	public Plan getPlan(final String... args) {

		// parse program parameters
		int numSubtasks = 1;
		if (args.length > 0) {
			numSubtasks = Integer.parseInt(args[0]);
		}
		final String ordersPath = pathArg(args, 1);
		final String customerPath = pathArg(args, 2);
		final String output = pathArg(args, 3);

		/*
		 * Orders source, output schema:
		 *  0: CUSTOMER_ID (read from input column 1)
		 */
		FileDataSource orders = new FileDataSource(new CsvInputFormat(), ordersPath, "Orders");
		orders.setParallelism(numSubtasks);
		CsvInputFormat.configureRecordFormat(orders)
			.recordDelimiter('\n')
			.fieldDelimiter('|')
			.field(IntValue.class, 1);

		/*
		 * Customers source, output schema:
		 *  0: CUSTOMER_ID (read from input column 0)
		 *  1: MKT_SEGMENT (read from input column 6)
		 */
		FileDataSource customers = new FileDataSource(new CsvInputFormat(), customerPath, "Customers");
		customers.setParallelism(numSubtasks);
		CsvInputFormat.configureRecordFormat(customers)
			.recordDelimiter('\n')
			.fieldDelimiter('|')
			.field(IntValue.class, 0)
			.field(StringValue.class, 6);

		// join orders (first input) with customers (second input) on field 0 of each
		JoinOperator joinCO = JoinOperator.builder(new JoinCO(), IntValue.class, 0, 0)
			.name("JoinCO")
			.build();
		joinCO.setParallelism(numSubtasks);
		joinCO.setFirstInput(orders);
		joinCO.setSecondInput(customers);

		// group the join result on field 1 and sum the partial counts
		ReduceOperator aggCO = ReduceOperator.builder(new AggCO(), StringValue.class, 1)
			.name("AggCo")
			.build();
		aggCO.setParallelism(numSubtasks);
		aggCO.setInput(joinCO);

		// sink writing (count, market segment) as '|'-separated lines
		FileDataSink result = new FileDataSink(new CsvOutputFormat(), output, "Output");
		result.setParallelism(numSubtasks);
		CsvOutputFormat.configureRecordFormat(result)
			.recordDelimiter('\n')
			.fieldDelimiter('|')
			.field(IntValue.class, 0)
			.field(StringValue.class, 1);
		result.setInput(aggCO);

		return new Plan(result, "TPCH Asterix");
	}

	/**
	 * Lists the optional program parameters in the order {@link #getPlan} reads them.
	 */
	@Override
	public String getDescription() {
		return "Parameters: [numSubStasks], [orders], [customer], [output]";
	}
}