Weird, this code works: https://gist.github.com/rmannibucau/4703f321bb1962d1303f8eccbd05df0e
Are you sure your test_in.csv has some data (otherwise no DoFn processing will be triggered)?

Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>

On Mon, Aug 20, 2018 at 16:33, Mahesh Vangala <vangalamahe...@gmail.com> wrote:

> Hi Romain -
>
> I don't see any errors when I use waitFor().
> However, I don't see those processes being executed either, since
> "docker ps -a" doesn't list any processes.
>
>> This is quite unrelated to beam itself normally. If your engine (spark,
>> dataflow etc) doesn't have a security manager active
>
> I am using DirectRunner though.
> Let me know.
> Thank you!
>
> *--*
> *Mahesh Vangala*
> *(Ph) 443-326-1957*
> *(web) mvangala.com <http://mvangala.com>*
>
> On Mon, Aug 20, 2018 at 10:28 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>
>> Hi Mahesh,
>>
>> Did you get the same error? This is quite unrelated to beam itself
>> normally. If your engine (spark, dataflow etc) doesn't have a security
>> manager active it should be enough; if it has one, you can be forbidden
>> to use that.
>>
>> Romain Manni-Bucau
>>
>> On Mon, Aug 20, 2018 at 16:08, Mahesh Vangala <vangalamahe...@gmail.com> wrote:
>>
>>> Hello Romain -
>>>
>>> I did try that, still no luck.
>>> Also, when I put the process start logic into a separate test script, I do
>>> see a running docker container when I do "docker ps".
>>> However, no such luck implementing that logic within a DoFn.
>>> Any thoughts?
>>> Thank you.
>>>
>>> Regards,
>>> Mahesh
>>>
>>> On Sun, Aug 19, 2018 at 3:53 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>
>>>> waitFor() and not the Java wait() primitive?
>>>>
>>>> On Sun, Aug 19, 2018 at 04:35, Mahesh Vangala <vangalamahe...@gmail.com> wrote:
>>>>
>>>>> Hello Beamers -
>>>>>
>>>>> I am trying to put together a POC: launch a docker image per element in
>>>>> the input PCollection and then return some data to the output PCollection.
>>>>>
>>>>> Here is my code:
>>>>>
>>>>> public class VariantCaller {
>>>>>     public static void main(String[] args) {
>>>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>>>         Pipeline p = Pipeline.create(opts);
>>>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>>>         PCollection<String> outLines = lines.apply(ParDo.of(new LaunchDocker.LaunchJobs()));
>>>>>         PCollection<String> mergedLines = outLines.apply(Combine.globally(new AddLines()));
>>>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>>         p.run();
>>>>>     }
>>>>> }
>>>>>
>>>>> My LaunchDocker code:
>>>>>
>>>>> public class LaunchDocker {
>>>>>     public static class LaunchJobs extends DoFn<String, String> {
>>>>>         private static final long serialVersionUID = 1L;
>>>>>         private static final Logger LOG = LoggerFactory.getLogger(AddLines.class);
>>>>>
>>>>>         @ProcessElement
>>>>>         public void processElement(ProcessContext c) throws Exception {
>>>>>             // Get the input element from ProcessContext.
>>>>>             String word = c.element().split(",")[0];
>>>>>             LOG.info(word);
>>>>>             ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
>>>>>                 "docker run --rm ubuntu:16.04 sleep 20");
>>>>>             pb.start().wait();
>>>>>             // Use ProcessContext.output to emit the output element.
>>>>>             if (!word.isEmpty())
>>>>>                 c.output(word + "\n");
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> However, this fails with the following error:
>>>>>
>>>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>>>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split
>>>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>> INFO: sample1
>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>> INFO: 4
>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>> INFO: 1
>>>>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalMonitorStateException
>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>>>> Caused by: java.lang.IllegalMonitorStateException
>>>>>     at java.lang.Object.wait(Native Method)
>>>>>     at java.lang.Object.wait(Object.java:502)
>>>>>     at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement(LaunchDocker.java:19)
>>>>>
>>>>> Can you share your ideas on the best way of achieving this?
>>>>>
>>>>> Thank you for your help!
>>>>>
>>>>> Sincerely,
>>>>> Mahesh
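
For anyone finding this thread later, here is a minimal sketch of the difference discussed above between Process.waitFor() and Object.wait(), using only the JDK and no Beam. The class name ProcessWaitDemo, its two helper methods and the shell commands are hypothetical and chosen purely for illustration; it assumes a Unix-like system with /bin/sh and does not call docker. It is not the code from the gist linked at the top.

```java
import java.io.IOException;

class ProcessWaitDemo {

    /** Starts a shell command, blocks until it exits, and returns its exit code. */
    public static int runAndWait(String command) throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder("/bin/sh", "-c", command);
        pb.inheritIO(); // forward the child's stdout/stderr to this JVM's console
        Process process = pb.start();
        // Process.waitFor() blocks until the child process terminates.
        return process.waitFor();
    }

    /**
     * Reproduces the failure from the stack trace: Object.wait() is a thread
     * monitor primitive and throws IllegalMonitorStateException when the caller
     * does not hold the object's monitor (no synchronized block).
     */
    public static boolean objectWaitThrows(String command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder("/bin/sh", "-c", command).start();
        try {
            process.wait(); // Object.wait(), not Process.waitFor()
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        } finally {
            process.destroy(); // do not leave the child running
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("exit code: " + runAndWait("exit 3"));
        System.out.println("Object.wait() threw: " + objectWaitThrows("sleep 1"));
    }
}
```

Inside a DoFn the same idea would presumably apply: call waitFor() on the started Process and check the returned exit code before emitting output, so that a failing docker command does not go unnoticed.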