Spark Structured Streaming runner migrated to Spark 3

2021-08-05 Thread Etienne Chauchot

Hi all,

Just to let you know that the Spark Structured Streaming runner has been
migrated to Spark 3.


Enjoy !

Etienne



Re: Spark Structured Streaming Runner Roadmap

2021-08-03 Thread Etienne Chauchot

Hi,

Sorry for the late answer: streaming mode in the Spark Structured Streaming
runner is blocked by the watermark implementation of the Spark Structured
Streaming framework on the Apache Spark project side. See


https://echauchot.blogspot.com/2020/11/watermark-architecture-proposal-for.html

best

Etienne

On 20/05/2021 20:37, Yu Zhang wrote:

Hi Beam Community,

Is there any roadmap for the Spark Structured Streaming runner to support streaming
mode and the Splittable DoFn API, such as a specific timeline or release version?

Thanks,
Yu


Re: A new reworked Elasticsearch 7+ IO module

2020-03-31 Thread Etienne Chauchot

Hi all,

The survey regarding Elasticsearch support in Beam is now closed.

Here are the results after 38 days:

Users per Elasticsearch version:

ES v2: 0

ES v5: 1

ES v6: 5

ES v7: 8

So, the new version of ElasticsearchIO after the refactoring discussed
in this thread will no longer support Elasticsearch v2.


Regards

Etienne Chauchot.


On 06/03/2020 11:26, Etienne Chauchot wrote:


Hi all,

It has been 3 weeks since the survey on the ES versions users run was opened.

The survey received very few responses: only 9 so far (multiple versions
possible of course). The responses are the following:


ES2: 0 clients, ES5: 1, ES6: 5, ES7: 8

The results point toward dropping ES2 support, but for now they are still
not very representative.


I'm cross-posting to the user list to let you know that I'm closing the
survey within 1 or 2 weeks, so please respond if you're using ElasticsearchIO.


Best

Etienne

On 13/02/2020 12:37, Etienne Chauchot wrote:


Hi Cham, thanks for your comments !

I just sent an email to the user mailing list with a survey link to count ES
usage per version:


https://lists.apache.org/thread.html/rc8185afb8af86a2a032909c13f569e18bd89e75a5839894d5b5d4082%40%3Cuser.beam.apache.org%3E

Best

Etienne

On 10/02/2020 19:46, Chamikara Jayalath wrote:



On Thu, Feb 6, 2020 at 8:13 AM Etienne Chauchot
<echauc...@apache.org> wrote:


Hi,

please see my comments inline

On 06/02/2020 16:24, Alexey Romanenko wrote:

Please, see my comments inline.


On 6 Feb 2020, at 10:50, Etienne Chauchot
<echauc...@apache.org> wrote:



1. regarding version support: ES v2 has not been
maintained by Elastic since 2018/02, so we plan to
remove it from the IO. In the past we have already
retired versions (like Spark 1.6 for instance).



My only concern here is that there might be users who use
the existing module who might not be able to easily
upgrade the Beam version if we remove it. But given that
V2 is 5 versions behind the latest release this might be OK.


It seems we have a consensus on this.
I think there should be another general discussion on the
long-term support of the IO modules for our preferred tools.


=> yes, consensus, let's drop ESV2


We had (and still have) a similar problem with KafkaIO
supporting different versions of Kafka, especially the very
old version 0.9. We raised this question on user@ and it
appears that there are users who for some reason still use
old Kafka versions. So, before dropping support for any ES
version, I'd suggest asking on user@ and seeing whether any
people would be affected by this.

Yes, we can do a survey among users, but the question is:
should we support an ES version that is no longer supported
by Elastic themselves?


+1 for asking in the user list. I guess this is more about whether 
users need this specific version that we hope to drop support for. 
Whether we need to support unsupported versions is a more generic 
question that should prob. be addressed in the dev list. (and I 
personally don't think we should unless there's a large enough user 
base for a given version).



2. regarding the user API: the aim is to unlock some
new features (listed by Ludovic) and give the user
more flexibility over his requests. That requires
using the high-level Java ES client in place of the
low-level REST client (which was used because it is
the only one compatible with all ES versions). We
plan to replace the API (JSON document in and out)
with more complete standard ES objects that contain
the request logic (insert/update, doc routing etc.)
and the data. There are already IOs like SpannerIO
that use similar objects in the input PCollection
rather than pure POJOs.



Won't this be a breaking change for all users? IMO using
POJOs in PCollections is safer since otherwise we have to
worry about changes to the underlying client library API.
The exception would be when the underlying client library
offers a backwards-compatibility guarantee that we can rely
on for the foreseeable future (for example, BQ TableRow).


Agreed but actually, there will be POJOs in order to abstract
Elasticsearch's version support. The following third point
explains this.


=> indeed it will be a breaking change, hence this email to
get a consensus on that. Also, I think our wrappers of ES
request objects will be as backward compatible as the
underlying objects.


I just want to recall that, according to what we agreed some
time ago on dev@ (at least for IOs), all breaking user API
changes have to be added along with a deprecation of the old
API, which can be removed after 3 consecutive Beam releases.
This way, users will have time to move to the new API smoothly.


We a

Re: A new reworked Elasticsearch 7+ IO module

2020-03-06 Thread Etienne Chauchot

Hi all,

It has been 3 weeks since the survey on the ES versions users run was opened.

The survey received very few responses: only 9 so far (multiple versions
possible of course). The responses are the following:


ES2: 0 clients, ES5: 1, ES6: 5, ES7: 8

The results point toward dropping ES2 support, but for now they are still not
very representative.


I'm cross-posting to the user list to let you know that I'm closing the survey
within 1 or 2 weeks, so please respond if you're using ElasticsearchIO.


Best

Etienne

On 13/02/2020 12:37, Etienne Chauchot wrote:


Hi Cham, thanks for your comments !

I just sent an email to the user mailing list with a survey link to count ES
usage per version:


https://lists.apache.org/thread.html/rc8185afb8af86a2a032909c13f569e18bd89e75a5839894d5b5d4082%40%3Cuser.beam.apache.org%3E

Best

Etienne

On 10/02/2020 19:46, Chamikara Jayalath wrote:



On Thu, Feb 6, 2020 at 8:13 AM Etienne Chauchot <echauc...@apache.org> wrote:


Hi,

please see my comments inline

On 06/02/2020 16:24, Alexey Romanenko wrote:

Please, see my comments inline.


On 6 Feb 2020, at 10:50, Etienne Chauchot <echauc...@apache.org> wrote:



1. regarding version support: ES v2 has not been
maintained by Elastic since 2018/02, so we plan to
remove it from the IO. In the past we have already
retired versions (like Spark 1.6 for instance).



My only concern here is that there might be users who use
the existing module who might not be able to easily
upgrade the Beam version if we remove it. But given that
V2 is 5 versions behind the latest release this might be OK.


It seems we have a consensus on this.
I think there should be another general discussion on the
long-term support of the IO modules for our preferred tools.


=> yes, consensus, let's drop ESV2


We had (and still have) a similar problem with KafkaIO
supporting different versions of Kafka, especially the very old
version 0.9. We raised this question on user@ and it appears that
there are users who for some reason still use old Kafka versions.
So, before dropping support for any ES version, I'd suggest asking
on user@ and seeing whether any people would be affected by this.

Yes, we can do a survey among users, but the question is: should we
support an ES version that is no longer supported by Elastic
themselves?


+1 for asking in the user list. I guess this is more about whether 
users need this specific version that we hope to drop support for. 
Whether we need to support unsupported versions is a more generic 
question that should prob. be addressed in the dev list. (and I 
personally don't think we should unless there's a large enough user 
base for a given version).



2. regarding the user API: the aim is to unlock some new
features (listed by Ludovic) and give the user more
flexibility over his requests. That requires using the
high-level Java ES client in place of the low-level REST
client (which was used because it is the only one
compatible with all ES versions). We plan to replace the
API (JSON document in and out) with more complete
standard ES objects that contain the request logic
(insert/update, doc routing etc.) and the data. There are
already IOs like SpannerIO that use similar objects in
the input PCollection rather than pure POJOs.



Won't this be a breaking change for all users? IMO using
POJOs in PCollections is safer since otherwise we have to
worry about changes to the underlying client library API.
The exception would be when the underlying client library
offers a backwards-compatibility guarantee that we can rely
on for the foreseeable future (for example, BQ TableRow).


Agreed but actually, there will be POJOs in order to abstract
Elasticsearch's version support. The following third point
explains this.


=> indeed it will be a breaking change, hence this email to get
a consensus on that. Also, I think our wrappers of ES request
objects will be as backward compatible as the underlying objects.


I just want to recall that, according to what we agreed some time
ago on dev@ (at least for IOs), all breaking user API changes
have to be added along with a deprecation of the old API, which
can be removed after 3 consecutive Beam releases. This way, users
will have time to move to the new API smoothly.


Here we are more discussing the target architecture of the new
module, but the process of deprecation is important to recall, I
agree. When I say the DTOs are backward compatible above, I mean
between per-version sub-modules inside the new module. Anyway,
sure, for some time both modules (the old REST-based one that
supports v2-7 and the new one that supports v5-7) will co

Elasticsearch use in Apache Beam

2020-02-13 Thread Etienne Chauchot

Hi everyone,

The Apache Beam community is currently working on refactoring the
current ElasticsearchIO (see the thread [1] on the dev mailing list). To
determine which Elasticsearch versions to support, we are running a survey
among users of Apache Beam. Could you please tell us more about your use of
ElasticsearchIO by participating in the survey below?


https://docs.google.com/forms/d/e/1FAIpQLSdQWtvSBf66cvq9G3hJMbudX36WJPhHmiTPkcpcLxycmiSZwQ/viewform


[1] 
https://lists.apache.org/thread.html/reb68f37c435995a64ded19100e09dfc31c5cf6227feae16494226100%40%3Cdev.beam.apache.org%3E


Best

Etienne



Re: Feedback on how we use Apache Beam in my company

2019-10-09 Thread Etienne Chauchot

Very nice !

Thanks

ccing dev list

Etienne

On 09/10/2019 16:55, Pierre Vanacker wrote:


Hi Apache Beam community,

We’ve been working with Apache Beam in production for a few years now 
in my company (Dailymotion).


If you're interested in knowing how we use Apache Beam in combination
with Google Dataflow, we shared this experience in the following
article:
https://medium.com/dailymotion/realtime-data-processing-with-apache-beam-and-google-dataflow-at-dailymotion-7d1b994dc816


Thanks to the developers for your great work !

Regards,

Pierre



Re: 2019 Beam Events

2018-12-13 Thread Etienne Chauchot
Great work ! Thanks for sharing Gris !
Etienne
Le mercredi 05 décembre 2018 à 07:47 +, Matthias Baetens a écrit :
> Great stuff, Gris! Looking forward to what 2019 will bring!
> The Beam meetup in London will have a new get together early next year as 
> well :-) 
> https://www.meetup.com/London-Apache-Beam-Meetup/ 
> 
> 
> On Tue, 4 Dec 2018 at 23:50 Austin Bennett  
> wrote:
> > Already got that process kicked off with the NY and LA meetups; now that
> > SF is about to be inaugurated, the goal will be to get these moving as well.
> > For anyone that is in (or goes to) those areas:
> > https://www.meetup.com/New-York-Apache-Beam/
> > https://www.meetup.com/Los-Angeles-Apache-Beam/
> > 
> > Please reach out to get involved!  
> > 
> > 
> > 
> > 
> > On Tue, Dec 4, 2018 at 3:13 PM Griselda Cuevas  wrote:
> > > +1 to Pablo's suggestion: if there's interest in founding a Meetup group
> > > in a particular city, let's create the Meetup page and start getting
> > > sign-ups. Joana will be reaching out with a comprehensive list of how to
> > > get started, and we're hoping to compile a high-level calendar of
> > > launches/announcements to feed into your meetup.
> > > G 
> > > 
> > > On Tue, 4 Dec 2018 at 12:04, Daniel Salerno  wrote:
> > > > =) What good news! Okay, I'll set up the group and try to get people
> > > > interested. Thank you
> > > > 
> > > > Em ter, 4 de dez de 2018 às 17:19, Pablo Estrada  
> > > > escreveu:
> > > > > FWIW, for some of these places that have interest (e.g. Brazil,
> > > > > Israel), it's possible to create a group on meetup.com, start gauging
> > > > > interest, and look for organizers. Once a group of interested people
> > > > > exists, it's easier to get interest / sponsorship to bring speakers.
> > > > > So if you are willing to create the group on Meetup, Daniel, we can
> > > > > monitor it and try to plan something as it grows : )
> > > > > Best
> > > > > -P.
> > > > > 
> > > > > On Tue, Dec 4, 2018 at 10:55 AM Daniel Salerno 
> > > > >  wrote:
> > > > > > It's a shame that there are no events in Brazil ...
> > > > > > 
> > > > > > =(
> > > > > > 
> > > > > > Em ter, 4 de dez de 2018 às 13:12, OrielResearch Eila Arich-Landkof 
> > > > > >  escreveu:
> > > > > > > agree 
> > > > > > > 
> > > > > > > On Tue, Dec 4, 2018 at 5:41 AM Chaim Turkel  
> > > > > > > wrote:
> > > > > > > > Israel would be nice to have one
> > > > > > > > 
> > > > > > > > chaim
> > > > > > > > 
> > > > > > > > On Tue, Dec 4, 2018 at 12:33 AM Griselda Cuevas 
> > > > > > > >  wrote:
> > > > > > > > >
> > > > > > > > > Hi Beam Community,
> > > > > > > > >
> > > > > > > > > I started curating industry conferences, meetups and events
> > > > > > > > > that are relevant for Beam; this is the initial list I came up
> > > > > > > > > with. I'd love your help adding others that I might have
> > > > > > > > > overlooked. Once we're satisfied with the list, let's re-share
> > > > > > > > > so we can coordinate proposal submissions, attendance and
> > > > > > > > > community meetups there.
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > >
> > > > > > > > > G


Re: Beam Metrics using FlinkRunner

2018-12-11 Thread Etienne Chauchot
Phil, also, I forgot: as I see no "master=local" or Flink equivalent in your
setup, make sure you run Flink in local mode (the in-memory runner local to your
machine) because you are trying to push metrics to http://localhost:3000. Make
sure your app does not get deployed to a remote Flink cluster that would not
have network access to your local machine.
Etienne
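
A minimal sketch of the options setup being discussed in this thread, assuming
Beam's MetricsOptions interface and the MetricsHttpSink class from the
beam-runners-extensions-java-metrics module (the package names and the
push-period option name are from memory and may differ between Beam versions):

import org.apache.beam.runners.extensions.metrics.MetricsHttpSink; // assumed package
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.metrics.MetricsOptions;                 // assumed package
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class MetricsPushExample {
  public static void main(String[] args) {
    MetricsOptions options =
        PipelineOptionsFactory.fromArgs(args).as(MetricsOptions.class);

    // Push aggregated metrics to a local HTTP endpoint every 5 seconds.
    // The runner must be able to reach this URL, which is why running Flink
    // in local mode matters here.
    options.setMetricsSink(MetricsHttpSink.class);
    options.setMetricsHttpSinkUrl("http://localhost:3000");
    options.setMetricsPushPeriod(5L); // assumed option, seconds between pushes

    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline ...
    pipeline.run().waitUntilFinish();
  }
}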
Le mardi 11 décembre 2018 à 15:07 +0100, Etienne Chauchot a écrit :
> Hi Phil,
> Your setup looks good to me and you are not using detached mode.
> MetricsPusher in streaming mode on Flink works, so we need to figure out
> what is wrong.
> - please check that you have port 3000 listening (just in case :) ) using
> telnet or any network program
> - also you need to have the beam-runners-extensions-java-metrics jar in the
> classpath
> - please check the logs for a MetricsPushException. It would not stop the
> pipeline so you might have missed it, but in case of error the exception
> will be wrapped into a MetricsPushException and printed to the console.
> CCing dev list
> Best
> Etienne
> Le lundi 10 décembre 2018 à 08:45 -0600, Phil Franklin a écrit :
> > Hi, Etienne!  The actual command line is “mvn clean install”, because I set 
> > the arguments in the code:
> > 
> >   public void testFlinkRunner() throws Exception {
> >     String[] args = new String[] {
> >         "--runner=FlinkRunner", "--streaming=true", "--inputTopic=" + topic};
> >     run(args);
> >   }
> > 
> > I’m using Beam 2.8.0, and Flink 1.5.4.
> > 
> > Because I wasn’t sure whether I was in detached mode, I called the 
> > toString() method on the Flink runner result to
> > see if I would get FlinkDetachedRunnerResult.  This is what was returned:
> > 
> > 
> > 
FlinkRunnerResult{accumulators={__metricscontainers=org.apache.beam.runners.core.metrics.MetricsContainerStepMap@b799384b
> > }, runtime=33126}
> > I set the options for MetricsHttpSink:
> > 
> > options.setMetricsHttpSinkUrl("http://localhost:3000");
> > options.setMetricsSink(MetricsHttpSink.class);
> > This works when I test SparkRunner, so I believe I have it set up correctly 
> > for MetricsPusher to capture the metrics
> > from Flink as well.
> > -Phil
> > On 2018/12/07 14:46:06, Etienne Chauchot  wrote: 
> > > Hi Phil,
> > > MetricsPusher is tested on all the runners in both batch and streaming
> > > mode. I just ran this test in Flink in streaming mode and it works.
> > > What is the command line you are using and which version of Beam?
> > >
> > > Please also remember that, as discussed, metrics (and other Flink
> > > features) do not work if Flink is used in detached mode.
> > >
> > > Etienne
> > >
> > >
> > > Le mardi 04 décembre 2018 à 12:49 -0600, Phil Franklin a écrit :
> > > > I’m having difficulty accessing Beam metrics when using FlinkRunner in
> > > > streaming mode. I don’t get any metrics from MetricsPusher, though the
> > > > same setup delivered metrics from SparkRunner. Probably for the same
> > > > reason that MetricsPusher doesn’t work, I also don’t get any output when
> > > > I call an instance of MetricsHttpSink directly. The problem seems to be
> > > > that Flink never returns from pipeline.run(), an issue that others have
> > > > referred to as FlinkRunner hanging.
> > > >
> > > > Is there a solution for getting metrics in this case that I’m missing?
> > > >
> > > > Thanks!
> > > > -Phil
> > >
> > 


Re: Beam Metrics using FlinkRunner

2018-12-11 Thread Etienne Chauchot
Hi Phil,
Your setup looks good to me and you are not using detached mode. MetricsPusher
in streaming mode on Flink works, so we need to figure out what is wrong.
- please check that you have port 3000 listening (just in case :) ) using
telnet or any network program
- also you need to have the beam-runners-extensions-java-metrics jar in the
classpath
- please check the logs for a MetricsPushException. It would not stop the
pipeline so you might have missed it, but in case of error the exception will
be wrapped into a MetricsPushException and printed to the console.
CCing dev list
Best
Etienne
Le lundi 10 décembre 2018 à 08:45 -0600, Phil Franklin a écrit :
> Hi, Etienne!  The actual command line is “mvn clean install”, because I set 
> the arguments in the code:
> 
>   public void testFlinkRunner() throws Exception {
>     String[] args = new String[] {
>         "--runner=FlinkRunner", "--streaming=true", "--inputTopic=" + topic};
>     run(args);
>   }
> 
> I’m using Beam 2.8.0, and Flink 1.5.4.
> 
> Because I wasn’t sure whether I was in detached mode, I called the toString() 
> method on the Flink runner result to see
> if I would get FlinkDetachedRunnerResult.  This is what was returned:
> 
> 
> 
FlinkRunnerResult{accumulators={__metricscontainers=org.apache.beam.runners.core.metrics.MetricsContainerStepMap@b799384b
> }, runtime=33126}
> I set the options for MetricsHttpSink:
> 
> options.setMetricsHttpSinkUrl("http://localhost:3000");
> options.setMetricsSink(MetricsHttpSink.class);
> This works when I test SparkRunner, so I believe I have it set up correctly 
> for MetricsPusher to capture the metrics
> from Flink as well.
> -Phil
> On 2018/12/07 14:46:06, Etienne Chauchot  wrote: 
> > Hi Phil,
> > MetricsPusher is tested on all the runners in both batch and streaming
> > mode. I just ran this test in Flink in streaming mode and it works.
> > What is the command line you are using and which version of Beam?
> >
> > Please also remember that, as discussed, metrics (and other Flink features)
> > do not work if Flink is used in detached mode.
> >
> > Etienne
> >
> >
> > Le mardi 04 décembre 2018 à 12:49 -0600, Phil Franklin a écrit :
> > > I’m having difficulty accessing Beam metrics when using FlinkRunner in
> > > streaming mode. I don’t get any metrics from MetricsPusher, though the
> > > same setup delivered metrics from SparkRunner. Probably for the same
> > > reason that MetricsPusher doesn’t work, I also don’t get any output when
> > > I call an instance of MetricsHttpSink directly. The problem seems to be
> > > that Flink never returns from pipeline.run(), an issue that others have
> > > referred to as FlinkRunner hanging.
> > >
> > > Is there a solution for getting metrics in this case that I’m missing?
> > >
> > > Thanks!
> > > -Phil
> > 
> 


Re: Beam Metrics using FlinkRunner

2018-12-07 Thread Etienne Chauchot
Hi Phil,
MetricsPusher is tested on all the runners in both batch and streaming mode. I 
just ran this test in Flink in streaming
mode and it works.
What is the command line you are using and which version of Beam?

Please also remember that, as discussed, metrics (and other Flink features) do
not work if Flink is used in detached mode.

Etienne


Le mardi 04 décembre 2018 à 12:49 -0600, Phil Franklin a écrit :
> I’m having difficulty accessing Beam metrics when using FlinkRunner in 
> streaming mode. I don’t get any metrics from MetricsPusher, though the same 
> setup delivered metrics from SparkRunner.  Probably for the same reason that 
> MetricsPusher doesn’t work, I also don’t get any output when I call an 
> instance of MetricsHttpSink directly.  The problem seems to be that Flink 
> never returns from pipeline.run(), an issue that others have referred to as 
> FlinkRunner hanging.  
> 
> Is there a solution for getting metrics in this case that I’m missing?
> 
> Thanks!
> -Phil


Re: Beam Metrics questions

2018-12-03 Thread Etienne Chauchot
Hi Phil,
Thanks for the update. I was checking the code and could not understand how
the filtering could fail.

Etienne
Le vendredi 30 novembre 2018 à 10:53 -0600, Phil Franklin a écrit :
> Etienne, I’ve just discovered that the code I used for my tests overrides the 
> command-line arguments, and while I thought I was testing with the 
> SparkRunner and FlinkRunner, in fact every test used DirectRunner, which 
> explains why I was seeing the committed values.  So there’s no need for a 
> ticket concerning committed values from the FlinkRunner.  Sorry for the 
> confusion.
> 
> -Phil


Re: Beam Metrics questions

2018-11-30 Thread Etienne Chauchot
Hi Phil,
Thanks for using MetricsPusher and Beam in general ! 
- MetricsHttpSink works this way: it filters out committed metrics from the
JSON output when committed metrics are not supported. I checked, and the Flink
runner still does not support committed metrics, so there should be no
committed metric values in the output JSON. There might be a bug; I'll open a
ticket, thanks for pointing it out! You tested on Flink and Spark, right? And
both output committed metric values, right?
- there is no default mechanism to fall back from committed metric values to
attempted ones
- Apache Flink does not make Flink Accumulators available in detached mode, so
indeed, metrics are not available in this mode.
CCing dev list.
Etienne
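
A minimal sketch of querying those values from a PipelineResult, using the
standard Beam metrics query API; whether getCommitted() throws or returns a
placeholder on runners without committed-metrics support is an assumption that
may vary by Beam version:

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

class MetricsReport {
  // 'result' is the PipelineResult returned by pipeline.run().
  static void print(PipelineResult result) {
    MetricQueryResults metrics =
        result.metrics().queryMetrics(MetricsFilter.builder().build());
    for (MetricResult<Long> counter : metrics.getCounters()) {
      System.out.println(counter.getName() + " attempted=" + counter.getAttempted());
      try {
        // Expected to fail on runners without committed-metrics support
        // (e.g. the Flink runner discussed in this thread).
        System.out.println(counter.getName() + " committed=" + counter.getCommitted());
      } catch (UnsupportedOperationException e) {
        System.out.println("committed value not supported by this runner");
      }
    }
  }
}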
Le lundi 26 novembre 2018 à 15:57 -0600, Phil Franklin a écrit :
> All of the discussion I’ve seen says that Flink and Spark only provided 
> attempted metric values, but when I use
> MetricsHttpSink and look at the JSON it has both attempted and committed 
> values (albeit, both the same for my simple
> testing).  Has the metrics processing been updated recently, and I’m just 
> missing the change updates?  Or are the
> committed values being defaulted to the attempted values? 
> 
> Also, I’ve seen it mentioned that Flink doesn’t report metrics when in 
> detached mode.  Is this still the case?
> 
> 
> Thanks for your help!


Re: [Call for items] November Beam Newsletter

2018-11-13 Thread Etienne Chauchot
Hi, I just added some things that were done.
Etienne
Le lundi 12 novembre 2018 à 12:22 +, Matthias Baetens a écrit :
> Looks great, thanks for the effort and for including the Summit blogpost, 
> Rose!
> On Thu, 8 Nov 2018 at 22:55 Rose Nguyen  wrote:
> > Hi Beamers:
> > 
> > 
> > Time to sync with the community on all the awesome stuff we've been doing!
> > 
> > 
> > Add the highlights from October to now (or planned events and talks) that 
> > you want to share by 11/14 11:59 p.m. PDT.
> > 
> > We will collect the notes via Google docs but send out the final version 
> > directly to the user mailing list. If you
> > do not know how to format something, it is OK to just put down the info and 
> > I will edit. I'll ship out the
> > newsletter on 11/15. 
> > 
> > [1] 
> > https://docs.google.com/document/d/1kKQ4a9RdptB6NwYlqmI9tTcdLAUzDnWi2dkvUi0J_Ww
> > -- 
> > Rose Thị Nguyễn
> -- 
>  


Re: Apache Beam Newsletter - August 2018

2018-08-22 Thread Etienne Chauchot
Hi Rose, 
I know the newsletter has already been sent, but may I add some of my ongoing 
subjects:
What's been done:
- CI improvement: for each new commit on master, the Nexmark suite is run in
both batch and streaming mode on Spark, Flink and Dataflow (thanks to Andrew),
and dashboard graphs are produced to track functional and performance
regressions.
For talks, I guess only talks that already took place are included, not the
ones scheduled for ApacheCon in September, right?
Etienne




Le vendredi 10 août 2018 à 12:37 -0700, Rose Nguyen a écrit :
> August 2018 | Newsletter
>
> What's been done
>
> Apache Beam 2.6.0 Release
> The Apache Beam team is pleased to announce the release of version 2.6.0!
> This is the second release under the new build system, and the process has
> kept improving. You can download the release here and read the release notes
> for more details.
>
> Beam Summit London 2018 (by: Matthias Baetens, Gris Cuevas, Viktor Kotai)
> Approval from the Apache Software Foundation is underway. We are currently
> finding a venue and sponsors. We'll send the call for participation soon to
> curate the agenda. If you're interested in participating in the organization
> of the event, reach out to the organizers. Dates are TBD but we are
> considering the first or last days of October.
>
> Support for Bounded SDF in all runners (by: Eugene Kirpichov)
> Beam recently introduced a new type of DoFn called SplittableDoFn (SDF) to
> enable richer modularity in its IO connectors. Support for SDF in bounded
> (batch) connectors was added for all runners.
>
> Apache Kudu IO (by: Tim Robertson)
> A new IO connector for the Apache Kudu data store was added recently. See
> BEAM-2661 for more details on it.
>
> IO improvements (by: Ismaël Mejía)
> HBaseIO added a new transform based on SDF called readAll. See BEAM-4020 for
> more details on it.
>
>
> What we're working on...
>
> Interactive Runner for Beam (by: Harsh Vardhan, Sindy Li, Chamikara
> Jayalath, Anand Iyer, Robert Bradshaw)
> Notebook-based interactive processing of Beam pipelines. This is now ready
> to try out in Jupyter Notebook for Beam Python pipelines over DirectRunner!
> See the design doc for more details and watch a demo here. Thoughts,
> comments and discussions welcome :)
>
> Python 3 Support (by, in alphabetical order: Ahmet Altay, Robert Bradshaw,
> Charles Chen, Matthias Feys, Holden Karau, Sergei Lebedev, Robbe Sneyders,
> Valentyn Tymofieiev)
> Major progress has been made on making the Beam Python codebase
> Python 3-compatible through futurization. Read the proposal for more details.
>
> New IO connectors (by: John Rudolf Lewis, Jacob Marble)
> Amazon Simple Queue Service (SQS) is in review. Amazon Redshift is in
> progress.
>
> Portable Runners (by: Ankur Goenka, Eugene Kirpichov, Ben Sidhom, Axel
> Magnuson, Thomas Weise, Ryan Williams, Robert Bradshaw, Daniel Oliveira,
> Holden Karau)
> Good progress on the Portable Flink Runner, and many of the ValidatesRunner
> tests are passing now. The Portable Flink Runner can now execute batch
> WordCount in Java, Python and Go. Many enhancements and bug fixes in the
> Portable Reference Runner. See Jira
> https://issues.apache.org/jira/browse/BEAM-2889 for more details on progress.
>
> Dependencies (by: Yifan Zou, Chamikara Jayalath)
> We added a dependencies guide for Beam and tooling to automatically create
> JIRAs for significantly outdated dependencies. We are working on upgrading
> existing dependencies. See the Beam dependencies guide for more details.
>
>
> New Members
>
> New Contributors
> Rose Nguyen, Seattle, WA, USA. Beam docs contributor, working to improve
> docs usability.
> Connell O'Callaghan, Seattle, WA, USA. Interested in growing the community;
> helping with community triages and managing issues.
>
>
> Talks & Meetups
>
> Stream Processing Meetup @ LinkedIn, 7/19/18
> Xinyu Liu gave a talk on building a Samza Runner for Beam, "Beam meet up,
> Samza!"; see it here.
>
> Large Scale Landuse Classification of Satellite Images, Berlin Buzzwords @
> Berlin, 6/11/18
> Suneel Marthi and Jose Luis Contreras gave a talk on using streaming
> pipelines built on Apache Flink for model training and inference. They
> leveraged convolutional neural networks (CNNs) built with Apache MXNet to
> train deep learning models for land use classification. Read about it and
> watch it here.
>
> Big Data in Production Meetup @ Cambridge, MA, 6/28/18
> Robert Bradshaw and Eila Arich-Landkof gave a talk about Apache Beam and
> machine learning. Event details here, and watch their talks here.
>
> Resources
>
> Awesome Beam (by: Pablo Estrada)
> Inspired by efforts in Awesome Flink and Awesome Hadoop, I've created the
> Awesome Beam repo to aggregate interesting Beam things.
>
>
> Until Next Time!
> This edition was curated by our community of contributors, committers and
> PMCs. It contains work done in June and July of 2018 and ongoing efforts. We
> hope to provide visibility to what's going on in the community, 

Re: Apache Beam Summit in Europe

2018-07-05 Thread Etienne Chauchot
Hi,
Just a comment: I'm not sure 28-29/09 is very practical because some of the
Beam community will be at ApacheCon in Montreal, which ends Sept 27th.
Etienne
Le mercredi 04 juillet 2018 à 17:13 +0100, Matthias Baetens a écrit :
> Hi everyone!
> Thanks for filling out the survey. We are currently looking into the 
> practicalities (venue, dates, ...).
> To get a better grasp of the size of the audience, Alex has been so kind as
> to start a poll on Twitter:
> https://twitter.com/alexvb/status/1014531906564775937. If you have a spare
> minute, it would be great if you could fill this out (retweets are, of
> course, greatly appreciated as well).
> 
> For the dates, we are thinking of:
> - 28-29/09
> - 1-2/10 or 3-4/10
> - 15-16/10
> 
> Happy to hear your preferences and comments.
> Best,
> Matthias
> On Thu, 31 May 2018 at 07:15 Matthias Baetens  
> wrote:
> > Hi Beam Community, 
> > 
> > We are planning to have an Apache Beam Summit in Europe, pretty similar to 
> > what we hosted earlier in California on
> > March 15th this year. 
> > If you'd be interested in attending, helping with organization or speaking, 
> > please answer this Google form!
> > 
> > Cheers, 
> > Matthias & the events and meetups folks-- 
> >  
> -- 
>  

Re: Metrics: Non-cumulative values for Distribution

2018-06-19 Thread Etienne Chauchot
Hi Scott and Jozef,
Sorry for the late answer, I missed the email.
Well, MetricsPusher will aggregate the metrics just as PipelineResult.metrics()
does, but it will do so at a given configurable interval and export the values.
It means that if you configure the export to be every 5s, you will get the
aggregated (between workers) value of the distribution every 5 seconds. It will
not be reset. For example, at t = 0 + 5s, if the max received until then is 10,
then the value exported will be 10. Then, at t = 0 + 10s, if the distribution
was updated with a 5, it will still report 10. Then at t = 0 + 15s, if the
distribution was updated with an 11, it will export 11. As metrics are global
and not bound to windows like PCollection elements, you will always have the
cumulative value (the essence of the distribution metric). So I agree with
Scott: better for your use case is to treat the metric as if it were an element
and compute it downstream so that it can be bound to a window.
Etienne
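
A minimal sketch of that downstream approach, assuming the monitored values can
be emitted as Long elements (the input PCollection name is hypothetical, and
the 30-second window mirrors the report interval mentioned below);
Max.longsGlobally() and Min.longsGlobally() are standard Beam combines:

import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class PerWindowBounds {
  // 'values' holds the measurements emitted as elements by the operator,
  // instead of (or in addition to) updating a Distribution metric.
  static PCollection<Long> perWindowMax(PCollection<Long> values) {
    PCollection<Long> windowed = values.apply(
        Window.<Long>into(FixedWindows.of(Duration.standardSeconds(30))));
    // Per-window, non-cumulative bound that can be written to a sink
    // downstream; Min.longsGlobally() works the same way for the lower bound.
    return windowed.apply(Max.longsGlobally().withoutDefaults());
  }
}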


Le samedi 02 juin 2018 à 08:01 +0300, Jozef Vilcek a écrit :
> Hi Scott,
> nothing special about the use case. I just want to monitor the upper and
> lower bound for some data flowing through an operator.
> The "report interval" is currently 30 seconds and it is independent of
> business logic. It is the one mentioned here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#reporter
>
> and its value is set based on how granular and fast I want to see changes in
> what is going on in the pipeline, compared to how many resources in the
> time-series database I dedicate to it.
>
> Thanks for looking into it
> On Fri, Jun 1, 2018 at 7:49 PM, Scott Wegner  wrote:
> > Hi Jozef,
> > Can you elaborate a bit on your use-case; is the "report interval" a 
> > concept you depend on for your data processing
> > logic?
> > 
> > The Metrics API in general is designed to serve data to the executing 
> > runner or external service which can then
> > manage the aggregation and reporting through PipelineResult or monitoring 
> > UI. Etienne, do you know if MetricsPusher
> > [1] would help at all?
> > 
> > I suspect you'd be better off calculating the Min/Max values in a 
> > downstream Combine transform and set the
> > Windowing/Trigger strategy which captures the report interval you're 
> > looking for.
> > 
> > [1] https://s.apache.org/runner_independent_metrics_extraction 
> > 
> > On Fri, Jun 1, 2018 at 3:39 AM Jozef Vilcek  wrote:
> > > Hi,
> > > I am running a streaming job on Flink and want to monitor the MIN and MAX
> > > ranges of a metric flowing through an operator. I did it via
> > > org.apache.beam.sdk.metrics.Distribution.
> > >
> > > The problem is that it seems to report only cumulative values. What I
> > > would want instead is a discrete report of the MIN / MAX seen in each
> > > particular report interval.
> > >
> > > Is there a way to get non-cumulative data from Beam distribution
> > > metrics? What are my options?
> > > The obvious workaround is to track it "manually" and submit 2 gauge
> > > metrics. I hope there is a better way ... Is there?

Re: Apache Beam June Newsletter

2018-06-14 Thread Etienne Chauchot
Thanks Gris, this is very cool!
Besides, we did not include the scheduled talks for ApacheCon (end of
September) because they'll take place a while from now; maybe they'll be
announced in the next newsletter?
Etienne
Le mercredi 13 juin 2018 à 16:41 -0700, Pablo Estrada a écrit :
> Thanks Gris! Lots of interesting things. Best
> -P.
> 
> On Wed, Jun 13, 2018 at 4:40 PM Griselda Cuevas  wrote:
> > Hi Beam Community! 
> > Here [1] is the June Edition of our Apache Beam Newsletter. This edition 
> > was curated by our community of
> > contributors, committers and PMCs. Generally, it contains the work done in 
> > the previous month (May in this case) and
> > what's planned for the future.
> > 
> > We hope to provide visibility to what's going on in the community, so if 
> > you have questions, feel free to ask in
> > this thread. 
> > 
> > Cheers, 
> > Gris
> > 
> > [1] 
> > https://docs.google.com/document/d/1BwRhOu-uDd3SLB_Om_Beke5RoGKos4hj7Ljh7zM2YIo/edit?ts=5b17fb92#
> > 
> > 
> > 

Re: Bundling in ParDos

2018-05-23 Thread Etienne Chauchot
Hi Abdul,

Going back to your use case: if the aim is to batch the elements of an
unbounded source, then you can use the GroupIntoBatches transform, which groups
elements into batches (Iterables) of the size you specify. You can then process
the batch downstream in your pipeline; see the sketch after this message.

PS: to add an example to Eugene's list: I think in batch mode the Spark runner
evaluates the size of data in the BoundedSource and then divides it by the
number of available workers. So it ends up having one bundle per worker of size
= total / number of workers.
Etienne
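
A minimal sketch of that approach; the source, key function and batch size are
hypothetical, and GroupIntoBatches requires a keyed (KV) PCollection:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class BatchByKey {
  // 'events' is an unbounded PCollection<String> read from some source
  // (hypothetical). GroupIntoBatches needs KV input, so key the elements
  // first; hashing the element spreads batches across workers.
  static PCollection<KV<Integer, Iterable<String>>> batch(PCollection<String> events) {
    PCollection<KV<Integer, String>> keyed = events.apply(
        "KeyByHash",
        ParDo.of(new DoFn<String, KV<Integer, String>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(KV.of(Math.floorMod(c.element().hashCode(), 10), c.element()));
          }
        }));
    // Batches of at most 500 elements per key; each output value is an
    // Iterable of up to 500 inputs that can be processed downstream.
    return keyed.apply(GroupIntoBatches.<Integer, String>ofSize(500));
  }
}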
Le mardi 22 mai 2018 à 11:35 -0700, Eugene Kirpichov a écrit :
> Different runners decide it differently.
> E.g. for the Dataflow runner: in batch mode, bundles are usually quite large, 
> e.g. something like several-dozen-MB
> chunks of files, or pretty big key ranges of something like BigTable or 
> GroupByKey output. The bundle sizes are not
> known in advance (e.g. when the runner produces a bundle "read key range [a, 
> b)" the runner has no way of knowing how
> many keys there actually are between a and b), and they even change as the 
> job runs [1]. In streaming mode, bundles
> usually close after either a few thousand elements read from the source, or a 
> few seconds, whichever happens first -
> nothing too fancy going on.
> 
> Flink runner currently puts each element in its own bundle, but this is quite 
> inefficient and a known performance
> issue. Spark, I don't know. Direct runner I think has a mix between these 
> strategies.
> 
> Basically, if you want batching, you have to do it yourself, in a way that 
> does not violate runner bundle boundaries
> (don't batch across a FinishBundle). In practice this is trivial to implement 
> and never much of a problem.
> 
> [1] 
> https://qconlondon.com/system/files/presentation-slides/straggler-free_data_processing_in_cloud_dataflow.pdf
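
A minimal sketch of the do-it-yourself batching described above, with a
hypothetical maximum batch size and a placeholder flush; flushing in
@FinishBundle keeps a batch from ever spanning two bundles:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

// Sketch only: 'MAX_BATCH' and the flush body are placeholders; replace the
// flush with the real bulk write (or emit the batch as an output element).
class BatchingFn extends DoFn<String, Void> {
  private static final int MAX_BATCH = 500;
  private transient List<String> batch;

  @StartBundle
  public void startBundle() {
    batch = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    batch.add(c.element());
    if (batch.size() >= MAX_BATCH) {
      flush();
    }
  }

  @FinishBundle
  public void finishBundle() {
    // Flushing here guarantees a batch never crosses a bundle boundary.
    flush();
  }

  private void flush() {
    if (!batch.isEmpty()) {
      // ... issue the bulk request for 'batch' here ...
      batch.clear();
    }
  }
}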
> On Tue, May 22, 2018 at 1:12 AM Abdul Qadeer  wrote:
> > Hi Eugene!
> > I had gone through that link before sending an email here. It does a decent
> > job explaining when to use which method and what kind of optimisations we
> > are looking at, but didn't really answer the question I had, i.e.
> > controlling the granularity of elements of a PCollection in a bundle.
> > Kenneth made it clear that it is not in the user's control, but now I am
> > interested to know how the runner decides it.
> > 
> > 
> > > On May 21, 2018, at 7:55 PM, Eugene Kirpichov  
> > > wrote:
> > > 
> > > Hi Abdul, please see
> > > https://stackoverflow.com/questions/45985753/what-is-the-difference-between-dofn-setup-and-dofn-startbundle
> > > - let me know if it answers your question sufficiently.
> > > On Mon, May 21, 2018 at 7:04 PM Abdul Qadeer  
> > > wrote:
> > > > Hi!
> > > > I was trying to understand the behavior of StartBundle and FinishBundle 
> > > > w.r.t. DoFns.
> > > > I have an unbounded data source and I am trying to leverage bundling to 
> > > > achieve batching.
> > > > From the docs of ParDo:
> > > > 
> > > > "when a ParDo transform is executed, the elements of the input 
> > > > PCollection are first divided up into some number
> > > > of "bundles"
> > > > 
> > > > 
> > > > I would like to know if bundling is possible for unbounded data in the 
> > > > first place. If it is then how do I
> > > > control the bundle size i.e. number of elements of a given PCollection 
> > > > in that bundle?

Re: Monitoring and Management Tools for Beam an Friends

2018-03-26 Thread Etienne Chauchot
Hi Benjamin
Please note there is an ongoing PR for a runner-agnostic metrics feature:
https://github.com/apache/beam/pull/4548
Le jeudi 02 mars 2017 à 16:26 +, Stas Levin a écrit :
> Hi Benjamin,
> 
> This is somewhat of a hot topic lately, visibility FTW :)
> 
> My experience comes from doing Beam over Spark over Yarn, where we have been 
> using Graphite + Grafana (4.0+ gives
> alerting as well). 
> 
> In terms of metrics, the Spark runner supports both Spark native metrics 
> (default + Spark metric sources you
> configure, e.g. JVM), and Beam's, user-defined metrics, all of which are 
> reported to whatever Spark sink(s) you have
> configured, in our case Graphite.
> 
> There were a few caveats we had to address, most of which had to do with 
> associating an application's logical name and
> its application id assigned by Yarn. In other words, making all the metrics 
> reported by a given application id, go
> under its logical name required some effort. However, once it's been done it 
> allows for dashboards to be built around
> applications' logical names rather than around specific application ids which 
> is very useful.
> 
> There are more caveats to speak of in the context of metrics and Beam over 
> Spark (and streaming in particular), so let
> me know if you find the gory details interesting.
> 
> -Stas
> 
> On Thu, Mar 2, 2017 at 5:37 PM Amit Sela  wrote:
> > +Stas Levin 
> > 
> > On Thu, Mar 2, 2017 at 5:30 PM Jean-Baptiste Onofré  
> > wrote:
> > > Hi Benjamin,
> > > 
> > > It's a bit related to the Metric discussion on the dev@ mailing list.
> > > 
> > > Today, we leverage the monitoring and management provided by the
> > > execution engine of the runner.
> > > 
> > > For instance, with the Spark runner, we can use the "regular" history
> > > server, codehale reporter, etc. It's also possible to explicitly use a
> > > sink/fn to send data to a backend (Graphite, elasticsearch, ...).
> > > 
> > > We are now discussing an agnostic way of "pushing" metrics to a
> > > backend (something similar to what Karaf Decanter provides).
> > > 
> > > Regards
> > > JB
> > > 
> > > On 03/02/2017 04:13 PM, Stadin, Benjamin wrote:
> > > > Hi,
> > > >
> > > >
> > > >
> > > > I’m trying to collect a list of open source monitoring and management
> > > > tools for Beam and supported runners.
> > > >
> > > >
> > > >
> > > > What do you use in your daily routine, and what’s your experience?
> > > >
> > > >
> > > >
> > > > Regards
> > > >
> > > > Ben
> > > >
> > > >
> > > >
> > > >
> > > >
> > > 
> > > --
> > > Jean-Baptiste Onofré
> > > jbono...@apache.org
> > > http://blog.nanthrax.net
> > > Talend - http://www.talend.com
> > > 

Re: [ANNOUNCE] Apache Beam 2.4.0 released

2018-03-22 Thread Etienne Chauchot
Great !
Le jeudi 22 mars 2018 à 08:24 +, Robert Bradshaw a écrit :
> We are pleased to announce the release of Apache Beam 2.4.0. Thanks goes to
> the many people who made this possible.
> 
> Apache Beam is an open source unified programming model to define and
> execute data processing pipelines, including ETL, batch and stream
> (continuous) processing. See https://beam.apache.org
> 
> You can download the release here:
> 
>  https://beam.apache.org/get-started/downloads/
> 
> As well as many bugfixes, some notable changes in this release are:
> - A new Python Direct runner, up to 15x faster than the old one.
> - Kinesis support for reading and writing in Java
> - Several refactoring to enable portability (Go/Python on Flink/Spark)
> 
> Full release notes can be found at
> 
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12342682=12319527
> 
> Enjoy!

Re: Does ElasticsearchIO in the latest RC support adding document IDs?

2017-11-16 Thread Etienne Chauchot

Chet,

FYI, here is the ticket and the design proposal: 
https://issues.apache.org/jira/browse/BEAM-3201. If you still want to 
code that improvement, give me your jira id and I will assign the ticket 
to you. Otherwise I can code it as well.


Best

Etienne


Le 16/11/2017 à 09:19, Etienne Chauchot a écrit :


Hi,

Thanks for the offer, I'd be happy to review your PR. Just wait a bit
until I have opened a proper ticket for that. I still need to think
more about the design. Among other things, I have to check what the ES dev
team did for other big data ES connectors (es-hadoop) on that particular
point. Besides, I think we also need to deal with the id at read time,
not only at write time. I'll give some details in the ticket.



Le 15/11/2017 à 20:08, Chet Aldrich a écrit :
Given that this seems like a change that should probably happen, and 
I’d like to help contribute if possible, a few questions and my 
current opinion:


So I’m leaning towards approach B here, which is:


b. (a bit less user friendly) a keyed PCollection with K as an id, but this
forces the user to do a ParDo before writing to ES to output KV
pairs of <id, json>


I think that the reduction in user-friendliness may be outweighed by 
the fact that this obviates some of the issues surrounding a failure 
when finishing a bundle. Additionally, this /forces/ the user to 
provide a document id, which I think is probably better practice.


Yes, as I wrote before, I think it is better to force the user to
provide an id (at least for index updates; exactly-once semantics is a
larger Beam subject than this IO's scope). Regarding design, plan b is
not the best one IMHO because it changes the IO public API. I'm more
in favor of plan a, with the ability for the user to tell which field is
his doc id.


This will also probably lead to fewer frustrations around “magic” 
code that just pulls something in if it happens to be there, and 
doesn’t if not. We’ll need to rely on the user catching this 
functionality in the docs or the code itself to take advantage of it.


IMHO it’d be generally better to enforce this at compile time because 
it does have an effect on whether the pipeline produces duplicates on 
failure. Additionally, we get the benefit of relatively intuitive 
behavior where if the user passes in the same Key value, it’ll update 
a record in ES, and if the key is different then it will create a new 
record.

Totally agree, id enforcement at compile time, no auto-generation


Curious to hear thoughts on this. If this seems reasonable I’ll go 
ahead and create a JIRA for tracking and start working on a PR for 
this. Also, if it’d be good to loop in the dev mailing list before 
starting let me know, I’m pretty new to this.

I'll create the ticket and we will loop on design in the comments.
Best
Etienne


Chet

On Nov 15, 2017, at 12:53 AM, Etienne Chauchot
<echauc...@apache.org> wrote:


Hi Chet,

What you say is totally true: docs written using ElasticsearchIO
will always have an ES-generated id. But it might change in the
future; indeed, it might be a good thing to allow the user to pass an
id. Just from 5 seconds of thinking, I see 3 possible designs for that.


a. (simplest) use a special JSON field for the id; if it is provided
by the user in the input JSON then it is used, with an auto-generated id
otherwise.


b. (a bit less user friendly) a keyed PCollection with K as an id, but this
forces the user to do a ParDo before writing to ES to output KV
pairs of <id, json> (a rough sketch follows this list)


c. (a lot more complex) allow the IO to serialize/deserialize Java
beans and have a String id field. Matching Java types to ES types
is quite tricky, so for now we just relied on the user to serialize
his beans into JSON and let ES match the types automatically.
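
A rough illustration of option b (purely hypothetical, since the IO's public
API would have to change to accept KV input): the user would run a ParDo like
the following before the write, here using Jackson and a made-up "docId" field:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical sketch for option b: extract the business key from the JSON
// document and emit <id, json> pairs that a KV-aware write could use as the
// Elasticsearch document id. The field name "docId" is made up.
class AddDocumentIdFn extends DoFn<String, KV<String, String>> {
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    String json = c.element();
    // Any JSON library works here; Jackson is used as an example.
    JsonNode node = new ObjectMapper().readTree(json);
    c.output(KV.of(node.get("docId").asText(), json));
  }
}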


Related to the problems you raise below:

1. Well, the bundle is the commit entity of Beam. Consider the case
of ESIO.batchSize being smaller than the bundle size. While processing
records, when the number of elements reaches batchSize, an ES bulk insert
will be issued but no finishBundle. If there is a problem later on
in the bundle processing, before the finishBundle, the checkpoint
will still be at the beginning of the bundle, so the whole bundle will
be retried, leading to duplicate documents. Thanks for raising that!
I'm CCing the dev list so that someone can correct me on the
checkpointing mechanism if I'm missing something. Besides, I'm
thinking about forcing the user to provide an id in all cases to
work around this issue.


2. Correct.

Best,
Etienne

Le 15/11/2017 à 02:16, Chet Aldrich a écrit :

Hello all!

So I’ve been using the ElasticSearchIO sink for a project 
(unfortunately it’s Elasticsearch 5.x, and so I’ve been messing 
around with the latest RC) and I’m finding that it doesn’t allow 
for changing the document ID, but only lets you pass in a record, 
which means that the document ID is auto-generated. See this line 
for what specifically is happening:


https://github.

Re: Does ElasticsearchIO in the latest RC support adding document IDs?

2017-11-15 Thread Etienne Chauchot
Yes, exactly. Actually, it arose from a discussion we had with Romain
about ESIO.



Le 15/11/2017 à 10:08, Jean-Baptiste Onofré a écrit :
I think it's also related to the discussion Romain raised on the dev 
mailing list (gap between batch size, checkpointing & bundles).


Regards
JB

On 11/15/2017 09:53 AM, Etienne Chauchot wrote:

Hi Chet,

What you say is totally true: docs written using ElasticsearchIO will
always have an ES-generated id. But it might change in the future;
indeed, it might be a good thing to allow the user to pass an id. Just
from 5 seconds of thinking, I see 3 possible designs for that.


a. (simplest) use a special JSON field for the id; if it is provided
by the user in the input JSON then it is used, with an auto-generated id
otherwise.


b. (a bit less user friendly) a keyed PCollection with K as an id, but this
forces the user to do a ParDo before writing to ES to output KV pairs
of <id, json>


c. (a lot more complex) allow the IO to serialize/deserialize Java
beans and have a String id field. Matching Java types to ES types is
quite tricky, so for now we just relied on the user to serialize his
beans into JSON and let ES match the types automatically.


Related to the problems you raise below:

1. Well, the bundle is the commit entity of Beam. Consider the case
of ESIO.batchSize being smaller than the bundle size. While processing
records, when the number of elements reaches batchSize, an ES bulk insert
will be issued but no finishBundle. If there is a problem later on in the
bundle processing, before the finishBundle, the checkpoint will still
be at the beginning of the bundle, so the whole bundle will be retried,
leading to duplicate documents. Thanks for raising that! I'm CCing
the dev list so that someone can correct me on the checkpointing
mechanism if I'm missing something. Besides, I'm thinking about forcing
the user to provide an id in all cases to work around this issue.


2. Correct.

Best,
Etienne

Le 15/11/2017 à 02:16, Chet Aldrich a écrit :

Hello all!

So I’ve been using the ElasticSearchIO sink for a project 
(unfortunately it’s Elasticsearch 5.x, and so I’ve been messing 
around with the latest RC) and I’m finding that it doesn’t allow for 
changing the document ID, but only lets you pass in a record, which 
means that the document ID is auto-generated. See this line for what 
specifically is happening:


https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L838 



Essentially the data part of the document is being placed but it 
doesn’t allow for other properties, such as the document ID, to be set.


This leads to two problems:

1. Beam doesn’t necessarily guarantee exactly-once execution for a 
given item in a PCollection, as I understand it. This means that you 
may get more than one record in Elastic for a given item in a 
PCollection that you pass in.


2. You can’t do partial updates to an index. If you run a batch job 
once, and then run the batch job again on the same index without 
clearing it, you just double everything in there.


Is there any good way around this?

I’d be happy to try writing up a PR for this in theory, but not sure 
how to best approach it. Also would like to figure out a way to get 
around this in the meantime, if anyone has any ideas.


Best,

Chet

P.S. CCed echauc...@gmail.com because it seems like he's been doing work
related to the elastic sink.











Re: ElasticSearch with RestHighLevelClient

2017-10-25 Thread Etienne Chauchot

Hi,

For maintenance reasons, we have used the low-level REST client for
ElasticsearchIO despite the fact that there is now a high-level client.
Indeed, the IO production code is common to all Elasticsearch versions
because the low-level REST client is compatible with all ES versions. This
is not the case with the high-level client, whose major version has to be
the same as the targeted ES major version, which would lead to having
different IO versions for different ES versions.


That is why, IMHO, it is not a good idea to use the high-level REST client
in ElasticsearchIO.


Regards,

Etienne Chauchot


Le 23/10/2017 à 23:21, Ryan Bobko a écrit :

Thanks Tim,
I believe I'm doing what Jean-Baptiste recommends, so I guess I'll
have a look at the snapshot and see what's different. I don't mind
waiting a bit if it means I don't have to duplicate working code.

ry

On Mon, Oct 23, 2017 at 3:15 PM, Tim Robertson
<timrobertson...@gmail.com> wrote:

Hi Ryan,

I can confirm 2.2.0-SNAPSHOT works fine with an ES 5.6 cluster.  I am told
2.2.0 is expected within a couple weeks.
My work is only a proof of concept for now, but I put in 300M fairly small
docs at around 100,000/sec on a 3 node cluster without any issue [1].

Hope this helps,
Tim


[1]
https://github.com/gbif/pipelines/blob/master/gbif/src/main/java/org/gbif/pipelines/indexing/Avro2ElasticSearchPipeline.java


On Mon, Oct 23, 2017 at 9:00 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Hi Ryan,

the last version of ElasticsearchIO (that will be included in Beam 2.2.0)
supports Elasticsearch 5.x.

The client should be created in @Setup (or @StartBundle) and released
cleanly in @Teardown (or @FinishBundle). Then, it's used in @ProcessElement
to actually store the elements of the PCollection.

Regards
JB


On 10/23/2017 08:53 PM, Ryan Bobko wrote:

Hi JB,
Thanks for your input. I'm trying to update ElasticsearchIO, and
hopefully learn a bit about Beam in the process. The documentation
says ElasticsearchIO only works with ES 2.X, and I'm using ES 5.6. I'd
prefer not to have two ES libs in my classpath if I can avoid it. I'm
just getting started, so my pipeline is quite simple:

pipeline.apply( "Raw Reader", reader ) // read raw files
  .apply( "Document Generator", ParDo.of( extractor ) ) // create my document objects for ES insertion
  .apply( "Elastic Writer", new ElasticWriter( ... ) ); // upload to ES


public final class ElasticWriter extends
    PTransform<PCollection<Document>, PDone> {

  private static final Logger log = LoggerFactory.getLogger(
      ElasticWriter.class );
  private final String elasticurl;

  public ElasticWriter( String url ) {
    elasticurl = url;
  }

  @Override
  public PDone expand( PCollection<Document> input ) {
    input.apply( ParDo.of( new WriteFn( elasticurl ) ) );
    return PDone.in( input.getPipeline() );
  }

  public static class WriteFn extends DoFn<Document, Void> implements
      Serializable {

    private transient RestHighLevelClient client;
    private final String elasticurl;

    public WriteFn( String elasticurl ) {
      this.elasticurl = elasticurl;
    }

    @Setup
    public void setup() {
      log.debug( " into WriteFn::setup" );
      HttpHost elastic = HttpHost.create( elasticurl );
      RestClientBuilder bldr = RestClient.builder( elastic );

      // if this is uncommented, the program never exits
      //client = new RestHighLevelClient( bldr.build() );
    }

    @Teardown
    public void teardown() {
      log.debug( " into WriteFn::teardown" );
      // there's nothing to tear down
    }

    @ProcessElement
    public void pe( ProcessContext c ) {
      Document doc = DocumentImpl.from( c.element() );
      log.debug( "writing {} to elastic", doc.getMetadata().first(
          Metadata.NAME ) );

      // this is where I want to write to ES, but for now, just write
      // a text file

      ObjectMapper mpr = new ObjectMapper();

      try ( Writer fos = new BufferedWriter( new FileWriter( new File(
          "/tmp/writers",
          doc.getMetadata().first( Metadata.NAME ).asString() ) ) )
      ) {
        mpr.writeValue( fos, doc );
      }
      catch ( IOException ioe ) {
        log.error( ioe.getLocalizedMessage(), ioe );
      }
    }
  }
}


On Mon, Oct 23, 2017 at 2:28 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Hi Ryan,

Why don't you use the ElasticsearchIO for that ?

Anyway, can you share your pipeline where you have the ParDo calling
your
DoFn ?

Thanks,
Regards
JB


On 10/23/2017 07:50 PM, r...@ostrich-emulators.com wrote:


Hi List,
I'm trying to write an updated ElasticSearch client using the
newly-published RestHighLevelClient class (with ES 5.6.0). I'm only
interested in writes at this time, so I'm using the
ElasticsearchIO.write()
function as a model. I h