> On 5 Aug 2021, at 18:17, Tao Li <[email protected]> wrote:
>
> It was a great presentation!
Thanks!

> Regarding my perf testing, I was not doing aggregation, filtering,
> projection or joining. I was simply reading all the fields of parquet and
> then immediately saving the PCollection back to parquet.

Well, of course, if you read all fields (columns) then you don't need column
projection. Otherwise, it can give quite a significant performance boost,
especially for large tables with many columns.

> Regarding SDF translation, is it enabled by default?

From the Beam 2.30.0 release notes: "Legacy Read transform (non-SDF based
Read) is used by default for non-FnAPI opensource runners. Use `use_sdf_read`
experimental flag to re-enable SDF based Read transforms
([BEAM-10670](https://issues.apache.org/jira/browse/BEAM-10670))"

— Alexey

> I will check out ParquetIO splittable. Thanks!
>
> From: Alexey Romanenko <[email protected]>
> Date: Thursday, August 5, 2021 at 6:40 AM
> To: Tao Li <[email protected]>
> Cc: "[email protected]" <[email protected]>, Andrew Pilloud <[email protected]>,
> Ismaël Mejía <[email protected]>, Kyle Weaver <[email protected]>,
> Yuchu Cao <[email protected]>
> Subject: Re: Perf issue with Beam on spark (spark runner)
>
> It's very likely that Spark SQL has much better performance because of
> SQL push-downs and the avoidance of additional ser/deser operations.
>
> At the same time, did you try to leverage "withProjection()" in ParquetIO
> and project only the fields that you need?
>
> Did you use ParquetIO splittable (it's not enabled by default, fixed in [1])?
>
> Also, using SDF translation for Read on the Spark runner can cause
> performance degradation as well (we noticed that in our experiments). Try
> to use a non-SDF read (if not yet) [2].
>
> PS: Yesterday, at Beam Summit, we (Ismael and me) gave a related talk. I'm
> not sure if a recording is available yet, but you can find the slides
> here [3]; they may be helpful.
>
> — Alexey
>
> [1] https://issues.apache.org/jira/browse/BEAM-12070
> [2] https://issues.apache.org/jira/browse/BEAM-10670
> [3] https://drive.google.com/file/d/17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O/view?usp=sharing
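To make those three suggestions concrete, here is a minimal sketch
(illustrative, not code from this thread; the schemas, field names, and S3
path are assumptions). It shows column projection via withProjection(), the
splittable reader via withSplit(), and, per the release notes quoted above,
the --experiments=use_sdf_read flag that re-enables SDF reads on Beam >= 2.30.0.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class ProjectedParquetRead {
  public static void main(String[] args) {
    // On Beam >= 2.30.0 the Spark runner uses the legacy (non-SDF) Read by
    // default; pass --experiments=use_sdf_read only to re-enable SDF reads.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    // Full file schema (assumed here; in practice it comes from your data).
    Schema fileSchema = SchemaBuilder.record("Bid").fields()
        .requiredLong("auction")
        .requiredLong("bidder")
        .requiredLong("price")
        .requiredString("extra") // a column we do NOT want to read
        .endRecord();

    // Projection: only the columns the pipeline actually uses.
    Schema projection = SchemaBuilder.record("Bid").fields()
        .requiredLong("auction")
        .requiredLong("bidder")
        .requiredLong("price")
        .endRecord();

    PCollection<GenericRecord> bids = pipeline.apply(
        ParquetIO.read(fileSchema)
            // withProjection(projectionSchema, encoderSchema): skip unused columns.
            .withProjection(projection, projection)
            // Splittable reading is off by default (see BEAM-12070).
            .withSplit()
            .from("s3://my-bucket/bids/*.parquet")); // illustrative path

    pipeline.run().waitUntilFinish();
  }
}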
>> On 5 Aug 2021, at 03:07, Tao Li <[email protected]> wrote:
>>
>> @Alexey Romanenko @Ismaël Mejía I assume you are experts on the Spark
>> runner. Can you please take a look at this thread and confirm that this
>> Jira covers the causes: https://issues.apache.org/jira/browse/BEAM-12646 ?
>>
>> This perf issue is currently a blocker for me.
>>
>> Thanks so much!
>>
>> From: Tao Li <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Friday, July 30, 2021 at 3:53 PM
>> To: Andrew Pilloud <[email protected]>, "[email protected]" <[email protected]>
>> Cc: Kyle Weaver <[email protected]>, Yuchu Cao <[email protected]>
>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>
>> Thanks everyone for your help.
>>
>> We actually did another round of perf comparison between Beam (on Spark)
>> and native Spark, without any projection/filtering in the query (to rule
>> out the "predicate pushdown" factor).
>> The time spent with Beam on the Spark runner is still 3-5x that of native
>> Spark, and according to the Spark metrics the cause is
>> https://issues.apache.org/jira/browse/BEAM-12646. The Spark runner is
>> pretty much the bottleneck.
>>
>> <image001.png>
>>
>> From: Andrew Pilloud <[email protected]>
>> Date: Thursday, July 29, 2021 at 2:11 PM
>> To: "[email protected]" <[email protected]>
>> Cc: Tao Li <[email protected]>, Kyle Weaver <[email protected]>, Yuchu Cao <[email protected]>
>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>
>> Actually, ParquetIO got pushdown in Beam SQL starting at v2.29.0.
>>
>> Andrew
>>
>> On Mon, Jul 26, 2021 at 10:05 AM Andrew Pilloud <[email protected]> wrote:
>>> Beam SQL doesn't currently have project pushdown for ParquetIO (we are
>>> working to expand this to more IOs). Using ParquetIO withProjection
>>> directly will produce better results.
>>>
>>> On Mon, Jul 26, 2021 at 9:46 AM Robert Bradshaw <[email protected]> wrote:
>>>> Could you try using Beam SQL [1] and see if that gives a result more
>>>> similar to your Spark SQL query? I would also be curious whether the
>>>> performance is sufficient when using withProjection [2] to read only
>>>> the auction, price, and bidder columns.
>>>>
>>>> [1] https://beam.apache.org/documentation/dsls/sql/overview/
>>>> [2] https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.Read.html#withProjection-org.apache.avro.Schema-org.apache.avro.Schema-
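For reference, a rough sketch of what the Beam SQL route can look like (an
illustration, not code from this thread). A single input PCollection<Row> is
exposed to SqlTransform under the implicit table name PCOLLECTION, and the
query mirrors the Spark SQL statement quoted further down the thread. It
assumes the beam-sdks-java-extensions-sql module is on the classpath and
that `bids` is the PCollection<Row> read from Parquet.

import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class BidSql {
  // `bids` is assumed to be the PCollection<Row> read from Parquet.
  static PCollection<Row> toEuros(PCollection<Row> bids) {
    // A single input is registered under the implicit table name PCOLLECTION.
    return bids.apply(
        SqlTransform.query(
            "SELECT auction, 0.82 * price AS euro, bidder FROM PCOLLECTION"));
  }
}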
>>>> On Sat, Jul 24, 2021 at 10:23 AM Tao Li <[email protected]> wrote:
>>>>> Thanks Robert for filing BEAM-12646. This perf issue is a blocker for
>>>>> us to adopt Beam. It would be great if the community could determine
>>>>> the root cause and share an ETA for the fix. Thanks so much!
>>>>>
>>>>> From: Robert Bradshaw <[email protected]>
>>>>> Date: Wednesday, July 21, 2021 at 3:51 PM
>>>>> To: Tao Li <[email protected]>
>>>>> Cc: "[email protected]" <[email protected]>, Kyle Weaver <[email protected]>, Yuchu Cao <[email protected]>
>>>>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>>>>
>>>>> On Wed, Jul 21, 2021 at 3:00 PM Tao Li <[email protected]> wrote:
>>>>>> @Robert Bradshaw with the Spark API, the code is actually much
>>>>>> simpler. We are just calling the Spark SQL API against a Hive table:
>>>>>> spark.sql("SELECT auction, 0.82*(price) as euro, bidder FROM bid")
>>>>>
>>>>> Good chance that this is pushing the projection of those few fields up
>>>>> into the read operator, which could be a dramatic savings. You could
>>>>> try doing it manually in Beam, or use Beam SQL, which should do the same.
>>>>>
>>>>>> I think the "globally windowed GBK" optimization you are proposing is
>>>>>> a good callout.
>>>>> Filed https://issues.apache.org/jira/browse/BEAM-12646 to track.
>>>>>
>>>>>> From: Robert Bradshaw <[email protected]>
>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>> Date: Wednesday, July 21, 2021 at 1:09 PM
>>>>>> To: user <[email protected]>
>>>>>> Cc: Kyle Weaver <[email protected]>, Yuchu Cao <[email protected]>
>>>>>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>>>>>
>>>>>> On Wed, Jul 21, 2021 at 12:51 PM Tao Li <[email protected]> wrote:
>>>>>>> Kyle, I don't expect such a huge perf diff either. To your question:
>>>>>>> no, I am not specifying withProjection or withSplit for the parquet
>>>>>>> reader.
>>>>>>
>>>>>> Are you doing so in your Spark code?
>>>>>>
>>>>>>> Below is my parquet read code:
>>>>>>>
>>>>>>> // Match the input file pattern and open the matched files.
>>>>>>> PCollection<FileIO.ReadableFile> files = pipeline
>>>>>>>     .apply(FileIO.match().filepattern(beamRequiredPath))
>>>>>>>     .apply(FileIO.readMatches());
>>>>>>>
>>>>>>> // Read Avro GenericRecords from Parquet, then convert each record
>>>>>>> // to a Beam Row with the equivalent schema.
>>>>>>> PCollection<Row> table = files
>>>>>>>     .apply(ParquetIO
>>>>>>>         .readFiles(avroSchema)
>>>>>>>         .withConfiguration(ImmutableMap.of(
>>>>>>>             AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false")))
>>>>>>>     .apply(MapElements
>>>>>>>         .into(TypeDescriptors.rows())
>>>>>>>         .via(AvroUtils.getGenericRecordToRowFunction(
>>>>>>>             AvroUtils.toBeamSchema(avroSchema))))
>>>>>>>     .setCoder(RowCoder.of(AvroUtils.toBeamSchema(avroSchema)));
>>>>>>>
>>>>>>> According to my investigation, the call stack below is very
>>>>>>> computation-intensive and is causing a lot of GC time. The stack
>>>>>>> appears to come from Spark runner code.
>>>>>>
>>>>>> This does look inordinately expensive. I wonder if it would make sense
>>>>>> to optimize the globally windowed GBK as some other runners do.
>>>>>>
>>>>>>> <image001.png>
>>>>>>>
>>>>>>> From: Kyle Weaver <[email protected]>
>>>>>>> Date: Tuesday, July 20, 2021 at 3:57 PM
>>>>>>> To: Tao Li <[email protected]>
>>>>>>> Cc: "[email protected]" <[email protected]>, Yuchu Cao <[email protected]>
>>>>>>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>>>>>>
>>>>>>> Beam has its own implementation of Parquet IO and doesn't use
>>>>>>> Spark's. It's possible Spark's implementation does more
>>>>>>> optimizations, though perhaps not enough to result in such a
>>>>>>> dramatic difference.
>>>>>>>
>>>>>>> I'm curious how your Parquet read is configured. In particular,
>>>>>>> whether withProjection or withSplit are set.
>>>>>>>
>>>>>>> On Tue, Jul 20, 2021 at 3:21 PM Tao Li <[email protected]> wrote:
>>>>>>>> Hi Kyle,
>>>>>>>>
>>>>>>>> The ParDo (which references the code I shared) is the only
>>>>>>>> transformation in my pipeline. The input and output are parquet
>>>>>>>> files in S3 (we are using Beam ParquetIO).
>>>>>>>>
>>>>>>>> From: Kyle Weaver <[email protected]>
>>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>>> Date: Tuesday, July 20, 2021 at 2:13 PM
>>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>>> Cc: Yuchu Cao <[email protected]>
>>>>>>>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>>>>>>>
>>>>>>>> The DoFn you shared is simple enough that it seems unlikely to be
>>>>>>>> the performance bottleneck here.
>>>>>>>>
>>>>>>>> Can you share more information about your complete pipeline? What
>>>>>>>> other transforms are there? What sources/sinks are you using?
>>>>>>>>
>>>>>>>> On Tue, Jul 20, 2021 at 2:02 PM Tao Li <[email protected]> wrote:
>>>>>>>>> Hi Beam community,
>>>>>>>>>
>>>>>>>>> We are seeing a serious perf issue with Beam using the Spark
>>>>>>>>> runner, compared with writing a native Spark app. Can you please
>>>>>>>>> provide some help?
>>>>>>>>>
>>>>>>>>> The Beam-on-Spark app takes 8-10 min, whereas the native Spark app
>>>>>>>>> takes only 2 min. Below is the Spark UI, from which you can see
>>>>>>>>> that the flatMapToPair method is very time-consuming. Is this
>>>>>>>>> method call coming from the Spark runner?
>>>>>>>>>
>>>>>>>>> <image001.png>
>>>>>>>>>
>>>>>>>>> I suspect this is caused by high GC time. See the "GC Time" column
>>>>>>>>> below:
>>>>>>>>>
>>>>>>>>> <image002.png>
>>>>>>>>>
>>>>>>>>> The Beam code is really simple, just per-row processing:
>>>>>>>>>
>>>>>>>>> public class CalcFn extends DoFn<Row, Row> {
>>>>>>>>>   protected Logger log = LoggerFactory.getLogger(this.getClass());
>>>>>>>>>   private final Schema schema;
>>>>>>>>>
>>>>>>>>>   public CalcFn(Schema schema) {
>>>>>>>>>     this.schema = schema;
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   @ProcessElement
>>>>>>>>>   public void processElement(@Element Row row, OutputReceiver<Row> receiver) {
>>>>>>>>>     Long auctionValue = (Long) row.getBaseValue("auction");
>>>>>>>>>     Long bidValue = (Long) row.getBaseValue("bidder");
>>>>>>>>>     Long price = (Long) row.getBaseValue("price");
>>>>>>>>>     Double euro = price * 0.82; // convert the price to euros
>>>>>>>>>
>>>>>>>>>     receiver.output(Row.withSchema(schema)
>>>>>>>>>         .addValues(auctionValue, euro, bidValue)
>>>>>>>>>         .build());
>>>>>>>>>   }
>>>>>>>>> }
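For context, a minimal sketch (an illustration, not code from this thread)
of how a DoFn like CalcFn is typically wired into a pipeline. The output
schema here is an assumption chosen to match the order of the addValues()
call above, and `table` is assumed to be the PCollection<Row> from the read.

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

static PCollection<Row> applyCalc(PCollection<Row> table) {
  // Field order must match addValues(auctionValue, euro, bidValue) in CalcFn.
  Schema outputSchema = Schema.builder()
      .addInt64Field("auction")
      .addDoubleField("euro")
      .addInt64Field("bidder")
      .build();
  return table
      .apply(ParDo.of(new CalcFn(outputSchema)))
      // Attach the Row schema so downstream schema-aware transforms work.
      .setRowSchema(outputSchema);
}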
