Hello Beamers -

I am trying to put together a POC: run a Docker container for each element in the input
PCollection and then return some data to the output PCollection.

Here is my code:

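package pipelines.variant_caller;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class VariantCaller {

    public static void main(String[] args) {
        PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(opts);

        // Read the input CSV, one element per line.
        PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
        // Launch one Docker job per line.
        PCollection<String> outLines = lines.apply(ParDo.of(new LaunchDocker.LaunchJobs()));
        // AddLines: my combine function (not shown here).
        PCollection<String> mergedLines = outLines.apply(Combine.globally(new AddLines()));
        mergedLines.apply(TextIO.write().to("test_out.csv"));

        p.run();
    }
}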


My LaunchDocker code:


package pipelines.variant_caller;

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LaunchDocker {

  public static class LaunchJobs extends DoFn<String, String> {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(LaunchJobs.class);

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      // Get the input element from ProcessContext.
      String word = c.element().split(",")[0];
      LOG.info(word);

      ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
          "docker run --rm ubuntu:16.04 sleep 20");
      pb.start().wait();

      // Use ProcessContext.output to emit the output element.
      if (!word.isEmpty()) {
        c.output(word + "\n");
      }
    }
  }
}


However, this fails with the following error:


Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern test_in.csv matched 1 files with total size 36
Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split
INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
INFO: sample1
Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
INFO: 4
Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
INFO: 1
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalMonitorStateException
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
    at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
Caused by: java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement(LaunchDocker.java:19)
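The "Caused by" frames point at java.lang.Object.wait, not at Docker: pb.start().wait() calls the wait() method that every Java object inherits from Object. That method may only be called by a thread holding the object's monitor (inside synchronized (obj) { ... }), and otherwise throws IllegalMonitorStateException. The method that blocks until a child process exits is Process.waitFor(). The small standalone program below (plain Java, no Beam) is a sketch that reproduces both behaviours; the class name WaitVsWaitFor is hypothetical, and it assumes a POSIX sh on the PATH in place of the original /bin/bash and docker commands.

```java
import java.io.IOException;

public class WaitVsWaitFor {

    // Calls Object.wait() on a Process without holding its monitor, exactly like
    // pb.start().wait() in the DoFn. Returns true if that throws IllegalMonitorStateException.
    static boolean waitWithoutMonitorThrows(Process p) throws InterruptedException {
        try {
            p.wait();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Process.waitFor() blocks until the child exits and returns its exit code.
    static int runAndWait(String... command) throws IOException, InterruptedException {
        Process p = new ProcessBuilder(command).inheritIO().start();
        return p.waitFor();
    }

    public static void main(String[] args) throws Exception {
        Process p = new ProcessBuilder("sh", "-c", "exit 0").start();
        System.out.println("Object.wait() throws: " + waitWithoutMonitorThrows(p));
        p.waitFor(); // reap the child started above

        System.out.println("waitFor() exit code: " + runAndWait("sh", "-c", "exit 3"));
    }
}
```

Run as-is, it should print "Object.wait() throws: true" followed by "waitFor() exit code: 3".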


Can you share your ideas on the best way to achieve this?
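One way to structure the per-element launch, sketched under assumptions: pass the command as an argument list instead of a single bash -c string (so the element value reaches Docker as one argument and is never re-parsed by a shell), drain the container's output, wait with Process.waitFor(long, TimeUnit), and treat a non-zero exit code as a failure. The ContainerRunner class, its run method, and the 600-second timeout in the usage comment are hypothetical; echo stands in for the real tool so that there is output to emit.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper that a DoFn such as LaunchJobs could call once per element, e.g.:
 *
 *   String out = ContainerRunner.run(
 *       Arrays.asList("docker", "run", "--rm", "ubuntu:16.04", "echo", word), 600);
 *   c.output(out.trim());
 */
public final class ContainerRunner {

    private ContainerRunner() {}

    /** Runs the command, returns its combined stdout/stderr, and throws on failure or timeout. */
    public static String run(List<String> command, long timeoutSeconds)
            throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true); // merge stderr into stdout so one reader drains both
        Process p = pb.start();

        // Drain output on another thread so a chatty child cannot block on a full pipe.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Thread reader = new Thread(() -> {
            byte[] buf = new byte[8192];
            try (InputStream in = p.getInputStream()) {
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
            } catch (IOException ignored) {
                // The stream is closed if the process is destroyed; nothing more to read.
            }
        });
        reader.start();

        // Process.waitFor, not Object.wait: blocks until exit or until the timeout elapses.
        if (!p.waitFor(timeoutSeconds, TimeUnit.SECONDS)) {
            p.destroyForcibly();
            reader.join();
            throw new IOException("Timed out after " + timeoutSeconds + " s: " + command);
        }
        reader.join();

        String output = new String(out.toByteArray(), StandardCharsets.UTF_8);
        int exitCode = p.exitValue();
        if (exitCode != 0) {
            throw new IOException("Exit code " + exitCode + " from " + command + ": " + output);
        }
        return output;
    }
}
```

An exception thrown from processElement fails the bundle; on runners that retry failed bundles, the command will run again, so it should be safe to repeat. Note also that killing the docker client process on timeout may not stop the container itself.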

Thank you for your help!


Sincerely,

Mahesh



--
Mahesh Vangala
(Ph) 443-326-1957
(web) mvangala.com
