Hi Mahesh, Did you get the same error? This is quite unrelated to beam itself normally. If your engine (spark, dataflow etc) doesn't have a security manager active it should be enough, if it has you can be forbidden to use that.
Romain Manni-Bucau @rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance> Le lun. 20 août 2018 à 16:08, Mahesh Vangala <[email protected]> a écrit : > Hello Romain - > > I did try that, still no luck. > Also, when I put the process start logic into separate Test script, I do > notice successful docker container when I do "docker ps". > However, no such luck implementing that logic with in DoFn. > Any thoughts? > Thank you. > > Regards, > Mahesh > > *--* > *Mahesh Vangala* > *(Ph) 443-326-1957* > *(web) mvangala.com <http://mvangala.com>* > > > On Sun, Aug 19, 2018 at 3:53 AM Romain Manni-Bucau <[email protected]> > wrote: > >> waitFor and not java wait primitive? >> >> Le dim. 19 août 2018 04:35, Mahesh Vangala <[email protected]> a >> écrit : >> >>> Hello Beamers - >>> >>> I am trying to pull a POC - launch docker image per element in Input >>> PCollection and then return some data to Output Pcollection. >>> >>> Here is my code: >>> >>> public class VariantCaller >>> >>> { >>> >>> public static void main( String[] args ) >>> >>> { >>> >>> PipelineOptions opts = PipelineOptionsFactory.fromArgs(args >>> ).create(); >>> >>> Pipeline p = Pipeline.create(opts); >>> >>> PCollection<String> lines = p.apply(TextIO.read().from( >>> "test_in.csv")); >>> >>> PCollection<String> outLines = lines.apply(ParDo.of(new >>> LaunchDocker.LaunchJobs())); >>> >>> PCollection<String> mergedLines = outLines >>> .apply(Combine.globally(new AddLines())); >>> >>> mergedLines.apply(TextIO.write().to("test_out.csv")); >>> >>> p.run(); >>> >>> } >>> >>> } >>> >>> >>> My LaunchDocker Code: >>> >>> >>> public class LaunchDocker { >>> >>> public static class LaunchJobs extends DoFn<String, String> { >>> >>> private static final long serialVersionUID = 1L; >>> >>> private static final Logger LOG = LoggerFactory.getLogger(AddLines. >>> class); >>> >>> @ProcessElement >>> >>> public void processElement(ProcessContext c) throws Exception { >>> >>> // Get the input element from ProcessContext. >>> >>> String word = c.element().split(",")[0]; >>> >>> LOG.info(word); >>> >>> ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c", >>> >>> "docker run --rm ubuntu:16.04 sleep 20"); >>> >>> pb.start().wait(); >>> >>> // Use ProcessContext.output to emit the output element. >>> >>> if (!word.isEmpty()) >>> >>> c.output(word + "\n"); >>> >>> } >>> >>> } >>> >>> } >>> >>> >>> However, this fails with error: >>> >>> >>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource >>> getEstimatedSizeBytes >>> >>> INFO: Filepattern test_in.csv matched 1 files with total size 36 >>> >>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split >>> >>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms >>> and produced 1 files and 9 bundles >>> >>> Aug 18, 2018 10:30:23 PM >>> pipelines.variant_caller.LaunchDocker$LaunchJobs processElement >>> >>> INFO: sample1 >>> >>> Aug 18, 2018 10:30:23 PM >>> pipelines.variant_caller.LaunchDocker$LaunchJobs processElement >>> >>> INFO: 4 >>> >>> Aug 18, 2018 10:30:23 PM >>> pipelines.variant_caller.LaunchDocker$LaunchJobs processElement >>> >>> INFO: 1 >>> >>> Exception in thread "main" >>> org.apache.beam.sdk.Pipeline$PipelineExecutionException: >>> java.lang.IllegalMonitorStateException >>> >>> at >>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish( >>> DirectRunner.java:332) >>> >>> at >>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish( >>> DirectRunner.java:302) >>> >>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197 >>> ) >>> >>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64) >>> >>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313) >>> >>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299) >>> >>> at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29) >>> >>> Caused by: java.lang.IllegalMonitorStateException >>> >>> at java.lang.Object.wait(Native Method) >>> >>> at java.lang.Object.wait(Object.java:502) >>> >>> at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement( >>> LaunchDocker.java:19) >>> >>> >>> Can you share your ideas what's the best way of achieving this? >>> >>> Thank you for your help! >>> >>> >>> Sincerely, >>> >>> Mahesh >>> >>> >>> >>> *--* >>> *Mahesh Vangala* >>> *(Ph) 443-326-1957* >>> *(web) mvangala.com <http://mvangala.com>* >>> >>
