Alright, so if both inputs of the CoGroup are read from the file system, there should be a way to do the co-group on co-located data without repartitioning. In fact, I have some code lying around to do co-located joins from the local FS [1]. I haven't tested it thoroughly, and it also relies on a number of assumptions. If the data is also sorted, you can even skip sorting it again if you inject a few lines into the optimizer (see the change for FLINK-1444) and ensure that each source reads exactly one (!) input split.
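The co-located co-group idea can be illustrated outside of Flink. Below is a minimal, Flink-independent Java sketch. It assumes both inputs were hash-partitioned with the same partitioning function and the same number of partitions. `LocalCoGroupSketch`, `partitionOf`, `coGroupPartition` and `coGroupCoPartitioned` are hypothetical names made up for illustration. They are not Flink API and not the flink-localjoin-utils code. The point is that partition i of one input is only ever combined with partition i of the other, so no shuffle is needed. The sketch says nothing about where partitions physically live; in Flink that is up to the scheduler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Flink code: co-group two co-partitioned inputs
// partition by partition, without any repartitioning step.
class LocalCoGroupSketch {

    // The SAME function must partition both inputs (assumption: floorMod of
    // String.hashCode; Flink uses its own hash function internally).
    static int partitionOf(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hash-partitions (key, value) records into numPartitions buckets.
    static List<List<Map.Entry<String, Integer>>> partition(
            List<Map.Entry<String, Integer>> records, int numPartitions) {
        List<List<Map.Entry<String, Integer>>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> r : records) {
            parts.get(partitionOf(r.getKey(), numPartitions)).add(r);
        }
        return parts;
    }

    // Hypothetical co-group UDF on ONE pair of partitions:
    // new state value = old state value + sum of all updates for that key.
    static Map<String, Integer> coGroupPartition(
            List<Map.Entry<String, Integer>> state,
            List<Map.Entry<String, Integer>> updates) {
        Map<String, Integer> out = new TreeMap<>();
        for (Map.Entry<String, Integer> r : state) {
            out.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        for (Map.Entry<String, Integer> r : updates) {
            out.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return out;
    }

    // Pairs partition i with partition i. Only the partition count can be
    // checked here; using the same partitioning function is a precondition.
    static List<Map<String, Integer>> coGroupCoPartitioned(
            List<List<Map.Entry<String, Integer>>> state,
            List<List<Map.Entry<String, Integer>>> updates) {
        if (state.size() != updates.size()) {
            throw new IllegalArgumentException("not co-partitioned: different partition counts");
        }
        List<Map<String, Integer>> result = new ArrayList<>();
        for (int i = 0; i < state.size(); i++) {
            // The key is not modified, so output partition i only holds keys
            // that hash to i, i.e. the result stays partitioned the same way.
            result.add(coGroupPartition(state.get(i), updates.get(i)));
        }
        return result;
    }

    public static void main(String[] args) {
        int n = 4;
        List<Map.Entry<String, Integer>> state =
                List.of(Map.entry("a", 1), Map.entry("b", 2), Map.entry("c", 3));
        List<Map.Entry<String, Integer>> updates =
                List.of(Map.entry("a", 10), Map.entry("c", 5), Map.entry("d", 7));
        List<Map<String, Integer>> next =
                coGroupCoPartitioned(partition(state, n), partition(updates, n));
        Map<String, Integer> all = new TreeMap<>();
        next.forEach(all::putAll);
        System.out.println(all); // prints {a=11, b=2, c=8, d=7}
    }
}
```

Because the UDF keeps the key unchanged (the "same key" case), each output partition can feed the next co-group round directly. This holds only if the next round again uses the same partition count and function.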
Regarding your question about the PACT output contracts, there were three types, defined w.r.t. a key/value pair data model:

- Same key: the UDF does not modify the key
- Super key: the UDF extends the key (partitioning remains valid, sorting does not)
- Unique key: keys from the UDF or source are unique

Let me know if you have questions.

Cheers, Fabian

[1] https://github.com/fhueske/flink-localjoin-utils

2015-05-19 13:49 GMT+02:00 Alexander Alexandrov:

> Thanks for the feedback, Fabian.
>
> This is related to the question I sent on the user mailing list yesterday.
> Mustafa is working on a master thesis where we try to abstract an operator
> for the update of stateful datasets (decoupled from the current native
> iterations logic) and use it in conjunction with lazy unrolling of
> iterations.
>
> The assumptions are as follows:
>
> - Each iteration runs a job with the same structure and the same DOP;
> - Updates are realized through a coGroup with a fixed DOP (let's say *N*),
> which consumes a *(state, updates)* pair of datasets and produces a
> new version of the state (let's call it *state'*);
> - We keep track of where the *N* output partitions of *state'* are
> located and use this information for local placement of the corresponding
> *N* DataSource tasks in the next iteration (via FLINK-1478);
> - The remaining piece of the puzzle is to figure out how to tell the
> coGroup that one of the inputs is already partitioned, so it avoids an
> unnecessary shuffle.
>
> If I remember correctly, back in the day we had a PACT output contract that
> served a similar purpose (avoiding unnecessary shuffles), but I was not able
> to find it yesterday.
>
> In either case, even if that does not work out of the box at the moment,
> I think most of the logic is in place (e.g. co-location groups in the
> scheduler), and we are willing to either hack the code or add the missing
> functionality in order to realize the goal described above.
>
> Suggestions are welcome!
>
> Regards,
> Alex
>
> 2015-05-18 17:42 GMT+02:00 Fabian Hueske:
>
>> Hi Mustafa,
>>
>> I'm afraid this is not possible.
>> Although you can annotate DataSources with partitioning information, this
>> is not enough to avoid repartitioning for a CoGroup. The reason is that
>> CoGroup requires co-partitioning of both inputs, i.e., both inputs
>> must be equally partitioned (same number of partitions, same partitioning
>> function, same location of partitions). Since Flink dynamically assigns
>> tasks to execution slots, it is not possible to co-locate data that was
>> read from a data source with data coming from the result of another
>> computation.
>>
>> If you just need the result of the first co-group on disk, you could also
>> build a single program that does both co-groups and additionally writes the
>> result of the first co-group to disk (Flink supports multiple data sinks).
>>
>> Best, Fabian
>>
>> 2015-05-18 15:43 GMT+02:00 Mustafa Elbehery:
>>
>>> Hi,
>>>
>>> I am writing a Flink job in which I have three datasets. I have
>>> partitioned the first two by hash (partitionByHash) before coGrouping them.
>>>
>>> My plan is to spill the result of the coGroup to disk, and then re-read
>>> it before coGrouping it with the third dataset.
>>>
>>> My question is: is there any way to inform Flink that the first coGroup
>>> result is already partitioned? I know I can repartition again before
>>> coGrouping, but I would like to know if there is any way to avoid a step
>>> that was already executed.
>>>
>>> Regards.
>>>
>>> --
>>> Mustafa Elbehery
>>> EIT ICT Labs Master School <http://www.masterschool.eitictlabs.eu/home/>
>>> +49(0)15750363097
>>> skype: mustafaelbehery87
