[ 
https://issues.apache.org/jira/browse/FLINK-6146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-6146:
----------------------------------
    Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Incorrect function name given in exception thrown by DataSet.getType()
> ----------------------------------------------------------------------
>
>                 Key: FLINK-6146
>                 URL: https://issues.apache.org/jira/browse/FLINK-6146
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataSet
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-minor
>
> In the following code, this exception is thrown at the line marked {{// (1)}}:
> {noformat}
> Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'convertToFractionalRank(MainTest.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
>       at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
>       at org.apache.flink.api.java.DataSet.distinct(DataSet.java:607)
>       at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:28)
>       at com.rentlogic.buildingscores.flink.MainTest.main(MainTest.java:69)
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
>       at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
>       at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409)
>       at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
>       at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
>       at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
>       at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:21)
>       ... 1 more
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Unknown Error. Type is null.
>       at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161)
>       at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234)
>       at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131)
>       ... 6 more
> {noformat}
> The code:
> {code}
> import org.apache.flink.api.common.operators.Order;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.api.java.tuple.Tuple4;
> import org.apache.flink.util.Collector;
> public class MainTest {
>     public static <K> DataSet<Tuple2<K, Float>> 
> convertToFractionalRank(DataSet<Tuple2<K, Float>> key_score) {
>         // Sum within each key
>         // Result: ("", key, totScore)
>         DataSet<Tuple3<String, K, Float>> blank_key_totScore = 
>                 key_score 
>                         .groupBy(0).sum(1) 
>                         // Prepend with "" to prep for join
>                         .map(t -> new Tuple3<>("", /* key = */ t.f0, /* sum = */ t.f1));
>         // Count unique keys. Result: ("", numKeys)
>         DataSet<Tuple2<String, Integer>> blank_numKeys = 
>                 blank_key_totScore 
>                         .distinct(1) // (1)
>                         .map(t -> new Tuple2<String, Integer>("", 1)) 
>                         .groupBy(0).sum(1);
>         // Sort scores into order, then return the fractional rank in the range [0, 1]
>         return blank_key_totScore 
>                 .coGroup(blank_numKeys) 
>                 .where(0).equalTo(0) 
>                 .with((Iterable<Tuple3<String, K, Float>> ai, 
> Iterable<Tuple2<String, Integer>> bi,
>                         Collector<Tuple4<String, K, Float, Integer>> out) -> {
>                     int numKeys = bi.iterator().next().f1;
>                     for (Tuple3<String, K, Float> a : ai) {
>                         out.collect(new Tuple4<>("", /* key = */ a.f1, /* 
> totScore = */ a.f2, numKeys));
>                     }
>                 }) 
>                 // Group by "" (i.e. make into one group, so all the scores can be sorted together)
>                 .groupBy(0) 
>                 // Sort in descending order of score (the highest score gets the lowest rank, and vice versa)
>                 .sortGroup(2, Order.DESCENDING) 
>                 // Convert sorted rank from [0, numKeys-1] -> [0, 1]
>                 .reduceGroup(
>                         (Iterable<Tuple4<String, K, Float, Integer>> iter, 
> Collector<Tuple2<K, Float>> out) -> {
>                             int rank = 0;
>                             for (Tuple4<String, K, Float, Integer> t : iter) {
>                                 int numKeys = t.f3; // Same for all tuples
>                                 float fracRank = rank / (float) (numKeys - 1);
>                                 out.collect(new Tuple2<>(/* key = */ t.f1, 
> fracRank));
>                                 rank++;
>                             }
>                         })
>                 .name("convert problem severity scores into building scores");
>     }
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
>         DataSource<Tuple2<Tuple2<String, Integer>, Float>> ds = 
> env.fromElements(
>                 new Tuple2<>(new Tuple2<>("x", 1), 1.0f), new Tuple2<>(new 
> Tuple2<>("x", 2), 1.0f),
>                 new Tuple2<>(new Tuple2<>("x", 3), 1.0f), new Tuple2<>(new 
> Tuple2<>("x", 3), 1.0f),
>                 new Tuple2<>(new Tuple2<>("y", 1), 1.0f), new Tuple2<>(new 
> Tuple2<>("y", 1), 1.0f),
>                 new Tuple2<>(new Tuple2<>("y", 2), 1.0f), new Tuple2<>(new 
> Tuple2<>("y", 3), 1.0f));
>         DataSet<Tuple2<Tuple2<String, Integer>, Float>> ds2 = 
> convertToFractionalRank(ds);
>         System.out.println(ds2.collect());
>     }
> }
> {code}
> However, it is the {{distinct}} operator, used to compute an intermediate 
> value, not the return value of the function, for which the type cannot be 
> computed. The error message is quoting the wrong location information.
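
One possible reading of the stack trace above: the type-extraction failure is recorded when map() is called at MainTest.java:21, but it is only raised later, when distinct() asks for its input type at MainTest.java:28. If the message is built from the recorded call location, that would explain why it names line 21 while the exception surfaces at (1). The plain-Java sketch below illustrates only that deferred-error pattern. It does not use Flink, and Result, map, distinct and run are hypothetical names invented for this illustration.

```java
public class DeferredErrorDemo {

    /** Hypothetical stand-in for an operator result whose type may be unknown. */
    static final class Result {
        private final String type;            // null when type inference failed
        private final String definedAt;       // label of the call that produced this result
        private final RuntimeException cause; // failure recorded at definition time, or null

        Result(String type, String definedAt, RuntimeException cause) {
            this.type = type;
            this.definedAt = definedAt;
            this.cause = cause;
        }

        /** Raises the recorded failure lazily, labelled with the defining call. */
        String getType() {
            if (type == null) {
                throw new IllegalStateException(
                        "The return type of function '" + definedAt
                                + "' could not be determined", cause);
            }
            return type;
        }
    }

    /** Records a failure instead of throwing when no type could be inferred. */
    static Result map(String label, String inferredType) {
        RuntimeException cause =
                inferredType == null ? new RuntimeException("Type is null") : null;
        return new Result(inferredType, label, cause);
    }

    /** Needs the input type, so this is where the deferred failure surfaces. */
    static Result distinct(String label, Result input) {
        String inputType = input.getType(); // throws here if the input type is unknown
        return new Result(inputType, label, null);
    }

    /** Returns the message of the exception raised by the downstream call. */
    public static String run() {
        Result mapped = map("map (line 21)", null); // inference fails, nothing thrown yet
        try {
            distinct("distinct (line 28)", mapped);
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the exception is thrown from inside distinct(), yet its message mentions only the upstream map call, which mirrors the mismatch between the throwing line and the quoted location described in the report.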



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
