Alright, so if both inputs of the CoGroup are read from the file system,
there should be a way to do the co-group on co-located data without
repartitioning.
In fact, I have some code lying around that does co-located joins from the
local FS [1]. I haven't tested it thoroughly, and it relies on a number of
assumptions. If the data is also sorted, you can even skip the sort
if you inject a few lines into the optimizer (see the change for FLINK-1444)
and ensure that each source reads exactly one (!) input split.
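
To make the co-location idea concrete, here is a minimal sketch in plain
Python (not the Flink API, and not the code from [1]). It assumes both inputs
were partitioned with the same function and the same number of partitions,
so that partition i of one input only needs to meet partition i of the other.
All names (partition_of, hash_partition, co_group, local_co_group) are
hypothetical and only serve the illustration.

```python
import zlib
from collections import defaultdict


def partition_of(key, n):
    """Deterministic partitioner: the same key always maps to the same index."""
    return zlib.crc32(repr(key).encode()) % n


def hash_partition(records, n):
    """Split (key, value) records into n partitions by key."""
    parts = [[] for _ in range(n)]
    for key, value in records:
        parts[partition_of(key, n)].append((key, value))
    return parts


def co_group(left, right):
    """Group two keyed inputs: key -> (left values, right values)."""
    groups = defaultdict(lambda: ([], []))
    for key, value in left:
        groups[key][0].append(value)
    for key, value in right:
        groups[key][1].append(value)
    return dict(groups)


def local_co_group(left_parts, right_parts):
    """Co-group partition i with partition i only; no data is exchanged."""
    result = {}
    for left, right in zip(left_parts, right_parts):
        # Keys are disjoint across partitions, so update() never overwrites.
        result.update(co_group(left, right))
    return result


N = 3  # same number of partitions on both sides
state = [(1, "a"), (2, "b"), (3, "c"), (1, "d")]
updates = [(1, "x"), (3, "y"), (4, "z")]

local = local_co_group(hash_partition(state, N), hash_partition(updates, N))
```

The per-partition result equals the co-group over the full inputs, which is
why no repartitioning is needed when the two sides are co-partitioned and
the matching partitions are read in the same place.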

Regarding your question about the PACT output contracts, there were three
types, defined with respect to a key/value pair data model:
- Same key: UDF does not modify the key
- Super key: UDF extends the key (Partitioning remains valid, sorting not)
- Unique key: Keys from UDF or source are unique
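
As a rough illustration of the "Super key" remark, here is a small plain
Python sketch (again not Flink or PACT code; the helper names and the sample
data are made up). It reads "partitioning remains valid" as: records with
equal extended keys still live in the same partition. Sorting on the
original key, on the other hand, says nothing about the order of the
extended key.

```python
def is_partitioned_by(parts, key):
    """True if no key value occurs in more than one partition."""
    seen = {}
    for index, part in enumerate(parts):
        for record in part:
            if seen.setdefault(key(record), index) != index:
                return False
    return True


def is_sorted_by(part, key):
    """True if the records of one partition are in ascending key order."""
    keys = [key(record) for record in part]
    return keys == sorted(keys)


def same_key(record):
    """Original key: first field only."""
    return record[0]


def super_key(record):
    """Extended key: original key plus the second field."""
    return (record[0], record[1])


# Two partitions, partitioned and sorted on the first field.
parts = [
    [(1, "b"), (1, "a"), (3, "c")],
    [(2, "z"), (2, "y")],
]

partitioned_on_key = is_partitioned_by(parts, same_key)
partitioned_on_super_key = is_partitioned_by(parts, super_key)
sorted_on_key = all(is_sorted_by(p, same_key) for p in parts)
sorted_on_super_key = all(is_sorted_by(p, super_key) for p in parts)
```

With this sample data the partitioning check holds for both keys, while the
sort check holds only for the original key.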

Let me know if you have questions.
Cheers, Fabian

[1] https://github.com/fhueske/flink-localjoin-utils

2015-05-19 13:49 GMT+02:00 Alexander Alexandrov <
[email protected]>:

> Thanks for the feedback, Fabian.
>
> This is related to the question I sent on the user mailing list yesterday.
> Mustafa is working on a master thesis where we try to abstract an operator
> for the update of stateful datasets (decoupled from the current native
> iterations logic) and use it in conjunction with lazy unrolling of
> iterations.
>
> The assumptions are as follows:
>
>    - Each iteration runs a job with the same structure and the same DOP;
>    - Updates are realized through a coGroup with a fixed DOP (let's say *N*),
>    which consumes a *(state, updates)* pair of datasets and produces a
>    new version of the state (let's call it *state'*);
>    - We keep track where the *N* output partitions of *state'* are
>    located and use this information for local placement of the corresponding
>    *N* DataSource tasks in the next iteration (via FLINK-1478);
>    - The remaining piece of the puzzle is to figure out how to tell the
>    coGroup that one of the inputs is already partitioned so it avoids an
>    unnecessary shuffle;
>
> If I remember correctly, back in the day we had a PACT output contract that
> served a similar purpose (avoiding unnecessary shuffles), but I was not able
> to find it yesterday.
>
> In either case, I think that even if this does not work out of the box at
> the moment, most of the logic is in place (e.g. co-location groups in the
> scheduler), and we are willing to either hack the code or add the missing
> functionality in order to realize the goal described above.
>
> Suggestions are welcome!
>
> Regards,
> Alex
>
>
>
>
> 2015-05-18 17:42 GMT+02:00 Fabian Hueske <[email protected]>:
>
>> Hi Mustafa,
>>
>> I'm afraid this is not possible.
>> Although you can annotate DataSources with partitioning information, this
>> is not enough to avoid repartitioning for a CoGroup. The reason for that is
>> that CoGroup requires co-partitioning of both inputs, i.e., both inputs
>> must be equally partitioned (same number of partitions, same partitioning
>> function, same location of partitions). Since Flink is dynamically
>> assigning tasks to execution slots, it is not possible to co-locate data
>> that was read from a data source and data coming from the result of another
>> computation.
>>
>> If you just need the result of the first co-group on disk, you could also
>> build a single program that does both co-groups and additionally writes the
>> result of the first co-group to disk (Flink supports multiple data sinks).
>>
>> Best, Fabian
>>
>> 2015-05-18 15:43 GMT+02:00 Mustafa Elbehery <[email protected]>:
>>
>>> Hi,
>>>
>>> I am writing a Flink job in which I have three datasets. I have applied
>>> partitionByHash to the first two before coGrouping them.
>>>
>>> My plan is to spill the result of coGrouping to disk, and then re-read
>>> it again before coGrouping with the third dataset.
>>>
>>> My question is: is there any way to inform Flink that the first coGroup
>>> result is already partitioned? I know I can re-partition again before
>>> coGrouping, but I would like to know if there is any way to avoid a step
>>> which was already executed.
>>>
>>> Regards.
>>>
>>> --
>>> Mustafa Elbehery
>>> EIT ICT Labs Master School <http://www.masterschool.eitictlabs.eu/home/>
>>> +49(0)15750363097
>>> skype: mustafaelbehery87
>>>
>>>
>>
>
