Re: Large public Beam projects?

2020-04-21 Thread Tim Robertson
My apologies, I missed the link:

[1] https://github.com/gbif/pipelines

On Tue, Apr 21, 2020 at 5:58 PM Tim Robertson 
wrote:

> Hi Jordan
>
> I don't know if we qualify as a large Beam project but at GBIF.org we
> bring together datasets from 1600+ institutions documenting 1.4B
> observations of species (museum data, citizen science, environmental
> reports etc).
> As far as Beam goes though, we aren't using the most advanced
> features. It's batch processing of data into Avro files stored on HDFS then
> into HBase / Elasticsearch.
>
> All our data and code [1] are open and I'm happy to discuss any aspect of
> it if it is helpful to you.
>
> Best wishes,
> Tim
>
> On Tue, Apr 21, 2020 at 3:48 PM Jeff Klukas  wrote:
>
>> Mozilla hosts the code for our data ingestion system publicly on GitHub.
>> A good chunk of that architecture consists of Beam pipelines running on
>> Dataflow.
>>
>> See:
>>
>> https://github.com/mozilla/gcp-ingestion/tree/master/ingestion-beam
>>
>> and rendered usage documentation at:
>>
>> https://mozilla.github.io/gcp-ingestion/ingestion-beam/
>>
>> On Mon, Apr 20, 2020 at 7:11 PM Jordan Thomas-Green 
>> wrote:
>>
>>> Does anyone have any public repos/examples of larger Beam
>>> projects/implementations that they've seen?
>>>
>>


Re: Large public Beam projects?

2020-04-21 Thread Tim Robertson
Hi Jordan

I don't know if we qualify as a large Beam project but at GBIF.org we bring
together datasets from 1600+ institutions documenting 1.4B observations of
species (museum data, citizen science, environmental reports etc).
As far as Beam goes though, we aren't using the most advanced
features. It's batch processing of data into Avro files stored on HDFS then
into HBase / Elasticsearch.

All our data and code [1] are open and I'm happy to discuss any aspect of
it if it is helpful to you.

Best wishes,
Tim

On Tue, Apr 21, 2020 at 3:48 PM Jeff Klukas  wrote:

> Mozilla hosts the code for our data ingestion system publicly on GitHub. A
> good chunk of that architecture consists of Beam pipelines running on
> Dataflow.
>
> See:
>
> https://github.com/mozilla/gcp-ingestion/tree/master/ingestion-beam
>
> and rendered usage documentation at:
>
> https://mozilla.github.io/gcp-ingestion/ingestion-beam/
>
> On Mon, Apr 20, 2020 at 7:11 PM Jordan Thomas-Green 
> wrote:
>
>> Does anyone have any public repos/examples of larger Beam
>> projects/implementations that they've seen?
>>
>


Re: Beam discarding massive amount of events due to Window object or inner processing

2019-10-16 Thread Tim Sell
I think everyone who followed this thread learned something! I know I did.

Thanks for asking these questions. The summary and code snippets were just
the right length to be accessible and focussed.

On Wed, 16 Oct 2019, 06:04 Eddy G,  wrote:

> Thank you so so much guys for the amazing feedback you're giving me!
>
> I'm applying all of it and deep diving into more detail and see where I
> could also go from there so I can still get the pipeline performance way
> better.
>
> Again, really appreciated guys, you are amazing.
>


Re: Beam discarding massive amount of events due to Window object or inner processing

2019-10-14 Thread Tim Sell
You're getting 1 shard per pane, and you get a pane every time it's
triggered on an early firing. And then another one in the final on-time
pane. To have 1 file with 1 shard for every 15 minute window you need to
only fire on window close, i.e. AfterWatermark.pastEndOfWindow() without early
firing.
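
Something along these lines (a minimal sketch; the input collection, window size
and lateness are placeholders):

    events
        .apply(Window.<Event>into(FixedWindows.of(Duration.standardMinutes(15)))
            .triggering(AfterWatermark.pastEndOfWindow())   // fire once, when the window closes
            .withAllowedLateness(Duration.standardMinutes(2))
            .discardingFiredPanes());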

On Mon, 14 Oct 2019, 14:35 Eddy G,  wrote:

> Thanks a lot everyone for your so valuable feedback!
>
> Just updated my code, made some minor refactoring and seems to be working
> like a charm. Still some data being dropped due to lateness (but I'm
> talking about 100 elements per 2 million, so no "big deal" there, I will
> take a look into extending lateness and overall performance bits that I'm
> missing out).
>
> A thing that worries me a lot is that the wall time has been exponentially
> increasing up to 1 day and 3 hours in the stage that is in charge of
> writing all that captured data into parquet files, supposedly due to
> .parquet file writing code.
>
> I suppose that this is also the reason why I still get tons of small
> parquet files within a same bucket, as I should only have, in a perfect
> scenario, 4 files (1 each 15 minutes due to the Window object length), when
> I'm currently having +60!
>
> .apply("Write .parquet File(s)",
> FileIO
> .writeDynamic()
> .by((SerializableFunction)
> event -> {
> // specify partitioning here
> })
> .via(ParquetIO.sink(AVRO_SCHEMA))
> .to(options.getOutputDirectory())
> .withNaming(type -> ParquetFileNaming.getNaming(...))
> .withDestinationCoder(StringUtf8Coder.of())
> .withNumShards(1) // should this be 0? Could this
> imply increasing of costs if set to 0?
>


Re: Live fixing of a Beam bug on July 25 at 3:30pm-4:30pm PST

2019-07-18 Thread Tim Sell
+1, I'd love to see this as a recording. Will you stick it up on youtube
afterwards?

On Thu, Jul 18, 2019 at 4:00 AM sridhar inuog 
wrote:

> Thanks, Pablo! Looking forward to it! Hopefully, it will also be recorded
> as well.
>
> On Wed, Jul 17, 2019 at 2:50 PM Pablo Estrada  wrote:
>
>> Yes! So I will be working on a small feature request for Java's
>> BigQueryIO: https://issues.apache.org/jira/browse/BEAM-7607
>>
>> Maybe I'll do something for Python next month. : )
>> Best
>> -P.
>>
>> On Wed, Jul 17, 2019 at 12:32 PM Rakesh Kumar 
>> wrote:
>>
>>> +1, I really appreciate this initiative. It would be really helpful for
>>> newbies like me.
>>>
>>> Is it possible to list out what are the things that you are planning to
>>> cover?
>>>
>>>
>>>
>>>
>>> On Tue, Jul 16, 2019 at 11:19 AM Yichi Zhang  wrote:
>>>
 Thanks for organizing this Pablo, it'll be very helpful!

 On Tue, Jul 16, 2019 at 10:57 AM Pablo Estrada 
 wrote:

> Hello all,
> I'll be having a session where I live-fix a Beam bug for 1 hour next
> week. Everyone is invited.
>
> It will be on July 25, between 3:30pm and 4:30pm PST. Hopefully I will
> finish a full change in that time frame, but we'll see.
>
> I have not yet decided if I will do this via hangouts, or via a
> youtube livestream. In any case, I will share the link here in the next 
> few
> days.
>
> I will most likely work on the Java SDK (I have a little feature
> request in mind).
>
> Thanks!
> -P.
>



Re: gRPC method to get a pipeline definition?

2019-06-26 Thread Tim Robertson
Another +1 to support your research into this Chad. Thank you.

Trying to understand where a beam process is in the Spark DAG is... not
easy. A UI that helped would be a great addition.



On Wed, Jun 26, 2019 at 3:30 PM Ismaël Mejía  wrote:

> +1 don't hesitate to create a JIRA + PR. You may be interested in [1].
> This is a simple util class that takes a proto pipeline object and
> converts it into its graph representation in .dot format. You can
> easily reuse the code or the idea as a first approach to show what the
> pipeline is about.
>
> [1]
> https://github.com/apache/beam/blob/2df702a1448fa6cbd22cd225bf16e9ffc4c82595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/renderer/PortablePipelineDotRenderer.java#L29
>
> On Wed, Jun 26, 2019 at 10:27 AM Robert Bradshaw 
> wrote:
> >
> > Yes, offering a way to get a pipeline from the job service directly
> > would be a completely reasonable thing to do (and likely not hard at
> > all). We welcome pull requests.
> >
> > Alternative UIs built on top of this abstraction would be an
> > interesting project to explore.
> >
> > On Wed, Jun 26, 2019 at 8:44 AM Chad Dombrova  wrote:
> > >
> > > Hi all,
> > > I've been poking around the beam source code trying to determine
> whether it's possible to get the definition of a pipeline via beam's
> gPRC-based services.   It looks like the message types are there for
> describing a Pipeline but as far as I can tell, they're only used by
> JobService.Prepare() for submitting a new job.
> > >
> > > If I were to create a PR to add support for a JobService.GetPipeline()
> method, would that be interesting to others?  Is it technically feasible?
> i.e. is the pipeline definition readily available to the job service after
> the job has been prepared and sent to the runner?
> > >
> > > Bigger picture, what I'm thinking about is writing a UI that's
> designed to view and monitor Beam pipelines via the portability
> abstraction, rather than using the (rather clunky) UIs that come with
> runners like Flink and Dataflow.  My thinking is that using beam's
> abstractions would future proof the UI by allowing it to work with any
> portable runner.  Right now it's just an idea, so I'd love to know what
> others think of this.
> > >
> > > thanks!
> > > -chad
> > >
>


Re: [ANNOUNCEMENT] Common Pipeline Patterns - new section in the documentation + contributions welcome

2019-06-07 Thread Tim Robertson
This is great. Thanks Pablo and all

I've seen several folk struggle with writing avro to dynamic locations
which I think might be a good addition. If you agree I'll offer a PR unless
someone gets there first - I have an example here:

https://github.com/gbif/pipelines/blob/master/pipelines/export-gbif-hbase/src/main/java/org/gbif/pipelines/hbase/beam/ExportHBase.java#L81
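
In outline the pattern is (a rough sketch; partitionFor, schema and outputDirectory
are placeholders for the real partitioning function, Avro schema and target path):

    records.apply("Write Avro",
        FileIO.<GenericRecord, String>writeDynamic()
            // derive a destination (e.g. a directory name) per record
            .by((SerializableFunction<GenericRecord, String>) rec -> partitionFor(rec))
            .via(AvroIO.sink(schema))
            .to(outputDirectory)
            .withDestinationCoder(StringUtf8Coder.of())
            .withNaming(partition -> FileIO.Write.defaultNaming(partition, ".avro")));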


On Fri, Jun 7, 2019 at 10:52 PM Pablo Estrada  wrote:

> Hello everyone,
> A group of community members has been working on gathering and providing
> common pipeline patterns for pipelines in Beam. These are examples on how
> to perform certain operations, and useful ways of using Beam in your
> pipelines. Some of them relate to processing of files, use of side inputs,
> sate/timers, etc. Check them out[1].
>
> These initial patterns have been chosen based on evidence gathered from
> StackOverflow, and from talking to users of Beam.
>
> It would be great if this section could grow, and be useful to many Beam
> users. For that reason, we invite anyone to share patterns, and pipeline
> examples that they have used in the past. If you are interested in
> contributing, please submit a pull request, or get in touch with Cyrus
> Maden, Reza Rokni, Melissa Pashniak or myself.
>
> Thanks!
> Best
> -P.
>
> [1] https://beam.apache.org/documentation/patterns/overview/
>


Re: PubSubIO watermark not advancing for low volumes

2019-05-15 Thread Tim Sell
Thanks!

I made a jira
https://issues.apache.org/jira/browse/BEAM-7322

And dumped my sample code here:
https://github.com/tims/beam/tree/master/pubsub-watermark

*From: *Alexey Romanenko 
*Date: *Wed, May 15, 2019 at 12:18 AM
*To: * 

Not sure that this can be very helpful but I recall a similar issue with
> KinesisIO [1] [2] and it was a bug in MovingFunction which was fixed.
>
> [1] https://issues.apache.org/jira/browse/BEAM-5063
> [2] https://github.com/apache/beam/pull/6178
>
> On 13 May 2019, at 20:52, Kenneth Knowles  wrote:
>
> You should definitely not feel foolish. That was a great report. I expect
> many users face the same situation. If they are lurking on this list, then
> you will have helped them already.
>
> Reza - I expect you should weigh in on the Jira, too, since the "one
> message test" use case seems like it wouldn't work at all with those
> MovingFunction params. But I may not understand all the subtleties of the
> connector.
>
> Kenn
>
> *From: *Tim Sell 
> *Date: *Mon, May 13, 2019 at 8:06 AM
> *To: * 
>
> Thanks for the feedback, I did some more investigating after you said 1
>> second frequency should be enough to sample on. And it is; I feel foolish.
>> I think I just wasn't waiting long enough, as it takes minutes to close
>> the windows. We waited much longer when we were just sending messages manually and
>> never had a window close.
>>
>> I'm generating some stats of lag times to window closing for different
>> frequencies, with code so people can reproduce it, then I'll add this to a
>> jira ticket.
>>
>> *From: *Kenneth Knowles 
>> *Date: *Mon, May 13, 2019 at 10:48 AM
>> *To: * , dev
>>
>> Nice analysis & details!
>>>
>>> Thanks to your info, I think it is the configuration of MovingFunction
>>> [1] that is the likely culprit, but I don't totally understand why. It is
>>> configured like so:
>>>
>>>  - store 60 seconds of data
>>>  - update data every 5 seconds
>>>  - require at least 10 messages to be 'significant'
>>>  - require messages from at least 2 distinct 5 second update periods to
>>> be 'significant'
>>>
>>> I would expect a rate of 1 message per second to satisfy this. I may
>>> have read something wrong.
>>>
>>> Have you filed an issue in Jira [2]?
>>>
>>> Kenn
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508
>>> [2] https://issues.apache.org/jira/projects/BEAM/issues
>>>
>>> *From: *Tim Sell 
>>> *Date: *Fri, May 10, 2019 at 4:09 AM
>>> *To: * 
>>>
>>> Hello,
>>>>
>>>> I have identified an issue where the watermark does not advance when
>>>> using the beam PubSubIO when volumes are very low.
>>>>
>>>> The behaviour is easily replicated if you apply a fixed window
>>>> triggering after the watermark passes the end of the window.
>>>>
>>>> pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
>>>> .apply(ParDo.of(new ParseScoreEventFn()))
>>>> 
>>>> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60)))
>>>> .triggering(AfterWatermark.pastEndOfWindow())
>>>> .withAllowedLateness(Duration.standardSeconds(60))
>>>> .discardingFiredPanes())
>>>> .apply(MapElements.into(kvs(strings(), integers()))
>>>> .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), 
>>>> scoreEvent.getScore())))
>>>> .apply(Count.perKey())
>>>> .apply(ParDo.of(Log.of("counted per key")));
>>>>
>>>> With this triggering, using both the flink local runner the direct
>>>> runner, *no panes will ever be emitted* if the volume of messages in
>>>> pubsub is very low. eg 1 per second.
>>>>
>>>> If I change the triggering to have early firings I get exactly the
>>>> emitted panes that you would expect.
>>>>
>>>> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60)))
>>>> .triggering(AfterWatermark.pastEndOfWindow()
>>>> .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>> .alignedTo(Duration.standardSeconds(60))))
>>>> .withAllowedLateness(Duration.standardSeconds(60))
>>>> .discardingFiredPanes())
>>>>
>>>> I can use any var

Re: ElasticsearchIO Write Batching Problems

2018-12-07 Thread Tim
Great. Thanks for sharing Evan.

Tim

> On 7 Dec 2018, at 20:06, Evan Galpin  wrote:
> 
> I've actually found that this was just a matter of pipeline processing speed. 
> I removed many layers of transforms such that entities flowed through the 
> pipeline faster, and saw the batch sizes increase. I think I may make a 
> separate pipeline to take full advantage of batch indexing.
> 
> Thanks!
> 
>> On 2018/12/07 14:36:44, Evan Galpin  wrote: 
>> I forgot to reiterate that the PCollection on which EsIO operates is of type 
>> String, where each element is a valid JSON document serialized _without_ 
>> pretty printing (i.e. without line breaks). If the PCollection should be of 
>> a different type, please let me know. From the EsIO source code, I believe 
>> it is correct to have a PCollection of String
>> 
>>> On 2018/12/07 14:33:17, Evan Galpin  wrote: 
>>> Thanks for confirming that this is unexpected behaviour Tim; certainly the 
>>> EsIO code looks to handle bundling. For the record, I've also confirmed via 
>>> debugger that `flushBatch()` is not being triggered by large document size.
>>> 
>>> I'm sourcing records from Google's BigQuery.  I have 2 queries which each 
>>> create a PCollection. I use a JacksonFactory to convert BigQuery results to 
>>> a valid JSON string (confirmed valid via debugger + linter). I have a few 
>>> Transforms to group the records from the 2 queries together, and then 
>>> convert again to JSON string via Jackson. I do know that the system creates 
>>> valid requests to the bulk API, it's just that it's only 1 document per 
>>> request.
>>> 
>>> Thanks for starting the process with this. If there are other specific 
>>> details that I can provide to be helpful, please let me know. Here are the 
>>> versions of modules I'm using now:
>>> 
>>> Beam SDK (beam-sdks-java-core): 2.8.0
>>> EsIO (beam-sdks-java-io-elasticsearch): 2.8.0
>>> BigQuery IO (beam-sdks-java-io-google-cloud-platform): 2.8.0
>>> DirectRunner (beam-runners-direct-java): 2.8.0
>>> DataflowRunner (beam-runners-google-cloud-dataflow-java): 2.8.0
>>> 
>>> 
>>>> On 2018/12/07 06:36:58, Tim  wrote: 
>>>> Hi Evan
>>>> 
>>>> That is definitely not the expected behaviour and I believe is covered in 
>>>> tests which use DirectRunner. Are you able to share your pipeline code, or 
>>>> describe how you source your records please? It could be that something 
>>>> else is causing EsIO to see bundles sized at only one record.
>>>> 
>>>> I’ll verify ES IO behaviour when I get to a computer too.
>>>> 
>>>> Tim (on phone)
>>>> 
>>>>> On 6 Dec 2018, at 22:00, e...@calabs.ca  wrote:
>>>>> 
>>>>> Hi all,
>>>>> 
>>>>> I’m having a bit of trouble with ElasticsearchIO Write transform. I’m 
>>>>> able to successfully index documents into my elasticsearch cluster, but 
>>>>> batching does not seem to work. There ends up being a 1:1 ratio between 
>>>>> HTTP requests sent to `/my-index/_doc/_bulk` and the number of documents 
>>>>> in my PCollection to which I apply the ElasticsearchIO PTransform. I’ve 
>>>>> noticed this specifically under the DirectRunner by utilizing a debugger.
>>>>> 
>>>>> Am I missing something? Is this possibly a difference between execution 
>>>>> environments (Ex. DirectRunner Vs. DataflowRunner)? How can I make sure 
>>>>> my program is taking advantage of batching/bulk indexing?
>>>>> 
>>>>> Thanks,
>>>>> Evan
>>>> 
>>> 
>> 


Re: Moving to spark 2.4

2018-12-07 Thread Tim Robertson
To clarify Ismaël's comment

Cloudera repo indicates Cloudera 6.1 will have spark 2.4 but CDH is
currently still on 6.0.

https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/spark/spark-core_2.11/2.4.0-cdh6.1.0/

With the HWX / Cloudera merger the release cycle is not announced but 6.1
will likely not be there until early 2019. Also note that many folk will
still be on CDH 5 as CDH 6 is relatively new.







On Fri, Dec 7, 2018 at 5:06 PM Ismaël Mejía  wrote:

> It seems that Cloudera has it now; not sure if it's worth waiting for
> Hortonworks, or maybe worth waiting for EMR.
>
> https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/spark/spark-core_2.11/
>
> A pro move to Spark 2.4.0 argument is for the future oriented (non
> hadoop friends), because the support for kubernetes has improved a lot
> in this release.
>
> On Fri, Dec 7, 2018 at 4:56 PM David Morávek 
> wrote:
> >
> > +1 for waiting for HDP and CDH adoption
> >
> > Sent from my iPhone
> >
> > On 7 Dec 2018, at 16:38, Alexey Romanenko 
> wrote:
> >
> > I agree with Ismael and I’d wait until the new Spark version will be
> supported by major BigData distributors.
> >
> > On 7 Dec 2018, at 14:57, Vishwas Bm  wrote:
> >
> > Hi Ismael,
> >
> > We have upgraded the spark to 2.4.
> > In our setup we had run few basic tests and found it to be pretty stable.
> >
> >
> > Thanks & Regards,
> > Vishwas
> >
> >
> > On Fri, Dec 7, 2018 at 2:53 PM Ismaël Mejía  wrote:
> >>
> >> Hello Vishwas,
> >>
> >> The spark dependency in the spark runner is provided so you can
> >> already pass the dependencies of spark 2.4 and it should work out of
> >> the box.
> >>
> >> JB did a PR to upgrade the version of Spark in the runner, but maybe
> >> it is worth to wait a bit before merging it, at least until some of
> >> the Big Data distributions has spark 2.4.x support available, so far
> >> nobody has upgraded it (well apart of databricks).
> >>
> >> What do others think, should we move ahead or are you aware of any
> >> issue introduced by version 2.4.0? (Notice that the PR just updates
> >> the version so code compatibility should be ok).
> >>
> >> Ismaël
> >>
> >> On Thu, Dec 6, 2018 at 12:14 PM Jean-Baptiste Onofré 
> wrote:
> >> >
> >> > Hi Vishwas
> >> >
> >> > Yes, I already started the update.
> >> >
> >> > Regards
> >> > JB
> >> >
> >> > On 06/12/2018 07:39, Vishwas Bm wrote:
> >> > > Hi,
> >> > >
> >> > > Currently I see that the spark version dependency used in Beam is
> >> > > //"2.3.2".
> >> > > As spark 2.4 is released now, is there a plan to upgrade Beam spark
> >> > > dependency ?
> >> > >
> >> > >
> >> > > *Thanks & Regards,*
> >> > > *Vishwas
> >> > > *
> >> > > *Mob : 9164886653*
> >> >
> >> > --
> >> > Jean-Baptiste Onofré
> >> > jbono...@apache.org
> >> > http://blog.nanthrax.net
> >> > Talend - http://www.talend.com
> >
> >
>


Re: ElasticsearchIO Write Batching Problems

2018-12-06 Thread Tim
Hi Evan

That is definitely not the expected behaviour and I believe it is covered in tests
which use DirectRunner. Are you able to share your pipeline code, or describe 
how you source your records please? It could be that something else is causing 
EsIO to see bundles sized at only one record.
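
For reference, the write side is normally configured along these lines (just a
sketch - the connection details and batch sizes below are placeholders), and EsIO
flushes a bulk request once either limit is hit or the bundle finishes:

    ElasticsearchIO.ConnectionConfiguration conn =
        ElasticsearchIO.ConnectionConfiguration.create(
            new String[] {"http://localhost:9200"}, "my-index", "_doc");

    docs.apply("Write to ES",
        ElasticsearchIO.write()
            .withConnectionConfiguration(conn)
            .withMaxBatchSize(1000)                     // flush after this many docs
            .withMaxBatchSizeBytes(5 * 1024 * 1024));   // or after ~5 MB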

I’ll verify ES IO behaviour when I get to a computer too.

Tim (on phone)

> On 6 Dec 2018, at 22:00, e...@calabs.ca  wrote:
> 
> Hi all,
> 
> I’m having a bit of trouble with ElasticsearchIO Write transform. I’m able to 
> successfully index documents into my elasticsearch cluster, but batching does 
> not seem to work. There ends up being a 1:1 ratio between HTTP requests sent 
> to `/my-index/_doc/_bulk` and the number of documents in my PCollection to 
> which I apply the ElasticsearchIO PTransform. I’ve noticed this specifically 
> under the DirectRunner by utilizing a debugger.
> 
> Am I missing something? Is this possibly a difference between execution 
> environments (Ex. DirectRunner Vs. DataflowRunner)? How can I make sure my 
> program is taking advantage of batching/bulk indexing?
> 
> Thanks,
> Evan


Re: bean elasticsearch connector for dataflow

2018-12-04 Thread Tim
Beam 2.8.0 brought in support for ES 6.3.x

I’m not sure if that works against a 6.5.x server but I could imagine it does.

Tim,
Sent from my iPhone

> On 4 Dec 2018, at 18:28, Adeel Ahmad  wrote:
> 
> Hello,
> 
> I am trying to use gcp dataflow for indexing data from pubsub into 
> elasticsearch.
> Does dataflow (which uses beam) now support elasticsearch 6.5.x or does it 
> still only support 5.6.x?
> 
> -- 
> Thanks,
> 
> Adeel
> 
> 
> 


Re: No filesystem found for scheme hdfs - with the FlinkRunner

2018-10-26 Thread Tim
Thanks for sharing that

Tim,
Sent from my iPhone

> On 26 Oct 2018, at 17:50, Juan Carlos Garcia  wrote:
> 
> Just for everyone to know we figure it out, it was an environment problem.
> 
> In our case we have our cluster in a network that is not accessible directly, 
> so to deploy we need to uses Jenkins  with some slaves that have access to 
> that network.
> 
> During deployment in the main method of the class we execute 
> FileSystems.setDefaultPipelineOptions(_options); which trigger the 
> HadoopFileSystemOptionsRegistrar via the ServiceLoader mechanism and this 
> access the environment variable HADOOP_CONF_DIR in order to correctly 
> register the Filesystem.
> 
> SO, its very important that the machine you are using for deployment have 
> that Environment variable set as well (not only the worker where the pipeline 
> will run).
> 
> In our case the variable was set on the .bashrc of the user used for 
> deployment, but here is the catch.
> 
> We were using "sudo -u DEPLOYMENT_USER -s /var/lib/flink/bin/flink run -d 
> .", but the flag -s do not execute the user .bashrc (.bash_profile), 
> hence we have failures at runtime. The fix was just replacing -s flag with -i 
> to make sure the environment variable is present when the command to run 
> works.
> 
> Thanks
> 
> 
>> On Fri, Oct 26, 2018 at 1:52 PM Juan Carlos Garcia  
>> wrote:
>> Hi Tim,
>> 
>> I am using FileIO directly with the AvroIO.sink(...), however having 
>> experienced BEAM-2277 with the SparkRunner few months ago, i got the feeling 
>> this is something different (maybe some dependency mismatch/missing).
>> 
>> Thanks
>> 
>>> On Fri, Oct 26, 2018 at 1:33 PM Tim Robertson  
>>> wrote:
>>> Hi Juan
>>> 
>>> This sounds reminiscent of https://issues.apache.org/jira/browse/BEAM-2277 
>>> which we believed fixed in 2.7.0.
>>> What IO are you using to write your files and can you paste a snippet of 
>>> your code please?
>>> 
>>> On BEAM-2277 I posted a workaround for AvroIO (it might help you find a 
>>> workaround too):
>>> 
>>>  transform.apply("Write",
>>> AvroIO.writeGenericRecords(schema)
>>> .to(FileSystems.matchNewResource(options.getTarget(),true))
>>> // BEAM-2277 workaround
>>> 
>>> .withTempDirectory(FileSystems.matchNewResource("hdfs://ha-nn/tmp/beam-avro",
>>>  true)));
>>> 
>>> Thanks
>>> Tim
>>> 
>>> 
>>>> On Fri, Oct 26, 2018 at 11:47 AM Juan Carlos Garcia  
>>>> wrote:
>>>> Hi Folks,
>>>> 
>>>> I have a strange situation while running beam 2.7.0 with the FlinkRunner, 
>>>> my setup consist of a HA Flink cluster (1.5.4) writing to HDFS its 
>>>> checkpoint. Flink is able to correctly writes its checkpoint / savepoint 
>>>> to HDFS without any problems.
>>>> 
>>>> However, my pipeline has to write to HDFS as well, but fails with "Caused 
>>>> by: java.lang.IllegalArgumentException: No filesystem found for scheme 
>>>> hdfs"
>>>> (stacktrace at the bottom)
>>>> 
>>>> In the host where the pipeline is running:
>>>> 1. The environment variable HADOOP_CONF_DIR is set.
>>>> 2. During my pipeline construction i am explicitly calling 
>>>> FileSystems.setDefaultPipelineOptions(_options); to trigger the 
>>>> ServiceLoader to find all options registrar from the classpath
>>>> 3. If i explore the property SCHEME_TO_FILESYSTEM of the class FileSystems 
>>>> in my main method using reflection i am able to see that at launch time it 
>>>> contains:
>>>>{file=org.apache.beam.sdk.io.LocalFileSystem@1941a8ff, 
>>>> hdfs=org.apache.beam.sdk.io.hdfs.HadoopFileSystem@22d7b4f8}
>>>> 
>>>> Any idea what i am doing wrong with the HDFS integration?
>>>>
>>>> {snippet}
>>>> 
>>>> FileSystems.setDefaultPipelineOptions(_context.getPipelineOptions());
>>>> Field f = 
>>>> FileSystems.class.getDeclaredField("SCHEME_TO_FILESYSTEM");
>>>> f.setAccessible(true);
>>>> AtomicReference> value 
>>>> = (AtomicReference>) f.get(null);
>>>> 
>>>> System.out.println("

Re: No filesystem found for scheme hdfs - with the FlinkRunner

2018-10-26 Thread Tim Robertson
Hi Juan

This sounds reminiscent of https://issues.apache.org/jira/browse/BEAM-2277
which we believed fixed in 2.7.0.
What IO are you using to write your files and can you paste a snippet of
your code please?

On BEAM-2277 I posted a workaround for AvroIO (it might help you find a
workaround too):

 transform.apply("Write",
AvroIO.writeGenericRecords(schema)
.to(FileSystems.matchNewResource(options.getTarget(),true))
// BEAM-2277 workaround
.withTempDirectory(FileSystems.matchNewResource("hdfs://ha-nn/tmp/beam-avro",
true)));


Thanks
Tim


On Fri, Oct 26, 2018 at 11:47 AM Juan Carlos Garcia 
wrote:

> Hi Folks,
>
> I have a strange situation while running beam 2.7.0 with the FlinkRunner,
> my setup consist of a HA Flink cluster (1.5.4) writing to HDFS its
> checkpoint. Flink is able to correctly writes its checkpoint / savepoint to
> HDFS without any problems.
>
> However, my pipeline has to write to HDFS as well, but fails with "Caused
> by: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs"
> (stacktrace at the bottom)
>
> In the host where the pipeline is running:
> 1. The environment variable HADOOP_CONF_DIR is set.
> 2. During my pipeline construction i am explicitly calling
> FileSystems.setDefaultPipelineOptions(_options); to trigger the
> ServiceLoader to find all options registrar from the classpath
> 3. If i explore the property SCHEME_TO_FILESYSTEM of the class FileSystems
> in my main method using reflection i am able to see that at launch time it
> contains:
>{file=org.apache.beam.sdk.io.LocalFileSystem@1941a8ff,
> hdfs=org.apache.beam.sdk.io.hdfs.HadoopFileSystem@22d7b4f8}
>
> Any idea what i am doing wrong with the HDFS integration?
>
> {snippet}
>
> FileSystems.setDefaultPipelineOptions(_context.getPipelineOptions());
> Field f =
> FileSystems.class.getDeclaredField("SCHEME_TO_FILESYSTEM");
> f.setAccessible(true);
> AtomicReference<Map<String, FileSystem>> value
> = (AtomicReference<Map<String, FileSystem>>) f.get(null);
>
> System.out.println("===");
> System.out.println(value);
> {snippet}
>
> {stacktrace}
> Caused by: java.lang.IllegalArgumentException: No filesystem found for
> scheme hdfs
> at
> org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
> at
> org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
> at
> org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink.lambda$new$22b9c623$1(FileIO.java:1293)
> at
> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
> at
> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
> at
> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
> at
> org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:920)
> at
> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:715)
>
> {stacktrace}
>
> --
>
> JC
>
>
>
> --
>
> JC
>
>


Re: ElasticIO retry configuration exception

2018-10-12 Thread Tim
Great! Thank you.

Feel free to add me as reviewer if you open a PR.

Tim

> On 12 Oct 2018, at 08:28, Wout Scheepers  
> wrote:
> 
> Hey Tim, Romain,
>  
> I created the ticket (BEAM-5725). I'll try to fix it, as it's time I made my
> first PR.
> First will focus on getting a reproducible in a unit test.
> 
> Thanks!
> Wout
> 
> 
>  
> From: Tim Robertson 
> Reply-To: "user@beam.apache.org" 
> Date: Thursday, 11 October 2018 at 20:25
> To: "user@beam.apache.org" 
> Subject: Re: ElasticIO retry configuration exception
>  
> I took a super quick look at the code and I think Romain is correct.
>  
> 1. On a retry scenario it calls handleRetry()
> 2. Within handleRetry() it gets the DefaultRetryPredicate and calls 
> test(response) - this reads the response stream to JSON
> 3. When the retry is successful (no 429 code) the response is returned
> 4. The response is then passed in to checkForErrors(...)
> 5. This then tried to parse the response by reading the response stream. It 
> was already read in step 2 
>  
> Can you please open a Jira for this Wout? 
> https://issues.apache.org/jira/projects/BEAM/issues
> If you don't have an account I'll create it.
>  
> This will not make 2.8.0 (just missed) so it will likely be 7 weeks or so 
> before released in 2.9.0. 
> However as soon as it is fixed it is fairly easy to bring into your own 
> project, by copying in the single ElasticsearchIO.java declared in the same 
> package.
>  
> Thank you for reporting the issue,
> Tim
>  
>  
>  
>  
> On Thu, Oct 11, 2018 at 4:19 PM Romain Manni-Bucau  
> wrote:
> It looks more like a client issue where the stream is already read, maybe 
> give a try to reproduce it in a unit test in beam ES module? This will enable 
> us to help you more accurately.
> 
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn | Book
>  
>  
> Le jeu. 11 oct. 2018 à 16:18, Wout Scheepers 
>  a écrit :
> Hey Romain,
>  
> I’ve checked and am using the same http client as beam 2.7.0.
> Just to be sure, I’ve created a minimal reproducible with a fresh project 
> with only the following dependencies in my build.gradle:
> dependencies {
> compile ('org.apache.beam:beam-sdks-java-io-elasticsearch:2.7.0')
> compile ('org.apache.beam:beam-runners-direct-java:2.7.0')
> compile ('org.apache.beam:beam-runners-google-cloud-dataflow-java:2.7.0')
> compile ('org.apache.beam:beam-sdks-java-extensions-protobuf:2.7.0')
> compile 
> ('org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:2.7.0')
> compile ('org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.7.0')
> compile ('org.apache.beam:beam-sdks-java-io-common:2.7.0')
> compile ('org.apache.beam:beam-sdks-java-extensions-json-jackson:2.7.0')
> compile ('org.apache.beam:beam-sdks-java-io-jdbc:2.7.0')
> 
> 
> testCompile 'org.hamcrest:hamcrest-all:1.3'
> testCompile 'org.assertj:assertj-core:3.4.1'
> testCompile 'junit:junit:4.12'
> }
>  
> However, the problem still persists when writing a document to elastic with 
> the retryConfiguration set.
> I guess the problem lies at my elastic version, as JB implies?
>  
> Anyway, thanks for the suggestion.
>  
> Wout
>  
> From: Romain Manni-Bucau 
> Reply-To: "user@beam.apache.org" 
> Date: Wednesday, 10 October 2018 at 16:53
> To: "user@beam.apache.org" 
> Subject: Re: ElasticIO retry configuration exception
>  
> Hi Wout,
>  
> Maye check your classpath http client versions (against 
> https://github.com/apache/beam/blob/v2.7.0/sdks/java/io/elasticsearch/build.gradle
>  for instance).
> 
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn | Book
>  
>  
> Le mer. 10 oct. 2018 à 15:37, Wout Scheepers 
>  a écrit :
> Hey JB,
> 
> Thanks for your fast reply.
> The elastic version we're using is 5.6.2.
> 
> "version": {
> "number": "5.6.2",
> "build_hash": "57e20f3",
> "build_date": "2017-09-23T13:16:45.703Z",
> "build_snapshot": false,
> "lucene_version": "6.6.1"
> }
> 
> 
> Wout
> 
> 
> 
> On 10/10/2018, 15:34, "Jean-Baptiste Onofré"  wrote:
> 
> Hi Wout,
> 
> what's the elasticsearch version ? (just to try to reproduce)
> 
> Thanks,
> Regards
> JB
> 
> On 10/10/2018 15:31, Wout Scheepers wrote:
> > Hey all,
> > 
> >  
> > 
> > When using .withRetryConfi

Re: ElasticIO retry configuration exception

2018-10-11 Thread Tim Robertson
I took a super quick look at the code and I think Romain is correct.

1. On a retry scenario it calls handleRetry()
2. Within handleRetry() it gets the DefaultRetryPredicate and calls
test(response) - this reads the response stream to JSON
3. When the retry is successful (no 429 code) the response is returned
4. The response is then passed in to checkForErrors(...)
5. This then tried to parse the response by reading the response stream. It
was already read in step 2
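
A possible direction for a fix - just a sketch in plain Apache HttpClient terms,
assuming the code has access to the underlying org.apache.http.HttpResponse - is
to buffer the entity before the first parse so it can be read again:

    // Sketch: wrap the entity so both the retry predicate and checkForErrors()
    // can read the content without hitting an already-consumed stream.
    HttpEntity buffered = new BufferedHttpEntity(response.getEntity());
    response.setEntity(buffered);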

Can you please open a Jira for this Wout?
https://issues.apache.org/jira/projects/BEAM/issues
If you don't have an account I'll create it.

This will not make 2.8.0 (just missed) so it will likely be 7 weeks or so
before released in 2.9.0.
However as soon as it is fixed it is fairly easy to bring into your own
project, by copying in the single ElasticsearchIO.java declared in the same
package.

Thank you for reporting the issue,
Tim




On Thu, Oct 11, 2018 at 4:19 PM Romain Manni-Bucau 
wrote:

> It looks more like a client issue where the stream is already read, maybe
> give a try to reproduce it in a unit test in beam ES module? This will
> enable us to help you more accurately.
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
>
> Le jeu. 11 oct. 2018 à 16:18, Wout Scheepers <
> wout.scheep...@vente-exclusive.com> a écrit :
>
>> Hey Romain,
>>
>>
>>
>> I’ve checked and am using the same http client as beam 2.7.0.
>>
>> Just to be sure, I’ve created a minimal reproducible with a fresh project 
>> with only the following dependencies in my build.gradle:
>> dependencies {
>> compile ('org.apache.beam:beam-sdks-java-io-elasticsearch:2.7.0')
>> compile ('org.apache.beam:beam-runners-direct-java:2.7.0')
>> compile ('org.apache.beam:beam-runners-google-cloud-dataflow-java:2.7.0')
>> compile ('org.apache.beam:beam-sdks-java-extensions-protobuf:2.7.0')
>> compile ('org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:2.7.0')
>> compile ('org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.7.0')
>> compile ('org.apache.beam:beam-sdks-java-io-common:2.7.0')
>> compile ('org.apache.beam:beam-sdks-java-extensions-json-jackson:2.7.0')
>> compile ('org.apache.beam:beam-sdks-java-io-jdbc:2.7.0')
>>
>> testCompile 'org.hamcrest:hamcrest-all:1.3'
>> testCompile 'org.assertj:assertj-core:3.4.1'
>> testCompile 'junit:junit:4.12'
>> }
>>
>>
>>
>> However, the problem still persists when writing a document to elastic
>> with the retryConfiguration set.
>>
>> I guess the problem lies at my elastic version, as JB implies?
>>
>>
>>
>> Anyway, thanks for the suggestion.
>>
>>
>>
>> Wout
>>
>>
>>
>> *From: *Romain Manni-Bucau 
>> *Reply-To: *"user@beam.apache.org" 
>> *Date: *Wednesday, 10 October 2018 at 16:53
>> *To: *"user@beam.apache.org" 
>> *Subject: *Re: ElasticIO retry configuration exception
>>
>>
>>
>> Hi Wout,
>>
>>
>>
>> Maye check your classpath http client versions (against
>> https://github.com/apache/beam/blob/v2.7.0/sdks/java/io/elasticsearch/build.gradle
>> for instance).
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>> <https://rmannibucau.metawerx.net/> | Old Blog
>> <http://rmannibucau.wordpress.com> | Github
>> <https://github.com/rmannibucau> | LinkedIn
>> <https://www.linkedin.com/in/rmannibucau> | Book
>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>
>>
>>
>>
>>
>> Le mer. 10 oct. 2018 à 15:37, Wout Scheepers <
>> wout.scheep...@vente-exclusive.com> a écrit :
>>
>> Hey JB,
>>
>> Thanks for your fast reply.
>> The elastic version we're using is 5.6.2.
>>
>> "version": {
>> "number": "5.6.2",
>> "build_hash": "57e20f3",
>> "build_date": "2017-09-23T13:16:45.703Z",
>> "build_snapshot": false,
>> "lucene_version": "6.6.1"
>> }

Re: AvroIO - failure using direct runner with java.nio.file.FileAlreadyExistsException when moving from temp to destination

2018-09-26 Thread Tim Robertson
Hi Juan

Well done for diagnosing your issue and thank you for taking the time to
report it here.

I'm not the author of this section but I've taken a quick look at the code
and in line comments and have some observations which I think might help
explain it.

I notice it writes into temporary files and uses a HashMap for maintaining a pool of writers for each destination. I presume
that you are receiving a new instance of the DestinationT object on each
call and therefore the HashMap will be treating these as separate entries -
a new writer is created for each entry in the HashMap. The method
responsible for providing the DestinationT is the following from the
FileBasedSink which does document the expectation:

/**
 * Returns an object that represents at a high level the destination
being written to. May not
 * return null. A destination must have deterministic hash and
equality methods defined.
 */
public abstract DestinationT getDestination(UserT element);


Beyond that I notice that it also relies on using a hashCode from the
serialised object (i.e. after running through the coder) which you note
too. The inline doc explains the reasoning for that which is because
hashCode is not guaranteed to be stable across machines. When elements are
processed on different machines we need deterministic behaviour to direct
to the correct target shard. To do that the code opts to use a murmur3_32
algorithm which is safe across machines (Today I learnt!) and it operates
on the encoded bytes for the object which are to be deterministic.
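
Roughly, the shard key derivation boils down to this (a simplified sketch of the
idea, not a verbatim copy of the WriteFiles source):

    // Encode the destination with its coder, then hash the bytes with murmur3_32 so the
    // result is stable across JVMs - provided the encoding itself is deterministic.
    byte[] encoded = CoderUtils.encodeToByteArray(destinationCoder, destination);
    int shardingHash = Hashing.murmur3_32().hashBytes(encoded).asInt();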

I agree that we should improve the documentation and state that hashCode
and equals needs to be implemented when user defined objects are used for
the dynamic destination. Would you mind opening a Jira for that please?

I hope this helps a little, and thanks again
Tim











On Wed, Sep 26, 2018 at 11:24 AM Juan Carlos Garcia 
wrote:

> Hi Guys, after days of bumping my head against the monitor i found why it
> was not working.
>
> One key element when using *DynamicAvroDestinations *that is not
> described in the documentation is that, if you are using a regular POJO as
> *DestinationT* like i am (and not Long/String/Integer as the example) :
>
> {code}
>DynamicAvroDestinations<GenericRecord, GenericRecordDynamicDestination, GenericRecord>
> {code}
>
> Its very important to pay attention to equals / hashCode implementations,
> which should aligned with your sharding/grouping/partition structure. Not
> doing so will give you the result i described earlier (1 file (or shard)
> with 1 record only, or sometime just an exception).
>
> While i still don't understand why it depends on equals / hashCode, as i
> checked the class on:
>   *org.apache.beam.sdk.io.WriteFiles.ApplyShardingKeyFn:688*
>
> The hashing depends on the Coder itself (method:  int
> hashDestination(DestinationT destination, Coder<DestinationT>
> destinationCoder)).
>
> Maybe a core member could explain the reason of it, or its an unexpected
> behavior and there is a bug somewhere else.
>
> In my case below you can find my POJO Destination along with the
> corresponding Codec implementation, which works correctly as long as the
> equals / hashCode are implemented:
>
> {code}
> static class GenericRecordDynamicDestination {
> private String logicalType;
> private final int year;
> private final int month;
> private final int day;
>
> public GenericRecordDynamicDestination(final String _logicalType,
> final int _year, final int _month, final int _day) {
> logicalType = _logicalType;
> year = _year;
> month = _month;
> day = _day;
> }
>
> public String getLogicalType() {
> return logicalType;
> }
>
> public void setLogicalType(final String _logicalType) {
> logicalType = _logicalType;
> }
>
> public int getYear() {
> return year;
> }
>
> public int getMonth() {
> return month;
> }
>
> public int getDay() {
> return day;
> }
>
> @Override
> public boolean equals(final Object _o) {
> if (this == _o) return true;
> if (_o == null || getClass() != _o.getClass()) return false;
>
> final GenericRecordDynamicDestination that =
> (GenericRecordDynamicDestination) _o;
>
> if (year != that.year) return false;
> if (month != that.month) return false;
> if (day != that.day) return false;
> return logicalType.equals(that.logicalType);
> }
>
> @Override
> public int hashCode() {
> int result = logicalType.hashCode();
> result = 31 * result + year;
&

Re: Regression of (BEAM-2277) - IllegalArgumentException when using Hadoop file system for WordCount example.

2018-08-20 Thread Tim Robertson
Hi Juan,

You are correct that BEAM-2277 seems to be recurring. I have today stumbled
upon that myself in my own pipeline (not word count).
I have just posted a workaround at the bottom of the issue, and will reopen
the issue.

Thank you for reminding us on this,
Tim


On Mon, Aug 20, 2018 at 4:44 PM Juan Carlos Garcia 
wrote:

> BUMP!!!
>
> There are people reporting the problem for BEAM 2.6.0
> <https://issues.apache.org/jira/browse/BEAM-2277?focusedCommentId=16585979=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16585979>,
> any CORE dev out there?
>
> :)
>
>
>
>
>
> On Mon, Jul 30, 2018 at 3:25 PM Juan Carlos Garcia 
> wrote:
>
>> Hi Folks,
>>
>> I experienced the issued described in (BEAM-2277
>> <https://issues.apache.org/jira/browse/BEAM-2277>), which shows it was
>> fixed by v2.0.0
>>
>> However using version 2.4.0 and 2.6.0 (another user reported it) shows
>> the same error.
>>
>> So either it was not 100% fixed, or the bug appeared again.
>>
>> Thanks and Regards
>> --
>>
>> JC
>>
>>
>
> --
>
> JC
>
>


Re: ElasticsearchIO bulk delete

2018-07-30 Thread Tim Robertson
> we decided to postpone the feature

That makes sense.

I believe the ES6 branch is in-part working (I've looked at the code but
not used it) which you can see here [1] and the jira to watch or contribute
is [2]. It would be a useful addition to test independently and report any
observations or improvement requests on that jira.

The offer to assist in your first PR remains open for the future - please
don't hesitate to ask.

Thanks,
Tim

[1]
https://github.com/jsteggink/beam/tree/BEAM-3199/sdks/java/io/elasticsearch-6/src/main/java/org/apache/beam/sdk/io/elasticsearch
[2] https://issues.apache.org/jira/browse/BEAM-3199

On Mon, Jul 30, 2018 at 10:55 AM, Wout Scheepers <
wout.scheep...@vente-exclusive.com> wrote:

> Hey Tim,
>
>
>
> Thanks for your proposal to mentor me through my first PR.
>
> As we’re definitely planning to upgrade to ES6 when Beam supports it, we
> decided to postpone the feature (we have a fix that works for us, for now).
>
> When Beam supports ES6, I’ll be happy to make a contribution to get bulk
> deletes working.
>
>
>
> For reference, I opened a ticket (https://issues.apache.org/
> jira/browse/BEAM-5042).
>
>
>
> Cheers,
>
> Wout
>
>
>
>
>
> *From: *Tim Robertson 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Friday, 27 July 2018 at 17:43
> *To: *"user@beam.apache.org" 
> *Subject: *Re: ElasticsearchIO bulk delete
>
>
>
> Hi Wout,
>
>
>
> This is great, thank you. I wrote the partial update support you reference
> and I'll be happy to mentor you through your first PR - welcome aboard. Can
> you please open a Jira to reference this work and we'll assign it to you?
>
>
>
> We discussed having the "_xxx" fields in the document and triggering
> actions based on that in the partial update jira but opted to avoid
> it. Based on that discussion the ActionFn would likely be the preferred
> approach.  Would that be possible?
>
>
>
> It will be important to provide unit and integration tests as well.
>
>
>
> Please be aware that there is a branch and work underway for ES6 already
> which is rather different on the write() path so this may become redundant
> rather quickly.
>
>
>
> Thanks,
>
> Tim
>
>
>
> @timrobertson100 on the Beam slack channel
>
>
>
>
>
>
>
> On Fri, Jul 27, 2018 at 2:53 PM, Wout Scheepers  exclusive.com> wrote:
>
> Hey all,
>
>
>
> A while ago, I patched ElasticsearchIO to be able to do partial updates
> and deletes.
>
> However, I did not consider my patch pull-request-worthy as the json
> parsing was done inefficient (parsed it twice per document).
>
>
>
> Since Beam 2.5.0 partial updates are supported, so the only thing I’m
> missing is the ability to send bulk *delete* requests.
>
> We’re using entity updates for event sourcing in our data lake and need to
> persist deleted entities in elastic.
>
> We’ve been using my patch in production for the last year, but I would
> like to contribute to get the functionality we need into one of the next
> releases.
>
>
>
> I’ve created a gist that works for me, but is still inefficient (parsing
> twice: once to check the ‘_action` field, once to get the metadata).
>
> Each document I want to delete needs an additional ‘_action’ field with
> the value ‘delete’. It doesn’t matter the document still contains the
> redundant field, as the delete action only requires the metadata.
>
> I’ve added the method isDelete() and made some changes to the
> processElement() method.
>
> https://gist.github.com/wscheep/26cca4bda0145ffd38faf7efaf2c21b9
>
>
>
> I would like to make my solution more generic to fit into the current
> ElasticsearchIO and create a proper pull request.
>
> As this would be my first pull request for beam, can anyone point me in
> the right direction before I spent too much time creating something that
> will be rejected?
>
>
>
> Some questions on the top of my mind are:
>
>- Is it a good idea it to make the ‘action’ part for the bulk api
>generic?
>- Should it be even more generic? (e.g.: set an ‘ActionFn’ on the
>ElasticsearchIO)
>- If I want to avoid parsing twice, the parsing should be done outside
>of the getDocumentMetaData() method. Would this be acceptable?
>- Is it possible to avoid passing the action as a field in the
>document?
>- Is there another or better way to get the delete functionality in
>general?
>
>
>
> All feedback is more than welcome.
>
>
> Cheers,
> Wout
>
>
>
>
>
>
>


Re: ElasticsearchIO bulk delete

2018-07-27 Thread Tim Robertson
Hi Wout,

This is great, thank you. I wrote the partial update support you reference
and I'll be happy to mentor you through your first PR - welcome aboard. Can
you please open a Jira to reference this work and we'll assign it to you?

We discussed having the "_xxx" fields in the document and triggering
actions based on that in the partial update jira but opted to avoid
it. Based on that discussion the ActionFn would likely be the preferred
approach.  Would that be possible?
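
To make that concrete, a purely hypothetical shape (nothing like this exists in
ElasticsearchIO today; withActionFn and isTombstone are invented for discussion)
could be:

    // Hypothetical API sketch only - withActionFn is not a real ElasticsearchIO method.
    ElasticsearchIO.write()
        .withConnectionConfiguration(connectionConfiguration)
        .withActionFn(doc -> isTombstone(doc) ? "delete" : "index");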

It will be important to provide unit and integration tests as well.

Please be aware that there is a branch and work underway for ES6 already
which is rather different on the write() path so this may become redundant
rather quickly.

Thanks,
Tim

@timrobertson100 on the Beam slack channel



On Fri, Jul 27, 2018 at 2:53 PM, Wout Scheepers <
wout.scheep...@vente-exclusive.com> wrote:

> Hey all,
>
>
>
> A while ago, I patched ElasticsearchIO to be able to do partial updates
> and deletes.
>
> However, I did not consider my patch pull-request-worthy as the json
> parsing was done inefficient (parsed it twice per document).
>
>
>
> Since Beam 2.5.0 partial updates are supported, so the only thing I’m
> missing is the ability to send bulk *delete* requests.
>
> We’re using entity updates for event sourcing in our data lake and need to
> persist deleted entities in elastic.
>
> We’ve been using my patch in production for the last year, but I would
> like to contribute to get the functionality we need into one of the next
> releases.
>
>
>
> I’ve created a gist that works for me, but is still inefficient (parsing
> twice: once to check the ‘_action` field, once to get the metadata).
>
> Each document I want to delete needs an additional ‘_action’ field with
> the value ‘delete’. It doesn’t matter the document still contains the
> redundant field, as the delete action only requires the metadata.
>
> I’ve added the method isDelete() and made some changes to the
> processElement() method.
>
> https://gist.github.com/wscheep/26cca4bda0145ffd38faf7efaf2c21b9
>
>
>
> I would like to make my solution more generic to fit into the current
> ElasticsearchIO and create a proper pull request.
>
> As this would be my first pull request for beam, can anyone point me in
> the right direction before I spent too much time creating something that
> will be rejected?
>
>
>
> Some questions on the top of my mind are:
>
>- Is it a good idea it to make the ‘action’ part for the bulk api
>generic?
>- Should it be even more generic? (e.g.: set an ‘ActionFn’ on the
>ElasticsearchIO)
>- If I want to avoid parsing twice, the parsing should be done outside
>of the getDocumentMetaData() method. Would this be acceptable?
>- Is it possible to avoid passing the action as a field in the
>document?
>- Is there another or better way to get the delete functionality in
>general?
>
>
>
> All feedback is more than welcome.
>
>
> Cheers,
> Wout
>
>
>
>
>


Re: Pipeline error handling

2018-07-26 Thread Tim Robertson
Hi Kelsey

Does the example [1] in the docs demonstrate differing generic types when
using withOutputTags()?

Could something like the following work for you?

  final TupleTag<CSVRecord> type1Records = new TupleTag<CSVRecord>(){};
  final TupleTag<CSVRecord> type2Records = new TupleTag<CSVRecord>(){};
  final TupleTag<CSVInvalidLine> invalidRecords = new TupleTag<CSVInvalidLine>(){}; // CSVInvalidLine holds e.g. an ID and a cause
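
Those tags would then be wired up roughly like this (a sketch; ParseFn stands in
for your own DoFn<CSVRecord, CSVRecord> that calls c.output(invalidRecords, ...)
on failure):

  PCollectionTuple results = records.apply("Parse",
      ParDo.of(new ParseFn())
          .withOutputTags(type1Records,
              TupleTagList.of(type2Records).and(invalidRecords)));
  PCollection<CSVInvalidLine> errors = results.get(invalidRecords);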

HTH,
Tim


[1]
https://beam.apache.org/documentation/programming-guide/#additional-outputs

On Thu, Jul 26, 2018 at 9:44 AM, Kelsey RIDER  wrote:

> I’m trying to figure out how to handle errors in my Pipeline.
>
>
>
> Right now, my main transform is a DoFn. I
> have a few different TupleTag that I use depending on the data
> contained in the records.
>
> In the event there’s a problem with a line (due to one of several possible
> causes), I created a TupleTag<CSVRecord> ERROR. However, just doing this
> doesn’t carry with it any information about the error.
>
> I would like for the ERROR tag to have a type other than CSVRecord, e.g.
> some sort of ErrorInfo class containing the row number, filename, message
> about what went wrong, etc…
>
>
>
> I can’t use multiple TupleTag types with ParDo, because the
> withOutputTags() method forces them to all have the same generic parameter.
>
>
>
> I saw the example here: https://medium.com/@vallerylancey/error-handling-
> elements-in-apache-beam-pipelines-fffdea91af2a
>
> But I don’t see how this can work, since they use multiple generic types
> in withOutputTags(). (And is this good practice? Seems like they “cheat” by
> not calling apply(), instead directly transforming the PCollection (and why
> even bother extending DoFn in this case?).)
>
>
>
> Finally, if I write my own PTransform<PCollection<CSVRecord>, PCollectionTuple> class, and start manually creating PCollections and
> whatnot…then this would effectively become a bottleneck where everything
> has to be read at once, and there’s no longer any sequential handling of
> the records as they’re read, right?
>


Re: Beam and Hive

2018-07-20 Thread Tim Robertson
I answered on SO Kelsey,

You should be able to add this I believe to explicitly declare the coder to
use:

p.getCoderRegistry()
    .registerCoderForClass(HCatRecord.class,
        WritableCoder.of(DefaultHCatRecord.class));
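
With that registered, the write itself can stay as in the HCatalogIO javadoc,
roughly (a sketch; the metastore URI and table name are placeholders, and
hcatRecords is your PCollection<HCatRecord>):

    Map<String, String> configProperties = new HashMap<>();
    configProperties.put("hive.metastore.uris", "thrift://metastore-host:9083");

    hcatRecords.apply("Write to Hive",
        HCatalogIO.write()
            .withConfigProperties(configProperties)
            .withDatabase("default")
            .withTable("my_table"));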



On Fri, Jul 20, 2018 at 5:05 PM, Kelsey RIDER  wrote:

> Hello,
>
>
>
> I wrote it all in this SO post: https://stackoverflow.com/
> questions/51443966/simple-hive-write-not-working
>
>
>
> Since then, I’ve started looking at writing my own Coder to handle the
> serialization of HCatRecords. But shouldn’t this be included?
>


withHotKeyFanout Question

2018-02-16 Thread Tim Ross
My pipeline utilizes the Combine.perKey transform and I would like to add 
withHotKeyFanout to prevent the combine from being a bottleneck. To test I made 
sure that my mergeAccumulators function was correct and added 
withHotKeyFanout(2) to my Combine transform.
When I launch the pipeline with flink(v1.3.2) the pipeline only lasts for a 
minute or so until I am greeted with this stacktrace:

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
    ... 7 more
Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
    at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:74)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:113)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:758)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:527)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:496)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
    ... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    ... 15 more
Caused by: org.apache.avro.AvroRuntimeException: Array data must be a Collection or Array
    at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:70)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
    at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
    at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:652)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:599)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:156)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.copy(CoderTypeSerializer.java:64)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:879)
    at

Re: Windowing question

2018-01-24 Thread Tim Ross
Yes, that is what the pipeline I came up with looks like. However, the next step 
in the pipeline is the new/expired logic. I have tried a variety of things but 
none have gotten me close to what I want, hence my question to this mailing list.


Thanks,
Tim
From: Kenneth Knowles <k...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Wednesday, January 24, 2018 at 2:21 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Windowing question

A first approach would be to just not translate any of the new/expired logic. 
Beam does have the concept of expiring a window, though it is true that only 
particular transformations actually drop expired data. Have you tried something 
along the lines of this?

pipeline.begin()
    .apply(FooIO.read(...))
    .apply(Window.into(
        SlidingWindows.of(Duration.standardMinutes(5))
            .every(Duration.standardSeconds(30))))
    .apply(ParDo.of(new DoFn<>() { ... get into proper format ... }))
    ... the rest of the logic ...

Just a very vague sketch of what, actually, most pipelines look like.

Kenn

On Wed, Jan 24, 2018 at 11:07 AM, Tim Ross 
<tim_r...@ultimatesoftware.com<mailto:tim_r...@ultimatesoftware.com>> wrote:
I am trying to convert an existing Apache Storm Bolt into an Apache Beam 
pipeline. The storm bolt used sliding windows with a duration of 5 minute and a 
period of 30 seconds. After doing some initial transforms to get data in the 
proper format it would process all elements which were new or expired.

Since Beam doesn’t have the concept of new and expired data in a window I’m 
trying to figure out how one would accomplish this.


Thanks,
Tim
From: Kenneth Knowles <k...@google.com<mailto:k...@google.com>>
Reply-To: "user@beam.apache.org<mailto:user@beam.apache.org>" 
<user@beam.apache.org<mailto:user@beam.apache.org>>
Date: Wednesday, January 24, 2018 at 1:45 PM

To: "user@beam.apache.org<mailto:user@beam.apache.org>" 
<user@beam.apache.org<mailto:user@beam.apache.org>>
Subject: Re: Windowing question

Generally, Beam will discard expired data for you (including state). Can you 
describe more? What is your windowing strategy? What are the edge triggers?

On Wed, Jan 24, 2018 at 10:37 AM, Tim Ross 
<tim_r...@ultimatesoftware.com<mailto:tim_r...@ultimatesoftware.com>> wrote:
I am just trying to do certain processing on edge triggers, i.e. new or expired 
data, to reduce the overall processing of a very large stream.

How would I go about doing that with state? As I understand it, state is tied 
to key and window.

Thanks,
Tim
From: Kenneth Knowles <k...@google.com<mailto:k...@google.com>>
Reply-To: "user@beam.apache.org<mailto:user@beam.apache.org>" 
<user@beam.apache.org<mailto:user@beam.apache.org>>
Date: Wednesday, January 24, 2018 at 1:25 PM
To: "user@beam.apache.org<mailto:user@beam.apache.org>" 
<user@beam.apache.org<mailto:user@beam.apache.org>>
Subject: Re: Windowing question

A little clarification: in Beam an element exists in a single window, 
mathematically speaking. So when you use SlidingWindows, for example, to assign 
multiple windows this "copies" the value for each window, and that is how you 
should think of it, from a calculation point of view. Under the hood, a 
compressed representation is often used, but not in all situations.

Kenn
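
(To make that concrete, assuming String elements: a DoFn's @ProcessElement
method can take the org.apache.beam.sdk.transforms.windowing.BoundedWindow it
is being invoked in as an extra parameter, and with the 5 minute / 30 second
SlidingWindows above the same element is processed once per window, i.e.
roughly ten times:)

.apply(ParDo.of(new DoFn<String, String>() {
  @ProcessElement
  public void processElement(ProcessContext c, BoundedWindow window) {
    // Same input value, a different window on each invocation.
    System.out.println(c.element() + " is in " + window);
    c.output(c.element());
  }
}))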

On Wed, Jan 24, 2018 at 9:45 AM, Robert Bradshaw 
<rober...@google.com<mailto:rober...@google.com>> wrote:
No, Apache Beam doesn't offer this explicitly. You could accomplish it
using State, but perhaps if you clarified what you were trying to
accomplish by using these mechanisms there'd be another way to do the
same thing.

On Wed, Jan 24, 2018 at 7:03 AM, Tim Ross 
<tim_r...@ultimatesoftware.com<mailto:tim_r...@ultimatesoftware.com>> wrote:
> Is there anything comparable to Apache Storm’s Window.getNew and
> Window.getExpired in Apache Beam?  How would I determine if an element is
> new or expired in consecutive windows?
>
>
>
> Thanks,
>
> Tim
>

Re: Windowing question

2018-01-24 Thread Tim Ross
I am trying to convert an existing Apache Storm Bolt into an Apache Beam 
pipeline. The storm bolt used sliding windows with a duration of 5 minute and a 
period of 30 seconds. After doing some initial transforms to get data in the 
proper format it would process all elements which were new or expired.

Since Beam doesn’t have the concept of new and expired data in a window I’m 
trying to figure out how one would accomplish this.


Thanks,
Tim
From: Kenneth Knowles <k...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Wednesday, January 24, 2018 at 1:45 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Windowing question

Generally, Beam will discard expired data for you (including state). Can you 
describe more? What is your windowing strategy? What are the edge triggers?

On Wed, Jan 24, 2018 at 10:37 AM, Tim Ross 
<tim_r...@ultimatesoftware.com<mailto:tim_r...@ultimatesoftware.com>> wrote:
I am just trying to do certain processing on edge triggers, i.e. new or expired 
data, to reduce the overall processing of a very large stream.

How would I go about doing that with state? As I understand it, state is tied 
to key and window.

Thanks,
Tim
From: Kenneth Knowles <k...@google.com<mailto:k...@google.com>>
Reply-To: "user@beam.apache.org<mailto:user@beam.apache.org>" 
<user@beam.apache.org<mailto:user@beam.apache.org>>
Date: Wednesday, January 24, 2018 at 1:25 PM
To: "user@beam.apache.org<mailto:user@beam.apache.org>" 
<user@beam.apache.org<mailto:user@beam.apache.org>>
Subject: Re: Windowing question

A little clarification: in Beam an element exists in a single window, 
mathematically speaking. So when you use SlidingWindows, for example, to assign 
multiple windows this "copies" the value for each window, and that is how you 
should think of it, from a calculation point of view. Under the hood, a 
compressed representation is often used, but not in all situations.

Kenn

On Wed, Jan 24, 2018 at 9:45 AM, Robert Bradshaw 
<rober...@google.com<mailto:rober...@google.com>> wrote:
No, Apache Beam doesn't offer this explicitly. You could accomplish it
using State, but perhaps if you clarified what you were trying to
accomplish by using these mechanisms there'd be another way to do the
same thing.

On Wed, Jan 24, 2018 at 7:03 AM, Tim Ross 
<tim_r...@ultimatesoftware.com<mailto:tim_r...@ultimatesoftware.com>> wrote:
> Is there anything comparable to Apache Storm’s Window.getNew and
> Window.getExpired in Apache Beam?  How would I determine if an element is
> new or expired in consecutive windows?
>
>
>
> Thanks,
>
> Tim
>


Re: Windowing question

2018-01-24 Thread Tim Ross
I am just trying to do certain processing on edge triggers, i.e. new or expired 
data, to reduce the overall processing of a very large stream.

How would I go about doing that with state? As I understand it, state is tied 
to key and window.

Thanks,
Tim
From: Kenneth Knowles <k...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Wednesday, January 24, 2018 at 1:25 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Windowing question

A little clarification: in Beam an element exists in a single window, 
mathematically speaking. So when you use SlidingWindows, for example, to assign 
multiple windows this "copies" the value for each window, and that is how you 
should think of it, from a calculation point of view. Under the hood, a 
compressed representation is often used, but not in all situations.

Kenn

On Wed, Jan 24, 2018 at 9:45 AM, Robert Bradshaw 
<rober...@google.com<mailto:rober...@google.com>> wrote:
No, Apache Beam doesn't offer this explicitly. You could accomplish it
using State, but perhaps if you clarified what you were trying to
accomplish by using these mechanisms there'd be another way to do the
same thing.

On Wed, Jan 24, 2018 at 7:03 AM, Tim Ross 
<tim_r...@ultimatesoftware.com<mailto:tim_r...@ultimatesoftware.com>> wrote:
> Is there anything comparable to Apache Storm’s Window.getNew and
> Window.getExpired in Apache Beam?  How would I determine if an element is
> new or expired in consecutive windows?
>
>
>
> Thanks,
>
> Tim
>


Windowing question

2018-01-24 Thread Tim Ross
Is there anything comparable to Apache Storm’s Window.getNew and 
Window.getExpired in Apache Beam?  How would I determine if an element is new 
or expired in consecutive windows?

Thanks,
Tim



Re: [DISCUSS] Drop Spark 1.x support to focus on Spark 2.x

2017-11-20 Thread Tim
Thanks JB

From which release will Spark 1.x be dropped please? Is this slated for 2.3.0 
at the end of the year?

Thanks,
Tim,
Sent from my iPhone

> On 20 Nov 2017, at 21:21, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> 
> Hi,
> 
> it seems we have a consensus to upgrade to Spark 2.x, dropping Spark 1.x. I 
> will upgrade the PR accordingly.
> 
> Thanks all for your input and feedback.
> 
> Regards
> JB
> 
>> On 11/13/2017 09:32 AM, Jean-Baptiste Onofré wrote:
>> Hi Beamers,
>> I'm forwarding this discussion & vote from the dev mailing list to the user 
>> mailing list.
>> The goal is to have your feedback as user.
>> Basically, we have two options:
>> 1. Right now, in the PR, we support both Spark 1.x and 2.x using three 
>> artifacts (common, spark1, spark2). You, as users, pick up spark1 or spark2 
>> in your dependencies set depending the Spark target version you want.
>> 2. The other option is to upgrade and focus on Spark 2.x in Beam 2.3.0. If 
>> you still want to use Spark 1.x, then, you will be stuck up to Beam 2.2.0.
>> Thoughts ?
>> Thanks !
>> Regards
>> JB
>>  Forwarded Message 
>> Subject: [VOTE] Drop Spark 1.x support to focus on Spark 2.x
>> Date: Wed, 8 Nov 2017 08:27:58 +0100
>> From: Jean-Baptiste Onofré <j...@nanthrax.net>
>> Reply-To: d...@beam.apache.org
>> To: d...@beam.apache.org
>> Hi all,
>> as you might know, we are working on Spark 2.x support in the Spark runner.
>> I'm working on a PR about that:
>> https://github.com/apache/beam/pull/3808
>> Today, we have something working with both Spark 1.x and 2.x from a code 
>> standpoint, but I have to deal with dependencies. It's the first step of the 
>> update as I'm still using RDD, the second step would be to support dataframe 
>> (but for that, I would need PCollection elements with schemas, that's 
>> another topic on which Eugene, Reuven and I are discussing).
>> However, as all major distributions now ship Spark 2.x, I don't think it's 
>> required anymore to support Spark 1.x.
>> If we agree, I will update and cleanup the PR to only support and focus on 
>> Spark 2.x.
>> So, that's why I'm calling for a vote:
>>   [ ] +1 to drop Spark 1.x support and upgrade to Spark 2.x only
>>   [ ] 0 (I don't care ;))
>>   [ ] -1, I would like to still support Spark 1.x, and so having support of 
>> both Spark 1.x and 2.x (please provide specific comment)
>> This vote is open for 48 hours (I have the commits ready, just waiting the 
>> end of the vote to push on the PR).
>> Thanks !
>> Regards
>> JB
> 
> -- 
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com


Re: Does ElasticsearchIO in the latest RC support adding document IDs?

2017-11-15 Thread Tim Robertson
Hi Chet,

I'll be a user of this, so thank you.

It seems reasonable although - did you consider letting folk name the
document ID field explicitly?  It would avoid an unnecessary transformation
and might be simpler:


  // instruct the writer to use a provided document ID
  ElasticsearchIO.write()
      .withConnectionConfiguration(conn)
      .withMaxBatchSize(BATCH_SIZE)
      .withDocumentIdField("myID");
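
For comparison, the transformation it would avoid is the option (b) shape
discussed below. On the user side it would look something like this; records,
MyRecord and toJson() are made-up names, purely for illustration:

// Hypothetical: pair every document with the id the user wants ES to use.
PCollection<KV<String, String>> docsWithIds = records.apply("KeyByDocumentId",
    MapElements.via(new SimpleFunction<MyRecord, KV<String, String>>() {
      @Override
      public KV<String, String> apply(MyRecord r) {
        return KV.of(r.getId(), toJson(r));  // <id, json>
      }
    }));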


On Wed, Nov 15, 2017 at 8:08 PM, Chet Aldrich 
wrote:

> Given that this seems like a change that should probably happen, and I’d
> like to help contribute if possible, a few questions and my current
> opinion:
>
> So I’m leaning towards approach B here, which is:
>
> b. (a bit less user friendly) PCollection with K as an id. But forces
> the user to do a Pardo before writing to ES to output KV pairs of <id, json>
>
> I think that the reduction in user-friendliness may be outweighed by the
> fact that this obviates some of the issues surrounding a failure when
> finishing a bundle. Additionally, this *forces* the user to provide a
> document id, which I think is probably better practice. This will also
> probably lead to fewer frustrations around “magic” code that just pulls
> something in if it happens to be there, and doesn’t if not. We’ll need to
> rely on the user catching this functionality in the docs or the code itself
> to take advantage of it.
> IMHO it’d be generally better to enforce this at compile time because it
> does have an effect on whether the pipeline produces duplicates on failure.
> Additionally, we get the benefit of relatively intuitive behavior where if
> the user passes in the same Key value, it’ll update a record in ES, and if
> the key is different then it will create a new record.
>
> Curious to hear thoughts on this. If this seems reasonable I’ll go ahead
> and create a JIRA for tracking and start working on a PR for this. Also, if
> it’d be good to loop in the dev mailing list before starting let me know,
> I’m pretty new to this.
>
> Chet
>
> On Nov 15, 2017, at 12:53 AM, Etienne Chauchot 
> wrote:
>
> Hi Chet,
>
> What you say is totally true, docs written using ElasticSearchIO will
> always have an ES generated id. But it might change in the future, indeed
> it might be a good thing to allow the user to pass an id. Just in 5 seconds
> thinking, I see 3 possible designs for that.
>
> a.(simplest) use a json special field for the id, if it is provided by the
> user in the input json then it is used, auto-generated id otherwise.
>
> b. (a bit less user friendly) PCollection with K as an id. But forces
> the user to do a Pardo before writing to ES to output KV pairs of <id, json>
>
> c. (a lot more complex) Allow the IO to serialize/deserialize java beans
> and have a String id field. Matching java types to ES types is quite
> tricky, so, for now we just relied on the user to serialize his beans into
> json and let ES match the types automatically.
>
> Related to the problems you raise below:
>
> 1. Well, the bundle is the commit entity of beam. Consider the case of
> ESIO.batchSize being < to bundle size. While processing records, when the
> number of elements reaches batchSize, an ES bulk insert will be issued but
> no finishBundle. If there is a problem later on in the bundle processing
> before the finishBundle, the checkpoint will still be at the beginning of
> the bundle, so all the bundle will be retried leading to duplicate
> documents. Thanks for raising that! I'm CCing the dev list so that someone
> could correct me on the checkpointing mechanism if I'm missing something.
> Besides I'm thinking about forcing the user to provide an id in all cases
> to workaround this issue.
> 2. Correct.
>
> Best,
> Etienne
>
> Le 15/11/2017 à 02:16, Chet Aldrich a écrit :
>
> Hello all!
>
> So I’ve been using the ElasticSearchIO sink for a project (unfortunately
> it’s Elasticsearch 5.x, and so I’ve been messing around with the latest RC)
> and I’m finding that it doesn’t allow for changing the document ID, but
> only lets you pass in a record, which means that the document ID is
> auto-generated. See this line for what specifically is happening:
>
> https://github.com/apache/beam/blob/master/sdks/java/io/
> elasticsearch/src/main/java/org/apache/beam/sdk/io/
> elasticsearch/ElasticsearchIO.java#L838
>
> Essentially the data part of the document is being placed but it doesn’t
> allow for other properties, such as the document ID, to be set.
>
> This leads to two problems:
>
> 1. Beam doesn’t necessarily guarantee exactly-once execution for a given
> item in a PCollection, as I understand it. This means that you may get more
> than one record in Elastic for a given item in a PCollection that you pass
> in.
>
> 2. You can’t do partial updates to an index. If you run a batch job once,
> and then run the batch job again on the same index without clearing it, you
> just double everything in there.
>
> Is there any good way around this?
>
> I’d be 

Re: Does ElasticsearchIO in the latest RC support adding document IDs?

2017-11-15 Thread Tim Robertson
Hi Chet,

+1 for interest in this from me too.

If it helps, I'd have expected a) to be the implementation (e.g. something
like "_id" being used if present) and handling multiple delivery being a
responsibility of the developer.

Thanks,
Tim




On Wed, Nov 15, 2017 at 10:08 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> I think it's also related to the discussion Romain raised on the dev
> mailing list (gap between batch size, checkpointing & bundles).
>
> Regards
> JB
>
> On 11/15/2017 09:53 AM, Etienne Chauchot wrote:
>
>> Hi Chet,
>>
>> What you say is totally true, docs written using ElasticSearchIO will
>> always have an ES generated id. But it might change in the future, indeed
>> it might be a good thing to allow the user to pass an id. Just in 5 seconds
>> thinking, I see 3 possible designs for that.
>>
>> a.(simplest) use a json special field for the id, if it is provided by
>> the user in the input json then it is used, auto-generated id otherwise.
>>
>> b. (a bit less user friendly) PCollection with K as an id. But forces
>> the user to do a Pardo before writing to ES to output KV pairs of <id, json>
>>
>> c. (a lot more complex) Allow the IO to serialize/deserialize java beans
>> and have a String id field. Matching java types to ES types is quite
>> tricky, so, for now we just relied on the user to serialize his beans into
>> json and let ES match the types automatically.
>>
>> Related to the problems you raise below:
>>
>> 1. Well, the bundle is the commit entity of beam. Consider the case of
>> ESIO.batchSize being < to bundle size. While processing records, when the
>> number of elements reaches batchSize, an ES bulk insert will be issued but
>> no finishBundle. If there is a problem later on in the bundle processing
>> before the finishBundle, the checkpoint will still be at the beginning of
>> the bundle, so all the bundle will be retried leading to duplicate
>> documents. Thanks for raising that! I'm CCing the dev list so that someone
>> could correct me on the checkpointing mechanism if I'm missing something.
>> Besides I'm thinking about forcing the user to provide an id in all cases
>> to workaround this issue.
>>
>> 2. Correct.
>>
>> Best,
>> Etienne
>>
>> Le 15/11/2017 à 02:16, Chet Aldrich a écrit :
>>
>>> Hello all!
>>>
>>> So I’ve been using the ElasticSearchIO sink for a project (unfortunately
>>> it’s Elasticsearch 5.x, and so I’ve been messing around with the latest RC)
>>> and I’m finding that it doesn’t allow for changing the document ID, but
>>> only lets you pass in a record, which means that the document ID is
>>> auto-generated. See this line for what specifically is happening:
>>>
>>> https://github.com/apache/beam/blob/master/sdks/java/io/elas
>>> ticsearch/src/main/java/org/apache/beam/sdk/io/elasticsear
>>> ch/ElasticsearchIO.java#L838
>>>
>>> Essentially the data part of the document is being placed but it doesn’t
>>> allow for other properties, such as the document ID, to be set.
>>>
>>> This leads to two problems:
>>>
>>> 1. Beam doesn’t necessarily guarantee exactly-once execution for a given
>>> item in a PCollection, as I understand it. This means that you may get more
>>> than one record in Elastic for a given item in a PCollection that you pass
>>> in.
>>>
>>> 2. You can’t do partial updates to an index. If you run a batch job
>>> once, and then run the batch job again on the same index without clearing
>>> it, you just double everything in there.
>>>
>>> Is there any good way around this?
>>>
>>> I’d be happy to try writing up a PR for this in theory, but not sure how
>>> to best approach it. Also would like to figure out a way to get around this
>>> in the meantime, if anyone has any ideas.
>>>
>>> Best,
>>>
>>> Chet
>>>
>>> P.S. CCed echauc...@gmail.com <mailto:echauc...@gmail.com> because it
>>> seems like he’s been doing work related to the elastic sink.
>>>
>>>
>>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: [DISCUSS] Drop Spark 1.x support to focus on Spark 2.x

2017-11-13 Thread Tim Robertson
Thanks JB

On "thoughts":

- Cloudera 5.13 will still default to 1.6 even though a 2.2 parcel is
available (HWX provides both)
- Cloudera support for spark 2 has a list of exceptions (
https://www.cloudera.com/documentation/spark2/latest/topics/spark2_known_issues.html
)
  - I am not sure if the HBaseIO would be affected
  - I am not sure if structured streaming would have implications
  - it might stop customers from being able to run spark 2 at all due to
support agreements
- Spark 2.3 (EOY) will drop Scala 2.10 support
- IBM's now defunct distro only has 1.6
- Oozie doesn't have a spark 2 action (need to use a shell action)
- There are a lot of folks with code running on 1.3,1.4 and 1.5 still
- Spark 2.2+ requires Java 8 too, while <2.2 was J7 like Beam (not sure if
this has other implications for the cross deployment nature of Beam)

My first impression of Beam was really favourable, as it all just worked
first time on a CDH Spark 1.6 cluster.  For us it is a lack of resources to
refactor legacy code that delays the 2.2 push.

With that said, I think it is very reasonable to have a clear cut-off in
Beam, especially if it limits progress / causes headaches in packaging,
robustness etc.  I'd recommend putting it in a 6-month timeframe, which
might align with 2.3?

Hope this helps,
Tim











On Mon, Nov 13, 2017 at 10:07 AM, Neville Dipale <nevilled...@gmail.com>
wrote:

> Hi JB,
>
>
>   [X ] +1 to drop Spark 1.x support and upgrade to Spark 2.x only
>   [ ] 0 (I don't care ;))
>   [ ] -1, I would like to still support Spark 1.x, and so having support
> of both Spark 1.x and 2.x (please provide specific comment)
>
> On 13 November 2017 at 10:32, Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
>> Hi Beamers,
>>
>> I'm forwarding this discussion & vote from the dev mailing list to the
>> user mailing list.
>> The goal is to have your feedback as user.
>>
>> Basically, we have two options:
>> 1. Right now, in the PR, we support both Spark 1.x and 2.x using three
>> artifacts (common, spark1, spark2). You, as users, pick up spark1 or spark2
>> in your dependencies set depending the Spark target version you want.
>> 2. The other option is to upgrade and focus on Spark 2.x in Beam 2.3.0.
>> If you still want to use Spark 1.x, then, you will be stuck up to Beam
>> 2.2.0.
>>
>> Thoughts ?
>>
>> Thanks !
>> Regards
>> JB
>>
>>
>>  Forwarded Message 
>> Subject: [VOTE] Drop Spark 1.x support to focus on Spark 2.x
>> Date: Wed, 8 Nov 2017 08:27:58 +0100
>> From: Jean-Baptiste Onofré <j...@nanthrax.net>
>> Reply-To: d...@beam.apache.org
>> To: d...@beam.apache.org
>>
>> Hi all,
>>
>> as you might know, we are working on Spark 2.x support in the Spark
>> runner.
>>
>> I'm working on a PR about that:
>>
>> https://github.com/apache/beam/pull/3808
>>
>> Today, we have something working with both Spark 1.x and 2.x from a code
>> standpoint, but I have to deal with dependencies. It's the first step of
>> the update as I'm still using RDD, the second step would be to support
>> dataframe (but for that, I would need PCollection elements with schemas,
>> that's another topic on which Eugene, Reuven and I are discussing).
>>
>> However, as all major distributions now ship Spark 2.x, I don't think
>> it's required anymore to support Spark 1.x.
>>
>> If we agree, I will update and cleanup the PR to only support and focus
>> on Spark 2.x.
>>
>> So, that's why I'm calling for a vote:
>>
>>   [ ] +1 to drop Spark 1.x support and upgrade to Spark 2.x only
>>   [ ] 0 (I don't care ;))
>>   [ ] -1, I would like to still support Spark 1.x, and so having support
>> of both Spark 1.x and 2.x (please provide specific comment)
>>
>> This vote is open for 48 hours (I have the commits ready, just waiting
>> the end of the vote to push on the PR).
>>
>> Thanks !
>> Regards
>> JB
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>
>


Re: ElasticSearch with RestHighLevelClient

2017-10-23 Thread Tim Robertson
Hi Ryan,

I can confirm 2.2.0-SNAPSHOT works fine with an ES 5.6 cluster.  I am told
2.2.0 is expected within a couple weeks.
My work is only a proof of concept for now, but I put in 300M fairly small
docs at around 100,000/sec on a 3 node cluster without any issue [1].

Hope this helps,
Tim


[1]
https://github.com/gbif/pipelines/blob/master/gbif/src/main/java/org/gbif/pipelines/indexing/Avro2ElasticSearchPipeline.java
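
For anyone hitting the same lifecycle questions, a minimal sketch of the
@Setup/@Teardown pattern JB describes below. Names, index and type are
placeholders, and it assumes the ES 5.6 client, where RestHighLevelClient
wraps, but does not own or close, the low-level RestClient:

import java.io.IOException;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

public class WriteFn extends DoFn<String, Void> {
  private final String elasticUrl;               // e.g. "http://localhost:9200" (assumed)
  private transient RestClient lowLevelClient;
  private transient RestHighLevelClient client;

  WriteFn(String elasticUrl) { this.elasticUrl = elasticUrl; }

  @Setup
  public void setup() {
    lowLevelClient = RestClient.builder(HttpHost.create(elasticUrl)).build();
    client = new RestHighLevelClient(lowLevelClient);
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws IOException {
    // assumes the elements are already JSON documents
    client.index(new IndexRequest("my-index", "doc")
        .source(c.element(), XContentType.JSON));
  }

  @Teardown
  public void teardown() throws IOException {
    if (lowLevelClient != null) {
      // releases the client's I/O threads
      lowLevelClient.close();
    }
  }
}

Closing the low-level client in @Teardown is also the usual fix for a local
run that otherwise never exits.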


On Mon, Oct 23, 2017 at 9:00 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Hi Ryan,
>
> the last version of ElasticsearchIO (that will be included in Beam 2.2.0)
> supports Elasticsearch 5.x.
>
> The client should be created in the @Setup (or @StartBundle) and release
> cleanly in @Teardown (or @FinishBundle). Then, it's used in @ProcessElement
> to actually store the elements in the PCollection.
>
> Regards
> JB
>
>
> On 10/23/2017 08:53 PM, Ryan Bobko wrote:
>
>> Hi JB,
>> Thanks for your input. I'm trying to update ElasticsearchIO, and
>> hopefully learn a bit about Beam in the process. The documentation
>> says ElasticsearchIO only works with ES 2.X, and I'm using ES 5.6. I'd
>> prefer not to have two ES libs in my classpath if I can avoid it. I'm
>> just getting started, so my pipeline is quite simple:
>>
>> pipeline.apply( "Raw Reader", reader ) // read raw files
>>  .apply( "Document Generator", ParDo.of( extractor ) ) //
>> create my document objects for ES insertion
>>  .apply( "Elastic Writer", new ElasticWriter( ... ); //
>> upload to ES
>>
>>
>> public final class ElasticWriter extends
>> PTransform<PCollection, PDone> {
>>
>>private static final Logger log = LoggerFactory.getLogger(
>> ElasticWriter.class );
>>private final String elasticurl;
>>
>>public ElasticWriter( String url ) {
>>  elasticurl = url;
>>}
>>
>>@Override
>>public PDone expand( PCollection input ) {
>>  input.apply( ParDo.of( new WriteFn( elasticurl ) ) );
>>  return PDone.in( input.getPipeline() );
>>}
>>
>>public static class WriteFn extends DoFn<Document, Void> implements
>> Serializable {
>>
>>  private transient RestHighLevelClient client;
>>  private final String elasticurl;
>>
>>  public WriteFn( String elasticurl ) {
>>this.elasticurl = elasticurl;
>>  }
>>
>>  @Setup
>>  public void setup() {
>>log.debug( " into WriteFn::setup" );
>>HttpHost elastic = HttpHost.create( elasticurl );
>>RestClientBuilder bldr = RestClient.builder( elastic );
>>
>>// if this is uncommented, the program never exits
>>//client = new RestHighLevelClient( bldr.build() );
>>  }
>>
>>  @Teardown
>>  public void teardown() {
>>log.debug( " into WriteFn::teardown" );
>>// there's nothing to tear down
>>  }
>>
>>  @ProcessElement
>>  public void pe( ProcessContext c ) {
>>Document doc = DocumentImpl.from( c.element() );
>>log.debug( "writing {} to elastic", doc.getMetadata().first(
>> Metadata.NAME ) );
>>
>>// this is where I want to write to ES, but for now, just write
>> a text file
>>
>>ObjectMapper mpr = new ObjectMapper();
>>
>>try ( Writer fos = new BufferedWriter( new FileWriter( new File(
>> "/tmp/writers",
>>doc.getMetadata().first( Metadata.NAME ).asString() ) ) )
>> ) {
>>  mpr.writeValue( fos, doc );
>>}
>>catch ( IOException ioe ) {
>>  log.error( ioe.getLocalizedMessage(), ioe );
>>}
>>  }
>>}
>> }
>>
>>
>> On Mon, Oct 23, 2017 at 2:28 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
>> wrote:
>>
>>> Hi Ryan,
>>>
>>> Why don't you use the ElasticsearchIO for that ?
>>>
>>> Anyway, can you share your pipeline where you have the ParDo calling your
>>> DoFn ?
>>>
>>> Thanks,
>>> Regards
>>> JB
>>>
>>>
>>> On 10/23/2017 07:50 PM, r...@ostrich-emulators.com wrote:
>>>
>>>>
>>>> Hi List,
>>>> I'm trying to write an updated ElasticSearch client using the
>>>> newly-published RestHighLevelClient class (with ES 5.6.0). I'm only
>>>> interested in writes at this time, so I'm u

Re: KafkaIO and Avro

2017-10-19 Thread Tim Robertson
Thanks Eugene

On Thu, Oct 19, 2017 at 9:36 PM, Raghu Angadi <rang...@google.com> wrote:

> Ah, nice. It works.
>
> On Thu, Oct 19, 2017 at 1:44 PM, Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> The following compiles fine:
>>
>>
>> p.apply(KafkaIO.<String, Envelope>read()
>> .withBootstrapServers("kafka:9092")
>> .withTopic("dbserver1.inventory.customers")
>> .withKeyDeserializer(StringDeserializer.class)
>> .withValueDeserializerAndCoder
>> ((Class)KafkaAvroDeserializer.class, AvroCoder.of(Envelope.class))
>>
>>
>> On Thu, Oct 19, 2017 at 12:21 PM Raghu Angadi <rang...@google.com> wrote:
>>
>>> Same for me. It does not look like there is an annotation to suppress
>>> the error.
>>>
>>>
>>> On Thu, Oct 19, 2017 at 12:18 PM, Tim Robertson <
>>> timrobertson...@gmail.com> wrote:
>>>
>>>> Hi Eugene,
>>>>
>>>> I understood that was where Andrew started and reported this.  I tried
>>>> and saw the same as him.
>>>>
>>>> incompatible types: java.lang.Class>>> afka.serializers.KafkaAvroDeserializer> cannot be converted to
>>>> org.apache.kafka.common.serialization.Deserializer>>> pipelines.io.avro.Envelope>
>>>>
>>>> similarly with
>>>> (Class>) KafkaAvroDeserializer.class
>>>>
>>>>
>>>>
>>>> On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov <kirpic...@google.com
>>>> > wrote:
>>>>
>>>>> I don't think extending the class is necessary. Not sure I understand
>>>>> why a simple type casting for withDeserializerAndCoder doesn't work? Have
>>>>> you tried this?
>>>>>
>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>   .withValueDeserializerAndCoder((Deserializer)Kafka
>>>>> AvroDeserializer.class,
>>>>>   AvroCoder.of(Envelope.class))
>>>>>
>>>>> On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <
>>>>> timrobertson...@gmail.com> wrote:
>>>>>
>>>>>> Hi Raghu
>>>>>>
>>>>>> I tried that but with KafkaAvroDeserializer already implementing
>>>>>> Deserializer I couldn't get it to work... I didn't spend too
>>>>>> much time though and agree something like that would be cleaner.
>>>>>>
>>>>>> Cheers,
>>>>>> Tim
>>>>>>
>>>>>> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <rang...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Tim.
>>>>>>>
>>>>>>> How about extending KafkaAvroDeserializer rather
>>>>>>> than AbstractKafkaAvroDeserializer?
>>>>>>>
>>>>>>> TypedKafkaAvroDeserializer class below is useful, but not directly
>>>>>>> usable by the yet. It needs to store the actual type in Kafka consumer
>>>>>>> config to retrieve at run time.
>>>>>>> Even without storing the class, it is still useful. It simplifies
>>>>>>> user code:
>>>>>>>
>>>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>>>> TypedKafkaAvroDeserializer<Envelope> {}
>>>>>>>
>>>>>>> This should be part of same package as KafkaAvroDeserializer
>>>>>>> (surprised it is not there yet).
>>>>>>>
>>>>>>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson <
>>>>>>> timrobertson...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Happy to hear
>>>>>>>>
>>>>>>>> I wonder if we could do something like this (totally untested):
>>>>>>>>
>>>>>>>> public class TypedKafkaAvroDeserializer<T> extends
>>>>>>>> AbstractKafkaAvroDeserializer implements Deserializer<T> {
>>>>>>>>   @Override
>>>>>>>>   public T deserialize(String s, byte[] bytes) {
>>>>>>>>     return (T) this.deserialize(bytes);
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> On Thu, Oct 19, 2017

Re: KafkaIO and Avro

2017-10-19 Thread Tim Robertson
Hi Eugene,

I understood that was where Andrew started and reported this.  I tried and
saw the same as him.

incompatible types:
java.lang.Class
cannot be converted to
org.apache.kafka.common.serialization.Deserializer

similarly with
(Class>) KafkaAvroDeserializer.class



On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov <kirpic...@google.com>
wrote:

> I don't think extending the class is necessary. Not sure I understand why
> a simple type casting for withDeserializerAndCoder doesn't work? Have you
> tried this?
>
> p.apply(KafkaIO.<String, Envelope>read()
>   .withValueDeserializerAndCoder((Deserializer)
> KafkaAvroDeserializer.class,
>   AvroCoder.of(Envelope.class))
>
> On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <timrobertson...@gmail.com>
> wrote:
>
>> Hi Raghu
>>
>> I tried that but with KafkaAvroDeserializer already implementing
>> Deserializer I couldn't get it to work... I didn't spend too
>> much time though and agree something like that would be cleaner.
>>
>> Cheers,
>> Tim
>>
>> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <rang...@google.com> wrote:
>>
>>> Thanks Tim.
>>>
>>> How about extending KafkaAvroDeserializer rather than
>>> AbstractKafkaAvroDeserializer?
>>>
>>> TypedKafkaAvroDeserializer class below is useful, but not directly
>>> usable by the yet. It needs to store the actual type in Kafka consumer
>>> config to retrieve at run time.
>>> Even without storing the class, it is still useful. It simplifies user
>>> code:
>>>
>>> public class EnvelopeKafkaAvroDeserializer extends
>>> TypedKafkaAvroDeserializer<Envelope> {}
>>>
>>> This should be part of same package as KafkaAvroDeserializer (surprised
>>> it is not there yet).
>>>
>>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson <
>>> timrobertson...@gmail.com> wrote:
>>>
>>>> Happy to hear
>>>>
>>>> I wonder if we could do something like this (totally untested):
>>>>
>>>> public class TypedKafkaAvroDeserializer<T> extends
>>>> AbstractKafkaAvroDeserializer implements Deserializer<T> {
>>>>   @Override
>>>>   public T deserialize(String s, byte[] bytes) {
>>>>     return (T) this.deserialize(bytes);
>>>>   }
>>>> }
>>>>
>>>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <
>>>> andrew+b...@andrew-jones.com> wrote:
>>>>
>>>>> Thanks Tim, that works!
>>>>>
>>>>> Full code is:
>>>>>
>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>> AbstractKafkaAvroDeserializer implements Deserializer {
>>>>> @Override
>>>>> public void configure(Map<String, ?> configs, boolean isKey) {
>>>>> configure(new KafkaAvroDeserializerConfig(configs));
>>>>> }
>>>>>
>>>>> @Override
>>>>> public Envelope deserialize(String s, byte[] bytes) {
>>>>> return (Envelope) this.deserialize(bytes);
>>>>> }
>>>>>
>>>>> @Override
>>>>> public void close() {}
>>>>> }
>>>>>
>>>>> Nicer than my solution so think that is the one I'm going to go with
>>>>> for now.
>>>>>
>>>>> Thanks,
>>>>> Andrew
>>>>>
>>>>>
>>>>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>>>>>
>>>>> Hi Andrew,
>>>>>
>>>>> I also saw the same behaviour.
>>>>>
>>>>> It's not pretty but perhaps try this? It was my last idea I ran out of
>>>>> time to try...
>>>>>
>>>>>
>>>>> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
>>>>> public class EnvelopeAvroDeserializer extends
>>>>> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>
>>>>>   ...
>>>>>
>>>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>>>     return (Envelope) this.deserialize(bytes);
>>>>>   }
>>>>>
>>>>>   public Envelope deserialize(String s, byte[] bytes, Schema readerS

Re: KafkaIO and Avro

2017-10-19 Thread Tim Robertson
Happy to hear

I wonder if we could do something like this (totally untested):

public class TypedKafkaAvroDeserializer<T> extends
    AbstractKafkaAvroDeserializer implements Deserializer<T> {
  @Override
  public T deserialize(String s, byte[] bytes) {
    return (T) this.deserialize(bytes);
  }
}
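
If that pans out, usage would presumably look something like this (a sketch
only, and as untested as the wrapper itself; the bootstrap server and topic
are the ones from this thread):

// Bind the Avro type once...
public class EnvelopeKafkaAvroDeserializer
    extends TypedKafkaAvroDeserializer<Envelope> {}

// ...then hand the class to KafkaIO. configure()/close() may still need to be
// implemented as in Andrew's working version below, and the consumer config
// still needs schema.registry.url (e.g. via updateConsumerProperties).
p.apply(KafkaIO.<String, Envelope>read()
    .withBootstrapServers("kafka:9092")
    .withTopic("dbserver1.inventory.customers")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializerAndCoder(EnvelopeKafkaAvroDeserializer.class,
        AvroCoder.of(Envelope.class)));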

On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <andrew+b...@andrew-jones.com
> wrote:

> Thanks Tim, that works!
>
> Full code is:
>
> public class EnvelopeKafkaAvroDeserializer extends
>     AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>   @Override
>   public void configure(Map<String, ?> configs, boolean isKey) {
>     configure(new KafkaAvroDeserializerConfig(configs));
>   }
>
>   @Override
>   public Envelope deserialize(String s, byte[] bytes) {
>     return (Envelope) this.deserialize(bytes);
>   }
>
>   @Override
>   public void close() {}
> }
>
> Nicer than my solution so think that is the one I'm going to go with for
> now.
>
> Thanks,
> Andrew
>
>
> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>
> Hi Andrew,
>
> I also saw the same behaviour.
>
> It's not pretty but perhaps try this? It was my last idea I ran out of
> time to try...
>
>
> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
> public class EnvelopeAvroDeserializer extends
> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>
>   ...
>
>   public Envelope deserialize(String s, byte[] bytes) {
>     return (Envelope) this.deserialize(bytes);
>   }
>
>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>     return (Envelope) this.deserialize(bytes, readerSchema);
>   }
>
>   ...
> }
>
>  Tim
>
>
> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <
> andrew+b...@andrew-jones.com> wrote:
>
>
> Using Object doesn't work unfortunately. I get an 'Unable to automatically
> infer a Coder' error at runtime.
>
> This is the code:
>
> p.apply(KafkaIO.<String, Object>read()
> .withValueDeserializer(KafkaAvroDeserializer.class)
>
> It compiles, but at runtime:
>
> Caused by: java.lang.RuntimeException: Unable to automatically infer a
> Coder for the Kafka Deserializer class 
> io.confluent.kafka.serializers.KafkaAvroDeserializer:
> no coder registered for type class java.lang.Object
> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>
> So far the only thing I've got working is this, where I use the
> ByteArrayDeserializer and then parse Avro myself:
>
> private static KafkaAvroDecoder avroDecoder;
> static {
> final Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
> "http://registry:8081;);
> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
> true);
> VerifiableProperties vProps = new VerifiableProperties(props);
> avroDecoder = new KafkaAvroDecoder(vProps);
> }
>
> public static void main(String[] args) {
>
> PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
>
> p.apply(KafkaIO.<byte[], byte[]>read()
> .withBootstrapServers("kafka:9092")
> .withTopic("dbserver1.inventory.customers")
> .withKeyDeserializer(ByteArrayDeserializer.class)
> .withValueDeserializer(ByteArrayDeserializer.class)
> .withoutMetadata(
> )
> .apply(Values.<byte[]>create())
> .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
> @ProcessElement
> public void processElement(ProcessContext c) {
> Envelope data = (Envelope)
> avroDecoder.fromBytes(c.element());
> c.output(data);
> }
> }))
>
> Thanks,
> Andrew
>
> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>
> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
> It seems that KafkaAvroDeserializer implements Deserializer,
> though I suppose with proper configuration that Object will at run-time be
> your desired type. Have you tried adding some Java type casts to make it
> compile?
>
>
> +1, cast might be the simplest fix. Alternately you can wrap or
> extend KafkaAvroDeserializer as

Re: KafkaIO and Avro

2017-10-19 Thread Tim Robertson
Hi Andrew,

I also saw the same behaviour.

It's not pretty but perhaps try this? It was my last idea I ran out of time
to try...

// Basically a copy of KafkaAvroDeserializer with the casts in deserialize
public class EnvelopeAvroDeserializer extends
    AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
  ...
  public Envelope deserialize(String s, byte[] bytes) {
    return (Envelope) this.deserialize(bytes);
  }

  public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
    return (Envelope) this.deserialize(bytes, readerSchema);
  }

  ...
}

Tim


On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <andrew+b...@andrew-jones.com
> wrote:

> Using Object doesn't work unfortunately. I get an 'Unable to automatically
> infer a Coder' error at runtime.
>
> This is the code:
>
> p.apply(KafkaIO.<String, Object>read()
> .withValueDeserializer(KafkaAvroDeserializer.class)
>
> It compiles, but at runtime:
>
> Caused by: java.lang.RuntimeException: Unable to automatically infer a
> Coder for the Kafka Deserializer class 
> io.confluent.kafka.serializers.KafkaAvroDeserializer:
> no coder registered for type class java.lang.Object
> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>
> So far the only thing I've got working is this, where I use the
> ByteArrayDeserializer and then parse Avro myself:
>
> private static KafkaAvroDecoder avroDecoder;
> static {
> final Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
> "http://registry:8081;);
> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
> true);
> VerifiableProperties vProps = new VerifiableProperties(props);
> avroDecoder = new KafkaAvroDecoder(vProps);
> }
>
> public static void main(String[] args) {
>
> PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
>
> p.apply(KafkaIO.<byte[], byte[]>read()
> .withBootstrapServers("kafka:9092")
> .withTopic("dbserver1.inventory.customers")
> .withKeyDeserializer(ByteArrayDeserializer.class)
> .withValueDeserializer(ByteArrayDeserializer.class)
> .withoutMetadata(
> )
> .apply(Values.<byte[]>create())
> .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
> @ProcessElement
> public void processElement(ProcessContext c) {
> Envelope data = (Envelope) avroDecoder.fromBytes(c.
> element());
> c.output(data);
> }
> }))
>
> Thanks,
> Andrew
>
> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>
> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
> It seems that KafkaAvroDeserializer implements Deserializer,
> though I suppose with proper configuration that Object will at run-time be
> your desired type. Have you tried adding some Java type casts to make it
> compile?
>
>
> +1, cast might be the simplest fix. Alternately you can wrap or
> extend KafkaAvroDeserializer as Tim suggested. It would cast the Object
> returned by KafkaAvroDeserializer::deserializer() to Envolope at runtime.
>
>
> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <timrobertson...@gmail.com>
> wrote:
>
> I just tried quickly and see the same as you Andrew.
> We're missing something obvious or else extending KafkaAvroDeserializer seems
> necessary right?
>
> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <
> andrew+b...@andrew-jones.com> wrote:
>
> Hi,
>
> I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
> it should be as simple as:
>
> p.apply(KafkaIO.<String, Envelope>read()
>   .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>   AvroCoder.of(Envelope.class))
>
> Where Envelope is the name of the Avro class. However, that does not
> compile and I get the following error:
>
> incompatible types:
> java.lang.Class
> cannot be converted to java.lang.Class org.apache.kafka.common.serialization.Deserializer .inventory.customers.Envelope>>
>
> I've tried a number of variations on this theme but haven't yet worked
> it out and am starting to run out of ideas...
>
> Has anyone successfully read Avro data from Kafka?
>
> The code I'm using can be found at
> https://github.com/andrewrjones/debezium-kafka-beam-example and a full
> environment can be created with Docker.
>
> Thanks,
> Andrew
>
>
>


Run a ParDo after a Partition

2017-10-03 Thread Tim Robertson
Hi folks,

I feel a little daft asking this, and suspect I am missing the obvious...

Can someone please tell me how I can do a ParDo following a Partition?

In spark I'd just repartition(...) and then a map() but I don't spot in the
Beam API how to run a ParDo on each partition in parallel.  Do I need to
multithread manually?

I tried this:

PCollectionList partitioned =
verbatimRecords.apply(Partition.of(10, new RecordPartitioner()));

// does not run in parallel on spark...

for (PCollection untyped : partitioned.getAll()) {
PCollection inputDocs = partitioned.get(untyped).apply(
"convert-to-solr-format", ParDo.of(new ParseToSOLRDoc()));

inputDocs.apply(SolrIO.write().to("solr-load"
).withConnectionConfiguration(conn));
}


[For background: I'm using a non splittable custom Hadoop InputFormat which
means I end up with an RDD as a single partition, and need to split it to
run expensive op in parallel]
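
(For comparison, a sketch of an alternative that avoids Partition altogether,
assuming Reshuffle.viaRandomKey() is available in the Beam version in use:
redistribute the single input partition and let one ParDo run over it:)

import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;

verbatimRecords
    .apply("redistribute", Reshuffle.viaRandomKey())   // break the single partition apart
    .apply("convert-to-solr-format", ParDo.of(new ParseToSOLRDoc()))
    .apply(SolrIO.write()
        .to("solr-load")
        .withConnectionConfiguration(conn));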

Thanks,
Tim