Hi Robert,

this is indeed a bit tricky to do. The problems are mostly the generation of
the input splits, the setup of Flink, and the scheduling of tasks.

1) you have to ensure that at least one DataSource task is scheduled on each
worker. The easiest way to do this is a bare-metal setup (no YARN) with a
single TaskManager per worker. Each TM should have the same number of slots,
and the DataSource should have a parallelism of #TMs * slots. This ensures
that the same number of DataSource tasks is started on each worker (see the
job sketch below).
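
A minimal sketch of the job setup under these assumptions; the class
LocalFileInputFormat and its constructor are made up for illustration (it is
outlined in 2) to 4) below), and the hostnames and slot count are placeholders
that must match your cluster and the taskmanager.numberOfTaskSlots setting of
every TM:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class LocalLogJob {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // placeholders: must match the actual workers and their slot config
    String[] tmHosts = {"worker1", "worker2", "worker3"};
    int slotsPerTM = 4;

    DataSet<String> lines = env
        .createInput(new LocalFileInputFormat(tmHosts, slotsPerTM, "/local"))
        // exactly one DataSource subtask per slot on every worker
        .setParallelism(tmHosts.length * slotsPerTM);

    lines.print();
  }
}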

2) you need to tweak the input split generation. Flink's FileInputFormat
assumes that it can access all files to be read via a distributed file
system, so you need a custom InputFormat. It should have parameters for the
list of TaskManagers (hostnames or IP addresses) and the number of slots per
TM. Its createInputSplits() method should create one input split for each
parallel task, and each split should carry a (hostname, local index) pair;
see the sketch below.
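
For example (just a sketch; LocalFileInputSplit and the tmHosts / slotsPerTM
fields are made-up names, and Flink's LocatableInputSplit is reused as the
base class):

import org.apache.flink.core.io.LocatableInputSplit;

// one split per parallel task: (hostname, local index on that host)
public class LocalFileInputSplit extends LocatableInputSplit {

  private final int localIndex;  // 0 .. slotsPerTM - 1

  public LocalFileInputSplit(int splitNumber, String hostname, int localIndex) {
    super(splitNumber, hostname);
    this.localIndex = localIndex;
  }

  public String getHostname() {
    return getHostnames()[0];
  }

  public int getLocalIndex() {
    return localIndex;
  }
}

// in LocalFileInputFormat:
@Override
public LocalFileInputSplit[] createInputSplits(int minNumSplits) {
  LocalFileInputSplit[] splits = new LocalFileInputSplit[tmHosts.length * slotsPerTM];
  int splitNumber = 0;
  for (String host : tmHosts) {
    for (int localIndex = 0; localIndex < slotsPerTM; localIndex++) {
      splits[splitNumber] = new LocalFileInputSplit(splitNumber, host, localIndex);
      splitNumber++;
    }
  }
  return splits;
}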

3) you need to tweak the input split assignment. The InputFormat must provide
a custom InputSplitAssigner that ensures that splits are only handed to tasks
running on the correct TaskManager. Otherwise a TaskManager might process the
splits of another TM, so some data would be read twice while other data would
not be read at all. See the assigner sketch below.
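
A rough sketch of such an assigner, returned from the InputFormat's
getInputSplitAssigner() method. Unlike Flink's LocatableInputSplitAssigner, it
never falls back to a remote split; you may have to normalize hostnames vs. IP
addresses so that the lookup matches:

import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StrictlyLocalSplitAssigner implements InputSplitAssigner {

  // splits grouped by the host they must be read on
  private final Map<String, List<LocalFileInputSplit>> splitsByHost = new HashMap<>();

  public StrictlyLocalSplitAssigner(LocalFileInputSplit[] splits) {
    for (LocalFileInputSplit split : splits) {
      List<LocalFileInputSplit> hostSplits = splitsByHost.get(split.getHostname());
      if (hostSplits == null) {
        hostSplits = new ArrayList<>();
        splitsByHost.put(split.getHostname(), hostSplits);
      }
      hostSplits.add(split);
    }
  }

  @Override
  public synchronized InputSplit getNextInputSplit(String host, int taskId) {
    // hand out only splits that belong to the requesting host, never a remote one
    List<LocalFileInputSplit> remaining = splitsByHost.get(host);
    if (remaining == null || remaining.isEmpty()) {
      return null;
    }
    return remaining.remove(remaining.size() - 1);
  }
}

// in LocalFileInputFormat:
@Override
public InputSplitAssigner getInputSplitAssigner(LocalFileInputSplit[] splits) {
  return new StrictlyLocalSplitAssigner(splits);
}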

4) once a split is assigned to a task, the InputFormat.open() method is
called with that split. Based on the local index, the task should decide
which files (or parts of files) it needs to read. This decision must be
deterministic (i.e., depend only on the local index) and ensure that all data
(files or parts of files) is read exactly once (you'll need the number of
slots per host for that). A sketch of open() is below.
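
As a sketch, assuming whole files in a directory such as /local (splitting
individual files is left out) and the localDir / slotsPerTM fields from the
format's constructor; filesToRead would then be consumed by reachedEnd() and
nextRecord():

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// in LocalFileInputFormat:
private transient List<File> filesToRead;

@Override
public void open(LocalFileInputSplit split) throws IOException {
  File[] localFiles = new File(localDir).listFiles();  // e.g. localDir = "/local"
  if (localFiles == null) {
    throw new IOException("Cannot list " + localDir);
  }
  // sort to get a deterministic order on every host and every call
  Arrays.sort(localFiles);

  filesToRead = new ArrayList<>();
  for (int i = 0; i < localFiles.length; i++) {
    // each local file is picked by exactly one of the slotsPerTM tasks on this host
    if (i % slotsPerTM == split.getLocalIndex()) {
      filesToRead.add(localFiles[i]);
    }
  }
}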

You see, this is not trivial. Moreover, such a setup is not flexible, quite
fragile, and not fault-tolerant.
Since you need to read local files that are not available anywhere else, your
job will fail if a TM goes down.

If possible, I would recommend moving the data into a distributed file
system.

Best,
Fabian

2016-12-27 13:04 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>:

> Hi everyone,
>
> I'm using Flink and/or Hadoop on my cluster, and I'm having them generate
> log data in each worker node's /local folder (regular mount point). Now I
> would like to process these files using Flink, but I'm not quite sure how I
> could tell Flink to use each worker node's /local folder as input path,
> because I'd expect Flink to look in the /local folder of the submitting
> node only. Do I have to put these files into HDFS or is there a way to tell
> Flink the file:///local file URI refers to worker-local data? Thanks in
> advance for any hints and best
>
> Robert
>
> --
> My GPG Key ID: 336E2680
>
