Writing Intermediates to disk

2016-05-05 Thread Paschek, Robert
Hi Mailing List,

I want to write and read intermediates to/from disk.
The following pseudo-code snippet may illustrate my intention:

public void mapPartition(Iterable<T> tuples, Collector<T> out) {

    for (T tuple : tuples) {
        if (condition)
            out.collect(tuple);
        else
            writeTupleToDisk(tuple);
    }

    while (tupleOnDisk())
        out.collect(readNextTupleFromDisk());
}

I am wondering if Flink provides an integrated class for this purpose. I also
have to precisely identify the files holding the intermediates, due to the
parallelism of mapPartition.


Thank you in advance!
Robert


Lazy Evaluation

2016-06-14 Thread Paschek, Robert
Hi Mailing List,

I probably have a problem with lazy evaluation. Depending on the return
datatype of my last transformation (GroupReduce), the integrated Flink mini
cluster does not start. I have done the following:

// Configuration
Configuration parameters = new Configuration();
parameters.setString("path", 
"generated_200_tuples_10_dimensions_100.0_mean_25.0_std_and_498762467_seed.csv");
parameters.setString("output", "result_MR_GPSRS.csv");
parameters.setInteger("dimensionality", 10);
parameters.setInteger("cardinality", 200);
parameters.setDouble("min", 0.0);
parameters.setDouble("max", 200.0);

// Setting Up Execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Reading CSV
DataSet input = InputConfig.setDataSetFromCSV(
        parameters.getInteger("dimensionality", 2), env, parameters.getString("path", ""));


// Broadcast BitString, Cardinality, Dimensionality, PPD
@SuppressWarnings({ "unchecked", "rawtypes" })
DataSet<[...]> metaData = input
        .mapPartition(new MR_GP_BitStringGeneratorMapper())
        .reduceGroup(new MR_GP_BitStringGeneratorReducer());

// Calculate result
@SuppressWarnings({ "unchecked", "rawtypes" })
DataSet result = input
        .mapPartition(new MR_GPSRS_Mapper())
            .withBroadcastSet(metaData, "MetaData").withParameters(parameters)
        .reduceGroup(new MR_GPSRS_Reducer())
            .withBroadcastSet(metaData, "MetaData").withParameters(parameters);

try {
    result.writeAsCsv(parameters.getString("output", ""), FileSystem.WriteMode.OVERWRITE);
    JobExecutionResult job = env.execute();
    System.out.println("Runtime in seconds: " + (job.getNetRuntime() / 1000));
} catch (Exception e) {
    // TODO Auto-generated catch block
}



When I run my program with the integrated Flink mini cluster in Eclipse, the
console only outputs the following:
19:00:12,978 INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class java.util.BitSet is not a valid POJO type
19:00:13,010 INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class java.util.BitSet is not a valid POJO type
19:00:13,017 INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class java.util.ArrayList is not a valid POJO type
19:00:13,021 INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class java.util.ArrayList is not a valid POJO type

The MR_GPSRS_Reducer looks like the following:
public class MR_GPSRS_Reducer<T> extends
        RichGroupReduceFunction<[...], T>
[...]
@Override
public void reduce(Iterable<[...]> localSkylinesPerPartition,
        Collector<T> out) throws Exception {
[...]
    for (T tuple : tuples) {
        out.collect(tuple);
    }

If I change my MR_GPSRS_Reducer code to the following, the behavior stays the
same:
public class MR_GPSRS_Reducer<T> extends
        RichGroupReduceFunction<[...], Tuple1<T>>
[...]
@Override
public void reduce(Iterable<[...]> localSkylinesPerPartition,
        Collector<Tuple1<T>> out) throws Exception {
[...]
    for (T tuple : tuples) {
        out.collect(new Tuple1<T>(tuple));
    }

Same as here:
public class MR_GPSRS_Reducer<T> extends
        RichGroupReduceFunction<[...], ArrayList<T>>
[...]
public void reduce(Iterable<[...]> localSkylinesPerPartition,
        Collector<ArrayList<T>> out) throws Exception {
[...]
    out.collect(tuples);

Only if I change MR_GPSRS_Reducer to the following does the Flink mini cluster
start:
public class MR_GPSRS_Reducer<T> extends
        RichGroupReduceFunction<[...], Tuple1<ArrayList<T>>>
[...]
public void reduce(Iterable<[...]> localSkylinesPerPartition,
        Collector<Tuple1<ArrayList<T>>> out) throws Exception {
[...]
    out.collect(new Tuple1<ArrayList<T>>(tuples));
(The hint about the invalid POJO types still remains.)

But that isn't my preferred format for the DataSink...
If I comment out the MR_GPSRS_Reducer (the last transformation), the Flink
mini cluster also starts.


Does anybody have an idea how I can teach Flink to execute my program with T
as the DataSink type? (In this case, T would be a Tuple10 of Doubles.)
(I have already tried explicitly typecasting the datasets and transformations,
so that suppressing the warnings is no longer necessary.)

I'm using Flink 1.0.1.

Thank you in advance
Robert




AW: Lazy Evaluation

2016-06-19 Thread Paschek, Robert
Hi Mailing List,

after "upgrading" the Flink version in my pom.xml to 1.0.3, I get two error
messages for the output variants that don't work:

org.apache.flink.api.common.functions.InvalidTypesException: The return type of 
function 'main(MR_GPSRS.java:69)' could not be determined automatically, due to 
type erasure. You can give type information hints by using the returns(...) 
method on the result of the transformation call, or by letting your function 
implement the 'ResultTypeQueryable' interface.

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of 
TypeVariable 'OUT' in 'class 
org.apache.flink.api.common.functions.RichGroupReduceFunction' could not be 
determined. This is most likely a type erasure problem. The type extraction 
currently supports types with generic variables only in cases where all 
variables in the return type can be deduced from the input type(s).
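
(For the archive: the second option the exception mentions - implementing
ResultTypeQueryable - would look roughly like the sketch below. The class and
types are illustrative stand-ins, not my actual reducer.)

import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;

// the function reports its own output type, so the TypeExtractor does not
// have to deduce the erased generic parameter
public class MyReducer<T> extends RichGroupReduceFunction<Tuple2<Integer, T>, T>
        implements ResultTypeQueryable<T> {

    private final TypeInformation<T> outType;

    public MyReducer(TypeInformation<T> outType) {
        this.outType = outType;          // e.g. pass input.getType() from the driver program
    }

    @Override
    public void reduce(Iterable<Tuple2<Integer, T>> values, Collector<T> out) {
        for (Tuple2<Integer, T> value : values) {
            out.collect(value.f1);
        }
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return outType;
    }
}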


After adding ".returns(input.getType())" to my transformation, everything
works great now : - )
Many thanks to the developers who added these messages in the recent versions!
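
For the archive, a minimal sketch of where the type hint ended up. The job
layout is the one from my first mail; the generic parameter T stands in for my
concrete Tuple10 type, so treat the exact generics as assumptions:

// type hint on the GroupReduce: output type == input type, so the
// TypeExtractor no longer has to infer OUT from the erased signature
DataSet<T> result = input
        .mapPartition(new MR_GPSRS_Mapper<T>())
            .withBroadcastSet(metaData, "MetaData")
            .withParameters(parameters)
        .reduceGroup(new MR_GPSRS_Reducer<T>())
            .returns(input.getType())
            .withBroadcastSet(metaData, "MetaData")
            .withParameters(parameters);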

Best,
Robert


AW: Writing Intermediates to disk

2016-06-19 Thread Paschek, Robert
Hey,



thank you for your answers and sorry for my late response : - (



The intention was to store some of the data to disk when the main memory gets
full / my temporary ArrayList reaches a pre-defined size.

I used com.opencsv.CSVReader and com.opencsv.CSVWriter for this task, and
getRuntimeContext().getIndexOfThisSubtask() to distinguish the filenames from
those of other tasks running on the same machine.
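
For reference, a minimal sketch of that workaround. The tuple type, the
condition and the file-naming scheme are placeholders, so treat it as an
illustration rather than a tested recipe:

import java.io.FileWriter;

import com.opencsv.CSVWriter;

import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class SpillingMapper extends
        RichMapPartitionFunction<Tuple2<Integer, Double>, Tuple2<Integer, Double>> {

    @Override
    public void mapPartition(Iterable<Tuple2<Integer, Double>> tuples,
                             Collector<Tuple2<Integer, Double>> out) throws Exception {
        // one spill file per parallel subtask, so concurrent mappers on the
        // same machine never write to the same file
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        String spillFile = "spill_subtask_" + subtask + ".csv";

        try (CSVWriter writer = new CSVWriter(new FileWriter(spillFile))) {
            for (Tuple2<Integer, Double> tuple : tuples) {
                if (tuple.f1 > 0.0) {                            // placeholder condition
                    out.collect(tuple);                          // emit directly
                } else {
                    writer.writeNext(new String[] {              // spill to disk
                            String.valueOf(tuple.f0), String.valueOf(tuple.f1) });
                }
            }
        }
        // later: read the spilled rows back with com.opencsv.CSVReader and collect them
    }
}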

Fortunately, that is no longer necessary for my work.



Best

Robert





Von: Vikram Saxena 
Gesendet: Montag, 9. Mai 2016 12:15
An: user@flink.apache.org
Betreff: Re: Writing Intermediates to disk

I do not know if I understand completely, but I would create a new DataSet
based on filtering the condition and then persist this DataSet.

So :

DataSet ds2 = dataSet1.filter(condition);

ds2.output(...);
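
A self-contained version of what I mean (the condition and the sink path are
placeholders):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;

public class FilterAndPersist {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, Double>> ds1 = env.fromElements(
                Tuple2.of(1, 0.5), Tuple2.of(2, -1.0), Tuple2.of(3, 2.0));

        // keep only the tuples that match the condition...
        DataSet<Tuple2<Integer, Double>> ds2 = ds1.filter(
                new FilterFunction<Tuple2<Integer, Double>>() {
                    @Override
                    public boolean filter(Tuple2<Integer, Double> t) {
                        return t.f1 > 0.0;                       // placeholder condition
                    }
                });

        // ...and persist them with a sink
        ds2.writeAsCsv("file:///tmp/filtered.csv", FileSystem.WriteMode.OVERWRITE);
        env.execute("filter and persist");
    }
}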




On Mon, May 9, 2016 at 11:09 AM, Ufuk Celebi <u...@apache.org> wrote:
Flink has support for spillable intermediate results. Currently they
are only set if necessary to avoid pipeline deadlocks.

You can force this via

env.getConfig().setExecutionMode(ExecutionMode.BATCH);

This will write shuffles to disk, but you don't get the fine-grained
control you probably need for your use case.

- Ufuk




Getting the NumberOfParallelSubtask

2016-06-20 Thread Paschek, Robert
Hi Mailing list,

Using a RichMapPartitionFunction, I can access the total number m of parallel
instances of this mapper used in my job with
int m = getRuntimeContext().getNumberOfParallelSubtasks();

I think that would be - in general - the total number of CPU cores used by
Apache Flink across the cluster.

Is there a way to access the number of parallel instances of the following reducer?

In general I would assume that the number of the following reducers depends on
the number of groups generated by the groupBy() transformation, so the number
of reducers r would be 1 <= r <= m.

My Job:
DataSet output = input
.mapPartition(new MR_GPMRS_Mapper())
.groupBy(0)
.reduceGroup(new MR_GPMRS_Reducer());

Thank you in advance
Robert

AW: Getting the NumberOfParallelSubtask

2016-07-22 Thread Paschek, Robert
Hi Chesnay, hi Robert

Thank you for your explanations : - )
(And sorry for the late reply).

Regards,
Robert

Von: Robert Metzger [mailto:rmetz...@apache.org]
Gesendet: Dienstag, 21. Juni 2016 12:12
An: user@flink.apache.org
Betreff: Re: Getting the NumberOfParallelSubtask

Hi Robert,

the number of parallel subtasks is the parallelism of the job or of the
individual operator. Only when executing Flink locally is the parallelism set
to the number of CPU cores by default.
The number of groups generated by the groupBy() transformation doesn't affect
the parallelism. Very often the number of groups is much higher than the
parallelism; in those cases, each parallel instance will process multiple
groups.

If you want to know the parallelism of your operators globally, you'll need to
set it manually (say, set all operators to a parallelism of 8).
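
A small sketch of what I mean (the data and the parallelism values are just
examples):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class ParallelismExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);                       // default parallelism for every operator

        DataSet<Tuple2<Integer, Integer>> input = env.fromElements(
                Tuple2.of(1, 10), Tuple2.of(1, 20), Tuple2.of(2, 30));

        DataSet<Tuple2<Integer, Integer>> sums = input
                .groupBy(0)
                .sum(1)
                .setParallelism(4);                  // per-operator override

        sums.print();
    }
}

With this, getNumberOfParallelSubtasks() inside the grouped operator returns the
value that was set, independently of how many groups the groupBy(0) produces.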

Regards,
Robert


On Mon, Jun 20, 2016 at 10:00 PM, Chesnay Schepler <ches...@apache.org> wrote:
Within the mapper you cannot access the parallelism of the following nor 
preceding operation.






AW: Performance issues with GroupBy?

2016-07-27 Thread Paschek, Robert
> Alternatively, if you can reformulate your algorithm to use a `reduce`
> instead of a `reduceGroup`, that might also improve the performance.
> Also, if you are using a `reduce`, then you can try calling
> `.setCombineHint(CombineHint.HASH)` after the reduce. (The combine
> hint is a relatively new feature, so you need the current master for
> this.)
>
> Best,
> Gábor
>
> 2016-07-25 14:06 GMT+02:00 Paschek, Robert <robert.pasc...@tu-berlin.de>:
>
>> Hi Mailing List,
>>
>> I am currently running some benchmarks with different algorithms. The system
>> has 8 nodes and a configured parallelism of 48 - the IBM-Power-1
>> cluster, if somebody from the TU Berlin reads this : - ) - and to
>> "simulate" Hadoop MapReduce, the execution mode is set to "BATCH_FORCED".
>>
>> It is suspicious that three of the six algorithms show a big gap in
>> runtime (5000ms vs. 2ms) for easy (low-dimensional) tuples. Additionally,
>> the algorithms in the "upper" group use a groupBy transformation
>> and the algorithms in the "lower" group don't use groupBy.
>>
>> I attached the plot for better visualization.
>>
>> I also checked the logs, especially the time when the mappers
>> finish and the reducers start _iterating_ - they hardened my speculation.
>>
>> So my question is whether it is "normal" that grouping is so
>> cost-intensive that - in my case - the runtime increases by 4 times?
>>
>> I have data from the same experiments running on a 13-node cluster
>> with 26 cores with Apache Hadoop MapReduce, where the gap is still
>> present, but smaller (50s vs. 57s or 55s vs. 65s).
>>
>> Is there something I could do to optimize the grouping? Some
>> code snippets:
>>
>> The job:
>>
>> DataSet output = input
>>     .mapPartition(new MR_GPMRS_Mapper())
>>         .withBroadcastSet(metaData, "MetaData")
>>         .withParameters(parameters)
>>         .name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_MAPPER")
>>     .groupBy(0)
>>     .reduceGroup(new MR_GPMRS_Reducer())
>>         .returns(input.getType())
>>         .withBroadcastSet(metaData, "MetaData")
>>         .withParameters(parameters)
>>         .name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_REDUCER");
>>
>> MR_GPMRS_Mapper():
>>
>> public class MR_GPMRS_Mapper<T> extends
>>     RichMapPartitionFunction<T, Tuple2<Integer, Tuple3<[...], BitSet, BitSet>>>
>>
>> MR_GPMRS_Reducer():
>>
>> public class MR_GPMRS_Reducer<T> extends
>>     RichGroupReduceFunction<Tuple2<Integer, Tuple3<[...], BitSet, BitSet>>, T>
>>
>> The Tuple2 has the Tuple3 as payload on position f1 and the Integer key for
>> grouping on position f0.
>>
>> Any suggestions (or comments that this is "normal" behaviour) are welcome : - )
>>
>> Thank you in advance!
>>
>> Robert
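
For readers of the archive, a minimal sketch of the reduce-plus-combine-hint
variant Gábor suggests. The word-count-style key/value types are placeholders
for my actual skyline tuples, and setCombineHint requires Flink 1.1 / the
master branch at the time of writing:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class ReduceWithCombineHint {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, Long>> values = env.fromElements(
                Tuple2.of(1, 1L), Tuple2.of(1, 1L), Tuple2.of(2, 1L));

        DataSet<Tuple2<Integer, Long>> summed = values
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
                    @Override
                    public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> a,
                                                        Tuple2<Integer, Long> b) {
                        // element-wise, combinable reduction per key
                        return Tuple2.of(a.f0, a.f1 + b.f1);
                    }
                })
                .setCombineHint(CombineHint.HASH);   // hash-based combiner before the shuffle

        summed.print();
    }
}

Because the ReduceFunction is applied pairwise, Flink can pre-aggregate each
key locally before shipping the data to the reducers.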




Serialization of "not a valid POJO type"

2016-07-30 Thread Paschek, Robert
Hi Mailing List,

according to my questions (and your answers!) in this topic
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Performance-issues-with-GroupBy-td8130.html

I have eliminated the ArrayList in my collect methods. Additionally, I want to
emit partial results. My mapper has the following layout:

ArrayList<Tuple> structure = ...

for (Tuple tuple : input) {
    addTupleToStructure(tuple);
}
while (workNotDone) {
    doSomeWorkOnStructure();
    emitPartialResult();
}

Instead of emitting the partial result as an ArrayList ("not a valid POJO
type"), I now iterate through this ArrayList and emit each tuple as
Tuple2.of(Integer.valueOf(this.partitionIndex), tuple).
While iterating through this ArrayList and emitting tuples, my mapper seems to
be blocked and can't continue with doSomeWorkOnStructure().
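
To make that layout concrete, roughly the shape of my mapper now - the helper
methods and types are simplified stand-ins for the actual skyline logic:

import java.util.ArrayList;

import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class PartialResultMapper<T> extends RichMapPartitionFunction<T, Tuple2<Integer, T>> {

    @Override
    public void mapPartition(Iterable<T> input, Collector<Tuple2<Integer, T>> out) {
        int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
        ArrayList<T> structure = new ArrayList<>();

        for (T tuple : input) {
            structure.add(tuple);                                // addTupleToStructure()
        }
        while (workNotDone(structure)) {
            for (T tuple : doSomeWorkOnStructure(structure)) {   // partial result of this round
                // emit every tuple individually instead of collecting the whole ArrayList
                out.collect(Tuple2.of(Integer.valueOf(partitionIndex), tuple));
            }
        }
    }

    private boolean workNotDone(ArrayList<T> structure) {        // placeholder
        return !structure.isEmpty();
    }

    private ArrayList<T> doSomeWorkOnStructure(ArrayList<T> structure) {  // placeholder
        ArrayList<T> partial = new ArrayList<>(structure);
        structure.clear();
        return partial;
    }
}

(With a generic T, the call site again needs a .returns(...) hint, as in the
"Lazy Evaluation" thread.)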

So I have three questions:

- If I change back to emitting the ArrayList, would my mapper also be blocked
  until Flink has serialized this ArrayList? Or is serialization done
  independently of my mapper?

- If emitting the ArrayList won't block my mapper, which variant would be more
  performant?

- If I emit the ArrayList, but additionally implement a combiner which
  o merges all local ArrayLists with the same partitionIndex,
  o iterates through the locally merged ArrayLists and emits the contained tuples,
  would that be the best variant? Because the combining is done locally, I would
  assume that no serialization is required between mapper and combiner. Also, the
  mapper would probably not be blocked by emitting tuples and could continue with
  doSomeWorkOnStructure().

Thank you in advance!
Robert





AW: Serialization of "not a valid POJO type"

2016-07-30 Thread Paschek, Robert
Hi again,

I implemented the scenario with the combiner and answered my 3rd question
myself: the combiners start after the mapper has finished, so the reducer will
not start processing partial results until the mappers are completely done.

Regards
Robert

