Repository: flink
Updated Branches:
  refs/heads/master c06213706 -> 08ca9ffa9


[FLINK-1963] Improve distinct() transformation

This closes #905


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08ca9ffa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08ca9ffa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08ca9ffa

Branch: refs/heads/master
Commit: 08ca9ffa9a95610c073145a09e731311e728c4fd
Parents: c062137
Author: pietro pinoli <pie...@pietros-mbp.lan>
Authored: Mon Jul 13 13:32:20 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Jul 17 01:39:25 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java | 15 ++---
 .../api/java/operators/DistinctOperator.java    | 26 +++-----
 .../api/java/operator/DistinctOperatorTest.java | 65 +++++++++++++++++++-
 .../org/apache/flink/api/scala/DataSet.scala    | 49 ++++++++++-----
 .../test/javaApiOperators/DistinctITCase.java   | 44 +++++++++++++
 .../api/scala/operators/DistinctITCase.scala    | 39 ++++++++++++
 .../scala/operators/DistinctOperatorTest.scala  | 26 ++++++++
 7 files changed, 223 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
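
For context, a minimal sketch of what this change enables, assuming the Flink Java DataSet API as shown in the diff below; the class name and sample values are made up for illustration. After this commit, distinct() and the wildcard expression "*" also work on DataSets of atomic key types such as Long, not only on Tuple/Pojo DataSets:

    // Sketch only: distinct() on an atomic-type DataSet, which this commit makes possible.
    // Sample values are made up for illustration.
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class DistinctOnAtomicTypeExample {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Long> numbers = env.fromElements(1L, 2L, 2L, 3L, 3L);

            // Before this change, distinct() without keys threw InvalidProgramException
            // for non-composite (non Tuple/Pojo) types; now atomic key types are accepted.
            DataSet<Long> unique = numbers.distinct();

            // The wildcard expression "*" is accepted for atomic types as well.
            DataSet<Long> uniqueViaWildcard = numbers.distinct("*");

            unique.print();
            uniqueViaWildcard.print();
        }
    }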


http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index c628b04..81ba279 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -606,13 +606,13 @@ public abstract class DataSet<T> {
        }
        
        /**
-        * Returns a distinct set of a {@link Tuple} {@link DataSet} using expression keys.
-        * <p>
-        * The field position keys specify the fields of Tuples or Pojos on which the decision is made if two elements are distinct or
-        * not.
+        * Returns a distinct set of a {@link DataSet} using expression keys.
         * <p>
+        * The field expression keys specify the fields of a {@link org.apache.flink.api.common.typeutils.CompositeType}
+        * (e.g., Tuple or Pojo type) on which the decision is made if two elements are distinct or not.
+        * In case of a {@link org.apache.flink.api.common.typeinfo.AtomicType}, only the wildcard expression ("*") is valid.
         *
-        * @param fields One or more field positions on which the distinction of the DataSet is decided.
+        * @param fields One or more field expressions on which the distinction of the DataSet is decided.
         * @return A DistinctOperator that represents the distinct DataSet.
         */
        public DistinctOperator<T> distinct(String... fields) {
@@ -620,9 +620,10 @@ public abstract class DataSet<T> {
        }
        
        /**
-        * Returns a distinct set of a {@link Tuple} {@link DataSet} using all fields of the tuple.
+        * Returns a distinct set of a {@link DataSet}.
         * <p>
-        * Note: This operator can only be applied to Tuple DataSets.
+        * If the input is a {@link org.apache.flink.api.common.typeutils.CompositeType} (Tuple or Pojo type),
+        * distinct is performed on all fields and each field must be a key type.
         * 
         * @return A DistinctOperator that represents the distinct DataSet.
         */
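
A short usage sketch of the documented behavior above, assuming the API from this commit; the Word Pojo and its fields are hypothetical and only serve as an example of a composite type:

    // Sketch only: field expression keys on a composite type, and the "*" wildcard on an atomic type.
    // The Word Pojo below is hypothetical and not part of this commit.
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class DistinctExpressionKeysExample {

        public static class Word {
            public String text;
            public int frequency;

            public Word() {}

            public Word(String text, int frequency) {
                this.text = text;
                this.frequency = frequency;
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Composite type (Pojo): distinct on the "text" field expression.
            DataSet<Word> words = env.fromElements(
                    new Word("hello", 1), new Word("hello", 2), new Word("world", 1));
            DataSet<Word> distinctByText = words.distinct("text");
            distinctByText.print();

            // Atomic type: only the wildcard expression "*" is a valid field expression.
            DataSet<String> values = env.fromElements("a", "b", "a");
            DataSet<String> distinctValues = values.distinct("*");
            distinctValues.print();
        }
    }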

http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index 686823c..a6eb43e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
@@ -47,28 +48,21 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
        private final Keys<T> keys;
        
        private final String distinctLocationName;
-       
+
        public DistinctOperator(DataSet<T> input, Keys<T> keys, String distinctLocationName) {
                super(input, input.getType());
 
                this.distinctLocationName = distinctLocationName;
-               
-               // if keys is null distinction is done on all tuple fields
-               if (keys == null) {
-                       if (input.getType() instanceof CompositeType) {
-                               keys = new Keys.ExpressionKeys<T>(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType());
-                       }
-                       else {
-                               throw new InvalidProgramException("Distinction on all fields is only possible on composite (pojo / tuple) data types.");
-                       }
+
+               if (!(input.getType() instanceof CompositeType) &&
+                               !(input.getType() instanceof AtomicType && input.getType().isKeyType())){
+                       throw new InvalidProgramException("Distinct only possible on composite or atomic key types.");
                }
-               
-               
-               // FieldPositionKeys can only be applied on Tuples and POJOs
-               if (keys instanceof Keys.ExpressionKeys && !(input.getType() instanceof CompositeType)) {
-                       throw new InvalidProgramException("Distinction on field positions is only possible on composite type DataSets.");
+               // if keys is null distinction is done on all fields
+               if (keys == null) {
+                       keys = new Keys.ExpressionKeys<T>(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType());
                }
-               
+
                this.keys = keys;
        }
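
A hedged sketch of the validation path above: a type that is neither a CompositeType nor an atomic key type should be rejected with InvalidProgramException. The NonKeyType class below is hypothetical; it is deliberately not a Pojo and not Comparable, mirroring the NotComparable test type added further down:

    // Sketch only: exercises the new type check in the DistinctOperator constructor.
    // NonKeyType is a hypothetical, non-Pojo, non-Comparable type (i.e., a generic, non-key type).
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.InvalidProgramException;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class DistinctValidationExample {

        public static class NonKeyType {
            private final List<Integer> values;

            public NonKeyType(List<Integer> values) {
                this.values = values;
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            List<NonKeyType> input = new ArrayList<NonKeyType>();
            input.add(new NonKeyType(new ArrayList<Integer>()));
            DataSet<NonKeyType> ds = env.fromCollection(input);

            try {
                // Expected to fail: NonKeyType is neither a composite type nor an atomic key type.
                ds.distinct();
            } catch (InvalidProgramException e) {
                System.out.println("Rejected as expected: " + e.getMessage());
            }
        }
    }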
 

http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
index f4c87c8..f4bd945 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
@@ -114,18 +114,31 @@ public class DistinctOperatorTest {
        
        @Test(expected = IllegalArgumentException.class)
        public void testDistinctByKeyFields6() {
-               
+
                final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
                // should not work, negative field position
                tupleDs.distinct(-1);
        }
+
+       @Test
+       public void testDistinctByKeyFields7(){
+               final ExecutionEnvironment env  = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
+
+               // should work
+               try {
+                       longDs.distinct("*");
+               } catch (Exception e){
+                       Assert.fail();
+               }
+       }
        
        @Test
        @SuppressWarnings("serial")
        public void testDistinctByKeySelector1() {
-               
+
                final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                this.customTypeData.add(new CustomType());
                
@@ -145,7 +158,53 @@ public class DistinctOperatorTest {
                }
                
        }
-       
+
+       @Test
+       public void  testDistinctByKeyIndices1() {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               try {
+                       DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
+                       // should work
+                       longDs.distinct();
+               } catch(Exception e) {
+                       Assert.fail();
+               }
+       }
+
+       @Test(expected = InvalidProgramException.class)
+       public void testDistinctOnNotKeyDataType() throws Exception {
+       /*
+       * should not work. NotComparable data type cannot be used as key
+       */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               NotComparable a = new NotComparable();
+               List<NotComparable> l = new ArrayList<NotComparable>();
+               l.add(a);
+
+               DataSet<NotComparable> ds = env.fromCollection(l);
+               DataSet<NotComparable> reduceDs = ds.distinct();
+
+       }
+
+       @Test(expected = InvalidProgramException.class)
+       public void testDistinctOnNotKeyDataTypeOnSelectAllChar() throws Exception {
+       /*
+       * should not work. NotComparable data type cannot be used as key
+       */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               NotComparable a = new NotComparable();
+               List<NotComparable> l = new ArrayList<NotComparable>();
+               l.add(a);
+
+               DataSet<NotComparable> ds = env.fromCollection(l);
+               DataSet<NotComparable> reduceDs = ds.distinct("*");
+       }
+
+       class NotComparable {
+               public List<Integer> myInts;
+       }
 
        public static class CustomType implements Serializable {
                

http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index fd1492a..3a0f6d9 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -710,10 +710,12 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   // --------------------------------------------------------------------------------------------
   //  distinct
   // --------------------------------------------------------------------------------------------
-
   /**
    * Creates a new DataSet containing the distinct elements of this DataSet. The decision whether
    * two elements are distinct or not is made using the return value of the given function.
+   *
+   * @param fun The function which extracts the key values from the DataSet on which the
+   *            distinction of the DataSet is decided.
    */
   def distinct[K: TypeInformation](fun: T => K): DataSet[T] = {
     val keyExtractor = new KeySelector[T, K] {
@@ -728,10 +730,24 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
-   * Creates a new DataSet containing the distinct elements of this DataSet. The decision whether
-   * two elements are distinct or not is made based on only the specified tuple fields.
+   * Returns a distinct set of a {@link DataSet}.
+   * <p>
+   * If the input is a composite type (Tuple or Pojo type), distinct is performed on all fields
+   * and each field must be a key type.
+   */
+  def distinct: DataSet[T] = {
+    wrap(new DistinctOperator[T](javaSet, null, getCallLocationName()))
+  }
+
+  /**
+   * Returns a distinct set of a {@link Tuple} {@link DataSet} using field position keys.
+   * <p>
+   * The field position keys specify the fields of Tuples on which the decision is made if
+   * two Tuples are distinct or not.
+   * <p>
+   * Note: Field position keys can only be specified for Tuple DataSets.
    *
-   * This only works on tuple DataSets.
+   * @param fields One or more field positions on which the distinction of the DataSet is decided.
    */
   def distinct(fields: Int*): DataSet[T] = {
     wrap(new DistinctOperator[T](
@@ -741,8 +757,20 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
-   * Creates a new DataSet containing the distinct elements of this DataSet. The decision whether
-   * two elements are distinct or not is made based on only the specified fields.
+   * Returns a distinct set of a {@link Tuple} {@link DataSet} using expression keys.
+   * <p>
+   * The field expression keys specify the fields of a
+   * {@link org.apache.flink.api.common.typeutils.CompositeType}
+   * (e.g., Tuple or Pojo type) on which the decision is made if two elements are distinct or not.
+   * In case of a {@link org.apache.flink.api.common.typeinfo.AtomicType}, only the
+   * wildcard expression ("_") is valid.
+   *
+   * @param firstField First field expression on which the distinction of the DataSet is decided
+   * @param otherFields Zero or more field expressions on which the distinction of the DataSet
+   *                    is decided
    */
   def distinct(firstField: String, otherFields: String*): DataSet[T] = {
     wrap(new DistinctOperator[T](
@@ -751,15 +779,6 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
       getCallLocationName()))
   }
 
-  /**
-   * Creates a new DataSet containing the distinct elements of this DataSet. The decision whether
-   * two elements are distinct or not is made based on all tuple fields.
-   *
-   * This only works if this DataSet contains Tuples.
-   */
-  def distinct: DataSet[T] = {
-    wrap(new DistinctOperator[T](javaSet, null, getCallLocationName()))
-  }
 
   // --------------------------------------------------------------------------------------------
   //  Keyed DataSet

http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
index 02dbb76..d32986d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.test.javaApiOperators;
 
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.avro.generic.GenericData;
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -274,4 +277,45 @@ public class DistinctITCase extends MultipleProgramsTestBase {
                        return (int) value.nestedPojo.longNumber;
                }
        }
+
+       @Test
+       public void testCorrectnessOfDistinctOnAtomic() throws Exception {
+               /*
+                * check correctness of distinct on Integers
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Integer> ds = CollectionDataSets.getIntegerDataSet(env);
+               DataSet<Integer> reduceDs = ds.distinct();
+
+               List<Integer> result = reduceDs.collect();
+
+               String expected = "1\n2\n3\n4\n5";
+
+               compareResultAsText(result, expected);
+       }
+
+       @Test
+       public void testCorrectnessOfDistinctOnAtomicWithSelectAllChar() throws Exception {
+               /*
+                * check correctness of distinct on Strings, using Keys.ExpressionKeys.SELECT_ALL_CHAR
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+               DataSet<String> reduceDs = ds.union(ds).distinct("*");
+
+               List<String> result = reduceDs.collect();
+
+               String expected = "I am fine.\n" +
+                               "Luke Skywalker\n" +
+                               "LOL\n" +
+                               "Hello world, how are you?\n" +
+                               "Hi\n" +
+                               "Hello world\n" +
+                               "Hello\n" +
+                               "Random comment\n";
+
+               compareResultAsText(result, expected);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
index cf82ce9..8b1e2fc 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
@@ -174,6 +174,45 @@ class DistinctITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(m
     env.execute()
     expected = "10000\n20000\n30000\n"
   }
+
+  @Test
+  def testCorrectnessOfDistinctOnAtomic(): Unit = {
+    /*
+     * check correctness of distinct on Integers
+     */
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getIntDataSet(env)
+
+    val reduceDs = ds.distinct
+
+    reduceDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1\n2\n3\n4\n5"
+  }
+
+  @Test
+  def testCorrectnessOfDistinctOnAtomicWithSelectAllChar(): Unit = {
+    /*
+     * check correctness of distinct on Strings, using Keys.ExpressionKeys.SELECT_ALL_CHAR
+     */
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getStringDataSet(env)
+    val reduceDs = ds.union(ds).distinct("_")
+
+    reduceDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "I am fine.\n" +
+      "Luke Skywalker\n" +
+      "LOL\n" +
+      "Hello world, how are you?\n" +
+      "Hi\n" +
+      "Hello world\n" +
+      "Hello\n" +
+      "Random comment\n"
+  }
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala
index ca93d86..7fc53e5 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala
@@ -90,6 +90,19 @@ class DistinctOperatorTest {
   }
 
   @Test
+  def testDistinctByKeyIndices7(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val longDs = env.fromCollection(emptyLongData)
+
+    // should work
+    try {
+      longDs.distinct
+    } catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Test
   def testDistinctByKeyFields1(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tupleDs = env.fromCollection(emptyTupleData)
@@ -140,6 +153,19 @@ class DistinctOperatorTest {
   }
 
   @Test
+  def testDistinctByKeyFields6(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val longDs = env.fromCollection(emptyLongData)
+
+    // should work
+    try {
+      longDs.distinct("_")
+    } catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Test
   def testDistinctByKeySelector1(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     try {
