waitFor and not java wait primitive?

Le dim. 19 août 2018 04:35, Mahesh Vangala <vangalamahe...@gmail.com> a
écrit :

> Hello Beamers -
>
> I am trying to pull a POC - launch docker image per element in Input
> PCollection and then return some data to Output Pcollection.
>
> Here is my code:
>
> public class VariantCaller
>
> {
>
>     public static void main( String[] args )
>
>     {
>
>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args
> ).create();
>
>         Pipeline p = Pipeline.create(opts);
>
>         PCollection<String> lines = p.apply(TextIO.read().from(
> "test_in.csv"));
>
>         PCollection<String> outLines = lines.apply(ParDo.of(new
> LaunchDocker.LaunchJobs()));
>
>         PCollection<String> mergedLines = outLines.apply(Combine.globally(
> new AddLines()));
>
>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>
>         p.run();
>
>     }
>
> }
>
>
> My LaunchDocker Code:
>
>
> public class LaunchDocker {
>
>   public static class LaunchJobs extends DoFn<String, String> {
>
>     private static final long serialVersionUID = 1L;
>
>     private static final Logger LOG = LoggerFactory.getLogger(AddLines.
> class);
>
>     @ProcessElement
>
>     public void processElement(ProcessContext c) throws Exception {
>
>       // Get the input element from ProcessContext.
>
>       String word = c.element().split(",")[0];
>
>       LOG.info(word);
>
>       ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
>
>           "docker run --rm ubuntu:16.04 sleep 20");
>
>        pb.start().wait();
>
>       // Use ProcessContext.output to emit the output element.
>
>       if (!word.isEmpty())
>
>         c.output(word + "\n");
>
>     }
>
>   }
>
> }
>
>
> However, this fails with error:
>
>
> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource
> getEstimatedSizeBytes
>
> INFO: Filepattern test_in.csv matched 1 files with total size 36
>
> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split
>
> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms
> and produced 1 files and 9 bundles
>
> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs
> processElement
>
> INFO: sample1
>
> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs
> processElement
>
> INFO: 4
>
> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs
> processElement
>
> INFO: 1
>
> Exception in thread "main"
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.IllegalMonitorStateException
>
> at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(
> DirectRunner.java:332)
>
> at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(
> DirectRunner.java:302)
>
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>
> at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>
> Caused by: java.lang.IllegalMonitorStateException
>
> at java.lang.Object.wait(Native Method)
>
> at java.lang.Object.wait(Object.java:502)
>
> at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement(
> LaunchDocker.java:19)
>
>
> Can you share your ideas what's the best way of achieving this?
>
> Thank you for your help!
>
>
> Sincerely,
>
> Mahesh
>
>
>
> *--*
> *Mahesh Vangala*
> *(Ph) 443-326-1957*
> *(web) mvangala.com <http://mvangala.com>*
>

Reply via email to