Hello Romain - I did try that, still no luck. Also, when I put the process start logic into separate Test script, I do notice successful docker container when I do "docker ps". However, no such luck implementing that logic with in DoFn. Any thoughts? Thank you.
Regards, Mahesh *--* *Mahesh Vangala* *(Ph) 443-326-1957* *(web) mvangala.com <http://mvangala.com>* On Sun, Aug 19, 2018 at 3:53 AM Romain Manni-Bucau <[email protected]> wrote: > waitFor and not java wait primitive? > > Le dim. 19 août 2018 04:35, Mahesh Vangala <[email protected]> a > écrit : > >> Hello Beamers - >> >> I am trying to pull a POC - launch docker image per element in Input >> PCollection and then return some data to Output Pcollection. >> >> Here is my code: >> >> public class VariantCaller >> >> { >> >> public static void main( String[] args ) >> >> { >> >> PipelineOptions opts = PipelineOptionsFactory.fromArgs(args >> ).create(); >> >> Pipeline p = Pipeline.create(opts); >> >> PCollection<String> lines = p.apply(TextIO.read().from( >> "test_in.csv")); >> >> PCollection<String> outLines = lines.apply(ParDo.of(new >> LaunchDocker.LaunchJobs())); >> >> PCollection<String> mergedLines = outLines >> .apply(Combine.globally(new AddLines())); >> >> mergedLines.apply(TextIO.write().to("test_out.csv")); >> >> p.run(); >> >> } >> >> } >> >> >> My LaunchDocker Code: >> >> >> public class LaunchDocker { >> >> public static class LaunchJobs extends DoFn<String, String> { >> >> private static final long serialVersionUID = 1L; >> >> private static final Logger LOG = LoggerFactory.getLogger(AddLines. >> class); >> >> @ProcessElement >> >> public void processElement(ProcessContext c) throws Exception { >> >> // Get the input element from ProcessContext. >> >> String word = c.element().split(",")[0]; >> >> LOG.info(word); >> >> ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c", >> >> "docker run --rm ubuntu:16.04 sleep 20"); >> >> pb.start().wait(); >> >> // Use ProcessContext.output to emit the output element. >> >> if (!word.isEmpty()) >> >> c.output(word + "\n"); >> >> } >> >> } >> >> } >> >> >> However, this fails with error: >> >> >> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource >> getEstimatedSizeBytes >> >> INFO: Filepattern test_in.csv matched 1 files with total size 36 >> >> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split >> >> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms >> and produced 1 files and 9 bundles >> >> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs >> processElement >> >> INFO: sample1 >> >> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs >> processElement >> >> INFO: 4 >> >> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs >> processElement >> >> INFO: 1 >> >> Exception in thread "main" >> org.apache.beam.sdk.Pipeline$PipelineExecutionException: >> java.lang.IllegalMonitorStateException >> >> at >> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish( >> DirectRunner.java:332) >> >> at >> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish( >> DirectRunner.java:302) >> >> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197) >> >> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64) >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313) >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299) >> >> at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29) >> >> Caused by: java.lang.IllegalMonitorStateException >> >> at java.lang.Object.wait(Native Method) >> >> at java.lang.Object.wait(Object.java:502) >> >> at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement( >> LaunchDocker.java:19) >> >> >> Can you share your ideas what's the best way of achieving this? >> >> Thank you for your help! >> >> >> Sincerely, >> >> Mahesh >> >> >> >> *--* >> *Mahesh Vangala* >> *(Ph) 443-326-1957* >> *(web) mvangala.com <http://mvangala.com>* >> >
