Hi Mailing List,

I probably have a problem with the Lazy Evaluation. Depending of the “return” 
Datatype of my last Transformation (GroupReduce), the integrated Flink Mini 
Clusters does not start. I have done the following:

// Configuration
Configuration parameters = new Configuration();
parameters.setString("path", 
"generated_2000000_tuples_10_dimensions_100.0_mean_25.0_std_and_498762467_seed.csv");
parameters.setString("output", "result_MR_GPSRS.csv");
parameters.setInteger("dimensionality", 10);
parameters.setInteger("cardinality", 2000000);
parameters.setDouble("min", 0.0);
parameters.setDouble("max", 200.0);

// Setting Up Execution environment
final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();                

// Reading CSV
DataSet<?> input = 
InputConfig.setDataSetFromCSV(parameters.getInteger("dimensionality", 2), env, 
parameters.getString("path", ""));
                

//Broadcast BitString, Cardinality, Dimensionality, PPD
@SuppressWarnings({ "unchecked", "rawtypes" })
DataSet<Tuple4<BitSet,Integer,Integer,Integer>> metaData =
                                input
                                .mapPartition(new 
MR_GP_BitStringGeneratorMapper())
                                .reduceGroup(new 
MR_GP_BitStringGeneratorReducer());

// Calculate result
@SuppressWarnings({ "unchecked", "rawtypes" })
DataSet<?> result = input
                                .mapPartition(new 
MR_GPSRS_Mapper()).withBroadcastSet(metaData, 
"MetaData").withParameters(parameters)
                                .reduceGroup(new 
MR_GPSRS_Reducer()).withBroadcastSet(metaData, 
"MetaData").withParameters(parameters);
                

try {
        result.writeAsCsv(parameters.getString("output", ""), 
FileSystem.WriteMode.OVERWRITE);
        JobExecutionResult job = env.execute();
        System.out.println("Runtime in seconds: "+(job.getNetRuntime()/1000));
        } catch (Exception e) {
        // TODO Auto-generated catch block
}



When I run my program with the integrated Flink mini cluster in eclipse, the 
console outputs only the following: 
19:00:12,978 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            
 - class java.util.BitSet is not a valid POJO type
19:00:13,010 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            
 - class java.util.BitSet is not a valid POJO type
19:00:13,017 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            
 - class java.util.ArrayList is not a valid POJO type
19:00:13,021 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            
 - class java.util.ArrayList is not a valid POJO type

The MR_GPSRS_Reducer looks like the following:
public class MR_GPSRS_Reducer <T extends Tuple> extends 
RichGroupReduceFunction<ArrayList<ArrayList<T>>, T>
[...]
@Override
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, 
Collector<T> out) throws Exception {
[...]
for (T tuple : tuples) {
    out.collect(tuple);
}

If i change my code of MR_GPSRS_Reducer to fit the following, it has still the 
same behavior: 
public class MR_GPSRS_Reducer <T extends Tuple> extends 
RichGroupReduceFunction<ArrayList<ArrayList<T>>, Tuple1<T>>{
[...]
@Override
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, 
Collector<Tuple1<T>> out) throws Exception {
[...]
for (T tuple : tuples) {
    out.collect(new Tuple1<T>(tuple));
}

Same as here:
public class MR_GPSRS_Reducer <T extends Tuple> extends 
RichGroupReduceFunction<ArrayList<ArrayList<T>>, ArrayList<T>>{
[...]
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, 
Collector<ArrayList<T>> out) throws Exception {
[...]
out.collect(tuples);

Only if i change the MR_GPSRS_Reducer to the following, the Flink Mini Cluster 
starts:
public class MR_GPSRS_Reducer <T extends Tuple> extends 
RichGroupReduceFunction<ArrayList<ArrayList<T>>, Tuple1<ArrayList<T>>>{
[...]
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, 
Collector<Tuple1<ArrayList<T>>> out) throws Exception {
[...]
out.collect(new Tuple1<ArrayList<T>>(tuples));
(The hint to the not valid PoJo types still remains)

But that isn't my preferred format for the DataSink...
If I uncomment the MR_GPSRS_Reducer (the last transformation), the Flink 
Minicluster also starts.


Has anybody an idea, how can I teach Flink to execute my program with T's as 
DataSink? (In that case, T would be a Tuple10 with Doubles).
(I have already tried to explicitly typecast the datasets and transformations, 
so that the suppression of the warnings isn't necessary any more)

I'm using <flink.version>1.0.1</flink.version>

Thank you in advance
Robert


Reply via email to