Hi guys,

I understand your point.

The Exec "IO" can already take input commands from a PCollection, but the user has to prepare the commands. I will improve the ExecFn as you said: be able to construct the shell commands using elements in the PCollection (using one element as command, the others as arguments).

I agree with your statement about DoFn: a DoFn in a "middle" of a pipeline is not an IO. An IO acts as endpoints in a pipeline: starting endpoint for Read, ending endpoint for Write.

Point is that a DoFn can be a connector (for instance a MySQL database lookup as you said) but it can be wrapped as an IO.

If I compare with Apache Camel, a pipeline (aka route) starts with an unique (it's what we name a consumer endpoint on a Camel component). A producer endpoint can end a route or be used in any middle step. It provides a convenient way to extend the processing/routing logic.
It's like a DoFn.

Regards
JB

On 12/08/2016 09:37 PM, Ben Chambers wrote:
I think I agree with Robert (unless I'm misunderstanding his point).

I think that the shell commands are going to be the most useful if it is
possible to take the elements in an input PCollection, construct a shell
command depending on those elements, and then execute it. I think doing so
in a fully general manner outside of a DoFn will be difficult. If instead
we made it easier to declare a DoFn as having requirements on the
environment (these programs must be available in the shell) and easier to
execute shell commands within a DoFn, I think that covers many more use
cases.

On Thu, Dec 8, 2016 at 12:23 PM Robert Bradshaw <rober...@google.com.invalid>
wrote:

On Wed, Dec 7, 2016 at 1:32 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:
By the way, just to elaborate a bit why I provided as an IO:

1. From an user experience perspective, I think we have to provide
convenient way to write pipeline. Any syntax simplifying this is
valuable.
I think it's easier to write:

pipeline.apply(ExecIO.read().withCommand("foo"))

than:

pipeline.apply(Create.of("foo")).apply(ParDo.of(new ExecFn());

Slightly. Still, when I see

    pipeline.apply(ExecIO.read().withCommand("foo"))

I am surprised to get a PCollection with a single element...

2. For me (maybe I'm wrong ;)), an IO is an extension dedicated for
"connector": reading/writing from/to a data source. So, even without the
IO
"wrapping" (by wrapping, I mean the Read and Write), I think Exec
extension
should be in IO as it's a source/write of data.

To clarify, if you wrote a DoFn that, say, did lookups against a MySQL
database, you would consider this an IO? For me, IO denotes
input/output, i.e. the roots and leaves of a pipeline.

Regards
JB

On 12/07/2016 08:37 AM, Robert Bradshaw wrote:

I don't mean to derail the tricky environment questions, but I'm not
seeing why this is bundled as an IO rather than a plain DoFn (which
can be applied to a PCollection of one or more commands, yielding
their outputs). Especially for the case of a Read, which in this case
is not splittable (initially or dynamically) and always produces a
single element--feels much more like a Map to me.

On Tue, Dec 6, 2016 at 3:26 PM, Eugene Kirpichov
<kirpic...@google.com.invalid> wrote:

Ben - the issues of "things aren't hung, there is a shell command
running",
aren't they general to all DoFn's? i.e. I don't see why the runner
would
need to know that a shell command is running, but not that, say, a
heavy
monolithic computation is running. What's the benefit to the runner in
knowing that the DoFn contains a shell command?

By saying "making sure that all shell commands finish", I suppose
you're
referring to the possibility of leaks if the user initiates a shell
command
and forgets to wait for it? I think that should be solvable again
without
Beam intervention, by making a utility class for running shell commands
which implements AutoCloseable, and document that you have to use it
that
way.

Ken - I think the question here is: are we ok with a situation where
the
runner doesn't check or care whether the shell command can run, and the
user accepts this risk and studies what commands will be available on
the
worker environment provided by the runner they use in production,
before
productionizing a pipeline with those commands.

Upon some thought I think it's ok. Of course, this carries an
obligation
for runners to document their worker environment and its changes across
versions. Though for many runners such documentation may be trivial:
"whatever your YARN cluster has, the runner doesn't change it in any
way"
and it may be good enough for users. And for other runners, like
Dataflow,
such documentation may also be trivial: "no guarantees whatsoever, only
what you stage in --filesToStage is available".

I can also see Beam develop to a point where we'd want all runners to
be
able to run your DoFn in a user-specified Docker container, and manage
those intelligently - but I think that's quite a while away and it
doesn't
have to block work on a utility for executing shell commands. Though
it'd
be nice if the utility was forward-compatible with that future world.

On Tue, Dec 6, 2016 at 2:16 AM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Hi Eugene,

thanks for the extended questions.

I think we have two levels of expectations here:
- end-user responsibility
- worker/runner responsibility

1/ From a end-user perspective, the end-user has to know that using a
system command (via ExecIO) and more generally speaking anything which
relay on worker resources (for instance a local filesystem directory
available only on a worker) can fail if the expected resource is not
present on all workers. So, basically, all workers should have the
same
topology. It's what I'm assuming for the PR.
For example, I have my Spark cluster, using the same Mesos/Docker
setup,
then the user knows that all nodes in the cluster will have the same
setup and so resources (it could be provided by DevOps for instance).
On the other hand, running on Dataflow is different because I don't
"control" the nodes (bootstrapping or resources), but in that case,
the
user knows it (he knows the runner he's using).

2/ As you said, we can expect that runner can deal with some
requirements (expressed depending of the pipeline and the runner), and
the runner can know the workers which provide capabilities matching
those requirements.
Then, the end user is not more responsible: the runner will try to
define if the pipeline can be executed, and where a DoFn has to be run
(on which worker).

For me, it's two different levels where 2 is smarter but 1 can also
make
sense.

WDYT ?

Regards
JB

On 12/05/2016 08:51 PM, Eugene Kirpichov wrote:

Hi JB,

Thanks for bringing this to the mailing list. I also think that this
is
useful in general (and that use cases for Beam are more than just
classic
bigdata), and that there are interesting questions here at different

levels

about how to do it right.

I suggest to start with the highest-level question [and discuss the
particular API only after agreeing on this, possibly in a separate

thread]:

how to deal with the fact that Beam gives no guarantees about the
environment on workers, e.g. which commands are available, which
shell
or
even OS is being used, etc. Particularly:

- Obviously different runners will have a different environment, e.g.
Dataflow workers are not going to have Hadoop commands available
because
they are not running on a Hadoop cluster. So, pipelines and
transforms
developed using this connector will be necessarily non-portable
between
different runners. Maybe this is ok? But we need to give users a
clear
expectation about this. How do we phrase this expectation and where
do
we
put it in the docs?

- I'm concerned that this puts additional compatibility requirements
on
runners - it becomes necessary for a runner to document the
environment

of

its workers (OS, shell, privileges, guaranteed-installed packages,
access
to other things on the host machine e.g. whether or not the worker
runs

in

its own container, etc.) and to keep it stable - otherwise transforms
and
pipelines with this connector will be non-portable between runner

versions

either.

Another way to deal with this is to give up and say "the environment
on

the

workers is outside the scope of Beam; consult your runner's
documentation
or use your best judgment as to what the environment will be, and use

this

at your own risk".

What do others think?

On Mon, Dec 5, 2016 at 5:09 AM Jean-Baptiste Onofré <j...@nanthrax.net


wrote:


Hi beamers,

Today, Beam is mainly focused on data processing.
Since the beginning of the project, we are discussing about extending
the use cases coverage via DSLs and extensions (like for machine
learning), or via IO.

Especially for the IO, we can see Beam use for data integration and
data
ingestion.

In this area, I'm proposing a first IO: ExecIO:

https://issues.apache.org/jira/browse/BEAM-1059
https://github.com/apache/incubator-beam/pull/1451

Actually, this IO is mainly an ExecFn that executes system commands
(again, keep in mind we are discussing about data
integration/ingestion
and not data processing).

For convenience, this ExecFn is wrapped in Read and Write (as a
regular

IO).


Clearly, this IO/Fn depends of the worker where it runs. But it's
under
the user responsibility.

During the review, Eugene and I discussed about:
- is it an IO or just a fn ?
- is it OK to have worker specific IO ?

IMHO, an IO makes lot of sense to me and it's very convenient for end
users. They can do something like:

PCollection<String> output =
pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh"));

The pipeline will execute myscript and the output pipeline will
contain
command execution std out/err.

On the other hand, they can do:

pcollection.apply(ExecIO.write());

where PCollection contains the commands to execute.

Generally speaking, end users can call ExecFn wherever they want in
the
pipeline steps:

PCollection<String> output = pipeline.apply(ParDo.of(new

ExecIO.ExecFn()));


The input collection contains the commands to execute, and the output
collection contains the commands execution result std out/err.

Generally speaking, I'm preparing several IOs more on the data
integration/ingestion area than on "pure" classic big data
processing.
I
think it would give a new "dimension" to Beam.

Thoughts ?

Regards
JB
--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Reply via email to