Repository: flink Updated Branches: refs/heads/master 06503c8fd -> d8dbaeeb4
http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java index d088d16..e9d5dac 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java @@ -19,6 +19,8 @@ package org.apache.flink.api.java.typeutils; +import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -36,6 +38,7 @@ public class TypeInfoParser { private static final Pattern tuplePattern = Pattern.compile("^((" + TUPLE_PACKAGE.replaceAll("\\.", "\\\\.") + "\\.)?Tuple[0-9]+)<"); private static final Pattern writablePattern = Pattern.compile("^((" + WRITABLE_PACKAGE.replaceAll("\\.", "\\\\.") + "\\.)?Writable)<([^\\s,>]*)(,|>|$)"); + private static final Pattern enumPattern = Pattern.compile("^((java\\.lang\\.)?Enum)<([^\\s,>]*)(,|>|$)"); private static final Pattern basicTypePattern = Pattern .compile("^((java\\.lang\\.)?(String|Integer|Byte|Short|Character|Double|Float|Long|Boolean))(,|>|$)"); private static final Pattern basicType2Pattern = Pattern.compile("^(int|byte|short|char|double|float|long|boolean)(,|>|$)"); @@ -44,7 +47,8 @@ public class TypeInfoParser { private static final Pattern basicArrayTypePattern = Pattern .compile("^((java\\.lang\\.)?(String|Integer|Byte|Short|Character|Double|Float|Long|Boolean))\\[\\](,|>|$)"); private static final Pattern basicArrayType2Pattern = Pattern.compile("^(int|byte|short|char|double|float|long|boolean)\\[\\](,|>|$)"); - private static final Pattern customObjectPattern = Pattern.compile("^([^\\s,>]+)(,|>|$)"); + private static final Pattern pojoGenericObjectPattern = Pattern.compile("^([^\\s,<>]+)(<)?"); + private static final Pattern fieldPattern = Pattern.compile("^([^\\s,<>]+)="); /** * Generates an instance of <code>TypeInformation</code> by parsing a type @@ -57,18 +61,19 @@ public class TypeInfoParser { * <code>String[]</code>, etc. * <li>Tuple types such as <code>Tuple1<TYPE0></code>, * <code>Tuple2<TYPE0, TYPE1></code>, etc.</li> - * <li>Custom types such as <code>org.my.CustomClass</code>, - * <code>org.my.CustomClass$StaticInnerClass</code>, etc. + * <li>Pojo types such as <code>org.my.MyPojo<myFieldName=TYPE0,myFieldName2=TYPE1></code>, etc.</li> + * <li>Generic types such as <code>java.lang.Class</code>, etc. * <li>Custom type arrays such as <code>org.my.CustomClass[]</code>, * <code>org.my.CustomClass$StaticInnerClass[]</code>, etc. * <li>Value types such as <code>DoubleValue</code>, * <code>StringValue</code>, <code>IntegerValue</code>, etc.</li> - * <li>Tuple array types such as <code>Tuple2<TYPE0,TYPE1>[], etc.</code></li> + * <li>Tuple array types such as <code>Tuple2<TYPE0,TYPE1>[], etc.</code></li> * <li>Writable types such as <code>Writable<org.my.CustomWritable></code></li> + * <li>Enum types such as <code>Enum<org.my.CustomEnum></code></li> * </ul> * * Example: - * <code>"Tuple2<String,Tuple2<Integer,org.my.MyClass>>"</code> + * <code>"Tuple2<String,Tuple2<Integer,org.my.MyJob$Pojo<word=String>>>"</code> * * @param infoString * type information string to be parsed @@ -97,6 +102,7 @@ public class TypeInfoParser { final Matcher tupleMatcher = tuplePattern.matcher(infoString); final Matcher writableMatcher = writablePattern.matcher(infoString); + final Matcher enumMatcher = enumPattern.matcher(infoString); final Matcher basicTypeMatcher = basicTypePattern.matcher(infoString); final Matcher basicType2Matcher = basicType2Pattern.matcher(infoString); @@ -106,7 +112,7 @@ public class TypeInfoParser { final Matcher basicArrayTypeMatcher = basicArrayTypePattern.matcher(infoString); final Matcher basicArrayType2Matcher = basicArrayType2Pattern.matcher(infoString); - final Matcher customObjectMatcher = customObjectPattern.matcher(infoString); + final Matcher pojoGenericMatcher = pojoGenericObjectPattern.matcher(infoString); if (infoString.length() == 0) { return null; @@ -163,15 +169,17 @@ public class TypeInfoParser { else if (writableMatcher.find()) { String className = writableMatcher.group(1); String fullyQualifiedName = writableMatcher.group(3); - sb.delete(0, className.length() + 1 + fullyQualifiedName.length()); - - try { - Class<?> clazz = Class.forName(fullyQualifiedName); - returnType = WritableTypeInfo.getWritableTypeInfo((Class) clazz); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException("Class '" + fullyQualifiedName - + "' could not be found for use as writable type. Please note that inner classes must be declared static."); - } + sb.delete(0, className.length() + 1 + fullyQualifiedName.length() + 1); + Class<?> clazz = loadClass(fullyQualifiedName); + returnType = WritableTypeInfo.getWritableTypeInfo((Class) clazz); + } + // enum types + else if (enumMatcher.find()) { + String className = enumMatcher.group(1); + String fullyQualifiedName = enumMatcher.group(3); + sb.delete(0, className.length() + 1 + fullyQualifiedName.length() + 1); + Class<?> clazz = loadClass(fullyQualifiedName); + returnType = new EnumTypeInfo(clazz); } // basic types of classes else if (basicTypeMatcher.find()) { @@ -225,7 +233,7 @@ public class TypeInfoParser { } returnType = ValueTypeInfo.getValueTypeInfo((Class<Value>) clazz); } - // array of classes + // array of basic classes else if (basicArrayTypeMatcher.find()) { String className = basicArrayTypeMatcher.group(1); sb.delete(0, className.length() + 2); @@ -263,32 +271,43 @@ public class TypeInfoParser { } returnType = PrimitiveArrayTypeInfo.getInfoFor(clazz); } - // custom objects - else if (customObjectMatcher.find()) { - String fullyQualifiedName = customObjectMatcher.group(1); + // pojo objects or generic types + else if (pojoGenericMatcher.find()) { + String fullyQualifiedName = pojoGenericMatcher.group(1); sb.delete(0, fullyQualifiedName.length()); - if (fullyQualifiedName.contains("<")) { - throw new IllegalArgumentException("Parameterized custom classes are not supported by parser."); - } + boolean isPojo = pojoGenericMatcher.group(2) != null; + + if (isPojo) { + sb.deleteCharAt(0); + Class<?> clazz = loadClass(fullyQualifiedName); + + ArrayList<PojoField> fields = new ArrayList<PojoField>(); + while (sb.charAt(0) != '>') { + final Matcher fieldMatcher = fieldPattern.matcher(sb); + if (!fieldMatcher.find()) { + throw new IllegalArgumentException("Field name missing."); + } + String fieldName = fieldMatcher.group(1); + sb.delete(0, fieldName.length() + 1); - // custom object array - if (fullyQualifiedName.endsWith("[]")) { - fullyQualifiedName = fullyQualifiedName.substring(0, fullyQualifiedName.length() - 2); - try { - Class<?> clazz = Class.forName("[L" + fullyQualifiedName + ";"); - returnType = ObjectArrayTypeInfo.getInfoFor(clazz); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException("Class '" + fullyQualifiedName - + "' could not be found for use as object array. Please note that inner classes must be declared static."); + Field field = null; + try { + field = clazz.getDeclaredField(fieldName); + } catch (Exception e) { + throw new IllegalArgumentException("Field '" + fieldName + "'could not be accessed."); + } + fields.add(new PojoField(field, parse(sb))); } - } else { - try { - Class<?> clazz = Class.forName(fullyQualifiedName); - returnType = new GenericTypeInfo(clazz); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException("Class '" + fullyQualifiedName - + "' could not be found for use as custom object. Please note that inner classes must be declared static."); + returnType = new PojoTypeInfo(clazz, fields); + } + else { + // custom object array + if (fullyQualifiedName.endsWith("[]")) { + fullyQualifiedName = fullyQualifiedName.substring(0, fullyQualifiedName.length() - 2); + returnType = ObjectArrayTypeInfo.getInfoFor(loadClass("[L" + fullyQualifiedName + ";")); + } else { + returnType = new GenericTypeInfo(loadClass(fullyQualifiedName)); } } } @@ -304,4 +323,13 @@ public class TypeInfoParser { } } + private static Class<?> loadClass(String fullyQualifiedName) { + try { + return Class.forName(fullyQualifiedName); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("Class '" + fullyQualifiedName + + "' could not be found. Please note that inner classes must be declared static."); + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java new file mode 100644 index 0000000..6eb536d --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.operators.translation; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.operators.GenericDataSinkBase; +import org.apache.flink.api.common.operators.GenericDataSourceBase; +import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; +import org.apache.flink.api.common.operators.base.MapOperatorBase; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.operators.DistinctOperator; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.StringValue; +import org.junit.Test; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@SuppressWarnings("serial") +public class DistinctTranslationTest { + + @Test + public void testCombinable() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<String> input = env.fromElements("1", "2", "1", "3"); + + + DistinctOperator<String> op = input.distinct(new KeySelector<String, String>() { + public String getKey(String value) { return value; } + }); + + op.print(); + + Plan p = env.createProgramPlan(); + + GroupReduceOperatorBase<?, ?, ?> reduceOp = (GroupReduceOperatorBase<?, ?, ?>) p.getDataSinks().iterator().next().getInput(); + assertTrue(reduceOp.isCombinable()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void translateDistinctPlain() { + try { + final int DOP = 8; + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP); + + DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env); + + initialData.distinct().print(); + + Plan p = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next(); + + // currently distinct is translated to a GroupReduce + GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput(); + + // check types + assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType()); + assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType()); + + // check keys + assertArrayEquals(new int[] {0, 1, 2}, reducer.getKeyColumns(0)); + + // DOP was not configured on the operator + assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1); + + assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test caused an error: " + e.getMessage()); + } + } + + @Test + public void translateDistinctPlain2() { + try { + final int DOP = 8; + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP); + + DataSet<CustomType> initialData = getSourcePojoDataSet(env); + + initialData.distinct().print(); + + Plan p = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next(); + + // currently distinct is translated to a GroupReduce + GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput(); + + // check types + assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType()); + assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType()); + + // check keys + assertArrayEquals(new int[] {0}, reducer.getKeyColumns(0)); + + // DOP was not configured on the operator + assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1); + + assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test caused an error: " + e.getMessage()); + } + } + + @Test + public void translateDistinctPosition() { + try { + final int DOP = 8; + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP); + + DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env); + + initialData.distinct(1, 2).print(); + + Plan p = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next(); + + // currently distinct is translated to a GroupReduce + GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput(); + + // check types + assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType()); + assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType()); + + // check keys + assertArrayEquals(new int[] {1, 2}, reducer.getKeyColumns(0)); + + // DOP was not configured on the operator + assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1); + + assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test caused an error: " + e.getMessage()); + } + } + + @Test + public void translateDistinctKeySelector() { + try { + final int DOP = 8; + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP); + + DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env); + + initialData.distinct(new KeySelector<Tuple3<Double,StringValue,LongValue>, StringValue>() { + public StringValue getKey(Tuple3<Double, StringValue, LongValue> value) { + return value.f1; + } + }).setParallelism(4).print(); + + Plan p = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next(); + + PlanUnwrappingReduceGroupOperator<?, ?, ?> reducer = (PlanUnwrappingReduceGroupOperator<?, ?, ?>) sink.getInput(); + MapOperatorBase<?, ?, ?> keyExtractor = (MapOperatorBase<?, ?, ?>) reducer.getInput(); + + // check the DOPs + assertEquals(1, keyExtractor.getDegreeOfParallelism()); + assertEquals(4, reducer.getDegreeOfParallelism()); + + // check types + TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double,StringValue,LongValue>>>( + new ValueTypeInfo<StringValue>(StringValue.class), + initialData.getType()); + + assertEquals(initialData.getType(), keyExtractor.getOperatorInfo().getInputType()); + assertEquals(keyValueInfo, keyExtractor.getOperatorInfo().getOutputType()); + + assertEquals(keyValueInfo, reducer.getOperatorInfo().getInputType()); + assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType()); + + // check keys + assertEquals(KeyExtractingMapper.class, keyExtractor.getUserCodeWrapper().getUserCodeClass()); + + assertTrue(keyExtractor.getInput() instanceof GenericDataSourceBase<?, ?>); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test caused an error: " + e.getMessage()); + } + } + + @Test + public void translateDistinctExpressionKey() { + try { + final int DOP = 8; + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP); + + DataSet<CustomType> initialData = getSourcePojoDataSet(env); + + initialData.distinct("myInt").print(); + + Plan p = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next(); + + // currently distinct is translated to a GroupReduce + GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput(); + + // check types + assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType()); + assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType()); + + // check keys + assertArrayEquals(new int[] {0}, reducer.getKeyColumns(0)); + + // DOP was not configured on the operator + assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1); + + assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test caused an error: " + e.getMessage()); + } + } + + @SuppressWarnings("unchecked") + private static final DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) { + return env.fromElements(new Tuple3<Double, StringValue, LongValue>(3.141592, new StringValue("foobar"), new LongValue(77))) + .setParallelism(1); + } + + private static DataSet<CustomType> getSourcePojoDataSet(ExecutionEnvironment env) { + List<CustomType> data = new ArrayList<CustomType>(); + data.add(new CustomType(1)); + return env.fromCollection(data); + } + + public static class CustomType implements Serializable { + + private static final long serialVersionUID = 1L; + public int myInt; + @SuppressWarnings("unused") + public CustomType() {}; + + public CustomType(int i) { + myInt = i; + } + + @Override + public String toString() { + return ""+myInt; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java deleted file mode 100644 index d06731e..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.operators.translation; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.operators.GenericDataSinkBase; -import org.apache.flink.api.common.operators.GenericDataSourceBase; -import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; -import org.apache.flink.api.common.operators.base.MapOperatorBase; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.operators.DistinctOperator; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.ValueTypeInfo; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.StringValue; -import org.junit.Test; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -@SuppressWarnings("serial") -public class DistrinctTranslationTest { - - @Test - public void testCombinable() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<String> input = env.fromElements("1", "2", "1", "3"); - - - DistinctOperator<String> op = input.distinct(new KeySelector<String, String>() { - public String getKey(String value) { return value; } - }); - - op.print(); - - Plan p = env.createProgramPlan(); - - GroupReduceOperatorBase<?, ?, ?> reduceOp = (GroupReduceOperatorBase<?, ?, ?>) p.getDataSinks().iterator().next().getInput(); - assertTrue(reduceOp.isCombinable()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void translateDistinctPlain() { - try { - final int DOP = 8; - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP); - - DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env); - - initialData.distinct().print(); - - Plan p = env.createProgramPlan(); - - GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next(); - - // currently distinct is translated to a GroupReduce - GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput(); - - // check types - assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType()); - assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType()); - - // check keys - assertArrayEquals(new int[] {0, 1, 2}, reducer.getKeyColumns(0)); - - // DOP was not configured on the operator - assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1); - - assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test caused an error: " + e.getMessage()); - } - } - - @Test - public void translateDistinctPlain2() { - try { - final int DOP = 8; - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP); - - DataSet<CustomType> initialData = getSourcePojoDataSet(env); - - initialData.distinct().print(); - - Plan p = env.createProgramPlan(); - - GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next(); - - // currently distinct is translated to a GroupReduce - GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput(); - - // check types - assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType()); - assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType()); - - // check keys - assertArrayEquals(new int[] {0}, reducer.getKeyColumns(0)); - - // DOP was not configured on the operator - assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1); - - assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test caused an error: " + e.getMessage()); - } - } - - @Test - public void translateDistinctPosition() { - try { - final int DOP = 8; - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP); - - DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env); - - initialData.distinct(1, 2).print(); - - Plan p = env.createProgramPlan(); - - GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next(); - - // currently distinct is translated to a GroupReduce - GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput(); - - // check types - assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType()); - assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType()); - - // check keys - assertArrayEquals(new int[] {1, 2}, reducer.getKeyColumns(0)); - - // DOP was not configured on the operator - assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1); - - assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test caused an error: " + e.getMessage()); - } - } - - @Test - public void translateDistinctKeySelector() { - try { - final int DOP = 8; - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP); - - DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env); - - initialData.distinct(new KeySelector<Tuple3<Double,StringValue,LongValue>, StringValue>() { - public StringValue getKey(Tuple3<Double, StringValue, LongValue> value) { - return value.f1; - } - }).setParallelism(4).print(); - - Plan p = env.createProgramPlan(); - - GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next(); - - PlanUnwrappingReduceGroupOperator<?, ?, ?> reducer = (PlanUnwrappingReduceGroupOperator<?, ?, ?>) sink.getInput(); - MapOperatorBase<?, ?, ?> keyExtractor = (MapOperatorBase<?, ?, ?>) reducer.getInput(); - - // check the DOPs - assertEquals(1, keyExtractor.getDegreeOfParallelism()); - assertEquals(4, reducer.getDegreeOfParallelism()); - - // check types - TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double,StringValue,LongValue>>>( - new ValueTypeInfo<StringValue>(StringValue.class), - initialData.getType()); - - assertEquals(initialData.getType(), keyExtractor.getOperatorInfo().getInputType()); - assertEquals(keyValueInfo, keyExtractor.getOperatorInfo().getOutputType()); - - assertEquals(keyValueInfo, reducer.getOperatorInfo().getInputType()); - assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType()); - - // check keys - assertEquals(KeyExtractingMapper.class, keyExtractor.getUserCodeWrapper().getUserCodeClass()); - - assertTrue(keyExtractor.getInput() instanceof GenericDataSourceBase<?, ?>); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test caused an error: " + e.getMessage()); - } - } - - @Test - public void translateDistinctExpressionKey() { - try { - final int DOP = 8; - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP); - - DataSet<CustomType> initialData = getSourcePojoDataSet(env); - - initialData.distinct("myInt").print(); - - Plan p = env.createProgramPlan(); - - GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next(); - - // currently distinct is translated to a GroupReduce - GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput(); - - // check types - assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType()); - assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType()); - - // check keys - assertArrayEquals(new int[] {0}, reducer.getKeyColumns(0)); - - // DOP was not configured on the operator - assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1); - - assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test caused an error: " + e.getMessage()); - } - } - - @SuppressWarnings("unchecked") - private static final DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) { - return env.fromElements(new Tuple3<Double, StringValue, LongValue>(3.141592, new StringValue("foobar"), new LongValue(77))) - .setParallelism(1); - } - - private static DataSet<CustomType> getSourcePojoDataSet(ExecutionEnvironment env) { - List<CustomType> data = new ArrayList<CustomType>(); - data.add(new CustomType(1)); - return env.fromCollection(data); - } - - public static class CustomType implements Serializable { - - private static final long serialVersionUID = 1L; - public int myInt; - @SuppressWarnings("unused") - public CustomType() {}; - - public CustomType(int i) { - myInt = i; - } - - @Override - public String toString() { - return ""+myInt; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java index 23a4d5f..9834a25 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java @@ -44,7 +44,7 @@ import org.apache.flink.types.Record; import org.apache.flink.util.Collector; import org.junit.Test; -@SuppressWarnings("serial") +@SuppressWarnings({ "serial", "deprecation" }) public class CoGroupWrappingFunctionTest { @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java index 7b28c3f..9e262fc 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java @@ -43,7 +43,7 @@ import org.apache.flink.types.Record; import org.apache.flink.util.Collector; import org.junit.Test; -@SuppressWarnings("serial") +@SuppressWarnings({ "serial", "deprecation" }) public class ReduceWrappingFunctionTest { @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java index 96ba16b..39d6e10 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java @@ -27,7 +27,6 @@ import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; -import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple3; @@ -53,6 +52,7 @@ import com.google.common.collect.HashMultiset; public class PojoTypeExtractionTest { public static class HasDuplicateField extends WC { + @SuppressWarnings("unused") private int count; // duplicate } @@ -614,6 +614,6 @@ public class PojoTypeExtractionTest { @Test public void testGetterSetterWithVertex() { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<VertexTyped> set = env.fromElements(new VertexTyped(0L, 3.0), new VertexTyped(1L, 1.0)); + env.fromElements(new VertexTyped(0L, 3.0), new VertexTyped(1L, 1.0)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java index f8fb13c..1364a2f 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java @@ -45,6 +45,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple9; import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.MissingTypeInfo; import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; @@ -626,7 +627,7 @@ public class TypeExtractorTest { @SuppressWarnings({ "unchecked", "rawtypes" }) @Test - public void testMissingTupleGenericsException() { + public void testMissingTupleGenerics() { RichMapFunction<?, ?> function = new RichMapFunction<String, Tuple2>() { private static final long serialVersionUID = 1L; @@ -636,11 +637,15 @@ public class TypeExtractorTest { } }; + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String"), "name", true); + Assert.assertTrue(ti instanceof MissingTypeInfo); + try { TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String")); - Assert.fail("exception expected"); - } catch (InvalidTypesException e) { - // right + Assert.fail("Expected an exception"); + } + catch (InvalidTypesException e) { + // expected } } @@ -656,11 +661,15 @@ public class TypeExtractorTest { } }; + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String"), "name", true); + Assert.assertTrue(ti instanceof MissingTypeInfo); + try { TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String")); - Assert.fail("exception expected"); - } catch (InvalidTypesException e) { - // right + Assert.fail("Expected an exception"); + } + catch (InvalidTypesException e) { + // expected } } @@ -795,11 +804,15 @@ public class TypeExtractorTest { } }; + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, TypeInfoParser.parse("String"), "name", true); + Assert.assertTrue(ti instanceof MissingTypeInfo); + try { TypeExtractor.getMapReturnTypes(function, TypeInfoParser.parse("String")); - Assert.fail("exception expected"); - } catch (InvalidTypesException e) { - // right + Assert.fail("Expected an exception"); + } + catch (InvalidTypesException e) { + // expected } } @@ -893,14 +906,18 @@ public class TypeExtractorTest { } @Test - public void testFunctionDependingOnInputException() { + public void testFunctionDependingOnUnknownInput() { IdentityMapper3<Boolean, String> function = new IdentityMapper3<Boolean, String>(); + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.BOOLEAN_TYPE_INFO, "name", true); + Assert.assertTrue(ti instanceof MissingTypeInfo); + try { TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.BOOLEAN_TYPE_INFO); - Assert.fail("exception expected"); - } catch (InvalidTypesException e) { - // right + Assert.fail("Expected an exception"); + } + catch (InvalidTypesException e) { + // expected } } @@ -1072,12 +1089,8 @@ public class TypeExtractorTest { } }; - try { - TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.STRING_TYPE_INFO); - Assert.fail("exception expected"); - } catch (InvalidTypesException e) { - // good - } + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.STRING_TYPE_INFO, null, true); + Assert.assertTrue(ti instanceof MissingTypeInfo); RichMapFunction<String, ?> function2 = new RichMapFunction<String, AbstractClass>() { private static final long serialVersionUID = 1L; @@ -1088,12 +1101,8 @@ public class TypeExtractorTest { } }; - try { - TypeExtractor.getMapReturnTypes(function2, BasicTypeInfo.STRING_TYPE_INFO); - Assert.fail("exception expected"); - } catch (InvalidTypesException e) { - // slick! - } + TypeInformation<?> ti2 = TypeExtractor.getMapReturnTypes(function2, BasicTypeInfo.STRING_TYPE_INFO, null, true); + Assert.assertTrue(ti2 instanceof MissingTypeInfo); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -1108,11 +1117,15 @@ public class TypeExtractorTest { } }; + TypeInformation<?> ti =TypeExtractor.getMapReturnTypes(function, (TypeInformation)TypeInfoParser.parse("StringValue"), "name", true); + Assert.assertTrue(ti instanceof MissingTypeInfo); + try { TypeExtractor.getMapReturnTypes(function, (TypeInformation)TypeInfoParser.parse("StringValue")); - Assert.fail("exception expected"); - } catch (InvalidTypesException e) { - // bam! go type extractor! + Assert.fail("Expected an exception"); + } + catch (InvalidTypesException e) { + // expected } } @@ -1366,14 +1379,19 @@ public class TypeExtractorTest { @SuppressWarnings({ "unchecked", "rawtypes" }) @Test - public void testTypeErasureException() { + public void testTypeErasure() { + TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(new DummyFlatMapFunction<String, Integer, String, Boolean>(), + (TypeInformation) TypeInfoParser.parse("Tuple2<String, Integer>"), "name", true); + Assert.assertTrue(ti instanceof MissingTypeInfo); + try { TypeExtractor.getFlatMapReturnTypes(new DummyFlatMapFunction<String, Integer, String, Boolean>(), (TypeInformation) TypeInfoParser.parse("Tuple2<String, Integer>")); - Assert.fail("exception expected"); + + Assert.fail("Expected an exception"); } catch (InvalidTypesException e) { - // right + // expected } } http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java index 16d22a6..9153d8d 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -149,12 +150,55 @@ public class TypeInfoParserTest { } @Test - public void testCustomType() { + public void testGenericType() { TypeInformation<?> ti = TypeInfoParser.parse("java.lang.Class"); Assert.assertTrue(ti instanceof GenericTypeInfo); Assert.assertEquals(Class.class, ((GenericTypeInfo<?>) ti).getTypeClass()); } + public static class MyPojo { + public Integer basic; + public Tuple2<String, Integer> tuple; + public MyWritable hadoopCitizen; + public String[] array; + } + + @Test + public void testPojoType() { + TypeInformation<?> ti = TypeInfoParser.parse( + "org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo<" + + "basic=Integer," + + "tuple=Tuple2<String, Integer>," + + "hadoopCitizen=Writable<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyWritable>," + + "array=String[]" + + ">"); + Assert.assertTrue(ti instanceof PojoTypeInfo); + PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti; + Assert.assertEquals("array", pti.getPojoFieldAt(0).field.getName()); + Assert.assertTrue(pti.getPojoFieldAt(0).type instanceof BasicArrayTypeInfo); + Assert.assertEquals("basic", pti.getPojoFieldAt(1).field.getName()); + Assert.assertTrue(pti.getPojoFieldAt(1).type instanceof BasicTypeInfo); + Assert.assertEquals("hadoopCitizen", pti.getPojoFieldAt(2).field.getName()); + Assert.assertTrue(pti.getPojoFieldAt(2).type instanceof WritableTypeInfo); + Assert.assertEquals("tuple", pti.getPojoFieldAt(3).field.getName()); + Assert.assertTrue(pti.getPojoFieldAt(3).type instanceof TupleTypeInfo); + } + + @Test + public void testPojoType2() { + TypeInformation<?> ti = TypeInfoParser.parse("Tuple2<String,Tuple2<Integer,org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo<basic=String>>>"); + Assert.assertTrue(ti instanceof TupleTypeInfo); + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertTrue(tti.getTypeAt(0) instanceof BasicTypeInfo); + Assert.assertTrue(tti.getTypeAt(1) instanceof TupleTypeInfo); + TupleTypeInfo<?> tti2 = (TupleTypeInfo<?>) tti.getTypeAt(1); + Assert.assertTrue(tti2.getTypeAt(0) instanceof BasicTypeInfo); + Assert.assertTrue(tti2.getTypeAt(1) instanceof PojoTypeInfo); + PojoTypeInfo<?> pti = (PojoTypeInfo<?>) tti2.getTypeAt(1); + Assert.assertEquals("basic", pti.getPojoFieldAt(0).field.getName()); + Assert.assertTrue(pti.getPojoFieldAt(0).type instanceof BasicTypeInfo); + } + public static class MyWritable implements Writable { @Override @@ -198,6 +242,19 @@ public class TypeInfoParserTest { Assert.assertEquals("ObjectArrayTypeInfo<Java Tuple4<Double, ObjectArrayTypeInfo<GenericType<java.lang.Class>>, ValueType<org.apache.flink.types.StringValue>, Java Tuple1<Integer>>>", ti.toString()); } + public static enum MyEnum { + ONE, TWO, THREE + } + + @Test + public void testEnumType() { + TypeInformation<?> ti = TypeInfoParser.parse("Enum<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>"); + Assert.assertEquals("EnumTypeInfo<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>", ti.toString()); + + TypeInformation<?> ti2 = TypeInfoParser.parse("java.lang.Enum<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>"); + Assert.assertEquals("EnumTypeInfo<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>", ti2.toString()); + } + @Test public void testException() { try { http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java index fa85f8c..49a3fe5 100644 --- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java +++ b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,6 +21,7 @@ package org.apache.flink.api.java.type.lambdas; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; + import junit.framework.Assert; import org.apache.flink.api.common.functions.CoGroupFunction; @@ -39,6 +40,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.MissingTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeInfoParser; @@ -54,23 +56,23 @@ public class LambdaExtractionTest { @Override public Integer map(String value) { return Integer.parseInt(value); } }; - + MapFunction<?, ?> anonymousFromClass = new RichMapFunction<String, Integer>() { @Override public Integer map(String value) { return Integer.parseInt(value); } }; - + MapFunction<?, ?> fromProperClass = new StaticMapper(); - + MapFunction<?, ?> fromDerived = new ToTuple<Integer>() { @Override public Tuple2<Integer, Long> map(Integer value) { return new Tuple2<Integer, Long>(value, 1L); } }; - + MapFunction<String, Integer> lambda = (str) -> Integer.parseInt(str); - + assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromInterface)); assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromClass)); assertNull(FunctionUtils.checkAndExtractLambdaMethod(fromProperClass)); @@ -83,158 +85,168 @@ public class LambdaExtractionTest { fail(e.getMessage()); } } - - public static class StaticMapper implements MapFunction<String, Integer> { + public static class StaticMapper implements MapFunction<String, Integer> { @Override public Integer map(String value) { return Integer.parseInt(value); } } - - public interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> { + public interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> { @Override public Tuple2<T, Long> map(T value) throws Exception; } - + private static final MapFunction<String, Integer> STATIC_LAMBDA = (str) -> Integer.parseInt(str); - + public static class MyClass { private String s = "mystring"; - + public MapFunction<Integer, String> getMapFunction() { return (i) -> s; } } - + @Test - public void testLambdaWithMemberVariable() { + public void testLambdaWithMemberVariable() { TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(new MyClass().getMapFunction(), TypeInfoParser.parse("Integer")); Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO); } - + @Test public void testLambdaWithLocalVariable() { String s = "mystring"; final int k = 24; int j = 26; - + MapFunction<Integer, String> f = (i) -> s + k + j; - + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Integer")); Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO); } - + @Test public void testMapLambda() { MapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null; - + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>")); - Assert.assertTrue(ti.isTupleType()); - Assert.assertEquals(2, ti.getArity()); - Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType()); - Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + if (!(ti instanceof MissingTypeInfo)) { + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType()); + Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + } } - + @Test public void testFlatMapLambda() { FlatMapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {}; - + TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>")); - Assert.assertTrue(ti.isTupleType()); - Assert.assertEquals(2, ti.getArity()); - Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType()); - Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + if (!(ti instanceof MissingTypeInfo)) { + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType()); + Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + } } - + @Test public void testMapPartitionLambda() { MapPartitionFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {}; - + TypeInformation<?> ti = TypeExtractor.getMapPartitionReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>")); - Assert.assertTrue(ti.isTupleType()); - Assert.assertEquals(2, ti.getArity()); - Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType()); - Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + if (!(ti instanceof MissingTypeInfo)) { + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType()); + Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + } } - + @Test public void testGroupReduceLambda() { GroupReduceFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {}; - + TypeInformation<?> ti = TypeExtractor.getGroupReduceReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>")); - Assert.assertTrue(ti.isTupleType()); - Assert.assertEquals(2, ti.getArity()); - Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType()); - Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + if (!(ti instanceof MissingTypeInfo)) { + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType()); + Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + } } - + @Test public void testFlatJoinLambda() { FlatJoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {}; - + TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>")); - Assert.assertTrue(ti.isTupleType()); - Assert.assertEquals(2, ti.getArity()); - Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType()); - Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + if (!(ti instanceof MissingTypeInfo)) { + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType()); + Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + } } - + @Test public void testJoinLambda() { JoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null; - + TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>")); - Assert.assertTrue(ti.isTupleType()); - Assert.assertEquals(2, ti.getArity()); - Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType()); - Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + if (!(ti instanceof MissingTypeInfo)) { + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType()); + Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + } } - + @Test public void testCoGroupLambda() { CoGroupFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {}; - + TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>")); - Assert.assertTrue(ti.isTupleType()); - Assert.assertEquals(2, ti.getArity()); - Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType()); - Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + if (!(ti instanceof MissingTypeInfo)) { + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType()); + Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + } } - + @Test public void testCrossLambda() { CrossFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null; - + TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>")); - Assert.assertTrue(ti.isTupleType()); - Assert.assertEquals(2, ti.getArity()); - Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType()); - Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + if (!(ti instanceof MissingTypeInfo)) { + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType()); + Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + } } - + @Test public void testKeySelectorLambda() { KeySelector<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null; - + TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>")); - Assert.assertTrue(ti.isTupleType()); - Assert.assertEquals(2, ti.getArity()); - Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType()); - Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + if (!(ti instanceof MissingTypeInfo)) { + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType()); + Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO); + } } - + @SuppressWarnings("rawtypes") @Test - public void testLambdaTypeErasureException() { + public void testLambdaTypeErasure() { MapFunction<Tuple1, Tuple1> f = (i) -> null; - - try { - TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Tuple1<String>")); - Assert.fail(); - } - catch (InvalidTypesException e) { - // ok - } + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Tuple1<String>"), null, true); + Assert.assertTrue(ti instanceof MissingTypeInfo); } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java new file mode 100644 index 0000000..350227a --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.javaApiOperators; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.util.Collector; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class TypeHintITCase extends JavaProgramTestBase { + + private static int NUM_PROGRAMS = 3; + + private int curProgId = config.getInteger("ProgramId", -1); + private String resultPath; + private String expectedResult; + + public TypeHintITCase(Configuration config) { + super(config); + } + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + @Override + protected void testProgram() throws Exception { + expectedResult = TypeHintProgs.runProgram(curProgId, resultPath); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Parameters + public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { + + LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + + for(int i=1; i <= NUM_PROGRAMS; i++) { + Configuration config = new Configuration(); + config.setInteger("ProgramId", i); + tConfigs.add(config); + } + + return toParameterList(tConfigs); + } + + private static class TypeHintProgs { + + public static String runProgram(int progId, String resultPath) throws Exception { + switch(progId) { + // Test identity map with missing types and string type hint + case 1: { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds + .map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>()) + .returns("Tuple3<Integer, Long, String>"); + identityMapDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "(2,2,Hello)\n" + + "(3,2,Hello world)\n" + + "(1,1,Hi)\n"; + } + // Test identity map with missing types and type information type hint + case 2: { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds + // all following generics get erased during compilation + .map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>()) + .returns(new TupleTypeInfo<Tuple3<Integer, Long, String>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)); + identityMapDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "(2,2,Hello)\n" + + "(3,2,Hello world)\n" + + "(1,1,Hi)\n"; + } + // Test flat map with class type hint + case 3: { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + @SuppressWarnings({ "rawtypes", "unchecked" }) + DataSet<Integer> identityMapDs = ds. + flatMap(new FlatMapper<Tuple3<Integer, Long, String>, Integer>()) + .returns((Class) Integer.class); + identityMapDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "2\n" + + "3\n" + + "1\n"; + } + default: + throw new IllegalArgumentException("Invalid program id"); + } + } + } + + // -------------------------------------------------------------------------------------------- + + public static class Mapper<T, V> implements MapFunction<T, V> { + private static final long serialVersionUID = 1L; + + @SuppressWarnings("unchecked") + @Override + public V map(T value) throws Exception { + return (V) value; + } + } + + public static class FlatMapper<T, V> implements FlatMapFunction<T, V> { + private static final long serialVersionUID = 1L; + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public void flatMap(T value, Collector<V> out) throws Exception { + out.collect((V) ((Tuple3)value).f0); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala index 9c255f3..ab90757 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala @@ -57,6 +57,10 @@ class ScalaAPICompletenessTest { "org.apache.flink.api.java.ExecutionEnvironment.localExecutionIsAllowed", "org.apache.flink.api.java.ExecutionEnvironment.setDefaultLocalParallelism", + // TypeHints are only needed for Java API, Scala API doesn't need them + "org.apache.flink.api.java.operators.SingleInputUdfOperator.returns", + "org.apache.flink.api.java.operators.TwoInputUdfOperator.returns", + // This is really just a mapper, which in Scala can easily expressed as a map lambda "org.apache.flink.api.java.DataSet.writeAsFormattedText",
