Re: Large public Beam projects?
My apologies, I missed the link: [1] https://github.com/gbif/pipelines On Tue, Apr 21, 2020 at 5:58 PM Tim Robertson wrote: > Hi Jordan > > I don't know if we qualify as a large Beam project but at GBIF.org we > bring together datasets from 1600+ institutions documenting 1.4B > observations of species (museum data, citizen science, environmental > reports etc). > As far as Beam goes though, we aren't using the most advanced > features. It's batch processing of data into Avro files stored on HDFS then > into HBase / Elasticsearch. > > All our data and code [1] are open and I'm happy to discuss any aspect of > it if it is helpful to you. > > Best wishes, > Tim > > On Tue, Apr 21, 2020 at 3:48 PM Jeff Klukas wrote: > >> Mozilla hosts the code for our data ingestion system publicly on GitHub. >> A good chunk of that architecture consists of Beam pipelines running on >> Dataflow. >> >> See: >> >> https://github.com/mozilla/gcp-ingestion/tree/master/ingestion-beam >> >> and rendered usage documentation at: >> >> https://mozilla.github.io/gcp-ingestion/ingestion-beam/ >> >> On Mon, Apr 20, 2020 at 7:11 PM Jordan Thomas-Green >> wrote: >> >>> Does anyone have any public repos/examples of larger Beam >>> projects/implementations that they've seen? >>> >>
Re: Large public Beam projects?
Hi Jordan, I don't know if we qualify as a large Beam project but at GBIF.org we bring together datasets from 1600+ institutions documenting 1.4B observations of species (museum data, citizen science, environmental reports etc). As far as Beam goes though, we aren't using the most advanced features. It's batch processing of data into Avro files stored on HDFS then into HBase / Elasticsearch. All our data and code [1] are open and I'm happy to discuss any aspect of it if it is helpful to you. Best wishes, Tim On Tue, Apr 21, 2020 at 3:48 PM Jeff Klukas wrote: > Mozilla hosts the code for our data ingestion system publicly on GitHub. A > good chunk of that architecture consists of Beam pipelines running on > Dataflow. > > See: > > https://github.com/mozilla/gcp-ingestion/tree/master/ingestion-beam > > and rendered usage documentation at: > > https://mozilla.github.io/gcp-ingestion/ingestion-beam/ > > On Mon, Apr 20, 2020 at 7:11 PM Jordan Thomas-Green > wrote: > >> Does anyone have any public repos/examples of larger Beam >> projects/implementations that they've seen? >> >
Re: Beam discarding massive amount of events due to Window object or inner processing
I think everyone who followed this thread learned something! I know I did. Thanks for asking these questions. The summary and code snippets were just the right length to be accessible and focussed. On Wed, 16 Oct 2019, 06:04 Eddy G, wrote: > Thank you so so much guys for the amazing feedback you're giving me! > > I'm applying all of it and deep diving into more detail and see where I > could also go from there so I can still get the pipeline performance way > better. > > Again, really appreciated guys, you are amazing. >
Re: Beam discarding massive amount of events due to Window object or inner processing
You're getting 1 shard per pane, and you get a pane every time it's triggered on an early firing. And then another one in the final on-time pane. To have 1 file with 1 shard for every 15-minute window you need to only fire on window close, i.e. AfterWatermark.pastEndOfWindow(), without early firing. On Mon, 14 Oct 2019, 14:35 Eddy G, wrote: > Thanks a lot everyone for your so valuable feedback! > > Just updated my code, made some minor refactoring and seems to be working > like a charm. Still some data being dropped due to lateness (but I'm > talking about 100 elements per 2 million, so no "big deal" there, I will > take a look into extending lateness and overall performance bits that I'm > missing out). > > A thing that worries me a lot is that the wall time has been exponentially > increasing up to 1 day and 3 hours in the stage that is in charge of > writing all that captured data into parquet files, supposedly due to > .parquet file writing code. > > I suppose that this is also the reason why I still get tons of small > parquet files within a same bucket, as I should only have, in a perfect > scenario, 4 files (1 each 15 minutes due to the Window object length), when > I'm currently having +60! > > .apply("Write .parquet File(s)", > FileIO > .writeDynamic() > .by((SerializableFunction) > event -> { > // specify partitioning here > }) > .via(ParquetIO.sink(AVRO_SCHEMA)) > .to(options.getOutputDirectory()) > .withNaming(type -> ParquetFileNaming.getNaming(...)) > .withDestinationCoder(StringUtf8Coder.of()) > .withNumShards(1) // should this be 0? Could this > imply increasing of costs if set to 0? >
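For readers searching the archives, the change described above can be sketched as a fragment. This is a hypothetical illustration, not the poster's actual pipeline (the element type, allowed lateness, and partition helper are placeholder assumptions): with no early firings, each 15-minute window emits exactly one on-time pane, and one shard per pane means one file per window.

```java
// Sketch only (placeholder names): fire once per window, on the watermark,
// so 1 on-time pane x 1 shard = 1 output file per 15-minute window.
records  // a PCollection<GenericRecord>
    .apply(Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
        .triggering(AfterWatermark.pastEndOfWindow())   // no .withEarlyFirings(...)
        .withAllowedLateness(Duration.standardMinutes(5))
        .discardingFiredPanes())
    .apply("Write .parquet File(s)",
        FileIO.<String, GenericRecord>writeDynamic()
            .by(record -> partitionKeyFor(record))      // hypothetical helper
            .via(ParquetIO.sink(AVRO_SCHEMA))
            .to(options.getOutputDirectory())
            .withNaming(key -> FileIO.Write.defaultNaming(key, ".parquet"))
            .withDestinationCoder(StringUtf8Coder.of())
            .withNumShards(1));                         // 1 shard per pane
```

Note that `withNumShards(0)` would let the runner pick the sharding, which generally produces more (smaller) files, not fewer.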
Re: Live fixing of a Beam bug on July 25 at 3:30pm-4:30pm PST
+1, I'd love to see this as a recording. Will you stick it up on youtube afterwards? On Thu, Jul 18, 2019 at 4:00 AM sridhar inuog wrote: > Thanks, Pablo! Looking forward to it! Hopefully, it will be recorded > as well. > > On Wed, Jul 17, 2019 at 2:50 PM Pablo Estrada wrote: > >> Yes! So I will be working on a small feature request for Java's >> BigQueryIO: https://issues.apache.org/jira/browse/BEAM-7607 >> >> Maybe I'll do something for Python next month. : ) >> Best >> -P. >> >> On Wed, Jul 17, 2019 at 12:32 PM Rakesh Kumar >> wrote: >> >>> +1, I really appreciate this initiative. It would be really helpful for >>> newbies like me. >>> >>> Is it possible to list out the things that you are planning to >>> cover? >>> >>> >>> >>> >>> On Tue, Jul 16, 2019 at 11:19 AM Yichi Zhang wrote: >>> Thanks for organizing this Pablo, it'll be very helpful! On Tue, Jul 16, 2019 at 10:57 AM Pablo Estrada wrote: > Hello all, > I'll be having a session where I live-fix a Beam bug for 1 hour next > week. Everyone is invited. > > It will be on July 25, between 3:30pm and 4:30pm PST. Hopefully I will > finish a full change in that time frame, but we'll see. > > I have not yet decided if I will do this via hangouts, or via a > youtube livestream. In any case, I will share the link here in the next > few > days. > > I will most likely work on the Java SDK (I have a little feature > request in mind). > > Thanks! > -P. >
Re: gRPC method to get a pipeline definition?
Another +1 to support your research into this Chad. Thank you. Trying to understand where a beam process is in the Spark DAG is... not easy. A UI that helped would be a great addition. On Wed, Jun 26, 2019 at 3:30 PM Ismaël Mejía wrote: > +1 don't hesitate to create a JIRA + PR. You may be interested in [1]. > This is a simple util class that takes a proto pipeline object and > converts it into its graph representation in .dot format. You can > easily reuse the code or the idea as a first approach to show what the > pipeline is about. > > [1] > https://github.com/apache/beam/blob/2df702a1448fa6cbd22cd225bf16e9ffc4c82595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/renderer/PortablePipelineDotRenderer.java#L29 > > On Wed, Jun 26, 2019 at 10:27 AM Robert Bradshaw > wrote: > > > > Yes, offering a way to get a pipeline from the job service directly > > would be a completely reasonable thing to do (and likely not hard at > > all). We welcome pull requests. > > > > Alternative UIs built on top of this abstraction would be an > > interesting project to explore. > > > > On Wed, Jun 26, 2019 at 8:44 AM Chad Dombrova wrote: > > > > > > Hi all, > > > I've been poking around the beam source code trying to determine > whether it's possible to get the definition of a pipeline via beam's > gRPC-based services. It looks like the message types are there for > describing a Pipeline but as far as I can tell, they're only used by > JobService.Prepare() for submitting a new job. > > > > > > If I were to create a PR to add support for a JobService.GetPipeline() > method, would that be interesting to others? Is it technically feasible? > i.e. is the pipeline definition readily available to the job service after > the job has been prepared and sent to the runner? 
> > > > > > Bigger picture, what I'm thinking about is writing a UI that's > designed to view and monitor Beam pipelines via the portability > abstraction, rather than using the (rather clunky) UIs that come with > runners like Flink and Dataflow. My thinking is that using beam's > abstractions would future proof the UI by allowing it to work with any > portable runner. Right now it's just an idea, so I'd love to know what > others think of this. > > > > > > thanks! > > > -chad > > > >
Re: [ANNOUNCEMENT] Common Pipeline Patterns - new section in the documentation + contributions welcome
This is great. Thanks Pablo and all. I've seen several folk struggle with writing Avro to dynamic locations which I think might be a good addition. If you agree I'll offer a PR unless someone gets there first - I have an example here: https://github.com/gbif/pipelines/blob/master/pipelines/export-gbif-hbase/src/main/java/org/gbif/pipelines/hbase/beam/ExportHBase.java#L81 On Fri, Jun 7, 2019 at 10:52 PM Pablo Estrada wrote: > Hello everyone, > A group of community members has been working on gathering and providing > common pipeline patterns for pipelines in Beam. These are examples on how > to perform certain operations, and useful ways of using Beam in your > pipelines. Some of them relate to processing of files, use of side inputs, > state/timers, etc. Check them out[1]. > > These initial patterns have been chosen based on evidence gathered from > StackOverflow, and from talking to users of Beam. > > It would be great if this section could grow, and be useful to many Beam > users. For that reason, we invite anyone to share patterns, and pipeline > examples that they have used in the past. If you are interested in > contributing, please submit a pull request, or get in touch with Cyrus > Maden, Reza Rokni, Melissa Pashniak or myself. > > Thanks! > Best > -P. > > [1] https://beam.apache.org/documentation/patterns/overview/ >
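For anyone searching the archives, the dynamic-Avro pattern mentioned above looks roughly like the following. This is a hedged sketch, not code from the linked GBIF example; the field name, naming scheme, and variables in scope are illustrative assumptions:

```java
// Sketch (illustrative names): route each GenericRecord to a destination
// derived from one of its fields, writing one set of Avro files per destination.
records  // a PCollection<GenericRecord>, with an Avro Schema `schema` in scope
    .apply("Write Avro to dynamic locations",
        FileIO.<String, GenericRecord>writeDynamic()
            .by(r -> String.valueOf(r.get("datasetKey")))  // assumed field
            .withDestinationCoder(StringUtf8Coder.of())
            .via(AvroIO.sink(schema))
            .to(options.getOutputDirectory())
            .withNaming(key -> FileIO.Write.defaultNaming(key + "/export", ".avro")));
```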
Re: PubSubIO watermark not advancing for low volumes
Thanks! I made a jira https://issues.apache.org/jira/browse/BEAM-7322 And dumped my sample code here: https://github.com/tims/beam/tree/master/pubsub-watermark *From: *Alexey Romanenko *Date: *Wed, May 15, 2019 at 12:18 AM *To: * Not sure that this can be very helpful but I recall a similar issue with > KinesisIO [1] [2] and it was a bug in MovingFunction which was fixed. > > [1] https://issues.apache.org/jira/browse/BEAM-5063 > [2] https://github.com/apache/beam/pull/6178 > > On 13 May 2019, at 20:52, Kenneth Knowles wrote: > > You should definitely not feel foolish. That was a great report. I expect > many users face the same situation. If they are lurking on this list, then > you will have helped them already. > > Reza - I expect you should weigh in on the Jira, too, since the "one > message test" use case seems like it wouldn't work at all with those > MovingFunction params. But I may not understand all the subtleties of the > connector. > > Kenn > > *From: *Tim Sell > *Date: *Mon, May 13, 2019 at 8:06 AM > *To: * > > Thanks for the feedback, I did some more investigating after you said a 1 >> second frequency should be enough to sample on... And it is; I feel foolish. >> I think I just wasn't waiting long enough as it takes minutes to close >> the windows. We waited much longer when we were just sending messages manually and >> never had a window close. >> >> I'm generating some stats of lag times to window closing for different >> frequencies, with code so people can reproduce it, then I'll add this to a >> jira ticket. >> >> *From: *Kenneth Knowles >> *Date: *Mon, May 13, 2019 at 10:48 AM >> *To: * , dev >> >> Nice analysis & details! >>> >>> Thanks to your info, I think it is the configuration of MovingFunction >>> [1] that is the likely culprit, but I don't totally understand why. 
It is >>> configured like so: >>> >>> - store 60 seconds of data >>> - update data every 5 seconds >>> - require at least 10 messages to be 'significant' >>> - require messages from at least 2 distinct 5 second update periods to >>> 'significant' >>> >>> I would expect a rate of 1 message per second to satisfy this. I may >>> have read something wrong. >>> >>> Have you filed an issue in Jira [2]? >>> >>> Kenn >>> >>> [1] >>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508 >>> [2] https://issues.apache.org/jira/projects/BEAM/issues >>> >>> *From: *Tim Sell >>> *Date: *Fri, May 10, 2019 at 4:09 AM >>> *To: * >>> >>> Hello, >>>> >>>> I have identified an issue where the watermark does not advance when >>>> using the beam PubSubIO when volumes are very low. >>>> >>>> The behaviour is easily replicated if you apply a fixed window >>>> triggering after the watermark passes the end of the window. >>>> >>>> pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription)) >>>> .apply(ParDo.of(new ParseScoreEventFn())) >>>> >>>> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60))) >>>> .triggering(AfterWatermark.pastEndOfWindow()) >>>> .withAllowedLateness(Duration.standardSeconds(60)) >>>> .discardingFiredPanes()) >>>> .apply(MapElements.into(kvs(strings(), integers())) >>>> .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), >>>> scoreEvent.getScore( >>>> .apply(Count.perKey()) >>>> .apply(ParDo.of(Log.of("counted per key"))); >>>> >>>> With this triggering, using both the flink local runner the direct >>>> runner, *no panes will ever be emitted* if the volume of messages in >>>> pubsub is very low. eg 1 per second. >>>> >>>> If I change the triggering to have early firings I get exactly the >>>> emitted panes that you would expect. 
>>>> >>>> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60))) >>>> .triggering(AfterWatermark.pastEndOfWindow() >>>> .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() >>>> .alignedTo(Duration.standardSeconds(60 >>>> .withAllowedLateness(Duration.standardSeconds(60)) >>>> .discardingFiredPanes()) >>>> >>>> I can use any var
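Kenn's four bullet points about the MovingFunction configuration can be made concrete with a toy model. The class below is a simplification for intuition only, not Beam's actual MovingFunction: with 5-second update buckets, a watermark sample only counts as "significant" once at least 10 messages have arrived across at least 2 distinct buckets, which is why a single test message (or a short manual burst) never advances the watermark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the "significance" rule described above (NOT Beam's
// MovingFunction): >= 10 messages spanning >= 2 distinct 5-second buckets.
public class SignificanceCheck {
    static final long BUCKET_MILLIS = 5_000L;
    static final int MIN_MESSAGES = 10;
    static final int MIN_BUCKETS = 2;

    public static boolean isSignificant(List<Long> timestampsMillis) {
        Set<Long> buckets = new HashSet<>();
        for (long ts : timestampsMillis) buckets.add(ts / BUCKET_MILLIS);
        return timestampsMillis.size() >= MIN_MESSAGES && buckets.size() >= MIN_BUCKETS;
    }

    public static void main(String[] args) {
        // 1 message/second for 12 seconds: 12 messages over 3 buckets
        // -> significant, so a steady 1/s rate "should" satisfy the config.
        List<Long> steady = new ArrayList<>();
        for (long t = 0; t < 12_000; t += 1_000) steady.add(t);
        System.out.println(isSignificant(steady)); // true

        // The "one message test" case: a single message is never significant.
        System.out.println(isSignificant(Arrays.asList(7_000L))); // false
    }
}
```

Under this model a lone message, or any burst inside one 5-second bucket, keeps the sample insignificant no matter how long you wait, which matches the observed stuck watermark at very low volumes.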
Re: ElasticsearchIO Write Batching Problems
Great. Thanks for sharing Evan. Tim > On 7 Dec 2018, at 20:06, Evan Galpin wrote: > > I've actually found that this was just a matter of pipeline processing speed. > I removed many layers of transforms such that entities flowed through the > pipeline faster, and saw the batch sizes increase. I think I may make a > separate pipeline to take full advantage of batch indexing. > > Thanks! > >> On 2018/12/07 14:36:44, Evan Galpin wrote: >> I forgot to reiterate that the PCollection on which EsIO operates is of type >> String, where each element is a valid JSON document serialized _without_ >> pretty printing (i.e. without line breaks). If the PCollection should be of >> a different type, please let me know. From the EsIO source code, I believe >> it is correct to have a PCollection of String >> >>> On 2018/12/07 14:33:17, Evan Galpin wrote: >>> Thanks for confirming that this is unexpected behaviour Tim; certainly the >>> EsIO code looks to handle bundling. For the record, I've also confirmed via >>> debugger that `flushBatch()` is not being triggered by large document size. >>> >>> I'm sourcing records from Google's BigQuery. I have 2 queries which each >>> create a PCollection. I use a JacksonFactory to convert BigQuery results to >>> a valid JSON string (confirmed valid via debugger + linter). I have a few >>> Transforms to group the records from the 2 queries together, and then >>> convert again to JSON string via Jackson. I do know that the system creates >>> valid requests to the bulk API, it's just that it's only 1 document per >>> request. >>> >>> Thanks for starting the process with this. If there are other specific >>> details that I can provide to be helpful, please let me know. 
Here are the >>> versions of modules I'm using now: >>> >>> Beam SDK (beam-sdks-java-core): 2.8.0 >>> EsIO (beam-sdks-java-io-elasticsearch): 2.8.0 >>> BigQuery IO (beam-sdks-java-io-google-cloud-platform): 2.8.0 >>> DirectRunner (beam-runners-direct-java): 2.8.0 >>> DataflowRunner (beam-runners-google-cloud-dataflow-java): 2.8.0 >>> >>> >>>> On 2018/12/07 06:36:58, Tim wrote: >>>> Hi Evan >>>> >>>> That is definitely not the expected behaviour and I believe is covered in >>>> tests which use DirectRunner. Are you able to share your pipeline code, or >>>> describe how you source your records please? It could be that something >>>> else is causing EsIO to see bundles sized at only one record. >>>> >>>> I’ll verify ES IO behaviour when I get to a computer too. >>>> >>>> Tim (on phone) >>>> >>>>> On 6 Dec 2018, at 22:00, e...@calabs.ca wrote: >>>>> >>>>> Hi all, >>>>> >>>>> I’m having a bit of trouble with ElasticsearchIO Write transform. I’m >>>>> able to successfully index documents into my elasticsearch cluster, but >>>>> batching does not seem to work. There ends up being a 1:1 ratio between >>>>> HTTP requests sent to `/my-index/_doc/_bulk` and the number of documents >>>>> in my PCollection to which I apply the ElasticsearchIO PTransform. I’ve >>>>> noticed this specifically under the DirectRunner by utilizing a debugger. >>>>> >>>>> Am I missing something? Is this possibly a difference between execution >>>>> environments (Ex. DirectRunner Vs. DataflowRunner)? How can I make sure >>>>> my program is taking advantage of batching/bulk indexing? >>>>> >>>>> Thanks, >>>>> Evan >>>> >>> >>
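Evan's finding, that faster element flow produced bigger batches, is consistent with a writer that flushes both when its batch is full and when the bundle ends. The toy model below illustrates that flush-per-bundle behaviour (it is NOT the real ElasticsearchIO code, just the pattern): if the runner hands the DoFn one-element bundles, every bulk request carries exactly one document.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not ElasticsearchIO): documents are buffered, and a bulk
// request is issued when the batch is full OR when the bundle finishes.
public class BulkWriter {
    final int maxBatchSize;
    final List<String> buffer = new ArrayList<>();
    final List<List<String>> bulkRequests = new ArrayList<>(); // one entry per HTTP bulk call

    public BulkWriter(int maxBatchSize) { this.maxBatchSize = maxBatchSize; }

    public void processElement(String doc) {
        buffer.add(doc);
        if (buffer.size() >= maxBatchSize) flush();
    }

    // Flushing at end-of-bundle means tiny bundles -> tiny bulk requests.
    public void finishBundle() { if (!buffer.isEmpty()) flush(); }

    private void flush() {
        bulkRequests.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        // One 6-element bundle, batch size 4 -> two requests (4 docs + 2 docs).
        BulkWriter big = new BulkWriter(4);
        for (int i = 0; i < 6; i++) big.processElement("doc" + i);
        big.finishBundle();
        System.out.println(big.bulkRequests.size()); // 2

        // Six 1-element bundles -> six single-document requests (the 1:1 ratio seen).
        BulkWriter small = new BulkWriter(4);
        for (int i = 0; i < 6; i++) { small.processElement("doc" + i); small.finishBundle(); }
        System.out.println(small.bulkRequests.size()); // 6
    }
}
```

Bundle sizing is runner-dependent, which is why the DirectRunner (small bundles) and Dataflow (larger bundles) can show very different batching for the same pipeline.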
Re: Moving to spark 2.4
To clarify Ismaël's comment: the Cloudera repo indicates Cloudera 6.1 will have spark 2.4 but CDH is currently still on 6.0. https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/spark/spark-core_2.11/2.4.0-cdh6.1.0/ With the HWX / Cloudera merger the release cycle is not announced but 6.1 will likely not be there until early 2019. Also note that many folk will still be on CDH 5 as CDH 6 is relatively new. On Fri, Dec 7, 2018 at 5:06 PM Ismaël Mejía wrote: > It seems that Cloudera has it now. Not sure if it's worth waiting for the > Hortonworks one; maybe worth waiting for EMR. > > https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/spark/spark-core_2.11/ > > A pro argument for the move to Spark 2.4.0 is future-oriented (for the > non-Hadoop friends), because the support for kubernetes has improved a lot > in this release. > > On Fri, Dec 7, 2018 at 4:56 PM David Morávek > wrote: > > > > +1 for waiting for HDP and CDH adoption > > > > Sent from my iPhone > > > > On 7 Dec 2018, at 16:38, Alexey Romanenko > wrote: > > > > I agree with Ismael and I’d wait until the new Spark version is > supported by major BigData distributors. > > > > On 7 Dec 2018, at 14:57, Vishwas Bm wrote: > > > > Hi Ismael, > > > > We have upgraded Spark to 2.4. > > In our setup we ran a few basic tests and found it to be pretty stable. > > > > > > Thanks & Regards, > > Vishwas > > > > > > On Fri, Dec 7, 2018 at 2:53 PM Ismaël Mejía wrote: > >> > >> Hello Vishwas, > >> > >> The spark dependency in the spark runner is provided so you can > >> already pass the dependencies of spark 2.4 and it should work out of > >> the box. > >> > >> JB did a PR to upgrade the version of Spark in the runner, but maybe > >> it is worth waiting a bit before merging it, at least until some of > >> the Big Data distributions have spark 2.4.x support available; so far > >> nobody has upgraded it (well, apart from Databricks). 
> >> > >> What do others think, should we move ahead or are you aware of any > >> issue introduced by version 2.4.0? (Notice that the PR just updates > >> the version so code compatibility should be ok). > >> > >> Ismaël > >> > >> On Thu, Dec 6, 2018 at 12:14 PM Jean-Baptiste Onofré > wrote: > >> > > >> > Hi Vishwas > >> > > >> > Yes, I already started the update. > >> > > >> > Regards > >> > JB > >> > > >> > On 06/12/2018 07:39, Vishwas Bm wrote: > >> > > Hi, > >> > > > >> > > Currently I see that the spark version dependency used in Beam is > >> > > //"2.3.2". > >> > > As spark 2.4 is released now, is there a plan to upgrade Beam spark > >> > > dependency ? > >> > > > >> > > > >> > > *Thanks & Regards,* > >> > > *Vishwas > >> > > * > >> > > *Mob : 9164886653* > >> > > >> > -- > >> > Jean-Baptiste Onofré > >> > jbono...@apache.org > >> > http://blog.nanthrax.net > >> > Talend - http://www.talend.com > > > > >
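Since the runner leaves Spark as a provided dependency (as Ismaël notes above), an application can pin Spark 2.4 itself without waiting for a Beam release. A hedged sketch of what that might look like in a Gradle build — the coordinates and versions below are illustrative, so match them to your own Beam release and cluster distribution:

```groovy
// Illustrative only: the Beam Spark runner treats Spark as "provided",
// so the application supplies its own Spark 2.4.x artifacts.
dependencies {
    compile "org.apache.beam:beam-runners-spark:2.9.0"
    compile "org.apache.spark:spark-core_2.11:2.4.0"
    compile "org.apache.spark:spark-streaming_2.11:2.4.0"
}
```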
Re: ElasticsearchIO Write Batching Problems
Hi Evan That is definitely not the expected behaviour and I believe is covered in tests which use DirectRunner. Are you able to share your pipeline code, or describe how you source your records please? It could be that something else is causing EsIO to see bundles sized at only one record. I’ll verify ES IO behaviour when I get to a computer too. Tim (on phone) > On 6 Dec 2018, at 22:00, e...@calabs.ca wrote: > > Hi all, > > I’m having a bit of trouble with ElasticsearchIO Write transform. I’m able to > successfully index documents into my elasticsearch cluster, but batching does > not seem to work. There ends up being a 1:1 ratio between HTTP requests sent > to `/my-index/_doc/_bulk` and the number of documents in my PCollection to > which I apply the ElasticsearchIO PTransform. I’ve noticed this specifically > under the DirectRunner by utilizing a debugger. > > Am I missing something? Is this possibly a difference between execution > environments (Ex. DirectRunner Vs. DataflowRunner)? How can I make sure my > program is taking advantage of batching/bulk indexing? > > Thanks, > Evan
Re: beam elasticsearch connector for dataflow
Beam 2.8.0 brought in support for ES 6.3.x. I’m not sure if that works against a 6.5.x server but I could imagine it does. Tim, Sent from my iPhone > On 4 Dec 2018, at 18:28, Adeel Ahmad wrote: > > Hello, > > I am trying to use gcp dataflow for indexing data from pubsub into > elasticsearch. > Does dataflow (which uses beam) now support elasticsearch 6.5.x or does it > still only support 5.6.x? > > -- > Thanks, > > Adeel > > >
Re: No filesystem found for scheme hdfs - with the FlinkRunner
Thanks for sharing that Tim, Sent from my iPhone > On 26 Oct 2018, at 17:50, Juan Carlos Garcia wrote: > > Just for everyone to know, we figured it out, it was an environment problem. > > In our case we have our cluster in a network that is not accessible directly, > so to deploy we need to use Jenkins with some slaves that have access to > that network. > > During deployment, in the main method of the class, we execute > FileSystems.setDefaultPipelineOptions(_options); which triggers the > HadoopFileSystemOptionsRegistrar via the ServiceLoader mechanism, and this > accesses the environment variable HADOOP_CONF_DIR in order to correctly > register the Filesystem. > > So, it's very important that the machine you are using for deployment has > that environment variable set as well (not only the worker where the pipeline > will run). > > In our case the variable was set in the .bashrc of the user used for > deployment, but here is the catch. > > We were using "sudo -u DEPLOYMENT_USER -s /var/lib/flink/bin/flink run -d > .", but the -s flag does not source the user's .bashrc (.bash_profile), > hence we had failures at runtime. The fix was just replacing the -s flag with -i > to make sure the environment variable is present when the command runs. > > Thanks > > >> On Fri, Oct 26, 2018 at 1:52 PM Juan Carlos Garcia >> wrote: >> Hi Tim, >> >> I am using FileIO directly with the AvroIO.sink(...), however having >> experienced BEAM-2277 with the SparkRunner a few months ago, i got the feeling >> this is something different (maybe some dependency mismatch/missing). >> >> Thanks >> >>> On Fri, Oct 26, 2018 at 1:33 PM Tim Robertson >>> wrote: >>> Hi Juan >>> >>> This sounds reminiscent of https://issues.apache.org/jira/browse/BEAM-2277 >>> which we believed was fixed in 2.7.0. >>> What IO are you using to write your files and can you paste a snippet of >>> your code please? 
>>> >>> On BEAM-2277 I posted a workaround for AvroIO (it might help you find a >>> workaround too): >>> >>> transform.apply("Write", >>> AvroIO.writeGenericRecords(schema) >>> .to(FileSystems.matchNewResource(options.getTarget(),true)) >>> // BEAM-2277 workaround >>> >>> .withTempDirectory(FileSystems.matchNewResource("hdfs://ha-nn/tmp/beam-avro", >>> true))); >>> >>> Thanks >>> Tim >>> >>> >>>> On Fri, Oct 26, 2018 at 11:47 AM Juan Carlos Garcia >>>> wrote: >>>> Hi Folks, >>>> >>>> I have a strange situation while running beam 2.7.0 with the FlinkRunner, >>>> my setup consist of a HA Flink cluster (1.5.4) writing to HDFS its >>>> checkpoint. Flink is able to correctly writes its checkpoint / savepoint >>>> to HDFS without any problems. >>>> >>>> However, my pipeline has to write to HDFS as well, but fails with "Caused >>>> by: java.lang.IllegalArgumentException: No filesystem found for scheme >>>> hdfs" >>>> (stacktrace at the bottom) >>>> >>>> In the host where the pipeline is running: >>>> 1. The environment variable HADOOP_CONF_DIR is set. >>>> 2. During my pipeline construction i am explicitly calling >>>> FileSystems.setDefaultPipelineOptions(_options); to trigger the >>>> ServiceLoader to find all options registrar from the classpath >>>> 3. If i explore the property SCHEME_TO_FILESYSTEM of the class FileSystems >>>> in my main method using reflection i am able to see that at launch time it >>>> contains: >>>>{file=org.apache.beam.sdk.io.LocalFileSystem@1941a8ff, >>>> hdfs=org.apache.beam.sdk.io.hdfs.HadoopFileSystem@22d7b4f8} >>>> >>>> Any idea what i am doing wrong with the HDFS integration? >>>> >>>> {snippet} >>>> >>>> FileSystems.setDefaultPipelineOptions(_context.getPipelineOptions()); >>>> Field f = >>>> FileSystems.class.getDeclaredField("SCHEME_TO_FILESYSTEM"); >>>> f.setAccessible(true); >>>> AtomicReference> value >>>> = (AtomicReference>) f.get(null); >>>> >>>> System.out.println("
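Juan's point about `sudo -s` vs `sudo -i` can be demonstrated without sudo or Hadoop at all: only a shell that sources the profile sees the variable. In the simulation below, `/tmp/fake_profile` is a stand-in for the deployment user's .bashrc, and plain `bash -c` stands in for the shell that sudo spawns.

```shell
# Stand-in for the deployment user's ~/.bashrc
echo 'export HADOOP_CONF_DIR=/etc/hadoop/conf' > /tmp/fake_profile

# Like `sudo -u user -s cmd`: no profile is sourced -> variable unset
env -u HADOOP_CONF_DIR bash -c 'echo "${HADOOP_CONF_DIR:-unset}"'
# prints: unset

# Like `sudo -u user -i cmd`: the profile is sourced first -> variable visible
env -u HADOOP_CONF_DIR bash -c '. /tmp/fake_profile; echo "$HADOOP_CONF_DIR"'
# prints: /etc/hadoop/conf
```

The same reasoning explains why Flink itself worked (the cluster nodes had the variable) while the submission from Jenkins failed (the submitting shell did not).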
Re: No filesystem found for scheme hdfs - with the FlinkRunner
Hi Juan, This sounds reminiscent of https://issues.apache.org/jira/browse/BEAM-2277 which we believed was fixed in 2.7.0. What IO are you using to write your files and can you paste a snippet of your code please? On BEAM-2277 I posted a workaround for AvroIO (it might help you find a workaround too): transform.apply("Write", AvroIO.writeGenericRecords(schema) .to(FileSystems.matchNewResource(options.getTarget(),true)) // BEAM-2277 workaround .withTempDirectory(FileSystems.matchNewResource("hdfs://ha-nn/tmp/beam-avro", true))); Thanks Tim On Fri, Oct 26, 2018 at 11:47 AM Juan Carlos Garcia wrote: > Hi Folks, > > I have a strange situation while running beam 2.7.0 with the FlinkRunner, > my setup consists of a HA Flink cluster (1.5.4) writing to HDFS its > checkpoint. Flink is able to correctly write its checkpoint / savepoint to > HDFS without any problems. > > However, my pipeline has to write to HDFS as well, but fails with "Caused > by: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs" > (stacktrace at the bottom) > > In the host where the pipeline is running: > 1. The environment variable HADOOP_CONF_DIR is set. > 2. During my pipeline construction i am explicitly calling > FileSystems.setDefaultPipelineOptions(_options); to trigger the > ServiceLoader to find all options registrar from the classpath > 3. If i explore the property SCHEME_TO_FILESYSTEM of the class FileSystems > in my main method using reflection i am able to see that at launch time it > contains: >    {file=org.apache.beam.sdk.io.LocalFileSystem@1941a8ff, > hdfs=org.apache.beam.sdk.io.hdfs.HadoopFileSystem@22d7b4f8} > > Any idea what i am doing wrong with the HDFS integration? 
> > {snippet} > > FileSystems.setDefaultPipelineOptions(_context.getPipelineOptions()); > Field f = > FileSystems.class.getDeclaredField("SCHEME_TO_FILESYSTEM"); > f.setAccessible(true); > AtomicReference> value > = (AtomicReference>) f.get(null); > > System.out.println("==="); > System.out.println(value); > {snippet} > > {stacktrace} > Caused by: java.lang.IllegalArgumentException: No filesystem found for > scheme hdfs > at > org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456) > at > org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526) > at > org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink.lambda$new$22b9c623$1(FileIO.java:1293) > at > org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131) > at > org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131) > at > org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131) > at > org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:920) > at > org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:715) > > {stacktrace} > > -- > > JC > > > > -- > > JC > >
Re: ElasticIO retry configuration exception
Great! Thank you. Feel free to add me as reviewer if you open a PR. Tim > On 12 Oct 2018, at 08:28, Wout Scheepers > wrote: > > Hey Tim, Romain, > > I created the ticket (BEAM-5725). I’ll try to fix it, as it’s time I made my > first PR. > First I will focus on getting a reproducible case in a unit test. > > Thanks! > Wout > > > > From: Tim Robertson > Reply-To: "user@beam.apache.org" > Date: Thursday, 11 October 2018 at 20:25 > To: "user@beam.apache.org" > Subject: Re: ElasticIO retry configuration exception > > I took a super quick look at the code and I think Romain is correct. > > 1. On a retry scenario it calls handleRetry() > 2. Within handleRetry() it gets the DefaultRetryPredicate and calls > test(response) - this reads the response stream to JSON > 3. When the retry is successful (no 429 code) the response is returned > 4. The response is then passed in to checkForErrors(...) > 5. This then tries to parse the response by reading the response stream. It > was already read in step 2 > > Can you please open a Jira for this Wout? > https://issues.apache.org/jira/projects/BEAM/issues > If you don't have an account I'll create it. > > This will not make 2.8.0 (just missed) so it will likely be 7 weeks or so > before it is released in 2.9.0. > However as soon as it is fixed it is fairly easy to bring into your own > project, by copying in the single ElasticsearchIO.java declared in the same > package. > > Thank you for reporting the issue, > Tim > > > > > On Thu, Oct 11, 2018 at 4:19 PM Romain Manni-Bucau > wrote: > It looks more like a client issue where the stream is already read; maybe > try to reproduce it in a unit test in the beam ES module? This will enable > us to help you more accurately. > > Romain Manni-Bucau > @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book > > > On Thu, Oct 11, 2018 at 4:18 PM, Wout Scheepers > wrote: > Hey Romain, > > I’ve checked and am using the same http client as beam 2.7.0. 
> Just to be sure, I’ve created a minimal reproducible example with a fresh project > with only the following dependencies in my build.gradle: > dependencies { > compile ('org.apache.beam:beam-sdks-java-io-elasticsearch:2.7.0') > compile ('org.apache.beam:beam-runners-direct-java:2.7.0') > compile ('org.apache.beam:beam-runners-google-cloud-dataflow-java:2.7.0') > compile ('org.apache.beam:beam-sdks-java-extensions-protobuf:2.7.0') > compile > ('org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:2.7.0') > compile ('org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.7.0') > compile ('org.apache.beam:beam-sdks-java-io-common:2.7.0') > compile ('org.apache.beam:beam-sdks-java-extensions-json-jackson:2.7.0') > compile ('org.apache.beam:beam-sdks-java-io-jdbc:2.7.0') > > > testCompile 'org.hamcrest:hamcrest-all:1.3' > testCompile 'org.assertj:assertj-core:3.4.1' > testCompile 'junit:junit:4.12' > } > > However, the problem still persists when writing a document to elastic with > the retryConfiguration set. > I guess the problem lies with my elastic version, as JB implies? > > Anyway, thanks for the suggestion. > > Wout > > From: Romain Manni-Bucau > Reply-To: "user@beam.apache.org" > Date: Wednesday, 10 October 2018 at 16:53 > To: "user@beam.apache.org" > Subject: Re: ElasticIO retry configuration exception > > Hi Wout, > > Maybe check your classpath http client versions (against > https://github.com/apache/beam/blob/v2.7.0/sdks/java/io/elasticsearch/build.gradle > for instance). > > Romain Manni-Bucau > @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book > > > On Wed, Oct 10, 2018 at 3:37 PM, Wout Scheepers > wrote: > Hey JB, > > Thanks for your fast reply. > The elastic version we're using is 5.6.2. 
> > "version": { > "number": "5.6.2", > "build_hash": "57e20f3", > "build_date": "2017-09-23T13:16:45.703Z", > "build_snapshot": false, > "lucene_version": "6.6.1" > } > > > Wout > > > > On 10/10/2018, 15:34, "Jean-Baptiste Onofré" wrote: > > Hi Wout, > > what's the elasticsearch version ? (just to try to reproduce) > > Thanks, > Regards > JB > > On 10/10/2018 15:31, Wout Scheepers wrote: > > Hey all, > > > > > > > > When using .withRetryConfi
Re: ElasticIO retry configuration exception
I took a super quick look at the code and I think Romain is correct. 1. On a retry scenario it calls handleRetry() 2. Within handleRetry() it gets the DefaultRetryPredicate and calls test(response) - this reads the response stream to JSON 3. When the retry is successful (no 429 code) the response is returned 4. The response is then passed in to checkForErrors(...) 5. This then tries to parse the response by reading the response stream. It was already read in step 2. Can you please open a Jira for this, Wout? https://issues.apache.org/jira/projects/BEAM/issues If you don't have an account I'll create it. This will not make 2.8.0 (just missed) so it will likely be 7 weeks or so before it is released in 2.9.0. However as soon as it is fixed it is fairly easy to bring into your own project, by copying in the single ElasticsearchIO.java declared in the same package. Thank you for reporting the issue, Tim On Thu, Oct 11, 2018 at 4:19 PM Romain Manni-Bucau wrote: > It looks more like a client issue where the stream is already read, maybe > give a try to reproduce it in a unit test in beam ES module? This will > enable us to help you more accurately. > > Romain Manni-Bucau > @rmannibucau <https://twitter.com/rmannibucau> | Blog > <https://rmannibucau.metawerx.net/> | Old Blog > <http://rmannibucau.wordpress.com> | Github > <https://github.com/rmannibucau> | LinkedIn > <https://www.linkedin.com/in/rmannibucau> | Book > <https://www.packtpub.com/application-development/java-ee-8-high-performance> > > > Le jeu. 11 oct. 2018 à 16:18, Wout Scheepers < > wout.scheep...@vente-exclusive.com> a écrit : > >> Hey Romain, >> >> >> >> I’ve checked and am using the same http client as Beam 2.7.0. 
>> >> Just to be sure, I’ve created a minimal reproducible with a fresh project >> with only the following dependencies in my build.gradle: >> dependencies { >> compile ('org.apache.beam:beam-sdks-java-io-elasticsearch:2.7.0') >> compile ('org.apache.beam:beam-runners-direct-java:2.7.0') >> compile >> ('org.apache.beam:beam-runners-google-cloud-dataflow-java:2.7.0') >> compile ('org.apache.beam:beam-sdks-java-extensions-protobuf:2.7.0') >> compile >> ('org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:2.7.0') >> compile >> ('org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.7.0') >> compile ('org.apache.beam:beam-sdks-java-io-common:2.7.0') >> compile >> ('org.apache.beam:beam-sdks-java-extensions-json-jackson:2.7.0') >> compile ('org.apache.beam:beam-sdks-java-io-jdbc:2.7.0') >> >> >> testCompile 'org.hamcrest:hamcrest-all:1.3' >> testCompile 'org.assertj:assertj-core:3.4.1' >> testCompile 'junit:junit:4.12' >> } >> >> >> >> However, the problem still persists when writing a document to elastic >> with the retryConfiguration set. >> >> I guess the problem lies with my elastic version, as JB implies? >> >> >> >> Anyway, thanks for the suggestion. >> >> >> >> Wout >> >> >> >> From: Romain Manni-Bucau >> Reply-To: "user@beam.apache.org" >> Date: Wednesday, 10 October 2018 at 16:53 >> To: "user@beam.apache.org" >> Subject: Re: ElasticIO retry configuration exception >> >> >> >> Hi Wout, >> >> >> >> Maybe check your classpath http client versions (against >> https://github.com/apache/beam/blob/v2.7.0/sdks/java/io/elasticsearch/build.gradle >> for instance). 
>> >> >> Romain Manni-Bucau >> @rmannibucau <https://twitter.com/rmannibucau> | Blog >> <https://rmannibucau.metawerx.net/> | Old Blog >> <http://rmannibucau.wordpress.com> | Github >> <https://github.com/rmannibucau> | LinkedIn >> <https://www.linkedin.com/in/rmannibucau> | Book >> <https://www.packtpub.com/application-development/java-ee-8-high-performance> >> >> >> >> >> >> Le mer. 10 oct. 2018 à 15:37, Wout Scheepers < >> wout.scheep...@vente-exclusive.com> a écrit : >> >> Hey JB, >> >> Thanks for your fast reply. >> The elastic version we're using is 5.6.2. >> >> "version": { >> "number": "5.6.2", >> "build_hash": "57e20f3", >> "build_date": "2017-09-23T13:16:45.703Z", >> "build_snapshot": false, >> "lucene_version": "6.6.1" >> }
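The five steps Tim lists boil down to a classic double-read of an HTTP response body: the retry predicate drains the stream, and checkForErrors(...) then finds nothing left. The failure mode can be shown outside Beam with a few lines of plain Java (a sketch only — the real code reads the Elasticsearch REST client's response entity, not a ByteArrayInputStream):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class StreamReadTwice {

    // Drain a stream to a String, which is effectively what a JSON parse does.
    static String drain(InputStream in) throws IOException {
        return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        InputStream response =
            new ByteArrayInputStream("{\"errors\":false}".getBytes(StandardCharsets.UTF_8));

        String first = drain(response);   // step 2: the retry predicate reads the body
        String second = drain(response);  // step 5: checkForErrors() finds nothing left

        System.out.println("first read:  " + first);   // the full JSON body
        System.out.println("second read: " + second);  // an empty string
    }
}
```

The fix in the IO is to buffer the entity (or re-create the stream) so both consumers see the full body.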
Re: AvroIO - failure using direct runner with java.nio.file.FileAlreadyExistsException when moving from temp to destination
Hi Juan, Well done for diagnosing your issue and thank you for taking the time to report it here. I'm not the author of this section but I've taken a quick look at the code and inline comments and have some observations which I think might help explain it. I notice it writes into temporary files and uses a HashMap for maintaining a pool of writers for each destination. I presume that you are receiving a new instance of the DestinationT object on each call and therefore the HashMap will be treating these as separate entries - a new writer is created for each entry in the HashMap. The method responsible for providing the DestinationT is the following from the FileBasedSink, which does document the expectation: /** * Returns an object that represents at a high level the destination being written to. May not * return null. A destination must have deterministic hash and equality methods defined. */ public abstract DestinationT getDestination(UserT element); Beyond that I notice that it also relies on using a hashCode from the serialised object (i.e. after running through the coder), which you note too. The inline doc explains the reasoning: hashCode is not guaranteed to be stable across machines. When elements are processed on different machines we need deterministic behaviour to direct them to the correct target shard. To do that the code opts to use a murmur3_32 algorithm, which is safe across machines (today I learnt!), and it operates on the encoded bytes of the object, which are required to be deterministic. I agree that we should improve the documentation and state that hashCode and equals need to be implemented when user-defined objects are used for the dynamic destination. Would you mind opening a Jira for that please? I hope this helps a little, and thanks again Tim On Wed, Sep 26, 2018 at 11:24 AM Juan Carlos Garcia wrote: > Hi Guys, after days of bumping my head against the monitor I found why it > was not working. 
> > One key element when using DynamicAvroDestinations that is not > described in the documentation is that, if you are using a regular POJO as > DestinationT like I am (and not Long/String/Integer as the example): > > {code} > DynamicAvroDestinations<GenericRecord, GenericRecordDynamicDestination, GenericRecord> > {code} > > It’s very important to pay attention to the equals / hashCode implementations, > which should be aligned with your sharding/grouping/partition structure. Not > doing so will give you the result I described earlier (1 file (or shard) > with 1 record only, or sometimes just an exception). > > While I still don't understand why it depends on equals / hashCode, as I > checked the class on: > org.apache.beam.sdk.io.WriteFiles.ApplyShardingKeyFn:688 > > The hashing depends on the Coder itself (method: int > hashDestination(DestinationT destination, Coder<DestinationT> > destinationCoder)). > > Maybe a core member could explain the reason for it, or it's an unexpected > behavior and there is a bug somewhere else. > > In my case below you can find my POJO Destination along with the > corresponding Coder implementation, which works correctly as long as the > equals / hashCode are implemented: > > {code} > static class GenericRecordDynamicDestination { > private String logicalType; > private final int year; > private final int month; > private final int day; > > public GenericRecordDynamicDestination(final String _logicalType, > final int _year, final int _month, final int _day) { > logicalType = _logicalType; > year = _year; > month = _month; > day = _day; > } > > public String getLogicalType() { > return logicalType; > } > > public void setLogicalType(final String _logicalType) { > logicalType = _logicalType; > } > > public int getYear() { > return year; > } > > public int getMonth() { > return month; > } > > public int getDay() { > return day; > } > > @Override > public boolean equals(final Object _o) { > if (this == _o) return true; > if (_o == null || getClass() != _o.getClass()) return false; > > final GenericRecordDynamicDestination that = > (GenericRecordDynamicDestination) _o; > > if (year != that.year) return false; > if (month != that.month) return false; > if (day != that.day) return false; > return logicalType.equals(that.logicalType); > } > > @Override > public int hashCode() { > int result = logicalType.hashCode(); > result = 31 * result + year; > result = 31 * result + month; > result = 31 * result + day; > return result; > }
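The "one record per file" symptom described in this thread is exactly what a HashMap-backed writer pool does when the destination key lacks value equality: every key instance opens its own writer. A JDK-only sketch, with hypothetical RawKey/GoodKey classes standing in for a destination POJO (not Beam code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class DestinationKeyDemo {

    // No equals/hashCode: two value-identical keys are "different" to a HashMap.
    static final class RawKey {
        final int year;
        RawKey(int year) { this.year = year; }
    }

    // Value-based equals/hashCode, as FileBasedSink's getDestination contract requires.
    static final class GoodKey {
        final int year;
        GoodKey(int year) { this.year = year; }
        @Override public boolean equals(Object o) {
            return o instanceof GoodKey && ((GoodKey) o).year == year;
        }
        @Override public int hashCode() { return Objects.hash(year); }
    }

    // Simulates the writer pool: one map entry per distinct destination key.
    static int writerCount(Object dest1, Object dest2) {
        Map<Object, String> writers = new HashMap<>();
        writers.put(dest1, "writer");
        writers.put(dest2, "writer");
        return writers.size();
    }

    public static void main(String[] args) {
        System.out.println(writerCount(new RawKey(2018), new RawKey(2018)));   // 2 writers
        System.out.println(writerCount(new GoodKey(2018), new GoodKey(2018))); // 1 writer
    }
}
```

The sharding-key hashing over encoded bytes that Juan found is a separate, second place where value-identical destinations must encode identically.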
Re: Regression of (BEAM-2277) - IllegalArgumentException when using Hadoop file system for WordCount example.
Hi Juan, You are correct that BEAM-2277 seems to be recurring. I have today stumbled upon that myself in my own pipeline (not word count). I have just posted a workaround at the bottom of the issue, and will reopen the issue. Thank you for reminding us about this, Tim On Mon, Aug 20, 2018 at 4:44 PM Juan Carlos Garcia wrote: > BUMP!!! > > There are people reporting the problem for BEAM 2.6.0 > <https://issues.apache.org/jira/browse/BEAM-2277?focusedCommentId=16585979=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16585979>, > any CORE dev out there? > > :) > > > > > > On Mon, Jul 30, 2018 at 3:25 PM Juan Carlos Garcia > wrote: > >> Hi Folks, >> >> I experienced the issue described in (BEAM-2277 >> <https://issues.apache.org/jira/browse/BEAM-2277>), which shows it was >> fixed by v2.0.0 >> >> However using version 2.4.0 and 2.6.0 (another user reported it) shows >> the same error. >> >> So either it was not 100% fixed, or the bug appeared again. >> >> Thanks and Regards >> -- >> >> JC >> >> > > -- > > JC > >
Re: ElasticsearchIO bulk delete
> we decided to postpone the feature That makes sense. I believe the ES6 branch is in-part working (I've looked at the code but not used it) which you can see here [1] and the jira to watch or contribute is [2]. It would be a useful addition to test independently and report any observations or improvement requests on that jira. The offer to assist in your first PR remains open for the future - please don't hesitate to ask. Thanks, Tim [1] https://github.com/jsteggink/beam/tree/BEAM-3199/sdks/java/io/elasticsearch-6/src/main/java/org/apache/beam/sdk/io/elasticsearch [2] https://issues.apache.org/jira/browse/BEAM-3199 On Mon, Jul 30, 2018 at 10:55 AM, Wout Scheepers < wout.scheep...@vente-exclusive.com> wrote: > Hey Tim, > > > > Thanks for your proposal to mentor me through my first PR. > > As we’re definitely planning to upgrade to ES6 when Beam supports it, we > decided to postpone the feature (we have a fix that works for us, for now). > > When Beam supports ES6, I’ll be happy to make a contribution to get bulk > deletes working. > > > > For reference, I opened a ticket (https://issues.apache.org/ > jira/browse/BEAM-5042). > > > > Cheers, > > Wout > > > > > > *From: *Tim Robertson > *Reply-To: *"user@beam.apache.org" > *Date: *Friday, 27 July 2018 at 17:43 > *To: *"user@beam.apache.org" > *Subject: *Re: ElasticsearchIO bulk delete > > > > Hi Wout, > > > > This is great, thank you. I wrote the partial update support you reference > and I'll be happy to mentor you through your first PR - welcome aboard. Can > you please open a Jira to reference this work and we'll assign it to you? > > > > We discussed having the "_xxx" fields in the document and triggering > actions based on that in the partial update jira but opted to avoid > it. Based on that discussion the ActionFn would likely be the preferred > approach. Would that be possible? > > > > It will be important to provide unit and integration tests as well. 
> > > > Please be aware that there is a branch and work underway for ES6 already > which is rather different on the write() path so this may become redundant > rather quickly. > > > > Thanks, > > Tim > > > > @timrobertson100 on the Beam slack channel > > > > > > > > On Fri, Jul 27, 2018 at 2:53 PM, Wout Scheepers exclusive.com> wrote: > > Hey all, > > > > A while ago, I patched ElasticsearchIO to be able to do partial updates > and deletes. > > However, I did not consider my patch pull-request-worthy as the json > parsing was done inefficiently (parsed it twice per document). > > > > Since Beam 2.5.0, partial updates are supported, so the only thing I’m > missing is the ability to send bulk delete requests. > > We’re using entity updates for event sourcing in our data lake and need to > persist deleted entities in elastic. > > We’ve been using my patch in production for the last year, but I would > like to contribute to get the functionality we need into one of the next > releases. > > > > I’ve created a gist that works for me, but is still inefficient (parsing > twice: once to check the ‘_action’ field, once to get the metadata). > > Each document I want to delete needs an additional ‘_action’ field with > the value ‘delete’. It doesn’t matter that the document still contains the > redundant field, as the delete action only requires the metadata. > > I’ve added the method isDelete() and made some changes to the > processElement() method. > > https://gist.github.com/wscheep/26cca4bda0145ffd38faf7efaf2c21b9 > > > > I would like to make my solution more generic to fit into the current > ElasticsearchIO and create a proper pull request. > > As this would be my first pull request for beam, can anyone point me in > the right direction before I spend too much time creating something that > will be rejected? > > > > Some questions on the top of my mind are: > >- Is it a good idea to make the ‘action’ part for the bulk API >generic? >- Should it be even more generic? 
(e.g.: set an ‘ActionFn’ on the >ElasticsearchIO) >- If I want to avoid parsing twice, the parsing should be done outside >of the getDocumentMetaData() method. Would this be acceptable? >- Is it possible to avoid passing the action as a field in the >document? >- Is there another or better way to get the delete functionality in >general? > > > > All feedback is more than welcome. > > > Cheers, > Wout > > > > > > >
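A note on why the leftover ‘_action’ field is harmless: a delete action in the Elasticsearch bulk API is metadata-only, with no accompanying document source line (unlike index or update actions). A minimal sketch of building such a payload — the index/type/id values here are invented for illustration:

```java
public class BulkDeletePayload {

    // One bulk-API delete action line; a delete carries metadata only, no source.
    static String deleteAction(String index, String type, String id) {
        return String.format(
            "{\"delete\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\"}}\n",
            index, type, id);
    }

    public static void main(String[] args) {
        // Hypothetical index/type/id values.
        String bulk = deleteAction("datalake", "entity", "42")
                    + deleteAction("datalake", "entity", "43");
        System.out.print(bulk);
    }
}
```

This is also why an ActionFn that maps each element to its action, rather than a field buried in the document body, avoids the double parse discussed above.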
Re: ElasticsearchIO bulk delete
Hi Wout, This is great, thank you. I wrote the partial update support you reference and I'll be happy to mentor you through your first PR - welcome aboard. Can you please open a Jira to reference this work and we'll assign it to you? We discussed having the "_xxx" fields in the document and triggering actions based on that in the partial update jira but opted to avoid it. Based on that discussion the ActionFn would likely be the preferred approach. Would that be possible? It will be important to provide unit and integration tests as well. Please be aware that there is a branch and work underway for ES6 already which is rather different on the write() path so this may become redundant rather quickly. Thanks, Tim @timrobertson100 on the Beam slack channel On Fri, Jul 27, 2018 at 2:53 PM, Wout Scheepers < wout.scheep...@vente-exclusive.com> wrote: > Hey all, > > > > A while ago, I patched ElasticsearchIO to be able to do partial updates > and deletes. > > However, I did not consider my patch pull-request-worthy as the json > parsing was done inefficiently (parsed it twice per document). > > > > Since Beam 2.5.0, partial updates are supported, so the only thing I’m > missing is the ability to send bulk delete requests. > > We’re using entity updates for event sourcing in our data lake and need to > persist deleted entities in elastic. > > We’ve been using my patch in production for the last year, but I would > like to contribute to get the functionality we need into one of the next > releases. > > > > I’ve created a gist that works for me, but is still inefficient (parsing > twice: once to check the ‘_action’ field, once to get the metadata). > > Each document I want to delete needs an additional ‘_action’ field with > the value ‘delete’. It doesn’t matter that the document still contains the > redundant field, as the delete action only requires the metadata. > > I’ve added the method isDelete() and made some changes to the > processElement() method. 
> > https://gist.github.com/wscheep/26cca4bda0145ffd38faf7efaf2c21b9 > > > > I would like to make my solution more generic to fit into the current > ElasticsearchIO and create a proper pull request. > > As this would be my first pull request for beam, can anyone point me in > the right direction before I spend too much time creating something that > will be rejected? > > > > Some questions on the top of my mind are: > >- Is it a good idea to make the ‘action’ part for the bulk API >generic? >- Should it be even more generic? (e.g.: set an ‘ActionFn’ on the >ElasticsearchIO) >- If I want to avoid parsing twice, the parsing should be done outside >of the getDocumentMetaData() method. Would this be acceptable? >- Is it possible to avoid passing the action as a field in the >document? >- Is there another or better way to get the delete functionality in >general? > > > > All feedback is more than welcome. > > > Cheers, > Wout > > > > >
Re: Pipeline error handling
Hi Kelsey Does the example [1] in the docs demonstrate differing generic types when using withOutputTags()? Could something like the following work for you? final TupleTag<CSVRecord> type1Records = ... final TupleTag<CSVRecord> type2Records = ... final TupleTag<CSVInvalidLine> invalidRecords = ... // CSVInvalidLine holds e.g. an ID and a cause HTH, Tim [1] https://beam.apache.org/documentation/programming-guide/#additional-outputs On Thu, Jul 26, 2018 at 9:44 AM, Kelsey RIDER wrote: > I’m trying to figure out how to handle errors in my Pipeline. > > > > Right now, my main transform is a DoFn. I > have a few different TupleTag that I use depending on the data > contained in the records. > In the event there’s a problem with a line (due to one of several possible > causes), I created a TupleTag<CSVRecord> ERROR. However, just doing this > doesn’t carry with it any information about the error. > I would like for the ERROR tag to have a type other than CSVRecord, e.g. > some sort of ErrorInfo class containing the row number, filename, message > about what went wrong, etc… > > > > I can’t use multiple TupleTag types with ParDo, because the > withOutputTags() method forces them to all have the same generic parameter. > > > > I saw the example here: https://medium.com/@vallerylancey/error-handling- > elements-in-apache-beam-pipelines-fffdea91af2a > > But I don’t see how this can work, since they use multiple generic types > in withOutputTags(). (And is this good practice? Seems like they “cheat” by > not calling apply(), instead directly transforming the PCollection (and why > even bother extending DoFn in this case?).) > > > > Finally, if I write my own PTransform<PCollection<CSVRecord>, PCollectionTuple> class, and start manually creating PCollections and > whatnot…then this would effectively become a bottleneck where everything > has to be read at once, and there’s no longer any sequential handling of > the records as they’re read, right? 
> Following changes to working-time regulations, if you receive this e-mail > before 7:00 a.m., in the evening, during the weekend or during your > holidays, please do not handle it or reply to it immediately, except in > cases of exceptional urgency. >
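On the withOutputTags() question in this thread: only the main output tag constrains the DoFn's output type parameter; the additional tags keep their own element types, which is why tags of different types can coexist. That shape can be sketched with plain Java generics — the Tag/Outputs/ErrorInfo classes below are hypothetical stand-ins, not the Beam classes themselves:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaggedOutputDemo {

    // A tag that carries its element type, in the spirit of Beam's TupleTag<T>.
    static final class Tag<T> {}

    // Collects outputs per tag; tags with different type parameters share one map.
    static final class Outputs {
        private final Map<Tag<?>, List<Object>> byTag = new HashMap<>();
        <T> void output(Tag<T> tag, T value) {
            byTag.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
        }
        @SuppressWarnings("unchecked")
        <T> List<T> get(Tag<T> tag) {
            return (List<T>) byTag.getOrDefault(tag, new ArrayList<>());
        }
    }

    // Stand-in for the ErrorInfo class Kelsey describes (row number + cause).
    static final class ErrorInfo {
        final long row;
        final String message;
        ErrorInfo(long row, String message) { this.row = row; this.message = message; }
    }

    public static void main(String[] args) {
        Tag<String> validLines = new Tag<>();
        Tag<ErrorInfo> errors = new Tag<>();
        Outputs out = new Outputs();

        out.output(validLines, "a,b,c");
        out.output(errors, new ErrorInfo(7, "too few columns"));

        System.out.println(out.get(validLines).size());      // 1
        System.out.println(out.get(errors).get(0).message);  // too few columns
    }
}
```

In Beam itself the same idea appears as ParDo's TupleTagList, which is deliberately untyped so each additional tag can have its own element type.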
Re: Beam and Hive
I answered on SO, Kelsey. You should be able to add this I believe to explicitly declare the coder to use: p.getCoderRegistry() .registerCoderForClass(HCatRecord.class, WritableCoder.of(DefaultHCatRecord.class)); On Fri, Jul 20, 2018 at 5:05 PM, Kelsey RIDER wrote: > Hello, > > > > I wrote it all in this SO post: https://stackoverflow.com/ > questions/51443966/simple-hive-write-not-working > > > > Since then, I’ve started looking at writing my own Coder to handle the > serialization of HCatRecords. But shouldn’t this be included? >
withHotKeyFanout Question
My pipeline utilizes the Combine.perKey transform and I would like to add withHotKeyFanout to prevent the combine from being a bottleneck. To test I made sure that my mergeAccumulators function was correct and added withHotKeyFanout(2) to my Combine transform. When I launch the pipeline with flink(v1.3.2) the pipeline only lasts for a minute or so until I am greeted with this stacktrace: java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289) ...7 more Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36) at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177) at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141) at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:74) at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65) at org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:113) at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:758) at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275) at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:527) at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:496) at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286) ... 7 more Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530) ... 15 more Caused by: org.apache.avro.AvroRuntimeException: Array data must be a Collection or Array at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:70) at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68) at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143) at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114) at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175) at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104) at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66) at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143) at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58) at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308) at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143) at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73) at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36) at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:652) at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:599) at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93) at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77) at 
org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:156) at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.copy(CoderTypeSerializer.java:64) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:879) at
Re: Windowing question
Yes that is what the pipeline I came up with looks like. However the next step in the pipeline is new/expired logic. I have tried a variety of things but none have gotten me close to what I want. Hence my question to this mailing list. Thanks, Tim From: Kenneth Knowles <k...@google.com> Reply-To: "user@beam.apache.org" <user@beam.apache.org> Date: Wednesday, January 24, 2018 at 2:21 PM To: "user@beam.apache.org" <user@beam.apache.org> Subject: Re: Windowing question A first approach would be to just not translate any of the new/expired logic. Beam does have the concept of expiring a window, though it is true that only particular transformations actually drop expired data. Have you tried something along the lines of this? pipeline.begin() .apply(FooIO.read(...)) .apply(Window.into(SlidingWindows.of(Duration.standardMinutes(5)).every(Duration.standardSeconds(30)))) .apply(ParDo.of(new DoFn<>() { ... get into proper format ... })) ... the rest of the logic ... Just a very vague sketch of what, actually, most pipelines look like. Kenn On Wed, Jan 24, 2018 at 11:07 AM, Tim Ross <tim_r...@ultimatesoftware.com<mailto:tim_r...@ultimatesoftware.com>> wrote: I am trying to convert an existing Apache Storm Bolt into an Apache Beam pipeline. The storm bolt used sliding windows with a duration of 5 minutes and a period of 30 seconds. After doing some initial transforms to get data in the proper format it would process all elements which were new or expired. Since Beam doesn’t have the concept of new and expired data in a window I’m trying to figure out how one would accomplish this. 
Thanks, Tim From: Kenneth Knowles <k...@google.com<mailto:k...@google.com>> Reply-To: "user@beam.apache.org<mailto:user@beam.apache.org>" <user@beam.apache.org<mailto:user@beam.apache.org>> Date: Wednesday, January 24, 2018 at 1:45 PM To: "user@beam.apache.org<mailto:user@beam.apache.org>" <user@beam.apache.org<mailto:user@beam.apache.org>> Subject: Re: Windowing question Generally, Beam will discard expired data for you (including state). Can you describe more? What is your windowing strategy? What are the edge triggers? On Wed, Jan 24, 2018 at 10:37 AM, Tim Ross <tim_r...@ultimatesoftware.com<mailto:tim_r...@ultimatesoftware.com>> wrote: I am just trying to do certain processing on edge triggers, i.e. new or expired data, to reduce the overall processing of a very large stream. How would I go about doing that with state? As I understand it, state is tied to key and window. Thanks, Tim From: Kenneth Knowles <k...@google.com<mailto:k...@google.com>> Reply-To: "user@beam.apache.org<mailto:user@beam.apache.org>" <user@beam.apache.org<mailto:user@beam.apache.org>> Date: Wednesday, January 24, 2018 at 1:25 PM To: "user@beam.apache.org<mailto:user@beam.apache.org>" <user@beam.apache.org<mailto:user@beam.apache.org>> Subject: Re: Windowing question A little clarification: in Beam an element exists in a single window, mathematically speaking. So when you use SlidingWindows, for example, to assign multiple windows this "copies" the value for each window, and that is how you should think of it, from a calculation point of view. Under the hood, a compressed representation is often used, but not in all situations. Kenn On Wed, Jan 24, 2018 at 9:45 AM, Robert Bradshaw <rober...@google.com<mailto:rober...@google.com>> wrote: No, Apache Beam doesn't offer this explicitly. You could accomplish it using State, but perhaps if you clarified what you were trying to accomplish by using these mechanisms there'd be another way to do the same thing. 
On Wed, Jan 24, 2018 at 7:03 AM, Tim Ross <tim_r...@ultimatesoftware.com<mailto:tim_r...@ultimatesoftware.com>> wrote: > Is there anything comparable to Apache Storm’s Window.getNew and > Window.getExpired in Apache Beam? How would I determine if an element is > new or expired in consecutive windows? > > > > Thanks, > > Tim > > This e-mail message and any attachments to it are intended only for the > named recipients and may contain legally privileged and/or confidential > information. If you are not one of the intended recipients, do not duplicate > or forward this e-mail message. This e-mail message and any attachments to it are intended only for the named recipients and may contain legally privileged and/or confidential information. If you are not one of the intended recipients, do not duplicate or forward this e-mail message.
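A side note on the numbers in this thread: with a 5-minute duration and 30-second period, every element is copied into duration/period = 10 sliding windows, and the copy landing in the newest window is arguably the closest analogue of Storm's "new" data. A JDK-only sketch of that assignment (it mirrors, but does not use, Beam's SlidingWindows):

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssign {

    // Start timestamps (ms) of every sliding window containing `timestamp`,
    // mirroring SlidingWindows.of(size).every(period) assignment.
    static List<Long> windowStartsFor(long timestamp, long sizeMs, long periodMs) {
        List<Long> starts = new ArrayList<>();
        long newestStart = timestamp - Math.floorMod(timestamp, periodMs);
        for (long start = newestStart; start > timestamp - sizeMs; start -= periodMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 5 * 60 * 1000;  // 5-minute windows
        long period = 30 * 1000;    // sliding every 30 seconds
        List<Long> starts = windowStartsFor(600_000L, size, period);
        System.out.println(starts.size());   // 10 window copies per element
        System.out.println(starts.get(0));   // 600000 = start of the newest window
    }
}
```

Detecting "expired" elements would then correspond to reacting when an element's oldest window closes, which in Beam is where per-key state and timers come in.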
Re: Windowing question
I am trying to convert an existing Apache Storm Bolt into an Apache Beam pipeline. The storm bolt used sliding windows with a duration of 5 minutes and a period of 30 seconds. After doing some initial transforms to get data in the proper format it would process all elements which were new or expired. Since Beam doesn’t have the concept of new and expired data in a window I’m trying to figure out how one would accomplish this. Thanks, Tim From: Kenneth Knowles <k...@google.com> Reply-To: "user@beam.apache.org" <user@beam.apache.org> Date: Wednesday, January 24, 2018 at 1:45 PM To: "user@beam.apache.org" <user@beam.apache.org> Subject: Re: Windowing question Generally, Beam will discard expired data for you (including state). Can you describe more? What is your windowing strategy? What are the edge triggers? On Wed, Jan 24, 2018 at 10:37 AM, Tim Ross <tim_r...@ultimatesoftware.com<mailto:tim_r...@ultimatesoftware.com>> wrote: I am just trying to do certain processing on edge triggers, i.e. new or expired data, to reduce the overall processing of a very large stream. How would I go about doing that with state? As I understand it, state is tied to key and window. Thanks, Tim From: Kenneth Knowles <k...@google.com<mailto:k...@google.com>> Reply-To: "user@beam.apache.org<mailto:user@beam.apache.org>" <user@beam.apache.org<mailto:user@beam.apache.org>> Date: Wednesday, January 24, 2018 at 1:25 PM To: "user@beam.apache.org<mailto:user@beam.apache.org>" <user@beam.apache.org<mailto:user@beam.apache.org>> Subject: Re: Windowing question A little clarification: in Beam an element exists in a single window, mathematically speaking. So when you use SlidingWindows, for example, to assign multiple windows this "copies" the value for each window, and that is how you should think of it, from a calculation point of view. Under the hood, a compressed representation is often used, but not in all situations. 
Kenn On Wed, Jan 24, 2018 at 9:45 AM, Robert Bradshaw <rober...@google.com<mailto:rober...@google.com>> wrote: No, Apache Beam doesn't offer this explicitly. You could accomplish it using State, but perhaps if you clarified what you were trying to accomplish by using these mechanisms there'd be another way to do the same thing. On Wed, Jan 24, 2018 at 7:03 AM, Tim Ross <tim_r...@ultimatesoftware.com<mailto:tim_r...@ultimatesoftware.com>> wrote: > Is there anything comparable to Apache Storm’s Window.getNew and > Window.getExpired in Apache Beam? How would I determine if an element is > new or expired in consecutive windows? > > > > Thanks, > > Tim > > This e-mail message and any attachments to it are intended only for the > named recipients and may contain legally privileged and/or confidential > information. If you are not one of the intended recipients, do not duplicate > or forward this e-mail message. This e-mail message and any attachments to it are intended only for the named recipients and may contain legally privileged and/or confidential information. If you are not one of the intended recipients, do not duplicate or forward this e-mail message.
Re: Windowing question
I am just trying to do certain processing on edge triggers, i.e. new or expired data, to reduce the overall processing of a very large stream. How would I go about doing that with state? As I understand it, state is tied to key and window. Thanks, Tim From: Kenneth Knowles <k...@google.com> Reply-To: "user@beam.apache.org" <user@beam.apache.org> Date: Wednesday, January 24, 2018 at 1:25 PM To: "user@beam.apache.org" <user@beam.apache.org> Subject: Re: Windowing question A little clarification: in Beam an element exists in a single window, mathematically speaking. So when you use SlidingWindows, for example, to assign multiple windows this "copies" the value for each window, and that is how you should think of it, from a calculation point of view. Under the hood, a compressed representation is often used, but not in all situations. Kenn On Wed, Jan 24, 2018 at 9:45 AM, Robert Bradshaw <rober...@google.com> wrote: No, Apache Beam doesn't offer this explicitly. You could accomplish it using State, but perhaps if you clarified what you were trying to accomplish by using these mechanisms there'd be another way to do the same thing. On Wed, Jan 24, 2018 at 7:03 AM, Tim Ross <tim_r...@ultimatesoftware.com> wrote: > Is there anything comparable to Apache Storm’s Window.getNew and > Window.getExpired in Apache Beam? How would I determine if an element is > new or expired in consecutive windows? > > > > Thanks, > > Tim > > This e-mail message and any attachments to it are intended only for the > named recipients and may contain legally privileged and/or confidential > information. If you are not one of the intended recipients, do not duplicate > or forward this e-mail message.
Windowing question
Is there anything comparable to Apache Storm’s Window.getNew and Window.getExpired in Apache Beam? How would I determine if an element is new or expired in consecutive windows? Thanks, Tim
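For reference, what Storm's Window.getNew and Window.getExpired report can be modelled as set differences between the element sets of two consecutive sliding-window firings. The sketch below is plain Java with illustrative names; it is not a Beam or Storm API, and in Beam this bookkeeping would have to be done by the user (for example in a stateful DoFn keeping the previous firing's contents):

```java
import java.util.HashSet;
import java.util.Set;

// "New" elements are in the current firing but not the previous one;
// "expired" elements are in the previous firing but not the current one.
public class WindowDiff {
    public static <T> Set<T> getNew(Set<T> previous, Set<T> current) {
        Set<T> added = new HashSet<>(current);
        added.removeAll(previous); // keep only elements not seen before
        return added;
    }

    public static <T> Set<T> getExpired(Set<T> previous, Set<T> current) {
        Set<T> expired = new HashSet<>(previous);
        expired.removeAll(current); // keep only elements that dropped out
        return expired;
    }

    public static void main(String[] args) {
        Set<String> prev = Set.of("a", "b", "c");
        Set<String> curr = Set.of("b", "c", "d");
        System.out.println(getNew(prev, curr));     // [d]
        System.out.println(getExpired(prev, curr)); // [a]
    }
}
```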
Re: [DISCUSS] Drop Spark 1.x support to focus on Spark 2.x
Thanks JB From which release will Spark 1.x be dropped please? Is this slated for 2.3.0 at the end of the year? Thanks, Tim, Sent from my iPhone > On 20 Nov 2017, at 21:21, Jean-Baptiste Onofré <j...@nanthrax.net> wrote: > > Hi, > > it seems we have a consensus to upgrade to Spark 2.x, dropping Spark 1.x. I > will upgrade the PR accordingly. > > Thanks all for your input and feedback. > > Regards > JB > >> On 11/13/2017 09:32 AM, Jean-Baptiste Onofré wrote: >> Hi Beamers, >> I'm forwarding this discussion & vote from the dev mailing list to the user >> mailing list. >> The goal is to have your feedback as user. >> Basically, we have two options: >> 1. Right now, in the PR, we support both Spark 1.x and 2.x using three >> artifacts (common, spark1, spark2). You, as users, pick up spark1 or spark2 >> in your dependencies set depending on the Spark target version you want. >> 2. The other option is to upgrade and focus on Spark 2.x in Beam 2.3.0. If >> you still want to use Spark 1.x, then, you will be stuck up to Beam 2.2.0. >> Thoughts ? >> Thanks ! >> Regards >> JB >> Forwarded Message >> Subject: [VOTE] Drop Spark 1.x support to focus on Spark 2.x >> Date: Wed, 8 Nov 2017 08:27:58 +0100 >> From: Jean-Baptiste Onofré <j...@nanthrax.net> >> Reply-To: d...@beam.apache.org >> To: d...@beam.apache.org >> Hi all, >> as you might know, we are working on Spark 2.x support in the Spark runner. >> I'm working on a PR about that: >> https://github.com/apache/beam/pull/3808 >> Today, we have something working with both Spark 1.x and 2.x from a code >> standpoint, but I have to deal with dependencies. It's the first step of the >> update as I'm still using RDD, the second step would be to support dataframe >> (but for that, I would need PCollection elements with schemas, that's >> another topic on which Eugene, Reuven and I are discussing). >> However, as all major distributions now ship Spark 2.x, I don't think it's >> required anymore to support Spark 1.x.
>> If we agree, I will update and cleanup the PR to only support and focus on >> Spark 2.x. >> So, that's why I'm calling for a vote: >> [ ] +1 to drop Spark 1.x support and upgrade to Spark 2.x only >> [ ] 0 (I don't care ;)) >> [ ] -1, I would like to still support Spark 1.x, and so having support of >> both Spark 1.x and 2.x (please provide specific comment) >> This vote is open for 48 hours (I have the commits ready, just waiting the >> end of the vote to push on the PR). >> Thanks ! >> Regards >> JB > > -- > Jean-Baptiste Onofré > jbono...@apache.org > http://blog.nanthrax.net > Talend - http://www.talend.com
Re: Does ElasticsearchIO in the latest RC support adding document IDs?
Hi Chet, I'll be a user of this, so thank you. It seems reasonable although - did you consider letting folk name the document ID field explicitly? It would avoid an unnecessary transformation and might be simpler: // instruct the writer to use a provided document ID ElasticsearchIO.write().withConnectionConfiguration(conn).withMaxBatchSize(BATCH_SIZE).withDocumentIdField("myID"); On Wed, Nov 15, 2017 at 8:08 PM, Chet Aldrich wrote: > Given that this seems like a change that should probably happen, and I’d > like to help contribute if possible, a few questions and my current > opinion: > > So I’m leaning towards approach B here, which is: > > b. (a bit less user friendly) PCollection with K as an id. But forces > the user to do a Pardo before writing to ES to output KV pairs of <id, json> > > I think that the reduction in user-friendliness may be outweighed by the > fact that this obviates some of the issues surrounding a failure when > finishing a bundle. Additionally, this *forces* the user to provide a > document id, which I think is probably better practice. This will also > probably lead to fewer frustrations around “magic” code that just pulls > something in if it happens to be there, and doesn’t if not. We’ll need to > rely on the user catching this functionality in the docs or the code itself > to take advantage of it. > IMHO it’d be generally better to enforce this at compile time because it > does have an effect on whether the pipeline produces duplicates on failure. > Additionally, we get the benefit of relatively intuitive behavior where if > the user passes in the same Key value, it’ll update a record in ES, and if > the key is different then it will create a new record. > > Curious to hear thoughts on this. If this seems reasonable I’ll go ahead > and create a JIRA for tracking and start working on a PR for this. Also, if > it’d be good to loop in the dev mailing list before starting let me know, > I’m pretty new to this.
> > Chet > > On Nov 15, 2017, at 12:53 AM, Etienne Chauchot > wrote: > > Hi Chet, > > What you say is totally true, docs written using ElasticSearchIO will > always have an ES generated id. But it might change in the future, indeed > it might be a good thing to allow the user to pass an id. Just in 5 seconds > thinking, I see 3 possible designs for that. > > a.(simplest) use a json special field for the id, if it is provided by the > user in the input json then it is used, auto-generated id otherwise. > > b. (a bit less user friendly) PCollection with K as an id. But forces > the user to do a Pardo before writing to ES to output KV pairs of <id, json> > > c. (a lot more complex) Allow the IO to serialize/deserialize java beans > and have a String id field. Matching java types to ES types is quite > tricky, so, for now we just relied on the user to serialize his beans into > json and let ES match the types automatically. > > Related to the problems you raise below: > > 1. Well, the bundle is the commit entity of beam. Consider the case of > ESIO.batchSize being < to bundle size. While processing records, when the > number of elements reaches batchSize, an ES bulk insert will be issued but > no finishBundle. If there is a problem later on in the bundle processing > before the finishBundle, the checkpoint will still be at the beginning of > the bundle, so all the bundle will be retried leading to duplicate > documents. Thanks for raising that! I'm CCing the dev list so that someone > could correct me on the checkpointing mechanism if I'm missing something. > Besides I'm thinking about forcing the user to provide an id in all cases > to workaround this issue. > 2. Correct. > > Best, > Etienne > > Le 15/11/2017 à 02:16, Chet Aldrich a écrit : > > Hello all!
> > So I’ve been using the ElasticSearchIO sink for a project (unfortunately > it’s Elasticsearch 5.x, and so I’ve been messing around with the latest RC) > and I’m finding that it doesn’t allow for changing the document ID, but > only lets you pass in a record, which means that the document ID is > auto-generated. See this line for what specifically is happening: > > https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L838 > > Essentially the data part of the document is being placed but it doesn’t > allow for other properties, such as the document ID, to be set. > > This leads to two problems: > > 1. Beam doesn’t necessarily guarantee exactly-once execution for a given > item in a PCollection, as I understand it. This means that you may get more > than one record in Elastic for a given item in a PCollection that you pass > in. > > 2. You can’t do partial updates to an index. If you run a batch job once, > and then run the batch job again on the same index without clearing it, you > just double everything in there. > > Is there any good way around this? > > I’d be
Re: Does ElasticsearchIO in the latest RC support adding document IDs?
Hi Chet, +1 for interest in this from me too. If it helps, I'd have expected a) to be the implementation (e.g. something like "_id" being used if present) and handling multiple delivery being a responsibility of the developer. Thanks, Tim On Wed, Nov 15, 2017 at 10:08 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote: > I think it's also related to the discussion Romain raised on the dev > mailing list (gap between batch size, checkpointing & bundles). > > Regards > JB > > On 11/15/2017 09:53 AM, Etienne Chauchot wrote: > >> Hi Chet, >> >> What you say is totally true, docs written using ElasticSearchIO will >> always have an ES generated id. But it might change in the future, indeed >> it might be a good thing to allow the user to pass an id. Just in 5 seconds >> thinking, I see 3 possible designs for that. >> >> a.(simplest) use a json special field for the id, if it is provided by >> the user in the input json then it is used, auto-generated id otherwise. >> >> b. (a bit less user friendly) PCollection with K as an id. But forces >> the user to do a Pardo before writing to ES to output KV pairs of <id, json> >> >> c. (a lot more complex) Allow the IO to serialize/deserialize java beans >> and have a String id field. Matching java types to ES types is quite >> tricky, so, for now we just relied on the user to serialize his beans into >> json and let ES match the types automatically. >> >> Related to the problems you raise below: >> >> 1. Well, the bundle is the commit entity of beam. Consider the case of >> ESIO.batchSize being < to bundle size. While processing records, when the >> number of elements reaches batchSize, an ES bulk insert will be issued but >> no finishBundle. If there is a problem later on in the bundle processing >> before the finishBundle, the checkpoint will still be at the beginning of >> the bundle, so all the bundle will be retried leading to duplicate >> documents. Thanks for raising that!
I'm CCing the dev list so that someone >> could correct me on the checkpointing mechanism if I'm missing something. >> Besides I'm thinking about forcing the user to provide an id in all cases >> to workaround this issue. >> >> 2. Correct. >> >> Best, >> Etienne >> >> Le 15/11/2017 à 02:16, Chet Aldrich a écrit : >> >>> Hello all! >>> >>> So I’ve been using the ElasticSearchIO sink for a project (unfortunately >>> it’s Elasticsearch 5.x, and so I’ve been messing around with the latest RC) >>> and I’m finding that it doesn’t allow for changing the document ID, but >>> only lets you pass in a record, which means that the document ID is >>> auto-generated. See this line for what specifically is happening: >>> >>> https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L838 >>> >>> Essentially the data part of the document is being placed but it doesn’t >>> allow for other properties, such as the document ID, to be set. >>> >>> This leads to two problems: >>> >>> 1. Beam doesn’t necessarily guarantee exactly-once execution for a given >>> item in a PCollection, as I understand it. This means that you may get more >>> than one record in Elastic for a given item in a PCollection that you pass >>> in. >>> >>> 2. You can’t do partial updates to an index. If you run a batch job >>> once, and then run the batch job again on the same index without clearing >>> it, you just double everything in there. >>> >>> Is there any good way around this? >>> >>> I’d be happy to try writing up a PR for this in theory, but not sure how >>> to best approach it. Also would like to figure out a way to get around this >>> in the meantime, if anyone has any ideas. >>> >>> Best, >>> >>> Chet >>> >>> P.S. CCed echauc...@gmail.com because it >>> seems like he’s been doing work related to the elastic sink.
>>> >>> >>> >> > -- > Jean-Baptiste Onofré > jbono...@apache.org > http://blog.nanthrax.net > Talend - http://www.talend.com >
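Option (a) from this thread — honouring a special id field when the input JSON provides one, and falling back to an ES auto-generated id otherwise — can be sketched as follows. This is plain Java; a real implementation would use a JSON parser (e.g. Jackson, which ElasticsearchIO already uses internally), so the regex here is only a stand-in, and the "_id" field name is an assumption, not a settled API:

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of design (a): pull an explicit document ID out of the input
// JSON when present; an empty Optional means "let ES generate the id".
public class DocumentIdExtractor {
    // Simplification: matches a top-level string field named "_id".
    private static final Pattern ID_FIELD =
        Pattern.compile("\"_id\"\\s*:\\s*\"([^\"]+)\"");

    public static Optional<String> extractId(String json) {
        Matcher m = ID_FIELD.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(extractId("{\"_id\":\"doc-42\",\"name\":\"x\"}")); // Optional[doc-42]
        System.out.println(extractId("{\"name\":\"x\"}")); // Optional.empty
    }
}
```

Reusing the same id across retries is what makes writes idempotent: a bundle retry re-indexes the same documents instead of duplicating them, which addresses problem 1 above.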
Re: [DISCUSS] Drop Spark 1.x support to focus on Spark 2.x
Thanks JB On "thoughts": - Cloudera 5.13 will still default to 1.6 even though a 2.2 parcel is available (HWX provides both) - Cloudera support for spark 2 has a list of exceptions ( https://www.cloudera.com/documentation/spark2/latest/topics/spark2_known_issues.html ) - I am not sure if the HBaseIO would be affected - I am not sure if structured streaming would have implications - it might stop customers from being able to run spark 2 at all due to support agreements - Spark 2.3 (EOY) will drop Scala 2.10 support - IBM's now defunct distro only has 1.6 - Oozie doesn't have a spark 2 action (need to use a shell action) - There are a lot of folks with code running on 1.3,1.4 and 1.5 still - Spark 2.2+ requires Java 8 too, while <2.2 was J7 like Beam (not sure if this has other implications for the cross deployment nature of Beam) My first impression of Beam was really favourable as it all just worked first time on a CDH Spark 1.6 cluster. For us it is lacking resources to refactor legacy code which delays the 2.2 push. With that said I think it is very reasonable to have a clear cut off in Beam, especially if it limits progress / causes headaches in packaging, robustness etc. I'd recommend putting it in a 6 month timeframe which might align with 2.3? Hope this helps, Tim On Mon, Nov 13, 2017 at 10:07 AM, Neville Dipale <nevilled...@gmail.com> wrote: > Hi JB, > > > [X ] +1 to drop Spark 1.x support and upgrade to Spark 2.x only > [ ] 0 (I don't care ;)) > [ ] -1, I would like to still support Spark 1.x, and so having support > of both Spark 1.x and 2.x (please provide specific comment) > > On 13 November 2017 at 10:32, Jean-Baptiste Onofré <j...@nanthrax.net> > wrote: > >> Hi Beamers, >> >> I'm forwarding this discussion & vote from the dev mailing list to the >> user mailing list. >> The goal is to have your feedback as user. >> >> Basically, we have two options: >> 1.
Right now, in the PR, we support both Spark 1.x and 2.x using three >> artifacts (common, spark1, spark2). You, as users, pick up spark1 or spark2 >> in your dependencies set depending on the Spark target version you want. >> 2. The other option is to upgrade and focus on Spark 2.x in Beam 2.3.0. >> If you still want to use Spark 1.x, then, you will be stuck up to Beam >> 2.2.0. >> >> Thoughts ? >> >> Thanks ! >> Regards >> JB >> >> >> Forwarded Message >> Subject: [VOTE] Drop Spark 1.x support to focus on Spark 2.x >> Date: Wed, 8 Nov 2017 08:27:58 +0100 >> From: Jean-Baptiste Onofré <j...@nanthrax.net> >> Reply-To: d...@beam.apache.org >> To: d...@beam.apache.org >> >> Hi all, >> >> as you might know, we are working on Spark 2.x support in the Spark >> runner. >> >> I'm working on a PR about that: >> >> https://github.com/apache/beam/pull/3808 >> >> Today, we have something working with both Spark 1.x and 2.x from a code >> standpoint, but I have to deal with dependencies. It's the first step of >> the update as I'm still using RDD, the second step would be to support >> dataframe (but for that, I would need PCollection elements with schemas, >> that's another topic on which Eugene, Reuven and I are discussing). >> >> However, as all major distributions now ship Spark 2.x, I don't think >> it's required anymore to support Spark 1.x. >> >> If we agree, I will update and cleanup the PR to only support and focus >> on Spark 2.x. >> >> So, that's why I'm calling for a vote: >> >> [ ] +1 to drop Spark 1.x support and upgrade to Spark 2.x only >> [ ] 0 (I don't care ;)) >> [ ] -1, I would like to still support Spark 1.x, and so having support >> of both Spark 1.x and 2.x (please provide specific comment) >> >> This vote is open for 48 hours (I have the commits ready, just waiting >> the end of the vote to push on the PR). >> >> Thanks ! >> Regards >> JB >> -- >> Jean-Baptiste Onofré >> jbono...@apache.org >> http://blog.nanthrax.net >> Talend - http://www.talend.com >> > >
Re: ElasticSearch with RestHighLevelClient
Hi Ryan, I can confirm 2.2.0-SNAPSHOT works fine with an ES 5.6 cluster. I am told 2.2.0 is expected within a couple weeks. My work is only a proof of concept for now, but I put in 300M fairly small docs at around 100,000/sec on a 3 node cluster without any issue [1]. Hope this helps, Tim [1] https://github.com/gbif/pipelines/blob/master/gbif/src/main/java/org/gbif/pipelines/indexing/Avro2ElasticSearchPipeline.java On Mon, Oct 23, 2017 at 9:00 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote: > Hi Ryan, > > the last version of ElasticsearchIO (that will be included in Beam 2.2.0) > supports Elasticsearch 5.x. > > The client should be created in the @Setup (or @StartBundle) and release > cleanly in @Teardown (or @FinishBundle). Then, it's used in @ProcessElement > to actually store the elements in the PCollection. > > Regards > JB > > > On 10/23/2017 08:53 PM, Ryan Bobko wrote: > >> Hi JB, >> Thanks for your input. I'm trying to update ElasticsearchIO, and >> hopefully learn a bit about Beam in the process. The documentation >> says ElasticsearchIO only works with ES 2.X, and I'm using ES 5.6. I'd >> prefer not to have two ES libs in my classpath if I can avoid it. I'm >> just getting started, so my pipeline is quite simple: >> >> pipeline.apply( "Raw Reader", reader ) // read raw files >> .apply( "Document Generator", ParDo.of( extractor ) ) // >> create my document objects for ES insertion >> .apply( "Elastic Writer", new ElasticWriter( ... 
); // >> upload to ES >> >> >> public final class ElasticWriter extends >> PTransform<PCollection<Document>, PDone> { >> >> private static final Logger log = LoggerFactory.getLogger( >> ElasticWriter.class ); >> private final String elasticurl; >> >> public ElasticWriter( String url ) { >> elasticurl = url; >> } >> >> @Override >> public PDone expand( PCollection<Document> input ) { >> input.apply( ParDo.of( new WriteFn( elasticurl ) ) ); >> return PDone.in( input.getPipeline() ); >> } >> >> public static class WriteFn extends DoFn<Document, Void> implements >> Serializable { >> >> private transient RestHighLevelClient client; >> private final String elasticurl; >> >> public WriteFn( String elasticurl ) { >> this.elasticurl = elasticurl; >> } >> >> @Setup >> public void setup() { >> log.debug( " into WriteFn::setup" ); >> HttpHost elastic = HttpHost.create( elasticurl ); >> RestClientBuilder bldr = RestClient.builder( elastic ); >> >> // if this is uncommented, the program never exits >> //client = new RestHighLevelClient( bldr.build() ); >> } >> >> @Teardown >> public void teardown() { >> log.debug( " into WriteFn::teardown" ); >> // there's nothing to tear down >> } >> >> @ProcessElement >> public void pe( ProcessContext c ) { >> Document doc = DocumentImpl.from( c.element() ); >> log.debug( "writing {} to elastic", doc.getMetadata().first( >> Metadata.NAME ) ); >> >> // this is where I want to write to ES, but for now, just write >> a text file >> >> ObjectMapper mpr = new ObjectMapper(); >> >> try ( Writer fos = new BufferedWriter( new FileWriter( new File( >> "/tmp/writers", >> doc.getMetadata().first( Metadata.NAME ).asString() ) ) ) >> ) { >> mpr.writeValue( fos, doc ); >> } >> catch ( IOException ioe ) { >> log.error( ioe.getLocalizedMessage(), ioe ); >> } >> } >> } >> } >> >> >> On Mon, Oct 23, 2017 at 2:28 PM, Jean-Baptiste Onofré <j...@nanthrax.net> >> wrote: >> >>> Hi Ryan, >>> >>> Why don't you use the ElasticsearchIO for that ?
>>> >>> Anyway, can you share your pipeline where you have the ParDo calling your >>> DoFn ? >>> >>> Thanks, >>> Regards >>> JB >>> >>> >>> On 10/23/2017 07:50 PM, r...@ostrich-emulators.com wrote: >>> >>>> >>>> Hi List, >>>> I'm trying to write an updated ElasticSearch client using the >>>> newly-published RestHighLevelClient class (with ES 5.6.0). I'm only >>>> interested in writes at this time, so I'm u
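The lifecycle JB describes — create the client once in @Setup, reuse it across @ProcessElement calls, release it in @Teardown — can be simulated without Beam. A likely cause of the "program never exits" symptom above is that the RestHighLevelClient is never closed, so its underlying RestClient keeps non-daemon I/O threads alive; the teardown method should close it rather than do nothing. MockClient below is a stand-in for the real client, and the class names are illustrative:

```java
// Simulates the DoFn lifecycle: setup once, process many, teardown once.
public class LifecycleSketch {
    static class MockClient implements AutoCloseable {
        boolean open = true;
        @Override public void close() { open = false; }
    }

    static class WriteFn {
        MockClient client;
        void setup()    { client = new MockClient(); }  // like @Setup
        void process(String doc) {                       // like @ProcessElement
            if (!client.open) throw new IllegalStateException("client closed");
            // ... index 'doc' with the shared client ...
        }
        void teardown() { client.close(); }              // like @Teardown
    }

    public static void main(String[] args) {
        WriteFn fn = new WriteFn();
        fn.setup();
        fn.process("doc1");
        fn.process("doc2");
        fn.teardown(); // without this, a real HTTP client's threads keep the JVM alive
        System.out.println(fn.client.open); // false
    }
}
```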
Re: KafkaIO and Avro
Thanks Eugene On Thu, Oct 19, 2017 at 9:36 PM, Raghu Angadi <rang...@google.com> wrote: > Ah, nice. It works. > > On Thu, Oct 19, 2017 at 1:44 PM, Eugene Kirpichov <kirpic...@google.com> > wrote: > >> The following compiles fine: >> >> >> p.apply(KafkaIO.<String, Envelope>read() >> .withBootstrapServers("kafka:9092") >> .withTopic("dbserver1.inventory.customers") >> .withKeyDeserializer(StringDeserializer.class) >> .withValueDeserializerAndCoder >> ((Class)KafkaAvroDeserializer.class, AvroCoder.of(Envelope.class)) >> >> >> On Thu, Oct 19, 2017 at 12:21 PM Raghu Angadi <rang...@google.com> wrote: >> >>> Same for me. It does not look like there is an annotation to suppress >>> the error. >>> >>> >>> On Thu, Oct 19, 2017 at 12:18 PM, Tim Robertson < >>> timrobertson...@gmail.com> wrote: >>> >>>> Hi Eugene, >>>> >>>> I understood that was where Andrew started and reported this. I tried >>>> and saw the same as him. >>>> >>>> incompatible types: java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer> cannot be converted to >>>> org.apache.kafka.common.serialization.Deserializer<org.gbif.pipelines.io.avro.Envelope> >>>> >>>> similarly with >>>> (Class>) KafkaAvroDeserializer.class >>>> >>>> >>>> >>>> On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov <kirpic...@google.com >>>> > wrote: >>>> >>>>> I don't think extending the class is necessary. Not sure I understand >>>>> why a simple type casting for withDeserializerAndCoder doesn't work? Have >>>>> you tried this? >>>>> >>>>> p.apply(KafkaIO.<String, Envelope>read() >>>>> .withValueDeserializerAndCoder((Deserializer)KafkaAvroDeserializer.class, >>>>> AvroCoder.of(Envelope.class)) >>>>> >>>>> On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson < >>>>> timrobertson...@gmail.com> wrote: >>>>> >>>>>> Hi Raghu >>>>>> >>>>>> I tried that but with KafkaAvroDeserializer already implementing >>>>>> Deserializer<Object> I couldn't get it to work... I didn't spend too >>>>>> much time though and agree something like that would be cleaner.
>>>>>> >>>>>> Cheers, >>>>>> Tim >>>>>> >>>>>> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <rang...@google.com> >>>>>> wrote: >>>>>> >>>>>>> Thanks Tim. >>>>>>> >>>>>>> How about extending KafkaAvroDeserializer rather >>>>>>> than AbstractKafkaAvroDeserializer? >>>>>>> >>>>>>> TypedKafkaAvroDeserializer class below is useful, but not directly >>>>>>> usable yet. It needs to store the actual type in Kafka consumer >>>>>>> config to retrieve at run time. >>>>>>> Even without storing the class, it is still useful. It simplifies >>>>>>> user code: >>>>>>> >>>>>>> public class EnvelopeKafkaAvroDeserializer extends >>>>>>> TypedKafkaAvroDeserializer<Envelope> {} >>>>>>> >>>>>>> This should be part of same package as KafkaAvroDeserializer >>>>>>> (surprised it is not there yet). >>>>>>> >>>>>>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson < >>>>>>> timrobertson...@gmail.com> wrote: >>>>>>> >>>>>>>> Happy to hear >>>>>>>> >>>>>>>> I wonder if we could do something like this (totally untested): >>>>>>>> >>>>>>>> public class TypedKafkaAvroDeserializer<T> extends >>>>>>>> AbstractKafkaAvroDeserializer implements Deserializer<T> { >>>>>>>> @Override >>>>>>>> public T deserialize(String s, byte[] bytes) { >>>>>>>> return (T) this.deserialize(bytes); >>>>>>>> } >>>>>>>> } >>>>>>>> >>>>>>>> On Thu, Oct 19, 2017
Re: KafkaIO and Avro
Hi Eugene, I understood that was where Andrew started and reported this. I tried and saw the same as him. incompatible types: java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer> cannot be converted to org.apache.kafka.common.serialization.Deserializer<org.gbif.pipelines.io.avro.Envelope> similarly with (Class>) KafkaAvroDeserializer.class On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov <kirpic...@google.com> wrote: > I don't think extending the class is necessary. Not sure I understand why > a simple type casting for withDeserializerAndCoder doesn't work? Have you > tried this? > > p.apply(KafkaIO.<String, Envelope>read() > .withValueDeserializerAndCoder((Deserializer) > KafkaAvroDeserializer.class, > AvroCoder.of(Envelope.class)) > > On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <timrobertson...@gmail.com> > wrote: > >> Hi Raghu >> >> I tried that but with KafkaAvroDeserializer already implementing >> Deserializer<Object> I couldn't get it to work... I didn't spend too >> much time though and agree something like that would be cleaner. >> >> Cheers, >> Tim >> >> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <rang...@google.com> wrote: >> >>> Thanks Tim. >>> >>> How about extending KafkaAvroDeserializer rather than >>> AbstractKafkaAvroDeserializer? >>> >>> TypedKafkaAvroDeserializer class below is useful, but not directly >>> usable yet. It needs to store the actual type in Kafka consumer >>> config to retrieve at run time. >>> Even without storing the class, it is still useful. It simplifies user >>> code: >>> >>> public class EnvelopeKafkaAvroDeserializer extends >>> TypedKafkaAvroDeserializer<Envelope> {} >>> >>> This should be part of same package as KafkaAvroDeserializer (surprised >>> it is not there yet).
>>> >>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson < >>> timrobertson...@gmail.com> wrote: >>> >>>> Happy to hear >>>> >>>> I wonder if we could do something like this (totally untested): >>>> >>>> public class TypedKafkaAvroDeserializer<T> extends >>>> AbstractKafkaAvroDeserializer implements Deserializer<T> { >>>> @Override >>>> public T deserialize(String s, byte[] bytes) { >>>> return (T) this.deserialize(bytes); >>>> } >>>> } >>>> >>>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones < >>>> andrew+b...@andrew-jones.com> wrote: >>>> >>>>> Thanks Tim, that works! >>>>> >>>>> Full code is: >>>>> >>>>> public class EnvelopeKafkaAvroDeserializer extends >>>>> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> { >>>>> @Override >>>>> public void configure(Map<String, ?> configs, boolean isKey) { >>>>> configure(new KafkaAvroDeserializerConfig(configs)); >>>>> } >>>>> >>>>> @Override >>>>> public Envelope deserialize(String s, byte[] bytes) { >>>>> return (Envelope) this.deserialize(bytes); >>>>> } >>>>> >>>>> @Override >>>>> public void close() {} >>>>> } >>>>> >>>>> Nicer than my solution so think that is the one I'm going to go with >>>>> for now. >>>>> >>>>> Thanks, >>>>> Andrew >>>>> >>>>> >>>>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote: >>>>> >>>>> Hi Andrew, >>>>> >>>>> I also saw the same behaviour. >>>>> >>>>> It's not pretty but perhaps try this? It was my last idea I ran out of >>>>> time to try... >>>>> >>>>> // Basically a copy KafkaAvroDeserializer with the casts in deserialize >>>>> public class EnvelopeAvroDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Envelope> { >>>>> >>>>> ... >>>>> >>>>> public Envelope deserialize(String s, byte[] bytes) { >>>>> >>>>> return (Envelope) this.deserialize(bytes); >>>>> >>>>> } >>>>> >>>>> >>>>> >>>>> public Envelope deserialize(String s, byte[] bytes, Schema >>>>> readerS
Re: KafkaIO and Avro
Happy to hear I wonder if we could do something like this (totally untested): public class TypedKafkaAvroDeserializer<T> extends AbstractKafkaAvroDeserializer implements Deserializer<T> { @Override public T deserialize(String s, byte[] bytes) { return (T) this.deserialize(bytes); } } On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <andrew+b...@andrew-jones.com > wrote: > Thanks Tim, that works! > > Full code is: > > public class EnvelopeKafkaAvroDeserializer extends > AbstractKafkaAvroDeserializer implements Deserializer<Envelope> { > @Override > public void configure(Map<String, ?> configs, boolean isKey) { > configure(new KafkaAvroDeserializerConfig(configs)); > } > > @Override > public Envelope deserialize(String s, byte[] bytes) { > return (Envelope) this.deserialize(bytes); > } > > @Override > public void close() {} > } > > Nicer than my solution so think that is the one I'm going to go with for > now. > > Thanks, > Andrew > > > On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote: > > Hi Andrew, > > I also saw the same behaviour. > > It's not pretty but perhaps try this? It was my last idea I ran out of > time to try... > > > // Basically a copy KafkaAvroDeserializer with the casts in deserialize > public class EnvelopeAvroDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Envelope> { > > ... > > public Envelope deserialize(String s, byte[] bytes) { > > return (Envelope) this.deserialize(bytes); > > } > > > > public Envelope deserialize(String s, byte[] bytes, Schema > readerSchema) { > > return (Envelope) this.deserialize(bytes, readerSchema); > > } > > > > ... > > } > > Tim > > > On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones < > andrew+b...@andrew-jones.com> wrote: > > > Using Object doesn't work unfortunately. I get an 'Unable to automatically > infer a Coder' error at runtime.
> > This is the code: > > p.apply(KafkaIO.<String, Object>read() > .withValueDeserializer(KafkaAvroDeserializer.class) > > It compiles, but at runtime: > > Caused by: java.lang.RuntimeException: Unable to automatically infer a > Coder for the Kafka Deserializer class > io.confluent.kafka.serializers.KafkaAvroDeserializer: > no coder registered for type class java.lang.Object > at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696) > > So far the only thing I've got working is this, where I use the > ByteArrayDeserializer and then parse Avro myself: > > private static KafkaAvroDecoder avroDecoder; > static { > final Properties props = new Properties(); > props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); > props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, > "http://registry:8081"); > props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, > true); > VerifiableProperties vProps = new VerifiableProperties(props); > avroDecoder = new KafkaAvroDecoder(vProps); > } > > public static void main(String[] args) { > > PipelineOptions options = PipelineOptionsFactory.create(); > Pipeline p = Pipeline.create(options); > > p.apply(KafkaIO.<byte[], byte[]>read() > .withBootstrapServers("kafka:9092") > .withTopic("dbserver1.inventory.customers") > .withKeyDeserializer(ByteArrayDeserializer.class) > .withValueDeserializer(ByteArrayDeserializer.class) > .withoutMetadata()) > .apply(Values.<byte[]>create()) > .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() { > @ProcessElement > public void processElement(ProcessContext c) { > Envelope data = (Envelope) > avroDecoder.fromBytes(c.element()); > c.output(data); > } > })) > > Thanks, > Andrew > > On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote: > > On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <kirpic...@google.com> > wrote: > > It seems that KafkaAvroDeserializer implements Deserializer<Object>, > though I suppose with proper configuration that Object will at
run-time be > your desired type. Have you tried adding some Java type casts to make it > compile? > > > +1, cast might be the simplest fix. Alternately you can wrap or > extend KafkaAvroDeserializer as
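[Editor's note] Raghu's untested sketch above can be fleshed out into a complete class by combining it with the configure/close methods from Andrew's working EnvelopeKafkaAvroDeserializer. This is still an untested sketch against the Confluent serializer API used elsewhere in the thread; the generic parameter is the only addition:

```java
import java.util.Map;

import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.Deserializer;

/**
 * Generic variant of EnvelopeKafkaAvroDeserializer: the cast is deferred to
 * the type parameter, so one class covers any Avro-generated type.
 * Untested sketch -- same caveat as the original message.
 */
public class TypedKafkaAvroDeserializer<T> extends AbstractKafkaAvroDeserializer
    implements Deserializer<T> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // Reuse the Confluent config handling, as in the working class above.
    configure(new KafkaAvroDeserializerConfig(configs));
  }

  @SuppressWarnings("unchecked")
  @Override
  public T deserialize(String topic, byte[] bytes) {
    // Delegate to AbstractKafkaAvroDeserializer#deserialize(byte[]),
    // which returns Object, and cast to the caller's type.
    return (T) this.deserialize(bytes);
  }

  @Override
  public void close() {}
}
```

One caveat: because of type erasure, KafkaIO's `withValueDeserializerAndCoder` still needs a concrete class, so in practice you would likely declare a trivial subclass per type, e.g. `public class EnvelopeDeserializer extends TypedKafkaAvroDeserializer<Envelope> {}`, and pass that alongside `AvroCoder.of(Envelope.class)`.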
Re: KafkaIO and Avro
Hi Andrew,

I also saw the same behaviour.

It's not pretty, but perhaps try this? It was my last idea before I ran out of time...

// Basically a copy of KafkaAvroDeserializer with the casts in deserialize
public class EnvelopeAvroDeserializer extends AbstractKafkaAvroDeserializer
    implements Deserializer<Envelope> {
  ...
  public Envelope deserialize(String s, byte[] bytes) {
    return (Envelope) this.deserialize(bytes);
  }

  public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
    return (Envelope) this.deserialize(bytes, readerSchema);
  }
  ...
}

Tim

On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <andrew+b...@andrew-jones.com> wrote:
> Using Object doesn't work unfortunately. I get an 'Unable to automatically infer a Coder' error at runtime.
>
> This is the code:
>
> p.apply(KafkaIO.<String, Object>read()
>     .withValueDeserializer(KafkaAvroDeserializer.class)
>
> It compiles, but at runtime:
>
> Caused by: java.lang.RuntimeException: Unable to automatically infer a
> Coder for the Kafka Deserializer class
> io.confluent.kafka.serializers.KafkaAvroDeserializer:
> no coder registered for type class java.lang.Object
>     at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>
> So far the only thing I've got working is this, where I use the
> ByteArrayDeserializer and then parse the Avro myself:
>
> private static KafkaAvroDecoder avroDecoder;
> static {
>   final Properties props = new Properties();
>   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>   props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://registry:8081");
>   props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
>   VerifiableProperties vProps = new VerifiableProperties(props);
>   avroDecoder = new KafkaAvroDecoder(vProps);
> }
>
> public static void main(String[] args) {
>
>   PipelineOptions options = PipelineOptionsFactory.create();
>   Pipeline p = Pipeline.create(options);
>
>   p.apply(KafkaIO.<byte[], byte[]>read()
>       .withBootstrapServers("kafka:9092")
>       .withTopic("dbserver1.inventory.customers")
>       .withKeyDeserializer(ByteArrayDeserializer.class)
>       .withValueDeserializer(ByteArrayDeserializer.class)
>       .withoutMetadata())
>       .apply(Values.<byte[]>create())
>       .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>           Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>           c.output(data);
>         }
>       }))
>
> Thanks,
> Andrew
>
> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <kirpic...@google.com> wrote:
> It seems that KafkaAvroDeserializer implements Deserializer<Object>,
> though I suppose with proper configuration that Object will at run-time be
> your desired type. Have you tried adding some Java type casts to make it
> compile?
>
> +1, cast might be the simplest fix. Alternately you can wrap or
> extend KafkaAvroDeserializer as Tim suggested. It would cast the Object
> returned by KafkaAvroDeserializer::deserialize() to Envelope at runtime.
>
> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <timrobertson...@gmail.com> wrote:
> I just tried quickly and see the same as you, Andrew.
> We're missing something obvious, or else extending KafkaAvroDeserializer seems necessary, right?
>
> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <andrew+b...@andrew-jones.com> wrote:
> Hi,
>
> I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
> it should be as simple as:
>
> p.apply(KafkaIO.<String, Envelope>read()
>     .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>         AvroCoder.of(Envelope.class))
>
> Where Envelope is the name of the Avro class. However, that does not
> compile and I get the following error:
>
> incompatible types:
> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
> cannot be converted to
> java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>
>
> I've tried a number of variations on this theme but haven't yet worked
> it out and am starting to run out of ideas...
>
> Has anyone successfully read Avro data from Kafka?
>
> The code I'm using can be found at
> https://github.com/andrewrjones/debezium-kafka-beam-example and a full
> environment can be created with Docker.
>
> Thanks,
> Andrew
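[Editor's note] For readers hitting the same coder-inference failure: it occurs because KafkaIO cannot pick a coder for `Object`, the type parameter of `KafkaAvroDeserializer`. Supplying a concrete deserializer class and an explicit coder sidesteps inference entirely. A sketch of the wiring, assuming the EnvelopeKafkaAvroDeserializer class from this thread, the Debezium-generated Envelope class, and a Confluent Schema Registry at the illustrative addresses used above (untested):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadAvroFromKafka {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Consumer properties are handed to the deserializer's configure() call.
    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.put("schema.registry.url", "http://registry:8081");
    consumerProps.put("specific.avro.reader", true);

    p.apply(KafkaIO.<String, Envelope>read()
        .withBootstrapServers("kafka:9092")
        .withTopic("dbserver1.inventory.customers")
        .withKeyDeserializer(StringDeserializer.class)
        // Explicit coder: KafkaIO no longer has to infer one from the
        // deserializer's (erased) type parameter.
        .withValueDeserializerAndCoder(
            EnvelopeKafkaAvroDeserializer.class, AvroCoder.of(Envelope.class))
        .updateConsumerProperties(consumerProps)
        .withoutMetadata());

    p.run().waitUntilFinish();
  }
}
```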
Run a ParDo after a Partition
Hi folks,

I feel a little daft asking this, and suspect I am missing the obvious... Can someone please tell me how I can do a ParDo following a Partition? In Spark I'd just repartition(...) and then map(), but I don't see in the Beam API how to run a ParDo on each partition in parallel. Do I need to multithread manually?

I tried this:

PCollectionList partitioned =
    verbatimRecords.apply(Partition.of(10, new RecordPartitioner()));

// does not run in parallel on Spark...
for (PCollection untyped : partitioned.getAll()) {
  PCollection inputDocs = untyped.apply(
      "convert-to-solr-format", ParDo.of(new ParseToSOLRDoc()));
  inputDocs.apply(SolrIO.write()
      .to("solr-load")
      .withConnectionConfiguration(conn));
}

[For background: I'm using a non-splittable custom Hadoop InputFormat, which means I end up with an RDD as a single partition, and need to split it to run an expensive op in parallel]

Thanks,
Tim
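[Editor's note] The loop shape above is right: each iteration adds an independent branch to the pipeline graph, and whether branches run concurrently is the runner's decision, not the Java loop's. A minimal self-contained sketch (the element type, partition function, and transform body are illustrative stand-ins, not the original classes):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class PartitionThenParDo {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    PCollection<String> records = p.apply(Create.of("a", "bb", "ccc", "dddd"));

    // Split into 2 partitions by some property of each element.
    PCollectionList<String> partitioned = records.apply(
        Partition.of(2, (Partition.PartitionFn<String>)
            (elem, numPartitions) -> elem.length() % numPartitions));

    // One branch per partition; note the unique transform name per branch,
    // which Beam requires. The runner may execute branches concurrently.
    for (int i = 0; i < partitioned.size(); i++) {
      partitioned.get(i)
          .apply("expensive-op-" + i, ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              c.output(c.element().toUpperCase());
            }
          }));
      // ...then e.g. a per-branch sink, as in the original code.
    }

    p.run().waitUntilFinish();
  }
}
```

Note that Partition splits the collection logically, not physically: if the source is a single unsplittable input, a runner can still fuse all branches onto one task. Inserting a shuffle after the read (e.g. Beam's Reshuffle transform, or a GroupByKey/ungroup pair) is the usual way to break fusion and spread elements across workers.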