Hi Eugene,

Thanks for the detailed questions.

I think we have two levels of expectations here:
- end-user responsibility
- worker/runner responsibility

1/ From an end-user perspective, the end user has to know that using a system command (via ExecIO), and more generally anything that relies on worker resources (for instance a local filesystem directory available only on one worker), can fail if the expected resource is not present on all workers. So, basically, all workers should have the same topology; that's what I'm assuming for the PR. For example, if I have my Spark cluster using the same Mesos/Docker setup, then the user knows that all nodes in the cluster will have the same setup and so the same resources (it could be provisioned by DevOps, for instance). On the other hand, running on Dataflow is different because I don't "control" the nodes (bootstrapping or resources), but in that case the user knows it (he knows which runner he's using).

2/ As you said, we can expect that the runner can deal with some requirements (expressed depending on the pipeline and the runner), and that the runner knows which workers provide capabilities matching those requirements. Then the end user is no longer responsible: the runner will determine whether the pipeline can be executed, and on which worker a given DoFn has to run.
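
Just to illustrate the idea, here is a purely hypothetical sketch (nothing like this exists in Beam today, all names are made up) of what such a requirement/capability contract could look like:

// hypothetical sketch, not an existing Beam API: a transform declares what
// it needs from a worker, and the runner matches that against what each
// worker advertises before scheduling the DoFn there
public interface WorkerRequirement {
  // e.g. "command:/path/to/myscript.sh" or "fs:/data/input"
  String describe();
}

public interface WorkerCapability {
  // true if a worker providing this capability satisfies the requirement
  boolean satisfies(WorkerRequirement requirement);
}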

For me, these are two different levels: 2 is smarter, but 1 can also make sense.

WDYT?

Regards
JB

On 12/05/2016 08:51 PM, Eugene Kirpichov wrote:
Hi JB,

Thanks for bringing this to the mailing list. I also think that this is
useful in general (and that use cases for Beam are more than just classic
big data), and that there are interesting questions here at different levels
about how to do it right.

I suggest starting with the highest-level question [and discuss the
particular API only after agreeing on this, possibly in a separate thread]:
how to deal with the fact that Beam gives no guarantees about the
environment on workers, e.g. which commands are available, which shell or
even OS is being used, etc. Particularly:

- Obviously different runners will have a different environment, e.g.
Dataflow workers are not going to have Hadoop commands available because
they are not running on a Hadoop cluster. So, pipelines and transforms
developed using this connector will be necessarily non-portable between
different runners. Maybe this is ok? But we need to give users a clear
expectation about this. How do we phrase this expectation and where do we
put it in the docs?

- I'm concerned that this puts additional compatibility requirements on
runners - it becomes necessary for a runner to document the environment of
its workers (OS, shell, privileges, guaranteed-installed packages, access
to other things on the host machine e.g. whether or not the worker runs in
its own container, etc.) and to keep it stable - otherwise transforms and
pipelines with this connector will be non-portable between runner versions
either.

Another way to deal with this is to give up and say "the environment on the
workers is outside the scope of Beam; consult your runner's documentation
or use your best judgment as to what the environment will be, and use this
at your own risk".

What do others think?

On Mon, Dec 5, 2016 at 5:09 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Hi beamers,

Today, Beam is mainly focused on data processing.
Since the beginning of the project, we have been discussing extending
the use-case coverage via DSLs and extensions (like for machine
learning), or via IOs.

Especially on the IO side, we can see Beam used for data integration and
data ingestion.

In this area, I'm proposing a first IO: ExecIO:

https://issues.apache.org/jira/browse/BEAM-1059
https://github.com/apache/incubator-beam/pull/1451

Actually, this IO is mainly an ExecFn that executes system commands
(again, keep in mind we are discussing data integration/ingestion
and not data processing).

For convenience, this ExecFn is wrapped in Read and Write (as a regular IO).

Clearly, this IO/Fn depends on the worker where it runs, but that's the
user's responsibility.

During the review, Eugene and I discussed:
- is it an IO or just a Fn?
- is it OK to have a worker-specific IO?

IMHO, an IO makes a lot of sense and is very convenient for end users.
They can do something like:

PCollection<String> output =
pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh"));

The pipeline will execute myscript.sh, and the output PCollection will
contain the command execution std out/err.
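
Put together, a complete pipeline could look like this sketch (based on the API proposed in the PR; names may still change during the review):

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);

// each element of the output PCollection is the std out/err captured for
// the command executed on a worker
PCollection<String> output =
    pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh"));

pipeline.run();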

On the other hand, they can do:

pcollection.apply(ExecIO.write());

where the PCollection contains the commands to execute.
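
For instance (a sketch; Create.of() is only used here to build the commands collection):

PCollection<String> commands =
    pipeline.apply(Create.of("/path/to/scriptA.sh", "/path/to/scriptB.sh"));

// each command is executed on whichever worker processes the element
commands.apply(ExecIO.write());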

More generally, end users can use ExecFn wherever they want in the
pipeline:

PCollection<String> output = pipeline.apply(ParDo.of(new ExecIO.ExecFn()));

The input collection contains the commands to execute, and the output
collection contains the std out/err of each command execution.
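
For example (a sketch; Count is only there to show the execution results being consumed downstream):

PCollection<String> results =
    pipeline
        .apply(Create.of("ls /tmp", "df -h"))
        .apply(ParDo.of(new ExecIO.ExecFn()));

// the std out/err of each execution can be processed like any other
// PCollection
PCollection<KV<String, Long>> counts =
    results.apply(Count.<String>perElement());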

Generally speaking, I'm preparing several IOs that are more in the data
integration/ingestion area than in "pure" classic big data processing. I
think it would give a new "dimension" to Beam.

Thoughts?

Regards
JB
--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
