Hi Romain -

Got it. Apparently, I needed to run beam as sudo to launch the docker containers.
I still need to figure out why that's the case.
Thank you.

*--*
*Mahesh Vangala*
*(Ph) 443-326-1957*
*(web) mvangala.com <http://mvangala.com>*

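A possible starting point for figuring that out: bash reports "command not found" when the executable is not on the PATH of the process that launched it, so it may help to look at the PATH the JVM actually sees. The sketch below is only a diagnostic aid under that assumption; the class name PathCheck and the helper findOnPath are hypothetical and not part of Beam or of the code in this thread.

```java
import java.io.File;
import java.util.Optional;

public class PathCheck {

    // Hypothetical helper: returns the first executable file called `name`
    // found in the directories listed in `pathValue`, if any.
    public static Optional<File> findOnPath(String name, String pathValue) {
        if (pathValue == null) {
            return Optional.empty();
        }
        for (String dir : pathValue.split(File.pathSeparator)) {
            if (dir.isEmpty()) {
                continue;
            }
            File candidate = new File(dir, name);
            if (candidate.isFile() && candidate.canExecute()) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // PATH as inherited by this JVM; child processes started through
        // ProcessBuilder inherit the same environment by default.
        String path = System.getenv("PATH");
        System.out.println("PATH seen by the JVM: " + path);
        System.out.println("docker resolved to: "
                + findOnPath("docker", path)
                        .map(File::getAbsolutePath)
                        .orElse("<not found>"));
    }
}
```

If docker is not found this way, passing the absolute path (/usr/local/bin/docker in the message below) to ProcessBuilder instead of going through /bin/bash would be one thing to try.
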
On Mon, Aug 20, 2018 at 11:05 AM Mahesh Vangala <vangalamahe...@gmail.com> wrote:

> Hello Romain -
>
> So, I have added pb.inheritIO().start().waitFor(); and now I have an
> error: /bin/bash: docker: command not found.
> But I have docker installed on the system: /usr/local/bin/docker
> Any ideas why I'm seeing this error when launched from within DoFn?
> Thank you so much for your help.
>
> On Mon, Aug 20, 2018 at 10:58 AM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> Weird, this code works:
>>
>> https://gist.github.com/rmannibucau/4703f321bb1962d1303f8eccbd05df0e
>>
>> Are you sure your test_in.csv has some data (otherwise no DoFn processing
>> will be triggered)?
>>
>> Romain Manni-Bucau
>> @rmannibucau <https://twitter.com/rmannibucau> | Blog
>> <https://rmannibucau.metawerx.net/> | Old Blog
>> <http://rmannibucau.wordpress.com> | Github
>> <https://github.com/rmannibucau> | LinkedIn
>> <https://www.linkedin.com/in/rmannibucau> | Book
>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>
>> On Mon, Aug 20, 2018 at 16:33, Mahesh Vangala <vangalamahe...@gmail.com>
>> wrote:
>>
>>> Hi Romain -
>>>
>>> I don't see any errors when I use waitFor().
>>> However, I don't see those processes being executed either, since
>>> "docker ps -a" doesn't list any processes.
>>>
>>> "This is quite unrelated to beam itself normally. If your engine (spark,
>>> dataflow etc) doesn't have a security manager active"
>>>
>>> I am using DirectRunner though.
>>> Let me know.
>>> Thank you!
>>>
>>> On Mon, Aug 20, 2018 at 10:28 AM Romain Manni-Bucau <rmannibu...@gmail.com>
>>> wrote:
>>>
>>>> Hi Mahesh,
>>>>
>>>> Did you get the same error? This is quite unrelated to beam itself normally.
>>>> If your engine (spark, dataflow etc) doesn't have a security manager
>>>> active it should be enough; if it does, you may be forbidden from doing
>>>> that.
>>>>
>>>> On Mon, Aug 20, 2018 at 16:08, Mahesh Vangala <vangalamahe...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello Romain -
>>>>>
>>>>> I did try that, still no luck.
>>>>> Also, when I put the process start logic into a separate test script, I
>>>>> do see the docker container running when I do "docker ps".
>>>>> However, no such luck implementing that logic within DoFn.
>>>>> Any thoughts?
>>>>> Thank you.
>>>>>
>>>>> Regards,
>>>>> Mahesh
>>>>>
>>>>> On Sun, Aug 19, 2018 at 3:53 AM Romain Manni-Bucau <rmannibu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> waitFor and not java wait primitive?
>>>>>>
>>>>>> On Sun, Aug 19, 2018 at 04:35, Mahesh Vangala <vangalamahe...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello Beamers -
>>>>>>>
>>>>>>> I am trying to pull off a POC - launch a docker image per element in the
>>>>>>> input PCollection and then return some data to the output PCollection.
>>>>>>>
>>>>>>> Here is my code:
>>>>>>>
>>>>>>> public class VariantCaller
>>>>>>> {
>>>>>>>   public static void main( String[] args )
>>>>>>>   {
>>>>>>>     PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>>     Pipeline p = Pipeline.create(opts);
>>>>>>>     PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>>>>>     PCollection<String> outLines = lines.apply(ParDo.of(new LaunchDocker.LaunchJobs()));
>>>>>>>     PCollection<String> mergedLines = outLines.apply(Combine.globally(new AddLines()));
>>>>>>>     mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>>>>     p.run();
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> My LaunchDocker Code:
>>>>>>>
>>>>>>> public class LaunchDocker {
>>>>>>>   public static class LaunchJobs extends DoFn<String, String> {
>>>>>>>     private static final long serialVersionUID = 1L;
>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(AddLines.class);
>>>>>>>
>>>>>>>     @ProcessElement
>>>>>>>     public void processElement(ProcessContext c) throws Exception {
>>>>>>>       // Get the input element from ProcessContext.
>>>>>>>       String word = c.element().split(",")[0];
>>>>>>>       LOG.info(word);
>>>>>>>       ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
>>>>>>>           "docker run --rm ubuntu:16.04 sleep 20");
>>>>>>>       pb.start().wait();
>>>>>>>       // Use ProcessContext.output to emit the output element.
>>>>>>>       if (!word.isEmpty())
>>>>>>>         c.output(word + "\n");
>>>>>>>     }
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> However, this fails with error:
>>>>>>>
>>>>>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>>>>>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>>>>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split
>>>>>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>>>> INFO: sample1
>>>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>>>> INFO: 4
>>>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>>>> INFO: 1
>>>>>>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalMonitorStateException
>>>>>>>   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>>>>>>   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>>>>>>   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>>>>>>   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>>>>>>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>>>>>>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>>>>>>   at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>>>>>> Caused by: java.lang.IllegalMonitorStateException
>>>>>>>   at java.lang.Object.wait(Native Method)
>>>>>>>   at java.lang.Object.wait(Object.java:502)
>>>>>>>   at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement(LaunchDocker.java:19)
>>>>>>>
>>>>>>> Can you share your ideas what's the best way of achieving this?
>>>>>>>
>>>>>>> Thank you for your help!
>>>>>>>
>>>>>>> Sincerely,
>>>>>>> Mahesh
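
For readers landing on this thread: the IllegalMonitorStateException in the trace comes from calling Object.wait() on the Process without holding its monitor, whereas Process.waitFor() is the call that blocks until the child process exits. Below is a minimal sketch of the waitFor() plus inheritIO() combination mentioned in the replies, written as plain Java so it runs without Beam. The class name ProcessRunner and the method runAndWait are hypothetical names chosen for illustration; in the pipeline, the same calls would sit inside processElement.

```java
import java.io.IOException;

public class ProcessRunner {

    // Hypothetical helper: starts the command, forwards its stdout/stderr
    // to the parent JVM, blocks until it exits, and returns its exit code.
    public static int runAndWait(String... command)
            throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        // Without inheritIO() the child's error output (for example
        // "docker: command not found") is not shown on the JVM's console.
        pb.inheritIO();
        Process process = pb.start();
        // waitFor() waits for process termination; Object.wait() is a
        // monitor primitive and is not meant for this.
        return process.waitFor();
    }

    public static void main(String[] args) throws Exception {
        // A harmless stand-in command; replace it with the docker invocation.
        int exitCode = runAndWait("/bin/sh", "-c", "exit 3");
        System.out.println("exit code: " + exitCode);
    }
}
```

Checking the returned exit code is a cheap way to notice a failed launch; a non-zero value could be logged or turned into an exception before emitting the output element.
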