Hi David,

Please find my answers below:

1. For high utilization, all slots should be filled. Each slot processes a
slice of the program on a slice of the data. In case of repartitioning or
changed parallelism, the data is shuffled accordingly.
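
For example, a grouping changes the partitioning, so Flink shuffles the
data by hash before the aggregation (a minimal sketch; the tiny data set is
just a placeholder):

  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(4); // four parallel slices of each operator

  DataSet<Tuple2<Long, Long>> data =
    env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(1L, 3L));

  // groupBy() changes the partitioning, so the data is
  // hash-shuffled on field 0 before the aggregation
  data.groupBy(0).sum(1).print();
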
2. That's a good question. I think the default logic is to go round-robin
over the TMs, as you suggested, but I'm not 100% sure. There are a couple
of exceptions and special cases, IIRC.
3. No, I would still use Flink's join operator to do the join. When you
read both files in the same split, you'd have a single source for both
inputs. You could do something like:

                /-- Filter "/source1" --\
Source -<                                  >-Join-->
                \-- Filter "/source2" --/

If all operators have the same parallelism and the source has the right
split properties configured, all data should stay local and the join would
work without repartitioning the data.
You could go even further if the data in the files is sorted on the join
key. Then you could read in zig-zag fashion from both files and declare
sorted split properties. In theory, the join would then happen without a
sort (I haven't tried this, though).
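
A rough sketch of that pattern (the Tuple3 record layout that carries the
originating file path in field 0, and the pairFormat input format, are
assumptions for illustration, not existing API):

  // one source whose custom format reads a matching file pair per split;
  // each record carries the path of the file it came from in field 0
  DataSource<Tuple3<String, Long, String>> src = env.createInput(pairFormat);

  // declare that the splits are partitioned on the join key (field 1)
  src.getSplitDataProperties().splitsPartitionedBy(1);

  DataSet<Tuple3<String, Long, String>> in1 =
    src.filter(t -> t.f0.startsWith("/source1"));
  DataSet<Tuple3<String, Long, String>> in2 =
    src.filter(t -> t.f0.startsWith("/source2"));

  // with equal parallelism and matching split properties,
  // the join should not require a shuffle
  in1.join(in2).where(1).equalTo(1).print();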

4.a Yes, that is true. FileInputFormat has a flag to prevent files from
being split up into multiple splits.
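
If I remember correctly, that flag is the protected unsplittable field of
FileInputFormat, so a custom format could set it in its constructor (a
minimal sketch, assuming extending TextInputFormat works for your files):

  import org.apache.flink.api.java.io.TextInputFormat;
  import org.apache.flink.core.fs.Path;

  // each file under the input path becomes exactly one split
  public class UnsplittableTextFormat extends TextInputFormat {
    public UnsplittableTextFormat(Path path) {
      super(path);
      this.unsplittable = true; // FileInputFormat's "do not split files" flag
    }
  }
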
4.b You might be able to hack this with a custom InputSplitAssigner. The
SplitDataProperties partition methods have a partitioner ID field. IIRC,
this is used to determine equal partitioning for joins.
However, as I said, you need to make sure that the files with the same keys
are read by the same subtask, which is what a custom InputSplitAssigner
would enforce (see the sketch below).
My proposal to read both files with the same key in the same input split
(with a single source) tried to work around this issue by forcing the data
of both files into the same subtask.
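
Such an assigner could precompute which subtask gets which splits and hand
them out accordingly (a sketch under that assumption; the assignment map
and the class name are made up). You would return it from your
InputFormat's getInputSplitAssigner() method:

  import org.apache.flink.core.io.InputSplit;
  import org.apache.flink.core.io.InputSplitAssigner;
  import java.util.Map;
  import java.util.Queue;

  // hands each subtask only the splits precomputed for it, so splits of
  // the same partition always end up in the same subtask
  public class PartitionAwareAssigner implements InputSplitAssigner {

    private final Map<Integer, Queue<InputSplit>> splitsPerTask;

    public PartitionAwareAssigner(Map<Integer, Queue<InputSplit>> splitsPerTask) {
      this.splitsPerTask = splitsPerTask; // taskId -> its splits
    }

    @Override
    public InputSplit getNextInputSplit(String host, int taskId) {
      Queue<InputSplit> queue = splitsPerTask.get(taskId);
      return (queue == null) ? null : queue.poll(); // null means "no more splits"
    }
  }
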
4.c The concept of a partition is a bit different in Flink and not bound
to InputSplits. All data arriving at a parallel instance of an operator is
considered to be in the same partition.
So both FlatMap and MapPartition call open() just once. In MapPartition,
the mapPartition() method is also called just once, while flatMap() is
called once for each record.
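
To illustrate the difference (a minimal sketch; nothing here beyond the two
function interfaces is Flink-specific):

  import org.apache.flink.api.common.functions.RichMapPartitionFunction;
  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.util.Collector;

  // counts the records of one parallel instance ("partition")
  public class CountPartition extends RichMapPartitionFunction<Long, Long> {

    @Override
    public void open(Configuration parameters) {
      // called once per parallel instance, not per split or per record
    }

    @Override
    public void mapPartition(Iterable<Long> values, Collector<Long> out) {
      long count = 0;
      for (Long ignored : values) {
        count++; // a FlatMapFunction's flatMap() would run once per record instead
      }
      out.collect(count);
    }
  }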

Cheers, Fabian

2017-10-26 15:04 GMT+02:00 David Dreyfus <dddrey...@gmail.com>:

> Hi Fabian,
>
> Thank you for the great, detailed answers.
> 1. So, each parallel slice of the DAG is placed into one slot. The key to
> high utilization is many slices of the source data (or the various methods
> of repartitioning it). Yes?
> 2. In batch processing, are slots filled round-robin on task managers, or
> do I need to tune the number of slots to load the cluster evenly?
> 3. Are you suggesting that I perform the join in my custom data source?
> 4. Looking at this sample from
> org.apache.flink.optimizer.PropertyDataSourceTest
>
>   DataSource<Tuple2<Long, String>> data =
>     env.readCsvFile("/some/path").types(Long.class, String.class);
>
>   data.getSplitDataProperties()
>     .splitsPartitionedBy(0);
>
> 4.a Does this code assume that one split == one file from /some/path? If
> readCsvFile splits each file, the guarantee that all keys in each part of
> the file share the same partition would be violated, right?
> 4.b Is there a way to mark a partition number so that sources that share
> partition numbers are read in parallel and joined? If I have 10,000 pairs, I
> want partition 1 read from the sources at the same time.
> 4.c Does a downstream flatmap function get an open() call for each new
> partition? Or, do I chain MapPartition directly to the datasource?
>
> Thank you,
> David
>
