I think I agree with Robert (unless I'm misunderstanding his point). Shell commands will be most useful if it is possible to take the elements of an input PCollection, construct a shell command from those elements, and then execute it. Doing that in a fully general way outside of a DoFn will be difficult. If instead we made it easier to declare that a DoFn has requirements on its environment (these programs must be available in the shell) and easier to execute shell commands within a DoFn, that would cover many more use cases.
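To make this concrete, here is a minimal sketch of the "plain DoFn" approach (this is not the ExecIO from the PR; the "myTool" command is a made-up placeholder, and whether it actually exists on the worker is exactly the environment question in this thread):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.transforms.DoFn;

    // Builds a shell command from each input element and emits the command's
    // output, one element per line. "myTool" is hypothetical.
    public class RunCommandFn extends DoFn<String, String> {
      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        ProcessBuilder pb =
            new ProcessBuilder("myTool", "--input", c.element())
                .redirectErrorStream(true); // merge stderr into stdout
        Process process = pb.start();
        try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
          String line;
          while ((line = reader.readLine()) != null) {
            c.output(line);
          }
        }
        int exitCode = process.waitFor();
        if (exitCode != 0) {
          throw new RuntimeException("Command exited with code " + exitCode);
        }
      }
    }

Applying it is just commands.apply(ParDo.of(new RunCommandFn())), so the command line can depend on the element in whatever way the user needs. A sketch of the AutoCloseable utility Eugene mentions further down the thread is at the bottom of this message.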
On Thu, Dec 8, 2016 at 12:23 PM Robert Bradshaw <[email protected]> wrote:

> On Wed, Dec 7, 2016 at 1:32 AM, Jean-Baptiste Onofré <[email protected]> wrote:
> > By the way, just to elaborate a bit on why I provided this as an IO:
> >
> > 1. From a user experience perspective, I think we have to provide a
> > convenient way to write pipelines. Any syntax simplifying this is valuable.
> > I think it's easier to write:
> >
> > pipeline.apply(ExecIO.read().withCommand("foo"))
> >
> > than:
> >
> > pipeline.apply(Create.of("foo")).apply(ParDo.of(new ExecFn()));
>
> Slightly. Still, when I see
>
> pipeline.apply(ExecIO.read().withCommand("foo"))
>
> I am surprised to get a PCollection with a single element...
>
> > 2. For me (maybe I'm wrong ;)), an IO is an extension dedicated to
> > "connectors": reading/writing from/to a data source. So, even without the
> > IO "wrapping" (by wrapping, I mean the Read and Write), I think the Exec
> > extension should be in IO as it's a source/sink of data.
>
> To clarify, if you wrote a DoFn that, say, did lookups against a MySQL
> database, would you consider this an IO? For me, IO denotes input/output,
> i.e. the roots and leaves of a pipeline.
>
> > Regards
> > JB
> >
> > On 12/07/2016 08:37 AM, Robert Bradshaw wrote:
> >>
> >> I don't mean to derail the tricky environment questions, but I'm not
> >> seeing why this is bundled as an IO rather than a plain DoFn (which
> >> can be applied to a PCollection of one or more commands, yielding
> >> their outputs). Especially for the case of a Read, which in this case
> >> is not splittable (initially or dynamically) and always produces a
> >> single element--feels much more like a Map to me.
> >>
> >> On Tue, Dec 6, 2016 at 3:26 PM, Eugene Kirpichov
> >> <[email protected]> wrote:
> >>>
> >>> Ben - the issues of "things aren't hung, there is a shell command
> >>> running", aren't they general to all DoFns? I.e. I don't see why the
> >>> runner would need to know that a shell command is running, but not that,
> >>> say, a heavy monolithic computation is running. What's the benefit to the
> >>> runner in knowing that the DoFn contains a shell command?
> >>>
> >>> By saying "making sure that all shell commands finish", I suppose you're
> >>> referring to the possibility of leaks if the user initiates a shell
> >>> command and forgets to wait for it? I think that should be solvable again
> >>> without Beam intervention, by making a utility class for running shell
> >>> commands which implements AutoCloseable, and documenting that you have to
> >>> use it that way.
> >>>
> >>> Ken - I think the question here is: are we ok with a situation where the
> >>> runner doesn't check or care whether the shell command can run, and the
> >>> user accepts this risk and studies what commands will be available on the
> >>> worker environment provided by the runner they use in production, before
> >>> productionizing a pipeline with those commands.
> >>>
> >>> Upon some thought I think it's ok. Of course, this carries an obligation
> >>> for runners to document their worker environment and its changes across
> >>> versions. Though for many runners such documentation may be trivial:
> >>> "whatever your YARN cluster has, the runner doesn't change it in any way"
> >>> and it may be good enough for users. And for other runners, like Dataflow,
> >>> such documentation may also be trivial: "no guarantees whatsoever, only
> >>> what you stage in --filesToStage is available".
> >>>
> >>> I can also see Beam develop to a point where we'd want all runners to be
> >>> able to run your DoFn in a user-specified Docker container, and manage
> >>> those intelligently - but I think that's quite a while away and it doesn't
> >>> have to block work on a utility for executing shell commands. Though it'd
> >>> be nice if the utility was forward-compatible with that future world.
> >>>
> >>> On Tue, Dec 6, 2016 at 2:16 AM Jean-Baptiste Onofré <[email protected]> wrote:
> >>>
> >>>> Hi Eugene,
> >>>>
> >>>> thanks for the extended questions.
> >>>>
> >>>> I think we have two levels of expectations here:
> >>>> - end-user responsibility
> >>>> - worker/runner responsibility
> >>>>
> >>>> 1/ From an end-user perspective, the end user has to know that using a
> >>>> system command (via ExecIO), and more generally speaking anything which
> >>>> relies on worker resources (for instance a local filesystem directory
> >>>> available only on a worker), can fail if the expected resource is not
> >>>> present on all workers. So, basically, all workers should have the same
> >>>> topology. That's what I'm assuming for the PR.
> >>>> For example, I have my Spark cluster using the same Mesos/Docker setup;
> >>>> then the user knows that all nodes in the cluster will have the same
> >>>> setup and so the same resources (it could be provided by DevOps for
> >>>> instance).
> >>>> On the other hand, running on Dataflow is different because I don't
> >>>> "control" the nodes (bootstrapping or resources), but in that case, the
> >>>> user knows it (he knows the runner he's using).
> >>>>
> >>>> 2/ As you said, we can expect that the runner can deal with some
> >>>> requirements (expressed depending on the pipeline and the runner), and
> >>>> the runner can know the workers which provide capabilities matching
> >>>> those requirements.
> >>>> Then, the end user is no longer responsible: the runner will try to
> >>>> determine whether the pipeline can be executed, and where a DoFn has to
> >>>> be run (on which worker).
> >>>>
> >>>> For me, it's two different levels, where 2 is smarter but 1 can also
> >>>> make sense.
> >>>>
> >>>> WDYT?
> >>>>
> >>>> Regards
> >>>> JB
> >>>>
> >>>> On 12/05/2016 08:51 PM, Eugene Kirpichov wrote:
> >>>>>
> >>>>> Hi JB,
> >>>>>
> >>>>> Thanks for bringing this to the mailing list. I also think that this is
> >>>>> useful in general (and that use cases for Beam are more than just
> >>>>> classic big data), and that there are interesting questions here at
> >>>>> different levels about how to do it right.
> >>>>>
> >>>>> I suggest starting with the highest-level question [and discussing the
> >>>>> particular API only after agreeing on this, possibly in a separate
> >>>>> thread]: how to deal with the fact that Beam gives no guarantees about
> >>>>> the environment on workers, e.g. which commands are available, which
> >>>>> shell or even OS is being used, etc. Particularly:
> >>>>>
> >>>>> - Obviously different runners will have a different environment, e.g.
> >>>>> Dataflow workers are not going to have Hadoop commands available
> >>>>> because they are not running on a Hadoop cluster. So, pipelines and
> >>>>> transforms developed using this connector will necessarily be
> >>>>> non-portable between different runners. Maybe this is ok? But we need
> >>>>> to give users a clear expectation about this. How do we phrase this
> >>>>> expectation and where do we put it in the docs?
> >>>>>
> >>>>> - I'm concerned that this puts additional compatibility requirements
> >>>>> on runners - it becomes necessary for a runner to document the
> >>>>> environment of its workers (OS, shell, privileges, guaranteed-installed
> >>>>> packages, access to other things on the host machine, e.g. whether or
> >>>>> not the worker runs in its own container, etc.) and to keep it stable -
> >>>>> otherwise transforms and pipelines with this connector will also be
> >>>>> non-portable between runner versions.
> >>>>>
> >>>>> Another way to deal with this is to give up and say "the environment
> >>>>> on the workers is outside the scope of Beam; consult your runner's
> >>>>> documentation or use your best judgment as to what the environment
> >>>>> will be, and use this at your own risk".
> >>>>>
> >>>>> What do others think?
> >>>>>
> >>>>> On Mon, Dec 5, 2016 at 5:09 AM Jean-Baptiste Onofré <[email protected]> wrote:
> >>>>>
> >>>>> Hi beamers,
> >>>>>
> >>>>> Today, Beam is mainly focused on data processing.
> >>>>> Since the beginning of the project, we have been discussing extending
> >>>>> the use-case coverage via DSLs and extensions (like for machine
> >>>>> learning), or via IO.
> >>>>>
> >>>>> Especially for the IO, we can see Beam used for data integration and
> >>>>> data ingestion.
> >>>>>
> >>>>> In this area, I'm proposing a first IO: ExecIO:
> >>>>>
> >>>>> https://issues.apache.org/jira/browse/BEAM-1059
> >>>>> https://github.com/apache/incubator-beam/pull/1451
> >>>>>
> >>>>> Actually, this IO is mainly an ExecFn that executes system commands
> >>>>> (again, keep in mind we are discussing data integration/ingestion and
> >>>>> not data processing).
> >>>>>
> >>>>> For convenience, this ExecFn is wrapped in Read and Write (as a
> >>>>> regular IO).
> >>>>>
> >>>>> Clearly, this IO/Fn depends on the worker where it runs. But that's
> >>>>> under the user's responsibility.
> >>>>>
> >>>>> During the review, Eugene and I discussed:
> >>>>> - is it an IO or just a fn?
> >>>>> - is it OK to have worker-specific IO?
> >>>>>
> >>>>> IMHO, an IO makes a lot of sense and it's very convenient for end
> >>>>> users. They can do something like:
> >>>>>
> >>>>> PCollection<String> output =
> >>>>>     pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh"));
> >>>>>
> >>>>> The pipeline will execute myscript and the output PCollection will
> >>>>> contain the command execution stdout/stderr.
> >>>>>
> >>>>> On the other hand, they can do:
> >>>>>
> >>>>> pcollection.apply(ExecIO.write());
> >>>>>
> >>>>> where the PCollection contains the commands to execute.
> >>>>>
> >>>>> Generally speaking, end users can call ExecFn wherever they want in
> >>>>> the pipeline steps:
> >>>>>
> >>>>> PCollection<String> output = pipeline.apply(ParDo.of(new ExecIO.ExecFn()));
> >>>>>
> >>>>> The input collection contains the commands to execute, and the output
> >>>>> collection contains the command execution results stdout/stderr.
> >>>>>
> >>>>> Generally speaking, I'm preparing several IOs more on the data
> >>>>> integration/ingestion side than on "pure" classic big data processing.
> >>>>> I think it would give a new "dimension" to Beam.
> >>>>>
> >>>>> Thoughts?
> >>>>>
> >>>>> Regards
> >>>>> JB
> >>>>> --
> >>>>> Jean-Baptiste Onofré
> >>>>> [email protected]
> >>>>> http://blog.nanthrax.net
> >>>>> Talend - http://www.talend.com
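As for the AutoCloseable utility Eugene mentions above, here is a rough sketch of what that could look like (the names ShellCommand and run are hypothetical, not an existing Beam API):

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    // Wraps a subprocess so that try-with-resources kills it if the caller
    // forgets to wait for it, avoiding leaked processes on the worker.
    public class ShellCommand implements AutoCloseable {
      private final Process process;

      private ShellCommand(Process process) {
        this.process = process;
      }

      /** Starts the given command on the local worker. */
      public static ShellCommand run(String... command) throws IOException {
        return new ShellCommand(new ProcessBuilder(command).start());
      }

      /** Blocks until the command exits and returns its exit code. */
      public int waitFor() throws InterruptedException {
        return process.waitFor();
      }

      @Override
      public void close() throws InterruptedException {
        if (process.isAlive()) {
          process.destroyForcibly();
          process.waitFor(10, TimeUnit.SECONDS); // give it a moment to die
        }
      }
    }

Usage inside a DoFn would then be something like:

    try (ShellCommand cmd = ShellCommand.run("/path/to/myscript.sh")) {
      int exitCode = cmd.waitFor();
      // handle exitCode / output as needed
    }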
