[ https://issues.apache.org/jira/browse/FLINK-6146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-6146:
----------------------------------
    Labels: auto-deprioritized-major stale-minor  (was: auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Minor but is unassigned and neither itself nor its Sub-Tasks have been updated for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is still Minor, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized.

> Incorrect function name given in exception thrown by DataSet.getType()
> ----------------------------------------------------------------------
>
>                 Key: FLINK-6146
>                 URL: https://issues.apache.org/jira/browse/FLINK-6146
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataSet
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-minor
>
> In the following code, this exception is thrown at the line marked {{// (1)}}:
> {noformat}
> Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'convertToFractionalRank(MainTest.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
>     at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
>     at org.apache.flink.api.java.DataSet.distinct(DataSet.java:607)
>     at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:28)
>     at com.rentlogic.buildingscores.flink.MainTest.main(MainTest.java:69)
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
>     at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
>     at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:21)
>     ... 1 more
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Unknown Error. Type is null.
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131)
>     ... 6 more
> {noformat}
> The code:
> {code}
> import org.apache.flink.api.common.operators.Order;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.api.java.tuple.Tuple4;
> import org.apache.flink.util.Collector;
>
> public class MainTest {
>
>     public static <K> DataSet<Tuple2<K, Float>> convertToFractionalRank(DataSet<Tuple2<K, Float>> key_score) {
>         // Sum within each key
>         // Result: ("", key, totScore)
>         DataSet<Tuple3<String, K, Float>> blank_key_totScore = key_score
>                 .groupBy(0).sum(1)
>                 // Prepend with "" to prepare for the join
>                 .map(t -> new Tuple3<>("", /* key = */ t.f0, /* sum = */ t.f1));
>
>         // Count unique keys. Result: ("", numKeys)
>         DataSet<Tuple2<String, Integer>> blank_numKeys = blank_key_totScore
>                 .distinct(1) // (1)
>                 .map(t -> new Tuple2<String, Integer>("", 1))
>                 .groupBy(0).sum(1);
>
>         // Sort scores into order, then return the fractional rank in the range [0, 1]
>         return blank_key_totScore
>                 .coGroup(blank_numKeys)
>                 .where(0).equalTo(0)
>                 .with((Iterable<Tuple3<String, K, Float>> ai, Iterable<Tuple2<String, Integer>> bi,
>                         Collector<Tuple4<String, K, Float, Integer>> out) -> {
>                     int numKeys = bi.iterator().next().f1;
>                     for (Tuple3<String, K, Float> a : ai) {
>                         out.collect(new Tuple4<>("", /* key = */ a.f1, /* totScore = */ a.f2, numKeys));
>                     }
>                 })
>                 // Group by "" (i.e. make into one group, so all the scores can be sorted together)
>                 .groupBy(0)
>                 // Sort in descending order of score (the highest score gets the lowest rank, and vice versa)
>                 .sortGroup(2, Order.DESCENDING)
>                 // Convert sorted rank from [0, numKeys-1] -> [0, 1]
>                 .reduceGroup((Iterable<Tuple4<String, K, Float, Integer>> iter, Collector<Tuple2<K, Float>> out) -> {
>                     int rank = 0;
>                     for (Tuple4<String, K, Float, Integer> t : iter) {
>                         int numKeys = t.f3; // Same for all tuples
>                         float fracRank = rank / (float) (numKeys - 1);
>                         out.collect(new Tuple2<>(/* key = */ t.f1, fracRank));
>                         rank++;
>                     }
>                 })
>                 .name("convert problem severity scores into building scores");
>     }
>
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         DataSource<Tuple2<Tuple2<String, Integer>, Float>> ds = env.fromElements(
>                 new Tuple2<>(new Tuple2<>("x", 1), 1.0f), new Tuple2<>(new Tuple2<>("x", 2), 1.0f),
>                 new Tuple2<>(new Tuple2<>("x", 3), 1.0f), new Tuple2<>(new Tuple2<>("x", 3), 1.0f),
>                 new Tuple2<>(new Tuple2<>("y", 1), 1.0f), new Tuple2<>(new Tuple2<>("y", 1), 1.0f),
>                 new Tuple2<>(new Tuple2<>("y", 2), 1.0f), new Tuple2<>(new Tuple2<>("y", 3), 1.0f));
>         DataSet<Tuple2<Tuple2<String, Integer>, Float>> ds2 = convertToFractionalRank(ds);
>         System.out.println(ds2.collect());
>     }
> }
> {code}
> However, the type that cannot be computed belongs to the {{distinct}} operator, which produces an intermediate value, and not to the return value of the function. The error message therefore reports the wrong location.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
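Background for triage (this is a JDK-only sketch and was not part of the original report): the exception blames type erasure for the function created at `MainTest.java:21`. According to the stack trace, that is the `map` call. Its lambda builds a `Tuple3<String, K, Float>` inside the generic method `convertToFractionalRank`. Flink's type extractor inspects user functions through Java reflection. The sketch below shows roughly why reflection has little to work with here, assuming a standard javac build. First, a lambda's runtime class reports only the raw `Function` interface. Second, even an anonymous class declared inside a generic method records just the type variable `K`, never a concrete type. `ErasureSketch` and its methods are hypothetical names chosen for illustration.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.function.Function;

// Hypothetical illustration class: plain JDK, no Flink classes involved.
final class ErasureSketch {

    // Same shape as the failing map(): a lambda in a generic method whose types mention K.
    static <K> Function<K, String> lambdaInGenericMethod() {
        return k -> "key=" + k;
    }

    // Same logic written as an anonymous class instead of a lambda.
    static <K> Function<K, String> anonymousInGenericMethod() {
        return new Function<K, String>() {
            @Override
            public String apply(K k) {
                return "key=" + k;
            }
        };
    }

    // The first generic interface of the function's runtime class, which is where
    // a reflection-based extractor would look for the Function type arguments.
    static Type functionInterfaceOf(Object fn) {
        return fn.getClass().getGenericInterfaces()[0];
    }

    public static void main(String[] args) {
        Type fromLambda = functionInterfaceOf(lambdaInGenericMethod());
        // With javac, the generated lambda class carries no generic signature,
        // so only the raw Function interface is visible.
        System.out.println("lambda: " + fromLambda
                + " (parameterized: " + (fromLambda instanceof ParameterizedType) + ")");

        Type fromAnon = functionInterfaceOf(anonymousInGenericMethod());
        if (fromAnon instanceof ParameterizedType) {
            Type firstArg = ((ParameterizedType) fromAnon).getActualTypeArguments()[0];
            // The signature is kept, but it only names the method's type variable K.
            System.out.println("anonymous class: first type argument = " + firstArg
                    + " (type variable: " + (firstArg instanceof TypeVariable) + ")");
        }
    }
}
```

Running `main` prints whether each runtime class exposes a parameterized `Function` type. In Flink itself, one way to supply the hint that the exception asks for is to pass the key's `TypeInformation<K>` into `convertToFractionalRank`. You would then call `returns(...)` on the `map` result with a `TupleTypeInfo` built from `BasicTypeInfo.STRING_TYPE_INFO`, that key type, and `BasicTypeInfo.FLOAT_TYPE_INFO`. This is an untested suggestion, not a fix confirmed in this ticket.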