Hello Romain -

I did try that, but still no luck.
Also, when I put the process-start logic into a separate test script, I do
see the Docker container running when I run "docker ps".
However, I have had no such luck with the same logic within the DoFn.
Any thoughts?
Thank you.
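
To narrow down what happens inside the DoFn, it may help to capture the child process's output and exit code instead of discarding them. The following is only a minimal sketch under stated assumptions: the names CommandRunner, Result, and run are hypothetical and are not part of the pipeline quoted below, and it only uses the standard ProcessBuilder and Process APIs.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical helper for illustration; not part of the quoted pipeline. */
final class CommandRunner {

  /** Exit code plus combined stdout/stderr of a finished command. */
  static final class Result {
    final int exitCode;
    final String output;

    Result(int exitCode, String output) {
      this.exitCode = exitCode;
      this.output = output;
    }
  }

  /** Starts the command, collects its output, and blocks until it exits. */
  static Result run(List<String> command) throws IOException, InterruptedException {
    ProcessBuilder pb = new ProcessBuilder(command);
    // Merge stderr into stdout so error messages are not lost.
    pb.redirectErrorStream(true);
    Process process = pb.start();

    // Drain the output before waiting, so the child cannot block on a full pipe.
    StringBuilder out = new StringBuilder();
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = reader.readLine()) != null) {
        out.append(line).append('\n');
      }
    }

    // Process.waitFor() blocks until the child exits and returns its exit code.
    int exitCode = process.waitFor();
    return new Result(exitCode, out.toString());
  }
}
```

Inside processElement, something like CommandRunner.run(Arrays.asList("/bin/bash", "-c", "docker run --rm ubuntu:16.04 sleep 20")) could then be called, logging the output whenever the exit code is non-zero.
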

Regards,
Mahesh

*--*
*Mahesh Vangala*
*(Ph) 443-326-1957*
*(web) mvangala.com <http://mvangala.com>*


On Sun, Aug 19, 2018 at 3:53 AM Romain Manni-Bucau <[email protected]>
wrote:

> Did you use Process.waitFor() and not the Java wait primitive (Object.wait())?
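
For context on the distinction raised here: Object.wait() is a thread-monitor primitive and throws IllegalMonitorStateException when the calling thread does not hold the object's monitor, whereas Process.waitFor() blocks until the child process exits and returns its exit code. A minimal sketch of the difference follows; the class and method names are hypothetical, and the usage assumes a POSIX /bin/sh is available.

```java
import java.io.IOException;
import java.util.List;

/** Hypothetical demo class; names are illustrative only. */
final class WaitVersusWaitFor {

  /**
   * Calls Object.wait() without holding the monitor of {@code target}.
   * Returns true if that throws IllegalMonitorStateException.
   */
  static boolean waitWithoutMonitorThrows(Object target) throws InterruptedException {
    try {
      // Short timeout so this cannot block for long if no exception is thrown.
      target.wait(1);
      return false;
    } catch (IllegalMonitorStateException e) {
      return true;
    }
  }

  /** Starts the command and blocks until it exits, returning its exit code. */
  static int runAndWait(List<String> command) throws IOException, InterruptedException {
    // inheritIO() forwards the child's stdout/stderr to this JVM's streams.
    Process process = new ProcessBuilder(command).inheritIO().start();
    return process.waitFor();
  }
}
```

Because Process is an ordinary object, calling wait() on it behaves exactly like the first method, which matches the exception type in the stack trace quoted further down.
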
>
> On Sun, Aug 19, 2018 at 04:35, Mahesh Vangala <[email protected]>
> wrote:
>
>> Hello Beamers -
>>
>> I am trying to put together a POC: launch a Docker container for each
>> element of an input PCollection and then return some data to an output
>> PCollection.
>>
>> Here is my code:
>>
>> public class VariantCaller {
>>
>>     public static void main(String[] args) {
>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>         Pipeline p = Pipeline.create(opts);
>>
>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>         PCollection<String> outLines = lines.apply(ParDo.of(new LaunchDocker.LaunchJobs()));
>>         PCollection<String> mergedLines = outLines.apply(Combine.globally(new AddLines()));
>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>
>>         p.run();
>>     }
>> }
>>
>>
>> My LaunchDocker Code:
>>
>>
>> public class LaunchDocker {
>>
>>   public static class LaunchJobs extends DoFn<String, String> {
>>
>>     private static final long serialVersionUID = 1L;
>>     private static final Logger LOG = LoggerFactory.getLogger(AddLines.class);
>>
>>     @ProcessElement
>>     public void processElement(ProcessContext c) throws Exception {
>>       // Get the input element from ProcessContext.
>>       String word = c.element().split(",")[0];
>>       LOG.info(word);
>>
>>       ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
>>           "docker run --rm ubuntu:16.04 sleep 20");
>>       pb.start().wait();
>>
>>       // Use ProcessContext.output to emit the output element.
>>       if (!word.isEmpty())
>>         c.output(word + "\n");
>>     }
>>   }
>> }
>>
>>
>> However, this fails with the following error:
>>
>>
>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split
>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>> INFO: sample1
>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>> INFO: 4
>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>> INFO: 1
>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalMonitorStateException
>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>> Caused by: java.lang.IllegalMonitorStateException
>>     at java.lang.Object.wait(Native Method)
>>     at java.lang.Object.wait(Object.java:502)
>>     at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement(LaunchDocker.java:19)
>>
>>
>> Can you share your ideas on the best way to achieve this?
>>
>> Thank you for your help!
>>
>>
>> Sincerely,
>>
>> Mahesh
>>
>>
>>
>>
>
