Hello Beamers,
I am trying to put together a POC: launch a Docker image for each element in an input
PCollection and then return some data to an output PCollection.
Here is my code:
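package pipelines.variant_caller;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class VariantCaller {
    public static void main(String[] args) {
        PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(opts);
        // Read the input lines, launch one Docker job per line, then merge the outputs.
        PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
        PCollection<String> outLines = lines.apply(ParDo.of(new LaunchDocker.LaunchJobs()));
        PCollection<String> mergedLines = outLines.apply(Combine.globally(new AddLines()));
        mergedLines.apply(TextIO.write().to("test_out.csv"));
        p.run();
    }
}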
My LaunchDocker code:
package pipelines.variant_caller;

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LaunchDocker {
    public static class LaunchJobs extends DoFn<String, String> {
        private static final long serialVersionUID = 1L;
        private static final Logger LOG = LoggerFactory.getLogger(LaunchJobs.class);

        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            // Get the input element from ProcessContext.
            String word = c.element().split(",")[0];
            LOG.info(word);
            ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
                    "docker run --rm ubuntu:16.04 sleep 20");
            pb.start().wait();
            // Use ProcessContext.output to emit the output element.
            if (!word.isEmpty())
                c.output(word + "\n");
        }
    }
}
However, this fails with the following error:
Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern test_in.csv matched 1 files with total size 36
Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split
INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
INFO: sample1
Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
INFO: 4
Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
INFO: 1
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalMonitorStateException
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
    at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
Caused by: java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement(LaunchDocker.java:19)
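The "Caused by" frames point at java.lang.Object.wait, not at anything Docker-related. pb.start().wait() calls Object.wait(), the monitor wait that every Java object inherits, and that method throws IllegalMonitorStateException whenever the calling thread does not hold the object's monitor (that is, it is not inside a synchronized block on that object). The method that blocks until a child process exits is Process.waitFor(). The small standalone program below reproduces the exception and shows the waitFor() alternative returning the exit code. The class and method names are made up for illustration, and it assumes a POSIX /bin/sh is available:

```java
public class WaitVsWaitFor {

    // Returns true if calling Object.wait() on `o` outside a synchronized
    // block throws IllegalMonitorStateException (the JDK guarantees it does).
    static boolean waitWithoutMonitorThrows(Object o) {
        try {
            o.wait(1); // same mistake as pb.start().wait(): the monitor is not held
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Starts the command and blocks until the child process exits,
    // returning its exit code. This is the wait the DoFn actually needs.
    static int runAndWait(String... command) throws Exception {
        Process process = new ProcessBuilder(command).inheritIO().start();
        return process.waitFor();
    }

    public static void main(String[] args) throws Exception {
        Process p = new ProcessBuilder("/bin/sh", "-c", "sleep 1").start();
        System.out.println("wait() on a Process throws: " + waitWithoutMonitorThrows(p));
        p.waitFor(); // the correct call: wait for the child process to finish
        System.out.println("exit code: " + runAndWait("/bin/sh", "-c", "exit 3"));
    }
}
```

In LaunchJobs.processElement, the minimal change is to replace pb.start().wait(); with pb.start().waitFor(); and, ideally, to check the exit code it returns.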
Can you share your ideas on the best way to achieve this?
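Beyond the one-line fix, returning data from each container means the DoFn also needs the process output and a way to notice failures. One possible shape, assuming the container writes its result to stdout: run the command, collect its output lines, treat a non-zero exit code or a timeout as an error, and then call c.output(...) for each line. CommandRunner and runCommand are hypothetical names, the sketch uses only java.lang.ProcessBuilder and java.nio.file, and it has not been run inside a Beam pipeline:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class CommandRunner {

    // Hypothetical helper: runs `command`, waits at most `timeoutSeconds`,
    // and returns its combined stdout/stderr lines. Throws IOException on a
    // timeout or a non-zero exit code.
    static List<String> runCommand(long timeoutSeconds, String... command)
            throws IOException, InterruptedException {
        Path out = Files.createTempFile("cmd-", ".out");
        try {
            Process process = new ProcessBuilder(command)
                    .redirectErrorStream(true)    // stderr goes wherever stdout goes
                    .redirectOutput(out.toFile()) // write to a file so no pipe buffer can fill up
                    .start();
            if (!process.waitFor(timeoutSeconds, TimeUnit.SECONDS)) {
                process.destroyForcibly();
                throw new IOException("Timed out after " + timeoutSeconds + "s: "
                        + String.join(" ", command));
            }
            List<String> lines = Files.readAllLines(out, StandardCharsets.UTF_8);
            int exitCode = process.exitValue();
            if (exitCode != 0) {
                throw new IOException("Exit code " + exitCode + " from "
                        + String.join(" ", command) + ": " + lines);
            }
            return lines;
        } finally {
            Files.deleteIfExists(out);
        }
    }

    public static void main(String[] args) throws Exception {
        // Inside LaunchJobs.processElement this might become (hypothetical command):
        //   for (String line : runCommand(600, "docker", "run", "--rm", "ubuntu:16.04", "echo", word)) {
        //       c.output(line);
        //   }
        for (String line : runCommand(10, "/bin/sh", "-c", "echo sample1; echo done")) {
            System.out.println(line);
        }
    }
}
```

Redirecting the output to a temporary file instead of reading the pipe keeps waitFor(timeout, unit) meaningful: a child that prints a lot cannot stall on a full pipe buffer while the caller is blocked waiting. Passing docker, run, --rm and the other arguments separately also removes the extra /bin/bash -c layer. On a non-local runner, Docker would presumably need to be installed on every worker machine.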
Thank you for your help!
Sincerely,
Mahesh
--
Mahesh Vangala
(Ph) 443-326-1957
(web) mvangala.com