Hi Mailing List,

I probably have a problem with lazy evaluation. Depending on the "return" data type of my last transformation (GroupReduce), the integrated Flink mini cluster does not start. I have done the following:
// Configuration
Configuration parameters = new Configuration();
parameters.setString("path", "generated_2000000_tuples_10_dimensions_100.0_mean_25.0_std_and_498762467_seed.csv");
parameters.setString("output", "result_MR_GPSRS.csv");
parameters.setInteger("dimensionality", 10);
parameters.setInteger("cardinality", 2000000);
parameters.setDouble("min", 0.0);
parameters.setDouble("max", 200.0);

// Setting up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Reading the CSV
DataSet<?> input = InputConfig.setDataSetFromCSV(parameters.getInteger("dimensionality", 2), env, parameters.getString("path", ""));

// Broadcast BitString, cardinality, dimensionality, PPD
@SuppressWarnings({ "unchecked", "rawtypes" })
DataSet<Tuple4<BitSet, Integer, Integer, Integer>> metaData = input
    .mapPartition(new MR_GP_BitStringGeneratorMapper())
    .reduceGroup(new MR_GP_BitStringGeneratorReducer());

// Calculate the result
@SuppressWarnings({ "unchecked", "rawtypes" })
DataSet<?> result = input
    .mapPartition(new MR_GPSRS_Mapper()).withBroadcastSet(metaData, "MetaData").withParameters(parameters)
    .reduceGroup(new MR_GPSRS_Reducer()).withBroadcastSet(metaData, "MetaData").withParameters(parameters);

try {
    result.writeAsCsv(parameters.getString("output", ""), FileSystem.WriteMode.OVERWRITE);
    JobExecutionResult job = env.execute();
    System.out.println("Runtime in seconds: " + (job.getNetRuntime() / 1000));
} catch (Exception e) {
    e.printStackTrace(); // log the failure instead of silently swallowing it
}

When I run my program with the integrated Flink mini cluster in Eclipse, the console only outputs the following:

19:00:12,978 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class java.util.BitSet is not a valid POJO type
19:00:13,010 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class java.util.BitSet is not a valid POJO type
19:00:13,017 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class java.util.ArrayList is not a valid POJO type
19:00:13,021 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class java.util.ArrayList is not a valid POJO type

The MR_GPSRS_Reducer looks like this:

public class MR_GPSRS_Reducer<T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, T> {
    [...]
    @Override
    public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<T> out) throws Exception {
        [...]
        for (T tuple : tuples) {
            out.collect(tuple);
        }

If I change the code of MR_GPSRS_Reducer as follows, the behavior stays the same:

public class MR_GPSRS_Reducer<T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, Tuple1<T>> {
    [...]
    @Override
    public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<Tuple1<T>> out) throws Exception {
        [...]
        for (T tuple : tuples) {
            out.collect(new Tuple1<T>(tuple));
        }

Same here:

public class MR_GPSRS_Reducer<T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, ArrayList<T>> {
    [...]
    public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<ArrayList<T>> out) throws Exception {
        [...]
        out.collect(tuples);

Only if I change MR_GPSRS_Reducer to the following does the Flink mini cluster start:

public class MR_GPSRS_Reducer<T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, Tuple1<ArrayList<T>>> {
    [...]
    public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<Tuple1<ArrayList<T>>> out) throws Exception {
        [...]
        out.collect(new Tuple1<ArrayList<T>>(tuples));

(The hints about the invalid POJO types still remain.) But that isn't my preferred format for the DataSink... If I comment out the MR_GPSRS_Reducer (the last transformation), the Flink mini cluster also starts.

Does anybody have an idea how I can teach Flink to execute my program with T as the DataSink type? (In this case, T would be a Tuple10 of Doubles.)
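For what it's worth, my current guess is that this is a type-erasure issue rather than lazy evaluation: as far as I understand, Flink resolves a function's output type by reflecting on the concrete subclass of RichGroupReduceFunction, and because MR_GPSRS_Reducer is itself generic, the output type is still the unbound type variable T at runtime. A minimal plain-JDK sketch (hypothetical class names, nothing Flink-specific) of the difference between a resolved and an unresolved type parameter:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypeVarDemo {

    // Stand-in for a generic base class like RichGroupReduceFunction<IN, OUT>
    // (hypothetical names; plain JDK only).
    public static class Base<OUT> {}

    // Like MR_GPSRS_Reducer<T>: the subclass itself is generic, so OUT is
    // still the unbound type variable T in the class metadata.
    public static class GenericSub<T> extends Base<T> {}

    // A subclass that fixes OUT to a concrete type.
    public static class ConcreteSub extends Base<String> {}

    // Read the first actual type argument of the generic superclass,
    // which is roughly what reflection-based type extraction looks at.
    public static Type outTypeOf(Object o) {
        ParameterizedType superType = (ParameterizedType) o.getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        // The concrete subclass records the actual type argument:
        System.out.println(outTypeOf(new ConcreteSub()));        // class java.lang.String
        // The generic subclass only records the type variable "T",
        // which is useless for choosing a serializer:
        System.out.println(outTypeOf(new GenericSub<String>())); // T
    }
}
```

If I'm not mistaken, this is what Flink's type-hint mechanism (`returns(...)` on a transformation) is meant for when a function leaves a type variable open; maybe someone can confirm whether that would work here.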
(I have already tried explicitly typecasting the data sets and transformations so that suppressing the warnings is no longer necessary.)

I'm using <flink.version>1.0.1</flink.version>.

Thank you in advance,
Robert