Branched off into a separate thread.

How about ShellCommands.execute().withCommand("foo")? This is what it is -
it executes shell commands :)

Say, if I want to just execute a command for the sake of its side effect,
but I'm not interested in its output - it would feel odd to describe that
as either "reading" from the command or "writing" to it. Likewise, when I
execute commands in bash, I'm not thinking of it as reading or writing to
them.

Though, there are various modes of interaction with shell commands; some of
them could be called "reading" or "writing" I guess - or both!
- The command itself can be fixed at pipeline construction time, or be
fully dynamic (elements of a PCollection are themselves commands), or be
constructed from a fixed command and a variable set of arguments coming
from the PCollection (one-by-one or xargs-style, cramming as many arguments
as fit into the command line limit).
- We may also be writing elements of the PCollection to standard input of
the command - one-by-one, or in arbitrarily sized batches.
- We may be reading the command's stdout, its stderr, and its error code.
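
To illustrate the xargs-style mode from the first bullet, here is a rough
plain-Java sketch with no Beam dependency. XargsBatcher, batch and maxChars
are names I made up for illustration, not anything from the PR, and the
length accounting is simplified (characters plus one separator per token,
not the real OS limit).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: packs arguments into as few command lines as fit a size limit. */
public class XargsBatcher {

    /**
     * Returns one full command (base command plus arguments) per batch.
     * Each batch holds as many arguments as fit within maxChars; an argument
     * that is too long on its own still gets a batch to itself.
     */
    public static List<List<String>> batch(
            List<String> baseCommand, List<String> args, int maxChars) {
        // Approximate length of the fixed part: each token plus one separator.
        int baseLen = 0;
        for (String part : baseCommand) {
            baseLen += part.length() + 1;
        }

        List<List<String>> commands = new ArrayList<>();
        List<String> current = new ArrayList<>(baseCommand);
        int currentLen = baseLen;

        for (String arg : args) {
            int argLen = arg.length() + 1;
            boolean hasArgs = current.size() > baseCommand.size();
            // Start a new command line once the next argument would not fit.
            if (hasArgs && currentLen + argLen > maxChars) {
                commands.add(current);
                current = new ArrayList<>(baseCommand);
                currentLen = baseLen;
            }
            current.add(arg);
            currentLen += argLen;
        }
        // Emit the last, partially filled batch if it carries any arguments.
        if (current.size() > baseCommand.size()) {
            commands.add(current);
        }
        return commands;
    }
}
```

In a pipeline, each returned list would be one command invocation; the
one-by-one mode is just the degenerate case where every batch holds a single
argument.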

I think these options call for a more flexible naming and set of APIs than
read and write. And more flexible than a single DoFn, too (which is
something I hadn't thought of before - this connector definitely has room
for doing some interesting things).
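
To make the "both reading and writing" point concrete, here is a minimal
sketch in plain Java. It assumes nothing about Beam or the PR: ShellRunner,
Result and run are hypothetical names. A single execution takes standard
input and yields stdout, stderr and the exit code, which neither "read" nor
"write" describes well.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical utility: runs one command, feeding stdin and capturing all outputs. */
public class ShellRunner {

    /** Everything a single execution produces. */
    public static final class Result {
        public final int exitCode;
        public final String stdout;
        public final String stderr;

        Result(int exitCode, String stdout, String stderr) {
            this.exitCode = exitCode;
            this.stdout = stdout;
            this.stderr = stderr;
        }
    }

    /** Reads a stream to the end and returns its contents as UTF-8 text. */
    private static String drain(InputStream in) {
        try (InputStream stream = in) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[8192];
            int n;
            while ((n = stream.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
            return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Runs the command, writes stdin to it, and waits for it to finish. */
    public static Result run(List<String> command, String stdin)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).start();

        // Drain stdout and stderr concurrently so neither pipe can fill up and block.
        CompletableFuture<String> out =
                CompletableFuture.supplyAsync(() -> drain(process.getInputStream()));
        CompletableFuture<String> err =
                CompletableFuture.supplyAsync(() -> drain(process.getErrorStream()));

        // Closing the stream signals end-of-input to the command.
        try (OutputStream os = process.getOutputStream()) {
            os.write(stdin.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            // The command may exit without consuming its input; ignored in this sketch.
        }

        int exitCode = process.waitFor();
        return new Result(exitCode, out.join(), err.join());
    }
}
```

A DoFn could call something like ShellRunner.run(...) per element or per
batch and then decide which of the three outputs to emit, which is why I
suspect a single Read/Write pair is too narrow.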

On Wed, Dec 7, 2016 at 1:32 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

> By the way, just to elaborate a bit on why I provided it as an IO:
>
> 1. From a user experience perspective, I think we have to provide a
> convenient way to write pipelines. Any syntax simplifying this is valuable.
> I think it's easier to write:
>
> pipeline.apply(ExecIO.read().withCommand("foo"))
>
> than:
>
> pipeline.apply(Create.of("foo")).apply(ParDo.of(new ExecFn()));
>
> 2. For me (maybe I'm wrong ;)), an IO is an extension dedicated to
> "connectors": reading/writing from/to a data source. So, even without the
> IO "wrapping" (by wrapping, I mean the Read and Write), I think the Exec
> extension should be in IO as it's a source/sink of data.
>
> Regards
> JB
>
> On 12/07/2016 08:37 AM, Robert Bradshaw wrote:
> > I don't mean to derail the tricky environment questions, but I'm not
> > seeing why this is bundled as an IO rather than a plain DoFn (which
> > can be applied to a PCollection of one or more commands, yielding
> > their outputs). Especially for the case of a Read, which in this case
> > is not splittable (initially or dynamically) and always produces a
> > single element--feels much more like a Map to me.
> >
> > On Tue, Dec 6, 2016 at 3:26 PM, Eugene Kirpichov
> > <kirpic...@google.com.invalid> wrote:
> >> Ben - the issues of "things aren't hung, there is a shell command
> >> running", aren't they general to all DoFn's? i.e. I don't see why the
> >> runner would need to know that a shell command is running, but not that,
> >> say, a heavy monolithic computation is running. What's the benefit to the
> >> runner in knowing that the DoFn contains a shell command?
> >>
> >> By saying "making sure that all shell commands finish", I suppose you're
> >> referring to the possibility of leaks if the user initiates a shell
> >> command and forgets to wait for it? I think that should be solvable again
> >> without Beam intervention, by making a utility class for running shell
> >> commands which implements AutoCloseable, and documenting that you have to
> >> use it that way.
> >>
> >> Ken - I think the question here is: are we ok with a situation where the
> >> runner doesn't check or care whether the shell command can run, and the
> >> user accepts this risk and studies what commands will be available on the
> >> worker environment provided by the runner they use in production, before
> >> productionizing a pipeline with those commands?
> >>
> >> Upon some thought I think it's ok. Of course, this carries an obligation
> >> for runners to document their worker environment and its changes across
> >> versions. Though for many runners such documentation may be trivial:
> >> "whatever your YARN cluster has, the runner doesn't change it in any way"
> >> and it may be good enough for users. And for other runners, like
> >> Dataflow, such documentation may also be trivial: "no guarantees
> >> whatsoever, only what you stage in --filesToStage is available".
> >>
> >> I can also see Beam develop to a point where we'd want all runners to be
> >> able to run your DoFn in a user-specified Docker container, and manage
> >> those intelligently - but I think that's quite a while away and it
> >> doesn't have to block work on a utility for executing shell commands.
> >> Though it'd be nice if the utility was forward-compatible with that
> >> future world.
> >>
> >> On Tue, Dec 6, 2016 at 2:16 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> >>
> >>> Hi Eugene,
> >>>
> >>> thanks for the extended questions.
> >>>
> >>> I think we have two levels of expectations here:
> >>> - end-user responsibility
> >>> - worker/runner responsibility
> >>>
> >>> 1/ From an end-user perspective, the end user has to know that using a
> >>> system command (via ExecIO), and more generally anything which relies
> >>> on worker resources (for instance a local filesystem directory
> >>> available only on a worker), can fail if the expected resource is not
> >>> present on all workers. So, basically, all workers should have the same
> >>> topology. That's what I'm assuming for the PR.
> >>> For example, if I have my Spark cluster using the same Mesos/Docker
> >>> setup, then the user knows that all nodes in the cluster will have the
> >>> same setup and therefore the same resources (it could be provided by
> >>> DevOps, for instance).
> >>> On the other hand, running on Dataflow is different because I don't
> >>> "control" the nodes (bootstrapping or resources), but in that case, the
> >>> user knows it (he knows the runner he's using).
> >>>
> >>> 2/ As you said, we can expect that the runner can deal with some
> >>> requirements (expressed depending on the pipeline and the runner), and
> >>> the runner can know which workers provide capabilities matching
> >>> those requirements.
> >>> Then, the end user is no longer responsible: the runner will try to
> >>> determine whether the pipeline can be executed, and where a DoFn has to
> >>> be run (on which worker).
> >>>
> >>> For me, these are two different levels, where 2 is smarter but 1 can
> >>> also make sense.
> >>>
> >>> WDYT ?
> >>>
> >>> Regards
> >>> JB
> >>>
> >>> On 12/05/2016 08:51 PM, Eugene Kirpichov wrote:
> >>>> Hi JB,
> >>>>
> >>>> Thanks for bringing this to the mailing list. I also think that this is
> >>>> useful in general (and that use cases for Beam are more than just
> >>>> classic bigdata), and that there are interesting questions here at
> >>>> different levels about how to do it right.
> >>>>
> >>>> I suggest starting with the highest-level question [and discussing the
> >>>> particular API only after agreeing on this, possibly in a separate
> >>>> thread]: how to deal with the fact that Beam gives no guarantees about
> >>>> the environment on workers, e.g. which commands are available, which
> >>>> shell or even OS is being used, etc. Particularly:
> >>>>
> >>>> - Obviously different runners will have a different environment, e.g.
> >>>> Dataflow workers are not going to have Hadoop commands available
> >>>> because they are not running on a Hadoop cluster. So, pipelines and
> >>>> transforms developed using this connector will be necessarily
> >>>> non-portable between different runners. Maybe this is ok? But we need
> >>>> to give users a clear expectation about this. How do we phrase this
> >>>> expectation and where do we put it in the docs?
> >>>>
> >>>> - I'm concerned that this puts additional compatibility requirements on
> >>>> runners - it becomes necessary for a runner to document the environment
> >>>> of its workers (OS, shell, privileges, guaranteed-installed packages,
> >>>> access to other things on the host machine e.g. whether or not the
> >>>> worker runs in its own container, etc.) and to keep it stable -
> >>>> otherwise transforms and pipelines with this connector will be
> >>>> non-portable between runner versions either.
> >>>>
> >>>> Another way to deal with this is to give up and say "the environment on
> >>>> the workers is outside the scope of Beam; consult your runner's
> >>>> documentation or use your best judgment as to what the environment will
> >>>> be, and use this at your own risk".
> >>>>
> >>>> What do others think?
> >>>>
> >>>> On Mon, Dec 5, 2016 at 5:09 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> >>>>
> >>>> Hi beamers,
> >>>>
> >>>> Today, Beam is mainly focused on data processing.
> >>>> Since the beginning of the project, we have been discussing extending
> >>>> the use case coverage via DSLs and extensions (like for machine
> >>>> learning), or via IO.
> >>>>
> >>>> Especially for the IO, we can see Beam used for data integration and
> >>>> data ingestion.
> >>>>
> >>>> In this area, I'm proposing a first IO: ExecIO:
> >>>>
> >>>> https://issues.apache.org/jira/browse/BEAM-1059
> >>>> https://github.com/apache/incubator-beam/pull/1451
> >>>>
> >>>> Actually, this IO is mainly an ExecFn that executes system commands
> >>>> (again, keep in mind we are discussing data integration/ingestion
> >>>> and not data processing).
> >>>>
> >>>> For convenience, this ExecFn is wrapped in Read and Write (as a regular
> >>>> IO).
> >>>>
> >>>> Clearly, this IO/Fn depends on the worker where it runs. But that is
> >>>> the user's responsibility.
> >>>>
> >>>> During the review, Eugene and I discussed:
> >>>> - is it an IO or just a fn ?
> >>>> - is it OK to have worker specific IO ?
> >>>>
> >>>> IMHO, an IO makes a lot of sense and it's very convenient for end
> >>>> users. They can do something like:
> >>>>
> >>>> PCollection<String> output =
> >>>> pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh"));
> >>>>
> >>>> The pipeline will execute myscript and the output PCollection will
> >>>> contain the command execution std out/err.
> >>>>
> >>>> On the other hand, they can do:
> >>>>
> >>>> pcollection.apply(ExecIO.write());
> >>>>
> >>>> where PCollection contains the commands to execute.
> >>>>
> >>>> Generally speaking, end users can call ExecFn wherever they want in the
> >>>> pipeline steps:
> >>>>
> >>>> PCollection<String> output = pipeline.apply(ParDo.of(new ExecIO.ExecFn()));
> >>>>
> >>>> The input collection contains the commands to execute, and the output
> >>>> collection contains the commands execution result std out/err.
> >>>>
> >>>> Generally speaking, I'm preparing several IOs more on the data
> >>>> integration/ingestion area than on "pure" classic big data processing.
> >>>> I think it would give a new "dimension" to Beam.
> >>>>
> >>>> Thoughts ?
> >>>>
> >>>> Regards
> >>>> JB
> >>>> --
> >>>> Jean-Baptiste Onofré
> >>>> jbono...@apache.org
> >>>> http://blog.nanthrax.net
> >>>> Talend - http://www.talend.com
> >>>>
