Writing Intermediates to disk
Hi Mailing List,

I want to write and read intermediates to/from disk. The following pseudocode snippet may illustrate my intention:

public void mapPartition(Iterable<T> tuples, Collector<T> out) {
    for (T tuple : tuples) {
        if (condition)
            out.collect(tuple);
        else
            writeTupleToDisk(tuple);
    }
    while ('TupleOnDisk')
        out.collect('ReadNextTupleFromDisk');
}

I am wondering whether Flink provides an integrated class for this purpose. I also have to identify the files holding the intermediates unambiguously, because mapPartition runs in parallel.

Thank you in advance!
Robert
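Flink's DataSet API does not appear to ship a ready-made spill-and-replay helper for user functions, so the pattern above has to be written by hand. A minimal plain-Java sketch of that pattern, with Strings standing in for tuples and illustrative names (`process`, `condition`; the returned list stands in for the Collector):

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Sketch of the mapPartition pattern above in plain Java: tuples that satisfy
// the condition are emitted immediately, the rest are spilled to a temporary
// file and replayed afterwards.
public class SpillingPartition {

    public static List<String> process(Iterable<String> tuples,
                                       Predicate<String> condition) throws IOException {
        List<String> emitted = new ArrayList<>();
        Path spillFile = Files.createTempFile("spill-", ".tmp");
        try (BufferedWriter writer = Files.newBufferedWriter(spillFile, StandardCharsets.UTF_8)) {
            for (String tuple : tuples) {
                if (condition.test(tuple)) {
                    emitted.add(tuple);      // out.collect(tuple)
                } else {
                    writer.write(tuple);     // writeTupleToDisk
                    writer.newLine();
                }
            }
        }
        // while ('TupleOnDisk') out.collect('ReadNextTupleFromDisk')
        try (BufferedReader reader = Files.newBufferedReader(spillFile, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                emitted.add(line);
            }
        }
        Files.delete(spillFile);
        return emitted;
    }
}
```

Note that the spilled tuples are emitted after all inline tuples, so this only fits use cases where output order does not matter.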
Lazy Evaluation
Hi Mailing List,

I probably have a problem with lazy evaluation. Depending on the return datatype of my last transformation (GroupReduce), the integrated Flink mini cluster does not start. I have done the following:

// Configuration
Configuration parameters = new Configuration();
parameters.setString("path", "generated_200_tuples_10_dimensions_100.0_mean_25.0_std_and_498762467_seed.csv");
parameters.setString("output", "result_MR_GPSRS.csv");
parameters.setInteger("dimensionality", 10);
parameters.setInteger("cardinality", 200);
parameters.setDouble("min", 0.0);
parameters.setDouble("max", 200.0);

// Setting up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Reading the CSV
DataSet input = InputConfig.setDataSetFromCSV(parameters.getInteger("dimensionality", 2), env, parameters.getString("path", ""));

// Broadcast BitString, Cardinality, Dimensionality, PPD
@SuppressWarnings({ "unchecked", "rawtypes" })
DataSet metaData = input
    .mapPartition(new MR_GP_BitStringGeneratorMapper())
    .reduceGroup(new MR_GP_BitStringGeneratorReducer());

// Calculate result
@SuppressWarnings({ "unchecked", "rawtypes" })
DataSet result = input
    .mapPartition(new MR_GPSRS_Mapper()).withBroadcastSet(metaData, "MetaData").withParameters(parameters)
    .reduceGroup(new MR_GPSRS_Reducer()).withBroadcastSet(metaData, "MetaData").withParameters(parameters);

try {
    result.writeAsCsv(parameters.getString("output", ""), FileSystem.WriteMode.OVERWRITE);
    JobExecutionResult job = env.execute();
    System.out.println("Runtime in seconds: " + (job.getNetRuntime() / 1000));
} catch (Exception e) {
    // TODO Auto-generated catch block
}

When I run my program with the integrated Flink mini cluster in Eclipse, the console outputs only the following:

19:00:12,978 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class java.util.BitSet is not a valid POJO type
19:00:13,010 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class java.util.BitSet is not a valid POJO type
19:00:13,017 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class java.util.ArrayList is not a valid POJO type
19:00:13,021 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class java.util.ArrayList is not a valid POJO type

MR_GPSRS_Reducer looks like the following:

public class MR_GPSRS_Reducer<T> extends RichGroupReduceFunction<Tuple2<Integer, ArrayList<T>>, T> {
    [...]
    @Override
    public void reduce(Iterable<Tuple2<Integer, ArrayList<T>>> localSkylinesPerPartition, Collector<T> out) throws Exception {
        [...]
        for (T tuple : tuples) {
            out.collect(tuple);
        }

If I change my code in MR_GPSRS_Reducer as follows, the behavior stays the same:

public class MR_GPSRS_Reducer<T> extends RichGroupReduceFunction<Tuple2<Integer, ArrayList<T>>, Tuple1<T>> {
    [...]
    @Override
    public void reduce(Iterable<Tuple2<Integer, ArrayList<T>>> localSkylinesPerPartition, Collector<Tuple1<T>> out) throws Exception {
        [...]
        for (T tuple : tuples) {
            out.collect(new Tuple1<T>(tuple));
        }

Same as here:

public class MR_GPSRS_Reducer<T> extends RichGroupReduceFunction<Tuple2<Integer, ArrayList<T>>, ArrayList<T>> {
    [...]
    public void reduce(Iterable<Tuple2<Integer, ArrayList<T>>> localSkylinesPerPartition, Collector<ArrayList<T>> out) throws Exception {
        [...]
        out.collect(tuples);

Only if I change MR_GPSRS_Reducer to the following does the Flink mini cluster start:

public class MR_GPSRS_Reducer<T> extends RichGroupReduceFunction<Tuple2<Integer, ArrayList<T>>, Tuple1<ArrayList<T>>> {
    [...]
    public void reduce(Iterable<Tuple2<Integer, ArrayList<T>>> localSkylinesPerPartition, Collector<Tuple1<ArrayList<T>>> out) throws Exception {
        [...]
        out.collect(new Tuple1<ArrayList<T>>(tuples));

(The hint about the invalid POJO types still remains.) But that isn't my preferred format for the data sink... If I comment out the MR_GPSRS_Reducer (the last transformation), the Flink mini cluster also starts.

Does anybody have an idea how I can teach Flink to execute my program with T as the data sink type? (In this case, T would be a Tuple10 of Doubles.) (I have already tried to explicitly typecast the datasets and transformations, so that suppressing the warnings isn't necessary anymore.) I'm using Flink 1.0.1.

Thank you in advance
Robert
Re: Lazy Evaluation
Hi Mailing List,

After "upgrading" the Flink version in my pom.xml to 1.0.3, I get two error messages for the output variants that don't work:

org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(MR_GPSRS.java:69)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'OUT' in 'class org.apache.flink.api.common.functions.RichGroupReduceFunction' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).

After adding ".returns(input.getType())" to my transformation, everything works great now :-)

Many thanks to the developers who added these messages in the last versions!

Best,
Robert

From: Paschek, Robert
Sent: Tuesday, 14 June 2016, 19:52
To: user@flink.apache.org
Subject: Lazy Evaluation

> [...]
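The "type erasure problem" named in the exception above can be reproduced with a minimal plain-Java sketch: generic type arguments are erased at compile time, so at runtime the class alone carries no information about 'OUT', which is why Flink needs a hint such as .returns(...):

```java
import java.util.ArrayList;

// Java erases generic type parameters at compile time, so at runtime an
// ArrayList<Integer> and an ArrayList<String> share exactly the same class.
// This is why Flink's TypeExtractor cannot recover a function's output type
// variable on its own and needs returns(...) or ResultTypeQueryable.
public class ErasureDemo {
    public static boolean sameRuntimeClass() {
        ArrayList<Integer> ints = new ArrayList<>();
        ArrayList<String> strings = new ArrayList<>();
        return ints.getClass() == strings.getClass();
    }
}
```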
Re: Writing Intermediates to disk
Hey,

Thank you for your answers, and sorry for my late response :-(

The intention was to store some of the data on disk when the main memory gets full / my temporary ArrayList reaches a pre-defined size. I used com.opencsv.CSVReader and com.opencsv.CSVWriter for this task, and getRuntimeContext().getIndexOfThisSubtask() to distinguish the file names from those of other tasks running on the same machine. Fortunately, that is no longer necessary for my work.

Best
Robert

From: Vikram Saxena
Sent: Monday, 9 May 2016, 12:15
To: user@flink.apache.org
Subject: Re: Writing Intermediates to disk

I do not know if I understand completely, but I would create a new DataSet based on filtering the condition and then persist this DataSet. So:

DataSet ds2 = ds1.filter(condition);
ds2.output(...);

On Mon, May 9, 2016 at 11:09 AM, Ufuk Celebi <u...@apache.org> wrote:

Flink has support for spillable intermediate results. Currently they are only used where necessary to avoid pipeline deadlocks. You can force this via

env.getConfig().setExecutionMode(ExecutionMode.BATCH);

This will write shuffles to disk, but you don't get the fine-grained control you probably need for your use case.

- Ufuk

On Thu, May 5, 2016 at 3:29 PM, Paschek, Robert <robert.pasc...@tu-berlin.de> wrote:
> [...]
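The per-subtask file naming mentioned above can be sketched in plain Java; in an actual RichMapPartitionFunction the index would come from getRuntimeContext().getIndexOfThisSubtask(), and the class and method names here are hypothetical:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch: build a spill-file name that is unique per parallel subtask, so
// several subtasks on the same machine never write to the same file. The
// subtask index is passed in; in Flink it would come from the runtime context.
public class SpillFileNames {
    public static Path spillFileFor(String jobName, int subtaskIndex) {
        String tmpDir = System.getProperty("java.io.tmpdir");
        return Paths.get(tmpDir, jobName + "-spill-" + subtaskIndex + ".csv");
    }
}
```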
Getting the NumberOfParallelSubtask
Hi Mailing list,

Using a RichMapPartitionFunction, I can access the total number m of instances of this mapper utilized in my job with

int m = getRuntimeContext().getNumberOfParallelSubtasks();

I think that would be - in general - the total number of CPU cores used by Apache Flink across the cluster. Is there a way to access the number of the following reducers? In general I would assume that the number of the following reducers depends on the number of groups generated by the groupBy() transformation, so the number of reducers r would be 1 <= r <= m.

My job:

DataSet output = input
    .mapPartition(new MR_GPMRS_Mapper())
    .groupBy(0)
    .reduceGroup(new MR_GPMRS_Reducer());

Thank you in advance
Robert
Re: Getting the NumberOfParallelSubtask
Hi Chesnay, hi Robert,

Thank you for your explanations :-) (And sorry for the late reply.)

Regards,
Robert

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: Tuesday, 21 June 2016, 12:12
To: user@flink.apache.org
Subject: Re: Getting the NumberOfParallelSubtask

Hi Robert,

the number of parallel subtasks is the parallelism of the job or the individual operator. Only when executing Flink locally is the parallelism set to the number of CPU cores. The number of groups generated by the groupBy() transformation doesn't affect the parallelism. Very often the number of groups is much higher than the parallelism; in those cases, each parallel instance will process multiple groups. If you want to know the parallelism of your operators globally, you'll need to set it manually (say, all operators to a parallelism of 8).

Regards,
Robert

On Mon, Jun 20, 2016 at 10:00 PM, Chesnay Schepler <ches...@apache.org> wrote:

Within the mapper you cannot access the parallelism of the following nor preceding operation.

On 20.06.2016 15:56, Paschek, Robert wrote:
> [...]
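The point that each parallel instance processes multiple groups when there are more groups than the parallelism can be sketched conceptually; assuming keys are assigned to reducer instances by hashing (illustrative only, not Flink's exact partitioning scheme):

```java
// Conceptual sketch: groupBy routes each key to one of the parallel reducer
// instances, e.g. by hashing the key modulo the parallelism. With more
// distinct keys than instances, some instance necessarily receives several
// groups. This is an illustration, not Flink's actual partitioner.
public class GroupAssignment {
    public static int instanceFor(Object key, int parallelism) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), parallelism);
    }
}
```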
Re: Performance issues with GroupBy?
[...] Alternatively, if you can reformulate your algorithm to use a `reduce` instead of a `reduceGroup`, that might also improve the performance. Also, if you are using a `reduce`, then you can try calling `.setCombineHint(CombineHint.HASH)` after the reduce. (The combine hint is a relatively new feature, so you need the current master for this.)

Best,
Gábor

2016-07-25 14:06 GMT+02:00 Paschek, Robert <robert.pasc...@tu-berlin.de>:

Hi Mailing List,

I am currently running benchmarks with different algorithms. The system has 8 nodes and a configured parallelism of 48 - the IBM-Power-1 cluster, if somebody from the TU Berlin reads this :-) - and to "simulate" Hadoop MapReduce, the execution mode is set to "BATCH_FORCED".

It is suspicious that three of the six algorithms show a big gap in runtime (5000 ms vs. 20000 ms) for easy (low-dimensional) tuples. Additionally, the algorithms in the "upper" group use a groupBy transformation, while the algorithms in the "lower" group don't use groupBy. I attached the plot for better visualization.

I also checked the logs, especially the times when the mappers finish and the reducers start _iterating_ - they confirmed my suspicion.

So my question is whether it is "normal" that grouping is so cost-intensive that - in my case - the runtime increases by a factor of 4. I have data from the same experiments running on a 13-node cluster with 26 cores with Apache Hadoop MapReduce, where the gap is still present, but smaller (50 s vs. 57 s, or 55 s vs. 65 s).

Is there something I could do to optimize the grouping? Some code snippets:

The job:

DataSet output = input
    .mapPartition(new MR_GPMRS_Mapper()).withBroadcastSet(metaData, "MetaData").withParameters(parameters).name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_MAPPER")
    .groupBy(0)
    .reduceGroup(new MR_GPMRS_Reducer()).returns(input.getType()).withBroadcastSet(metaData, "MetaData").withParameters(parameters).name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_REDUCER");

MR_GPMRS_Mapper():

public class MR_GPMRS_Mapper<T> extends RichMapPartitionFunction<T, Tuple2<Integer, Tuple3<ArrayList<T>, BitSet, BitSet>>>

MR_GPMRS_Reducer():

public class MR_GPMRS_Reducer<T> extends RichGroupReduceFunction<Tuple2<Integer, Tuple3<ArrayList<T>, BitSet, BitSet>>, T>

The Tuple2 has the Tuple3 as payload on position f1, and on position f0 the Integer key for grouping.

Any suggestions (or comments that it is "normal" behaviour) are welcome :-)

Thank you in advance!
Robert
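Conceptually, the suggested CombineHint.HASH makes the combiner pre-aggregate values per key in a hash table before anything is shuffled, instead of sorting the data first. A plain-Java sketch of that idea (types and names are illustrative, not Flink API):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Conceptual sketch of hash-based combining (the idea behind
// CombineHint.HASH for reduce): each (key, value) pair is folded into a hash
// table locally, so only one pre-aggregated value per key is sent over the
// network, and no local sort is needed.
public class HashCombiner {
    // keyValuePairs: each int[] is {key, value}; reduceFn merges two values.
    public static Map<Integer, Integer> combine(List<int[]> keyValuePairs,
                                                BinaryOperator<Integer> reduceFn) {
        Map<Integer, Integer> combined = new HashMap<>();
        for (int[] kv : keyValuePairs) {
            combined.merge(kv[0], kv[1], reduceFn);  // local pre-aggregation per key
        }
        return combined;
    }
}
```

This pre-aggregation is also why reformulating a reduceGroup as a reduce can help: only a reduce is combinable automatically.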
Serialization of "not a valid POJO type"
Hi Mailing List,

Following my questions (and your answers!) in the thread http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Performance-issues-with-GroupBy-td8130.html I have eliminated the ArrayList in my collect methods. Additionally, I want to emit partial results. My mapper has the following layout:

ArrayList structure = ...
for (Tuple tuple : input) {
    addTupleToStructure(tuple);
}
while (workNotDone) {
    doSomeWorkOnStructure();
    emitPartialResult();
}

Instead of emitting the partial result as an ArrayList ("not a valid POJO type"), I now iterate through this ArrayList and emit each tuple as

out.collect(Tuple2.of(Integer.valueOf(this.partitionIndex), tuple));

While iterating through this ArrayList and emitting tuples, my mapper seems to be blocked and can't continue with doSomeWorkOnStructure(). So I have three questions:

- If I change back to emitting the ArrayList, would my mapper also be blocked until Flink has serialized this ArrayList? Or is serialization done independently of my mapper?
- If emitting the ArrayList won't block my mapper, which variant would be more performant?
- If I emit the ArrayList, but additionally implement a combiner, which
  o merges all local ArrayLists with the same partitionIndex, and
  o iterates through the locally merged ArrayLists and emits the contained tuples,
  would that be the best variant? Because the combining is done locally, I would assume that no serialization is required between mapper and combiner. Also, the mapper is probably not blocked with emitting tuples and can continue with doSomeWorkOnStructure().

Thank you in advance!
Robert
Re: Serialization of "not a valid POJO type"
Hi again,

I implemented the scenario with the combiner and answered my third question myself: the combiners start after the mappers finish, so the reducers will not start processing partial results until the mappers are completely done.

Regards
Robert

From: Paschek, Robert [mailto:robert.pasc...@tu-berlin.de]
Sent: Saturday, 30 July 2016, 12:04
To: user@flink.apache.org
Subject: Serialization of "not a valid POJO type"

> [...]
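The merge step of the combiner proposed in this thread can be sketched in plain Java; in Flink this logic would presumably live in a combineGroup / GroupCombineFunction, and all names here are illustrative:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the combiner's merge step discussed above: partial
// lists that share the same partitionIndex are merged locally into one list
// per index, after which the merged lists can be flattened into individual
// (partitionIndex, tuple) pairs for the reducer.
public class PartialResultCombiner {
    public static <T> Map<Integer, List<T>> mergeByPartition(
            List<Map.Entry<Integer, List<T>>> partials) {
        Map<Integer, List<T>> merged = new HashMap<>();
        for (Map.Entry<Integer, List<T>> partial : partials) {
            merged.computeIfAbsent(partial.getKey(), k -> new ArrayList<>())
                  .addAll(partial.getValue());   // merge lists with the same partitionIndex
        }
        return merged;
    }
}
```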