[FLINK-701] Change KeySelector to a SAM interface

This closes #85


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/934e4e00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/934e4e00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/934e4e00

Branch: refs/heads/release-0.6
Commit: 934e4e00df012b4aab128294c05153d0c46f9887
Parents: bc89e91
Author: Stephan Ewen <[email protected]>
Authored: Thu Jul 31 20:31:54 2014 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Aug 1 02:33:34 2014 +0200

----------------------------------------------------------------------
 .../testfunctions/IdentityKeyExtractor.java     |  3 +-
 .../flink/api/java/functions/KeySelector.java   | 10 ++--
 .../flink/api/java/typeutils/TypeExtractor.java |  9 +++-
 .../java/type/extractor/TypeExtractorTest.java  | 23 +++++++++
 .../lambdas/KeySelectorTest.java                | 52 ++++++++++++++++++++
 5 files changed, 90 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/934e4e00/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityKeyExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityKeyExtractor.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityKeyExtractor.java
index 39ef821..7e7ba16 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityKeyExtractor.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityKeyExtractor.java
@@ -16,12 +16,11 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.testfunctions;
 
 import org.apache.flink.api.java.functions.KeySelector;
 
-public class IdentityKeyExtractor<T> extends KeySelector<T, T> {
+public class IdentityKeyExtractor<T> implements KeySelector<T, T> {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/934e4e00/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
index ede7c32..1dcc8c6 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.functions;
 
+import org.apache.flink.api.common.functions.Function;
 
 /**
  * The {@link KeySelector} allows to use arbitrary objects for operations such 
as
@@ -28,9 +29,7 @@ package org.apache.flink.api.java.functions;
  * @param <IN> Type of objects to extract the key from.
  * @param <KEY> Type of key.
  */
-public abstract class KeySelector<IN, KEY> implements java.io.Serializable {
-       
-       private static final long serialVersionUID = 1L;
+public interface KeySelector<IN, KEY> extends Function, java.io.Serializable {
        
        /**
         * User-defined function that extracts the key from an arbitrary object.
@@ -54,6 +53,9 @@ public abstract class KeySelector<IN, KEY> implements 
java.io.Serializable {
         * 
         * @param value The object to get the key from.
         * @return The extracted key.
+        * 
+        * @throws Exception Throwing an exception will cause the execution of 
the respective task to fail,
+        *                   and trigger recovery or cancellation of the 
program. 
         */
-       public abstract KEY getKey(IN value);
+       KEY getKey(IN value) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/934e4e00/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index e8ee0bb..d03cc49 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -38,9 +38,11 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.java.functions.InvalidTypesException;
 import org.apache.flink.api.java.functions.KeySelector;
+import 
org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.types.TypeInformation;
 import org.apache.flink.types.Value;
@@ -134,6 +136,10 @@ public class TypeExtractor {
        
        @SuppressWarnings("unchecked")
        public static <IN, OUT> TypeInformation<OUT> 
getKeySelectorTypes(KeySelector<IN, OUT> selectorInterface, TypeInformation<IN> 
inType) {
+               if (FunctionUtils.isLambdaFunction(selectorInterface)) {
+                       throw new UnsupportedLambdaExpressionException();
+               }
+               
                validateInputType(KeySelector.class, 
selectorInterface.getClass(), 0, inType);
                if(selectorInterface instanceof ResultTypeQueryable) {
                        return ((ResultTypeQueryable<OUT>) 
selectorInterface).getProducedType();
@@ -406,7 +412,8 @@ public class TypeExtractor {
                        return parameter;
                }
                
-               throw new IllegalArgumentException(baseClass.getName() + " must 
be implemented.");
+               throw new IllegalArgumentException("The types of the interface 
" + baseClass.getName() + " could not be inferred. " + 
+                                               "Support for synthetic 
interfaces, lambdas, and generic types is limited at this point.");
        }
        
        private static Type getParameterTypeFromGenericType(Class<?> baseClass, 
ArrayList<Type> typeHierarchy, Type t, int pos) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/934e4e00/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index 8346d00..d5044a8 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -1430,4 +1430,27 @@ public class TypeExtractorTest {
                TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(mapInterface, BasicTypeInfo.STRING_TYPE_INFO);
                Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
        }
+       
+       @SuppressWarnings({ "serial", "unchecked", "rawtypes" })
+       @Test
+       public void testExtractKeySelector() {
+               KeySelector<String, Integer> selector = new KeySelector<String, 
Integer>() {
+                       @Override
+                       public Integer getKey(String value) { return null; }
+               };
+
+               TypeInformation<?> ti = 
TypeExtractor.getKeySelectorTypes(selector, BasicTypeInfo.STRING_TYPE_INFO);
+               Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
+               
+               try {
+                       TypeExtractor.getKeySelectorTypes((KeySelector) 
selector, BasicTypeInfo.BOOLEAN_TYPE_INFO);
+                       Assert.fail();
+               }
+               catch (InvalidTypesException e) {
+                       // good
+               }
+               catch (Exception e) {
+                       Assert.fail("wrong exception type");
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/934e4e00/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/KeySelectorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/KeySelectorTest.java
 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/KeySelectorTest.java
new file mode 100644
index 0000000..fd61e25
--- /dev/null
+++ 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/KeySelectorTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import 
org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.junit.Assert;
+
+public class KeySelectorTest {
+
+       public void testSelectorLambda() {
+               try {
+                       KeySelector<Tuple2<String, Integer>, String> selector = 
(t) -> t.f0;
+                       
+                       try {
+                               TypeExtractor.getKeySelectorTypes(selector, 
+                                               new 
TupleTypeInfo<Tuple2<String, Integer>>(BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO));
+                               Assert.fail("No unsupported lambdas exception");
+                       }
+                       catch (UnsupportedLambdaExpressionException e) {
+                               // good
+                       }
+                       catch (Exception e) {
+                               Assert.fail("Wrong exception type");
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail(e.getMessage());
+               }
+       }
+}

Reply via email to