On Wed, Dec 7, 2016 at 1:32 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> By the way, just to elaborate a bit on why I provided it as an IO:
>
> 1. From a user experience perspective, I think we have to provide a convenient way to write pipelines. Any syntax simplifying this is valuable. I think it's easier to write:
>
> pipeline.apply(ExecIO.read().withCommand("foo"))
>
> than:
>
> pipeline.apply(Create.of("foo")).apply(ParDo.of(new ExecFn()));

Slightly. Still, when I see pipeline.apply(ExecIO.read().withCommand("foo")) I am surprised to get a PCollection with a single element...
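For comparison, the plain-DoFn spelling could look roughly like this end to end (a sketch only - this ExecFn is a hypothetical stand-in built on ProcessBuilder, not necessarily what the PR implements):

  static class ExecFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      // Run the element as a shell command, merging stderr into stdout.
      Process p = new ProcessBuilder("/bin/sh", "-c", c.element())
          .redirectErrorStream(true)
          .start();
      StringBuilder out = new StringBuilder();
      try (BufferedReader r = new BufferedReader(
          new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
        for (String line = r.readLine(); line != null; line = r.readLine()) {
          out.append(line).append('\n');
        }
      }
      if (p.waitFor() != 0) {
        throw new IOException("Command failed: " + c.element());
      }
      c.output(out.toString());
    }
  }

  PCollection<String> output =
      pipeline.apply(Create.of("foo")).apply(ParDo.of(new ExecFn()));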
> 2. For me (maybe I'm wrong ;)), an IO is an extension dedicated to "connectors": reading/writing from/to a data source. So, even without the IO "wrapping" (by wrapping, I mean the Read and Write), I think the Exec extension should be in IO, as it's a source/sink of data.

To clarify: if you wrote a DoFn that, say, did lookups against a MySQL database, would you consider that an IO? For me, IO denotes input/output, i.e. the roots and leaves of a pipeline.

> Regards
> JB
>
> On 12/07/2016 08:37 AM, Robert Bradshaw wrote:
>>
>> I don't mean to derail the tricky environment questions, but I'm not seeing why this is bundled as an IO rather than a plain DoFn (which can be applied to a PCollection of one or more commands, yielding their outputs). Especially for the case of a Read, which in this case is not splittable (initially or dynamically) and always produces a single element -- feels much more like a Map to me.
>>
>> On Tue, Dec 6, 2016 at 3:26 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>>>
>>> Ben - the issues of "things aren't hung, there is a shell command running" - aren't they general to all DoFns? I.e., I don't see why the runner would need to know that a shell command is running, but not that, say, a heavy monolithic computation is running. What's the benefit to the runner in knowing that the DoFn contains a shell command?
>>>
>>> By saying "making sure that all shell commands finish", I suppose you're referring to the possibility of leaks if the user initiates a shell command and forgets to wait for it? I think that should be solvable, again without Beam intervention, by making a utility class for running shell commands which implements AutoCloseable, and documenting that you have to use it that way.
>>>
>>> Ken - I think the question here is: are we ok with a situation where the runner doesn't check or care whether the shell command can run, and the user accepts this risk and studies what commands will be available in the worker environment provided by the runner they use in production, before productionizing a pipeline with those commands.
>>>
>>> Upon some thought I think it's ok. Of course, this carries an obligation for runners to document their worker environment and its changes across versions. Though for many runners such documentation may be trivial - "whatever your YARN cluster has, the runner doesn't change it in any way" - and that may be good enough for users. And for other runners, like Dataflow, such documentation may also be trivial: "no guarantees whatsoever, only what you stage in --filesToStage is available".
>>>
>>> I can also see Beam developing to a point where we'd want all runners to be able to run your DoFn in a user-specified Docker container, and manage those intelligently - but I think that's quite a while away and it doesn't have to block work on a utility for executing shell commands. Though it'd be nice if the utility was forward-compatible with that future world.
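Re Eugene's AutoCloseable utility: I'd picture roughly the following (a sketch only, all names invented here), so that a forgotten wait can't leak a process past the end of a bundle:

  static class ShellCommand implements AutoCloseable {
    private final Process process;

    ShellCommand(String... command) throws IOException {
      this.process = new ProcessBuilder(command).start();
    }

    // Block until the command exits and return its exit code.
    int waitForExit() throws InterruptedException {
      return process.waitFor();
    }

    // Reap the process even if the caller forgot to wait for it.
    @Override
    public void close() {
      if (process.isAlive()) {
        process.destroyForcibly();
      }
    }
  }

  try (ShellCommand cmd = new ShellCommand("/path/to/myscript.sh")) {
    if (cmd.waitForExit() != 0) {
      throw new IOException("command failed");
    }
  }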
>>> On Tue, Dec 6, 2016 at 2:16 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>
>>>> Hi Eugene,
>>>>
>>>> thanks for the extended questions.
>>>>
>>>> I think we have two levels of expectations here:
>>>> - end-user responsibility
>>>> - worker/runner responsibility
>>>>
>>>> 1/ From an end-user perspective, the end user has to know that using a system command (via ExecIO) - and, more generally, anything that relies on worker resources (for instance, a local filesystem directory available only on one worker) - can fail if the expected resource is not present on all workers. So, basically, all workers should have the same topology. That's what I'm assuming for the PR.
>>>> For example, if my Spark cluster uses the same Mesos/Docker setup, then the user knows that all nodes in the cluster have the same setup and hence the same resources (which could be provided by DevOps, for instance).
>>>> On the other hand, running on Dataflow is different, because I don't "control" the nodes (bootstrapping or resources); but in that case the user knows it (he knows which runner he's using).
>>>>
>>>> 2/ As you said, we can expect the runner to deal with some requirements (expressed depending on the pipeline and the runner), and the runner can know which workers provide capabilities matching those requirements.
>>>> Then the end user is no longer responsible: the runner will try to determine whether the pipeline can be executed at all, and where each DoFn has to run (on which worker).
>>>>
>>>> For me, these are two different levels, where 2 is smarter but 1 can also make sense.
>>>>
>>>> WDYT?
>>>>
>>>> Regards
>>>> JB
>>>>
>>>> On 12/05/2016 08:51 PM, Eugene Kirpichov wrote:
>>>>>
>>>>> Hi JB,
>>>>>
>>>>> Thanks for bringing this to the mailing list. I also think that this is useful in general (and that the use cases for Beam are more than just classic bigdata), and that there are interesting questions here, at different levels, about how to do it right.
>>>>>
>>>>> I suggest we start with the highest-level question [and discuss the particular API only after agreeing on this, possibly in a separate thread]: how to deal with the fact that Beam gives no guarantees about the environment on workers, e.g. which commands are available, which shell or even OS is being used, etc. Particularly:
>>>>>
>>>>> - Obviously different runners will have different environments, e.g. Dataflow workers are not going to have Hadoop commands available because they are not running on a Hadoop cluster. So, pipelines and transforms developed using this connector will necessarily be non-portable between different runners. Maybe this is ok? But we need to give users a clear expectation about this. How do we phrase this expectation, and where do we put it in the docs?
>>>>>
>>>>> - I'm concerned that this puts additional compatibility requirements on runners: it becomes necessary for a runner to document the environment of its workers (OS, shell, privileges, guaranteed-installed packages, access to other things on the host machine, e.g. whether or not the worker runs in its own container, etc.) and to keep it stable - otherwise transforms and pipelines using this connector won't be portable between runner versions either.
>>>>>
>>>>> Another way to deal with this is to give up and say: "the environment on the workers is outside the scope of Beam; consult your runner's documentation or use your best judgment as to what the environment will be, and use this at your own risk".
>>>>>
>>>>> What do others think?
>>>>>
>>>>> On Mon, Dec 5, 2016 at 5:09 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>
>>>>> Hi beamers,
>>>>>
>>>>> Today, Beam is mainly focused on data processing. Since the beginning of the project, we have been discussing extending the use-case coverage via DSLs and extensions (like for machine learning), or via IOs.
>>>>>
>>>>> Especially with the IOs, we can see Beam being used for data integration and data ingestion.
>>>>>
>>>>> In this area, I'm proposing a first IO: ExecIO:
>>>>>
>>>>> https://issues.apache.org/jira/browse/BEAM-1059
>>>>> https://github.com/apache/incubator-beam/pull/1451
>>>>>
>>>>> Actually, this IO is mainly an ExecFn that executes system commands (again, keep in mind we are talking about data integration/ingestion and not data processing).
>>>>>
>>>>> For convenience, this ExecFn is wrapped in Read and Write (as in a regular IO).
>>>>>
>>>>> Clearly, this IO/Fn depends on the worker where it runs. But that's the user's responsibility.
>>>>>
>>>>> During the review, Eugene and I discussed:
>>>>> - is it an IO or just a fn?
>>>>> - is it OK to have worker-specific IOs?
>>>>>
>>>>> IMHO, an IO makes a lot of sense and is very convenient for end users. They can do something like:
>>>>>
>>>>> PCollection<String> output = pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh"));
>>>>>
>>>>> The pipeline will execute myscript.sh, and the output PCollection will contain the command's stdout/stderr.
>>>>>
>>>>> On the other hand, they can do:
>>>>>
>>>>> pcollection.apply(ExecIO.write());
>>>>>
>>>>> where the PCollection contains the commands to execute.
>>>>>
>>>>> Generally speaking, end users can call ExecFn wherever they want among the pipeline steps:
>>>>>
>>>>> PCollection<String> output = input.apply(ParDo.of(new ExecIO.ExecFn()));
>>>>>
>>>>> The input collection contains the commands to execute, and the output collection contains the commands' execution results (stdout/stderr).
>>>>>
>>>>> Generally speaking, I'm preparing several IOs aimed more at the data integration/ingestion area than at "pure" classic big data processing. I think it would give a new "dimension" to Beam.
>>>>>
>>>>> Thoughts?
>>>>>
>>>>> Regards
>>>>> JB
>>>>> --
>>>>> Jean-Baptiste Onofré
>>>>> jbono...@apache.org
>>>>> http://blog.nanthrax.net
>>>>> Talend - http://www.talend.com
>>>>
>>>> --
>>>> Jean-Baptiste Onofré
>>>> jbono...@apache.org
>>>> http://blog.nanthrax.net
>>>> Talend - http://www.talend.com
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com