I think I agree with Robert (unless I'm misunderstanding his point).

I think shell commands will be most useful if it is possible to take the
elements of an input PCollection, construct a shell command from each
element, and then execute it. Doing so in a fully general manner outside
of a DoFn will be difficult. If we instead made it easier to declare that
a DoFn has requirements on its environment (these programs must be
available in the shell), and easier to execute shell commands within a
DoFn, I think that would cover many more use cases.
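
To make that a bit more concrete, here is a rough sketch of the two pieces:
a list of programs that must be on the PATH, and a function that builds the
command from each element. ShellStep and its methods are names I made up for
illustration; this is plain Java, not an existing Beam API and not the code
in the ExecIO PR. It assumes a Unix-like worker where PATH lookup is
meaningful.

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical helper for illustration; not part of Beam or of the ExecIO PR. */
class ShellStep {
  private final List<String> requiredPrograms;
  private final Function<String, List<String>> commandFor;

  ShellStep(List<String> requiredPrograms, Function<String, List<String>> commandFor) {
    this.requiredPrograms = requiredPrograms;
    this.commandFor = commandFor;
  }

  /** Returns the required programs that are not executable from any PATH entry. */
  List<String> missingPrograms() {
    String path = System.getenv("PATH");
    List<String> dirs = Arrays.asList((path == null ? "" : path).split(File.pathSeparator));
    return requiredPrograms.stream()
        .filter(program -> dirs.stream().noneMatch(dir -> new File(dir, program).canExecute()))
        .collect(Collectors.toList());
  }

  /** Builds a command from one element, runs it, and returns its combined stdout/stderr. */
  String run(String element) {
    List<String> missing = missingPrograms();
    if (!missing.isEmpty()) {
      throw new IllegalStateException("Programs not found on PATH: " + missing);
    }
    try {
      // The element is passed as an argument list, so no shell interprets it.
      Process process = new ProcessBuilder(commandFor.apply(element))
          .redirectErrorStream(true)
          .start();
      ByteArrayOutputStream output = new ByteArrayOutputStream();
      try (InputStream in = process.getInputStream()) {
        byte[] buffer = new byte[4096];
        int read;
        while ((read = in.read(buffer)) != -1) {
          output.write(buffer, 0, read);
        }
      }
      int exitCode = process.waitFor();
      if (exitCode != 0) {
        throw new IllegalStateException("Command exited with code " + exitCode);
      }
      return new String(output.toByteArray(), StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for command", e);
    }
  }
}
```

Inside a DoFn, the method that processes each element would call something
like step.run(element) and output the result. The missingPrograms() check is
only a stand-in for a real "requirements on the environment" declaration,
which a runner could in principle inspect before scheduling the DoFn.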

On Thu, Dec 8, 2016 at 12:23 PM Robert Bradshaw <[email protected]>
wrote:

> On Wed, Dec 7, 2016 at 1:32 AM, Jean-Baptiste Onofré <[email protected]>
> wrote:
> > By the way, just to elaborate a bit why I provided as an IO:
> >
> > 1. From a user-experience perspective, I think we have to provide a
> > convenient way to write pipelines. Any syntax simplifying this is
> > valuable.
> > I think it's easier to write:
> >
> > pipeline.apply(ExecIO.read().withCommand("foo"))
> >
> > than:
> >
> > pipeline.apply(Create.of("foo")).apply(ParDo.of(new ExecFn()));
>
> Slightly. Still, when I see
>
>     pipeline.apply(ExecIO.read().withCommand("foo"))
>
> I am surprised to get a PCollection with a single element...
>
> > 2. For me (maybe I'm wrong ;)), an IO is an extension dedicated to
> > "connectors": reading/writing from/to a data source. So, even without the
> > IO "wrapping" (by wrapping, I mean the Read and Write), I think the Exec
> > extension should be in IO as it's a source/sink of data.
>
> To clarify, if you wrote a DoFn that, say, did lookups against a MySQL
> database, you would consider this an IO? For me, IO denotes
> input/output, i.e. the roots and leaves of a pipeline.
>
> > Regards
> > JB
> >
> > On 12/07/2016 08:37 AM, Robert Bradshaw wrote:
> >>
> >> I don't mean to derail the tricky environment questions, but I'm not
> >> seeing why this is bundled as an IO rather than a plain DoFn (which
> >> can be applied to a PCollection of one or more commands, yielding
> >> their outputs). Especially for the case of a Read, which in this case
> >> is not splittable (initially or dynamically) and always produces a
> >> single element--feels much more like a Map to me.
> >>
> >> On Tue, Dec 6, 2016 at 3:26 PM, Eugene Kirpichov
> >> <[email protected]> wrote:
> >>>
> >>> Ben - the issues of "things aren't hung, there is a shell command
> >>> running", aren't they general to all DoFns? I.e., I don't see why the
> >>> runner would need to know that a shell command is running, but not
> >>> that, say, a heavy monolithic computation is running. What's the
> >>> benefit to the runner in knowing that the DoFn contains a shell command?
> >>>
> >>> By saying "making sure that all shell commands finish", I suppose
> >>> you're referring to the possibility of leaks if the user initiates a
> >>> shell command and forgets to wait for it? I think that should be
> >>> solvable, again without Beam intervention, by making a utility class
> >>> for running shell commands which implements AutoCloseable, and
> >>> documenting that you have to use it that way.
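
For what it's worth, a utility like that could look roughly like the sketch
below. ManagedCommand, start() and the grace-period parameter are
hypothetical names of mine, not anything in Beam or in the PR; the idea is
only that close() never returns while the child process is still running.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

/** Hypothetical utility; the class and method names are illustrative, not a Beam API. */
class ManagedCommand implements AutoCloseable {
  private final Process process;
  private final long graceMillis;

  private ManagedCommand(Process process, long graceMillis) {
    this.process = process;
    this.graceMillis = graceMillis;
  }

  /** Starts the command; stderr is merged into stdout. */
  static ManagedCommand start(long graceMillis, String... command) {
    try {
      Process process = new ProcessBuilder(command).redirectErrorStream(true).start();
      return new ManagedCommand(process, graceMillis);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  boolean isAlive() {
    return process.isAlive();
  }

  /** Reads the command's output until the stream is closed. */
  String readOutput() {
    ByteArrayOutputStream output = new ByteArrayOutputStream();
    try {
      InputStream in = process.getInputStream();
      byte[] buffer = new byte[4096];
      int read;
      while ((read = in.read(buffer)) != -1) {
        output.write(buffer, 0, read);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return new String(output.toByteArray(), StandardCharsets.UTF_8);
  }

  /** Waits up to the grace period for the process to exit, then kills it. */
  @Override
  public void close() {
    try {
      if (!process.waitFor(graceMillis, TimeUnit.MILLISECONDS)) {
        process.destroyForcibly();
        process.waitFor();
      }
    } catch (InterruptedException e) {
      process.destroyForcibly();
      Thread.currentThread().interrupt();
    }
  }
}
```

Used in a try-with-resources block, the command cannot outlive the block
even if the caller forgets to wait for it, which is the leak described
above. Whether to kill or to wait indefinitely on close() is a design choice
that the documentation for such a utility would need to spell out.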
> >>>
> >>> Ken - I think the question here is: are we ok with a situation where
> >>> the runner doesn't check or care whether the shell command can run,
> >>> and the user accepts this risk and studies what commands will be
> >>> available in the worker environment provided by the runner they use
> >>> in production, before productionizing a pipeline with those commands?
> >>>
> >>> Upon some thought I think it's ok. Of course, this carries an
> >>> obligation for runners to document their worker environment and its
> >>> changes across versions. Though for many runners such documentation
> >>> may be trivial: "whatever your YARN cluster has, the runner doesn't
> >>> change it in any way", and it may be good enough for users. And for
> >>> other runners, like Dataflow, such documentation may also be trivial:
> >>> "no guarantees whatsoever, only what you stage in --filesToStage is
> >>> available".
> >>>
> >>> I can also see Beam developing to a point where we'd want all runners
> >>> to be able to run your DoFn in a user-specified Docker container, and
> >>> manage those intelligently - but I think that's quite a while away and
> >>> it doesn't have to block work on a utility for executing shell
> >>> commands. Though it'd be nice if the utility were forward-compatible
> >>> with that future world.
> >>>
> >>> On Tue, Dec 6, 2016 at 2:16 AM Jean-Baptiste Onofré <[email protected]>
> >>> wrote:
> >>>
> >>>> Hi Eugene,
> >>>>
> >>>> thanks for the extended questions.
> >>>>
> >>>> I think we have two levels of expectations here:
> >>>> - end-user responsibility
> >>>> - worker/runner responsibility
> >>>>
> >>>> 1/ From an end-user perspective, the end user has to know that using a
> >>>> system command (via ExecIO), and more generally anything which relies
> >>>> on worker resources (for instance a local filesystem directory
> >>>> available only on a worker), can fail if the expected resource is not
> >>>> present on all workers. So, basically, all workers should have the
> >>>> same topology. It's what I'm assuming for the PR.
> >>>> For example, if I have my Spark cluster using the same Mesos/Docker
> >>>> setup, then the user knows that all nodes in the cluster will have the
> >>>> same setup and so the same resources (it could be provided by DevOps,
> >>>> for instance).
> >>>> On the other hand, running on Dataflow is different because I don't
> >>>> "control" the nodes (bootstrapping or resources), but in that case
> >>>> the user knows it (he knows the runner he's using).
> >>>>
> >>>> 2/ As you said, we can expect that the runner can deal with some
> >>>> requirements (expressed depending on the pipeline and the runner), and
> >>>> the runner can know which workers provide capabilities matching
> >>>> those requirements.
> >>>> Then the end user is no longer responsible: the runner will try to
> >>>> determine whether the pipeline can be executed, and where a DoFn has
> >>>> to be run (on which worker).
> >>>>
> >>>> For me, these are two different levels, where 2 is smarter but 1 can
> >>>> also make sense.
> >>>>
> >>>> WDYT ?
> >>>>
> >>>> Regards
> >>>> JB
> >>>>
> >>>> On 12/05/2016 08:51 PM, Eugene Kirpichov wrote:
> >>>>>
> >>>>> Hi JB,
> >>>>>
> >>>>> Thanks for bringing this to the mailing list. I also think that this
> >>>>> is useful in general (and that use cases for Beam are more than just
> >>>>> classic bigdata), and that there are interesting questions here at
> >>>>> different levels about how to do it right.
> >>>>>
> >>>>> I suggest starting with the highest-level question [and discussing
> >>>>> the particular API only after agreeing on this, possibly in a
> >>>>> separate thread]: how to deal with the fact that Beam gives no
> >>>>> guarantees about the environment on workers, e.g. which commands are
> >>>>> available, which shell or even OS is being used, etc. Particularly:
> >>>>>
> >>>>> - Obviously different runners will have a different environment, e.g.
> >>>>> Dataflow workers are not going to have Hadoop commands available
> >>>>> because they are not running on a Hadoop cluster. So, pipelines and
> >>>>> transforms developed using this connector will necessarily be
> >>>>> non-portable between different runners. Maybe this is ok? But we need
> >>>>> to give users a clear expectation about this. How do we phrase this
> >>>>> expectation and where do we put it in the docs?
> >>>>>
> >>>>> - I'm concerned that this puts additional compatibility requirements
> >>>>> on runners - it becomes necessary for a runner to document the
> >>>>> environment of its workers (OS, shell, privileges,
> >>>>> guaranteed-installed packages, access to other things on the host
> >>>>> machine e.g. whether or not the worker runs in its own container,
> >>>>> etc.) and to keep it stable - otherwise transforms and pipelines with
> >>>>> this connector will be non-portable between runner versions as well.
> >>>>>
> >>>>> Another way to deal with this is to give up and say "the environment
> >>>>> on the workers is outside the scope of Beam; consult your runner's
> >>>>> documentation or use your best judgment as to what the environment
> >>>>> will be, and use this at your own risk".
> >>>>>
> >>>>> What do others think?
> >>>>>
> >>>>> On Mon, Dec 5, 2016 at 5:09 AM Jean-Baptiste Onofré <[email protected]>
> >>>>> wrote:
> >>>>>
> >>>>>
> >>>>> Hi beamers,
> >>>>>
> >>>>> Today, Beam is mainly focused on data processing.
> >>>>> Since the beginning of the project, we have been discussing extending
> >>>>> the use-case coverage via DSLs and extensions (like for machine
> >>>>> learning), or via IO.
> >>>>>
> >>>>> Especially for IO, we can see Beam used for data integration and
> >>>>> data ingestion.
> >>>>>
> >>>>> In this area, I'm proposing a first IO: ExecIO:
> >>>>>
> >>>>> https://issues.apache.org/jira/browse/BEAM-1059
> >>>>> https://github.com/apache/incubator-beam/pull/1451
> >>>>>
> >>>>> Actually, this IO is mainly an ExecFn that executes system commands
> >>>>> (again, keep in mind we are discussing data integration/ingestion
> >>>>> and not data processing).
> >>>>>
> >>>>> For convenience, this ExecFn is wrapped in Read and Write (as a
> >>>>> regular IO).
> >>>>>
> >>>>> Clearly, this IO/Fn depends on the worker where it runs. But that is
> >>>>> the user's responsibility.
> >>>>>
> >>>>> During the review, Eugene and I discussed:
> >>>>> - is it an IO or just a fn?
> >>>>> - is it OK to have a worker-specific IO?
> >>>>>
> >>>>> IMHO, an IO makes a lot of sense and it's very convenient for end
> >>>>> users. They can do something like:
> >>>>>
> >>>>> PCollection<String> output =
> >>>>> pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh"));
> >>>>>
> >>>>> The pipeline will execute myscript and the output PCollection will
> >>>>> contain the command execution std out/err.
> >>>>>
> >>>>> On the other hand, they can do:
> >>>>>
> >>>>> pcollection.apply(ExecIO.write());
> >>>>>
> >>>>> where PCollection contains the commands to execute.
> >>>>>
> >>>>> Generally speaking, end users can call ExecFn wherever they want in
> >>>>> the pipeline steps:
> >>>>>
> >>>>> PCollection<String> output =
> >>>>> pipeline.apply(ParDo.of(new ExecIO.ExecFn()));
> >>>>>
> >>>>>
> >>>>> The input collection contains the commands to execute, and the output
> >>>>> collection contains the command execution results (std out/err).
> >>>>>
> >>>>> Generally speaking, I'm preparing several IOs, more in the data
> >>>>> integration/ingestion area than in "pure" classic big data
> >>>>> processing. I think it would give a new "dimension" to Beam.
> >>>>>
> >>>>> Thoughts ?
> >>>>>
> >>>>> Regards
> >>>>> JB
> >>>>> --
> >>>>> Jean-Baptiste Onofré
> >>>>> [email protected]
> >>>>> http://blog.nanthrax.net
> >>>>> Talend - http://www.talend.com
> >>>>>
