Re: Deep Copy in FLINK, Kryo Copy is used in the different operator

2018-02-14 Thread Gábor Gévay
Hello,

You might also be able to make Flink use a better serializer than
Kryo. Flink falls back to Kryo when it can't use its own serializers,
see here:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/types_serialization.html
For example, it might help to make your type a POJO.
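
A rough sketch of what such a POJO could look like (the class and field
names here are just placeholders for your own type):

public class MyEvent {
    // public fields (or getters/setters) and a public no-argument
    // constructor, so that Flink recognizes the class as a POJO
    public long id;
    public String name;
    public double value;

    public MyEvent() {}
}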

Best,
Gábor




On Wed, Feb 14, 2018 at 3:38 PM, Aljoscha Krettek  wrote:
> Hi,
>
> You can disable those copies via ExecutionConfig.enableObjectReuse(), which 
> you can get from the StreamExecutionEnvironment via getConfig().
>
> Best,
> Aljoscha
>
>> On 12. Feb 2018, at 04:00, chen  wrote:
>>
>> Actually, our team has its own stream engine. We tested our engine and Flink
>> and found that when we aggregate the stream data, the throughput decreases
>> very fast.
>>
>> So we captured the stack trace and found a deep copy in Flink.
>>
>> Between different operators, there is an
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy call.
>>
>>
>>
>>
>


Re: At end of complex parallel flow, how to force end step with parallel=1?

2017-10-03 Thread Gábor Gévay
Hi Garrett,

You can call .setParallelism(1) on just this operator:

ds.reduceGroup(new GroupReduceFunction...).setParallelism(1)

Best,
Gabor



On Mon, Oct 2, 2017 at 3:46 PM, Garrett Barton  wrote:
> I have a complex alg implemented using the DataSet api and by default it
> runs with parallel 90 for good performance. At the end I want to perform a
> clustering of the resulting data and to do that correctly I need to pass all
> the data through a single thread/process.
>
> I read in the docs that as long as I did a global reduce using
> DataSet.reduceGroup(new GroupReduceFunction) that it would force it to a
> single thread.  Yet when I run the flow and bring it up in the ui, I see
> parallel 90 all the way through the dag including this one.
>
> Is there a config or feature to force the flow back to a single thread?  Or
> should I just split this into two completely separate jobs?  I'd rather not
> split as I would like to use flinks ability to iterate on this alg and
> cluster combo.
>
> Thank you


Re: Rule expression for CEP library

2017-09-25 Thread Gábor Gévay
Hello Shailesh,

There is a Flink Improvement Proposal for Integration of SQL and CEP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-20:+Integration+of+SQL+and+CEP

Best,
Gábor


On Mon, Sep 25, 2017 at 3:21 PM, Shailesh Jain
 wrote:
> Hi,
>
> Apart from the Java/Scala API for the CEP library, is there any other way to
> express patterns/rules which can be run on flink engine?
>
> Are there any plans on adding a DSL/Rule expression language for CEP anytime
> soon? If not, any pointers on how it can be achieved now would be really
> helpful.
>
> Thanks,
> Shailesh


Re: Dot notation not working for accessing case classes nested fields

2017-09-15 Thread Gábor Gévay
Hi Federico,

Sorry, nested field expressions are not supported in these methods at
the moment. I have created a JIRA issue for this:
https://issues.apache.org/jira/browse/FLINK-7629
I think this should be easy to fix, as all the infrastructure for
supporting this is already in place. I'll try to do it over the
weekend.

Best,
Gábor




On Thu, Sep 14, 2017 at 3:51 PM, Federico D'Ambrosio
 wrote:
> Hi,
>
> I have the following case classes:
>
> case class Event(instantValues: InstantValues)
> case class InstantValues(speed: Int, altitude: Int, time: DateTime)
>
>
> in a DataStream[Event] I'd like to perform a maxBy operation on the field
> time of instantValue for each event and according to the docs here it would
> be possible to use the dot notation such the following:
>
> val events = stream
>   .keyBy("otherField")
>   .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>   .maxBy("instantValues.time")
>
> positionToMaxBy - In case of a POJO, Scala case class, or Tuple type, the
> name of the (public) field on which to perform the aggregation. Additionally,
> a dot can be used to drill down into nested objects, as in "field1.fieldxy"
> . Furthermore "*" can be specified in case of a basic type (which is
> considered as having only one field).
>
>
> Still, I'm getting the following error:
>
> Fields 'instantValues.time' are not valid for 'package.Event(instantValues:
> package.InstantValues(speed: Integer, altitude: Integer, time:
> GenericType))'
>
> whereas if, for instance, use only "instantValues" (while implementing its
> compareTo method) the aggregation works as usual.
>
> Any idea as to why this isn't working? Am I doing something wrong?
>
> Thanks a lot,
> Federico


Re: Bulk Iteration

2017-09-14 Thread Gábor Gévay
Hello Alieh,

If you set the logging to a more verbose level, then Flink prints a
log msg at every iteration.

If you need the current iteration number inside your code, then you
should create your UDF as an AbstractRichFunction, where you can call
getIterationRuntimeContext(), which has getSuperstepNumber().
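
For example, a rough sketch with a RichMapFunction inside a DataSet bulk
iteration (the types and the iteration body are just placeholders):

IterativeDataSet<Long> iteration = input.iterate(maxIterations);

DataSet<Long> step = iteration.map(new RichMapFunction<Long, Long>() {
    @Override
    public Long map(Long value) {
        // available because the function runs inside an iteration
        int superstep = getIterationRuntimeContext().getSuperstepNumber();
        return value + superstep;
    }
});

DataSet<Long> result = iteration.closeWith(step);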

Best,
Gabor



On Mon, Sep 11, 2017 at 2:57 PM, Alieh  wrote:
> Hello all,
>
> using Bulk iteration, is there any way to know the number of iterations?
>
>
> Cheers,
> Alieh
>


Re: DataSet: CombineHint heuristics

2017-09-05 Thread Gábor Gévay
Hi Urs,

Yes, the 1/10th ratio is just a very loose rule of thumb. I would
suggest trying both the SORT and HASH strategies with a workload that
is as similar as possible to your production workload (similar data,
similar parallelism, etc.), and see which one is faster for your
specific use case.
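
For reference, this is how you can switch between the two strategies (a
sketch, with Tuple2<String, Long> as a stand-in for your record type):

DataSet<Tuple2<String, Long>> summed = input
    .groupBy(0)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
            return new Tuple2<>(a.f0, a.f1 + b.f1);
        }
    })
    .setCombineHint(CombineHint.HASH); // or CombineHint.SORT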

An important difference between the HASH and SORT strategies is that
the sorting combiner stores the original input records, while the hash
combiner stores only combined records. I.e., when an input record
arrives whose key is already in the hashtable then this record won't
consume additional memory, because it is combined right away.
Therefore, for example, if you would like your combiner to not emit
any records prematurely (i.e., combine everything possible, without
running out of memory), then with the SORT strategy you need combiner
memory proportional to your input size, while with the HASH strategy
you need combiner memory proportional only to the number of keys.

You are correct in that the performance depends very much on how many
records fit into a single Sorter/Hashtable. However, I wrote
#keys/#total records into the documentation because this is easier for
a user to estimate, and this ratio being small correlates with the
HASH strategy getting faster, as explained above.

Best,
Gábor



On Thu, Aug 31, 2017 at 4:02 PM, Aljoscha Krettek  wrote:
> Hi,
>
> I would say that your assumption is correct and that the COMBINE strategy 
> does in fact also depend on the ratio "#total records/#records that fit 
> into a single Sorter/Hashtable".
>
> I'm CC'ing Fabian, just to be sure. He knows that stuff better than I do.
>
> Best,
> Aljoscha
>
>> On 31. Aug 2017, at 13:41, Urs Schoenenberger 
>>  wrote:
>>
>> Hi all,
>>
>> I was wondering about the heuristics for CombineHint:
>>
>> Flink uses SORT by default, but the doc for HASH says that we should
>> expect it to be faster if the number of keys is less than 1/10th of the
>> number of records.
>>
>> HASH should be faster if it is able to combine a lot of records, which
>> happens if multiple events for the same key are present in a data chunk
>> *that fits into a combine-hashtable* (cf handling in
>> ReduceCombineDriver.java).
>>
>> Now, if I have 10 billion events and 100 million keys, but only about 1
>> million records fit into a hashtable, the number of matches may be
>> extremely low, so very few events are getting combined (of course, this
>> is similar for SORT as the sorter's memory is bounded, too).
>>
>> Am I correct in assuming that the actual tradeoff is not only based on
>> the ratio of #total records/#keys, but also on #total records/#records
>> that fit into a single Sorter/Hashtable?
>>
>> Thanks,
>> Urs
>>
>> --
>> Urs Schönenberger - urs.schoenenber...@tngtech.com
>>
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>


Re: termination of stream#iterate on finite streams

2017-09-05 Thread Gábor Gévay
Hello,

There is a Flink Improvement Proposal to redesign the iterations:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132
This will address the termination issue.

Best,
Gábor





On Mon, Sep 4, 2017 at 11:00 AM, Xingcan Cui  wrote:
> Hi Peter,
>
> That's a good idea, but may not be applicable with an iteration operator.
> The operator can
> not determine when to generate the "end-of-stream message" for the feedback
> stream.
> The provided function (e.g., filter(_ > 0).map(_ - 1)) is stateless and has
> no side-effects.
>
> Best,
> Xingcan
>
>
>
> On Mon, Sep 4, 2017 at 4:40 AM, Peter Ertl  wrote:
>>
>> Hi Xingcan!
>>
>> if a _finite_ stream would, at the end, emit a special, trailing
>> "End-Of-Stream Message" that floats downward the operator stream, wouldn't
>> this enable us to deterministically end the iteration without needing a
>> timeout?
>>
>> Having an arbitrary timeout that must be longer than any iteration step
>> takes seems really awkward.
>>
>> What you think?
>>
>> Best regards
>> Peter
>>
>>
>> Am 02.09.2017 um 17:16 schrieb Xingcan Cui :
>>
>> Hi Peter,
>>
>> I just omitted the filter part. Sorry for that.
>>
>> Actually, as the javadoc explained, by default a DataStream with iteration
>> will never terminate. That's because in a
>> stream environment with iteration, the operator will never know whether
>> the feedback stream has reached its end
>> (though the data source is terminated, there may be unknowable subsequent
>> data) and that's why it needs a
>> timeout value to make the judgement, just like many other function calls
>> in network connection. In other words,
>> you know the feedback stream will be empty in the future, but the operator
>> doesn't. Thus we provide it a maximum
>> waiting time for the next record.
>>
>> Internally, this mechanism is implemented via a blocking queue (the
>> related code can be found here).
>>
>> Hope everything is considered this time : )
>>
>> Best,
>> Xingcan
>>
>> On Sat, Sep 2, 2017 at 10:08 PM, Peter Ertl  wrote:
>>>
>>>
>>> Am 02.09.2017 um 04:45 schrieb Xingcan Cui :
>>>
>>> In your code, all the long values will subtract 1 and be sent back
>>> to the iterate operator, endlessly.
>>>
>>>
>>>
>>> Is this true? shouldn't
>>>
>>>   val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>>> (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump
>>> meaningless 'y' chars just to do anything
>>>   })
>>>   iterationResult2.print()
>>>
>>>
>>> produce the following _feedback_ streams?
>>>
>>> initial input to #iterate(): [1 2 3 4]
>>>
>>> iteration #1 : [1 2 3]
>>> iteration #2 : [1 2]
>>> iteration #3 : [1]
>>> iteration #4 : []  => empty feedback stream => cause termination? (which
>>> actually only happens when setting a timeout value)
>>>
>>> Best regards
>>> Peter
>>
>


Re: Flink Vs Google Cloud Dataflow?

2017-08-04 Thread Gábor Gévay
Hello,

Have you seen these two blog posts? They explain the relationship
between Apache Flink, Apache Beam, and Google Cloud Dataflow.
https://data-artisans.com/blog/why-apache-beam
https://cloud.google.com/blog/big-data/2016/05/why-apache-beam-a-google-perspective

Best,
Gábor




On Mon, Jul 31, 2017 at 6:36 AM, Sridhar Chellappa  wrote:
> Did anyone study Cloud DataFlow as an alternative to Flink? If yes, can
> someone summarize their analysis of Dataflow as against Flink?


Re: Beginner question - sum multiple edges

2017-04-23 Thread Gábor Gévay
Hi Marc,

You can do such a map on your edges dataset that switches the
direction of edges if the source id is bigger than the target id. So
your MapFunction will have an if, which checks whether the source id
is bigger than the target id, and if it is bigger, then returns the
edge reversed, else returns the edge unchanged.

So the result of this on your example would be:
1 2 10
1 2 10
1 2 10  // This is the switched edge
1 1 1
1 3 5

You can remove loops with a filter, after which you get:
1 2 10
1 2 10
1 2 10
1 3 5

And after this, my original solution (.groupBy(0, 1).sum(2)) should
work, and give you:
1 2 30
1 3 5
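
In code, a rough sketch of all these steps in the Java API (assuming the
edges are a DataSet<Tuple3<Long, Long, Double>> of (source, target, value)):

DataSet<Tuple3<Long, Long, Double>> summed = edges
    // switch the direction if the source id is bigger than the target id
    .map(new MapFunction<Tuple3<Long, Long, Double>, Tuple3<Long, Long, Double>>() {
        @Override
        public Tuple3<Long, Long, Double> map(Tuple3<Long, Long, Double> e) {
            return e.f0 <= e.f1 ? e : new Tuple3<>(e.f1, e.f0, e.f2);
        }
    })
    // remove self-loops
    .filter(new FilterFunction<Tuple3<Long, Long, Double>>() {
        @Override
        public boolean filter(Tuple3<Long, Long, Double> e) {
            return !e.f0.equals(e.f1);
        }
    })
    // sum the values of edges that now have the same endpoints
    .groupBy(0, 1)
    .sum(2);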

Note that my solution above does not use Gelly, so you don't need
parameters.setDirection(EdgeDirection.ALL).
I'm not sure whether there is a simpler solution that uses Gelly.

Best,
Gábor




On Sun, Apr 23, 2017 at 4:56 PM, Kaepke, Marc
<marc.kae...@haw-hamburg.de> wrote:
> Hi Gábor and anyone else ;-) ,
>
> I need your help again.
>
> My goal is a graph without self-loops and sum all edge values between two
> vertices into one single edge (both direction).
> e.g. my input graph is described by:
> 1 2 10
> 1 2 10
> 2 1 10
> 1 1 1
> 1 3 5
>
> The result has to be:
> 1 2 30 (sum and reduce all edges between vertex 1 and 2. I don’t care about
> the direction of these vertices because of the
> setDirection(EdgeDirection.ALL) parameter)
> 1 3 5
>
>
> My intermediate result is:
> 1 2 20
> 2 1 10
> 1 3 5
> with the following transformations:
>
> ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
> parameters.setDirection(EdgeDirection.ALL);
>
> // reduce multi edges
> Graph networkSumMultiEdges =
> Graph.fromTupleDataSet(inputGraph.getEdges().groupBy(0, 1).sum(2),
> ExecutionEnvironment.getExecutionEnvironment());
> // reduce self-looping
> Graph networkGraph = new Simplify<>().run(networkSumMultiEdges);
>
>
> How can I reduce and combine (1 2 20) and (2 1 10) to one Tuple?
>
>
> Best regards
> Marc
>
>
> Am 17.04.2017 um 21:47 schrieb Kaepke, Marc <marc.kae...@haw-hamburg.de>:
>
> Hi Gábor,
>
> thanks a lot
>
> Best,
> Marc
>
> Am 17.04.2017 um 20:32 schrieb Gábor Gévay <gga...@gmail.com>:
>
> Hello Marc,
>
> You can group by edge, and then sum:
>
> edges
> .groupBy(0,1) // make first two fields a composite key
> .sum(2); // sum the value field
>
> This will turn multiple edges that have the same source and target
> into one edge, whose value will be the sum of the values of the
> original group of edges.
>
> Best,
> Gabor
>
> PS.: Sorry for the duplicate email, I accidentally sent my previous
> email only to you instead of the mailing list.
>
>
>
>
> On Mon, Apr 17, 2017 at 5:46 PM, Kaepke, Marc
> <marc.kae...@haw-hamburg.de> wrote:
>
> Hi,
>
> how can I sum and reduce multiple edges in my entire graph?
>
> e.g. my input graph looks like (source-ID, target-ID, value):
> (1, 2, 30)
> (1, 2, 10)
> (2, 1, 55)
>
> And I need:
> (1, 2, 40)
> (2, 1, 55)
>
>
> Thanks!
> Marc
>
>
>


Re: Join with Default-Value

2017-02-10 Thread Gábor Gévay
I'm not sure what exactly the problem is, but could you check this FAQ item?
http://flink.apache.org/faq.html#why-am-i-getting-a-nonserializableexception-

Best,
Gábor



2017-02-10 14:16 GMT+01:00 Sebastian Neef :
> Hi,
>
> thanks! That's exactly what I needed.
>
> I'm now using: DataSetA.leftOuterJoin(DataSetB).where(new
> KeySelector()).equalTo(new KeySelector()).with(new JoinFunction(...)).
>
> Now I get the following error:
>
>> Caused by: org.apache.flink.optimizer.CompilerException: Error translating 
>> node 'Map "Key Extractor" : MAP [[ GlobalProperties 
>> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, 
>> grouped=null, unique=null] ]]': Could not write the user code wrapper class 
>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : 
>> java.io.NotSerializableException: 
>> org.apache.flink.api.java.operators.JoinOperator$EquiJoin
>
> I'm using flink-1.1.4-hd27.
>
> Any ideas how I can fix that bug? It did properly work with a simple .join()
>
> Regards,
> Sebastian


Re: Join with Default-Value

2017-02-10 Thread Gábor Gévay
Hello Sebastian,

You can use DataSet.leftOuterJoin for this.
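
For example, a quick sketch (ElementA, ElementB, and DEFAULT_B are
placeholders for your types and your default value); in a left outer join
the right-hand element is null when there was no match, and that is where
you plug in the default:

DataSet<Tuple2<ElementA, ElementB>> joined = dataSetA
    .leftOuterJoin(dataSetB)
    .where("id")
    .equalTo("id")
    .with(new JoinFunction<ElementA, ElementB, Tuple2<ElementA, ElementB>>() {
        @Override
        public Tuple2<ElementA, ElementB> join(ElementA a, ElementB b) {
            // b is null if there was no matching element in dataSetB
            return new Tuple2<>(a, b != null ? b : DEFAULT_B);
        }
    });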

Best,
Gábor




2017-02-10 12:58 GMT+01:00 Sebastian Neef :
> Hi,
>
> is it possible to assign a "default" value to elements that didn't match?
>
> For example I have the following two datasets:
>
> |DataSetA | DataSetB|
> -
> |id=1 | id=1
> |id=2 | id=3
> |id=5 | id=4
> |id=6 | id=6
>
> When doing a join with:
>
> A.join(B).where( KeySelector(A.id))
> .equalTo(KeySelector(B.id))
>
> The resulting dataset is:
>
> |(DataSetA | DataSetB)|
> -
> |(id=1| id=1)
> |(id=6| id=6)
>
> What is the best way to assign a default value to the elements id=2/id=5
> from DataSet A. E.g. I need a result which looks similar to this:
>
> |(DataSetA | DataSetB)|
> -
> |(id=1| id=1)
> |(id=2| Default)
> |(id=5| Default)
> |(id=6| id=6)
>
> My idea would be to get the missing Elements from DataSetA by .filter
> with (DataSetA|DataSetB) and then do a .union after creating a tuple
> with a default value. But that sounds a bit over-complicated.
>
> Best regards,
> Sebastian


Re: Cyclic ConnectedStream

2017-01-31 Thread Gábor Gévay
I somehow still suspect that iterations might work for your use case. Note
that in the streaming API, iterations are currently nothing more than a
back-edge in the topology, i.e. a low-level tool to create a cyclic
topology, like as you say with your hypothetical setter syntax. (It's quite
different from the iterations of the batch API.)

The tricky part for your use-case is that you would want a ConnectedStream
as your iteration head, which should get the elements from the back-edge in
a separated way from the normal input. You could simulate this by using not
ConnectedStream.flatMap, but a just a simple Stream.flatMap whose input
element type is an Either type, whose two components would be the normal
input and the back-edge input. (And you add maps before the closeWith and
to your input1, which would appropriately wrap into the two alternatives of
the Either type.)
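
Very roughly, such a topology could be sketched like this (Input1, Feedback,
and Output are placeholders for your types, toFeedback() stands for whatever
you want to send back, and I'm leaving out the keyBy and the second operator
for brevity):

// wrap the normal input into the Left alternative of the Either type
DataStream<Either<Input1, Feedback>> wrappedInput = input1
    .map(new MapFunction<Input1, Either<Input1, Feedback>>() {
        @Override
        public Either<Input1, Feedback> map(Input1 value) {
            return Either.Left(value);
        }
    });

IterativeStream<Either<Input1, Feedback>> iteration = wrappedInput.iterate();

// this flatMap plays the role of flatMap1/flatMap2 of a CoFlatMapFunction
DataStream<Output> result = iteration
    .flatMap(new FlatMapFunction<Either<Input1, Feedback>, Output>() {
        @Override
        public void flatMap(Either<Input1, Feedback> value, Collector<Output> out) {
            if (value.isLeft()) {
                // handle a normal input element: value.left()
            } else {
                // handle a feedback element: value.right()
            }
        }
    });

// wrap the feedback into the Right alternative and close the back-edge
DataStream<Either<Input1, Feedback>> feedback = result
    .map(new MapFunction<Output, Either<Input1, Feedback>>() {
        @Override
        public Either<Input1, Feedback> map(Output value) {
            return Either.Right(toFeedback(value));
        }
    });

iteration.closeWith(feedback);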

Best,
Gábor



2017-01-29 15:39 GMT+01:00 Matt <dromitl...@gmail.com>:

> Check this image for clarification, this is what I'm trying to do:
> http://i.imgur.com/iZxPv04.png
>
> The rectangles are the two CoFlatMapFunction, sharing a state between
> process and update (map1 and map2). It's clear from the image that I need
> input1 and the green box to create the blue box, and input2 and the blue
> box to create the green one.
>
> ---
> *blue*  = *input1*.connect(*green*).keyBy(...).flatMap(...);
> *green* = *input2*.connect(*blue*).keyBy(...).flatMap(...);
> ---
>
> As you can see there's no cycle in the flow of data so I guess this
> topology is valid. The problem is not having a way to define such flow.
>
> For instance, with the appropriate setters we would be able to do this:
>
> ---
> *blue*  = *input1*.connect();
> *green* = *input2*.connect();
>
> *blue.*setConnection(*green*);
> *green*.setConnection(*blue*);
>
> *blue*.keyBy(...).flatMap(...);
> *green*.keyBy(...).flatMap(...);
> ---
>
> Any idea is welcome.
>
> Matt
>
> On Sat, Jan 28, 2017 at 5:31 PM, Matt <dromitl...@gmail.com> wrote:
>
>> I'm aware of IterativeStream but I don't think it's useful in this case.
>>
>> As shown in the example above, my use case is "cyclic" in that the same
>> object goes from *Input* to *predictionStream* (flatMap1), then to
>> *statsStream* (flatMap2, where it's updated with an object from *Input2*)
>> and finally to *predictionStream* (flatMap2).
>>
>> The same operator is never applied twice to the object, thus I would say
>> this dataflow is cyclic only in the dependencies of the stream
>> (predictionStream depends on statsStream, but it depends on
>> predictionStream in the first place).
>>
>> I hope it is clear now.
>>
>> Matt
>>
>> On Sat, Jan 28, 2017 at 3:17 PM, Gábor Gévay <gga...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Cyclic dataflows can be built using iterations:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/
>>> dev/datastream_api.html#iterations
>>>
>>> Best,
>>> Gábor
>>>
>>>
>>>
>>>
>>> 2017-01-28 18:39 GMT+01:00 Matt <dromitl...@gmail.com>:
>>> > I have a ConnectedStream (A) that depends on another ConnectedStream
>>> (B),
>>> > which depends on the first one (A).
>>> >
>>> > Simplified code:
>>> >
>>> > predictionStream = input
>>> >   .connect(statsStream)
>>> >   .keyBy(...)
>>> >   .flatMap(CoFlatMapFunction {
>>> >  flatMap1(obj, output) {
>>> >  p = prediction(obj)
>>> >  output.collect(p)
>>> >  }
>>> >  flatMap2(stat, output) {
>>> >  updateModel(stat)
>>> >  }
>>> >   })
>>> >
>>> > statsStream = input2
>>> >   .connect(predictionStream)
>>> >   .keyBy(...)
>>> >   .flatMap(CoFlatMapFunction {
>>> >  flatMap1(obj2, output) {
>>> > s = getStats(obj2, p)
>>> > output.collect(s)
>>> >  }
>>> >  flatMap2(prediction, output) {
>>> > p = prediction
>>> >  }
>>> >   })
>>> >
>>> > I'm guessing this should be possible to achieve, one way would be to
>>> add a
>>> > sink on statsStream to save the elements into Kafka and read from that
>>> topic
>>> > on predictionStream instead of initializing it with a reference of
>>> > statsStream. But I would rather avoid writing unnecessarily into kafka.
>>> >
>>> > Is there any other way to achieve this?
>>> >
>>> > Thanks,
>>> > Matt
>>>
>>
>>
>


Re: How to get top N elements in a DataSet?

2017-01-24 Thread Gábor Gévay
Hello,

Btw. there is a Jira about this:
https://issues.apache.org/jira/browse/FLINK-2549
Note that the discussion there suggests a more efficient approach,
which doesn't involve sorting the entire partitions.

And if I remember correctly, this question comes up from time to time
on the mailing list.

Best,
Gábor



2017-01-24 11:35 GMT+01:00 Fabian Hueske :
> Hi Ivan,
>
> I think you can use MapPartition for that.
> So basically:
>
> dataset // assuming some partitioning that can be reused to avoid a shuffle
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen())
>   .sortPartition(1, Order.DESCENDING).parallelism(1)
>   .mapPartition(new ReturnFirstTen())
>
> Best, Fabian
>
>
> 2017-01-24 10:10 GMT+01:00 Ivan Mushketyk :
>>
>> Hi,
>>
>> I have a dataset of tuples with two fields ids and ratings and I need to
>> find 10 elements with the highest rating in this dataset. I found a
>> solution, but I think it's suboptimal and I think there should be a better
>> way to do it.
>>
>> The best thing that I came up with is to partition dataset by rating, sort
>> locally and write the partitioned dataset to disk:
>>
>> dataset
>> .partitionCustom(new Partitioner() {
>>   @Override
>>   public int partition(Double key, int numPartitions) {
>> return key.intValue() % numPartitions;
>>   }
>> }, 1) . // partition by rating
>> .setParallelism(5)
>> .sortPartition(1, Order.DESCENDING) // locally sort by rating
>> .writeAsText("..."); // write the partitioned dataset to disk
>>
>> This will store tuples in sorted files with names 5, 4, 3, ... that
>> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read sorted
>> data from disk and and N elements with the highest rating.
>> Is there a way to do the same but without writing a partitioned dataset to
>> a disk?
>>
>> I tried to use "first(10)" but it seems to give top 10 items from a random
>> partition. Is there a way to get top N elements from every partition? Then I
>> could locally sort top values from every partition and find top 10 global
>> values.
>>
>> Best regards,
>> Ivan.
>>
>>
>


Re: Equivalent of Rx combineLatest() on a join?

2016-12-13 Thread Gábor Gévay
Dear Denis,

I think you can do it with a simple CoFlatMapFunction (without windows):
To use a CoFlatMapFunction, you need to first connect [1] your
streams, which results in a ConnectedStreams. Then you can call
flatMap on this, and give a CoFlatMapFunction to it (where two
different callbacks are executed when an element arrives on one of the
two streams). What you could do is to have two members in your
CoFlatMapFunction that store the latest values from the two streams,
and you update the appropriate one when an element arrives and also
emit a combined value from them.
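
A rough sketch (Type1, Type2, and Combined are placeholders for your element
types; note that the two members live per parallel instance of the operator):

DataStream<Combined> combined = stream1
    .connect(stream2)
    .flatMap(new CoFlatMapFunction<Type1, Type2, Combined>() {
        // the latest value seen on each of the two streams
        private Type1 latest1;
        private Type2 latest2;

        @Override
        public void flatMap1(Type1 value, Collector<Combined> out) {
            latest1 = value;
            if (latest2 != null) {
                out.collect(new Combined(latest1, latest2));
            }
        }

        @Override
        public void flatMap2(Type2 value, Collector<Combined> out) {
            latest2 = value;
            if (latest1 != null) {
                out.collect(new Combined(latest1, latest2));
            }
        }
    });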

Best,
Gábor

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#connect-org.apache.flink.streaming.api.datastream.DataStream-




2016-12-05 18:28 GMT+01:00  :
> Actually that doesn’t work as expected because emitted values are not
> purged. I’ll experiment with purging triggers and/or evictors, though I have
> the feeling that Flink was not designed for what we need to do here -- but
> I’ll keep on searching.
>
>
>
> In the meantime any advice is appreciated. If the goal is not clear I can
> provide more details.
>
>
>
> Thank you,
>
>
>
> Denis
>
>
>
> From: Dollfus, Denis (TR Technology & Ops)
> Sent: lundi 5 décembre 2016 16:31
> To: user@flink.apache.org
> Subject: RE: Equivalent of Rx combineLatest() on a join?
>
>
>
> Asking the response helped me to find the answer (yes, rubber duck
> debugging) as it seems that the code below does what I need:
>
>
>
> s3 = s1.join(s2)
>
> .where(new KeySelector1()).equalTo(new KeySelector2())
>
> .window(GlobalWindow.create())
>
> .trigger(CountTrigger.of(1))
>
> .apply(new JoinFunction);
>
>
>
> If that’s a common use case (in my view it is), a syntax shortcut could help
> developers, e.g. something like:
>
>
>
> s3 = s1.join(s2)
>
> .where(new KeySelector1()).equalTo(new KeySelector2())
>
> .combineLatest(new JoinFunction);
>
>
>
> Denis
>
>
>
>
>
> From: Dollfus, Denis (TR Technology & Ops)
> Sent: lundi 5 décembre 2016 12:27
> To: user@flink.apache.org
> Subject: Equivalent of Rx combineLatest() on a join?
>
>
>
> Hi all,
>
>
>
> [first email here, I’m new to Flink, Java and Scala, sorry if I missed
> something obvious]
>
>
>
> I'm exploring Flink in the context of streaming calculators. Basically, the
> data flow boils down to multiple data streams with variable update rates
> (ms, seconds, …, month) which are joined before being fed to calculators.
> The kind of operation I need is very similar to the Rx combineLatest
> operator, which results in a object being emitted whenever one of the
> streams is updated.
>
>
>
> As there is no such operator predefined, I think I have to use a
> GlobalWindow and provide a custom WindowAssigner. The end result would look
> like this (pseudo java 8 code, I hope it's understandable):
>
>
>
> DataStream s1 = env.addSource(..);
>
> DataStream s2 = env.addSource(..);
>
>
>
> S3 = s1.join(s2)
>
> .where(s1 -> id)
>
> .equalTo(s2 -> id)
>
> .window(new MyCustomCombineLatestAssigner())
>
> .apply( … return new object combining data from s1 and from
> s2);
>
>
>
> Is the approach correct, or is there a simpler way to achieve the same join
> + apply mechanism ?
>
>
>
> Thank you,
>
>
>
> Denis
>
>
>
>
>
>
>


Re: spark vs flink batch performance

2016-11-18 Thread Gábor Gévay
> "For csv reading, i deliberately did not use csv reader since i want to run
> same code across spark and flink."
>
> If your objective deviates from writing and running the fastest Spark and
> fastest Flink programs, then your comparison is worthless.

Well, I don't really agree with this. I would say that it can actually
be a valid objective to compare different systems in a way that we
don't tune the code very much to the individual systems. This is
because I guess it also happens sometimes in real (non-benchmark)
jobs that we don't want to spend too much time on tuning.

However, in this case I also think that using the built-in CSV reader
method would not count as "too much tuning to a specific system".
So I would do this comparison using the built-in CSV reader in
both systems.

Best,
Gábor



2016-11-18 15:30 GMT+01:00 Greg Hogan <c...@greghogan.com>:
> "For csv reading, i deliberately did not use csv reader since i want to run
> same code across spark and flink."
>
> If your objective deviates from writing and running the fastest Spark and
> fastest Flink programs, then your comparison is worthless.
>
>
>
> On Fri, Nov 18, 2016 at 5:37 AM, CPC <acha...@gmail.com> wrote:
>>
>> Hi Gabor,
>>
>> Thank you for your kind response. I forget to mention that i have actually
>> three workers. This is why i set default paralelism to 6.
>>
>> For csv reading, i deliberately did not use csv reader since i want to run
>> same code across spark and flink. Collect is returning 40k records which is
>> not so big.
>>
>> I will try same test with spark 1.5 and 1.6 as well to understand whether
>> spark 2.x series has some performance improvements because in those kind of
>> tests, spark and flink was either on par or flink 10-15% faster than spark
>> in the past. Aside from that are any configuration parameters you may
>> propose to fine tune flink?
>>
>> Best,
>> Anıl
>>
>> On Nov 18, 2016 12:25, "Gábor Gévay" <gga...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> Your program looks mostly fine, but there are a few minor things that
>>> might help a bit:
>>>
>>> Parallelism: In your attached flink-conf.yaml, you have 2 task slots
>>> per task manager, and if you have 1 task manager, then your total
>>> number of task slots is also 2. However, your default parallelism is
>>> 6. In Flink, the recommended default parallelism is exactly the total
>>> number of task slots [1]. (This is in contrast to Spark, where the
>>> recommended setting is 2-3 per CPU core [2].)
>>>
>>> CSV reading: If your input is a CSV file, then you should use
>>> readCsvFile (instead of readTextFile and then parsing it manually).
>>>
>>> Collect call: How large is the DataSet that you are using collect on?
>>> If it is large, then we might try to figure out a way to get the top
>>> 10 elements without first collecting the DataSet.
>>>
>>> Best,
>>> Gábor
>>>
>>> [1]
>>> https://flink.apache.org/faq.html#what-is-the-parallelism-how-do-i-set-it
>>> [2] https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism
>>>
>>>
>>>
>>>
>>>
>>> 2016-11-16 22:38 GMT+01:00 CPC <acha...@gmail.com>:
>>> > Hi all,
>>> >
>>> > I am trying to compare spark and flink batch performance. In my test i
>>> > am
>>> > using ratings.csv in
>>> > http://files.grouplens.org/datasets/movielens/ml-latest.zip dataset. I
>>> > also
>>> > concatenated ratings.csv 16 times to increase dataset size(total of
>>> > 390465536 records almost 10gb).I am reading from google storage with
>>> > gcs-connector and  file schema is : userId,movieId,rating,timestamp.
>>> > Basically i am calculating average rating per movie
>>> >
>>> > Code for flink(i tested CombineHint.HASH and CombineHint.SORT)
>>> >>
>>> >> case class Rating(userID: String, movieID: String, rating: Double,
>>> >> date:
>>> >> Timestamp)
>>> >
>>> >
>>> >>
>>> >> def parseRating(line: String): Rating = {
>>> >>   val arr = line.split(",")
>>> >>   Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp((arr(3).toLong
>>> >> *
>>> >> 1000)))
>>> >> }
>>> >
>>> >
>>> >>
>>> >> val ratings: DataS

Re: spark vs flink batch performance

2016-11-18 Thread Gábor Gévay
Hello,

Your program looks mostly fine, but there are a few minor things that
might help a bit:

Parallelism: In your attached flink-conf.yaml, you have 2 task slots
per task manager, and if you have 1 task manager, then your total
number of task slots is also 2. However, your default parallelism is
6. In Flink, the recommended default parallelism is exactly the total
number of task slots [1]. (This is in contrast to Spark, where the
recommended setting is 2-3 per CPU core [2].)

CSV reading: If your input is a CSV file, then you should use
readCsvFile (instead of readTextFile and then parsing it manually).
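
For example, a sketch in the Java API, based on the path and fields from
your program (the Scala API has an equivalent readCsvFile):

DataSet<Tuple4<String, String, Double, Long>> ratings = env
    .readCsvFile("gs://cpcflink/wikistream/ratingsheadless16x.csv")
    .types(String.class, String.class, Double.class, Long.class);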

Collect call: How large is the DataSet that you are using collect on?
If it is large, then we might try to figure out a way to get the top
10 elements without first collecting the DataSet.

Best,
Gábor

[1] https://flink.apache.org/faq.html#what-is-the-parallelism-how-do-i-set-it
[2] https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism





2016-11-16 22:38 GMT+01:00 CPC :
> Hi all,
>
> I am trying to compare spark and flink batch performance. In my test i am
> using ratings.csv in
> http://files.grouplens.org/datasets/movielens/ml-latest.zip dataset. I also
> concatenated ratings.csv 16 times to increase dataset size(total of
> 390465536 records almost 10gb).I am reading from google storage with
> gcs-connector and  file schema is : userId,movieId,rating,timestamp.
> Basically i am calculating average rating per movie
>
> Code for flink(i tested CombineHint.HASH and CombineHint.SORT)
>>
>> case class Rating(userID: String, movieID: String, rating: Double, date:
>> Timestamp)
>
>
>>
>> def parseRating(line: String): Rating = {
>>   val arr = line.split(",")
>>   Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp((arr(3).toLong *
>> 1000)))
>> }
>
>
>>
>> val ratings: DataSet[Rating] =
>> env.readTextFile("gs://cpcflink/wikistream/ratingsheadless16x.csv").map(a =>
>> parseRating(a))
>> ratings
>>   .map(i => (i.movieID, 1, i.rating))
>>   .groupBy(0).reduce((l, r) => (l._1, l._2 + r._2, l._3 + r._3),
>> CombineHint.HASH)
>>   .map(i => (i._1, i._3 /
>> i._2)).collect().sortBy(_._1).sortBy(_._2)(Ordering.Double.reverse).take(10)
>
>
> with CombineHint.HASH 3m49s and with CombineHint.SORT 5m9s
>
> Code for Spark(i tested reduceByKey and reduceByKeyLocaly)
>>
>> case class Rating(userID: String, movieID: String, rating: Double, date:
>> Timestamp)
>> def parseRating(line: String): Rating = {
>>   val arr = line.split(",")
>>   Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp((arr(3).toLong *
>> 1000)))
>> }
>> val conf = new SparkConf().setAppName("Simple Application")
>> val sc = new SparkContext(conf)
>> val keyed: RDD[(String, (Int, Double))] =
>> sc.textFile("gs://cpcflink/wikistream/ratingsheadless16x.csv").map(parseRating).map(r
>> => (r.movieID, (1, r.rating)))
>> keyed.reduceByKey((l, r) => (l._1 + r._1, l._2 + r._2)).mapValues(i =>
>> i._2 /
>> i._1).collect.sortBy(_._1).sortBy(a=>a._2)(Ordering.Double.reverse).take(10).foreach(println)
>
>
> with reduceByKeyLocaly 2.9 minute(almost 2m54s) and reduceByKey 3.1
> minute(almost 3m6s)
>
> Machine config on google cloud:
> taskmanager/sparkmaster: n1-standard-1 (1 vCPU, 3.75 GB memory)
> jobmanager/sparkworkers: n1-standard-2 (2 vCPUs, 7.5 GB memory)
> java version:jdk jdk-8u102
> flink:1.1.3
> spark:2.0.2
>
> I also attached flink-conf.yaml. Although it is not such a big difference
> there is a 40% performance difference between spark and flink. Is there
> something i am doing wrong? If there is not how can i fine tune flink or is
> it normal spark has better performance with batch data?
>
> Thank you in advance...


Re: Retrieving values from a dataset of datasets

2016-11-16 Thread Gábor Gévay
The short answer is that it doesn't work because DataSet is not serializable.

I think the main underlying problem is that Flink needs to see all
DataSet operations before launching the job. However, if you have a
DataSet<DataSet<...>>, then operations on the inner DataSets will end up
being specified inside the UDFs of operations on the outer DataSet.
This is a problem, because Flink cannot see inside the UDFs before the
job starts, since they get executed only after the job starts
executing.

There are some workarounds though:

1. If you know that your inner DataSets would be small, then you can
instead replace them with some regular Java/Scala collection class,
like an Array or List.

2. You can often flatten your data, that is, somehow represent your
nested collection with a flat collection. Exactly how to do this
depends on your use case. For example, suppose that originally we
wanted to represent the lengths of the shortest paths between all
pairs of vertices in a graph by a DataSet that for every vertex
contains a DataSet that tells us the distances to all the other
vertices, i.e., a DataSet of DataSets. Instead, we can flatten this into
DataSet<Tuple3<VertexID, VertexID, Distance>>
which is a DataSet that contains pairs of vertices and their distances.

Btw. [1] is a paper where some graph data structures having complex
nesting are represented in Flink.

Best,
Gábor

[1] http://dbs.uni-leipzig.de/file/EPGM.pdf





2016-11-15 17:37 GMT+01:00 otherwise777 :
> It seems what i tried did indeed not work.
> Can you explain me why that doesn't work though?
>
>
>


Re: Retrieving values from a dataset of datasets

2016-11-15 Thread Gábor Gévay
Hello,

How exactly do you represent the DataSet of DataSets? I'm asking
because if you have something like a
DataSet<DataSet<...>>,
that unfortunately doesn't work in Flink.

Best,
Gábor






2016-11-14 20:44 GMT+01:00 otherwise777 :
> Hey There,
>
> I'm trying to calculate the betweenness in a graph with Flink and Gelly, the
> way I tried this was by calculating the shortest path from every node to the
> rest of the nodes. This results in a Dataset of vertices which all have
> Datasets of their own with all the other vertices and their paths.
>
> Next i used the Reduce function on the inner DataSets so every inner DataSet
> has 1 value.
>
> Now I have a DataSet of DataSets with 1 value each, but how do i efficiently
> transform this into a a single DataSet with values? I can do a mapping on
> the DataSet and use collect(), but i think that would be very costly
>
>
>
>
>


Re: Looping over a DataSet and accesing another DataSet

2016-10-30 Thread Gábor Gévay
Hello,

In Flink, a commonly used way to access data from multiple DataSets at
the same time is to perform a join (Flink actually calls equi-joins
[1] just "join"), just as in the database world.

For example, in the algorithm that you linked, you access A[u] for
every edge (u,v). I assume that you have stored A in a DataSet of
(index, value) pairs. You can achieve this access pattern by
performing a join, and in the join condition you specify that the
first endpoint of the edge should be equal to the index of A. This
way, you get a DataSet where every record contains an edge (u,v) and
also A[u], so you can do a map on this where the UDF of your map will
get (u,v) and A[u].

Your algorithm also accesses A[v], which can be achieved by performing
a second join that is similar to the first (using the result of the
first).
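
For example, a sketch of the first join, assuming the edges are a
DataSet<Tuple2<Long, Long>> of (u, v) pairs and A is a
DataSet<Tuple2<Long, Double>> of (index, value) pairs:

// attach A[u] to every edge (u, v)
DataSet<Tuple3<Long, Long, Double>> edgesWithAu = edges
    .join(a)
    .where(0)    // u of the edge
    .equalTo(0)  // index of A
    .with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Double>, Tuple3<Long, Long, Double>>() {
        @Override
        public Tuple3<Long, Long, Double> join(Tuple2<Long, Long> edge, Tuple2<Long, Double> aEntry) {
            return new Tuple3<>(edge.f0, edge.f1, aEntry.f1); // (u, v, A[u])
        }
    });

// a second, similar join on field 1 (the v endpoint) would also attach A[v]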

However, the updating of P will be trickier to translate to Flink.
I'm not sure I understand the linked algorithm correctly: does every
element of P contain a list, and the + means appending an element to a
list? (in the line P[v] = P[u] + v)

Best,
Gábor

[1] https://en.wikipedia.org/wiki/Join_(SQL)#Equi-join



2016-10-30 8:25 GMT+01:00 otherwise777 :
> Currently i'm trying to implement this algorithm [1] which requires me to
> loop over one DataSet (the edges) and access another DataSet (the vertices),
> for this loop i use a Mapping (i'm not sure if this is the correct way of
> looping over a DataSet) but i don't know how to access the elements of
> another DataSet while i'm looping over one.
>
> I know Gelly also has iterative support for these kind of things, but they
> loop over the Vertices and not the Edges
>
> [1] http://prntscr.com/d0qeyd
>
>
>


Re: Flink strange stream join behavior

2016-10-16 Thread Gábor Gévay
Hello,

For your first question:

> the number of tuples are same in both cases

I guess you mean the total number of tuples here, right? So this means
that you have fewer, but larger windows. Suppose that you have W
windows, each with S tuples. Then your total input has W * S tuples,
and your total output has W * S * S tuples (assuming that all pairs of
tuples from the two matching windows are matched in the join, since
then the join of the matching windows squares the number of tuples in
the window).

Now if you multiply the window size by r, but also divide the number
of windows by r (so that the total number of input tuples stays the
same), then you have W/r * S*r * S*r tuples, which simplifies to
W*S*S*r, so the total number of output tuples gets multiplied by r,
and this is indeed close to the numbers that you reported: In your
second case, you have about 40 times as much data in a window (I
rounded your 37), and 40 times the output. (r = 40)

Another way to imagine the situation is that when you chunk your data
to larger windows, then more tuples will "meet" in the joins, since
only those tuples are matched in the join that are in the same
windows.

I hope this helps, and I didn't misunderstand your situation.

Best,
Gábor



2016-10-15 23:28 GMT+02:00 Davood Rafiei :
> Hi,
>
> I am experiencing strange flink stream windowed join behavior.
>
> I want to do windowed (processing time) join between two partitioned
> streams. I read data from socket.
>  I have two cases: 1. data speed in socket is relatively slow (say 1K ps) 2.
> data speed in socket is high (say 37K).
> The number of tuples read from socket is same in both cases to both cases.
>
> Firstly, the size of output (of join operation) is much higher in case 2
> although the number of tuples are same in both cases. For example, in
> case-1, the overall output size is 500M and in case 2 it is 20G. I couldn't
> get the logic behind this.
>
> Secondly, in both cases, flink ingests all data from socket (more or less)
> as soon as it is available. So, it has high throughput. However, especially
> in case 2, I have to wait long time after data is ingested from source
> operator. So, the data from socket is acquired and socket gets idle, and
> then I have to wait long time to get actual output to sink. My question is
> that, if this behavior is normal, and all the data acquired stays somewhere
> inside flink, why backpressure is not applied to source operators? I mean if
> the system cannot compute all inputs with high speed, then it should lower
> the reading speed from socket.
>
> Thanks
> Davood


Re: Nested iterations

2016-09-01 Thread Gábor Gévay
I don't think that there are plans for enabling the nesting of the
native iteration constructs, but we should wait for one of the
committers to confirm this.

However, the matter of caching intermediate results has come up on
numerous occasions before [1,2,3,4,5], and it would be useful in lots
of other situations as well, so there is hope that it will be
implemented some day, which would make workaround 1 from above
more feasible.

Best,
Gábor

[1] https://issues.apache.org/jira/browse/FLINK-1730
[2] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Iteration-Intermediate-Output-td11850.html
[3] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Questions-re-ExecutionGraph-amp-ResultPartitions-for-interactive-use-a-la-Spark-td4154.html
[4] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-programm-with-for-loop-yields-wrong-results-when-run-in-parallel-td7783.html
[5] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Iterative-queries-on-Flink-td3786.html




2016-09-01 18:31 GMT+02:00 Supun Kamburugamuve <supu...@gmail.com>:
> Thanks Gabor. I was thinking about starting separate jobs.
>
> Is there any plans to support nested loops in the future?
>
> Thanks,
> Supun..
>
> On Thu, Sep 1, 2016 at 12:28 PM, Gábor Gévay <gga...@gmail.com> wrote:
>>
>> Hello Supun,
>>
>> Unfortunately, nesting of Flink's iteration constructs is not
>> supported at the moment.
>>
>> There are some workarounds though:
>>
>> 1. You can start a Flink job for each step of the iteration. Starting
>> a Flink job has some overhead, so this only works if there is a
>> sufficient amount of work in each iteration step. Moreover, this has
>> the disadvantage that the intermediate results are always need to be
>> written out and then read back between steps, which might have a
>> considerable performance impact.
>>
>> 2. If you have just a small fixed number of steps, then you can have a
>> for loop that "unrolls" all the iteration steps, and creates one large
>> Flink job. The code will be somewhat similar to the first approach,
>> but you don't call execute between the steps, and you don't write
>> intermediate results to a sink, but just use the DataSet from the
>> previous step. The disadvantage of this is that you might end up with
>> a too large Flink job, which might also hurt performance.
>>
>> Best,
>> Gábor
>>
>>
>>
>>
>>
>>
>> 2016-09-01 18:09 GMT+02:00 Supun Kamburugamuve <supu...@gmail.com>:
>> > Hi,
>> >
>> > Does Flink support nested iterations? We are trying to develop a complex
>> > machine learning algorithm which has 3 iterations nested.
>> >
>> > Best,
>> > Supun..
>> >
>> >
>
>
>
>
> --
> Supun Kamburugamuve
> Member, Apache Software Foundation; http://www.apache.org
> E-mail: su...@apache.org;  Mobile: +1 812 219 2563
>
>


Re: Nested iterations

2016-09-01 Thread Gábor Gévay
Hello Supun,

Unfortunately, nesting of Flink's iteration constructs is not
supported at the moment.

There are some workarounds though:

1. You can start a Flink job for each step of the iteration. Starting
a Flink job has some overhead, so this only works if there is a
sufficient amount of work in each iteration step. Moreover, this has
the disadvantage that the intermediate results always need to be
written out and then read back between steps, which might have a
considerable performance impact.

2. If you have just a small fixed number of steps, then you can have a
for loop that "unrolls" all the iteration steps, and creates one large
Flink job. The code will be somewhat similar to the first approach,
but you don't call execute between the steps, and you don't write
intermediate results to a sink, but just use the DataSet from the
previous step. The disadvantage of this is that you might end up with
a too large Flink job, which might also hurt performance.
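
A sketch of the second approach (MyType, StepFunction, and the number of
steps are placeholders):

DataSet<MyType> current = initial;
for (int i = 0; i < numberOfSteps; i++) {
    // each step just builds on the DataSet of the previous step;
    // nothing is executed until a sink is added and env.execute() is called
    current = current.map(new StepFunction());
}
current.writeAsText(outputPath);
env.execute();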

Best,
Gábor






2016-09-01 18:09 GMT+02:00 Supun Kamburugamuve :
> Hi,
>
> Does Flink support nested iterations? We are trying to develop a complex
> machine learning algorithm which has 3 iterations nested.
>
> Best,
> Supun..
>
>


Re: Performance issues with GroupBy?

2016-07-26 Thread Gábor Gévay
Hello Robert,

> Is there something I might could do to optimize the grouping?

You can try to make your `RichGroupReduceFunction` implement the
`GroupCombineFunction` interface, so that Flink can do combining
before the shuffle, which might significantly reduce the network load.
(How much the combiner helps the performance can greatly depend on how
large your groups are on average.)
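
For example, the combinable version looks roughly like this (shown here for
a simple Tuple2<String, Long> sum, not your actual types):

public static class SumReducer
        extends RichGroupReduceFunction<Tuple2<String, Long>, Tuple2<String, Long>>
        implements GroupCombineFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    @Override
    public void combine(Iterable<Tuple2<String, Long>> values, Collector<Tuple2<String, Long>> out) {
        // local pre-aggregation before the shuffle; must emit the input type
        reduce(values, out);
    }

    @Override
    public void reduce(Iterable<Tuple2<String, Long>> values, Collector<Tuple2<String, Long>> out) {
        String key = null;
        long sum = 0;
        for (Tuple2<String, Long> value : values) {
            key = value.f0;
            sum += value.f1;
        }
        out.collect(new Tuple2<>(key, sum));
    }
}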

Alternatively, if you can reformulate your algorithm to use a `reduce`
instead of a `reduceGroup` that might also improve the performance.
Also, if you are using a `reduce`, then you can try calling
`.setCombineHint(CombineHint.HASH)` after the reduce. (The combine
hint is a relatively new feature, so you need the current master for
this.)

Best,
Gábor



2016-07-25 14:06 GMT+02:00 Paschek, Robert :
> Hi Mailing List,
>
>
>
> i actually do some benchmarks with different algorithms. The System has 8
> nodes and a configured parallelism of 48 - The IBM-Power-1 cluster, if
> somebody from the TU Berlin read this : - ) – and to “simulate” Hadoop
> MapReduce, the execution mode is set to “BATCH_FORCED”
>
>
>
> It is suspicious, that three of the six algorithms had a big gap in runtime
> (5000ms vs. 2ms) for easy (low dim) tuple. Additionally the algorithms
> in the “upper” group using a groupBy transformation and the algorithms in
> the “lower” group don’t use groupBy.
>
> I attached the plot for better visualization.
>
>
>
> I also checked the logs, especially the time, when the mappers finishing and
> the reducers start _iterating_ - they hardened my speculation.
>
>
>
> So my question is, if it is “normal”, that grouping is so cost-intensive
> that – in my case – the runtime increases by 4 times?
>
> I have data from the same experiments running on a 13 nodes cluster with 26
> cores with Apache Hadoop MapReduce, where the gap is still present, but
> smaller (50s vs 57s or 55s vs 65s).
>
>
>
> Is there something I might could do to optimize the grouping? Some
> codesnipplets:
>
>
>
> The Job:
> DataSet output = input
>
> .mapPartition(new
> MR_GPMRS_Mapper()).withBroadcastSet(metaData,
> "MetaData").withParameters(parameters).name(MR_GPMRS_OPTIMIZED.class.getSimpleName()+"_MAPPER")
>
> .groupBy(0)
>
> .reduceGroup(new
> MR_GPMRS_Reducer()).returns(input.getType()).withBroadcastSet(metaData,
> "MetaData").withParameters(parameters).name(MR_GPMRS_OPTIMIZED.class.getSimpleName()+"_REDUCER");
>
>
>
> MR_GPMRS_Mapper():
>
> public class MR_GPMRS_Mapper  extends
> RichMapPartitionFunction,
> BitSet, BitSet>>>
>
>
>
> MR_GPMRS_Reducer():
>
> public class MR_GPMRS_Reducer  extends
> RichGroupReduceFunction,
> BitSet, BitSet>>, T>
>
>
>
> The Tuple2 has as Payload on position f1 the Tuple3 and on position f0 the
> Integer Key for grouping.
>
>
>
> Any suggestions (or comments, that it is a “normal” behaviour) are welcome :
> - )
>
>
>
> Thank you in advance!
>
> Robert


Re: Multi-field "sum" function just like "keyBy"

2016-06-07 Thread Gábor Gévay
Ah, sorry, you are right. You could also call keyBy again before the
second sum, but maybe someone else has a better idea.

Best,
Gábor



2016-06-07 16:18 GMT+02:00 Al-Isawi Rami <rami.al-is...@comptel.com>:
> Thanks Gábor, but the first sum call will return
>
> SingleOutputStreamOperator
>
> I could not do another sum call on that. Would you tell me how you managed to
> do
>
> stream.sum().sum()
>
> Regards,
> -Rami
>
> On 7 Jun 2016, at 16:13, Gábor Gévay <gga...@gmail.com> wrote:
>
> Hello,
>
> In the case of "sum", you can just specify them one after the other, like:
>
> stream.sum(1).sum(2)
>
> This works, because the sums over the two fields are independent. However,
> in the case of "keyBy", the information is needed from both fields at
> the same time to produce the key.
>
> Best,
> Gábor
>
>
>
> 2016-06-07 14:41 GMT+02:00 Al-Isawi Rami <rami.al-is...@comptel.com>:
>
> Hi,
>
> Is there any reason why “keyBy" accepts multi-field, while for example “sum”
> does not.
>
> -Rami


Re: Multi-field "sum" function just like "keyBy"

2016-06-07 Thread Gábor Gévay
Hello,

In the case of "sum", you can just specify them one after the other, like:

stream.sum(1).sum(2)

This works, because the sums over the two fields are independent. However,
in the case of "keyBy", the information is needed from both fields at
the same time to produce the key.

Best,
Gábor



2016-06-07 14:41 GMT+02:00 Al-Isawi Rami :
> Hi,
>
> Is there any reason why “keyBy" accepts multi-field, while for example “sum” 
> does not.
>
> -Rami


Re: time spent for iteration

2016-03-09 Thread Gábor Gévay
Yes, I also think that it would be a nice feature. It would make the
advantage of delta iterations (that later iterations take less time)
more visible to the users.

Best,
Gábor


2016-03-09 15:25 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:
> I think it would be useful to allow for easier retrieval of this
> information.
> Wouldn't it make sense to expose this to the web UI for example?
> We actually had a discussion about this some time ago [1].
>
> -Vasia.
>
> [1]: https://issues.apache.org/jira/browse/FLINK-1759
>
> On 9 March 2016 at 14:37, Gábor Gévay <gga...@gmail.com> wrote:
>>
>> Hello,
>>
>> If you increase the log level, you can see each step of the iteration
>> separately in the log, with timestamps.
>>
>> Best,
>> Gábor
>>
>>
>>
>>
>>
>> 2016-03-09 14:04 GMT+01:00 Riccardo Diomedi
>> <riccardo.diomed...@gmail.com>:
>> > Is it possible to add timer for the time spent for iteration when
>> > iterate operator or the delta iterate operator is performed?
>> >
>> > thanks
>> >
>> > Riccardo
>
>


Re: time spent for iteration

2016-03-09 Thread Gábor Gévay
Hello,

If you increase the log level, you can see each step of the iteration
separately in the log, with timestamps.

Best,
Gábor





2016-03-09 14:04 GMT+01:00 Riccardo Diomedi :
> Is it possible to add timer for the time spent for iteration when iterate 
> operator or the delta iterate operator is performed?
>
> thanks
>
> Riccardo


Re: Best way to process data in many files? (FLINK-BATCH)

2016-02-24 Thread Gábor Gévay
Hello,

> // For each "filename" in list do...
> DataSet featureList = fileList
> .flatMap(new ReadDataSetFromFile()) // flatMap because there
> might be multiple DataSets in a file

What happens if you just insert .rebalance() before the flatMap?
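
I.e., something like this, building on your snippet:

featureList = fileList
    .rebalance() // spread the file names round-robin over all parallel instances
    .flatMap(new ReadDataSetFromFile());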

> This kind of DataSource will only be executed
> with a degree of parallelism of 1. The source will send its collection
> elements in a round robin fashion to the downstream operators which are
> executed with a higher parallelism. So when Flink schedules the downstream
> operators, it will try to place them close to their inputs. Since all flat
> map operators have the single data source task as an input, they will be
> deployed on the same machine if possible.

Sorry, I'm a little confused here. Do you mean that the flatMap will
have a high parallelism, but all instances on a single machine?
Because I tried to reproduce the situation where I have a non-parallel
data source and then a flatMap, and the plan shows that the flatMap
actually has parallelism 1, which would be an alternative explanation
to the original problem that it gets executed on a single machine.
Then, if I insert .rebalance() after the source, then a "Partition"
operation appears between the source and the flatMap, and the flatMap
has a high parallelism. I think this should also solve the problem,
without having to write a parallel data source.

Best,
Gábor
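
A minimal, self-contained sketch of the rebalance() suggestion above; the file
paths and the line-reading flatMap are illustrative stand-ins for the original
ReadDataSetFromFile logic:

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class RebalanceFileList {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // A collection source always runs with parallelism 1.
        DataSet<String> fileNames =
            env.fromCollection(Arrays.asList("/data/part-1.txt", "/data/part-2.txt"));

        DataSet<String> records = fileNames
            .rebalance()  // round-robin the file names to all downstream subtasks
            .flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String path, Collector<String> out) throws Exception {
                    for (String line : Files.readAllLines(Paths.get(path))) {
                        out.collect(line);
                    }
                }
            });

        records.print();
    }
}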


Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Gábor Gévay
Hello,

> I think that there is actually a fundamental latency issue with
> "exactly once sinks", no matter how you implement them in any systems:
> You can only commit once you are sure that everything went well,
> to a specific point where you are sure no replay will ever be needed.

What if the persistent buffer in the sink would be used to determine
which data elements should be emitted in case of a replay? I mean, the
sink pushes everything as soon as it arrives, and also writes
everything to the persistent buffer, and then in case of a replay it
looks into the buffer before pushing every element, and only does the
push if the buffer says that the element was not pushed before.

Best,
Gábor
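
A language-level sketch of the replay-deduplication idea above; this is not a
real Flink sink, and the in-memory set merely stands in for the persistent
buffer. It assumes each record carries an ID that is unique and deterministic
across replays:

import java.util.HashSet;
import java.util.Set;

public class DedupingEmitter<T> {

    // Stand-in for the persistent buffer; a real sink would keep this in
    // fault-tolerant state so it survives failures.
    private final Set<Long> emitted = new HashSet<>();

    /** Push the record downstream unless it was already pushed before a replay. */
    public void emit(long recordId, T record) {
        if (emitted.add(recordId)) {  // add() returns false if the ID was seen before
            pushToKafka(record);
        }
    }

    private void pushToKafka(T record) {
        // producer.send(...) would go here
    }
}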


2016-02-05 11:57 GMT+01:00 Stephan Ewen :
> Hi Niels!
>
> In general, exactly once output requires transactional cooperation from the
> target system. Kafka has that on the roadmap, we should be able to integrate
> that once it is out.
> That means output is "committed" upon completed checkpoints, which
> guarantees nothing is written multiple times.
>
> Chesnay is working on an interesting prototype as a generic solution (also
> for Kafka, while they don't have that feature):
> It buffers the data in the sink persistently (using the fault tolerance
> state backends) and pushes the results out on notification of a completed
> checkpoint.
> That gives you exactly once semantics, but involves an extra materialization
> of the data.
>
>
> I think that there is actually a fundamental latency issue with "exactly
> once sinks", no matter how you implement them in any systems:
> You can only commit once you are sure that everything went well, to a
> specific point where you are sure no replay will ever be needed.
>
> So the latency in Flink for an exactly-once output would be at least the
> checkpoint interval.
>
> I'm eager to hear your thoughts on this.
>
> Greetings,
> Stephan
>
>
> On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes  wrote:
>>
>> Hi,
>>
>> It is my understanding that the exactly-once semantics regarding the input
>> from Kafka is based on the checkpointing in the source component retaining
>> the offset where it was at the checkpoint moment.
>>
>> My question is how does that work for a sink? How can I make sure that (in
>> light of failures) each message that is read from Kafka (my input) is
>> written to Kafka (my output) exactly once?
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>
>


Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Gábor Gévay
The way I imagine this is that the sink would have its "own
checkpoints" separately from the rest of the system, and with much
smaller interval, and writes to Kafka (with "transactional
cooperation", as Stephan mentioned) during making these checkpoints.
And then when a replay happens from a global system checkpoint, it can
look at its own checkpoints to decide for each tuple whether to send
it or not.

@Stephan:

> That assumes deterministic streams and to some extent deterministic tuple 
> order.
> That may be given sometimes, but it is a very strong assumption in many cases.

Ah yes, you are right. But doing everything based on event time points
in this direction of deterministic streams, right?


Re: Left join with unbalanced dataset

2016-02-02 Thread Gábor Gévay
Hello Arnaud,

> Flink does not start the reduce operation until all lines have
> been created (memory bottleneck is during the collection
> of all lines); but theoretically it is possible.

The problem that `S.groupBy(...).reduce(...)` needs to fully
materialize S comes from the fact that the implementation of reduce is
currently sort based. But this PR will partially solve this:
https://github.com/apache/flink/pull/1517
It implements a hash-based combiner, which will not materialize the
input, but instead needs memory proportional to only the number of
different keys occurring. You might want to try rebasing to this PR,
to see whether it improves your situation.

(I also plan to extend this implementation to the actual reduce after
the combine, but I'm not sure when will I get around to that.)

Best,
Gábor
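
For context, the pattern in question; the setCombineHint(CombineHint.HASH) call
is how this work surfaced in later Flink releases (1.1+), so treat that method
as an assumption if you are on an older version:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

// "input" is assumed to be a DataSet<Tuple2<String, Long>>.
DataSet<Tuple2<String, Long>> sums = input
    .groupBy(0)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
            return new Tuple2<>(a.f0, a.f1 + b.f1);
        }
    })
    .setCombineHint(CombineHint.HASH);  // hash-based combine: memory ~ number of distinct keys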



2016-02-02 16:56 GMT+01:00 LINZ, Arnaud :
> Thanks,
>
> Giving the batch an outrageous amount of memory with a 0.5 heap ratio leads 
> to the success of the batch.
>
> I've figured out which dataset is consuming the most memory, I have a big 
> join that demultiplies the size of the input set before a group reduce.
> I am willing to optimize my code by reducing the join output size upon 
> junction.
>
> The outline of the treatment is :
> DataSet A = (K1, K2, V1) where (K1,K2) is the key. A is huge.
> DataSet B = (K1, V2)  where there are multiple values V2 for the same K1 (say 
> 5)
>
> I do something like : A.join(B).on(K1).groupBy(K1,K2).reduce()
> As B contains 5 lines for one key of A, A.join(B) is 5 times the size of A.
>
> Flink does not start the reduce operation until all lines have been created 
> (memory bottleneck is during the collection of all lines); but theoretically 
> it is possible.
> I see no "join group" operator that could do something like 
> "A.groupBy(K1,K2).join(B).on(K1).reduce()"
>
> Is there a way to do this ?
>
> The other way I see is to load B in memory for all nodes and use a hash map 
> upon reduction to get all A.join(B) lines. B is not that small, but I think 
> it will still save RAM.
>
> Best regards,
> Arnaud
>
> -Message d'origine-
> De : Ufuk Celebi [mailto:u...@apache.org]
> Envoyé : mardi 2 février 2016 15:27
> À : user@flink.apache.org
> Objet : Re: Left join with unbalanced dataset
>
>
>> On 02 Feb 2016, at 15:15, LINZ, Arnaud  wrote:
>>
>> Hi,
>>
>> Running again with more RAM made the treatement go further, but Yarn still 
>> killed one container for memory consumption. I will experiment various 
>> memory parameters.
>
> OK, the killing of the container probably triggered the 
> RemoteTransportException.
>
> Can you tell me how many containers you are using, how much phyiscal memory 
> the machines have and how much the containers get?
>
> You can monitor memory usage by setting
>
> taskmanager.debug.memory.startLogThread: true
>
> in the config. This will periodically log the memory consumption to the task 
> manager logs. Can you try this and check the logs for the memory consumption?
>
> You can also have a look at it in the web frontend under the Task Manager tab.
>
> – Ufuk
>
>
> 
>
> The integrity of this message cannot be guaranteed on the Internet. The 
> company that sent this message cannot therefore be held liable for its 
> content nor attachments. Any unauthorized use or dissemination is prohibited. 
> If you are not the intended recipient of this message, then please delete it 
> and notify the sender.
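
A sketch of the "load B in memory for all nodes" alternative discussed above:
group A by (K1, K2) and resolve the join inside the group reduce against a
broadcast copy of B. The types, the value combination, and the name
"smallSideB" are illustrative:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// A: (K1, K2, V1), B: (K1, V2); B is small enough to broadcast.
public class BroadcastJoinReduce
        extends RichGroupReduceFunction<Tuple3<Long, Long, Double>, Tuple3<Long, Long, Double>> {

    private final Map<Long, Double> bByKey = new HashMap<>();

    @Override
    public void open(Configuration parameters) {
        List<Tuple2<Long, Double>> b = getRuntimeContext().getBroadcastVariable("smallSideB");
        for (Tuple2<Long, Double> t : b) {
            // for simplicity assume one B record per key; use a multimap otherwise
            bByKey.put(t.f0, t.f1);
        }
    }

    @Override
    public void reduce(Iterable<Tuple3<Long, Long, Double>> group,
                       Collector<Tuple3<Long, Long, Double>> out) {
        Long k1 = null, k2 = null;
        double sum = 0.0;
        for (Tuple3<Long, Long, Double> a : group) {
            k1 = a.f0;
            k2 = a.f1;
            Double v2 = bByKey.get(a.f0);
            sum += a.f2 * (v2 == null ? 1.0 : v2);  // placeholder combination of V1 and V2
        }
        out.collect(new Tuple3<>(k1, k2, sum));
    }
}

// usage:
// DataSet<Tuple3<Long, Long, Double>> result =
//     A.groupBy(0, 1).reduceGroup(new BroadcastJoinReduce())
//      .withBroadcastSet(B, "smallSideB");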


Re: Local collection data sink for the streaming API

2016-01-05 Thread Gábor Gévay
Try the getJavaStream method of the scala DataStream.

Best,
Gábor
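
In Java the same route looks like the sketch below; from the Scala API you would
first obtain the underlying Java stream (via getJavaStream, as mentioned above)
and pass it in. The element type and values are illustrative:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.contrib.streaming.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CollectExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> stream = env.fromElements(1, 2, 3);

        // collect() starts the job and hands results back through the iterator
        Iterator<Integer> it = DataStreamUtils.collect(stream);
        List<Integer> results = new ArrayList<>();
        while (it.hasNext()) {
            results.add(it.next());
        }
        System.out.println(results);
    }
}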




2016-01-05 19:14 GMT+01:00 Filipe Correia <filipe.corr...@nmusic.pt>:
> Hi Gábor, Thanks!
>
> I'm using Scala though. DataStreamUtils.collect() depends on
> org.apache.flink.streaming.api.datastream.DataStream, rather than
> org.apache.flink.streaming.api.scala.DataStream. Any suggestion on how
> to handle this, other than creating my own scala implementation of
> DataStreamUtils.collect()?
>
> Thanks,
>
> Filipe
>
> On Tue, Jan 5, 2016 at 3:33 PM, Gábor Gévay <gga...@gmail.com> wrote:
>> Hi Filipe,
>>
>> You can take a look at `DataStreamUtils.collect` in
>> flink-contrib/flink-streaming-contrib.
>>
>> Best,
>> Gábor
>>
>>
>>
>> 2016-01-05 16:14 GMT+01:00 Filipe Correia <filipe.corr...@nmusic.pt>:
>>> Hi,
>>>
>>> Collecting results locally (e.g., for unit testing) is possible in the
>>> DataSet API by using "LocalCollectionOutputFormat", as described in
>>> the programming guide:
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#collection-data-sources-and-sinks
>>>
>>> Can something similar be done for the DataStream API?
>>>
>>> Thanks,
>>>
>>> Filipe


Re: Reading multiple datasets with one read operation

2015-10-22 Thread Gábor Gévay
Hello!

> I have thought about a workaround where the InputFormat would return
> Tuple2s and the first field is the name of the dataset to which a record
> belongs. This would however require me to filter the read data once for
> each dataset or to do a groupReduce which is some overhead i'm
> looking to prevent.

I think that those two filters might not have that much overhead,
because of several optimizations Flink does under the hood:
- The dataset of Tuple2s won't be materialized, but instead will be
streamed directly to the two filter operators.
- The input format and the two filters will probably end up on the
same machine, because of chaining, so there won't be
serialization/deserialization between them.

Best,
Gabor
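
A sketch of the tagging workaround, with the dataset name in field 0 and the
payload in field 1; "MyTaggingInputFormat" and "Record" are illustrative
placeholders, and thanks to operator chaining the two filters consume the
source output without re-reading or materializing it:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

// tagged: one record per projected path, tagged with the dataset name
DataSet<Tuple2<String, Record>> tagged = env.createInput(new MyTaggingInputFormat());

DataSet<Record> datasetA = tagged
    .filter(new FilterFunction<Tuple2<String, Record>>() {
        @Override
        public boolean filter(Tuple2<String, Record> t) { return "A".equals(t.f0); }
    })
    .map(new MapFunction<Tuple2<String, Record>, Record>() {
        @Override
        public Record map(Tuple2<String, Record> t) { return t.f1; }
    });

// datasetB: same pattern with the "B" tag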



2015-10-22 11:38 GMT+02:00 Pieter Hameete :
> Good morning!
>
> I have the following usecase:
>
> My program reads nested data (in this specific case XML) based on
> projections (path expressions) of this data. Often multiple paths are
> projected onto the same input. I would like each path to result in its own
> dataset.
>
> Is it possible to generate more than 1 dataset using a readFile operation to
> prevent reading the input twice?
>
> I have thought about a workaround where the InputFormat would return Tuple2s
> and the first field is the name of the dataset to which a record belongs.
> This would however require me to filter the read data once for each dataset
> or to do a groupReduce which is some overhead i'm looking to prevent.
>
> Is there a better (less overhead) workaround for doing this? Or is there
> some mechanism in Flink that would allow me to do this?
>
> Cheers!
>
> - Pieter


Re: For each element in a dataset, do something with another dataset

2015-09-30 Thread Gábor Gévay
Hello,

Alternatively, if dataset B fits in memory, but dataset A doesn't,
then you can do it with broadcasting B to a RichMapPartitionFunction
on A:
In the open method of mapPartition, you sort B. Then, for each element
of A, you do a binary search in B, and look at the index found by the
binary search, which will be the count that you are looking for.

Best,
Gabor
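
A sketch of the broadcast + binary-search variant described above, assuming A
is a DataSet<Tuple2<Long, Long>> of (id, threshold) pairs, B is a DataSet<Long>,
and the result is (id, number of elements of B smaller than the threshold):

import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountSmallerThan
        extends RichMapPartitionFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private long[] sortedB;

    @Override
    public void open(Configuration parameters) {
        List<Long> b = getRuntimeContext().getBroadcastVariable("B");
        sortedB = new long[b.size()];
        for (int i = 0; i < sortedB.length; i++) {
            sortedB[i] = b.get(i);
        }
        Arrays.sort(sortedB);
    }

    @Override
    public void mapPartition(Iterable<Tuple2<Long, Long>> values,
                             Collector<Tuple2<Long, Long>> out) {
        for (Tuple2<Long, Long> a : values) {
            out.collect(new Tuple2<>(a.f0, (long) lowerBound(sortedB, a.f1)));
        }
    }

    // index of the first element >= key, i.e. the number of elements < key
    private static int lowerBound(long[] arr, long key) {
        int lo = 0, hi = arr.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (arr[mid] < key) { lo = mid + 1; } else { hi = mid; }
        }
        return lo;
    }
}

// usage:
// DataSet<Tuple2<Long, Long>> counts =
//     A.mapPartition(new CountSmallerThan()).withBroadcastSet(B, "B");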



2015-09-30 11:20 GMT+02:00 Fabian Hueske :
> The idea is to partition both datasets by range.
> Assume your dataset A is [1,2,3,4,5,6] you create two partitions: p1:
> [1,2,3] and p2: [4,5,6].
> Each partition is given to a different instance of a MapPartition operator
> (this is a bit tricky, because you cannot use broadcastSet. You could load
> the corresponding partition in the open() function from HDFS, for
> example).
>
> DataSet B is partitioned in the same way, i.e., all elements <= 3 go to
> partition 1, everything > 3 goes to p2. You can partition a dataset by range
> using the partitionCustom() function. The partitioned dataset is given to
> the mapPartition operator that loaded a partition of dataset A in each task
> instance.
> You do the counting just like before (sorting the partition of dataset A,
> binary sort, long[]), but add an additional count for the complete partition
> (basically count all elements that arrive in the task instance).
>
> If you have a dataset B with 1,2,2,3,3,4,5,5,5,5,6,7 the counts for p1 would
> be [1:0, 2:1, 3:3, all:5] and p2: [4:0, 5:1, 6:5, all:7].
> Now you need to compute the final count by adding the "all" counts of the
> lower partitions to the counts of the "higher" partitions, i.e., add all:5
> of p1 to all counts for p2.
>
> This approach requires to know the value range and distribution of the
> values which makes it a bit difficult. I guess you'll get the best
> performance, if you partition in a way, that you have about equally sized
> partitions of dataset B with the constraint that the corresponding
> partitions of A fit into memory.
>
> As I said, its a bit cumbersome. I hope you could follow my explanation.
> Please ask if something is not clear ;-)
>
> 2015-09-30 10:46 GMT+02:00 Pieter Hameete :
>>
>> Hi Fabian,
>>
>> thanks for your tips!
>>
>> Do you have some pointers for getting started with the 'tricky range
>> partitioning'? I am quite keen to get this working with large datasets ;-)
>>
>> Cheers,
>>
>> Pieter
>>
>> 2015-09-30 10:24 GMT+02:00 Fabian Hueske :
>>>
>>> Hi Pieter,
>>>
>>> cross is indeed too expensive for this task.
>>>
>>> If dataset A fits into memory, you can do the following: Use a
>>> RichMapPartitionFunction to process dataset B and add dataset A as a
>>> broadcastSet. In the open method of mapPartition, you can load the
>>> broadcasted set and sort it by a.propertyX and initialize a long[] for the
>>> counts. For each element of dataset B, you do a binary search on the sorted
>>> dataset A and increase all counts up to the position in the sorted list.
>>> After all elements of dataset B have been processed, return the counts from
>>> the long[].
>>>
>>> If dataset A doesn't fit into memory, things become more cumbersome and
>>> we need to play some tricky with range partitioning...
>>>
>>> Let me know, if you have questions,
>>> Fabian
>>>
>>> 2015-09-29 16:59 GMT+02:00 Pieter Hameete :

 Good day everyone,

 I am looking for a good way to do the following:

 I have dataset A and dataset B, and for each element in dataset A I
 would like to filter dataset B and obtain the size of the result. To say it
 short:

 for each element a in A -> B.filter( _ < a.propertyx).count

 Currently I am doing a cross of dataset A and B, making tuples so I can
 then filter all the tuples where field2 < field1.propertya and then group by
 field1.id and get the sizes of the groups. However this is not working out in
 practice. When the datasets get larger, some Tasks hang on the CHAIN Cross
 -> Filter, probably because there is insufficient memory for the cross to be
 completed?

 Does anyone have a suggestion on how I could make this work, especially
 with datasets that are larger than memory available to a separate Task?

 Thank you in advance for your time :-)

 Kind regards,

 Pieter Hameete
>>>
>>>
>>
>
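
For the range-partitioning step in the discussion above, a minimal sketch of
partitionCustom on dataset B; the split points are assumed to be known up front
(they would come from the value range of dataset A) and their values are
illustrative:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.KeySelector;

final long[] splitPoints = {1000L, 2000L, 3000L};  // upper bound of partitions 0, 1, 2

DataSet<Long> partitionedB = B.partitionCustom(
    new Partitioner<Long>() {
        @Override
        public int partition(Long key, int numPartitions) {
            int p = 0;
            while (p < splitPoints.length && key > splitPoints[p]) {
                p++;
            }
            return Math.min(p, numPartitions - 1);
        }
    },
    new KeySelector<Long, Long>() {
        @Override
        public Long getKey(Long value) { return value; }
    });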


Re: Gelly vertex ID type requirements?

2015-07-31 Thread Gábor Gévay
Hi,

 I opened a JIRA (FLINK-2442) and submitted a PR (#963) for the "Wrong field
 type" problem.

Thanks for the fix!

 Is the other problem addressed in FLINK-2437?

Unfortunately no. FLINK-2437 is a minor thing compared to the other
problem. I opened a Jira for it (FLINK-2447).

Best,
Gabor





2015-07-31 0:29 GMT+02:00 Fabian Hueske fhue...@gmail.com:
 Hi,

 I opened a JIRA (FLINK-2442) and submitted a PR (#963) for the "Wrong field
 type" problem.
 Is the other problem addressed in FLINK-2437?

 Cheers, Fabian

 2015-07-30 16:29 GMT+02:00 Gábor Gévay gga...@gmail.com:

 Thanks for the response.
 As a temporary workaround, I tried to change these problematic lines:

 } else {
     Preconditions.checkArgument(fieldType instanceof AtomicType,
         "Wrong field type: " + fieldType.toString());
     keyFields.add(new FlatFieldDescriptor(keyId, fieldType));
 }

 into this:

 } else if (fieldType instanceof AtomicType) {
     keyFields.add(new FlatFieldDescriptor(keyId, fieldType));
 } else {
     Preconditions.checkArgument(fieldType instanceof PojoTypeInfo,
         "Wrong field type: " + fieldType.toString());
     ((PojoTypeInfo) fieldType).getFlatFields("*", 0, keyFields);
 }


 But then I ran into another problem: The TypeExtractor creates the
 TupleTypeInfoBase for the Edge type of my graph with the following
 types:

 0 = {PojoTypeInfo@1067} PojoType<malom.GameState>, fields = [board:
 Long, sid: PojoType<malom.SectorId>, fields = [b: Byte, bf: Byte, w:
 Byte, wf: Byte]]
 1 = {GenericTypeInfo@1068} GenericType<malom.GameState>
 2 = {ValueTypeInfo@1069} ValueType<NullValue>

 The problem here is that the first two types should clearly be the
 same, as the Edge type looks like this:
 public class Edge<K, V> extends Tuple3<K, K, V>

 I did a bit of debugging on this, and the source of the problem seems
 to be the mechanism in TypeExtractor that would detect recursive types
 (see the alreadySeen field in TypeExtractor), as it mistakes the
 second appearance of malom.GameState for a recursive field.

 Specifically the following happens: createTypeInfoWithTypeHierarchy
 starts to process the Edge type, and in line 433 it calls itself for
 the first field, which proceeds into the privateGetForClass case which
 correctly detects that it is a POJO, and correctly returns a
 PojoTypeInfo; but in the meantime in line 1190, privateGetForClass
 adds malom.GameState to alreadySeen. Then the outer
 createTypeInfoWithTypeHierarchy approaches the second field, goes into
 privateGetForClass, which mistakenly returns a GenericTypeInfo, as it
 thinks in line 1186, that a recursive type is being processed.

 Should I open a Jira for this?

 A possible solution would be to change the alreadySeen field into a
 parameter of all the various type extraction methods, and make it
 contain only those types that are ancestors in the nesting hierarchy.

 Best regards,
 Gabor

 2015-07-30 14:32 GMT+02:00 Fabian Hueske fhue...@gmail.com:
  Thanks for reporting this issue.
  The "Wrong field type" error looks like a bug to me.
  This happens, because PojoType is neither a TupleType nor an AtomicType.
  To
  me it looks like the TupleTypeInfoBase condition should be generalized
  to
  CompositeType.
 
  I will look into this.
 
  Cheers, Fabian
 
  2015-07-30 14:18 GMT+02:00 Gábor Gévay gga...@gmail.com:
 
  Hello,
 
  I am having some trouble building a graph where the vertex ID type is
  a POJO. Specifically, it would be a class that has two fields: a long
  field, and a field which is of another class that has four byte
  fields. (Both classes implement the Comparable interface, as the Gelly
  guide specifies.) If the outer class has a default constructor, then
  it is recognized as a POJO, but I get the following exception:
 
  Exception in thread "main" java.lang.IllegalArgumentException: Wrong
  field type: PojoType<malom.GameState>, fields = [board: Long, sid:
  PojoType<malom.SectorId>, fields = [b: Byte, bf: Byte, w: Byte, wf:
  Byte]]
  at
 
  com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
  at
 
  org.apache.flink.api.java.operators.Keys$ExpressionKeys.init(Keys.java:241)
  at
 
  org.apache.flink.api.java.operators.Keys$ExpressionKeys.init(Keys.java:203)
  at
 
  org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets.where(CoGroupOperator.java:458)
  at org.apache.flink.graph.Graph.inDegrees(Graph.java:701)
  at
 
  org.apache.flink.graph.spargel.VertexCentricIteration.createResultVerticesWithDegrees(VertexCentricIteration.java:610)
  at
 
  org.apache.flink.graph.spargel.VertexCentricIteration.createResult(VertexCentricIteration.java:180)
  at
  org.apache.flink.api.java.DataSet.runOperation(DataSet.java:1044)
  at
  org.apache.flink.graph.Graph.runVertexCentricIteration(Graph.java:1312)
  at malom.Retrograde.run(Retrograde.java:64)
  at malom.Solver.main(Solver.java:32)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method

Gelly vertex ID type requirements?

2015-07-30 Thread Gábor Gévay
Hello,

I am having some trouble building a graph where the vertex ID type is
a POJO. Specifically, it would be a class that has two fields: a long
field, and a field which is of another class that has four byte
fields. (Both classes implement the Comparable interface, as the Gelly
guide specifies.) If the outer class has a default constructor, then
it is recognized as a POJO, but I get the following exception:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong
field type: PojoType<malom.GameState>, fields = [board: Long, sid:
PojoType<malom.SectorId>, fields = [b: Byte, bf: Byte, w: Byte, wf:
Byte]]
at 
com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
at 
org.apache.flink.api.java.operators.Keys$ExpressionKeys.init(Keys.java:241)
at 
org.apache.flink.api.java.operators.Keys$ExpressionKeys.init(Keys.java:203)
at 
org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets.where(CoGroupOperator.java:458)
at org.apache.flink.graph.Graph.inDegrees(Graph.java:701)
at 
org.apache.flink.graph.spargel.VertexCentricIteration.createResultVerticesWithDegrees(VertexCentricIteration.java:610)
at 
org.apache.flink.graph.spargel.VertexCentricIteration.createResult(VertexCentricIteration.java:180)
at org.apache.flink.api.java.DataSet.runOperation(DataSet.java:1044)
at org.apache.flink.graph.Graph.runVertexCentricIteration(Graph.java:1312)
at malom.Retrograde.run(Retrograde.java:64)
at malom.Solver.main(Solver.java:32)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

Note that originally the exception just said "Wrong field type", from
which I had no idea what type it was referring to, so I modified line
241 of Keys.java to include the type in the message, like this:
Preconditions.checkArgument(fieldType instanceof AtomicType,
    "Wrong field type: " + fieldType.toString());

On the other hand, if I comment out the default constructor of the
outer class, then it is not a POJO, but only a GenericTypeInfo is
created from it, which implements AtomicType, so the previous
exception does not appear. Does this mean that the Vertex IDs cannot
be POJOs? I am not sure if this is the intended behaviour. After all,
if we have a POJO, that should always be better than if we have a
generic type, right?

(I am just guessing here, but maybe CompositeType could also be
regarded as an AtomicType: the only method declared by the AtomicType
interface is createComparator, which is also defined in CompositeType
(of which PojoTypeInfo is a subclass), but with different parameters,
but maybe CompositeType could implement AtomicType by delegating the
createComparator call with all of the fields specified?)

I encountered another problem, which is may or may not be related to
the above: without the default constructor (the GenericTypeInfo case),
the VertexCentricIteration eats my graph, that is, the result graph
has zero vertices. I traced this problem to the first join in
VertexCentricIteration.createResultVerticesWithDegrees, where the
degrees DataSet is created: both inputs of the join (outDegrees and
inDegrees) contains the correct data, but the result (degrees) is
empty. Interestingly, this problem disappears, if I add
JoinHint.REPARTITION_SORT_MERGE.

Best regards,
Gabor
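
For reference, the rough shape of a composite vertex ID like the one described
above, written out as Flink POJOs with the pieces Gelly expects (public fields,
a default constructor, Comparable, equals/hashCode). The field names follow the
type dumps in this thread; everything else is illustrative, and this does not
by itself address the type-extraction issues discussed here:

import java.io.Serializable;

public class SectorId implements Comparable<SectorId>, Serializable {
    public byte b, bf, w, wf;

    public SectorId() {}  // default constructor, required for POJO treatment

    @Override
    public int compareTo(SectorId o) {
        int c = Byte.compare(b, o.b);
        if (c == 0) c = Byte.compare(bf, o.bf);
        if (c == 0) c = Byte.compare(w, o.w);
        if (c == 0) c = Byte.compare(wf, o.wf);
        return c;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof SectorId)) return false;
        SectorId o = (SectorId) obj;
        return b == o.b && bf == o.bf && w == o.w && wf == o.wf;
    }

    @Override
    public int hashCode() {
        return ((b * 31 + bf) * 31 + w) * 31 + wf;
    }
}

// in its own source file:
public class GameState implements Comparable<GameState>, Serializable {
    public long board;
    public SectorId sid;

    public GameState() {}  // default constructor, required for POJO treatment

    @Override
    public int compareTo(GameState o) {
        int c = Long.compare(board, o.board);
        return c != 0 ? c : sid.compareTo(o.sid);
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof GameState)) return false;
        GameState o = (GameState) obj;
        return board == o.board && sid.equals(o.sid);
    }

    @Override
    public int hashCode() {
        return Long.hashCode(board) * 31 + sid.hashCode();
    }
}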


Re: Gelly vertex ID type requirements?

2015-07-30 Thread Gábor Gévay
Thanks for the response.
As a temporary workaround, I tried to change these problematic lines:

} else {
    Preconditions.checkArgument(fieldType instanceof AtomicType,
        "Wrong field type: " + fieldType.toString());
    keyFields.add(new FlatFieldDescriptor(keyId, fieldType));
}

into this:

} else if (fieldType instanceof AtomicType) {
    keyFields.add(new FlatFieldDescriptor(keyId, fieldType));
} else {
    Preconditions.checkArgument(fieldType instanceof PojoTypeInfo,
        "Wrong field type: " + fieldType.toString());
    ((PojoTypeInfo) fieldType).getFlatFields("*", 0, keyFields);
}


But then I ran into another problem: The TypeExtractor creates the
TupleTypeInfoBase for the Edge type of my graph with the following
types:

0 = {PojoTypeInfo@1067} PojoType<malom.GameState>, fields = [board:
Long, sid: PojoType<malom.SectorId>, fields = [b: Byte, bf: Byte, w:
Byte, wf: Byte]]
1 = {GenericTypeInfo@1068} GenericType<malom.GameState>
2 = {ValueTypeInfo@1069} ValueType<NullValue>

The problem here is that the first two types should clearly be the
same, as the Edge type looks like this:
public class Edge<K, V> extends Tuple3<K, K, V>

I did a bit of debugging on this, and the source of the problem seems
to be the mechanism in TypeExtractor that would detect recursive types
(see the alreadySeen field in TypeExtractor), as it mistakes the
second appearance of malom.GameState for a recursive field.

Specifically the following happens: createTypeInfoWithTypeHierarchy
starts to process the Edge type, and in line 433 it calls itself for
the first field, which proceeds into the privateGetForClass case which
correctly detects that it is a POJO, and correctly returns a
PojoTypeInfo; but in the meantime in line 1190, privateGetForClass
adds malom.GameState to alreadySeen. Then the outer
createTypeInfoWithTypeHierarchy approaches the second field, goes into
privateGetForClass, which mistakenly returns a GenericTypeInfo, as it
thinks in line 1186, that a recursive type is being processed.

Should I open a Jira for this?

A possible solution would be to change the alreadySeen field into a
parameter of all the various type extraction methods, and make it
contain only those types that are ancestors in the nesting hierarchy.

Best regards,
Gabor

2015-07-30 14:32 GMT+02:00 Fabian Hueske fhue...@gmail.com:
 Thanks for reporting this issue.
 The "Wrong field type" error looks like a bug to me.
 This happens, because PojoType is neither a TupleType nor an AtomicType. To
 me it looks like the TupleTypeInfoBase condition should be generalized to
 CompositeType.

 I will look into this.

 Cheers, Fabian

 2015-07-30 14:18 GMT+02:00 Gábor Gévay gga...@gmail.com:

 Hello,

 I am having some trouble building a graph where the vertex ID type is
 a POJO. Specifically, it would be a class that has two fields: a long
 field, and a field which is of another class that has four byte
 fields. (Both classes implement the Comparable interface, as the Gelly
 guide specifies.) If the outer class has a default constructor, then
 it is recognized as a POJO, but I get the following exception:

 Exception in thread "main" java.lang.IllegalArgumentException: Wrong
 field type: PojoType<malom.GameState>, fields = [board: Long, sid:
 PojoType<malom.SectorId>, fields = [b: Byte, bf: Byte, w: Byte, wf:
 Byte]]
 at
 com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
 at
 org.apache.flink.api.java.operators.Keys$ExpressionKeys.init(Keys.java:241)
 at
 org.apache.flink.api.java.operators.Keys$ExpressionKeys.init(Keys.java:203)
 at
 org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets.where(CoGroupOperator.java:458)
 at org.apache.flink.graph.Graph.inDegrees(Graph.java:701)
 at
 org.apache.flink.graph.spargel.VertexCentricIteration.createResultVerticesWithDegrees(VertexCentricIteration.java:610)
 at
 org.apache.flink.graph.spargel.VertexCentricIteration.createResult(VertexCentricIteration.java:180)
 at org.apache.flink.api.java.DataSet.runOperation(DataSet.java:1044)
 at
 org.apache.flink.graph.Graph.runVertexCentricIteration(Graph.java:1312)
 at malom.Retrograde.run(Retrograde.java:64)
 at malom.Solver.main(Solver.java:32)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:497)
 at
 com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

 Note that originally the exception just said "Wrong field type", from
 which I had no idea what type it was referring to, so I modified line
 241 of Keys.java to include the type in the message, like this:
 Preconditions.checkArgument(fieldType instanceof AtomicType,
     "Wrong field type: " + fieldType.toString());

 On the other hand, if I comment out the default constructor of the
 outer class