Repository: flink Updated Branches: refs/heads/master b63e19b09 -> efc344a4e
[FLINK-3650] [dataSet] Add maxBy/minBy to Scala DataSet API

This closes #1856


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7cc69434
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7cc69434
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7cc69434

Branch: refs/heads/master
Commit: 7cc69434a21e23c871d091b0ba8308567a893cde
Parents: b63e19b
Author: Vasudevan <ramkrishna.s.vasude...@intel.com>
Authored: Wed Apr 6 11:43:07 2016 +0530
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Jun 17 00:13:34 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/api/scala/DataSet.scala    |  65 +++++-
 .../apache/flink/api/scala/GroupedDataSet.scala |  29 +++
 .../flink/api/scala/SelectByMaxFunction.scala   |  59 +++++
 .../flink/api/scala/SelectByMinFunction.scala   |  60 +++++
 .../flink/api/operator/MaxByOperatorTest.scala  | 169 ++++++++++++++
 .../flink/api/operator/MinByOperatorTest.scala  | 173 +++++++++++++++
 .../api/operator/SelectByFunctionTest.scala     | 219 +++++++++++++++++++
 7 files changed, 770 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7cc69434/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 5735b32..4e7be04 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.api.scala
 
-import org.apache.flink.annotation.{PublicEvolving, Public}
+import org.apache.flink.annotation.{Public, PublicEvolving}
 import org.apache.flink.api.common.InvalidProgramException
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator
 import org.apache.flink.api.common.aggregators.Aggregator
@@ -35,7 +35,8 @@ import org.apache.flink.api.java.io.{PrintingOutputFormat, TextOutputFormat}
 import Keys.ExpressionKeys
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.api.java.{DataSet => JavaDataSet, Utils}
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.java.{Utils, DataSet => JavaDataSet}
 import org.apache.flink.api.scala.operators.{ScalaAggregateOperator, ScalaCsvOutputFormat}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.{FileSystem, Path}
@@ -262,7 +263,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
       case udfOp: UdfOperator[_] => udfOp.withParameters(parameters)
       case source: DataSource[_] => source.withParameters(parameters)
       case _ =>
-        throw new UnsupportedOperationException("Operator " + javaSet.toString 
+        throw new UnsupportedOperationException("Operator " + javaSet.toString
           + " cannot have parameters")
     }
     this
@@ -699,6 +700,62 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
+   * Selects an element with minimum value.
+   *
+   * The minimum is computed over the specified fields in lexicographical order.
+   * Example 1: Given a data set with elements [0, 1], [1, 0], the results will be:
+   * {{{
+   *   minBy(0): [0, 1]
+   *   minBy(1): [1, 0]
+   * }}}
+   * Example 2: Given a data set with elements [0, 0], [0, 1], the results will be:
+   * {{{
+   *   minBy(0, 1): [0, 0]
+   * }}}
+   * If several elements share the minimum at the specified fields, an arbitrary one is picked.
+   * Internally, this operation is implemented as a [[ReduceFunction]].
+   *
+   * @throws InvalidProgramException if the element type of this DataSet is not a tuple type.
+   * @throws IndexOutOfBoundsException if a field position is out of range.
+   * @throws IllegalArgumentException if a selected field is not a key (comparable) type.
+   */
+  def minBy(fields: Int*) : DataSet[T] = {
+    if (!getType.isTupleType) {
+      throw new InvalidProgramException("DataSet#minBy(int...) only works on Tuple types.")
+    }
+
+    reduce(new SelectByMinFunction[T](getType.asInstanceOf[TupleTypeInfoBase[T]], fields.toArray))
+  }
+
+  /**
+   * Selects an element with maximum value.
+   *
+   * The maximum is computed over the specified fields in lexicographical order.
+   *
+   * Example 1: Given a data set with elements [0, 1], [1, 0], the results will be:
+   * {{{
+   *   maxBy(0): [1, 0]
+   *   maxBy(1): [0, 1]
+   * }}}
+   * Example 2: Given a data set with elements [0, 0], [0, 1], the results will be:
+   * {{{
+   *   maxBy(0, 1): [0, 1]
+   * }}}
+   * If several elements share the maximum at the specified fields, an arbitrary one is picked.
+   * Internally, this operation is implemented as a [[ReduceFunction]].
+   *
+   * @throws InvalidProgramException if the element type of this DataSet is not a tuple type.
+   * @throws IndexOutOfBoundsException if a field position is out of range.
+   * @throws IllegalArgumentException if a selected field is not a key (comparable) type.
+   */
+  def maxBy(fields: Int*) : DataSet[T] = {
+    if (!getType.isTupleType) {
+      throw new InvalidProgramException("DataSet#maxBy(int...) only works on Tuple types.")
+    }
+    reduce(new SelectByMaxFunction[T](getType.asInstanceOf[TupleTypeInfoBase[T]], fields.toArray))
+  }
+
+  /**
    * Creates a new DataSet containing the first `n` elements of this DataSet.
    */
   def first(n: Int): DataSet[T] = {
@@ -1599,7 +1656,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def output(outputFormat: OutputFormat[T]): DataSink[T] = {
     javaSet.output(outputFormat)
   }
-  
+
   /**
    * Prints the elements in a DataSet to the standard output stream [[System.out]] of the
    * JVM that calls the print() method. For programs that are executed in a cluster, this

http://git-wip-us.apache.org/repos/asf/flink/blob/7cc69434/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
index 8af0a4e..18dea07 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
 import Keys.ExpressionKeys
 import org.apache.flink.api.java.operators._
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.api.scala.operators.ScalaAggregateOperator
 
 import org.apache.flink.util.Collector
@@ -356,6 +357,34 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
+   * Selects, within each group, an element with the maximum value at the specified fields,
+   * compared in lexicographical order; if several elements share the maximum, an arbitrary
+   * one is picked. Internally, this is implemented as a [[ReduceFunction]].
+   * @throws InvalidProgramException if the element type is not a tuple type.
+   */
+  def maxBy(fields: Int*) : DataSet[T] = {
+    if (!set.getType().isTupleType) {
+      throw new InvalidProgramException("GroupedDataSet#maxBy(int...) only works on Tuple types.")
+    }
+    reduce(new SelectByMaxFunction[T](set.getType.asInstanceOf[TupleTypeInfoBase[T]],
+      fields.toArray))
+  }
+
+  /**
+   * Selects, within each group, an element with the minimum value at the specified fields,
+   * compared in lexicographical order; if several elements share the minimum, an arbitrary
+   * one is picked. Internally, this is implemented as a [[ReduceFunction]].
+   * @throws InvalidProgramException if the element type is not a tuple type.
+   */
+  def minBy(fields: Int*) : DataSet[T] = {
+    if (!set.getType().isTupleType) {
+      throw new InvalidProgramException("GroupedDataSet#minBy(int...) only works on Tuple types.")
+    }
+    reduce(new SelectByMinFunction[T](set.getType.asInstanceOf[TupleTypeInfoBase[T]],
+      fields.toArray))
+  }
+
+  /**
    * Applies a CombineFunction on a grouped [[DataSet]].  A
    * CombineFunction is similar to a GroupReduceFunction but does not
    * perform a full data exchange. Instead, the CombineFunction calls

http://git-wip-us.apache.org/repos/asf/flink/blob/7cc69434/flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala
new file mode 100644
index 0000000..9cc5451
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+ * Selects the element with the lexicographically largest values at `fields`; ties keep value1.
+ */
+@Internal
+class SelectByMaxFunction[T](t : TupleTypeInfoBase[T], fields : Array[Int])
+  extends ReduceFunction[T] {
+  for(f <- fields) {
+    if (f < 0 || f >= t.getArity()) {
+      throw new IndexOutOfBoundsException(
+        "SelectByMaxFunction field position " + f + " is out of range.")
+    }
+    // Check whether type is comparable
+    if (!t.getTypeAt(f).isKeyType()) {
+      throw new IllegalArgumentException(
+        "SelectByMaxFunction supports only key(Comparable) types.")
+    }
+  }
+
+  override def reduce(value1: T, value2: T): T = {
+    // A while loop avoids `return` inside `for`, which Scala implements by throwing an exception.
+    var i = 0
+    var comp = 0
+    while (comp == 0 && i < fields.length) {
+      comp = field(value1, fields(i)).compareTo(field(value2, fields(i)))
+      i += 1
+    }
+    if (comp < 0) value2 else value1 // value2 must be strictly bigger; ties keep value1
+  }
+
+  /** Reads the field at `pos` from a Scala tuple/case class (Product) or a Java Tuple. */
+  private def field(value: T, pos: Int): Comparable[Any] = (value match {
+    case p: Product => p.productElement(pos)
+    case jt: org.apache.flink.api.java.tuple.Tuple => jt.getField[AnyRef](pos)
+    case other => other.asInstanceOf[Product].productElement(pos)
+  }).asInstanceOf[Comparable[Any]]
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7cc69434/flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMinFunction.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMinFunction.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMinFunction.scala
new file mode 100644
index 0000000..71cdb84
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMinFunction.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+
+/**
+ * Selects the element with the lexicographically smallest values at `fields`; ties keep value1.
+ */
+@Internal
+class SelectByMinFunction[T](t : TupleTypeInfoBase[T], fields : Array[Int])
+  extends ReduceFunction[T] {
+  for(f <- fields) {
+    if (f < 0 || f >= t.getArity()) {
+      throw new IndexOutOfBoundsException(
+        "SelectByMinFunction field position " + f + " is out of range.")
+    }
+
+    // Check whether type is comparable
+    if (!t.getTypeAt(f).isKeyType()) {
+      throw new IllegalArgumentException(
+        "SelectByMinFunction supports only key(Comparable) types.")
+    }
+  }
+
+  override def reduce(value1: T, value2: T): T = {
+    // A while loop avoids `return` inside `for`, which Scala implements by throwing an exception.
+    var i = 0
+    var comp = 0
+    while (comp == 0 && i < fields.length) {
+      comp = field(value1, fields(i)).compareTo(field(value2, fields(i)))
+      i += 1
+    }
+    if (comp > 0) value2 else value1 // value2 must be strictly smaller; ties keep value1
+  }
+
+  /** Reads the field at `pos` from a Scala tuple/case class (Product) or a Java Tuple. */
+  private def field(value: T, pos: Int): Comparable[Any] = (value match {
+    case p: Product => p.productElement(pos)
+    case jt: org.apache.flink.api.java.tuple.Tuple => jt.getField[AnyRef](pos)
+    case other => other.asInstanceOf[Product].productElement(pos)
+  }).asInstanceOf[Comparable[Any]]
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7cc69434/flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala
new file mode 100644
index 0000000..523cd5d
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MaxByOperatorTest {
+
+  private val emptyTupleData = List[(Int, Long, String, Long, Int)]()
+  private val customTypeData = List[CustomType]()
+
+  @Test
+  def testMaxByKeyFieldsDataset(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+    try {
+      collection.maxBy(0, 1, 2, 3, 4)
+    } catch {
+      case e : Exception => Assert.fail("maxBy on tuple fields must not throw: " + e)
+    }
+  }
+
+  /**
+   * This test validates that an index which is out of bounds throws an
+   * IndexOutOfBoundsException.
+   */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset1() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+
+    // should not work, key out of tuple bounds
+    collection.maxBy(5)
+  }
+
+  /**
+   * This test validates that an index which is out of bounds throws an
+   * IndexOutOfBoundsException.
+   */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset2() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+
+    // should not work, key out of tuple bounds
+    collection.maxBy(-1)
+  }
+
+  /**
+   * This test validates that an index which is out of bounds throws an
+   * IndexOutOfBoundsException.
+   */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset3() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+
+    // should not work, key out of tuple bounds
+    collection.maxBy(1, 2, 3, 4, -1)
+  }
+
+  /**
+   * This test validates that no exception is thrown when an empty grouping
+   * calls maxBy().
+   */
+  @Test
+  def testMaxByKeyFieldsGrouping() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+    // should work
+    try {
+      groupDs.maxBy(4, 0, 1, 2, 3)
+    } catch {
+      case e : Exception => Assert.fail("maxBy on tuple fields must not throw: " + e)
+    }
+  }
+
+  /**
+   * This test validates that an InvalidProgramException is thrown when maxBy
+   * is used on a custom data type.
+   */
+  @Test(expected = classOf[InvalidProgramException])
+  def testCustomKeyFieldsDataset() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val customDS = env.fromCollection(customTypeData)
+    // should not work: maxBy on a custom (non-tuple) type
+    customDS.maxBy(0)
+  }
+
+  /**
+   * This test validates that an InvalidProgramException is thrown when maxBy
+   * is used on a grouping of a custom data type.
+   */
+  @Test(expected = classOf[InvalidProgramException])
+  def testCustomKeyFieldsGrouping() {
+    // Group with a key selector: groupBy(0) would itself throw on a non-tuple type.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs: GroupedDataSet[CustomType] = env.fromCollection(customTypeData).groupBy(_.myInt)
+    // should not work: maxBy on a non-tuple type
+    groupDs.maxBy(0)
+  }
+  /**
+   * This test validates that an index which is out of bounds throws an
+   * IndexOutOfBoundsException.
+   */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsGrouping1() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+    groupDs.maxBy(5)
+  }
+
+  /**
+   * This test validates that an index which is out of bounds throws an
+   * IndexOutOfBoundsException.
+   */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsGrouping2() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+    groupDs.maxBy(-1)
+  }
+
+  /**
+   * This test validates that an index which is out of bounds throws an
+   * IndexOutOfBoundsException.
+   */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsGrouping3() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+    groupDs.maxBy(1, 2, 3, 4, -1)
+  }
+
+  class CustomType(var myInt: Int, var myLong: Long, var myString: String) {
+    def this() {
+      this(0, 0, "")
+    }
+
+    override def toString: String = {
+      myInt + "," + myLong + "," + myString
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7cc69434/flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala
new file mode 100644
index 0000000..f9d5249
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MinByOperatorTest {
+  private val emptyTupleData = List[(Int, Long, String, Long, Int)]()
+  private val customTypeData = List[CustomType]()
+
+  @Test
+  def testMinByKeyFieldsDataset(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+    try {
+      collection.minBy(4, 0, 1, 2, 3)
+    } catch {
+      case e : Exception => Assert.fail("minBy on tuple fields must not throw: " + e)
+    }
+  }
+
+  /**
+   * This test validates that an index which is out of bounds throws an
+   * IndexOutOfBoundsException.
+   */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset1() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+
+    // should not work, key out of tuple bounds
+    collection.minBy(5)
+  }
+
+  /**
+   * This test validates that an index which is out of bounds throws an
+   * IndexOutOfBoundsException.
+   */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset2() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+
+    // should not work, key out of tuple bounds
+    collection.minBy(-1)
+  }
+
+  /**
+   * This test validates that an index which is out of bounds throws an
+   * IndexOutOfBoundsException.
+   */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset3() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+
+    // should not work, key out of tuple bounds
+    collection.minBy(1, 2, 3, 4, -1)
+  }
+
+  /**
+   * This test validates that an InvalidProgramException is thrown when minBy
+   * is used on a custom data type.
+   */
+  @Test(expected = classOf[InvalidProgramException])
+  def testCustomKeyFieldsDataset() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val customDS = env.fromCollection(customTypeData)
+    // should not work: minBy on a custom (non-tuple) type
+    customDS.minBy(0)
+  }
+
+  /**
+   * This test validates that no exception is thrown when an empty grouping
+   * calls minBy().
+   */
+  @Test
+  def testMinByKeyFieldsGrouping() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+    // should work
+    try {
+      groupDs.minBy(4, 0, 1, 2, 3)
+    } catch {
+      case e : Exception => Assert.fail("minBy on tuple fields must not throw: " + e)
+    }
+  }
+
+  /**
+   * This test validates that an InvalidProgramException is thrown when minBy
+   * is used on a grouping of a custom data type.
+   */
+  @Test(expected = classOf[InvalidProgramException])
+  def testCustomKeyFieldsGrouping() {
+    // Group with a key selector: groupBy(0) would itself throw on a non-tuple type.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs: GroupedDataSet[CustomType] = env.fromCollection(customTypeData).groupBy(_.myInt)
+    // should not work: minBy on a non-tuple type
+    groupDs.minBy(0)
+  }
+
+  /**
+   * This test validates that an index which is out of bounds throws an
+   * IndexOutOfBoundsException.
+   */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsGrouping1() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+
+    groupDs.minBy(5)
+  }
+
+  /**
+   * This test validates that an index which is out of bounds throws an
+   * IndexOutOfBoundsException.
+   */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsGrouping2() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+
+    groupDs.minBy(-1)
+  }
+
+  /**
+   * This test validates that an index which is out of bounds throws an
+   * IndexOutOfBoundsException.
+   */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsGrouping3() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+
+    groupDs.minBy(1, 2, 3, 4, -1)
+  }
+
+  class CustomType(var myInt: Int, var myLong: Long, var myString: String) {
+    def this() {
+      this(0, 0, "")
+    }
+
+    override def toString: String = {
+      myInt + "," + myLong + "," + myString
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7cc69434/flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala
new file mode 100644
index 0000000..291df79
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.scala.{SelectByMaxFunction, SelectByMinFunction}
+import org.apache.flink.api.scala._
+import org.junit.{Assert, Test}
+
+/**
+ * Unit tests for [[SelectByMaxFunction]] and [[SelectByMinFunction]] on a Scala 5-tuple.
+ */
+class SelectByFunctionTest {
+
+  val tupleTypeInfo = implicitly[TypeInformation[(Int, Long, String, Long, Int)]]
+    .asInstanceOf[TupleTypeInfoBase[(Int, Long, String, Long, Int)]]
+
+  private val bigger = (10, 100L, "HelloWorld", 200L, 20)
+  private val smaller = (5, 50L, "Hello", 50L, 15)
+
+  // Special case where only the last value determines if bigger or smaller
+  private val specialCaseBigger = (10, 100L, "HelloWorld", 200L, 17)
+  private val specialCaseSmaller = (5, 50L, "Hello", 50L, 17)
+
+  /**
+   * This test validates whether the order of tuples has any impact
+   * on the outcome and if the bigger tuple is returned.
+   * Only field 0 is compared.
+   */
+  @Test
+  def testMaxByComparison(): Unit = {
+    val a1 = Array(0)
+    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
+    try {
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(smaller, bigger))
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(bigger, smaller))
+    } catch {
+      case e : Exception =>
+        Assert.fail("No exception should be thrown while comparing both tuples: " + e)
+    }
+  }
+
+  // ----------------------- MAXIMUM FUNCTION TEST BELOW --------------------------
+
+  /**
+   * This test checks the case where two tuples only differ in one value, but this value is not
+   * in the fields list. In that case they should be seen as equal
+   * and then the first given tuple (value1) should be returned by reduce().
+   */
+  @Test
+  def testMaxByComparisonSpecialCase1() : Unit = {
+    val a1 = Array(0, 3)
+    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
+
+    try {
+      Assert.assertSame("SelectByMax must return the first given tuple",
+        specialCaseBigger, maxByTuple.reduce(specialCaseBigger, bigger))
+      Assert.assertSame("SelectByMax must return the first given tuple",
+        bigger, maxByTuple.reduce(bigger, specialCaseBigger))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown " +
+        "while comparing both tuples: " + e)
+    }
+  }
+
+  /**
+   * This test checks that the bigger tuple is returned when two tuples differ in only one field.
+   */
+  @Test
+  def testMaxByComparisonSpecialCase2() : Unit = {
+    val a1 = Array(0, 2, 1, 4, 3)
+    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
+    try {
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(specialCaseBigger, bigger))
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(bigger, specialCaseBigger))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown" +
+        " while comparing both tuples: " + e)
+    }
+  }
+
+  /**
+   * This test validates that the bigger tuple is returned when all five fields are compared.
+   */
+  @Test
+  def testMaxByComparisonMultiple(): Unit = {
+    val a1 = Array(0, 1, 2, 3, 4)
+    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
+    try {
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(smaller, bigger))
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(bigger, smaller))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown " +
+        "while comparing both tuples: " + e)
+    }
+  }
+
+  /**
+   * Checks whether reduce does behave as expected if both values are the same object.
+   */
+  @Test
+  def testMaxByComparisonMustReturnATuple() : Unit = {
+    val a1 = Array(0)
+    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
+
+    try {
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(bigger, bigger))
+      Assert.assertSame("SelectByMax must return smaller tuple",
+        smaller, maxByTuple.reduce(smaller, smaller))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown" +
+        " while comparing both tuples: " + e)
+    }
+  }
+
+  // ----------------------- MINIMUM FUNCTION TEST BELOW --------------------------
+
+  /**
+   * This test validates whether the order of tuples has any impact
+   * on the outcome and if the smaller tuple is returned.
+   */
+  @Test
+  def testMinByComparison() : Unit = {
+    val a1 = Array(0)
+    val minByTuple = new SelectByMinFunction(tupleTypeInfo, a1)
+    try {
+      Assert.assertSame("SelectByMin must return smaller tuple",
+        smaller, minByTuple.reduce(smaller, bigger))
+      Assert.assertSame("SelectByMin must return smaller tuple",
+        smaller, minByTuple.reduce(bigger, smaller))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown " +
+        "while comparing both tuples: " + e)
+    }
+  }
+
+  /**
+   * This test checks the case where two tuples only differ in one value, but this value is not
+   * in the fields list. In that case they should be seen as equal and
+   * then the first given tuple (value1) should be returned by reduce().
+   */
+  @Test
+  def testMinByComparisonSpecialCase1() : Unit = {
+    val a1 = Array(0, 3)
+    val minByTuple = new SelectByMinFunction(tupleTypeInfo, a1)
+
+    try {
+      Assert.assertSame("SelectByMin must return the first given tuple",
+        specialCaseBigger, minByTuple.reduce(specialCaseBigger, bigger))
+      Assert.assertSame("SelectByMin must return the first given tuple",
+        bigger, minByTuple.reduce(bigger, specialCaseBigger))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown " +
+        "while comparing both tuples: " + e)
+    }
+  }
+
+  /**
+   * This test validates that, when two tuples only differ in one value and that value's index
+   * is among the fields given at construction time, the smaller tuple is returned.
+   * Both argument orders are checked.
+   */
+  @Test
+  def testMinByComparisonSpecialCase2() : Unit = {
+    val a1 = Array(0, 2, 1, 4, 3)
+    val minByTuple = new SelectByMinFunction(tupleTypeInfo, a1)
+
+    try {
+      Assert.assertSame("SelectByMin must return smaller tuple",
+        smaller, minByTuple.reduce(specialCaseSmaller, smaller))
+      Assert.assertSame("SelectByMin must return smaller tuple",
+        smaller, minByTuple.reduce(smaller, specialCaseSmaller))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown" +
+        " while comparing both tuples: " + e)
+    }
+  }
+
+  /**
+   * This test validates that the smaller tuple is returned when all five fields are compared.
+   */
+  @Test
+  def testMinByComparisonMultiple() : Unit = {
+    val a1 = Array(0, 1, 2, 3, 4)
+    val minByTuple = new SelectByMinFunction(tupleTypeInfo, a1)
+    try {
+      Assert.assertSame("SelectByMin must return smaller tuple",
+        smaller, minByTuple.reduce(smaller, bigger))
+      Assert.assertSame("SelectByMin must return smaller tuple",
+        smaller, minByTuple.reduce(bigger, smaller))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown" +
+        " while comparing both tuples: " + e)
+    }
+  }
+}