I think we are still talking about the same issue as in the related question. I suspect that the MutableInputFormatTest does not properly return the splits from its "createInputSplits()" method.

To validate that, you can write yourself a unit test that checks whether the input format returns your splits from "createInputSplits()".
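A minimal sketch of such a test, assuming the constructor from your snippet below (the test name and the expected behavior are assumptions on my side; if the format cannot be made to pass this, that would confirm the suspicion):

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.LocatableInputSplit;
import org.junit.Test;

public class MutableInputFormatSplitTest {

    @Test
    public void testFormatReturnsConfiguredSplits() throws Exception {
        final int dop = 3;

        // Create the splits as in the job: one split number per split.
        LocatableInputSplit[] splits = new LocatableInputSplit[dop];
        for (int i = 0; i < dop; i++) {
            splits[i] = new LocatableInputSplit(i, "localhost");
        }

        MutableInputFormatTest format = new MutableInputFormatTest(
                new Path("file:///home/mustafa/Documents/tst/1"), splits);
        format.configure(new Configuration());

        // The format should hand back the splits it was constructed with,
        // including their host information.
        InputSplit[] returned = format.createInputSplits(dop);

        assertEquals(dop, returned.length);
        for (int i = 0; i < dop; i++) {
            LocatableInputSplit split = (LocatableInputSplit) returned[i];
            assertArrayEquals(new String[] { "localhost" }, split.getHostnames());
        }
    }
}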
On Fri, May 29, 2015 at 5:59 PM, Mustafa Elbehery <[email protected]> wrote:

> Hi Folks,
>
> I am reviving this thread again, as I am stuck in one step to achieve my
> target.
>
> The following code does the partitioning before coGrouping, then writes to
> disk. I am trying to re-read the data from disk, so I have created a
> LocatableInputSplit[] with the size of the DOP. Find the code below:
>
> inPerson.partitionByHash("name")
>     .map(new TrackHost())
>     .coGroup(inStudent.partitionByHash("name"))
>     .where("name").equalTo("name")
>     .with(new ComputeStudiesProfile())
>     .write(new TextOutputFormat(new Path()),
>         "file:///home/mustafa/Documents/tst/", FileSystem.WriteMode.OVERWRITE);
>
> LocatableInputSplit[] splits = new LocatableInputSplit[env.getParallelism()];
> splits[0] = new LocatableInputSplit(0, "localhost");
> splits[1] = new LocatableInputSplit(1, "localhost");
> splits[2] = new LocatableInputSplit(2, "localhost");
>
> DataSet<Person> secondIn = env.createInput(new MutableInputFormatTest(
>         new Path("file:///home/mustafa/Documents/tst/1"), splits))
>     .map(new PersonMapper());
> secondIn.print();
>
> TrackHost uses an accumulator to track the host information, and
> MutableInputFormatTest is a custom input format which extends
> TextInputFormat and implements StrictlyLocalAssignment.
>
> I keep the LocatableInputSplit[] in an instance field: overriding
> createInputSplits() to return my own split type conflicts with the method
> TextInputFormat already declares, and the compiler complained about that.
>
> Again, while debugging I can see the problem in ExecutionJobVertex, line
> 146: the execution ignores the locations I am shipping with my splits and
> re-creates the input splits, which take their host info (the machine name)
> from the execution, while the task managers prepared by the scheduler are
> waiting for a machine named "localhost".
>
> Any suggestions?
>
> Regards.
>
> On Tue, May 19, 2015 at 2:16 PM, Fabian Hueske <[email protected]> wrote:
>
>> Alright, so if both inputs of the CoGroup are read from the file system,
>> there should be a way to do the co-group on co-located data without
>> repartitioning.
>> In fact, I have some code lying around to do co-located joins from a
>> local FS [1]. I haven't tested it thoroughly, and it also relies on a
>> number of assumptions. If the data is also sorted, you can even get
>> around sorting it if you inject a few lines into the optimizer (see the
>> change for FLINK-1444) and ensure that each source reads exactly one(!)
>> input split.
>>
>> Regarding your question about the PACT output contracts, there were
>> three types, defined with respect to a key/value pair data model (see
>> the sketch after the list for a rough equivalent in the current API):
>>
>> - Same key: the UDF does not modify the key
>> - Super key: the UDF extends the key (partitioning remains valid,
>>   sorting does not)
>> - Unique key: keys from the UDF or source are unique
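>> A minimal sketch of the "Same key" case with the semantic annotations
>> in the current API (types and names are just for illustration):
>>
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
>> import org.apache.flink.api.java.tuple.Tuple2;
>>
>> // Field f0 passes through unmodified, so a hash partitioning on f0
>> // stays valid across this map (the old "Same key" contract).
>> @ForwardedFields("f0")
>> public static class SameKeyMapper
>>         implements MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
>>     @Override
>>     public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
>>         value.f1 += 1; // modify the value, keep the key (f0)
>>         return value;
>>     }
>> }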
>> Let me know if you have questions.
>> Cheers, Fabian
>>
>> [1] https://github.com/fhueske/flink-localjoin-utils
>>
>> 2015-05-19 13:49 GMT+02:00 Alexander Alexandrov <[email protected]>:
>>
>>> Thanks for the feedback, Fabian.
>>>
>>> This is related to the question I sent on the user mailing list
>>> yesterday. Mustafa is working on a master thesis where we try to
>>> abstract an operator for the update of stateful datasets (decoupled
>>> from the current native iterations logic) and use it in conjunction
>>> with lazy unrolling of iterations.
>>>
>>> The assumptions are as follows:
>>>
>>> - Each iteration runs a job with the same structure and the same DOP;
>>> - Updates are realized through a coGroup with a fixed DOP (let's say
>>>   *N*), which consumes a *(state, updates)* pair of datasets and
>>>   produces a new version of the state (let's call it *state'*);
>>> - We keep track of where the *N* output partitions of *state'* are
>>>   located and use this information for local placement of the
>>>   corresponding *N* DataSource tasks in the next iteration (via
>>>   FLINK-1478);
>>> - The remaining piece of the puzzle is to figure out how to tell the
>>>   coGroup that one of the inputs is already partitioned, so it avoids
>>>   an unnecessary shuffle.
>>>
>>> If I remember correctly, back in the day we had a PACT output contract
>>> that served a similar purpose (avoiding unnecessary shuffles), but I
>>> was not able to find it yesterday.
>>>
>>> In either case, even if this does not work out of the box at the
>>> moment, I think most of the logic is in place (e.g. co-location groups
>>> in the scheduler), and we are willing to either hack the code or add
>>> the missing functionality in order to realize the goal described above.
>>>
>>> Suggestions are welcome!
>>>
>>> Regards,
>>> Alex
>>>
>>> 2015-05-18 17:42 GMT+02:00 Fabian Hueske <[email protected]>:
>>>
>>>> Hi Mustafa,
>>>>
>>>> I'm afraid this is not possible.
>>>> Although you can annotate DataSources with partitioning information,
>>>> this is not enough to avoid repartitioning for a CoGroup. The reason
>>>> is that CoGroup requires co-partitioning of both inputs, i.e., both
>>>> inputs must be equally partitioned (same number of partitions, same
>>>> partitioning function, same location of partitions). Since Flink
>>>> assigns tasks to execution slots dynamically, it is not possible to
>>>> co-locate data that was read from a data source with data coming from
>>>> the result of another computation.
>>>>
>>>> If you just need the result of the first co-group on disk, you could
>>>> also build a single program that does both co-groups and additionally
>>>> writes the result of the first co-group to disk (Flink supports
>>>> multiple data sinks), as in the sketch below.
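>>>> Roughly like this (the type and function names are placeholders):
>>>>
>>>> DataSet<Result> first = input1.coGroup(input2)
>>>>     .where("name").equalTo("name")
>>>>     .with(new FirstCoGroup());
>>>>
>>>> // Sink 1: persist the intermediate result.
>>>> first.writeAsText("file:///tmp/first-cogroup", FileSystem.WriteMode.OVERWRITE);
>>>>
>>>> // Sink 2: run the second co-group in the same program, so the
>>>> // intermediate result never has to be written out and re-read.
>>>> first.coGroup(input3)
>>>>     .where("name").equalTo("name")
>>>>     .with(new SecondCoGroup())
>>>>     .writeAsText("file:///tmp/final-result", FileSystem.WriteMode.OVERWRITE);
>>>>
>>>> env.execute();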
>>>> Best, Fabian
>>>>
>>>> 2015-05-18 15:43 GMT+02:00 Mustafa Elbehery <[email protected]>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am writing a Flink job in which I have three datasets. I have
>>>>> partitioned the first two by hash before coGrouping them.
>>>>>
>>>>> My plan is to spill the result of the coGroup to disk, and then
>>>>> re-read it before coGrouping with the third dataset.
>>>>>
>>>>> My question is: is there any way to inform Flink that the first
>>>>> coGroup's result is already partitioned? I know I can re-partition
>>>>> again before coGrouping, but I would like to know if there is any
>>>>> way to avoid a step which was already executed.
>>>>>
>>>>> Regards.
>>>>>
>>>>> --
>>>>> Mustafa Elbehery
>>>>> EIT ICT Labs Master School
>>>>> <http://www.masterschool.eitictlabs.eu/home/>
>>>>> +49(0)15750363097
>>>>> skype: mustafaelbehery87
>
> --
> Mustafa Elbehery
> EIT ICT Labs Master School <http://www.masterschool.eitictlabs.eu/home/>
> +49(0)15750363097
> skype: mustafaelbehery87