Hi David,

Flink's DataSet API schedules one slice of a program to a task slot. A
program slice is one parallel instance of each operator of a program.
When all operators of your program run with a parallelism of 1, you end
up with only one slice that runs in a single slot.
Flink's DataSet API leverages data parallelism (running parallel instances
of the same operator on different workers, each working on a different data
partition) rather than task parallelism (running different operators on
different workers).
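
For example, raising the default parallelism gives you more program slices
and therefore more occupied slots. A minimal, untested sketch (the value 3
just matches the 3 slots of your task manager):

import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// with parallelism 3, each operator gets 3 parallel instances, i.e.,
// 3 program slices that can occupy 3 task slots
env.setParallelism(3);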

Regarding your task, I would implement a custom InputFormat that extends
FileInputFormat. The FileInputFormat.open() [1] method is called with a
FileInputSplit [2], which contains the file path. You can put the path aside
and add it as an additional field when emitting records in the nextRecord()
method.
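
For example (an untested sketch; the class name is made up, it assumes
newline-delimited text input, and it uses the file name instead of the full
path so that /source1/1 and /source2/1 are tagged identically):

import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileInputSplit;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Emits (fileName, line) tuples. DelimitedInputFormat extends
// FileInputFormat and calls readRecord() from within nextRecord().
public class FileTaggingInputFormat
    extends DelimitedInputFormat<Tuple2<String, String>> {

  private String fileName;

  @Override
  public void open(FileInputSplit split) throws IOException {
    super.open(split);
    // put the file name aside when the split is opened
    fileName = split.getPath().getName();
  }

  @Override
  public Tuple2<String, String> readRecord(
      Tuple2<String, String> reuse, byte[] bytes, int offset, int numBytes)
      throws IOException {
    // attach the file name as an additional field to every record
    return new Tuple2<>(
        fileName, new String(bytes, offset, numBytes, StandardCharsets.UTF_8));
  }
}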
This way, you only need two sources (one for /source1 and one for /source2)
and can join the records on a composite key of filename and join key. This
should balance the load evenly over a larger number of keys.
However, you would lose the advantage of pre-partitioned files because all
data of source1 would be joined with all data of source2.
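
Putting both together, the pair-wise join could look roughly like this
(again untested; ParseKey and the "key,value" line layout are made up for
illustration):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

public class PairwiseJoinJob {

  // hypothetical parser: turns (fileName, "key,value") into
  // (fileName, joinKey, value)
  public static class ParseKey implements
      MapFunction<Tuple2<String, String>, Tuple3<String, String, String>> {
    @Override
    public Tuple3<String, String, String> map(Tuple2<String, String> in) {
      String[] parts = in.f1.split(",", 2);
      return new Tuple3<>(in.f0, parts[0], parts[1]);
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple3<String, String, String>> source1 =
        env.readFile(new FileTaggingInputFormat(), "/source1")
            .map(new ParseKey());
    DataSet<Tuple3<String, String, String>> source2 =
        env.readFile(new FileTaggingInputFormat(), "/source2")
            .map(new ParseKey());

    source1.join(source2)
        .where(0, 1)   // composite key: file name + join key
        .equalTo(0, 1)
        .print();
  }
}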

There is a low-level interface to leverage pre-partitioned files. With
SplitDataProperties [3] you can specify that the data produced by a
DataSource [4] is partitioned by InputSplit.
If you implement the source such that a single split contains the
information to read both files, you can avoid the additional shuffle and
join locally. This is a manual, low-level optimization where you need to
know what you are doing. I'm not sure it is documented anywhere except
the JavaDocs.
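
Roughly like this (untested; PairwiseJoiningInputFormat is a made-up format
along the lines described above, i.e., each of its splits covers one
/source1/i, /source2/i pair, joins the two files locally, and emits
(joinKey, joinedValue) records):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// hypothetical source whose splits read pre-partitioned file pairs
DataSource<Tuple2<String, String>> joined =
    env.createInput(new PairwiseJoiningInputFormat("/source1", "/source2"));

// declare that records with the same join key never span splits, so the
// optimizer can drop the shuffle in front of a downstream groupBy(0)
joined.getSplitDataProperties().splitsPartitionedBy(0);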

Hope this helps,
Fabian

[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L684
[2] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java#L34
[3] https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java#L101
[4] https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java#L117

2017-10-26 4:13 GMT+02:00 David Dreyfus <dddrey...@gmail.com>:

> Hello -
>
> I have a large number of pairs of files. For the purpose of discussion:
> /source1/{1..10000} and /source2/{1..10000}.
>
> I want to join the files pair-wise: /source1/1 joined to /source2/1,
> /source1/2 joined to /source2/2, and so on.
> I then want to union the results of the pair-wise joins and perform an
> aggregate.
>
> I create a simple flink job that has four sources, two joins, and two sinks
> to produce intermediate results. This represents two unrelated chains.
>
> I notice that when running this job with parallelism = 1 on a standalone
> machine with one task manager and 3 slots, only one slot gets used.
>
> My concern is that when I scale up to a YARN cluster, flink will continue
> to use one slot on one machine instead of using all slots on all machines.
>
> Prior reading suggests all the data source subtasks are added to a default
> resource group. Downstream tasks (joins and sinks) want to be colocated
> with the data sources. The result is all of my tasks are executed in one
> slot.
>
> Flink Stream (DataStream) offers the slotSharingGroup() function. This
> doesn't seem available to the DataSet user.
>
> *Q1:* How do I force Flink to distribute work evenly across task managers
> and the slots allocated to them? If this shouldn't be a concern, please
> elaborate.
>
> When I scale up the number of unrelated chains I notice that flink seems to
> start all of them at the same time, which results in thrashing and errors -
> lots of IO and errors regarding hash buffers.
>
> *Q2:* Is there any method for controlling the scheduling of tasks so that
> some finish before others start? My workaround is to execute multiple,
> sequential batches with results going into an intermediate directory, and
> then a final job that aggregates the results. I would certainly prefer one
> job that might avoid the intermediate write.
>
> If I treat /source1 as one data source and /source2 as the second, and then
> join the two, flink will shuffle and partition the files on the join key.
> The /source1 and /source2 files represent this partitioning. They are
> reused multiple times; thus, I shuffle and save the results creating
> /source1 and /source2.
>
> *Q3:* Does flink have a method by which I can mark individual files (or
> directories) as belonging to a particular partition so that when I try to
> join them, the unnecessary shuffle and repartition is avoided?
>
> Thank you,
> David
>
>
>
>
