waitFor and not java wait primitive? Le dim. 19 août 2018 04:35, Mahesh Vangala <vangalamahe...@gmail.com> a écrit :
> Hello Beamers - > > I am trying to pull a POC - launch docker image per element in Input > PCollection and then return some data to Output Pcollection. > > Here is my code: > > public class VariantCaller > > { > > public static void main( String[] args ) > > { > > PipelineOptions opts = PipelineOptionsFactory.fromArgs(args > ).create(); > > Pipeline p = Pipeline.create(opts); > > PCollection<String> lines = p.apply(TextIO.read().from( > "test_in.csv")); > > PCollection<String> outLines = lines.apply(ParDo.of(new > LaunchDocker.LaunchJobs())); > > PCollection<String> mergedLines = outLines.apply(Combine.globally( > new AddLines())); > > mergedLines.apply(TextIO.write().to("test_out.csv")); > > p.run(); > > } > > } > > > My LaunchDocker Code: > > > public class LaunchDocker { > > public static class LaunchJobs extends DoFn<String, String> { > > private static final long serialVersionUID = 1L; > > private static final Logger LOG = LoggerFactory.getLogger(AddLines. > class); > > @ProcessElement > > public void processElement(ProcessContext c) throws Exception { > > // Get the input element from ProcessContext. > > String word = c.element().split(",")[0]; > > LOG.info(word); > > ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c", > > "docker run --rm ubuntu:16.04 sleep 20"); > > pb.start().wait(); > > // Use ProcessContext.output to emit the output element. > > if (!word.isEmpty()) > > c.output(word + "\n"); > > } > > } > > } > > > However, this fails with error: > > > Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource > getEstimatedSizeBytes > > INFO: Filepattern test_in.csv matched 1 files with total size 36 > > Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split > > INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms > and produced 1 files and 9 bundles > > Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs > processElement > > INFO: sample1 > > Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs > processElement > > INFO: 4 > > Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs > processElement > > INFO: 1 > > Exception in thread "main" > org.apache.beam.sdk.Pipeline$PipelineExecutionException: > java.lang.IllegalMonitorStateException > > at > org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish( > DirectRunner.java:332) > > at > org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish( > DirectRunner.java:302) > > at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197) > > at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64) > > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313) > > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299) > > at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29) > > Caused by: java.lang.IllegalMonitorStateException > > at java.lang.Object.wait(Native Method) > > at java.lang.Object.wait(Object.java:502) > > at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement( > LaunchDocker.java:19) > > > Can you share your ideas what's the best way of achieving this? > > Thank you for your help! > > > Sincerely, > > Mahesh > > > > *--* > *Mahesh Vangala* > *(Ph) 443-326-1957* > *(web) mvangala.com <http://mvangala.com>* >