Weird, this code works:

https://gist.github.com/rmannibucau/4703f321bb1962d1303f8eccbd05df0e

Are you sure your test_in.csv has some data (otherwise no DoFn processing
will be triggered)?
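TextIO.read() hands the DoFn one element per line. If the file is empty, processElement is never called, and that looks exactly like "no errors, no containers". Below is a quick, Beam-free sanity check, sketched under the assumption that the file is in the working directory. CsvPreflight and firstColumns are hypothetical names, not part of Beam.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check, not part of Beam: approximates what TextIO.read()
// hands to the DoFn (one element per line) and keeps the first comma-separated
// field, like LaunchJobs does.
class CsvPreflight {

  static List<String> firstColumns(Path csv) throws IOException {
    List<String> words = new ArrayList<>();
    for (String line : Files.readAllLines(csv, StandardCharsets.UTF_8)) {
      // One entry per line, i.e. roughly one processElement call per line.
      words.add(line.split(",")[0]);
    }
    return words;
  }

  public static void main(String[] args) throws IOException {
    Path csv = Paths.get(args.length > 0 ? args[0] : "test_in.csv");
    List<String> words = firstColumns(csv);
    if (words.isEmpty()) {
      System.out.println(csv + " is empty: processElement would never be called");
    } else {
      System.out.println(words.size() + " element(s), first fields: " + words);
    }
  }
}
```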

Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>


On Mon, Aug 20, 2018 at 16:33, Mahesh Vangala <vangalamahe...@gmail.com>
wrote:

> Hi Romain -
>
> I don't see any errors when I use waitFor().
> However, I don't see those processes being executed either, since "docker
> ps -a" doesn't list any containers.
>
> > This is quite unrelated to Beam itself normally. If your engine (Spark,
> > Dataflow, etc.) doesn't have a security manager active...
>
> I am using DirectRunner, though.
> Let me know.
> Thank you!
>
> *--*
> *Mahesh Vangala*
> *(Ph) 443-326-1957*
> *(web) mvangala.com <http://mvangala.com>*
>
>
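With waitFor() in place, a failing docker command still fails silently. ProcessBuilder does not surface the child's output or exit code unless you read them. The sketch below merges stderr into stdout, drains it, and throws on a non-zero exit code, so a problem such as docker missing from the PATH would show up in the pipeline log. CommandRunner and runChecked are hypothetical names. Note also that `docker run --rm` removes the container as soon as it exits, so `docker ps -a` only lists it while the 20-second sleep is still running.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: runs a command, captures its combined stdout/stderr,
// and fails with that output if the exit code is non-zero.
class CommandRunner {

  static String runChecked(List<String> command) throws IOException, InterruptedException {
    ProcessBuilder pb = new ProcessBuilder(command);
    pb.redirectErrorStream(true); // merge stderr into stdout
    Process process = pb.start();

    // Drain the output before waiting; otherwise a chatty child can block on a
    // full pipe buffer and waitFor() never returns.
    String output;
    try (InputStream in = process.getInputStream()) {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      byte[] chunk = new byte[8192];
      int n;
      while ((n = in.read(chunk)) != -1) {
        buf.write(chunk, 0, n);
      }
      output = new String(buf.toByteArray(), StandardCharsets.UTF_8);
    }

    int exitCode = process.waitFor(); // Process.waitFor(), not Object.wait()
    if (exitCode != 0) {
      throw new IOException("Command " + command + " exited with " + exitCode + ":\n" + output);
    }
    return output;
  }

  public static void main(String[] args) throws Exception {
    // Same command as in LaunchJobs; needs a reachable Docker daemon.
    System.out.print(runChecked(Arrays.asList(
        "/bin/bash", "-c", "docker run --rm ubuntu:16.04 sleep 20")));
  }
}
```

Calling runChecked from processElement instead of `pb.start().waitFor()` would turn a silent docker failure into a failed element with the docker error message attached.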
> On Mon, Aug 20, 2018 at 10:28 AM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> Hi Mahesh,
>>
>> Did you get the same error? This is normally unrelated to Beam itself.
>> If your engine (Spark, Dataflow, etc.) doesn't have a security manager
>> active, it should be enough; if it does, you can be forbidden from using
>> that.
>>
>>
>>
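To check whether a security manager is actually in the way, you could call something like the following from inside processElement. ExecPermissionCheck and canExec are hypothetical names. System.getSecurityManager() returns null when none is installed, which is normally the case for a plain local JVM. SecurityManager.checkExec throws SecurityException when creating a subprocess is not permitted. The Security Manager API has been deprecated for removal since Java 17.

```java
// Hypothetical diagnostic: would the JVM's security manager (if any) let this
// code start the given executable?
@SuppressWarnings("removal") // SecurityManager is deprecated for removal since Java 17
class ExecPermissionCheck {

  static boolean canExec(String executable) {
    SecurityManager sm = System.getSecurityManager();
    if (sm == null) {
      // No security manager installed: the JVM itself will not block ProcessBuilder.start().
      return true;
    }
    try {
      sm.checkExec(executable); // throws SecurityException if exec is not permitted
      return true;
    } catch (SecurityException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("security manager installed: " + (System.getSecurityManager() != null));
    System.out.println("may exec /bin/bash: " + canExec("/bin/bash"));
  }
}
```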
>> On Mon, Aug 20, 2018 at 16:08, Mahesh Vangala <vangalamahe...@gmail.com>
>> wrote:
>>
>>> Hello Romain -
>>>
>>> I did try that, but still no luck.
>>> Also, when I put the process-start logic into a separate test script, I do
>>> see a running Docker container with "docker ps".
>>> However, I have no such luck implementing that logic within a DoFn.
>>> Any thoughts?
>>> Thank you.
>>>
>>> Regards,
>>> Mahesh
>>>
>>>
>>>
>>> On Sun, Aug 19, 2018 at 3:53 AM Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
>>>> Shouldn't that be waitFor() rather than the Java wait() primitive?
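Object.wait() is the low-level monitor primitive. It may only be called by a thread that holds the object's lock (inside a synchronized block on that object). Otherwise it throws IllegalMonitorStateException, which is the "Caused by" in the stack trace further down. Process.waitFor() is the method that blocks until the child process terminates and returns its exit code. Below is a small stdlib-only demonstration, assuming a Unix-like system with /bin/sh. The class and method names are hypothetical.

```java
import java.io.IOException;

// Contrasts the call LaunchJobs made (Object.wait()) with the intended one (Process.waitFor()).
class WaitVersusWaitFor {

  // Object.wait() requires the caller to hold the object's monitor
  // (i.e. to be inside synchronized (lock) { ... }). Called bare, as in
  // pb.start().wait(), it throws IllegalMonitorStateException immediately.
  static boolean bareWaitThrows(Object lock) throws InterruptedException {
    try {
      lock.wait();
      return false; // only reachable if the caller held the monitor and was notified
    } catch (IllegalMonitorStateException e) {
      return true;
    }
  }

  // Process.waitFor() blocks until the child process exits and returns its exit code.
  static int runAndWait(String... command) throws IOException, InterruptedException {
    Process process = new ProcessBuilder(command).inheritIO().start();
    return process.waitFor();
  }

  public static void main(String[] args) throws Exception {
    Process p = new ProcessBuilder("/bin/sh", "-c", "exit 0").start();
    System.out.println("bare Object.wait() throws: " + bareWaitThrows(p));
    p.waitFor();
    System.out.println("exit code: " + runAndWait("/bin/sh", "-c", "sleep 1; exit 7"));
  }
}
```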
>>>>
>>>> On Sun, Aug 19, 2018 at 04:35, Mahesh Vangala <vangalamahe...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello Beamers -
>>>>>
>>>>> I am trying to put together a POC: launch a Docker image per element in
>>>>> the input PCollection and then return some data to the output PCollection.
>>>>>
>>>>> Here is my code:
>>>>>
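>>>>> package pipelines.variant_caller;
>>>>>
>>>>> import org.apache.beam.sdk.Pipeline;
>>>>> import org.apache.beam.sdk.io.TextIO;
>>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>> import org.apache.beam.sdk.transforms.Combine;
>>>>> import org.apache.beam.sdk.transforms.ParDo;
>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>>
>>>>> public class VariantCaller {
>>>>>
>>>>>     public static void main(String[] args) {
>>>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>>>         Pipeline p = Pipeline.create(opts);
>>>>>
>>>>>         // One element per line of the input file.
>>>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>>>         // Launch one Docker container per element.
>>>>>         PCollection<String> outLines = lines.apply(ParDo.of(new LaunchDocker.LaunchJobs()));
>>>>>         // AddLines (not shown) is the combine function; Combine.globally yields a single value.
>>>>>         PCollection<String> mergedLines = outLines.apply(Combine.globally(new AddLines()));
>>>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>>
>>>>>         p.run();
>>>>>     }
>>>>> }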
>>>>>
>>>>>
>>>>> My LaunchDocker Code:
>>>>>
>>>>>
>>>>> package pipelines.variant_caller;
>>>>>
>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>> import org.slf4j.Logger;
>>>>> import org.slf4j.LoggerFactory;
>>>>>
>>>>> public class LaunchDocker {
>>>>>
>>>>>   public static class LaunchJobs extends DoFn<String, String> {
>>>>>
>>>>>     private static final long serialVersionUID = 1L;
>>>>>
>>>>>     private static final Logger LOG = LoggerFactory.getLogger(LaunchJobs.class);
>>>>>
>>>>>     @ProcessElement
>>>>>     public void processElement(ProcessContext c) throws Exception {
>>>>>       // Get the input element from ProcessContext.
>>>>>       String word = c.element().split(",")[0];
>>>>>       LOG.info(word);
>>>>>
>>>>>       ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
>>>>>           "docker run --rm ubuntu:16.04 sleep 20");
>>>>>       // Object.wait() without holding the monitor: this is the call that throws
>>>>>       // IllegalMonitorStateException below (Process.waitFor() was intended).
>>>>>       pb.start().wait();
>>>>>
>>>>>       // Use ProcessContext.output to emit the output element.
>>>>>       if (!word.isEmpty()) {
>>>>>         c.output(word + "\n");
>>>>>       }
>>>>>     }
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>> However, this fails with the following error:
>>>>>
>>>>>
>>>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>>>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split
>>>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>> INFO: sample1
>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>> INFO: 4
>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>> INFO: 1
>>>>>
>>>>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalMonitorStateException
>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>>>> Caused by: java.lang.IllegalMonitorStateException
>>>>>     at java.lang.Object.wait(Native Method)
>>>>>     at java.lang.Object.wait(Object.java:502)
>>>>>     at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement(LaunchDocker.java:19)
>>>>>
>>>>>
>>>>> Can you share your ideas on the best way to achieve this?
>>>>>
>>>>> Thank you for your help!
>>>>>
>>>>>
>>>>> Sincerely,
>>>>>
>>>>> Mahesh
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
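On the original question of the best way to launch a container per element, here is a sketch of two design choices. Neither is confirmed anywhere in this thread. The first is to build the docker command as an argument list instead of a single `/bin/bash -c` string, so per-element data (such as `word`) is passed verbatim without shell quoting issues. The second is to wait with a timeout, so a stuck container cannot stall a bundle indefinitely. DockerCommands, dockerRun and runWithTimeout are hypothetical names, and the timeout values are placeholders.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

class DockerCommands {

  // Builds "docker run --rm <image> <args...>" as separate arguments, so an
  // element value containing spaces or quotes reaches the container unchanged.
  static List<String> dockerRun(String image, String... containerArgs) {
    List<String> cmd = new ArrayList<>(Arrays.asList("docker", "run", "--rm", image));
    cmd.addAll(Arrays.asList(containerArgs));
    return cmd;
  }

  // Starts the command and waits at most `timeout`; on timeout the process is
  // killed and an IOException is thrown so the element fails visibly.
  static int runWithTimeout(List<String> command, long timeout, TimeUnit unit)
      throws IOException, InterruptedException {
    Process process = new ProcessBuilder(command).inheritIO().start();
    if (!process.waitFor(timeout, unit)) {
      process.destroyForcibly();
      throw new IOException("Timed out after " + timeout + " " + unit + ": " + command);
    }
    return process.exitValue();
  }

  public static void main(String[] args) throws Exception {
    // In processElement this would be: word = c.element().split(",")[0]
    String word = args.length > 0 ? args[0] : "sample1";
    List<String> cmd = dockerRun("ubuntu:16.04", "echo", word);
    System.out.println("exit code: " + runWithTimeout(cmd, 2, TimeUnit.MINUTES));
  }
}
```

One caveat: destroyForcibly() kills only the local docker client, which may leave the container itself running. Giving the container a `--name` and running `docker kill` on that name after a timeout would be one way to clean up.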
