Hi Mahesh,

Did you get the same error? Normally this is unrelated to Beam itself.
If your engine (Spark, Dataflow, etc.) doesn't have a security manager
active, that change should be enough; if it does, the engine may forbid
you from launching processes this way.
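
For reference, the waitFor suggestion from earlier in the thread can be
sketched as below. This is only a minimal, standalone illustration, not
your pipeline code: the class name ProcessWaitSketch and both helper
methods are made up for the example, and a harmless shell command stands
in for the "docker run" call. It shows that Object.wait() throws
IllegalMonitorStateException when the caller does not hold the object's
monitor, whereas Process.waitFor() blocks until the child process exits.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class ProcessWaitSketch {

    // Hypothetical helper: starts the command and blocks until it exits.
    // Returns the child's exit code.
    public static int runAndWait(List<String> command)
            throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        // Share the parent's stdin/stdout/stderr so the child cannot stall
        // on a full, unread pipe buffer.
        pb.inheritIO();
        Process process = pb.start();
        // Process.waitFor() is the process-level wait; no monitor is needed.
        return process.waitFor();
    }

    // Hypothetical helper: reproduces the failure mode from the stack trace.
    // Object.wait() is a thread-coordination primitive and requires the
    // caller to hold the object's monitor (a synchronized block).
    public static boolean objectWaitThrowsWithoutMonitor()
            throws InterruptedException {
        Object lock = new Object();
        try {
            lock.wait(1L); // not inside synchronized (lock) { ... }
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Object.wait() without monitor throws: "
                + objectWaitThrowsWithoutMonitor());
        // Assumption: /bin/sh exists on this machine. In the DoFn this would
        // be the "/bin/bash -c 'docker run ...'" command from the original post.
        int exitCode = runAndWait(Arrays.asList("/bin/sh", "-c", "exit 0"));
        System.out.println("Child exit code: " + exitCode);
    }
}
```

Inside the DoFn, the equivalent change would be replacing
pb.start().wait() with pb.start().waitFor(), assuming the docker binary
is reachable from wherever the runner executes the function.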

Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>


On Mon, Aug 20, 2018 at 16:08, Mahesh Vangala <[email protected]>
wrote:

> Hello Romain -
>
> I tried that, but still no luck.
> Also, when I put the process-start logic into a separate test script, I do
> see the Docker container running when I run "docker ps".
> However, I have had no such luck running that logic within a DoFn.
> Any thoughts?
> Thank you.
>
> Regards,
> Mahesh
>
> *--*
> *Mahesh Vangala*
> *(Ph) 443-326-1957*
> *(web) mvangala.com <http://mvangala.com>*
>
>
> On Sun, Aug 19, 2018 at 3:53 AM Romain Manni-Bucau <[email protected]>
> wrote:
>
>> Shouldn't that be waitFor() rather than the Java wait() primitive?
>>
>> On Sun, Aug 19, 2018 at 04:35, Mahesh Vangala <[email protected]>
>> wrote:
>>
>>> Hello Beamers -
>>>
>>> I am trying to put together a POC: launch a Docker image per element in
>>> the input PCollection and then return some data to the output PCollection.
>>>
>>> Here is my code:
>>>
>>> public class VariantCaller
>>> {
>>>     public static void main( String[] args )
>>>     {
>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>         Pipeline p = Pipeline.create(opts);
>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>         PCollection<String> outLines = lines.apply(ParDo.of(new LaunchDocker.LaunchJobs()));
>>>         PCollection<String> mergedLines = outLines.apply(Combine.globally(new AddLines()));
>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>         p.run();
>>>     }
>>> }
>>>
>>>
>>> My LaunchDocker Code:
>>>
>>>
>>> public class LaunchDocker {
>>>   public static class LaunchJobs extends DoFn<String, String> {
>>>     private static final long serialVersionUID = 1L;
>>>     private static final Logger LOG = LoggerFactory.getLogger(AddLines.class);
>>>
>>>     @ProcessElement
>>>     public void processElement(ProcessContext c) throws Exception {
>>>       // Get the input element from ProcessContext.
>>>       String word = c.element().split(",")[0];
>>>       LOG.info(word);
>>>       ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
>>>           "docker run --rm ubuntu:16.04 sleep 20");
>>>       pb.start().wait();
>>>       // Use ProcessContext.output to emit the output element.
>>>       if (!word.isEmpty())
>>>         c.output(word + "\n");
>>>     }
>>>   }
>>> }
>>>
>>>
>>> However, this fails with the following error:
>>>
>>>
>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split
>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>> INFO: sample1
>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>> INFO: 4
>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>> INFO: 1
>>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalMonitorStateException
>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>> Caused by: java.lang.IllegalMonitorStateException
>>>     at java.lang.Object.wait(Native Method)
>>>     at java.lang.Object.wait(Object.java:502)
>>>     at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement(LaunchDocker.java:19)
>>>
>>>
>>> Can you share your ideas on the best way to achieve this?
>>>
>>> Thank you for your help!
>>>
>>>
>>> Sincerely,
>>>
>>> Mahesh
>>>
>>>
>>
