Re: ElasticIO retry configuration exception

2018-10-11 Thread Romain Manni-Bucau
It looks more like a client issue where the stream has already been read. Maybe
try to reproduce it in a unit test in the Beam ES module? That would enable us
to help you more accurately.
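
For illustration, a minimal shape such a test could take - assuming a
TestPipeline rule named pipeline and a local Elasticsearch node; the
index/type names and retry values are placeholders, not the module's actual
fixtures:

@Test
public void writeWithRetryConfiguration() {
    ElasticsearchIO.ConnectionConfiguration connection =
        ElasticsearchIO.ConnectionConfiguration.create(
            new String[] {"http://localhost:9200"}, "my-index", "my-type");

    pipeline
        .apply(Create.of("{\"field\":\"value\"}"))
        .apply(ElasticsearchIO.write()
            .withConnectionConfiguration(connection)
            .withRetryConfiguration(
                // 3 attempts within a 30s joda-time Duration budget
                ElasticsearchIO.RetryConfiguration.create(3, Duration.standardSeconds(30))));
    pipeline.run();
}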

Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>


Le jeu. 11 oct. 2018 à 16:18, Wout Scheepers <
wout.scheep...@vente-exclusive.com> a écrit :

> Hey Romain,
>
>
>
> I’ve checked and am using the same HTTP client as Beam 2.7.0.
>
> Just to be sure, I’ve created a minimal reproducible example in a fresh
> project with only the following dependencies in my build.gradle:
> dependencies {
>     compile 'org.apache.beam:beam-sdks-java-io-elasticsearch:2.7.0'
>     compile 'org.apache.beam:beam-runners-direct-java:2.7.0'
>     compile 'org.apache.beam:beam-runners-google-cloud-dataflow-java:2.7.0'
>     compile 'org.apache.beam:beam-sdks-java-extensions-protobuf:2.7.0'
>     compile 'org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:2.7.0'
>     compile 'org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.7.0'
>     compile 'org.apache.beam:beam-sdks-java-io-common:2.7.0'
>     compile 'org.apache.beam:beam-sdks-java-extensions-json-jackson:2.7.0'
>     compile 'org.apache.beam:beam-sdks-java-io-jdbc:2.7.0'
>
>     testCompile 'org.hamcrest:hamcrest-all:1.3'
>     testCompile 'org.assertj:assertj-core:3.4.1'
>     testCompile 'junit:junit:4.12'
> }
>
>
>
> However, the problem still persists when writing a document to Elastic
> with the retryConfiguration set.
>
> I guess the problem lies with my Elastic version, as JB implies?
>
>
>
> Anyway, thanks for the suggestion.
>
>
>
> Wout
>
>
>
> *From: *Romain Manni-Bucau 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Wednesday, 10 October 2018 at 16:53
> *To: *"user@beam.apache.org" 
> *Subject: *Re: ElasticIO retry configuration exception
>
>
>
> Hi Wout,
>
>
>
> Maybe check your classpath HTTP client versions (against
> https://github.com/apache/beam/blob/v2.7.0/sdks/java/io/elasticsearch/build.gradle
> for instance).
>
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
>
>
>
>
> Le mer. 10 oct. 2018 à 15:37, Wout Scheepers <
> wout.scheep...@vente-exclusive.com> a écrit :
>
> Hey JB,
>
> Thanks for your fast reply.
> The elastic version we're using is 5.6.2.
>
> "version": {
> "number": "5.6.2",
> "build_hash": "57e20f3",
> "build_date": "2017-09-23T13:16:45.703Z",
> "build_snapshot": false,
> "lucene_version": "6.6.1"
> }
>
>
> Wout
>
>
>
> On 10/10/2018, 15:34, "Jean-Baptiste Onofré"  wrote:
>
> Hi Wout,
>
> what's the elasticsearch version ? (just to try to reproduce)
>
> Thanks,
> Regards
> JB
>
> On 10/10/2018 15:31, Wout Scheepers wrote:
> > Hey all,
> >
> >
> >
> > When using .withRetryConfiguration() for ElasticsearchIO, I get the
> > following stacktrace:
> >
> >
> >
> > Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
> > No content to map due to end-of-input
> >  at [Source: (org.apache.http.nio.entity.ContentInputStream); line: 1, column: 0]
> >    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
> >    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4133)
> >    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3988)
> >    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3058)
> >    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse(ElasticsearchIO.java:167)
> >    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:171)
> >    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1213)
> >    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1183)

Re: ElasticIO retry configuration exception

2018-10-10 Thread Romain Manni-Bucau
Hi Wout,

Maybe check your classpath HTTP client versions (against
https://github.com/apache/beam/blob/v2.7.0/sdks/java/io/elasticsearch/build.gradle
for instance).

Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>


Le mer. 10 oct. 2018 à 15:37, Wout Scheepers <
wout.scheep...@vente-exclusive.com> a écrit :

> Hey JB,
>
> Thanks for your fast reply.
> The elastic version we're using is 5.6.2.
>
> "version": {
> "number": "5.6.2",
> "build_hash": "57e20f3",
> "build_date": "2017-09-23T13:16:45.703Z",
> "build_snapshot": false,
> "lucene_version": "6.6.1"
> }
>
>
> Wout
>
>
>
> On 10/10/2018, 15:34, "Jean-Baptiste Onofré"  wrote:
>
> Hi Wout,
>
> what's the elasticsearch version ? (just to try to reproduce)
>
> Thanks,
> Regards
> JB
>
> On 10/10/2018 15:31, Wout Scheepers wrote:
> > Hey all,
> >
> >
> >
> > When using .withRetryConfiguration() for ElasticsearchIO, I get the
> > following stacktrace:
> >
> >
> >
> > Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
> > No content to map due to end-of-input
> >  at [Source: (org.apache.http.nio.entity.ContentInputStream); line: 1, column: 0]
> >    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
> >    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4133)
> >    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3988)
> >    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3058)
> >    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse(ElasticsearchIO.java:167)
> >    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:171)
> >    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1213)
> >    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1183)
> >
> >
> >
> > I’ve been breaking my head on this one.
> >
> > Apparently the elastic Response object can’t be parsed anymore in the
> > checkForErrors() method.
> >
> > However, it is parsed successfully in the default RetryPredicate’s test
> > method, which is called in flushBatch() in the if clause related to the
> > retryConfig (ElasticsearchIO:1201).
> >
> > As far as I know, the Response object is not altered.
> >
> >
> >
> > Any clues why this doesn’t work for me?
> >
> > I really need this feature, as inserting 40M documents into elastic
> > results in too many retry timeouts ☺.
> >
> >
> >
> > Thanks!
> > Wout
> >
> >
> >
> >
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
>
>
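
An aside on the observation above - the Response parsing fine in the
RetryPredicate but not in checkForErrors(): an Apache HttpCore response
entity is a one-shot stream, so a second parse sees end-of-input unless the
entity is buffered first. A standalone sketch of the difference (plain
HttpClient code, not Beam's implementation):

static void parseTwice(org.apache.http.HttpResponse response) throws java.io.IOException {
    // Wrap the entity *before* the first read; BufferedHttpEntity is repeatable.
    org.apache.http.HttpEntity buffered =
        new org.apache.http.entity.BufferedHttpEntity(response.getEntity());
    String first = org.apache.http.util.EntityUtils.toString(buffered);
    String second = org.apache.http.util.EntityUtils.toString(buffered); // also succeeds
    // Reading the raw, unbuffered entity twice instead would fail on the second
    // pass with exactly the "No content to map due to end-of-input" seen above.
}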


Re: Launching a subprocess in DoFn

2018-08-20 Thread Romain Manni-Bucau
It is surprising that Eclipse doesn't run as your user, but yes, permissions
seem to be the key here. I don't recall the details, but with old Docker
packages you needed to add yourself to sudoers or manually add yourself to the
docker group.
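
For reference, a sketch of working around that from inside the JVM - the
absolute docker path comes from Mahesh's /usr/local/bin/docker and is
otherwise an assumption about the machine:

public class DockerLaunch {
    public static void main(String[] args) throws Exception {
        // An IDE-launched JVM often inherits a minimal PATH, so use the
        // absolute path to the binary (or extend PATH explicitly as below).
        ProcessBuilder pb = new ProcessBuilder(
                "/usr/local/bin/docker", "run", "--rm", "ubuntu:16.04", "sleep", "20");
        pb.environment().put("PATH", "/usr/local/bin:" + System.getenv("PATH"));
        int exit = pb.inheritIO().start().waitFor();
        System.out.println("docker exited with " + exit);
    }
}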

Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>


Le lun. 20 août 2018 à 17:24, Mahesh Vangala  a
écrit :

> Hi Romain -
>
> Okay, so I was running it through Eclipse. I believe the jobs were running
> as the eclipse user (?) as opposed to me (authorized to launch docker).
> Once I ran the project on the command line (irrespective of sudo or not),
> the containers executed just fine.
> Let me know if that makes sense.
> Thanks again.
>
> Regards,
> Mahesh
> *--*
> *Mahesh Vangala*
> *(Ph) 443-326-1957*
> *(web) mvangala.com <http://mvangala.com>*
>
>
> On Mon, Aug 20, 2018 at 11:18 AM Mahesh Vangala 
> wrote:
>
>> Hi Romain -
>>
>> Got it. Apparently, I needed to run beam as sudo to launch the docker
>> containers.
>> I still need to figure out why that's the case.
>> Thank you.
>> *--*
>> *Mahesh Vangala*
>> *(Ph) 443-326-1957*
>> *(web) mvangala.com <http://mvangala.com>*
>>
>>
>> On Mon, Aug 20, 2018 at 11:05 AM Mahesh Vangala 
>> wrote:
>>
>>> Hello Romain -
>>>
>>> So, I have added pb.inheritIO().start().waitFor(); and now I have an
>>> error /bin/bash: docker: command not found.
>>> But, I have docker installed on the system. /usr/local/bin/docker
>>> Any ideas why I'm seeing this error when launched from within DoFn?
>>> Thank you so much for your help.
>>>
>>> *--*
>>> *Mahesh Vangala*
>>> *(Ph) 443-326-1957*
>>> *(web) mvangala.com <http://mvangala.com>*
>>>
>>>
>>> On Mon, Aug 20, 2018 at 10:58 AM Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
>>>> Weird, this code works:
>>>>
>>>> https://gist.github.com/rmannibucau/4703f321bb1962d1303f8eccbd05df0e
>>>>
>>>> Are you sure your test_in.csv has some data (otherwise no DoFn
>>>> processing will be triggered)?
>>>>
>>>> Romain Manni-Bucau
>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>> <http://rmannibucau.wordpress.com> | Github
>>>> <https://github.com/rmannibucau> | LinkedIn
>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>
>>>>
>>>> Le lun. 20 août 2018 à 16:33, Mahesh Vangala 
>>>> a écrit :
>>>>
>>>>> Hi Romain -
>>>>>
>>>>> I don't see any errors when I used waitFor().
>>>>> However, I don't see those processes being executed either since
>>>>> "docker ps -a" doesn't list any processes.
>>>>> This is quite unrelated to beam itself normally. If your engine
>>>>> (spark, dataflow etc) doesn't have a security manager active
>>>>> I am using DirectRunner though.
>>>>> Let me know.
>>>>> Thank you!
>>>>>
>>>>> *--*
>>>>> *Mahesh Vangala*
>>>>> *(Ph) 443-326-1957*
>>>>> *(web) mvangala.com <http://mvangala.com>*
>>>>>
>>>>>
>>>>> On Mon, Aug 20, 2018 at 10:28 AM Romain Manni-Bucau <
>>>>> rmannibu...@gmail.com> wrote:
>>>>>
>>>>>> Hi Mahesh,
>>>>>>
>>>>>> Did you get the same error? This is quite unrelated to beam itself
>>>>>> normally. If your engine (spark, dataflow etc) doesn't have a security
>>>>>> manager active it should be enough, if it has you can be forbidden to use
>>>>>> that.
>>>>>>
>>>>>> Romain Manni-Bucau
>>>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>>>> <http://rmannibucau.wordpress.com> | Github
>>>>&

Re: Launching a subprocess in DoFn

2018-08-20 Thread Romain Manni-Bucau
Weird, this code works:

https://gist.github.com/rmannibucau/4703f321bb1962d1303f8eccbd05df0e

Are you sure your test_in.csv has some data (otherwise no DoFn processing
will be triggered)?

Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>


Le lun. 20 août 2018 à 16:33, Mahesh Vangala  a
écrit :

> Hi Romain -
>
> I don't see any errors when I used waitFor().
> However, I don't see those processes being executed either since "docker
> ps -a" doesn't list any processes.
> This is quite unrelated to beam itself normally. If your engine (spark,
> dataflow etc) doesn't have a security manager active
> I am using DirectRunner though.
> Let me know.
> Thank you!
>
> *--*
> *Mahesh Vangala*
> *(Ph) 443-326-1957*
> *(web) mvangala.com <http://mvangala.com>*
>
>
> On Mon, Aug 20, 2018 at 10:28 AM Romain Manni-Bucau 
> wrote:
>
>> Hi Mahesh,
>>
>> Did you get the same error? This is quite unrelated to beam itself
>> normally. If your engine (spark, dataflow etc) doesn't have a security
>> manager active it should be enough, if it has you can be forbidden to use
>> that.
>>
>> Romain Manni-Bucau
>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>> <https://rmannibucau.metawerx.net/> | Old Blog
>> <http://rmannibucau.wordpress.com> | Github
>> <https://github.com/rmannibucau> | LinkedIn
>> <https://www.linkedin.com/in/rmannibucau> | Book
>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>
>>
>> Le lun. 20 août 2018 à 16:08, Mahesh Vangala 
>> a écrit :
>>
>>> Hello Romain -
>>>
>>> I did try that, still no luck.
>>> Also, when I put the process start logic into separate Test script, I do
>>> notice successful docker container when I do "docker ps".
>>> However, no such luck implementing that logic within DoFn.
>>> Any thoughts?
>>> Thank you.
>>>
>>> Regards,
>>> Mahesh
>>>
>>> *--*
>>> *Mahesh Vangala*
>>> *(Ph) 443-326-1957*
>>> *(web) mvangala.com <http://mvangala.com>*
>>>
>>>
>>> On Sun, Aug 19, 2018 at 3:53 AM Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
>>>> waitFor and not java wait primitive?
>>>>
>>>> Le dim. 19 août 2018 04:35, Mahesh Vangala 
>>>> a écrit :
>>>>
>>>>> Hello Beamers -
>>>>>
>>>>> I am trying to pull a POC - launch docker image per element in Input
>>>>> PCollection and then return some data to Output Pcollection.
>>>>>
>>>>> Here is my code:
>>>>>
>>>>> public class VariantCaller
>>>>>
>>>>> {
>>>>>
>>>>> public static void main( String[] args )
>>>>>
>>>>> {
>>>>>
>>>>> PipelineOptions opts = PipelineOptionsFactory.fromArgs(args
>>>>> ).create();
>>>>>
>>>>> Pipeline p = Pipeline.create(opts);
>>>>>
>>>>> PCollection<String> lines = p.apply(TextIO.read().from(
>>>>> "test_in.csv"));
>>>>>
>>>>> PCollection<String> outLines = lines.apply(ParDo.of(new
>>>>> LaunchDocker.LaunchJobs()));
>>>>>
>>>>> PCollection<String> mergedLines = outLines
>>>>> .apply(Combine.globally(new AddLines()));
>>>>>
>>>>> mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>>
>>>>> p.run();
>>>>>
>>>>> }
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> My LaunchDocker Code:
>>>>>
>>>>>
>>>>> public class LaunchDocker {
>>>>>
>>>>>   public static class LaunchJobs extends DoFn<String, String> {
>>>>>
>>>>> private static final long serialVersionUID = 1L;
>>>>>
>>>>> private static final Logger LOG =
>>>>> LoggerFactory.getLogger(AddLines.class);
>>>>>
&g

Re: Launching a subprocess in DoFn

2018-08-20 Thread Romain Manni-Bucau
Hi Mahesh,

Did you get the same error? This is quite unrelated to beam itself
normally. If your engine (spark, dataflow etc) doesn't have a security
manager active it should be enough, if it has you can be forbidden to use
that.

Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>


Le lun. 20 août 2018 à 16:08, Mahesh Vangala  a
écrit :

> Hello Romain -
>
> I did try that, still no luck.
> Also, when I put the process start logic into separate Test script, I do
> notice successful docker container when I do "docker ps".
> However, no such luck implementing that logic within DoFn.
> Any thoughts?
> Thank you.
>
> Regards,
> Mahesh
>
> *--*
> *Mahesh Vangala*
> *(Ph) 443-326-1957*
> *(web) mvangala.com <http://mvangala.com>*
>
>
> On Sun, Aug 19, 2018 at 3:53 AM Romain Manni-Bucau 
> wrote:
>
>> waitFor and not java wait primitive?
>>
>> Le dim. 19 août 2018 04:35, Mahesh Vangala  a
>> écrit :
>>
>>> Hello Beamers -
>>>
>>> I am trying to pull a POC - launch docker image per element in Input
>>> PCollection and then return some data to Output Pcollection.
>>>
>>> Here is my code:
>>>
>>> public class VariantCaller
>>>
>>> {
>>>
>>> public static void main( String[] args )
>>>
>>> {
>>>
>>> PipelineOptions opts = PipelineOptionsFactory.fromArgs(args
>>> ).create();
>>>
>>> Pipeline p = Pipeline.create(opts);
>>>
>>> PCollection<String> lines = p.apply(TextIO.read().from(
>>> "test_in.csv"));
>>>
>>> PCollection<String> outLines = lines.apply(ParDo.of(new
>>> LaunchDocker.LaunchJobs()));
>>>
>>> PCollection<String> mergedLines = outLines
>>> .apply(Combine.globally(new AddLines()));
>>>
>>> mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>
>>> p.run();
>>>
>>> }
>>>
>>> }
>>>
>>>
>>> My LaunchDocker Code:
>>>
>>>
>>> public class LaunchDocker {
>>>
>>>   public static class LaunchJobs extends DoFn<String, String> {
>>>
>>> private static final long serialVersionUID = 1L;
>>>
>>> private static final Logger LOG = LoggerFactory.getLogger(AddLines.
>>> class);
>>>
>>> @ProcessElement
>>>
>>> public void processElement(ProcessContext c) throws Exception {
>>>
>>>   // Get the input element from ProcessContext.
>>>
>>>   String word = c.element().split(",")[0];
>>>
>>>   LOG.info(word);
>>>
>>>   ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
>>>
>>>   "docker run --rm ubuntu:16.04 sleep 20");
>>>
>>>pb.start().wait();
>>>
>>>   // Use ProcessContext.output to emit the output element.
>>>
>>>   if (!word.isEmpty())
>>>
>>> c.output(word + "\n");
>>>
>>> }
>>>
>>>   }
>>>
>>> }
>>>
>>>
>>> However, this fails with error:
>>>
>>>
>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource
>>> getEstimatedSizeBytes
>>>
>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>>
>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split
>>>
>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms
>>> and produced 1 files and 9 bundles
>>>
>>> Aug 18, 2018 10:30:23 PM
>>> pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>
>>> INFO: sample1
>>>
>>> Aug 18, 2018 10:30:23 PM
>>> pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>
>>> INFO: 4
>>>
>>> Aug 18, 2018 10:30:23 PM
>>> pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>
>>> INFO: 1
>>>
>>> Exception in thread "main"
>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>> java.lang.IllegalMonitorStateException
>>>
>>> at
>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(
>>> DirectRunner.java:332)
>>>
>>> at
>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(
>>> DirectRunner.java:302)
>>>
>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197
>>> )
>>>
>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>>
>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>>
>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>>
>>> at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>>
>>> Caused by: java.lang.IllegalMonitorStateException
>>>
>>> at java.lang.Object.wait(Native Method)
>>>
>>> at java.lang.Object.wait(Object.java:502)
>>>
>>> at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement(
>>> LaunchDocker.java:19)
>>>
>>>
>>> Can you share your ideas what's the best way of achieving this?
>>>
>>> Thank you for your help!
>>>
>>>
>>> Sincerely,
>>>
>>> Mahesh
>>>
>>>
>>>
>>> *--*
>>> *Mahesh Vangala*
>>> *(Ph) 443-326-1957*
>>> *(web) mvangala.com <http://mvangala.com>*
>>>
>>


Re: Launching a subprocess in DoFn

2018-08-19 Thread Romain Manni-Bucau
waitFor and not java wait primitive?
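
Spelled out against the quoted code below, a corrected sketch of the DoFn
(assuming String lines in and out, as in the pipeline):

public static class LaunchJobs extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        String word = c.element().split(",")[0];
        ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
                "docker run --rm ubuntu:16.04 sleep 20");
        // Process.waitFor() blocks until the subprocess exits. Object.wait()
        // called without owning the monitor is what throws the
        // IllegalMonitorStateException in the stack trace below.
        pb.inheritIO().start().waitFor();
        if (!word.isEmpty()) {
            c.output(word + "\n");
        }
    }
}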

Le dim. 19 août 2018 04:35, Mahesh Vangala  a
écrit :

> Hello Beamers -
>
> I am trying to pull a POC - launch docker image per element in Input
> PCollection and then return some data to Output Pcollection.
>
> Here is my code:
>
> public class VariantCaller
>
> {
>
> public static void main( String[] args )
>
> {
>
> PipelineOptions opts = PipelineOptionsFactory.fromArgs(args
> ).create();
>
> Pipeline p = Pipeline.create(opts);
>
> PCollection<String> lines = p.apply(TextIO.read().from(
> "test_in.csv"));
>
> PCollection<String> outLines = lines.apply(ParDo.of(new
> LaunchDocker.LaunchJobs()));
>
> PCollection<String> mergedLines = outLines.apply(Combine.globally(
> new AddLines()));
>
> mergedLines.apply(TextIO.write().to("test_out.csv"));
>
> p.run();
>
> }
>
> }
>
>
> My LaunchDocker Code:
>
>
> public class LaunchDocker {
>
>   public static class LaunchJobs extends DoFn<String, String> {
>
> private static final long serialVersionUID = 1L;
>
> private static final Logger LOG = LoggerFactory.getLogger(AddLines.
> class);
>
> @ProcessElement
>
> public void processElement(ProcessContext c) throws Exception {
>
>   // Get the input element from ProcessContext.
>
>   String word = c.element().split(",")[0];
>
>   LOG.info(word);
>
>   ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
>
>   "docker run --rm ubuntu:16.04 sleep 20");
>
>pb.start().wait();
>
>   // Use ProcessContext.output to emit the output element.
>
>   if (!word.isEmpty())
>
> c.output(word + "\n");
>
> }
>
>   }
>
> }
>
>
> However, this fails with error:
>
>
> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource
> getEstimatedSizeBytes
>
> INFO: Filepattern test_in.csv matched 1 files with total size 36
>
> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split
>
> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms
> and produced 1 files and 9 bundles
>
> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs
> processElement
>
> INFO: sample1
>
> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs
> processElement
>
> INFO: 4
>
> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs
> processElement
>
> INFO: 1
>
> Exception in thread "main"
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.IllegalMonitorStateException
>
> at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(
> DirectRunner.java:332)
>
> at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(
> DirectRunner.java:302)
>
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>
> at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>
> Caused by: java.lang.IllegalMonitorStateException
>
> at java.lang.Object.wait(Native Method)
>
> at java.lang.Object.wait(Object.java:502)
>
> at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement(
> LaunchDocker.java:19)
>
>
> Can you share your ideas what's the best way of achieving this?
>
> Thank you for your help!
>
>
> Sincerely,
>
> Mahesh
>
>
>
> *--*
> *Mahesh Vangala*
> *(Ph) 443-326-1957*
> *(web) mvangala.com *
>


Re: FYI on Slack Channels

2018-06-26 Thread Romain Manni-Bucau
+1 sounds very good

side note: any channel must invite @asfarchivebot, I did it for the ones
before "etc" but if you add others please ensure it is done

Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>


Le mar. 26 juin 2018 à 01:05, Lukasz Cwik  a écrit :

> +user@beam.apache.org 
>
> On Mon, Jun 25, 2018 at 4:04 PM Rafael Fernandez 
> wrote:
>
>> Hello!
>>
>> I took the liberty to create area-specific channels (such as #beam-java,
>> #beam-python, #beam-go, etc.) As our project and community grows, I am
>> seeing more and more "organic" interest groups forming -- this may help us
>> chat more online. If they don't, we can delete later.
>>
>> Any thoughts? (I am having second thoughts... #beam-go should probably be
>> #beam-burrow ;p )
>>
>> Cheers,
>> r
>>
>


Re: Advice on parallelizing network calls in DoFn

2018-03-26 Thread Romain Manni-Bucau
Le 26 mars 2018 22:58, "Lukasz Cwik"  a écrit :

Since your fastCompletionStage implementation was just a pass through, how
do you expect to implement the other methods on the completion stage and
still not take a performance hit on the per element call since
the fastCompletionStage did not actually implement any of the other methods?


It would implement them. It was just intended to show the API is more than OK
and that perf doesn't justify not using it.



On Sun, Mar 25, 2018 at 10:20 AM Romain Manni-Bucau 
wrote:

> @Lukasz: just a small precision on the bench I shared earlier: the
> overhead of CompletionStage (implemented with a "fast" flavor) is < 7%
> if you ignore the usage of lambdas (pass a function instance and not a
> lambda ref - not sure why the JVM doesn't handle it directly, but since a
> JVM upgrade from u40 to u144 made a 75% boost thanks to lambda+gc
> optims, I don't worry much about that part). Here are the raw results I get
> (sharing the beam one too since I used another computer):
>
> Comparison.beam                              thrpt  5  184033706,109 ± 31943851,553  ops/s
> Comparison.fastCompletionStageWithoutLambda  thrpt  5  171628984,800 ±  2063217,863  ops/s
>
> I insist on the good fit of CompletionStage (or any reactive-compatible
> API, closer to java 9 maybe), but I had to migrate from synchronous code to
> an async one on friday and the migration was not technically hard and
> brought a lot of benefit, since now it can work in any environment
> (synchronous using toCompletionFuture().get(), or asynchronous like akka
> actors bridging scala futures and CompletionStage). For a portable API (I'm
> not speaking of the beam - language - portable API, which is on top of the
> runner from a design point of view) but of the API any runner must
> integrate with. Integrated with IO (which is the only part giving sense to
> any pipeline when you think about it) you can scale way more reliably and
> efficiently, optimizing your resources, so it would be an awesome fit for a
> solution like beam IMHO.
>
>
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
> 2018-03-15 18:45 GMT+01:00 Jean-Baptiste Onofré :
>
>> By the way, you can take a look on JdbcIO which does a reshuffle
>> transform to avoid the "fusion" issue.
>>
>> Regards
>> JB
>> Le 15 mars 2018, à 10:44, Raghu Angadi  a écrit:
>>>
>>> In streaming, a simple way is to add a reshuffle to increase
>>> parallelism. When you are external-call bound, extra cost of reshuffle is
> >> negligible. e.g.  https://stackoverflow.com/questions/46116443/dataflow-streaming-job-not-scaleing-past-1-worker
>>>
>>> Note that by default Dataflow workers use a couple of hundred threads as
>>> required. This can be increased with a pipeline option if you prefer. I am
>>> not sure of other runners.
>>>
>>> On Thu, Mar 15, 2018 at 8:25 AM Falcon Taylor-Carter <
>>> fal...@bounceexchange.com> wrote:
>>>
>>>> Hello Pablo,
>>>>
>>>> Thanks for checking up (I'm working with Josh on this problem). It
>>>> seems there isn't a built-in process for this kind of use case currently,
>>>> and that the best process right now is to handle our own bundling and
>>>> threading in the DoFn. If you had any other suggestions, or anything to
>>>> keep in mind in doing this, let us know!
>>>>
>>>> Falcon
>>>>
>>>> On Tue, Mar 13, 2018 at 4:52 PM, Pablo Estrada 
>>>> wrote:
>>>>
>>>>> I'd just like to close the loop. Josh, did you get an answer/guidance
>>>>> on how to proceed with your pipeline?
>>>>> Or maybe we'll need a new thread to figure that out : )
>>>>> Best
>>>>> -P.
>>>>>
>>>>>
>>>>> On Fri, Mar 9, 2018 at 1:39 PM Josh Ferge <
>>>>> josh.fe...@bounceexchange.com> wrote:
>>>>>
>>>>>> Hello all:
>>>>>>
>>>>>> Our team has a pipeline that make external network calls. These
>>>>>> pipelines are currently super slow, and the hypothesis is that they are
> >>>>>> slow because we are not threading for our network calls.

Re: Advice on parallelizing network calls in DoFn

2018-03-25 Thread Romain Manni-Bucau
@Lukasz: just a small precision on the bench I shared earlier: the overhead
of CompletionStage (implemented with a "fast" flavor) is < 7% if you
ignore the usage of lambdas (pass a function instance and not a lambda
ref - not sure why the JVM doesn't handle it directly, but since a JVM
upgrade from u40 to u144 made a 75% boost thanks to lambda+gc optims, I
don't worry much about that part). Here are the raw results I get (sharing
the beam one too since I used another computer):

Comparison.beam                              thrpt  5  184033706,109 ± 31943851,553  ops/s
Comparison.fastCompletionStageWithoutLambda  thrpt  5  171628984,800 ±  2063217,863  ops/s

I insist on the good fit of CompletionStage (or any reactive-compatible API,
closer to java 9 maybe), but I had to migrate from synchronous code to an
async one on friday and the migration was not technically hard and brought
a lot of benefit, since now it can work in any environment (synchronous
using toCompletionFuture().get(), or asynchronous like akka actors bridging
scala futures and CompletionStage). For a portable API (I'm not speaking of
the beam - language - portable API, which is on top of the runner from a
design point of view) but of the API any runner must integrate with.
Integrated with IO (which is the only part giving sense to any pipeline
when you think about it) you can scale way more reliably and efficiently,
optimizing your resources, so it would be an awesome fit for a solution
like beam IMHO.
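
For illustration, a rough shape that migration can take inside a DoFn -
collecting the stages per bundle and only blocking at the bundle boundary.
Names like callRemoteService() and the global-window assumption are
illustrative, not an existing Beam API:

public static class AsyncCallFn extends DoFn<String, String> {
    private transient List<CompletableFuture<String>> pending;

    @StartBundle
    public void startBundle() {
        pending = new ArrayList<>();
    }

    @ProcessElement
    public void process(ProcessContext c) {
        final String in = c.element();
        // Fire the call asynchronously; nothing blocks per element.
        pending.add(CompletableFuture.supplyAsync(() -> callRemoteService(in)));
    }

    @FinishBundle
    public void finishBundle(FinishBundleContext c) {
        // Block once per bundle rather than once per element.
        for (CompletableFuture<String> f : pending) {
            c.output(f.join(), BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
        }
    }

    private static String callRemoteService(String in) {
        return in; // placeholder for the real network call
    }
}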



Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>

2018-03-15 18:45 GMT+01:00 Jean-Baptiste Onofré :

> By the way, you can take a look on JdbcIO which does a reshuffle transform
> to avoid the "fusion" issue.
>
> Regards
> JB
> Le 15 mars 2018, à 10:44, Raghu Angadi  a écrit:
>>
>> In streaming, a simple way is to add a reshuffle to increase parallelism.
>> When you are external-call bound, extra cost of reshuffle is negligible.
>> e.g.  https://stackoverflow.com/questions/46116443/dataflow-streaming-job-not-scaleing-past-1-worker
>>
>> Note that by default Dataflow workers use a couple of hundred threads as
>> required. This can be increased with a pipeline option if you prefer. I am
>> not sure of other runners.
>>
>> On Thu, Mar 15, 2018 at 8:25 AM Falcon Taylor-Carter <
>> fal...@bounceexchange.com> wrote:
>>
>>> Hello Pablo,
>>>
>>> Thanks for checking up (I'm working with Josh on this problem). It seems
>>> there isn't a built-in process for this kind of use case currently, and
>>> that the best process right now is to handle our own bundling and threading
>>> in the DoFn. If you had any other suggestions, or anything to keep in mind
>>> in doing this, let us know!
>>>
>>> Falcon
>>>
>>> On Tue, Mar 13, 2018 at 4:52 PM, Pablo Estrada 
>>> wrote:
>>>
>>>> I'd just like to close the loop. Josh, did you get an answer/guidance
>>>> on how to proceed with your pipeline?
>>>> Or maybe we'll need a new thread to figure that out : )
>>>> Best
>>>> -P.
>>>>
>>>>
>>>> On Fri, Mar 9, 2018 at 1:39 PM Josh Ferge <
>>>> josh.fe...@bounceexchange.com> wrote:
>>>>
>>>>> Hello all:
>>>>>
>>>>> Our team has a pipeline that make external network calls. These
>>>>> pipelines are currently super slow, and the hypothesis is that they are
>>>>> slow because we are not threading for our network calls. The github issue
>>>>> below provides some discussion around this:
>>>>>
>>>>> https://github.com/apache/beam/pull/957
>>>>>
>>>>> In beam 1.0, there was IntraBundleParallelization, which helped with
>>>>> this. However, this was removed because it didn't comply with a few BEAM
>>>>> paradigms.
>>>>>
>>>>> Questions going forward:
>>>>>
>>>>> What is advised for jobs that make blocking network calls? It seems
>>>>> bundling the elements into groups of size X prior to passing to the DoFn,
>>>>> and managing the threading within the function might work. thoughts?
>>>>> Are these types of jobs even suitable for beam?
>>>>> Are there any plans to develop features that help with this?
>>>>>
>>>>> Thanks
>>>>>
>>>> --
>>>> Got feedback? go/pabloem-feedback
>>>> <https://goto.google.com/pabloem-feedback>
>>>>
>>>
>>>
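
A minimal sketch of the reshuffle approach Raghu describes above - breaking
fusion before the blocking DoFn so the runner can spread it over more
workers. CallExternalService is a hypothetical DoFn standing in for the
network call:

Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("requests.txt"))
 .apply(Reshuffle.viaRandomKey())            // breaks fusion, raises parallelism
 .apply(ParDo.of(new CallExternalService())) // the blocking network-call DoFn
 .apply(TextIO.write().to("responses"));
p.run();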


Re: [ANNOUNCE] Apache Beam 2.4.0 released

2018-03-22 Thread Romain Manni-Bucau
congrats guys


Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>

2018-03-22 9:50 GMT+01:00 Etienne Chauchot :

> Great !
> Le jeudi 22 mars 2018 à 08:24 +, Robert Bradshaw a écrit :
>
> We are pleased to announce the release of Apache Beam 2.4.0. Thanks goes to
> the many people who made this possible.
>
> Apache Beam is an open source unified programming model to define and
> execute data processing pipelines, including ETL, batch and stream
> (continuous) processing. See https://beam.apache.org
>
> You can download the release here:
>
>  https://beam.apache.org/get-started/downloads/
>
> As well as many bugfixes, some notable changes in this release are:
> - A new Python Direct runner, up to 15x faster than the old one.
> - Kinesis support for reading and writing in Java
> - Several refactoring to enable portability (Go/Python on Flink/Spark)
>
> Full release notes can be found at
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12342682&projectId=12319527
>
> Enjoy!
>
>


Re: Reducing database connection with JdbcIO

2018-03-14 Thread Romain Manni-Bucau
side note: try to do a thread dump on the workers maybe


Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>

2018-03-14 20:21 GMT+01:00 Eugene Kirpichov :

> "Jdbcio will create for each prepared statement new connection" - this is
> not the case: the connection is created in @Setup and deleted in @Teardown.
> https://github.com/apache/beam/blob/v2.3.0/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L503
> https://github.com/apache/beam/blob/v2.3.0/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L631
>
> Something else must be going wrong.
>
> On Wed, Mar 14, 2018 at 12:11 PM Aleksandr  wrote:
>
>> Hello, we had a similar problem. The current JdbcIO will cause a lot of
>> connection errors.
>>
>> Typically you have more than one prepared statement. JdbcIO will create a
>> new connection for each prepared statement (and close it only in teardown),
>> so it is possible that a connection will time out, or in case of auto
>> scaling you will get too many connections to SQL.
>> Our solution was to create a connection pool in setup and get a connection
>> and return it back to the pool in processElement.
>>
>> Best Regards,
>> Aleksandr Gortujev.
>>
>> 14. märts 2018 8:52 PM kirjutas kuupäeval "Jean-Baptiste Onofré" <
>> j...@nanthrax.net>:
>>
>> Agree especially using the current JdbcIO impl that creates connection in
>> the @Setup. Or it means that @Teardown is never called ?
>>
>> Regards
>> JB
>> Le 14 mars 2018, à 11:40, Eugene Kirpichov  a
>> écrit:
>>>
>>> Hi Derek - could you explain where does the "3000 connections" number
>>> come from, i.e. how did you measure it? It's weird that 5-6 workers would
>>> use 3000 connections.
>>>
>>> On Wed, Mar 14, 2018 at 3:50 AM Derek Chan  wrote:
>>>
>>>> Hi,
>>>>
>>>> We are new to Beam and need some help.
>>>>
>>>> We are working on a flow to ingest events and writes the aggregated
>>>> counts to a database. The input rate is rather low (~2000 message per
>>>> sec), but the processing is relatively heavy, that we need to scale out
>>>> to 5~6 nodes. The output (via JDBC) is aggregated, so the volume is also
>>>> low. But because of the number of workers, it keeps 3000 connections to
>>>> the database and it keeps hitting the database connection limits.
>>>>
>>>> Is there a way that we can reduce the concurrency only at the output
>>>> stage? (In Spark we would have done a repartition/coalesce).
>>>>
>>>> And, if it matters, we are using Apache Beam 2.2 via Scio, on Google
>>>> Dataflow.
>>>>
>>>> Thank you in advance!
>>>>
>>>>
>>>>
>>>>
>>


Re: Reducing database connection with JdbcIO

2018-03-14 Thread Romain Manni-Bucau
A pool would only make sense if you can get a singleton for the
JVM/datasource (not even per pipeline on this one) - there is a discussion
on that on dev@ more generally than just on IO.
A pool of size one without any validation config is like having a single
connection you reuse for each bundle if it is still open - but it requires
a new jar ;).
I think the validation strategy can make sense, and limiting the concurrency
as well, since an RDBMS will not behave better with hundreds of clients than
with some dozens.

The current workaround is probably to set a datasource supplier which uses a
correctly configured pool created from a singleton in your app code.
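
A sketch of that workaround - a JVM-level singleton holding a bounded pool,
which the datasource supplier reaches for instead of opening raw
connections. HikariCP is an assumption here; any pooling DataSource works:

public final class PoolHolder {
    private static volatile HikariDataSource pool;

    private PoolHolder() {}

    public static DataSource get(String jdbcUrl, String user, String password) {
        if (pool == null) {
            synchronized (PoolHolder.class) {
                if (pool == null) {
                    HikariConfig cfg = new HikariConfig();
                    cfg.setJdbcUrl(jdbcUrl);
                    cfg.setUsername(user);
                    cfg.setPassword(password);
                    cfg.setMaximumPoolSize(8); // caps connections per worker JVM
                    pool = new HikariDataSource(cfg);
                }
            }
        }
        return pool;
    }
}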



Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>

2018-03-14 20:11 GMT+01:00 Aleksandr :

> Hello, we had a similar problem. The current JdbcIO will cause a lot of
> connection errors.
>
> Typically you have more than one prepared statement. JdbcIO will create a
> new connection for each prepared statement (and close it only in teardown),
> so it is possible that a connection will time out, or in case of auto
> scaling you will get too many connections to SQL.
> Our solution was to create a connection pool in setup and get a connection
> and return it back to the pool in processElement.
>
> Best Regards,
> Aleksandr Gortujev.
>
> 14. märts 2018 8:52 PM kirjutas kuupäeval "Jean-Baptiste Onofré" <
> j...@nanthrax.net>:
>
> Agree especially using the current JdbcIO impl that creates connection in
> the @Setup. Or it means that @Teardown is never called ?
>
> Regards
> JB
> Le 14 mars 2018, à 11:40, Eugene Kirpichov  a écrit:
>>
>> Hi Derek - could you explain where does the "3000 connections" number
>> come from, i.e. how did you measure it? It's weird that 5-6 workers would
>> use 3000 connections.
>>
>> On Wed, Mar 14, 2018 at 3:50 AM Derek Chan  wrote:
>>
>>> Hi,
>>>
>>> We are new to Beam and need some help.
>>>
>>> We are working on a flow to ingest events and writes the aggregated
>>> counts to a database. The input rate is rather low (~2000 message per
>>> sec), but the processing is relatively heavy, that we need to scale out
>>> to 5~6 nodes. The output (via JDBC) is aggregated, so the volume is also
>>> low. But because of the number of workers, it keeps 3000 connections to
>>> the database and it keeps hitting the database connection limits.
>>>
>>> Is there a way that we can reduce the concurrency only at the output
>>> stage? (In Spark we would have done a repartition/coalesce).
>>>
>>> And, if it matters, we are using Apache Beam 2.2 via Scio, on Google
>>> Dataflow.
>>>
>>> Thank you in advance!
>>>
>>>
>>>
>>>
>


Re: Advice on parallelizing network calls in DoFn

2018-03-09 Thread Romain Manni-Bucau
@Kenn: why not prefer to make beam reactive? It would allow to scale way
more without having to heavily synchronize multithreading. Elegant and
efficient :). Beam 3?


Le 9 mars 2018 22:49, "Kenneth Knowles"  a écrit :

> I will start with the "exciting futuristic" answer, which is that we
> envision the new DoFn to be able to provide an automatic ExecutorService
> parameters that you can use as you wish.
>
> new DoFn<>() {
>   @ProcessElement
>   public void process(ProcessContext ctx, ExecutorService
> executorService) {
>   ... launch some futures, put them in instance vars ...
>   }
>
>   @FinishBundle
>   public void finish(...) {
>  ... block on futures, output results if appropriate ...
>   }
> }
>
> This way, the Java SDK harness can use its overarching knowledge of what
> is going on in a computation to, for example, share a thread pool between
> different bits. This was one reason to delete IntraBundleParallelization -
> it didn't allow the runner and user code to properly manage how many things
> were going on concurrently. And mostly the runner should own parallelizing
> to max out cores and what user code needs is asynchrony hooks that can
> interact with that. However, this feature is not thoroughly considered. TBD
> how much the harness itself manages blocking on outstanding requests versus
> it being your responsibility in FinishBundle, etc.
>
> I haven't explored rolling your own here, if you are willing to do the
> knob tuning to get the threading acceptable for your particular use case.
> Perhaps someone else can weigh in.
>
> Kenn
>
> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge 
> wrote:
>
>> Hello all:
>>
>> Our team has a pipeline that make external network calls. These pipelines
>> are currently super slow, and the hypothesis is that they are slow because
>> we are not threading for our network calls. The github issue below provides
>> some discussion around this:
>>
>> https://github.com/apache/beam/pull/957
>>
>> In beam 1.0, there was IntraBundleParallelization, which helped with
>> this. However, this was removed because it didn't comply with a few BEAM
>> paradigms.
>>
>> Questions going forward:
>>
>> What is advised for jobs that make blocking network calls? It seems
>> bundling the elements into groups of size X prior to passing to the DoFn,
>> and managing the threading within the function might work. thoughts?
>> Are these types of jobs even suitable for beam?
>> Are there any plans to develop features that help with this?
>>
>> Thanks
>>
>


Re: [INFO] Spark runner updated to Spark 2.2.1

2017-12-18 Thread Romain Manni-Bucau
Congrats, this was awaited for a long time! Very impatient to see the
announcement of 2.3!


Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau>

2017-12-18 16:55 GMT+01:00 Jean-Baptiste Onofré :

> By the way, Flink has been updated to Flink 1.4.0 as well (as the
> artifacts already used Scala 2.11).
>
> Regards
> JB
>
> On 12/18/2017 11:50 AM, Jean-Baptiste Onofré wrote:
>
>> Hi all,
>>
>> We are pleased to announce that Spark 2.x support in Spark runner has
>> been merged this morning. It supports Spark 2.2.1.
>>
>> In the same PR, we did update to Scala 2.11, including Flink artifacts
>> update to 2.11 (it means it's already ready to upgrade to Flink 1.4 !).
>>
>> It also means, as planned, that Spark 2.x support will be included in
>> next Beam 2.3.0 release.
>>
>> Now, we are going to work on improvements in the Spark runner.
>>
>> If you have any issue with the Spark runner, please let us know.
>>
>> Thanks !
>> Regards
>> JB
>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: Spark and Beam

2017-09-27 Thread Romain Manni-Bucau
You have the SparkPipelineOptions you can set
(org.apache.beam.runners.spark.SparkPipelineOptions -
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java),
most of these options are explained at
https://beam.apache.org/documentation/runners/spark/. You can add custom
PipelineOptions if you want to use them from your transforms:
https://beam.apache.org/documentation/programming-guide/#setting-pipelineoptions-from-command-line-arguments
.
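
As a sketch, the conf.set() calls quoted below can become a custom options
interface read from the command line (--myField1=..., --myField2=...); the
field names mirror tal's example and are assumptions:

public interface MyOptions extends PipelineOptions {
    String getMyField1();
    void setMyField1(String value);

    String getMyField2();
    void setMyField2(String value);
}

// In main():
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline pipeline = Pipeline.create(options);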

Hope it helps


Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://blog-rmannibucau.rhcloud.com> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | JavaEE Factory
<https://javaeefactory-rmannibucau.rhcloud.com>

2017-09-27 12:52 GMT+02:00 tal m :

> my last email wasn't clear, please ignore.
> Thanks it's looks better (no error or exceptions)
> my problem now is how to set Spark conf to my pipeline, this is what i
> have ?
>
> conf = SparkConf();
>  conf.setAppName("InsightEdge Python Example")
>  conf.set("my.field1", "XXX")
>
>  conf.set("my.field2", "YYY")
>
> how can i send it to my pipeline, i guess within command line ?
>
> Thanks Tal
>
>
> On Wed, Sep 27, 2017 at 1:45 PM, tal m  wrote:
>
>> Thanks it's looks better (no error or exceptions)
>> my problem now is how to set Spark conf to my pipeline, this is what i
>> have ?
>>
>> conf = SparkConf();
>>  conf.setAppName("InsightEdge Python Example")
>>  conf.set("spark.insightedge.space.name", "insightedge-space")
>>  conf.set("spark.insightedge.space.lookup.group", "insightedge")
>>  conf.set("spark.insightedge.space.lookup.locator", "127.0.0.1:4174")
>>
>>
>>
>> On Tue, Sep 26, 2017 at 3:46 PM, Romain Manni-Bucau <
>> rmannibu...@gmail.com> wrote:
>>
>>> Yes, you need a coder for your product if it is passed to the output for
>>> next "apply". You can register it on the pipeline or through beam SPI. Here
>>> is a sample to use java serialization:
>>>
>>> pipeline.getCoderRegistry().registerCoderForClass(Product.class, 
>>> SerializableCoder.of(Product.class));
>>>
>>>
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>> <https://blog-rmannibucau.rhcloud.com> | Old Blog
>>> <http://rmannibucau.wordpress.com> | Github
>>> <https://github.com/rmannibucau> | LinkedIn
>>> <https://www.linkedin.com/in/rmannibucau> | JavaEE Factory
>>> <https://javaeefactory-rmannibucau.rhcloud.com>
>>>
>>> 2017-09-26 14:37 GMT+02:00 tal m :
>>>
>>>> hi
>>>> i tried what you wrote the argumants that i use are:
>>>> --sparkMaster=local --runner=SparkRunner
>>>> i already have Spark running.
>>>> now i'm getting the following error:
>>>>
>>>>
>>>> .IllegalStateException: Unable to return a default Coder for
>>>> ParDo(Anonymous)/ParMultiDo(Anonymous).out0 [PCollectio
>>>>
>>>>
>>>> Thanks Tal
>>>>
>>>>
>>>>
>>>> On Tue, Sep 26, 2017 at 12:32 PM, Romain Manni-Bucau <
>>>> rmannibu...@gmail.com> wrote:
>>>>
>>>>> Hi Tal,
>>>>>
>>>>> Did you try something like that:
>>>>>
>>>>> public static void main(final String[] args) {
>>>>> final Pipeline pipeline = 
>>>>> Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>>>>>
>>>>> pipeline.apply(GenerateSequence.from(0).to(10L))
>>>>> .apply(ParDo.of(new DoFn<Long, Product>() {
>>>>> @ProcessElement
>>>>> public void onElement(final ProcessContext context) {
>>>>> final long i = context.element();
>>>>> context.output(new Product(i, "Product #" + i));
>>>>> }
>>>>> }));
>>>>>
>>>>> pipeline.run();
>>>>> }
>>>>>
>>>>>
>>>>> Then it is just a matter of having the beam dependency matching your 
>>>>> runner (target environment). For testing the direct run

Re: Spark and Beam

2017-09-26 Thread Romain Manni-Bucau
Yes, you need a coder for your product if it is passed to the output for
next "apply". You can register it on the pipeline or through beam SPI. Here
is a sample to use java serialization:

pipeline.getCoderRegistry().registerCoderForClass(Product.class,
SerializableCoder.of(Product.class));



Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://blog-rmannibucau.rhcloud.com> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | JavaEE Factory
<https://javaeefactory-rmannibucau.rhcloud.com>

2017-09-26 14:37 GMT+02:00 tal m :

> hi
> i tried what you wrote the argumants that i use are:
> --sparkMaster=local --runner=SparkRunner
> i already have Spark running.
> now i'm getting the following error:
>
>
> .IllegalStateException: Unable to return a default Coder for
> ParDo(Anonymous)/ParMultiDo(Anonymous).out0 [PCollectio
>
>
> Thanks Tal
>
>
>
> On Tue, Sep 26, 2017 at 12:32 PM, Romain Manni-Bucau <
> rmannibu...@gmail.com> wrote:
>
>> Hi Tal,
>>
>> Did you try something like that:
>>
>> public static void main(final String[] args) {
>> final Pipeline pipeline = 
>> Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>>
>> pipeline.apply(GenerateSequence.from(0).to(10L))
>> .apply(ParDo.of(new DoFn<Long, Product>() {
>> @ProcessElement
>> public void onElement(final ProcessContext context) {
>> final long i = context.element();
>> context.output(new Product(i, "Product #" + i));
>> }
>> }));
>>
>> pipeline.run();
>> }
>>
>>
>> Then it is just a matter of having the beam dependency matching your runner 
>> (target environment). For testing the direct runner is enough but to run on 
>> spark you will need to import the spark one as dependency.
>>
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>> <https://blog-rmannibucau.rhcloud.com> | Old Blog
>> <http://rmannibucau.wordpress.com> | Github
>> <https://github.com/rmannibucau> | LinkedIn
>> <https://www.linkedin.com/in/rmannibucau> | JavaEE Factory
>> <https://javaeefactory-rmannibucau.rhcloud.com>
>>
>> 2017-09-26 11:02 GMT+02:00 tal m :
>>
>>> HI
>>> i looked at the links you sent me, and i haven't found any clue how to
>>> adapt it to my current code.
>>> my code is very simple:
>>>
>>> val sc = spark.sparkContext
>>>
>>> val productsNum = 10
>>> println(s"Saving $productsNum products RDD to the space")
>>> val rdd = sc.parallelize(1 to productsNum).map { i =>
>>>   Product(i, "Description of product " + i, Random.nextInt(10), 
>>> Random.nextBoolean())
>>> }
>>>
>>> Is it that simple to use beam instead of SparkContext? I'm not familiar with
>>> Spark at all, so I have no idea what the Spark runner is or how I can use it
>>> in my case, I just need to make it work :).
>>>
>>> Thanks Tal
>>>
>>>
>>> On Tue, Sep 26, 2017 at 11:57 AM, Aviem Zur  wrote:
>>>
>>>> Hi Tal,
>>>>
>>>> Thanks for reaching out!
>>>>
>>>> Please take a look at our documentation:
>>>>
>>>> Quickstart guide (Java): https://beam.apache.org/get-started/quickstart-java/
>>>> This guide will show you how to run our wordcount example using each
>>>> any of the runners (For example, direct runner or Spark runner in your
>>>> case).
>>>>
>>>> More reading:
>>>> Programming guide: https://beam.apache.org/documentation/programming-guide/
>>>> Spark runner: https://beam.apache.org/documentation/runners/spark/
>>>>
>>>> Please let us know if you have further questions, and good luck with
>>>> your first try of Beam!
>>>>
>>>> Aviem.
>>>>
>>>> On Tue, Sep 26, 2017 at 11:47 AM tal m  wrote:
>>>>
>>>>> hi
>>>>> i'm new at Spark and also at beam.
>>>>> currently i have Java code that use Spark from reading some data from
>>>>> DB.
>>>>> my Spark code using SparkSession.builder (.) and also sparkContext.
>>>>> how can i make beam work similar to my current code, i just want make
>>>>> it work for now.
>>>>> Thanks Tal
>>>>>
>>>>
>>>
>>
>


Re: Spark and Beam

2017-09-26 Thread Romain Manni-Bucau
Hi Tal,

Did you try something like that:

public static void main(final String[] args) {
final Pipeline pipeline =
Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

pipeline.apply(GenerateSequence.from(0).to(10L))
.apply(ParDo.of(new DoFn<Long, Product>() {
@ProcessElement
public void onElement(final ProcessContext context) {
final long i = context.element();
context.output(new Product(i, "Product #" + i));
}
}));

pipeline.run();
}


Then it is just a matter of having the beam dependency matching your
runner (target environment). For testing the direct runner is enough
but to run on spark you will need to import the spark one as
dependency.



Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://blog-rmannibucau.rhcloud.com> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | JavaEE Factory
<https://javaeefactory-rmannibucau.rhcloud.com>

2017-09-26 11:02 GMT+02:00 tal m :

> HI
> i looked at the links you sent me, and i haven't found any clue how to
> adapt it to my current code.
> my code is very simple:
>
> val sc = spark.sparkContext
>
> val productsNum = 10
> println(s"Saving $productsNum products RDD to the space")
> val rdd = sc.parallelize(1 to productsNum).map { i =>
>   Product(i, "Description of product " + i, Random.nextInt(10), 
> Random.nextBoolean())
> }
>
> Is it that simple to use beam instead of SparkContext? I'm not familiar with
> Spark at all, so I have no idea what the Spark runner is or how I can use it
> in my case, I just need to make it work :).
>
> Thanks Tal
>
>
> On Tue, Sep 26, 2017 at 11:57 AM, Aviem Zur  wrote:
>
>> Hi Tal,
>>
>> Thanks for reaching out!
>>
>> Please take a look at our documentation:
>>
>> Quickstart guide (Java): https://beam.apache.org/get-started/quickstart-java/
>> This guide will show you how to run our wordcount example using each any
>> of the runners (For example, direct runner or Spark runner in your case).
>>
>> More reading:
>> Programming guide: https://beam.apache.org/documentation/programming-guide/
>> Spark runner: https://beam.apache.org/documentation/runners/spark/
>>
>> Please let us know if you have further questions, and good luck with your
>> first try of Beam!
>>
>> Aviem.
>>
>> On Tue, Sep 26, 2017 at 11:47 AM tal m  wrote:
>>
>>> hi
>>> i'm new at Spark and also at beam.
>>> currently i have Java code that use Spark from reading some data from DB.
>>> my Spark code using SparkSession.builder (.) and also sparkContext.
>>> how can i make beam work similar to my current code, i just want make it
>>> work for now.
>>> Thanks Tal
>>>
>>
>


Re: Make runner implementation smoother?

2017-05-25 Thread Romain Manni-Bucau
2017-05-25 7:42 GMT+02:00 Jean-Baptiste Onofré :

> Hi Romain,
>
> 1. A runner has to implement the 5 (or 6 depending how we count)
> primitives. It's the purpose of Runner API (the Fn API is more to deal with
> SDKs in the runner). So, the Runner API is kind of adapter, and it
> implements all other functions based on the primitives. You would like a
> formal RunnerAdapter in the Runner API ?
>

Yes, or at least change the communication in conferences ;) ("have to
implement" vs "have to handle").


>
> 2. A PipelineVisitor allows you to browse the DAG and get the
> PInput/POutput (representing PCollection and side inputs/outputs) for each
> PTransforms/steps in the pipeline. You can use this PipelineVisitor in a
> pre-step to get the complete graph. I see in your code that you use it.
> What would you like instead ? Kind of "object" representing the DAG ?
>

There I was wondering why the visitor was not more typed:

void onParDoFn(in, out)
void onGroupByKey(in, out)
etc...

and probably in a base class the helpers we have in the core construction
(SimpleDoFn) to make the implementation as smooth as possible
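
For illustration, a shape that adapter could take on top of the existing
visitor - not an existing Beam API, just a sketch of the dispatch described
above:

public abstract class TypedPipelineVisitor extends Pipeline.PipelineVisitor.Defaults {
    @Override
    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
        PTransform<?, ?> transform = node.getTransform();
        if (transform instanceof ParDo.MultiOutput) {
            onParDo(node.getInputs(), node.getOutputs());
        } else if (transform instanceof GroupByKey) {
            onGroupByKey(node.getInputs(), node.getOutputs());
        } else {
            onOther(node); // Flatten, reads, window assignment, ...
        }
    }

    protected abstract void onParDo(Map<TupleTag<?>, PValue> in, Map<TupleTag<?>, PValue> out);
    protected abstract void onGroupByKey(Map<TupleTag<?>, PValue> in, Map<TupleTag<?>, PValue> out);
    protected void onOther(TransformHierarchy.Node node) {}
}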



>
> I gonna take a look on the code. FYI, I also have PoC for Hazelcast (with
> Cellar), Ignite and MapReduce if you want to take a look.
>

Sure! I think Jet makes sense since it is the "official" hazelcast data
processing solution, but it shouldn't be that far off :).


>
> Regards
> JB
>
> On 05/24/2017 10:54 PM, Romain Manni-Bucau wrote:
>
>> Hi guys,
>>
>> congrats for the 2.0!
>>
>> I have a few questions regarding a custom runner implementation, in no
>> particular order but numbered for later reference:
>>
>> 1. Why doesn't beam have (yet?) a RunnerAdapter with all primitives listed
>> and enforced by API? If I read the code right (shout if not), each runner
>> creates its own processing context and conversion at the moment, which kind
>> of means beam doesn't abstract the runtime, which makes it harder to enter
>> the beam model IMHO (vs defining all operations by contract - potentially
>> with defaults when compositions are possible).
>> 2. Close to the previous point (still around runners): I find it quite hard
>> to browse the DAG of beam compared to a plain DAG. Is it intended to be so
>> low level? Can't we get a simple graph model?
>>
>> Maybe I'm just hitting a not yet extended/defined land and therefore a
>> user-friendly API is missing, or I missed a central concept - in this case
>> shout :p.
>>
>> Any pointers would be very welcomed on how to implement a runner without
>> redefining a full transpiler/converter - or is it outside beam scope?
>>
>>
>> FYI here is what I have: https://github.com/rmannibucau/beam-hazelcast-runner
>> and here is where I'm stuck (hesitating to redefine a full transpiler since
>> I expected beam to help): https://github.com/rmannibucau
>> /beam-hazelcast-runner/blob/master/src/main/java/com/
>> github/rmannibucau/beam/runner/hazelcast/HazelcastPipelineVisitor.java
>>
>> Romain Manni-Bucau
>> @rmannibucau <https://twitter.com/rmannibucau> | Blog <
>> https://blog-rmannibucau.rhcloud.com> | Old Blog <
>> http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibuca
>> u> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | JavaEE Factory
>> <https://javaeefactory-rmannibucau.rhcloud.com>
>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: question about running examples in command

2017-05-24 Thread Romain Manni-Bucau
Hi

did you try from examples/java:

$ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount
-Dexec.args="--output=/tmp/out.txt"




Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://blog-rmannibucau.rhcloud.com> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | JavaEE Factory
<https://javaeefactory-rmannibucau.rhcloud.com>

2017-05-24 22:59 GMT+02:00 Claire Yuan :

> Hi,
>   Would anyone please show the correct command to run the examples like
> wordcount and tfIdf with maven in terminal?
>


Make runner implementation smoother?

2017-05-24 Thread Romain Manni-Bucau
Hi guys,

congrats for the 2.0!

I have a few questions regarding a custom runner implementation, in no
particular order but numbered for later reference:

1. Why doesn't beam have (yet?) a RunnerAdapter with all primitives listed
and enforced by API? If I read the code right (shout if not), each runner
creates its own processing context and conversion at the moment, which kind
of means beam doesn't abstract the runtime, which makes it harder to enter
the beam model IMHO (vs defining all operations by contract - potentially
with defaults when compositions are possible).
2. Close to the previous point (still around runners): I find it quite hard
to browse the DAG of beam compared to a plain DAG. Is it intended to be so
low level? Can't we get a simple graph model?

Maybe I'm just hitting a not yet extended/defined land and therefore a
user-friendly API is missing, or I missed a central concept - in this case
shout :p.

Any pointers would be very welcomed on how to implement a runner without
redefining a full transpiler/converter - or is it outside beam scope?


FYI here is what I have: https://github.com/rmannibucau/beam-hazelcast-runner
and here is where I'm stuck (hesitating to redefine a full transpiler since
I expected beam to help):
https://github.com/rmannibucau/beam-hazelcast-runner/blob/master/src/main/java/com/github/rmannibucau/beam/runner/hazelcast/HazelcastPipelineVisitor.java

Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://blog-rmannibucau.rhcloud.com> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | JavaEE Factory
<https://javaeefactory-rmannibucau.rhcloud.com>