Hi Mailing List,

after "upgrading" the flink version in my pom.xml to 1.0.3, i get two error 
messages for these output variants, which don't work:

org.apache.flink.api.common.functions.InvalidTypesException: The return type of 
function 'main(MR_GPSRS.java:69)' could not be determined automatically, due to 
type erasure. You can give type information hints by using the returns(...) 
method on the result of the transformation call, or by letting your function 
implement the 'ResultTypeQueryable' interface.

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of 
TypeVariable 'OUT' in 'class 
org.apache.flink.api.common.functions.RichGroupReduceFunction' could not be 
determined. This is most likely a type erasure problem. The type extraction 
currently supports types with generic variables only in cases where all 
variables in the return type can be deduced from the input type(s).


After adding adding ".returns(input.getType())" to my transformation, 
everything works great now : - )
Many thanks to these developers, who added this messages in the last versions!

Best,
Robert
________________________________________
Von: Paschek, Robert <robert.pasc...@tu-berlin.de>
Gesendet: Dienstag, 14. Juni 2016 19:52
An: user@flink.apache.org
Betreff: Lazy Evaluation

Hi Mailing List,

I probably have a problem with the Lazy Evaluation. Depending of the “return” 
Datatype of my last Transformation (GroupReduce), the integrated Flink Mini 
Clusters does not start. I have done the following:

// Configuration
Configuration parameters = new Configuration();
parameters.setString("path", 
"generated_2000000_tuples_10_dimensions_100.0_mean_25.0_std_and_498762467_seed.csv");
parameters.setString("output", "result_MR_GPSRS.csv");
parameters.setInteger("dimensionality", 10);
parameters.setInteger("cardinality", 2000000);
parameters.setDouble("min", 0.0);
parameters.setDouble("max", 200.0);

// Setting Up Execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Reading CSV
DataSet<?> input = 
InputConfig.setDataSetFromCSV(parameters.getInteger("dimensionality", 2), env, 
parameters.getString("path", ""));


//Broadcast BitString, Cardinality, Dimensionality, PPD
@SuppressWarnings({ "unchecked", "rawtypes" })
DataSet<Tuple4<BitSet,Integer,Integer,Integer>> metaData =
                                input
                                .mapPartition(new 
MR_GP_BitStringGeneratorMapper())
                                .reduceGroup(new 
MR_GP_BitStringGeneratorReducer());

// Calculate result
@SuppressWarnings({ "unchecked", "rawtypes" })
DataSet<?> result = input
                                .mapPartition(new 
MR_GPSRS_Mapper()).withBroadcastSet(metaData, 
"MetaData").withParameters(parameters)
                                .reduceGroup(new 
MR_GPSRS_Reducer()).withBroadcastSet(metaData, 
"MetaData").withParameters(parameters);


try {
        result.writeAsCsv(parameters.getString("output", ""), 
FileSystem.WriteMode.OVERWRITE);
        JobExecutionResult job = env.execute();
        System.out.println("Runtime in seconds: "+(job.getNetRuntime()/1000));
        } catch (Exception e) {
        // TODO Auto-generated catch block
}



When I run my program with the integrated Flink mini cluster in eclipse, the 
console outputs only the following:
19:00:12,978 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            
 - class java.util.BitSet is not a valid POJO type
19:00:13,010 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            
 - class java.util.BitSet is not a valid POJO type
19:00:13,017 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            
 - class java.util.ArrayList is not a valid POJO type
19:00:13,021 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            
 - class java.util.ArrayList is not a valid POJO type

The MR_GPSRS_Reducer looks like the following:
public class MR_GPSRS_Reducer <T extends Tuple> extends 
RichGroupReduceFunction<ArrayList<ArrayList<T>>, T>
[...]
@Override
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, 
Collector<T> out) throws Exception {
[...]
for (T tuple : tuples) {
    out.collect(tuple);
}

If i change my code of MR_GPSRS_Reducer to fit the following, it has still the 
same behavior:
public class MR_GPSRS_Reducer <T extends Tuple> extends 
RichGroupReduceFunction<ArrayList<ArrayList<T>>, Tuple1<T>>{
[...]
@Override
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, 
Collector<Tuple1<T>> out) throws Exception {
[...]
for (T tuple : tuples) {
    out.collect(new Tuple1<T>(tuple));
}

Same as here:
public class MR_GPSRS_Reducer <T extends Tuple> extends 
RichGroupReduceFunction<ArrayList<ArrayList<T>>, ArrayList<T>>{
[...]
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, 
Collector<ArrayList<T>> out) throws Exception {
[...]
out.collect(tuples);

Only if i change the MR_GPSRS_Reducer to the following, the Flink Mini Cluster 
starts:
public class MR_GPSRS_Reducer <T extends Tuple> extends 
RichGroupReduceFunction<ArrayList<ArrayList<T>>, Tuple1<ArrayList<T>>>{
[...]
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, 
Collector<Tuple1<ArrayList<T>>> out) throws Exception {
[...]
out.collect(new Tuple1<ArrayList<T>>(tuples));
(The hint to the not valid PoJo types still remains)

But that isn't my preferred format for the DataSink...
If I uncomment the MR_GPSRS_Reducer (the last transformation), the Flink 
Minicluster also starts.


Has anybody an idea, how can I teach Flink to execute my program with T's as 
DataSink? (In that case, T would be a Tuple10 with Doubles).
(I have already tried to explicitly typecast the datasets and transformations, 
so that the suppression of the warnings isn't necessary any more)

I'm using <flink.version>1.0.1</flink.version>

Thank you in advance
Robert


Reply via email to