Hello Beamers,

I am trying to put together a POC: launch a Docker image for each element in an input PCollection, then return some data to an output PCollection.
Here is my code:

public class VariantCaller {
    public static void main(String[] args) {
        PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(opts);
        PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
        PCollection<String> outLines = lines.apply(ParDo.of(new LaunchDocker.LaunchJobs()));
        PCollection<String> mergedLines = outLines.apply(Combine.globally(new AddLines()));
        mergedLines.apply(TextIO.write().to("test_out.csv"));
        p.run();
    }
}

My LaunchDocker code:

public class LaunchDocker {
    public static class LaunchJobs extends DoFn<String, String> {
        private static final long serialVersionUID = 1L;
        private static final Logger LOG = LoggerFactory.getLogger(AddLines.class);

        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            // Get the input element from ProcessContext.
            String word = c.element().split(",")[0];
            LOG.info(word);
            ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c", "docker run --rm ubuntu:16.04 sleep 20");
            pb.start().wait();
            // Use ProcessContext.output to emit the output element.
            if (!word.isEmpty()) c.output(word + "\n");
        }
    }
}

However, this fails with the following error:

Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern test_in.csv matched 1 files with total size 36
Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split
INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
INFO: sample1
Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
INFO: 4
Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
INFO: 1
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalMonitorStateException
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
    at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
Caused by: java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement(LaunchDocker.java:19)

Can you share your ideas on the best way to achieve this?

Thank you for your help!

Sincerely,
Mahesh

--
Mahesh Vangala
(Ph) 443-326-1957
(web) mvangala.com <http://mvangala.com>
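For reference, the IllegalMonitorStateException in the trace comes from pb.start().wait(). Object.wait() may only be called by a thread that holds that object's monitor (inside a synchronized block on it), and it waits for a notify() rather than for the process to finish. The method that blocks until a child process exits is Process.waitFor(). Below is a minimal, stdlib-only sketch of that pattern. ProcessRunner and runAndWait are hypothetical names, and the sketch assumes Java 9+ (for InputStream.readAllBytes and List.of). It is not Beam-specific.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical helper: runs an external command and blocks until it exits. */
final class ProcessRunner {
    private ProcessRunner() {}

    /**
     * Starts the command, collects its combined stdout/stderr, and waits for it to exit.
     * Throws IOException if the process exits with a non-zero status.
     */
    static String runAndWait(List<String> command) throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true); // merge stderr into stdout so only one stream needs draining
        Process process = pb.start();

        // Drain the output first: a child that fills the pipe buffer would otherwise block forever.
        String output;
        try (InputStream in = process.getInputStream()) {
            output = new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }

        // waitFor() blocks until the child exits; unlike Object.wait(), it needs no monitor.
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            throw new IOException("Command " + command + " exited with status " + exitCode + ": " + output);
        }
        return output;
    }
}
```

Inside LaunchJobs.processElement, the line pb.start().wait(); could then become a call such as ProcessRunner.runAndWait(List.of("docker", "run", "--rm", "ubuntu:16.04", "sleep", "20")); passing the arguments as a list avoids the extra /bin/bash -c layer. Because processElement is already declared as throws Exception, the checked exceptions need no extra handling there.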