Re: Deep Copy in FLINK, Kryo Copy is used in the different operator
Hello,

You might also be able to make Flink use a better serializer than Kryo. Flink falls back to Kryo when it can't use its own serializers, see here:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/types_serialization.html

For example, it might help to make your type a POJO.

Best,
Gábor

On Wed, Feb 14, 2018 at 3:38 PM, Aljoscha Krettek wrote:
> Hi,
>
> You can disable those copies via ExecutionConfig.enableObjectReuse(), which
> you can get from the StreamExecutionEnvironment via getConfig().
>
> Best,
> Aljoscha
>
>> On 12. Feb 2018, at 04:00, chen wrote:
>>
>> Actually, our team has its own stream engine. We tested our engine and Flink
>> and found that when we aggregate the stream data, the throughput decreases
>> very fast.
>>
>> So we captured the stack and found a deep copy in Flink.
>>
>> Between different operators, there is a call to
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy.
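
[Editor's sketch] The suggestion to "make your type a POJO" could look roughly like the following. This is a minimal sketch, assuming the POJO rules from the Flink type documentation linked above (public class, public no-argument constructor, fields either public or reachable through a getter and a setter). The names PojoExample, SensorReading, id and value are hypothetical and do not come from the thread.

```java
// Hypothetical example type; none of these names appear in the thread.
final class PojoExample {

    // A static nested public class can satisfy the POJO rules;
    // a non-static inner class would not.
    public static class SensorReading {
        public String id;      // public field: no accessor needed
        private double value;  // private field: needs a getter and a setter

        // Public no-argument constructor, required for POJO treatment.
        public SensorReading() {}

        public SensorReading(String id, double value) {
            this.id = id;
            this.value = value;
        }

        public double getValue() { return value; }
        public void setValue(double value) { this.value = value; }
    }

    public static void main(String[] args) {
        SensorReading r = new SensorReading("s1", 21.5);
        r.setValue(22.0);
        System.out.println(r.id + " " + r.getValue());
    }
}
```

Whether Flink actually picks its POJO serializer for a given type also depends on the field types, so it is worth checking the job logs for Kryo fallbacks after such a change.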
Re: At end of complex parallel flow, how to force end step with parallel=1?
Hi Garrett,

You can call .setParallelism(1) on just this operator:

ds.reduceGroup(new GroupReduceFunction...).setParallelism(1)

Best,
Gabor

On Mon, Oct 2, 2017 at 3:46 PM, Garrett Barton wrote:
> I have a complex alg implemented using the DataSet api and by default it
> runs with parallel 90 for good performance. At the end I want to perform a
> clustering of the resulting data and to do that correctly I need to pass all
> the data through a single thread/process.
>
> I read in the docs that as long as I did a global reduce using
> DataSet.reduceGroup(new GroupReduceFunction) that it would force it to a
> single thread. Yet when I run the flow and bring it up in the ui, I see
> parallel 90 all the way through the dag including this one.
>
> Is there a config or feature to force the flow back to a single thread? Or
> should I just split this into two completely separate jobs? I'd rather not
> split as I would like to use flinks ability to iterate on this alg and
> cluster combo.
>
> Thank you
Re: Rule expression for CEP library
Hello Shailesh,

There is a Flink Improvement Proposal for Integration of SQL and CEP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-20:+Integration+of+SQL+and+CEP

Best,
Gábor

On Mon, Sep 25, 2017 at 3:21 PM, Shailesh Jain wrote:
> Hi,
>
> Apart from the Java/Scala API for the CEP library, is there any other way to
> express patterns/rules which can be run on flink engine?
>
> Are there any plans on adding a DSL/Rule expression language for CEP anytime
> soon? If not, any pointers on how it can be achieved now would be really
> helpful.
>
> Thanks,
> Shailesh
Re: Dot notation not working for accessing case classes nested fields
Hi Federico,

Sorry, nested field expressions are not supported in these methods at the moment. I have created a JIRA issue for this:
https://issues.apache.org/jira/browse/FLINK-7629

I think this should be easy to fix, as all the infrastructure for supporting this is already in place. I'll try to do it over the weekend.

Best,
Gábor

On Thu, Sep 14, 2017 at 3:51 PM, Federico D'Ambrosio wrote:
> Hi,
>
> I have the following case classes:
>
> case class Event(instantValues: InstantValues)
> case class InstantValues(speed: Int, altitude: Int, time: DateTime)
>
> In a DataStream[Event] I'd like to perform a maxBy operation on the field
> time of instantValues for each event, and according to the docs here it should
> be possible to use the dot notation as follows:
>
> val events = stream
>   .keyBy("otherField")
>   .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>   .maxBy("instantValues.time")
>
> positionToMaxBy - In case of a POJO, Scala case class, or Tuple type, the
> name of the (public) field on which to perform the aggregation. Additionally,
> a dot can be used to drill down into nested objects, as in "field1.fieldxy".
> Furthermore "*" can be specified in case of a basic type (which is
> considered as having only one field).
>
> Still, I'm getting the following error:
>
> Fields 'instantValues.time' are not valid for 'package.Event(instantValues:
> package.InstantValues(speed: Integer, altitude: Integer, time:
> GenericType))'
>
> whereas if I, for instance, use only "instantValues" (while implementing its
> compareTo method) the aggregation works as usual.
>
> Any idea as to why this isn't working? Am I doing something wrong?
>
> Thanks a lot,
> Federico
Re: Bulk Iteration
Hello Alieh,

If you set the logging to a more verbose level, then Flink prints a log message at every iteration.

If you need the current iteration number inside your code, then you should create your UDF as an AbstractRichFunction, where you can call getIterationRuntimeContext(), which has getSuperstepNumber().

Best,
Gabor

On Mon, Sep 11, 2017 at 2:57 PM, Alieh wrote:
> Hello all,
>
> using Bulk iteration, is there any way to know the number of iterations?
>
> Cheers,
> Alieh
Re: DataSet: CombineHint heuristics
Hi Urs,

Yes, the 1/10th ratio is just a very loose rule of thumb. I would suggest trying both the SORT and HASH strategies with a workload that is as similar as possible to your production workload (similar data, similar parallelism, etc.), and seeing which one is faster for your specific use case.

An important difference between the HASH and SORT strategies is that the sorting combiner stores the original input records, while the hash combiner stores only combined records. I.e., when an input record arrives whose key is already in the hashtable, this record won't consume additional memory, because it is combined right away. Therefore, for example, if you would like your combiner to not emit any records prematurely (i.e., combine everything possible, without running out of memory), then with the SORT strategy you need combiner memory proportional to your input size, while with the HASH strategy you need combiner memory proportional only to the number of keys.

You are correct in that the performance depends very much on how many records fit into a single Sorter/Hashtable. However, I wrote #keys/#total records into the documentation because this is easier for a user to estimate, and this ratio being small correlates with the HASH strategy getting faster, as explained above.

Best,
Gábor

On Thu, Aug 31, 2017 at 4:02 PM, Aljoscha Krettek wrote:
> Hi,
>
> I would say that your assumption is correct and that the COMBINE strategy
> does in fact also depend on the ratio "#total records/#records that fit
> into a single Sorter/Hashtable".
>
> I'm CC'ing Fabian, just to be sure. He knows that stuff better than I do.
>
> Best,
> Aljoscha
>
>> On 31. Aug 2017, at 13:41, Urs Schoenenberger
>> wrote:
>>
>> Hi all,
>>
>> I was wondering about the heuristics for CombineHint:
>>
>> Flink uses SORT by default, but the doc for HASH says that we should
>> expect it to be faster if the number of keys is less than 1/10th of the
>> number of records.
>>
>> HASH should be faster if it is able to combine a lot of records, which
>> happens if multiple events for the same key are present in a data chunk
>> *that fits into a combine-hashtable* (cf handling in
>> ReduceCombineDriver.java).
>>
>> Now, if I have 10 billion events and 100 million keys, but only about 1
>> million records fit into a hashtable, the number of matches may be
>> extremely low, so very few events are getting combined (of course, this
>> is similar for SORT as the sorter's memory is bounded, too).
>>
>> Am I correct in assuming that the actual tradeoff is not only based on
>> the ratio of #total records/#keys, but also on #total records/#records
>> that fit into a single Sorter/Hashtable?
>>
>> Thanks,
>> Urs
>>
>> --
>> Urs Schönenberger - urs.schoenenber...@tngtech.com
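
[Editor's sketch] The memory argument in the reply above (a sort combiner buffers input records, a hash combiner holds one combined record per key) can be illustrated with a toy model. This is plain Java, not Flink's ReduceCombineDriver; the class and method names are hypothetical, and the "combine" step is simply counting records per key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: it ignores memory limits and spilling.
final class CombinerMemorySketch {

    // Hash-style combining: each arriving record is merged into the
    // aggregate for its key right away, so the table never holds more
    // entries than there are distinct keys.
    static Map<String, Long> hashCombine(List<String> keys) {
        Map<String, Long> table = new HashMap<>();
        for (String k : keys) {
            table.merge(k, 1L, Long::sum);
        }
        return table;
    }

    // Sort-style combining: every input record is buffered before any
    // combining happens, so the buffer grows with the input size.
    static List<String> sortBuffer(List<String> keys) {
        List<String> buffer = new ArrayList<>(keys);
        Collections.sort(buffer);
        return buffer;
    }

    public static void main(String[] args) {
        List<String> keys = java.util.Arrays.asList("a", "b", "a", "a", "b");
        System.out.println("hash table entries: " + hashCombine(keys).size());
        System.out.println("sort buffer records: " + sortBuffer(keys).size());
    }
}
```

With five records over two keys, the hash table holds two entries while the sort buffer holds all five records, which is the proportionality described in the reply.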
Re: termination of stream#iterate on finite streams
Hello, There is a Flink Improvement Proposal to redesign the iterations: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132 This will address the termination issue. Best, Gábor On Mon, Sep 4, 2017 at 11:00 AM, Xingcan Cuiwrote: > Hi Peter, > > That's a good idea, but may not be applicable with an iteration operator. > The operator can > not determine when to generate the "end-of-stream message" for the feedback > stream. > The provided function (e.g., filter(_ > 0).map(_ - 1)) is stateless and has > no side-effects. > > Best, > Xingcan > > > > On Mon, Sep 4, 2017 at 4:40 AM, Peter Ertl wrote: >> >> Hi Xingcan! >> >> if a _finite_ stream would, at the end, emit a special, trailing >> "End-Of-Stream Message" that floats downward the operator stream, wouldn't >> this enable us to deterministically end the iteration without needing a >> timeout? >> >> Having an arbitrary timeout that must be longer than any iteration step >> takes seems really awkward. >> >> What you think? >> >> Best regards >> Peter >> >> >> Am 02.09.2017 um 17:16 schrieb Xingcan Cui : >> >> Hi Peter, >> >> I just omitted the filter part. Sorry for that. >> >> Actually, as the javadoc explained, by default a DataStream with iteration >> will never terminate. That's because in a >> stream environment with iteration, the operator will never know whether >> the feedback stream has reached its end >> (though the data source is terminated, there may be unknowable subsequent >> data) and that's why it needs a >> timeout value to make the judgement, just like many other function calls >> in network connection. In other words, >> you know the feedback stream will be empty in the future, but the operator >> doesn't. Thus we provide it a maximum >> waiting time for the next record. >> >> Internally, this mechanism is implemented via a blocking queue (the >> related code can be found here). 
>> >> Hope everything is considered this time : ) >> >> Best, >> Xingcan >> >> On Sat, Sep 2, 2017 at 10:08 PM, Peter Ertl wrote: >>> >>> >>> Am 02.09.2017 um 04:45 schrieb Xingcan Cui : >>> >>> In your codes, all the the long values will subtract 1 and be sent back >>> to the iterate operator, endlessly. >>> >>> >>> >>> Is this true? shouldn't >>> >>> val iterationResult2 = env.generateSequence(1, 4).iterate(it => { >>> (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump >>> meaningless 'y' chars just to do anything >>> }) >>> iterationResult2.print() >>> >>> >>> produce the following _feedback_ streams? >>> >>> initial input to #iterate(): [1 2 3 4] >>> >>> iteration #1 : [1 2 3] >>> iteration #2 : [1 2] >>> iteration #3 : [1] >>> iteration #4 : [] => empty feedback stream => cause termination? (which >>> actually only happens when setting a timeout value)
>>> >>> Best regards >>> Peter
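
[Editor's sketch] The timeout behaviour discussed in this thread (the iteration head cannot know that the feedback stream is finished, so it waits up to a maximum time for the next record) can be modelled with a blocking queue, which the thread says is the mechanism used internally. The following is a single-threaded toy model in plain Java, not Flink code; the class name and the run method are hypothetical. It mirrors Peter's example: values above 0 emit one output and feed back the value minus 1.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model of an iteration head that terminates on a feedback timeout.
final class FeedbackTimeoutSketch {

    // Returns how many output records (the 'y' chars in the thread) are emitted.
    static int run(long from, long to, long timeoutMillis) {
        BlockingQueue<Long> feedback = new LinkedBlockingQueue<>();
        for (long i = from; i <= to; i++) {
            feedback.add(i); // initial input, like generateSequence(from, to)
        }
        int emitted = 0;
        try {
            while (true) {
                // Wait at most timeoutMillis for the next record.
                Long next = feedback.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (next == null) {
                    break; // nothing arrived in time: treat the stream as ended
                }
                if (next > 0) {
                    emitted++;              // output branch: filter(_ > 0).map(_ => 'y')
                    feedback.add(next - 1); // feedback branch: filter(_ > 0).map(_ - 1)
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(1, 4, 50)); // 4 + 3 + 2 + 1 outputs
    }
}
```

Without the timeout (a plain blocking take), the loop would wait forever on the empty queue, which matches the "never terminates by default" behaviour described in the thread.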
Re: Flink Vs Google Cloud Dataflow?
Hello,

Have you seen these two blog posts? They explain the relationship between Apache Flink, Apache Beam, and Google Cloud Dataflow.
https://data-artisans.com/blog/why-apache-beam
https://cloud.google.com/blog/big-data/2016/05/why-apache-beam-a-google-perspective

Best,
Gábor

On Mon, Jul 31, 2017 at 6:36 AM, Sridhar Chellappa wrote:
> Did anyone study Cloud DataFlow as an alternative to Flink? If yes, can
> someone summarize their analysis of Dataflow as against Flink?
Re: Beginner question - sum multiple edges
Hi Marc,

You can apply a map to your edges dataset that switches the direction of an edge if the source id is bigger than the target id. So your MapFunction will have an if that checks whether the source id is bigger than the target id: if it is, it returns the edge reversed; otherwise it returns the edge unchanged. The result of this on your example would be:

1 2 10
1 2 10
1 2 10 // This is the switched edge
1 1 1
1 3 5

You can remove loops with a filter, after which you get:

1 2 10
1 2 10
1 2 10
1 3 5

And after this, my original solution (.groupBy(0, 1).sum(2)) should work, and give you:

1 2 30
1 3 5

Note that my above solution is not using Gelly, so you don't need parameters.setDirection(EdgeDirection.ALL). I'm not sure whether there is a simpler solution that uses Gelly.

Best,
Gábor

On Sun, Apr 23, 2017 at 4:56 PM, Kaepke, Marc <marc.kae...@haw-hamburg.de> wrote:
> Hi Gábor and anyone else ;-) ,
>
> I need your help again.
>
> My goal is a graph without self-loops, with all edge values between two
> vertices summed into one single edge (both directions).
> e.g. my input graph is described by:
> 1 2 10
> 1 2 10
> 2 1 10
> 1 1 1
> 1 3 5
>
> The result has to be:
> 1 2 30 (sum and reduce all edges between vertex 1 and 2. I don’t care about
> the direction of these vertices because of the
> setDirection(EdgeDirection.ALL) parameter)
> 1 3 5
>
> My intermediate result is:
> 1 2 20
> 2 1 10
> 1 3 5
> with the following transformations:
>
> ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
> parameters.setDirection(EdgeDirection.ALL);
>
> // reduce multi edges
> Graph networkSumMultiEdges =
> Graph.fromTupleDataSet(inputGraph.getEdges().groupBy(0, 1).sum(2),
> ExecutionEnvironment.getExecutionEnvironment());
> // reduce self-looping
> Graph networkGraph = new Simplify<>().run(networkSumMultiEdges);
>
> How can I reduce and combine (1 2 20) and (2 1 10) to one Tuple?
>
> Best regards
> Marc
>
> On 17.04.2017 at 21:47, Kaepke, Marc <marc.kae...@haw-hamburg.de> wrote:
>
> Hi Gábor,
>
> thanks a lot
>
> Best,
> Marc
>
> On 17.04.2017 at 20:32, Gábor Gévay <gga...@gmail.com> wrote:
>
> Hello Marc,
>
> You can group by edge, and then sum:
>
> edges
>   .groupBy(0,1) // make first two fields a composite key
>   .sum(2); // sum the value field
>
> This will turn multiple edges that have the same source and target
> into one edge, whose value will be the sum of the values of the
> original group of edges.
>
> Best,
> Gabor
>
> PS.: Sorry for the duplicate email, I accidentally sent my previous
> email only to you instead of the mailing list.
>
> On Mon, Apr 17, 2017 at 5:46 PM, Kaepke, Marc
> <marc.kae...@haw-hamburg.de> wrote:
>
> Hi,
>
> how can I sum and reduce multiple edges in my entire graph?
>
> e.g. my input graph looks like (source-ID, target-ID, value):
> (1, 2, 30)
> (1, 2, 10)
> (2, 1, 55)
>
> And I need:
> (1, 2, 40)
> (2, 1, 55)
>
> Thanks!
> Marc
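
[Editor's sketch] The three steps from the reply at the top of this thread (map to put the smaller id first, filter out self-loops, then group by the two ids and sum the value) can be checked on the example data with plain Java collections. This is only a model of the logic, not Flink or Gelly code; the class name EdgeSumSketch and the long[] {source, target, value} edge representation are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class EdgeSumSketch {

    // Each edge is a long[] of {source, target, value}.
    static List<long[]> sumUndirected(List<long[]> edges) {
        Map<List<Long>, Long> sums = new LinkedHashMap<>();
        for (long[] e : edges) {
            // "map" step: switch the edge so the smaller id comes first
            long src = Math.min(e[0], e[1]);
            long tgt = Math.max(e[0], e[1]);
            // "filter" step: drop self-loops
            if (src == tgt) {
                continue;
            }
            // "groupBy(0, 1).sum(2)" step: composite key, summed value
            sums.merge(Arrays.asList(src, tgt), e[2], Long::sum);
        }
        List<long[]> result = new ArrayList<>();
        for (Map.Entry<List<Long>, Long> en : sums.entrySet()) {
            result.add(new long[] {en.getKey().get(0), en.getKey().get(1), en.getValue()});
        }
        return result;
    }

    public static void main(String[] args) {
        List<long[]> input = Arrays.asList(
            new long[] {1, 2, 10}, new long[] {1, 2, 10}, new long[] {2, 1, 10},
            new long[] {1, 1, 1}, new long[] {1, 3, 5});
        for (long[] e : sumUndirected(input)) {
            System.out.println(e[0] + " " + e[1] + " " + e[2]);
        }
    }
}
```

On Marc's input this yields the two edges 1 2 30 and 1 3 5, the result asked for in the thread.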
Re: Join with Default-Value
I'm not sure what exactly is the problem, but could you check this FAQ item?
http://flink.apache.org/faq.html#why-am-i-getting-a-nonserializableexception-

Best,
Gábor

2017-02-10 14:16 GMT+01:00 Sebastian Neef:
> Hi,
>
> thanks! That's exactly what I needed.
>
> I'm now using: DataSetA.leftOuterJoin(DataSetB).where(new
> KeySelector()).equalTo(new KeySelector()).with(new JoinFunction(...)).
>
> Now I get the following error:
>
>> Caused by: org.apache.flink.optimizer.CompilerException: Error translating
>> node 'Map "Key Extractor" : MAP [[ GlobalProperties
>> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
>> grouped=null, unique=null] ]]': Could not write the user code wrapper class
>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>> java.io.NotSerializableException:
>> org.apache.flink.api.java.operators.JoinOperator$EquiJoin
>
> I'm using flink-1.1.4-hd27.
>
> Any ideas how I can fix that bug? It worked properly with a simple .join()
>
> Regards,
> Sebastian
Re: Join with Default-Value
Hello Sebastian,

You can use DataSet.leftOuterJoin for this.

Best,
Gábor

2017-02-10 12:58 GMT+01:00 Sebastian Neef:
> Hi,
>
> is it possible to assign a "default" value to elements that didn't match?
>
> For example I have the following two datasets:
>
> |DataSetA | DataSetB|
> ---------------------
> |id=1     | id=1
> |id=2     | id=3
> |id=5     | id=4
> |id=6     | id=6
>
> When doing a join with:
>
> A.join(B).where( KeySelector(A.id))
>   .equalTo(KeySelector(B.id))
>
> The resulting dataset is:
>
> |(DataSetA | DataSetB)|
> ---------------------
> |(id=1| id=1)
> |(id=6| id=6)
>
> What is the best way to assign a default value to the elements id=2/id=5
> from DataSet A. E.g. I need a result which looks similar to this:
>
> |(DataSetA | DataSetB)|
> ---------------------
> |(id=1| id=1)
> |(id=2| Default)
> |(id=5| Default)
> |(id=6| id=6)
>
> My idea would be to get the missing Elements from DataSetA by .filter
> with (DataSetA|DataSetB) and then do a .union after creating a tuple
> with a default value. But that sounds a bit over-complicated.
>
> Best regards,
> Sebastian
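
[Editor's sketch] The semantics of the suggested left outer join, with a default filled in for unmatched left elements, can be modelled on Sebastian's example data. This is plain Java rather than the DataSet API; the class and method names are hypothetical. It assumes that for a left element without a match the join function sees null on the right side and substitutes the default there.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class LeftOuterJoinSketch {

    // Joins ids from the left list against the right list; every left id
    // is kept, and a missing right side is replaced by defaultValue.
    static List<String> join(List<Integer> left, List<Integer> right, String defaultValue) {
        Set<Integer> rightIds = new HashSet<>(right);
        List<String> out = new ArrayList<>();
        for (Integer id : left) {
            // null models "no matching element on the right side"
            Integer match = rightIds.contains(id) ? id : null;
            String rightPart = (match == null) ? defaultValue : "id=" + match;
            out.add("(id=" + id + "| " + rightPart + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(1, 2, 5, 6);
        List<Integer> b = Arrays.asList(1, 3, 4, 6);
        for (String row : join(a, b, "Default")) {
            System.out.println(row);
        }
    }
}
```

The four printed rows correspond to the desired result table in the question: ids 1 and 6 are matched, ids 2 and 5 get the default.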
Re: Cyclic ConnectedStream
I somehow still suspect that iterations might work for your use case. Note that in the streaming API, iterations are currently nothing more than a back-edge in the topology, i.e. a low-level tool to create a cyclic topology, much like your hypothetical setter syntax. (It's quite different from the iterations of the batch API.)

The tricky part for your use case is that you would want a ConnectedStream as your iteration head, which should get the elements from the back-edge separately from the normal input. You could simulate this by using not ConnectedStream.flatMap, but just a simple Stream.flatMap whose input element type is an Either type, whose two components would be the normal input and the back-edge input. (And you add maps before the closeWith and to your input1, which would appropriately wrap the elements into the two alternatives of the Either type.)

Best,
Gábor

2017-01-29 15:39 GMT+01:00 Matt <dromitl...@gmail.com>:
> Check this image for clarification, this is what I'm trying to do:
> http://i.imgur.com/iZxPv04.png
>
> The rectangles are the two CoFlatMapFunction, sharing a state between
> process and update (map1 and map2). It's clear from the image that I need
> input1 and the green box to create the blue box, and input2 and the blue
> box to create the green one.
>
> ---
> blue = input1.connect(green).keyBy(...).flatMap(...);
> green = input2.connect(blue).keyBy(...).flatMap(...);
> ---
>
> As you can see there's no cycle in the flow of data so I guess this
> topology is valid. The problem is not having a way to define such flow.
>
> For instance, with the appropriate setters we would be able to do this:
>
> ---
> blue = input1.connect();
> green = input2.connect();
>
> blue.setConnection(green);
> green.setConnection(blue);
>
> blue.keyBy(...).flatMap(...);
> green.keyBy(...).flatMap(...);
> ---
>
> Any idea is welcome.
>
> Matt
>
> On Sat, Jan 28, 2017 at 5:31 PM, Matt <dromitl...@gmail.com> wrote:
>
>> I'm aware of IterativeStream but I don't think it's useful in this case.
>>
>> As shown in the example above, my use case is "cyclic" in that the same
>> object goes from Input to predictionStream (flatMap1), then to
>> statsStream (flatMap2, where it's updated with an object from Input2)
>> and finally to predictionStream (flatMap2).
>>
>> The same operator is never applied twice to the object, thus I would say
>> this dataflow is cyclic only in the dependencies of the stream
>> (predictionStream depends on statsStream, but it depends on
>> predictionStream in the first place).
>>
>> I hope it is clear now.
>>
>> Matt
>>
>> On Sat, Jan 28, 2017 at 3:17 PM, Gábor Gévay <gga...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Cyclic dataflows can be built using iterations:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#iterations
>>>
>>> Best,
>>> Gábor
>>>
>>> 2017-01-28 18:39 GMT+01:00 Matt <dromitl...@gmail.com>:
>>>> I have a ConnectedStream (A) that depends on another ConnectedStream (B),
>>>> which depends on the first one (A).
>>>>
>>>> Simplified code:
>>>>
>>>> predictionStream = input
>>>>   .connect(statsStream)
>>>>   .keyBy(...)
>>>>   .flatMap(CoFlatMapFunction {
>>>>     flatMap1(obj, output) {
>>>>       p = prediction(obj)
>>>>       output.collect(p)
>>>>     }
>>>>     flatMap2(stat, output) {
>>>>       updateModel(stat)
>>>>     }
>>>>   })
>>>>
>>>> statsStream = input2
>>>>   .connect(predictionStream)
>>>>   .keyBy(...)
>>>>   .flatMap(CoFlatMapFunction {
>>>>     flatMap1(obj2, output) {
>>>>       s = getStats(obj2, p)
>>>>       output.collect(s)
>>>>     }
>>>>     flatMap2(prediction, output) {
>>>>       p = prediction
>>>>     }
>>>>   })
>>>>
>>>> I'm guessing this should be possible to achieve, one way would be to add a
>>>> sink on statsStream to save the elements into Kafka and read from that topic
>>>> on predictionStream instead of initializing it with a reference of
>>>> statsStream. But I would rather avoid writing unnecessarily into kafka.
>>>>
>>>> Is there any other way to achieve this?
>>>>
>>>> Thanks,
>>>> Matt
Re: How to get top N elements in a DataSet?
Hello,

Btw. there is a Jira about this:
https://issues.apache.org/jira/browse/FLINK-2549

Note that the discussion there suggests a more efficient approach, which doesn't involve sorting the entire partitions.

And if I remember correctly, this question comes up from time to time on the mailing list.

Best,
Gábor

2017-01-24 11:35 GMT+01:00 Fabian Hueske:
> Hi Ivan,
>
> I think you can use MapPartition for that.
> So basically:
>
> dataset // assuming some partitioning that can be reused to avoid a shuffle
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen())
>   .sortPartition(1, Order.DESCENDING).setParallelism(1)
>   .mapPartition(new ReturnFirstTen())
>
> Best, Fabian
>
> 2017-01-24 10:10 GMT+01:00 Ivan Mushketyk :
>>
>> Hi,
>>
>> I have a dataset of tuples with two fields ids and ratings and I need to
>> find 10 elements with the highest rating in this dataset. I found a
>> solution, but I think it's suboptimal and I think there should be a better
>> way to do it.
>>
>> The best thing that I came up with is to partition dataset by rating, sort
>> locally and write the partitioned dataset to disk:
>>
>> dataset
>>   .partitionCustom(new Partitioner() {
>>     @Override
>>     public int partition(Double key, int numPartitions) {
>>       return key.intValue() % numPartitions;
>>     }
>>   }, 1) // partition by rating
>>   .setParallelism(5)
>>   .sortPartition(1, Order.DESCENDING) // locally sort by rating
>>   .writeAsText("..."); // write the partitioned dataset to disk
>>
>> This will store tuples in sorted files with names 5, 4, 3, ... that
>> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read sorted
>> data from disk and take the N elements with the highest rating.
>> Is there a way to do the same but without writing a partitioned dataset to
>> a disk?
>>
>> I tried to use "first(10)" but it seems to give top 10 items from a random
>> partition. Is there a way to get top N elements from every partition? Then I
>> could locally sort top values from every partition and find top 10 global
>> values.
>>
>> Best regards,
>> Ivan.
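
[Editor's sketch] One common way to get the top N without sorting whole partitions is a bounded min-heap: keep at most N values per partition, then run the same selection once more over the per-partition results. Whether this is exactly the approach proposed in the Jira discussion is an assumption; the sketch below is plain Java with hypothetical names (TopNSketch, topN), not Flink code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

final class TopNSketch {

    // Returns the n largest values in descending order, holding at most
    // n elements in memory at any time.
    static List<Double> topN(Iterable<Double> values, int n) {
        List<Double> result = new ArrayList<>();
        if (n <= 0) {
            return result;
        }
        PriorityQueue<Double> heap = new PriorityQueue<>(); // min-heap
        for (double v : values) {
            if (heap.size() < n) {
                heap.add(v);
            } else if (v > heap.peek()) {
                heap.poll(); // evict the smallest of the current top n
                heap.add(v);
            }
        }
        result.addAll(heap);
        result.sort(Collections.reverseOrder());
        return result;
    }

    public static void main(String[] args) {
        // Phase 1: top 2 of each "partition"; phase 2: top 2 of the union.
        List<Double> p1 = topN(Arrays.asList(4.5, 1.0, 3.0), 2);
        List<Double> p2 = topN(Arrays.asList(5.0, 2.0, 4.0), 2);
        List<Double> merged = new ArrayList<>(p1);
        merged.addAll(p2);
        System.out.println(topN(merged, 2));
    }
}
```

In a Flink job, the per-partition phase would sit where ReturnFirstTen is used in Fabian's outline, and the final phase would run with parallelism 1; the sortPartition calls would then no longer be needed.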
Re: Equivalent of Rx combineLatest() on a join?
Dear Denis,

I think you can do it with a simple CoFlatMapFunction (without windows). To use a CoFlatMapFunction, you first need to connect [1] your streams, which results in a ConnectedStreams. Then you can call flatMap on this and give it a CoFlatMapFunction (where two different callbacks are executed, depending on which of the two streams an element arrives on). What you could do is have two members in your CoFlatMapFunction that store the latest values from the two streams; when an element arrives, you update the appropriate one and also emit a combined value from them.

Best,
Gábor

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#connect-org.apache.flink.streaming.api.datastream.DataStream-

2016-12-05 18:28 GMT+01:00:
> Actually that doesn’t work as expected because emitted values are not
> purged. I’ll experiment with purging triggers and/or evictors, though I have
> the feeling that Flink was not designed for what we need to do here -- but
> I’ll keep on searching.
>
> In the meantime any advice is appreciated. If the goal is not clear I can
> provide more details.
>
> Thank you,
>
> Denis
>
> From: Dollfus, Denis (TR Technology & Ops)
> Sent: Monday, 5 December 2016 16:31
> To: user@flink.apache.org
> Subject: RE: Equivalent of Rx combineLatest() on a join?
>
> Asking the question helped me find the answer (yes, rubber duck
> debugging), as it seems that the code below does what I need:
>
> s3 = s1.join(s2)
>   .where(new KeySelector1()).equalTo(new KeySelector2())
>   .window(GlobalWindow.create())
>   .trigger(CountTrigger.of(1))
>   .apply(new JoinFunction);
>
> If that’s a common use case (in my view it is), a syntax shortcut could help
> developers, e.g. something like:
>
> s3 = s1.join(s2)
>   .where(new KeySelector1()).equalTo(new KeySelector2())
>   .combineLatest(new JoinFunction);
>
> Denis
>
> From: Dollfus, Denis (TR Technology & Ops)
> Sent: Monday, 5 December 2016 12:27
> To: user@flink.apache.org
> Subject: Equivalent of Rx combineLatest() on a join?
>
> Hi all,
>
> [first email here, I’m new to Flink, Java and Scala, sorry if I missed
> something obvious]
>
> I'm exploring Flink in the context of streaming calculators. Basically, the
> data flow boils down to multiple data streams with variable update rates
> (ms, seconds, …, month) which are joined before being fed to calculators.
> The kind of operation I need is very similar to the Rx combineLatest
> operator, which results in an object being emitted whenever one of the
> streams is updated.
>
> As there is no such operator predefined, I think I have to use a
> GlobalWindow and provide a custom WindowAssigner. The end result would look
> like this (pseudo java 8 code, I hope it's understandable):
>
> DataStream s1 = env.addSource(..);
> DataStream s2 = env.addSource(..);
>
> s3 = s1.join(s2)
>   .where(s1 -> id)
>   .equalTo(s2 -> id)
>   .window(new MyCustomCombineLatestAssigner())
>   .apply( … return new object combining data from s1 and from s2);
>
> Is the approach correct, or is there a simpler way to achieve the same join
> + apply mechanism ?
>
> Thank you,
>
> Denis
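
[Editor's sketch] The idea from the reply at the top of this thread (two members holding the latest value from each stream, updated by two callbacks, with a combined value emitted on every update) can be modelled without Flink as follows. The class CombineLatestSketch and its methods onFirst/onSecond are hypothetical stand-ins for a CoFlatMapFunction's flatMap1/flatMap2. Like Rx combineLatest, this sketch emits nothing until both sides have produced at least one value, which is an assumption about the desired behaviour.

```java
import java.util.ArrayList;
import java.util.List;

final class CombineLatestSketch<A, B> {
    private A latestA; // latest element seen on the first stream
    private B latestB; // latest element seen on the second stream
    private final List<String> out = new ArrayList<>();

    // Plays the role of flatMap1: called for elements of the first stream.
    void onFirst(A value) {
        latestA = value;
        emit();
    }

    // Plays the role of flatMap2: called for elements of the second stream.
    void onSecond(B value) {
        latestB = value;
        emit();
    }

    // Emit a combined value once both sides have been seen at least once.
    private void emit() {
        if (latestA != null && latestB != null) {
            out.add(latestA + "," + latestB);
        }
    }

    List<String> output() {
        return out;
    }

    public static void main(String[] args) {
        CombineLatestSketch<String, Integer> c = new CombineLatestSketch<>();
        c.onFirst("a1");
        c.onSecond(1);
        c.onFirst("a2");
        c.onSecond(2);
        System.out.println(c.output());
    }
}
```

In an actual Flink job on keyed streams, the two members would more likely be kept in keyed state rather than plain fields, so that they are tracked per key and included in checkpoints.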
Re: spark vs flink batch performance
> "For csv reading, i deliberately did not use csv reader since i want to run > same code across spark and flink." > > If your objective deviates from writing and running the fastest Spark and > fastest Flink programs, then your comparison is worthless. Well, I don't really agree with this. I would say that it can actually be a valid objective to compare different systems in a way that we don't tune the code very much to the individual systems. This is because I guess it also happens sometimes in real (non-benchmark) jobs, that we don't want to spend too much time on tuning. However, in this case I also think that using the built-in CSV reader method would not constitute as "too much tuning to a specific system". So I would do this comparison with using the built-in CSV reader in both systems. Best, Gábor 2016-11-18 15:30 GMT+01:00 Greg Hogan <c...@greghogan.com>: > "For csv reading, i deliberately did not use csv reader since i want to run > same code across spark and flink." > > If your objective deviates from writing and running the fastest Spark and > fastest Flink programs, then your comparison is worthless. > > > > On Fri, Nov 18, 2016 at 5:37 AM, CPC <acha...@gmail.com> wrote: >> >> Hi Gabor, >> >> Thank you for your kind response. I forget to mention that i have actually >> three workers. This is why i set default paralelism to 6. >> >> For csv reading, i deliberately did not use csv reader since i want to run >> same code across spark and flink. Collect is returning 40k records which is >> not so big. >> >> I will try same test with spark 1.5 and 1.6 as well to understand whether >> spark 2.x series has some performance improvements because in those kind of >> tests, spark and flink was either on par or flink 10-15% faster than spark >> in the past. Aside from that are any configuration parameters you may >> propose to fine tune flink? 
>> >> Best, >> Anıl >> >> On Nov 18, 2016 12:25, "Gábor Gévay" <gga...@gmail.com> wrote: >>> >>> Hello, >>> >>> Your program looks mostly fine, but there are a few minor things that >>> might help a bit: >>> >>> Parallelism: In your attached flink-conf.yaml, you have 2 task slots >>> per task manager, and if you have 1 task manager, then your total >>> number of task slots is also 2. However, your default parallelism is >>> 6. In Flink, the recommended default parallelism is exactly the total >>> number of task slots [1]. (This is in contrast to Spark, where the >>> recommended setting is 2-3 per CPU core [2].) >>> >>> CSV reading: If your input is a CSV file, then you should use >>> readCsvFile (instead of readTextFile and then parsing it manually). >>> >>> Collect call: How large is the DataSet that you are using collect on? >>> If it is large, then we might try to figure out a way to get the top >>> 10 elements without first collecting the DataSet. >>> >>> Best, >>> Gábor >>> >>> [1] >>> https://flink.apache.org/faq.html#what-is-the-parallelism-how-do-i-set-it >>> [2] https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism >>> >>> >>> >>> >>> >>> 2016-11-16 22:38 GMT+01:00 CPC <acha...@gmail.com>: >>> > Hi all, >>> > >>> > I am trying to compare spark and flink batch performance. In my test i >>> > am >>> > using ratings.csv in >>> > http://files.grouplens.org/datasets/movielens/ml-latest.zip dataset. I >>> > also >>> > concatenated ratings.csv 16 times to increase dataset size(total of >>> > 390465536 records almost 10gb).I am reading from google storage with >>> > gcs-connector and file schema is : userId,movieId,rating,timestamp. 
>>> > Basically i am calculating average rating per movie >>> > >>> > Code for flink(i tested CombineHint.HASH and CombineHint.SORT) >>> >> >>> >> case class Rating(userID: String, movieID: String, rating: Double, >>> >> date: >>> >> Timestamp) >>> > >>> > >>> >> >>> >> def parseRating(line: String): Rating = { >>> >> val arr = line.split(",") >>> >> Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp((arr(3).toLong >>> >> * >>> >> 1000))) >>> >> } >>> > >>> > >>> >> >>> >> val ratings: DataS
Re: spark vs flink batch performance
Hello,

Your program looks mostly fine, but there are a few minor things that might help a bit:

Parallelism: In your attached flink-conf.yaml, you have 2 task slots per task manager, and if you have 1 task manager, then your total number of task slots is also 2. However, your default parallelism is 6. In Flink, the recommended default parallelism is exactly the total number of task slots [1]. (This is in contrast to Spark, where the recommended setting is 2-3 tasks per CPU core [2].)

CSV reading: If your input is a CSV file, then you should use readCsvFile (instead of readTextFile and then parsing it manually).

Collect call: How large is the DataSet that you are using collect on? If it is large, then we might try to figure out a way to get the top 10 elements without first collecting the DataSet.

Best,
Gábor

[1] https://flink.apache.org/faq.html#what-is-the-parallelism-how-do-i-set-it
[2] https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism

2016-11-16 22:38 GMT+01:00 CPC:
> Hi all,
>
> I am trying to compare Spark and Flink batch performance. In my test I am
> using ratings.csv from the
> http://files.grouplens.org/datasets/movielens/ml-latest.zip dataset. I also
> concatenated ratings.csv 16 times to increase the dataset size (a total of
> 390465536 records, almost 10 GB). I am reading from Google Storage with the
> gcs-connector, and the file schema is: userId,movieId,rating,timestamp.
> Basically I am calculating the average rating per movie.
>
> Code for Flink (I tested CombineHint.HASH and CombineHint.SORT):
>
>     case class Rating(userID: String, movieID: String, rating: Double, date: Timestamp)
>
>     def parseRating(line: String): Rating = {
>       val arr = line.split(",")
>       Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp(arr(3).toLong * 1000))
>     }
>
>     val ratings: DataSet[Rating] =
>       env.readTextFile("gs://cpcflink/wikistream/ratingsheadless16x.csv").map(a => parseRating(a))
>     ratings
>       .map(i => (i.movieID, 1, i.rating))
>       .groupBy(0).reduce((l, r) => (l._1, l._2 + r._2, l._3 + r._3), CombineHint.HASH)
>       .map(i => (i._1, i._3 / i._2))
>       .collect().sortBy(_._1).sortBy(_._2)(Ordering.Double.reverse).take(10)
>
> With CombineHint.HASH it takes 3m49s and with CombineHint.SORT 5m9s.
>
> Code for Spark (I tested reduceByKey and reduceByKeyLocally):
>
>     case class Rating(userID: String, movieID: String, rating: Double, date: Timestamp)
>
>     def parseRating(line: String): Rating = {
>       val arr = line.split(",")
>       Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp(arr(3).toLong * 1000))
>     }
>
>     val conf = new SparkConf().setAppName("Simple Application")
>     val sc = new SparkContext(conf)
>     val keyed: RDD[(String, (Int, Double))] =
>       sc.textFile("gs://cpcflink/wikistream/ratingsheadless16x.csv").map(parseRating).map(r => (r.movieID, (1, r.rating)))
>     keyed.reduceByKey((l, r) => (l._1 + r._1, l._2 + r._2))
>       .mapValues(i => i._2 / i._1)
>       .collect.sortBy(_._1).sortBy(a => a._2)(Ordering.Double.reverse).take(10).foreach(println)
>
> With reduceByKeyLocally it takes 2.9 minutes (almost 2m54s) and with
> reduceByKey 3.1 minutes (almost 3m6s).
>
> Machine config on Google Cloud:
> taskmanager/sparkmaster: n1-standard-1 (1 vCPU, 3.75 GB memory)
> jobmanager/sparkworkers: n1-standard-2 (2 vCPUs, 7.5 GB memory)
> java version: jdk-8u102
> flink: 1.1.3
> spark: 2.0.2
>
> I also attached flink-conf.yaml. Although it is not such a big difference,
> there is a 40% performance difference between Spark and Flink. Is there
> something I am doing wrong? If not, how can I fine-tune Flink, or is
> it normal that Spark has better performance with batch data?
>
> Thank you in advance...
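One way to get the top 10 without collecting the whole DataSet first, as suggested in the reply above, is to keep only a bounded number of candidates per partition and merge those small lists at the end. The following is a minimal plain-Java sketch of that idea, not Flink code: the class `TopK`, the type `MovieAvg`, and both method names are hypothetical, and wiring `topK` into something like a mapPartition followed by a single-parallelism merge step is an assumption about how one might use it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class TopK {
    /** A (movieId, averageRating) pair; the names are illustrative only. */
    static final class MovieAvg {
        final String movieId;
        final double avg;

        MovieAvg(String movieId, double avg) {
            this.movieId = movieId;
            this.avg = avg;
        }
    }

    /** Returns the k entries with the highest average, best first, using a min-heap of at most k elements. */
    static List<MovieAvg> topK(Iterable<MovieAvg> input, int k) {
        PriorityQueue<MovieAvg> heap =
                new PriorityQueue<>(Comparator.comparingDouble((MovieAvg m) -> m.avg));
        for (MovieAvg m : input) {
            heap.add(m);
            if (heap.size() > k) {
                heap.poll(); // drop the current minimum so the heap never exceeds k entries
            }
        }
        List<MovieAvg> result = new ArrayList<>(heap);
        result.sort(Comparator.comparingDouble((MovieAvg m) -> m.avg).reversed());
        return result;
    }

    /** Merges the per-partition candidate lists into the global top k. */
    static List<MovieAvg> merge(List<List<MovieAvg>> partials, int k) {
        List<MovieAvg> all = new ArrayList<>();
        for (List<MovieAvg> p : partials) {
            all.addAll(p);
        }
        return topK(all, k);
    }
}
```

With this shape, each partition hands at most k records to the final step, so the amount of data brought to a single place is bounded by k times the number of partitions rather than by the size of the DataSet.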
Re: Retrieving values from a dataset of datasets
The short answer is that DataSet is not serializable.

I think the main underlying problem is that Flink needs to see all DataSet operations before launching the job. However, if you have a DataSet of DataSets, then operations on the inner DataSets will end up being specified inside the UDFs of operations on the outer DataSet. This is a problem, because Flink cannot see inside the UDFs before the job starts, since they get executed only after the job starts executing.

There are some workarounds though:

1. If you know that your inner DataSets would be small, then you can replace them with some regular Java/Scala collection class, like an Array or List.

2. You can often flatten your data, that is, somehow represent your nested collection with a flat collection. Exactly how to do this depends on your use case. For example, suppose that originally we wanted to represent the lengths of the shortest paths between all pairs of vertices in a graph by a DataSet that for every vertex contains a DataSet that tells us the distances to all the other vertices. This can instead be represented by a flat DataSet that contains pairs of vertices and their distances.

Btw. [1] is a paper where some graph data structures having complex nesting are represented in Flink.

Best,
Gábor

[1] http://dbs.uni-leipzig.de/file/EPGM.pdf

2016-11-15 17:37 GMT+01:00 otherwise777:
> It seems what I tried did indeed not work.
> Can you explain to me why that doesn't work though?
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Retrieving-values-from-a-dataset-of-datasets-tp10108p10128.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
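To make the flattening workaround concrete, here is a small plain-Java sketch that turns a nested "vertex to (vertex to distance)" structure into flat (source, target, distance) records. It uses ordinary maps and lists rather than DataSets; the class `FlattenDistances` and the `Triple` type are hypothetical names chosen for illustration, and integer vertex ids are an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class FlattenDistances {
    /** One flat record: the distance from source to target. */
    static final class Triple {
        final int source;
        final int target;
        final double distance;

        Triple(int source, int target, double distance) {
            this.source = source;
            this.target = target;
            this.distance = distance;
        }
    }

    /** Replaces the nested per-vertex collections by one flat list of triples. */
    static List<Triple> flatten(Map<Integer, Map<Integer, Double>> nested) {
        List<Triple> flat = new ArrayList<>();
        for (Map.Entry<Integer, Map<Integer, Double>> outer : nested.entrySet()) {
            for (Map.Entry<Integer, Double> inner : outer.getValue().entrySet()) {
                flat.add(new Triple(outer.getKey(), inner.getKey(), inner.getValue()));
            }
        }
        return flat;
    }
}
```

In the flat form, everything that belonged to one inner collection can still be recovered by grouping on the source field, which is the kind of operation a single flat DataSet supports directly.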
Re: Retrieving values from a dataset of datasets
Hello,

How exactly do you represent the DataSet of DataSets? I'm asking because if you have something like a DataSet whose elements are themselves DataSets, that unfortunately doesn't work in Flink.

Best,
Gábor

2016-11-14 20:44 GMT+01:00 otherwise777:
> Hey there,
>
> I'm trying to calculate the betweenness in a graph with Flink and Gelly. The
> way I tried this was by calculating the shortest path from every node to the
> rest of the nodes. This results in a DataSet of vertices which all have
> DataSets of their own with all the other vertices and their paths.
>
> Next I used the reduce function on the inner DataSets so every inner DataSet
> has 1 value.
>
> Now I have a DataSet of DataSets with 1 value each, but how do I efficiently
> transform this into a single DataSet with values? I can do a mapping on
> the DataSet and use collect(), but I think that would be very costly.
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Retrieving-values-from-a-dataset-of-datasets-tp10108.html
Re: Looping over a DataSet and accesing another DataSet
Hello,

In Flink, one commonly used way to access data from multiple DataSets at the same time is to perform a join (Flink actually calls equi-joins [1] just "join"), just as in the database world.

For example, in the algorithm that you linked, you access A[u] for every edge (u,v). I assume that you have stored A in a DataSet of (index, value) pairs. You can achieve this access pattern by performing a join, and in the join condition you specify that the first endpoint of the edge should be equal to the index of A. This way, you get a DataSet where every record contains an edge (u,v) and also A[u], so you can do a map on this where the UDF of your map will get (u,v) and A[u].

Your algorithm also accesses A[v], which can be achieved by performing a second join that is similar to the first (using the result of the first).

However, the updating of P will be more tricky to translate to Flink. I'm not sure I understand the linked algorithm correctly: does every element of P contain a list, and does the + mean appending an element to a list? (in the line P[v] = P[u] + v)

Best,
Gábor

[1] https://en.wikipedia.org/wiki/Join_(SQL)#Equi-join

2016-10-30 8:25 GMT+01:00 otherwise777:
> Currently I'm trying to implement this algorithm [1], which requires me to
> loop over one DataSet (the edges) and access another DataSet (the vertices).
> For this loop I use a mapping (I'm not sure if this is the correct way of
> looping over a DataSet), but I don't know how to access the elements of
> another DataSet while I'm looping over one.
>
> I know Gelly also has iterative support for these kinds of things, but they
> loop over the vertices and not the edges.
>
> [1] http://prntscr.com/d0qeyd
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Looping-over-a-DataSet-and-accesing-another-DataSet-tp9778.html
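The two-join access pattern described above can be sketched outside Flink with a simple hash lookup standing in for each equi-join. This is only an illustration of the data flow: the class `EdgeJoin`, its nested types, and the method names are hypothetical, A is assumed to be available as an index-to-value map, and edges whose endpoint has no entry in A are dropped, as an inner join would do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class EdgeJoin {
    /** A directed edge (u, v). */
    static final class Edge {
        final int u;
        final int v;

        Edge(int u, int v) {
            this.u = u;
            this.v = v;
        }
    }

    /** An edge together with the looked-up values A[u] and A[v]. */
    static final class Joined {
        final Edge edge;
        final double au;
        final double av; // NaN until the second join has filled it in

        Joined(Edge edge, double au, double av) {
            this.edge = edge;
            this.au = au;
            this.av = av;
        }
    }

    /** First join: edge.u == index of A, yielding ((u, v), A[u]). */
    static List<Joined> joinOnU(List<Edge> edges, Map<Integer, Double> a) {
        List<Joined> out = new ArrayList<>();
        for (Edge e : edges) {
            Double au = a.get(e.u);
            if (au != null) {
                out.add(new Joined(e, au, Double.NaN));
            }
        }
        return out;
    }

    /** Second join, on the result of the first: edge.v == index of A, yielding ((u, v), A[u], A[v]). */
    static List<Joined> joinOnV(List<Joined> firstJoin, Map<Integer, Double> a) {
        List<Joined> out = new ArrayList<>();
        for (Joined j : firstJoin) {
            Double av = a.get(j.edge.v);
            if (av != null) {
                out.add(new Joined(j.edge, j.au, av));
            }
        }
        return out;
    }
}
```

After both steps every record carries (u,v), A[u] and A[v], which is what the per-edge logic of the algorithm needs as its input.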
Re: Flink strange stream join behavior
Hello,

For your first question:

> the number of tuples is the same in both cases

I guess you mean the total number of tuples here, right? So this means that you have fewer, but larger windows.

Suppose that you have W windows, each with S tuples. Then your total input has W * S tuples, and your total output has W * S * S tuples (assuming that all pairs of tuples from the two matching windows are matched in the join, since then the join of the matching windows squares the number of tuples in the window).

Now if you multiply the window size by r, but also divide the number of windows by r (so that the total number of input tuples stays the same), then you have (W/r) * (S*r) * (S*r) output tuples, which simplifies to W * S * S * r. So the total number of output tuples gets multiplied by r, and this is indeed close to the numbers that you reported: in your second case, you have about 40 times as much data in a window (I rounded your 37), and 40 times the output (r = 40).

Another way to imagine the situation is that when you chunk your data into larger windows, more tuples will "meet" in the joins, since only those tuples are matched in the join that are in the same window.

I hope this helps, and that I didn't misunderstand your situation.

Best,
Gábor

2016-10-15 23:28 GMT+02:00 Davood Rafiei:
> Hi,
>
> I am experiencing strange Flink stream windowed join behavior.
>
> I want to do a windowed (processing time) join between two partitioned
> streams. I read data from a socket.
> I have two cases: 1. data speed in the socket is relatively slow (say 1K per
> second); 2. data speed in the socket is high (say 37K).
> The number of tuples read from the socket is the same in both cases.
>
> Firstly, the size of the output (of the join operation) is much higher in
> case 2, although the number of tuples is the same in both cases. For example,
> in case 1 the overall output size is 500M and in case 2 it is 20G. I couldn't
> get the logic behind this.
>
> Secondly, in both cases, Flink ingests all data from the socket (more or less)
> as soon as it is available. So, it has high throughput. However, especially
> in case 2, I have to wait a long time after the data is ingested by the source
> operator. So, the data from the socket is acquired and the socket gets idle, and
> then I have to wait a long time to get the actual output at the sink. My question
> is: if this behavior is normal, and all the acquired data stays somewhere
> inside Flink, why is backpressure not applied to the source operators? I mean, if
> the system cannot process all inputs at high speed, then it should lower
> the reading speed from the socket.
>
> Thanks
> Davood
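The arithmetic in the reply above (W * S input tuples, W * S * S output tuples, and a factor of r when windows become r times larger) can be checked with a few lines of plain Java. The class `WindowJoinSize` and the concrete numbers below are illustrative assumptions, not measurements from the thread; like the reply, the sketch assumes that every pair of tuples within matching windows joins.

```java
class WindowJoinSize {
    /** Total number of input tuples: W windows with S tuples each. */
    static long inputTuples(long windows, long tuplesPerWindow) {
        return windows * tuplesPerWindow;
    }

    /** Total number of join results if every pair of tuples within a window matches: W * S * S. */
    static long outputTuples(long windows, long tuplesPerWindow) {
        return windows * tuplesPerWindow * tuplesPerWindow;
    }

    public static void main(String[] args) {
        long w = 400, s = 1_000, r = 40; // hypothetical values with r = 40 as in the reply
        System.out.println("input (small windows):  " + inputTuples(w, s));
        System.out.println("input (large windows):  " + inputTuples(w / r, s * r));
        System.out.println("output (small windows): " + outputTuples(w, s));
        System.out.println("output (large windows): " + outputTuples(w / r, s * r));
    }
}
```

Both configurations read the same number of input tuples, but the large-window one produces r times as many join results, which matches the roughly 40-fold growth from 500M to 20G reported in the question.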
Re: Nested iterations
I don't think that there are plans for enabling the nesting of the native iteration constructs, but we should wait for one of the committers to confirm this.

However, the matter of caching intermediate results has come up on numerous occasions before [1,2,3,4,5], and it would be useful in lots of other situations as well, so there is hope that it will be implemented some day, which would make the first workaround from my previous mail (a separate Flink job per step) more feasible.

Best,
Gábor

[1] https://issues.apache.org/jira/browse/FLINK-1730
[2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Iteration-Intermediate-Output-td11850.html
[3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Questions-re-ExecutionGraph-amp-ResultPartitions-for-interactive-use-a-la-Spark-td4154.html
[4] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-programm-with-for-loop-yields-wrong-results-when-run-in-parallel-td7783.html
[5] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Iterative-queries-on-Flink-td3786.html

2016-09-01 18:31 GMT+02:00 Supun Kamburugamuve <supu...@gmail.com>:
> Thanks Gabor. I was thinking about starting separate jobs.
>
> Are there any plans to support nested loops in the future?
>
> Thanks,
> Supun..
>
> On Thu, Sep 1, 2016 at 12:28 PM, Gábor Gévay <gga...@gmail.com> wrote:
>>
>> Hello Supun,
>>
>> Unfortunately, nesting of Flink's iteration constructs is not
>> supported at the moment.
>>
>> There are some workarounds though:
>>
>> 1. You can start a Flink job for each step of the iteration. Starting
>> a Flink job has some overhead, so this only works if there is a
>> sufficient amount of work in each iteration step. Moreover, this has
>> the disadvantage that the intermediate results always need to be
>> written out and then read back between steps, which might have a
>> considerable performance impact.
>>
>> 2. If you have just a small fixed number of steps, then you can have a
>> for loop that "unrolls" all the iteration steps, and creates one large
>> Flink job. The code will be somewhat similar to the first approach,
>> but you don't call execute between the steps, and you don't write
>> intermediate results to a sink, but just use the DataSet from the
>> previous step. The disadvantage of this is that you might end up with
>> a too large Flink job, which might also hurt performance.
>>
>> Best,
>> Gábor
>>
>> 2016-09-01 18:09 GMT+02:00 Supun Kamburugamuve <supu...@gmail.com>:
>> > Hi,
>> >
>> > Does Flink support nested iterations? We are trying to develop a complex
>> > machine learning algorithm which has 3 iterations nested.
>> >
>> > Best,
>> > Supun..
>
> --
> Supun Kamburugamuve
> Member, Apache Software Foundation; http://www.apache.org
Re: Nested iterations
Hello Supun,

Unfortunately, nesting of Flink's iteration constructs is not supported at the moment.

There are some workarounds though:

1. You can start a Flink job for each step of the iteration. Starting a Flink job has some overhead, so this only works if there is a sufficient amount of work in each iteration step. Moreover, this has the disadvantage that the intermediate results always need to be written out and then read back between steps, which might have a considerable performance impact.

2. If you have just a small fixed number of steps, then you can have a for loop that "unrolls" all the iteration steps, and creates one large Flink job. The code will be somewhat similar to the first approach, but you don't call execute between the steps, and you don't write intermediate results to a sink, but just use the DataSet from the previous step. The disadvantage of this is that you might end up with a too large Flink job, which might also hurt performance.

Best,
Gábor

2016-09-01 18:09 GMT+02:00 Supun Kamburugamuve:
> Hi,
>
> Does Flink support nested iterations? We are trying to develop a complex
> machine learning algorithm which has 3 iterations nested.
>
> Best,
> Supun..
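The "unrolling" workaround can be illustrated with a plain-Java analogy in which a lazily composed function plays the role of the job plan: the loop only extends the plan, and the work happens once at the end. This is not Flink code; the class `UnrolledIteration` and the method `unroll` are hypothetical, and in a real program the composed steps would be DataSet transformations and the final `apply` would correspond to a single execute call.

```java
import java.util.function.Function;

class UnrolledIteration {
    /** Chains the same step a fixed number of times into one plan without running anything. */
    static Function<Double, Double> unroll(Function<Double, Double> step, int steps) {
        Function<Double, Double> plan = Function.identity();
        for (int i = 0; i < steps; i++) {
            plan = plan.andThen(step); // extends the plan; the step itself is not executed here
        }
        return plan;
    }

    public static void main(String[] args) {
        // Three unrolled steps of "halve the value", evaluated in a single call at the end.
        Function<Double, Double> plan = unroll(x -> x / 2, 3);
        System.out.println(plan.apply(80.0));
    }
}
```

The plan grows linearly with the number of steps, which mirrors the caveat above that an unrolled job can become too large when there are many steps.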
Re: Performance issues with GroupBy?
Hello Robert,

> Is there something I could do to optimize the grouping?

You can try to make your `RichGroupReduceFunction` implement the `GroupCombineFunction` interface, so that Flink can do combining before the shuffle, which might significantly reduce the network load. (How much the combiner helps the performance can greatly depend on how large your groups are on average.)

Alternatively, if you can reformulate your algorithm to use a `reduce` instead of a `reduceGroup`, that might also improve the performance. Also, if you are using a `reduce`, then you can try calling `.setCombineHint(CombineHint.HASH)` after the reduce. (The combine hint is a relatively new feature, so you need the current master for this.)

Best,
Gábor

2016-07-25 14:06 GMT+02:00 Paschek, Robert:
> Hi Mailing List,
>
> I actually do some benchmarks with different algorithms. The system has 8
> nodes and a configured parallelism of 48 (the IBM-Power-1 cluster, if
> somebody from the TU Berlin reads this :-) ), and to "simulate" Hadoop
> MapReduce, the execution mode is set to "BATCH_FORCED".
>
> It is suspicious that three of the six algorithms had a big gap in runtime
> (5000ms vs. 2ms) for easy (low dim) tuples. Additionally, the algorithms
> in the "upper" group use a groupBy transformation and the algorithms in
> the "lower" group don't use groupBy.
>
> I attached the plot for better visualization.
>
> I also checked the logs, especially the time when the mappers finish and
> the reducers start _iterating_ - they hardened my speculation.
>
> So my question is whether it is "normal" that grouping is so cost-intensive
> that, in my case, the runtime increases by 4 times?
>
> I have data from the same experiments running on a 13-node cluster with 26
> cores with Apache Hadoop MapReduce, where the gap is still present, but
> smaller (50s vs 57s or 55s vs 65s).
>
> Is there something I could do to optimize the grouping? Some
> code snippets:
>
> The Job:
>
>     DataSet output = input
>         .mapPartition(new MR_GPMRS_Mapper()).withBroadcastSet(metaData, "MetaData").withParameters(parameters).name(MR_GPMRS_OPTIMIZED.class.getSimpleName()+"_MAPPER")
>         .groupBy(0)
>         .reduceGroup(new MR_GPMRS_Reducer()).returns(input.getType()).withBroadcastSet(metaData, "MetaData").withParameters(parameters).name(MR_GPMRS_OPTIMIZED.class.getSimpleName()+"_REDUCER");
>
> MR_GPMRS_Mapper():
>
>     public class MR_GPMRS_Mapper extends RichMapPartitionFunction , BitSet, BitSet>>>
>
> MR_GPMRS_Reducer():
>
>     public class MR_GPMRS_Reducer extends RichGroupReduceFunction , BitSet, BitSet>>, T>
>
> The Tuple2 has as payload on position f1 the Tuple3 and on position f0 the
> Integer key for grouping.
>
> Any suggestions (or comments that it is a "normal" behaviour) are welcome :-)
>
> Thank you in advance!
>
> Robert
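To show why combining before the shuffle can reduce network load, as suggested in the reply above, here is a plain-Java simulation of per-partition pre-aggregation followed by a final reduce. It is a sketch of the general combiner idea, not of Flink's implementation: the class `CombinerSketch`, the encoding of a record as a `long[]{key, value}` pair, and the use of a sum as the aggregate are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CombinerSketch {
    /** Pre-aggregates one partition locally: one partial sum per key instead of one record per input. */
    static Map<Long, Long> combine(List<long[]> partition) {
        Map<Long, Long> partial = new HashMap<>();
        for (long[] rec : partition) {
            partial.merge(rec[0], rec[1], Long::sum); // rec[0] is the key, rec[1] the value
        }
        return partial;
    }

    /** Final reduce after the (simulated) shuffle: merges the partial sums of all partitions. */
    static Map<Long, Long> reduce(List<Map<Long, Long>> partials) {
        Map<Long, Long> total = new HashMap<>();
        for (Map<Long, Long> p : partials) {
            p.forEach((k, v) -> total.merge(k, v, Long::sum));
        }
        return total;
    }
}
```

Each partition ships at most one record per distinct key, so the saving grows with the average group size; with many tiny groups there is little to combine, in line with the remark above that the benefit depends on how large the groups are.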
Re: Multi-field "sum" function just like "keyBy"
Ah, sorry, you are right. You could also call keyBy again before the second sum, but maybe someone else has a better idea.

Best,
Gábor

2016-06-07 16:18 GMT+02:00 Al-Isawi Rami <rami.al-is...@comptel.com>:
> Thanks Gábor, but the first sum call will return
>
> SingleOutputStreamOperator
>
> I could not do another sum call on that. Would you tell me how you managed to
> do
>
> stream.sum().sum()
>
> Regards,
> -Rami
>
> On 7 Jun 2016, at 16:13, Gábor Gévay <gga...@gmail.com> wrote:
>
> Hello,
>
> In the case of "sum", you can just specify them one after the other, like:
>
> stream.sum(1).sum(2)
>
> This works, because summing the two fields is independent. However,
> in the case of "keyBy", the information is needed from both fields at
> the same time to produce the key.
>
> Best,
> Gábor
>
> 2016-06-07 14:41 GMT+02:00 Al-Isawi Rami <rami.al-is...@comptel.com>:
>
> Hi,
>
> Is there any reason why "keyBy" accepts multi-field, while for example "sum"
> does not?
>
> -Rami
> Disclaimer: This message and any attachments thereto are intended solely for
> the addressed recipient(s) and may contain confidential information. If you
> are not the intended recipient, please notify the sender by reply e-mail and
> delete the e-mail (including any attachments thereto) without producing,
> distributing or retaining any copies thereof. Any review, dissemination or
> other use of, or taking of any action in reliance upon, this information by
> persons or entities other than the intended recipient(s) is prohibited.
> Thank you.
Re: Multi-field "sum" function just like "keyBy"
Hello,

In the case of "sum", you can just specify them one after the other, like:

stream.sum(1).sum(2)

This works, because summing the two fields is independent. However, in the case of "keyBy", the information is needed from both fields at the same time to produce the key.

Best,
Gábor

2016-06-07 14:41 GMT+02:00 Al-Isawi Rami:
> Hi,
>
> Is there any reason why "keyBy" accepts multi-field, while for example "sum"
> does not?
>
> -Rami
Re: time spent for iteration
Yes, I also think that it would be a nice feature. It would make the advantage of delta iterations (that later iterations take less time) more visible to the users.

Best,
Gábor

2016-03-09 15:25 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:
> I think it would be useful to allow for easier retrieval of this
> information.
> Wouldn't it make sense to expose this to the web UI for example?
> We actually had a discussion about this some time ago [1].
>
> -Vasia.
>
> [1]: https://issues.apache.org/jira/browse/FLINK-1759
>
> On 9 March 2016 at 14:37, Gábor Gévay <gga...@gmail.com> wrote:
>>
>> Hello,
>>
>> If you increase the log level, you can see each step of the iteration
>> separately in the log, with timestamps.
>>
>> Best,
>> Gábor
>>
>> 2016-03-09 14:04 GMT+01:00 Riccardo Diomedi
>> <riccardo.diomed...@gmail.com>:
>> > Is it possible to add timer for the time spent for iteration when
>> > iterate operator or the delta iterate operator is performed?
>> >
>> > thanks
>> >
>> > Riccardo
Re: time spent for iteration
Hello,

If you increase the log level, you can see each step of the iteration separately in the log, with timestamps.

Best,
Gábor

2016-03-09 14:04 GMT+01:00 Riccardo Diomedi:
> Is it possible to add timer for the time spent for iteration when iterate
> operator or the delta iterate operator is performed?
>
> thanks
>
> Riccardo
Re: Best way to process data in many files? (FLINK-BATCH)
Hello,

> // For each "filename" in list do...
> DataSet featureList = fileList
>     .flatMap(new ReadDataSetFromFile()) // flatMap because there
>                                         // might be multiple DataSets in a file

What happens if you just insert .rebalance() before the flatMap?

> This kind of DataSource will only be executed
> with a degree of parallelism of 1. The source will send its collection
> elements in a round robin fashion to the downstream operators which are
> executed with a higher parallelism. So when Flink schedules the downstream
> operators, it will try to place them close to their inputs. Since all flat
> map operators have the single data source task as an input, they will be
> deployed on the same machine if possible.

Sorry, I'm a little confused here. Do you mean that the flatMap will have a high parallelism, but all instances on a single machine? Because I tried to reproduce the situation where I have a non-parallel data source and then a flatMap, and the plan shows that the flatMap actually has parallelism 1, which would be an alternative explanation for the original problem that it gets executed on a single machine. Then, if I insert .rebalance() after the source, a "Partition" operation appears between the source and the flatMap, and the flatMap has a high parallelism. I think this should also solve the problem, without having to write a parallel data source.

Best,
Gábor
Re: How to ensure exactly-once semantics in output to Kafka?
Hello,

> I think that there is actually a fundamental latency issue with
> "exactly once sinks", no matter how you implement them in any systems:
> You can only commit once you are sure that everything went well,
> to a specific point where you are sure no replay will ever be needed.

What if the persistent buffer in the sink were used to determine which data elements should be emitted in case of a replay? I mean, the sink pushes everything as soon as it arrives, and also writes everything to the persistent buffer, and then in case of a replay it looks into the buffer before pushing every element, and only does the push if the buffer says that the element was not pushed before.

Best,
Gábor

2016-02-05 11:57 GMT+01:00 Stephan Ewen:
> Hi Niels!
>
> In general, exactly once output requires transactional cooperation from the
> target system. Kafka has that on the roadmap, we should be able to integrate
> that once it is out.
> That means output is "committed" upon completed checkpoints, which
> guarantees nothing is written multiple times.
>
> Chesnay is working on an interesting prototype as a generic solution (also
> for Kafka, while they don't have that feature):
> It buffers the data in the sink persistently (using the fault tolerance
> state backends) and pushes the results out on notification of a completed
> checkpoint.
> That gives you exactly once semantics, but involves an extra materialization
> of the data.
>
> I think that there is actually a fundamental latency issue with "exactly
> once sinks", no matter how you implement them in any systems:
> You can only commit once you are sure that everything went well, to a
> specific point where you are sure no replay will ever be needed.
>
> So the latency in Flink for an exactly-once output would be at least the
> checkpoint interval.
>
> I'm eager to hear your thoughts on this.
>
> Greetings,
> Stephan
>
> On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes wrote:
>>
>> Hi,
>>
>> It is my understanding that the exactly-once semantics regarding the input
>> from Kafka is based on the checkpointing in the source component retaining
>> the offset where it was at the checkpoint moment.
>>
>> My question is how does that work for a sink? How can I make sure that (in
>> light of failures) each message that is read from Kafka (my input) is
>> written to Kafka (my output) exactly once?
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
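The proposal in the reply above (push immediately, remember what was pushed, and skip already-pushed elements on replay) can be sketched in plain Java as follows. This is a toy model under strong assumptions, labeled as such: the class `ReplayAwareSink` is hypothetical, an in-memory set stands in for the persistent buffer, a list stands in for the target system, every element is assumed to carry a stable id that is identical on replay, and recording the id and pushing the element are treated as one atomic step.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ReplayAwareSink {
    // Stands in for the persistent buffer; a real one would have to survive failures.
    private final Set<Long> pushedIds = new HashSet<>();
    // Stands in for the external target system.
    private final List<String> external = new ArrayList<>();

    /**
     * Pushes the element unless the buffer says it was pushed before.
     * Returns true if the element was written, false if it was skipped as a replayed duplicate.
     */
    boolean invoke(long id, String payload) {
        if (!pushedIds.add(id)) {
            return false; // already pushed before the failure; skip on replay
        }
        external.add(payload);
        return true;
    }

    /** Everything that reached the target system so far. */
    List<String> written() {
        return external;
    }
}
```

The stable-id assumption is the delicate part: if a replay can produce different elements or a different order, the lookup in the buffer no longer identifies duplicates reliably.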
Re: How to ensure exactly-once semantics in output to Kafka?
The way I imagine this is that the sink would have its "own checkpoints" separately from the rest of the system, with a much smaller interval, and would write to Kafka (with "transactional cooperation", as Stephan mentioned) while making these checkpoints. And then when a replay happens from a global system checkpoint, it can look at its own checkpoints to decide for each tuple whether to send it or not.

@Stephan:

> That assumes deterministic streams and to some extent deterministic tuple
> order.
> That may be given sometimes, but it is a very strong assumption in many cases.

Ah yes, you are right. But doing everything based on event time points in this direction of deterministic streams, right?
Re: Left join with unbalanced dataset
Hello Arnaud,

> Flink does not start the reduce operation until all lines have
> been created (memory bottleneck is during the collection
> of all lines); but theoretically it is possible.

The problem that `S.groupBy(...).reduce(...)` needs to fully materialize S comes from the fact that the implementation of reduce is currently sort-based. But this PR will partially solve this:
https://github.com/apache/flink/pull/1517
It implements a hash-based combiner, which will not materialize the input, but instead needs memory proportional only to the number of different keys occurring. You might want to try rebasing to this PR, to see whether it improves your situation. (I also plan to extend this implementation to the actual reduce after the combine, but I'm not sure when I will get around to that.)

Best,
Gábor

2016-02-02 16:56 GMT+01:00 LINZ, Arnaud:
> Thanks,
>
> Giving the batch an outrageous amount of memory with a 0.5 heap ratio leads
> to the success of the batch.
>
> I've figured out which dataset is consuming the most memory: I have a big
> join that multiplies the size of the input set before a group reduce.
> I am willing to optimize my code by reducing the join output size at the
> join.
>
> The outline of the processing is:
> DataSet A = (K1, K2, V1) where (K1,K2) is the key. A is huge.
> DataSet B = (K1, V2) where there are multiple values V2 for the same K1 (say
> 5)
>
> I do something like: A.join(B).on(K1).groupBy(K1,K2).reduce()
> As B contains 5 lines for one key of A, A.join(B) is 5 times the size of A.
>
> Flink does not start the reduce operation until all lines have been created
> (memory bottleneck is during the collection of all lines); but theoretically
> it is possible.
> I see no "join group" operator that could do something like
> "A.groupBy(K1,K2).join(B).on(K1).reduce()"
>
> Is there a way to do this?
>
> The other way I see is to load B in memory on all nodes and use a hash map
> upon reduction to get all A.join(B) lines. B is not that small, but I think
> it will still save RAM.
>
> Best regards,
> Arnaud
>
> -----Original Message-----
> From: Ufuk Celebi [mailto:u...@apache.org]
> Sent: Tuesday, 2 February 2016 15:27
> To: user@flink.apache.org
> Subject: Re: Left join with unbalanced dataset
>
>> On 02 Feb 2016, at 15:15, LINZ, Arnaud wrote:
>>
>> Hi,
>>
>> Running again with more RAM made the processing go further, but Yarn still
>> killed one container for memory consumption. I will experiment with various
>> memory parameters.
>
> OK, the killing of the container probably triggered the
> RemoteTransportException.
>
> Can you tell me how many containers you are using, how much physical memory
> the machines have and how much the containers get?
>
> You can monitor memory usage by setting
>
> taskmanager.debug.memory.startLogThread: true
>
> in the config. This will periodically log the memory consumption to the task
> manager logs. Can you try this and check the logs for the memory consumption?
>
> You can also have a look at it in the web frontend under the Task Manager tab.
>
> - Ufuk
>
> The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is prohibited.
> If you are not the intended recipient of this message, then please delete it
> and notify the sender.
Re: Local collection data sink for the streaming API
Try the getJavaStream method of the Scala DataStream.

Best,
Gábor

2016-01-05 19:14 GMT+01:00 Filipe Correia <filipe.corr...@nmusic.pt>:
> Hi Gábor, Thanks!
>
> I'm using Scala though. DataStreamUtils.collect() depends on
> org.apache.flink.streaming.api.datastream.DataStream, rather than
> org.apache.flink.streaming.api.scala.DataStream. Any suggestion on how
> to handle this, other than creating my own Scala implementation of
> DataStreamUtils.collect()?
>
> Thanks,
>
> Filipe
>
> On Tue, Jan 5, 2016 at 3:33 PM, Gábor Gévay <gga...@gmail.com> wrote:
>> Hi Filipe,
>>
>> You can take a look at `DataStreamUtils.collect` in
>> flink-contrib/flink-streaming-contrib.
>>
>> Best,
>> Gábor
>>
>> 2016-01-05 16:14 GMT+01:00 Filipe Correia <filipe.corr...@nmusic.pt>:
>>> Hi,
>>>
>>> Collecting results locally (e.g., for unit testing) is possible in the
>>> DataSet API by using "LocalCollectionOutputFormat", as described in
>>> the programming guide:
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#collection-data-sources-and-sinks
>>>
>>> Can something similar be done for the DataStream API?
>>>
>>> Thanks,
>>>
>>> Filipe
Re: Reading multiple datasets with one read operation
Hello! > I have thought about a workaround where the InputFormat would return > Tuple2s and the first field is the name of the dataset to which a record > belongs. This would however require me to filter the read data once for > each dataset or to do a groupReduce which is some overhead I'm > looking to prevent. I think that those two filters might not have that much overhead, because of several optimizations Flink does under the hood: - The dataset of Tuple2s won't be materialized, but instead will be streamed directly to the two filter operators. - The input format and the two filters will probably end up on the same machine, because of chaining, so there won't be serialization/deserialization between them. Best, Gabor 2015-10-22 11:38 GMT+02:00 Pieter Hameete: > Good morning! > > I have the following use case: > > My program reads nested data (in this specific case XML) based on > projections (path expressions) of this data. Often multiple paths are > projected onto the same input. I would like each path to result in its own > dataset. > > Is it possible to generate more than 1 dataset using a readFile operation to > prevent reading the input twice? > > I have thought about a workaround where the InputFormat would return Tuple2s > and the first field is the name of the dataset to which a record belongs. > This would however require me to filter the read data once for each dataset > or to do a groupReduce which is some overhead I'm looking to prevent. > > Is there a better (less overhead) workaround for doing this? Or is there > some mechanism in Flink that would allow me to do this? > > Cheers! > > - Pieter
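To make the tag-and-filter workaround from this thread concrete, here is a minimal sketch in plain Java. It deliberately does not use the Flink API: a java.util.List of (datasetName, payload) pairs stands in for the DataSet of Tuple2s that the InputFormat would return, and the names TaggedSplit, tag and select are hypothetical and exist only for this illustration.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class TaggedSplit {
    // Hypothetical helper: builds one (datasetName, payload) pair,
    // standing in for a Tuple2 produced by the InputFormat.
    static Map.Entry<String, String> tag(String dataset, String payload) {
        return new SimpleImmutableEntry<>(dataset, payload);
    }

    // Keeps only the payloads of the records tagged with the given dataset name.
    // This plays the role of one filter operator per logical dataset.
    static List<String> select(List<Map.Entry<String, String>> tagged, String dataset) {
        return tagged.stream()
                .filter(e -> e.getKey().equals(dataset))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The input is "read" once; every record carries its dataset name.
        List<Map.Entry<String, String>> read = Arrays.asList(
                tag("titles", "Flink"), tag("authors", "Pieter"), tag("titles", "Gelly"));

        System.out.println(select(read, "titles"));  // [Flink, Gelly]
        System.out.println(select(read, "authors")); // [Pieter]
    }
}
```

In a Flink job the two select calls would correspond to two filter operators applied to the same tagged dataset, which is the setup the reply above argues is cheap.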
Re: For each element in a dataset, do something with another dataset
Hello, Alternatively, if dataset B fits in memory, but dataset A doesn't, then you can do it by broadcasting B to a RichMapPartitionFunction on A: In the open method of mapPartition, you sort B. Then, for each element of A, you do a binary search in B, and look at the index found by the binary search, which will be the count that you are looking for. Best, Gabor 2015-09-30 11:20 GMT+02:00 Fabian Hueske: > The idea is to partition both datasets by range. > Assume your dataset A is [1,2,3,4,5,6] you create two partitions: p1: > [1,2,3] and p2: [4,5,6]. > Each partition is given to a different instance of a MapPartition operator > (this is a bit tricky, because you cannot use broadcastSet. You could load > the corresponding partition in the open() function from HDFS for > example). > > DataSet B is partitioned in the same way, i.e., all elements <= 3 go to > partition 1, everything > 3 goes to p2. You can partition a dataset by range > using the partitionCustom() function. The partitioned dataset is given to > the mapPartition operator that loaded a partition of dataset A in each task > instance. > You do the counting just like before (sorting the partition of dataset A, > binary search, long[]), but add an additional count for the complete partition > (basically count all elements that arrive in the task instance). > > If you have a dataset B with 1,2,2,3,3,4,5,5,5,5,6,7 the counts for p1 would > be [1:0, 2:1, 3:3, all:5] and p2: [4:0, 5:1, 6:5, all:7]. > Now you need to compute the final count by adding the "all" counts of the > lower partitions to the counts of the "higher" partitions, i.e., add all:5 > of p1 to all counts for p2. > > This approach requires knowing the value range and distribution of the > values which makes it a bit difficult. I guess you'll get the best > performance, if you partition in a way, that you have about equally sized > partitions of dataset B with the constraint that the corresponding > partitions of A fit into memory. 
> > As I said, it's a bit cumbersome. I hope you could follow my explanation. > Please ask if something is not clear ;-) > > 2015-09-30 10:46 GMT+02:00 Pieter Hameete : >> >> Hi Fabian, >> >> thanks for your tips! >> >> Do you have some pointers for getting started with the 'tricky range >> partitioning'? I am quite keen to get this working with large datasets ;-) >> >> Cheers, >> >> Pieter >> >> 2015-09-30 10:24 GMT+02:00 Fabian Hueske : >>> >>> Hi Pieter, >>> >>> cross is indeed too expensive for this task. >>> >>> If dataset A fits into memory, you can do the following: Use a >>> RichMapPartitionFunction to process dataset B and add dataset A as a >>> broadcastSet. In the open method of mapPartition, you can load the >>> broadcasted set and sort it by a.propertyX and initialize a long[] for the >>> counts. For each element of dataset B, you do a binary search on the sorted >>> dataset A and increase all counts up to the position in the sorted list. >>> After all elements of dataset B have been processed, return the counts from >>> the long[]. >>> >>> If dataset A doesn't fit into memory, things become more cumbersome and >>> we need to play some tricks with range partitioning... >>> >>> Let me know, if you have questions, >>> Fabian >>> >>> 2015-09-29 16:59 GMT+02:00 Pieter Hameete : Good day everyone, I am looking for a good way to do the following: I have dataset A and dataset B, and for each element in dataset A I would like to filter dataset B and obtain the size of the result. To say it short: for each element a in A -> B.filter( _ < a.propertyx).count Currently I am doing a cross of dataset A and B, making tuples so I can then filter all the tuples where field2 < field1.propertya and then group by field1.id and get the sizes of the groups. However this is not working out in practice. When the datasets get larger, some Tasks hang on the CHAIN Cross -> Filter probably because there is insufficient memory for the cross to be completed? 
Does anyone have a suggestion on how I could make this work, especially with datasets that are larger than memory available to a separate Task? Thank you in advance for your time :-) Kind regards, Pieter Hameete >>> >>> >> >
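The counting step suggested in this thread (sort B once, then binary-search it for every element of A) can be sketched in plain Java as follows. This is only an illustration under assumptions, not Flink code: the arrays stand in for the broadcast set and the partition contents, the values are plain longs rather than a.propertyx, and the names CountSmaller, lowerBound and countSmaller are made up for the example. A hand-written lower-bound search is used because java.util.Arrays.binarySearch does not guarantee which index it returns when B contains duplicates. The main method reuses the example numbers from the thread, including the range-partitioned variant where the "all" count of p1 is added to the counts of p2.

```java
import java.util.Arrays;

class CountSmaller {
    // Returns the index of the first element in 'sorted' that is >= key,
    // which equals the number of elements strictly smaller than key.
    static int lowerBound(long[] sorted, long key) {
        int lo = 0;
        int hi = sorted.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (sorted[mid] < key) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    // For each element of a, counts the elements of b that are strictly smaller.
    static long[] countSmaller(long[] a, long[] b) {
        long[] sortedB = b.clone();
        Arrays.sort(sortedB); // in a rich function this would be done once, in open()
        long[] counts = new long[a.length];
        for (int i = 0; i < a.length; i++) {
            counts[i] = lowerBound(sortedB, a[i]);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] a = {1, 2, 3, 4, 5, 6};
        long[] b = {1, 2, 2, 3, 3, 4, 5, 5, 5, 5, 6, 7};
        System.out.println(Arrays.toString(countSmaller(a, b))); // [0, 1, 3, 5, 6, 10]

        // Range-partitioned variant: count within each partition,
        // then add the total size of all lower partitions of B.
        long[] b1 = {1, 2, 2, 3, 3};
        long[] p1 = countSmaller(new long[]{1, 2, 3}, b1);
        long[] p2 = countSmaller(new long[]{4, 5, 6}, new long[]{4, 5, 5, 5, 5, 6, 7});
        for (int i = 0; i < p2.length; i++) {
            p2[i] += b1.length; // the "all" count of p1
        }
        System.out.println(Arrays.toString(p1)); // [0, 1, 3]
        System.out.println(Arrays.toString(p2)); // [5, 6, 10]
    }
}
```

The partitioned result matches the single-pass result, which is the property the "add the all counts of the lower partitions" step relies on.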
Re: Gelly vertex ID type requirements?
> Hi, I opened a JIRA (FLINK-2442) and submitted a PR (#963) for the "Wrong field type" problem. Thanks for the fix! > Is the other problem addressed in FLINK-2437? Unfortunately no. FLINK-2437 is a minor thing compared to the other problem. I opened a Jira for it (FLINK-2447). Best, Gabor 2015-07-31 0:29 GMT+02:00 Fabian Hueske <fhue...@gmail.com>: Hi, I opened a JIRA (FLINK-2442) and submitted a PR (#963) for the "Wrong field type" problem. Is the other problem addressed in FLINK-2437? Cheers, Fabian 2015-07-30 16:29 GMT+02:00 Gábor Gévay <gga...@gmail.com>: Thanks for the response. As a temporary workaround, I tried to change these problematic lines: } else { Preconditions.checkArgument(fieldType instanceof AtomicType, "Wrong field type: " + fieldType.toString()); keyFields.add(new FlatFieldDescriptor(keyId, fieldType)); } into this: } else if (fieldType instanceof AtomicType) { keyFields.add(new FlatFieldDescriptor(keyId, fieldType)); } else { Preconditions.checkArgument(fieldType instanceof PojoTypeInfo, "Wrong field type: " + fieldType.toString()); ((PojoTypeInfo)fieldType).getFlatFields("*", 0, keyFields); } But then I ran into another problem: The TypeExtractor creates the TupleTypeInfoBase for the Edge type of my graph with the following types: 0 = {PojoTypeInfo@1067} PojoType<malom.GameState, fields = [board: Long, sid: PojoType<malom.SectorId, fields = [b: Byte, bf: Byte, w: Byte, wf: Byte]>]> 1 = {GenericTypeInfo@1068} GenericType<malom.GameState> 2 = {ValueTypeInfo@1069} ValueType<NullValue> The problem here is that the first two types should clearly be the same, as the Edge type looks like this: public class Edge<K, V> extends Tuple3<K, K, V> I did a bit of debugging on this, and the source of the problem seems to be the mechanism in TypeExtractor that would detect recursive types (see the alreadySeen field in TypeExtractor), as it mistakes the second appearance of malom.GameState for a recursive field. 
Specifically the following happens: createTypeInfoWithTypeHierarchy starts to process the Edge type, and in line 433 it calls itself for the first field, which proceeds into the privateGetForClass case which correctly detects that it is a POJO, and correctly returns a PojoTypeInfo; but in the meantime in line 1190, privateGetForClass adds malom.GameState to alreadySeen. Then the outer createTypeInfoWithTypeHierarchy approaches the second field, goes into privateGetForClass, which mistakenly returns a GenericTypeInfo, as it thinks in line 1186, that a recursive type is being processed. Should I open a Jira for this? A possible solution would be to change the alreadySeen field into a parameter of all the various type extraction methods, and make it contain only those types that are ancestors in the nesting hierarchy. Best regards, Gabor 2015-07-30 14:32 GMT+02:00 Fabian Hueske <fhue...@gmail.com>: Thanks for reporting this issue. The "Wrong field type" error looks like a bug to me. This happens, because PojoType is neither a TupleType nor an AtomicType. To me it looks like the TupleTypeInfoBase condition should be generalized to CompositeType. I will look into this. Cheers, Fabian 2015-07-30 14:18 GMT+02:00 Gábor Gévay <gga...@gmail.com>: Hello, I am having some trouble building a graph where the vertex ID type is a POJO. Specifically, it would be a class that has two fields: a long field, and a field which is of another class that has four byte fields. (Both classes implement the Comparable interface, as the Gelly guide specifies.) 
If the outer class has a default constructor, then it is recognized as a POJO, but I get the following exception: Exception in thread "main" java.lang.IllegalArgumentException: Wrong field type: PojoType<malom.GameState, fields = [board: Long, sid: PojoType<malom.SectorId, fields = [b: Byte, bf: Byte, w: Byte, wf: Byte]>]> at com.google.common.base.Preconditions.checkArgument(Preconditions.java:122) at org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:241) at org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:203) at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets.where(CoGroupOperator.java:458) at org.apache.flink.graph.Graph.inDegrees(Graph.java:701) at org.apache.flink.graph.spargel.VertexCentricIteration.createResultVerticesWithDegrees(VertexCentricIteration.java:610) at org.apache.flink.graph.spargel.VertexCentricIteration.createResult(VertexCentricIteration.java:180) at org.apache.flink.api.java.DataSet.runOperation(DataSet.java:1044) at org.apache.flink.graph.Graph.runVertexCentricIteration(Graph.java:1312) at malom.Retrograde.run(Retrograde.java:64) at malom.Solver.main(Solver.java:32) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method
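The recursion-check problem discussed in this thread, and the proposed fix of tracking only ancestors in the nesting hierarchy, can be illustrated with a toy model. The sketch below is not Flink's TypeExtractor: the classes Type and RecursionCheckSketch, the method names, and the "Composite"/"Generic" labels are all invented for the illustration. It only shows why a seen-set that is shared across sibling fields misclassifies the second occurrence of a type, while a set that is unwound on the way back up does not.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class RecursionCheckSketch {
    // Toy type descriptor: a name plus the types of its fields.
    static final class Type {
        final String name;
        final List<Type> fields = new ArrayList<>();

        Type(String name) {
            this.name = name;
        }

        Type with(Type... fs) {
            for (Type f : fs) {
                fields.add(f);
            }
            return this;
        }
    }

    // Variant 1: one shared set that only grows. A type that appears in two
    // sibling fields is reported as "Generic" the second time it is visited.
    static String describeShared(Type t, Set<String> alreadySeen) {
        if (!alreadySeen.add(t.name)) {
            return "Generic<" + t.name + ">";
        }
        StringBuilder sb = new StringBuilder("Composite<" + t.name);
        for (Type f : t.fields) {
            sb.append(' ').append(describeShared(f, alreadySeen));
        }
        return sb.append('>').toString();
    }

    // Variant 2: the set holds only the ancestors of the current type.
    // The name is removed again when the traversal leaves the type.
    static String describeAncestors(Type t, Set<String> ancestors) {
        if (ancestors.contains(t.name)) {
            return "Generic<" + t.name + ">"; // genuinely recursive
        }
        ancestors.add(t.name);
        StringBuilder sb = new StringBuilder("Composite<" + t.name);
        for (Type f : t.fields) {
            sb.append(' ').append(describeAncestors(f, ancestors));
        }
        ancestors.remove(t.name);
        return sb.append('>').toString();
    }

    public static void main(String[] args) {
        Type gameState = new Type("GameState");
        Type edge = new Type("Edge").with(gameState, gameState, new Type("NullValue"));

        // Second GameState is misclassified:
        System.out.println(describeShared(edge, new HashSet<>()));
        // Both GameState fields are treated alike:
        System.out.println(describeAncestors(edge, new HashSet<>()));
    }
}
```

A genuinely recursive type, such as a node whose field is of its own type, is still detected by the second variant, because the type is then its own ancestor.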
Gelly vertex ID type requirements?
Hello, I am having some trouble building a graph where the vertex ID type is a POJO. Specifically, it would be a class that has two fields: a long field, and a field which is of another class that has four byte fields. (Both classes implement the Comparable interface, as the Gelly guide specifies.) If the outer class has a default constructor, then it is recognized as a POJO, but I get the following exception: Exception in thread "main" java.lang.IllegalArgumentException: Wrong field type: PojoType<malom.GameState, fields = [board: Long, sid: PojoType<malom.SectorId, fields = [b: Byte, bf: Byte, w: Byte, wf: Byte]>]> at com.google.common.base.Preconditions.checkArgument(Preconditions.java:122) at org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:241) at org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:203) at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets.where(CoGroupOperator.java:458) at org.apache.flink.graph.Graph.inDegrees(Graph.java:701) at org.apache.flink.graph.spargel.VertexCentricIteration.createResultVerticesWithDegrees(VertexCentricIteration.java:610) at org.apache.flink.graph.spargel.VertexCentricIteration.createResult(VertexCentricIteration.java:180) at org.apache.flink.api.java.DataSet.runOperation(DataSet.java:1044) at org.apache.flink.graph.Graph.runVertexCentricIteration(Graph.java:1312) at malom.Retrograde.run(Retrograde.java:64) at malom.Solver.main(Solver.java:32) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Note that originally the exception just said "Wrong field type", from which I had no idea what type it is referring to, so I modified line 241 of Keys.java to include the 
type in the msg like this: Preconditions.checkArgument(fieldType instanceof AtomicType, "Wrong field type: " + fieldType.toString()); On the other hand, if I comment out the default constructor of the outer class, then it is not a POJO, but only a GenericTypeInfo is created from it, which implements AtomicType, so the previous exception does not appear. Does this mean that the Vertex IDs cannot be POJOs? I am not sure if this is the intended behaviour. After all, if we have a POJO, that should always be better than if we have a generic type, right? (I am just guessing here, but maybe CompositeType could also be regarded as an AtomicType: the only method declared by the AtomicType interface is createComparator, which is also defined in CompositeType (of which PojoTypeInfo is a subclass), but with different parameters, but maybe CompositeType could implement AtomicType by delegating the createComparator call with all of the fields specified?) I encountered another problem, which may or may not be related to the above: without the default constructor (the GenericTypeInfo case), the VertexCentricIteration eats my graph, that is, the result graph has zero vertices. I traced this problem to the first join in VertexCentricIteration.createResultVerticesWithDegrees, where the degrees DataSet is created: both inputs of the join (outDegrees and inDegrees) contain the correct data, but the result (degrees) is empty. Interestingly, this problem disappears if I add JoinHint.REPARTITION_SORT_MERGE. Best regards, Gabor
Re: Gelly vertex ID type requirements?
Thanks for the response. As a temporary workaround, I tried to change these problematic lines: } else { Preconditions.checkArgument(fieldType instanceof AtomicType, "Wrong field type: " + fieldType.toString()); keyFields.add(new FlatFieldDescriptor(keyId, fieldType)); } into this: } else if (fieldType instanceof AtomicType) { keyFields.add(new FlatFieldDescriptor(keyId, fieldType)); } else { Preconditions.checkArgument(fieldType instanceof PojoTypeInfo, "Wrong field type: " + fieldType.toString()); ((PojoTypeInfo)fieldType).getFlatFields("*", 0, keyFields); } But then I ran into another problem: The TypeExtractor creates the TupleTypeInfoBase for the Edge type of my graph with the following types: 0 = {PojoTypeInfo@1067} PojoType<malom.GameState, fields = [board: Long, sid: PojoType<malom.SectorId, fields = [b: Byte, bf: Byte, w: Byte, wf: Byte]>]> 1 = {GenericTypeInfo@1068} GenericType<malom.GameState> 2 = {ValueTypeInfo@1069} ValueType<NullValue> The problem here is that the first two types should clearly be the same, as the Edge type looks like this: public class Edge<K, V> extends Tuple3<K, K, V> I did a bit of debugging on this, and the source of the problem seems to be the mechanism in TypeExtractor that would detect recursive types (see the alreadySeen field in TypeExtractor), as it mistakes the second appearance of malom.GameState for a recursive field. Specifically the following happens: createTypeInfoWithTypeHierarchy starts to process the Edge type, and in line 433 it calls itself for the first field, which proceeds into the privateGetForClass case which correctly detects that it is a POJO, and correctly returns a PojoTypeInfo; but in the meantime in line 1190, privateGetForClass adds malom.GameState to alreadySeen. Then the outer createTypeInfoWithTypeHierarchy approaches the second field, goes into privateGetForClass, which mistakenly returns a GenericTypeInfo, as it thinks in line 1186, that a recursive type is being processed. Should I open a Jira for this? 
A possible solution would be to change the alreadySeen field into a parameter of all the various type extraction methods, and make it contain only those types that are ancestors in the nesting hierarchy. Best regards, Gabor 2015-07-30 14:32 GMT+02:00 Fabian Hueske <fhue...@gmail.com>: Thanks for reporting this issue. The "Wrong field type" error looks like a bug to me. This happens, because PojoType is neither a TupleType nor an AtomicType. To me it looks like the TupleTypeInfoBase condition should be generalized to CompositeType. I will look into this. Cheers, Fabian 2015-07-30 14:18 GMT+02:00 Gábor Gévay <gga...@gmail.com>: Hello, I am having some trouble building a graph where the vertex ID type is a POJO. Specifically, it would be a class that has two fields: a long field, and a field which is of another class that has four byte fields. (Both classes implement the Comparable interface, as the Gelly guide specifies.) If the outer class has a default constructor, then it is recognized as a POJO, but I get the following exception: Exception in thread "main" java.lang.IllegalArgumentException: Wrong field type: PojoType<malom.GameState, fields = [board: Long, sid: PojoType<malom.SectorId, fields = [b: Byte, bf: Byte, w: Byte, wf: Byte]>]> at com.google.common.base.Preconditions.checkArgument(Preconditions.java:122) at org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:241) at org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:203) at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets.where(CoGroupOperator.java:458) at org.apache.flink.graph.Graph.inDegrees(Graph.java:701) at org.apache.flink.graph.spargel.VertexCentricIteration.createResultVerticesWithDegrees(VertexCentricIteration.java:610) at org.apache.flink.graph.spargel.VertexCentricIteration.createResult(VertexCentricIteration.java:180) at org.apache.flink.api.java.DataSet.runOperation(DataSet.java:1044) at 
org.apache.flink.graph.Graph.runVertexCentricIteration(Graph.java:1312) at malom.Retrograde.run(Retrograde.java:64) at malom.Solver.main(Solver.java:32) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Note that originally the exception just said "Wrong field type", from which I had no idea what type it is referring to, so I modified line 241 of Keys.java to include the type in the msg like this: Preconditions.checkArgument(fieldType instanceof AtomicType, "Wrong field type: " + fieldType.toString()); On the other hand, if I comment out the default constructor of the outer class