Is "id" a partition column? The inclusive manifest evaluator uses an inclusive projection of the filter, and that projection evaluates to true for all non-partition column predicates, so they can't be used to skip manifests.
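For anyone following along, here is a minimal sketch of that projection behaviour (the toy schema and spec below are made up for illustration; the classes are the ones in org.apache.iceberg.expressions, and exact packages/signatures may differ slightly between versions):

```
// Hedged sketch: a two-column schema where only batchId is a partition column.
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.types.Types;

public class InclusiveProjectionSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "batchId", Types.StringType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("batchId").build();

    // The row filter from this thread: (not_null(ref(name="id")) and ref(name="id") == 1).
    // "id" is not a partition source column, so the inclusive projection cannot rule out
    // any partition and the whole filter projects to alwaysTrue().
    Expression rowFilter = Expressions.and(Expressions.notNull("id"), Expressions.equal("id", 1L));
    System.out.println(Projections.inclusive(spec).project(rowFilter)); // prints: true

    // A predicate on the partition column survives projection, so manifests whose
    // partition ranges don't contain 'xyz' can be skipped.
    Expression partFilter = Expressions.equal("batchId", "xyz");
    System.out.println(Projections.inclusive(spec).project(partFilter)); // e.g. ref(name="batchId") == "xyz"
  }
}
```

In other words, only predicates on partition source columns can help InclusiveManifestEvaluator skip manifests; everything else degenerates to true.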
On Thu, May 2, 2019 at 4:08 PM Anton Okolnychyi <aokolnyc...@apple.com> wrote: > Hm, but why don’t we filter out manifests using stats for partition > columns? Is there a bug in InclusiveManifestEvaluator? I just tested it > locally. InclusiveManifestEvaluator gets '(not_null(ref(name="id")) and > ref(name="id") == 1)' as rowFilter, which is transformed into ‘true’ after > that expression is bound. Apparently, the bound expression doesn’t allow us > to filter any manifests. > > > On 2 May 2019, at 15:33, Ryan Blue <rb...@netflix.com> wrote: > > Yes, the append code tries to keep the existing order. That way, if you > have a time-based append pattern, it just works. Similarly, if you've spent > the time to optimize the order, the append doesn't rewrite and ruin it. > > Sounds like you just need to sort the file list by partition to group as > much as possible, then append. We do this when we convert tables to Iceberg > to start with a good split across manifests. > > On Thu, May 2, 2019 at 3:17 PM Gautam <gautamkows...@gmail.com> wrote: > >> Ah, looks like MergingSnapshotUpdate.mergeGroup() has the relevant logic. >> So it preserves the natural order of manifests, so I guess it groups based >> on when manifests were created, and the answer is whatever order the commits >> were done in. If batches within multiple days were committed out of order then >> a manifest could end up with multiple days. >> >> >> On Thu, May 2, 2019 at 2:23 PM Gautam <gautamkows...@gmail.com> wrote: >> >>> Ok, thanks for the tip on not having to be tied to a hierarchical >>> partition spec. >>> >>> Although this still doesn't explain why all the manifests are scanned, >>> coz it should be pruning the list of manifests and it's not. Is my >>> understanding correct that the manifest grouping might be re-shuffling up >>> the days, so a query on 1 day might map to all manifests even? Does manifest >>> merging optimize for partition boundaries or is it based on the manifests' >>> natural order? >>> >>> On Thu, May 2, 2019 at 2:06 PM Ryan Blue <rb...@netflix.com> wrote: >>> >>>> You also don't need to use year, month, and day. You can just use day. >>>> >>>> The time-based partition functions all produce ordinals, not local >>>> values: month(Jan 1970) = 0 and month(Jan 1972) = 24. Same thing with day >>>> and hour. In fact, I should open a PR to throw an exception when there are >>>> duplicate partition functions... >>>> >>>> On Thu, May 2, 2019 at 1:52 PM Gautam <gautamkows...@gmail.com> wrote: >>>> >>>>> FYI .. The test Partition Spec is : >>>>> [ >>>>> YEAR: identity(21) >>>>> MONTH: identity(22) >>>>> DAY: identity(23) >>>>> batchId: identity(24) >>>>> ] >>>>> >>>>> >>>>> >>>>> On Thu, May 2, 2019 at 1:46 PM Gautam <gautamkows...@gmail.com> wrote: >>>>> >>>>>> > Using those, you should be able to control parallelism. If you want >>>>>> to test with 4,000, then you can set the min count to 5,000 so Iceberg >>>>>> won’t compact manifests. >>>>>> >>>>>> This is helpful. Thanks for the pointer on increasing parallelism. >>>>>> Will try this out. So I understand the behaviour: if a different dataset >>>>>> has >=5000 batches, then the resultant # manifests would be >>>>>> (total_num_batches % 5000) ? >>>>>> >>>>>> > What surprises me is that you’re not getting much benefit from >>>>>> filtering out manifests that aren’t helpful. We see a lot of benefit from >>>>>> it. >>>>>> >>>>>> Pardon the verbose example but I think it'll help explain what I'm >>>>>> seeing ..
>>>>>> >>>>>> Regarding manifest filtering: I tested whether partition filters in the SQL >>>>>> query actually reduce manifests being inspected. In my example, I have 16 >>>>>> manifests that point to 4000 batch partitions ( each file is restricted >>>>>> to >>>>>> one partition as we're using physical partitioning in the table ). So >>>>>> when >>>>>> querying for .. WHERE batchId = 'xyz' .. at most 1 manifest should be >>>>>> read coz 1 batch == 1 file which should be tracked by 1 manifest (among >>>>>> the >>>>>> 16) , right? But I see that all 16 are being inspected in >>>>>> BaseTableScan.planFiles(). Correct me if I'm wrong, but it's this call [1] >>>>>> that should be giving me the manifests that match a partition. When I >>>>>> inspect this it says `matchingManifests = 16` , which is all the >>>>>> manifests in the table. This *could* be due to the fact that our >>>>>> batch ids are random UUIDs so lower/upper bounds may not help coz there's >>>>>> no inherent ordering amongst batches. >>>>>> But then I tried year = 2019 and month = 01 and day = 01 which also >>>>>> scanned all manifests. Could this be due to the way Iceberg manifests are >>>>>> re-grouped and merged? If so, shouldn't re-grouping honour partition >>>>>> boundaries and optimize for it? >>>>>> >>>>>> >>>>>> Cheers, >>>>>> -Gautam. >>>>>> >>>>>> [1] - >>>>>> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173 >>>>>> >>>>>> >>>>>> On Thu, May 2, 2019 at 12:27 PM Ryan Blue <rb...@netflix.com> wrote: >>>>>> >>>>>>> Good questions. Grouping manifests is configurable at the table >>>>>>> level. There are 2 settings: >>>>>>> >>>>>>> - commit.manifest.target-size-bytes defaults to 8MB, this is the >>>>>>> target size that Iceberg will compact to >>>>>>> - commit.manifest.min-count-to-merge defaults to 100, this is >>>>>>> the minimum number of files before a compaction is triggered >>>>>>> >>>>>>> Using those, you should be able to control parallelism. If you want >>>>>>> to test with 4,000, then you can set the min count to 5,000 so Iceberg >>>>>>> won’t compact manifests. >>>>>>> >>>>>>> What surprises me is that you’re not getting much benefit from >>>>>>> filtering out manifests that aren’t helpful. We see a lot of benefit >>>>>>> from >>>>>>> it. You might try sorting the data files by partition before adding >>>>>>> them to >>>>>>> the table. That will cluster data files in the same partition so you can >>>>>>> read fewer manifests. >>>>>>> >>>>>>> On Thu, May 2, 2019 at 12:09 PM Gautam <gautamkows...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hey Anton, >>>>>>>> Sorry about the delay on this. Been caught up with some >>>>>>>> other things. Thanks for raising issue #173. >>>>>>>> >>>>>>>> So the root cause is indeed the density and size of the schema. >>>>>>>> I agree the option to configure stats for columns is good >>>>>>>> (although >>>>>>>> I'm not fully convinced that this is purely due to lower/upper >>>>>>>> bounds). For >>>>>>>> instance, maybe it's just taking a while to iterate over manifest rows >>>>>>>> and >>>>>>>> deserialize the DataFile stats in each read? The solution I'm using >>>>>>>> right >>>>>>>> now is to parallelize the manifest reading in split planning. We >>>>>>>> regenerated the Iceberg table with more manifests. Now the code >>>>>>>> enables the >>>>>>>> ParallelIterator which uses a worker pool of threads (1 thread per CPU >>>>>>>> by >>>>>>>> default, configurable using 'iceberg.worker.num-threads' ) to read >>>>>>>> manifests.
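As a reference for the knobs mentioned above, here is a minimal, hedged sketch (the Table handle and the value 32 are made up for illustration; the two property keys and the system property are the ones named in this thread):

```
// Hedged sketch of tuning split-planning parallelism; not production settings.
import org.apache.iceberg.Table;

public class ManifestTuning {
  public static void tune(Table table) {
    // Size of the worker pool used for parallel manifest reads. It is read once when
    // the pool is created, so set it (or pass -Diceberg.worker.num-threads=32 to the
    // JVM) before the first scan is planned.
    System.setProperty("iceberg.worker.num-threads", "32");

    // Raise the merge threshold so commits keep manifests separate instead of
    // compacting them into a few large ones; 8MB is the default target size above.
    table.updateProperties()
        .set("commit.manifest.min-count-to-merge", "5000")
        .set("commit.manifest.target-size-bytes", String.valueOf(8L * 1024 * 1024))
        .commit();
  }
}
```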
>>>>>>>> On that note, the ability to parallelize is limited to how many >>>>>>>> manifests are in the table. So as a test, for a table with 4000 files >>>>>>>> we >>>>>>>> created one manifest per file (think of one file as a single batch >>>>>>>> commit >>>>>>>> in this case). So I was hoping to get a parallelism factor of 4000. But >>>>>>>> Iceberg summarizes manifests into fewer manifests with each commit so >>>>>>>> we >>>>>>>> instead ended up with 16 manifests. So now split planning is limited to >>>>>>>> reading at most 16 units of parallelism. Is this grouping of manifests >>>>>>>> into >>>>>>>> fewer manifests configurable? If not, should we make it configurable? >>>>>>>> >>>>>>>> Sorry if this is forking a different conversation. If so, I can >>>>>>>> start a separate conversation thread on this. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi < >>>>>>>> aokolnyc...@apple.com> wrote: >>>>>>>> >>>>>>>>> Hey Gautam, >>>>>>>>> >>>>>>>>> Out of my curiosity, did you manage to confirm the root cause of >>>>>>>>> the issue? >>>>>>>>> >>>>>>>>> P.S. I created [1] so that we can make collection of lower/upper >>>>>>>>> bounds configurable. >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> Anton >>>>>>>>> >>>>>>>>> [1] - https://github.com/apache/incubator-iceberg/issues/173 >>>>>>>>> >>>>>>>>> On 22 Apr 2019, at 09:15, Gautam <gautamkows...@gmail.com> wrote: >>>>>>>>> >>>>>>>>> Thanks guys for the insights .. >>>>>>>>> >>>>>>>>> > I like Anton's idea to have an optional list of columns for >>>>>>>>> which we keep stats. That would allow us to avoid storing stats for >>>>>>>>> thousands of columns that won't ever be used. Another option here is >>>>>>>>> to add >>>>>>>>> a flag to keep stats only for top-level columns. That's much less >>>>>>>>> configuration for users and probably does the right thing in many >>>>>>>>> cases. >>>>>>>>> Simpler to use but not as fast in all cases is sometimes a good >>>>>>>>> compromise. >>>>>>>>> >>>>>>>>> This makes sense to me. It adds a variable that data pipelines can >>>>>>>>> tweak to improve performance. I will add an issue on GitHub to add >>>>>>>>> a >>>>>>>>> stats config/flag. Although, having said that, I would try to optimize >>>>>>>>> around this coz read patterns are hardly ever known a priori and >>>>>>>>> adding a >>>>>>>>> column to this list means having to re-write the entire data again. >>>>>>>>> So I'll >>>>>>>>> try the other suggestion which is parallelizing on multiple manifests. >>>>>>>>> >>>>>>>>> > To clarify my comment on changing the storage: the idea is to >>>>>>>>> use separate columns instead of a map and then use a columnar storage >>>>>>>>> format so we can project those columns independently. Avro can't >>>>>>>>> project >>>>>>>>> columns independently. This wouldn't help on the write side and may >>>>>>>>> just >>>>>>>>> cause a lot of seeking on the read side that diminishes the benefits. >>>>>>>>> >>>>>>>>> Gotcha. >>>>>>>>> >>>>>>>>> > Also, now that we have more details, I think there is a second >>>>>>>>> problem. Because we expect several manifests in a table, we >>>>>>>>> parallelize >>>>>>>>> split planning on manifests instead of splits of manifest files. This >>>>>>>>> planning operation is happening in a single thread instead of in >>>>>>>>> parallel. >>>>>>>>> I think if you split the write across several manifests, you'd >>>>>>>>> improve wall >>>>>>>>> time.
>>>>>>>>> This might actually be the issue here: this was a test bench >>>>>>>>> dataset, so the writer job created a single manifest for all the data >>>>>>>>> in the >>>>>>>>> dataset, which isn't really how we will do things in prod. I'll try and >>>>>>>>> create the metadata based on production's expected commit pattern. >>>>>>>>> >>>>>>>>> >>>>>>>>> Regarding Iceberg not truncating large column bound values >>>>>>>>> https://github.com/apache/incubator-iceberg/issues/113 .. I >>>>>>>>> didn't consider this with our dataset. The current evidence is leading >>>>>>>>> towards the number of columns and the sheer number of files that the >>>>>>>>> manifest is maintaining, but this is a good thing to look into. >>>>>>>>> >>>>>>>>> Thanks again guys. >>>>>>>>> >>>>>>>>> -Gautam. >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rb...@netflix.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> I like Anton's idea to have an optional list of columns for which >>>>>>>>>> we keep stats. That would allow us to avoid storing stats for >>>>>>>>>> thousands of >>>>>>>>>> columns that won't ever be used. Another option here is to add a >>>>>>>>>> flag to >>>>>>>>>> keep stats only for top-level columns. That's much less >>>>>>>>>> configuration for >>>>>>>>>> users and probably does the right thing in many cases. Simpler to >>>>>>>>>> use but >>>>>>>>>> not as fast in all cases is sometimes a good compromise. >>>>>>>>>> >>>>>>>>>> To clarify my comment on changing the storage: the idea is to use >>>>>>>>>> separate columns instead of a map and then use a columnar storage >>>>>>>>>> format so >>>>>>>>>> we can project those columns independently. Avro can't project >>>>>>>>>> columns >>>>>>>>>> independently. This wouldn't help on the write side and may just >>>>>>>>>> cause a >>>>>>>>>> lot of seeking on the read side that diminishes the benefits. >>>>>>>>>> >>>>>>>>>> Also, now that we have more details, I think there is a second >>>>>>>>>> problem. Because we expect several manifests in a table, we >>>>>>>>>> parallelize >>>>>>>>>> split planning on manifests instead of splits of manifest files. This >>>>>>>>>> planning operation is happening in a single thread instead of in >>>>>>>>>> parallel. >>>>>>>>>> I think if you split the write across several manifests, you'd >>>>>>>>>> improve wall >>>>>>>>>> time. >>>>>>>>>> >>>>>>>>>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi < >>>>>>>>>> aokolnyc...@apple.com> wrote: >>>>>>>>>> >>>>>>>>>>> No, we haven’t experienced it yet. The manifest size is huge in >>>>>>>>>>> your case. To me, Ryan is correct: it might be either big >>>>>>>>>>> lower/upper >>>>>>>>>>> bounds (then truncation will help) or a big number of columns (then >>>>>>>>>>> collecting >>>>>>>>>>> lower/upper bounds only for specific columns will help). I think >>>>>>>>>>> both >>>>>>>>>>> optimizations are needed and will reduce the manifest size. >>>>>>>>>>> >>>>>>>>>>> Since you mentioned you have a lot of columns and we collect >>>>>>>>>>> bounds for nested struct fields, I am wondering if you could revert >>>>>>>>>>> [1] >>>>>>>>>>> locally and compare the manifest size. >>>>>>>>>>> >>>>>>>>>>> [1] - >>>>>>>>>>> https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f >>>>>>>>>>> >>>>>>>>>>> On 19 Apr 2019, at 15:42, Gautam <gautamkows...@gmail.com> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>> Thanks for responding, Anton!
Do we think the delay is mainly due >>>>>>>>>>> to lower/upper bound filtering? have you faced this? I haven't >>>>>>>>>>> exactly >>>>>>>>>>> found where the slowness is yet. It's generally due to the stats >>>>>>>>>>> filtering >>>>>>>>>>> but what part of it is causing this much network traffic. There's >>>>>>>>>>> CloseableIteratable that takes a ton of time on the next() and >>>>>>>>>>> hasNext() >>>>>>>>>>> calls. My guess is the expression evaluation on each manifest entry >>>>>>>>>>> is >>>>>>>>>>> what's doing it. >>>>>>>>>>> >>>>>>>>>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi < >>>>>>>>>>> aokolnyc...@apple.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> I think we need to have a list of columns for which we want to >>>>>>>>>>>> collect stats and that should be configurable by the user. Maybe, >>>>>>>>>>>> this >>>>>>>>>>>> config should be applicable only to lower/upper bounds. As we now >>>>>>>>>>>> collect >>>>>>>>>>>> stats even for nested struct fields, this might generate a lot of >>>>>>>>>>>> data. In >>>>>>>>>>>> most cases, users cluster/sort their data by a subset of data >>>>>>>>>>>> columns to >>>>>>>>>>>> have fast queries with predicates on those columns. So, being able >>>>>>>>>>>> to >>>>>>>>>>>> configure columns for which to collect lower/upper bounds seems >>>>>>>>>>>> reasonable. >>>>>>>>>>>> >>>>>>>>>>>> On 19 Apr 2019, at 08:03, Gautam <gautamkows...@gmail.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> > The length in bytes of the schema is 109M as compared to >>>>>>>>>>>> 687K of the non-stats dataset. >>>>>>>>>>>> >>>>>>>>>>>> Typo, length in bytes of *manifest*. schema is the same. >>>>>>>>>>>> >>>>>>>>>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam < >>>>>>>>>>>> gautamkows...@gmail.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Correction, partition count = 4308. >>>>>>>>>>>>> >>>>>>>>>>>>> > Re: Changing the way we keep stats. Avro is a block >>>>>>>>>>>>> splittable format and is friendly with parallel compute >>>>>>>>>>>>> frameworks like >>>>>>>>>>>>> Spark. >>>>>>>>>>>>> >>>>>>>>>>>>> Here I am trying to say that we don't need to change the >>>>>>>>>>>>> format to columnar right? The current format is already friendly >>>>>>>>>>>>> for >>>>>>>>>>>>> parallelization. >>>>>>>>>>>>> >>>>>>>>>>>>> thanks. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam < >>>>>>>>>>>>> gautamkows...@gmail.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Ah, my bad. I missed adding in the schema details .. Here are >>>>>>>>>>>>>> some details on the dataset with stats : >>>>>>>>>>>>>> >>>>>>>>>>>>>> Iceberg Schema Columns : 20 >>>>>>>>>>>>>> Spark Schema fields : 20 >>>>>>>>>>>>>> Snapshot Summary :{added-data-files=4308, >>>>>>>>>>>>>> added-records=11494037, changed-partition-count=4308, >>>>>>>>>>>>>> total-records=11494037, total-data-files=4308} >>>>>>>>>>>>>> Manifest files :1 >>>>>>>>>>>>>> Manifest details: >>>>>>>>>>>>>> => manifest file path: >>>>>>>>>>>>>> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro >>>>>>>>>>>>>> => manifest file length: 109,028,885 >>>>>>>>>>>>>> => existing files count: 0 >>>>>>>>>>>>>> => added files count: 4308 >>>>>>>>>>>>>> => deleted files count: 0 >>>>>>>>>>>>>> => partitions count: 4 >>>>>>>>>>>>>> => partition fields count: 4 >>>>>>>>>>>>>> >>>>>>>>>>>>>> Re: Num data files. It has a single manifest keep track of >>>>>>>>>>>>>> 4308 files. Total record count is 11.4 Million. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Re: Columns. 
You are right that this table has many columns.. >>>>>>>>>>>>>> although it has only 20 top-level columns, num leaf columns are >>>>>>>>>>>>>> in order >>>>>>>>>>>>>> of thousands. This Schema is heavy on structs (in the thousands) >>>>>>>>>>>>>> and has >>>>>>>>>>>>>> deep levels of nesting. I know Iceberg keeps >>>>>>>>>>>>>> *column_sizes, value_counts, null_value_counts* for all leaf >>>>>>>>>>>>>> fields and additionally *lower-bounds, upper-bounds* for >>>>>>>>>>>>>> native, struct types (not yet for map KVs and arrays). The >>>>>>>>>>>>>> length in bytes >>>>>>>>>>>>>> of the schema is 109M as compared to 687K of the non-stats >>>>>>>>>>>>>> dataset. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Re: Turning off stats. I am looking to leverage stats coz for >>>>>>>>>>>>>> our datasets with much larger number of data files we want to >>>>>>>>>>>>>> leverage >>>>>>>>>>>>>> iceberg's ability to skip entire files based on these stats. >>>>>>>>>>>>>> This is one of >>>>>>>>>>>>>> the big incentives for us to use Iceberg. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Re: Changing the way we keep stats. Avro is a block >>>>>>>>>>>>>> splittable format and is friendly with parallel compute >>>>>>>>>>>>>> frameworks like >>>>>>>>>>>>>> Spark. So would it make sense for instance to have add an option >>>>>>>>>>>>>> to have >>>>>>>>>>>>>> Spark job / Futures handle split planning? In a larger >>>>>>>>>>>>>> context, 109M is >>>>>>>>>>>>>> not that much metadata given that Iceberg is meant for datasets >>>>>>>>>>>>>> where the >>>>>>>>>>>>>> metadata itself is Bigdata scale. I'm curious on how folks with >>>>>>>>>>>>>> larger >>>>>>>>>>>>>> sized metadata (in GB) are optimizing this today. >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>> -Gautam. >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue < >>>>>>>>>>>>>> rb...@netflix.com.invalid> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks for bringing this up! My initial theory is that this >>>>>>>>>>>>>>> table has a ton of stats data that you have to read. That could >>>>>>>>>>>>>>> happen in a >>>>>>>>>>>>>>> couple of cases. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> First, you might have large values in some columns. Parquet >>>>>>>>>>>>>>> will suppress its stats if values are larger than 4k and those >>>>>>>>>>>>>>> are what >>>>>>>>>>>>>>> Iceberg uses. But that could still cause you to store two 1k+ >>>>>>>>>>>>>>> objects for >>>>>>>>>>>>>>> each large column (lower and upper bounds). With a lot of data >>>>>>>>>>>>>>> files, that >>>>>>>>>>>>>>> could add up quickly. The solution here is to implement #113 >>>>>>>>>>>>>>> <https://github.com/apache/incubator-iceberg/issues/113> so >>>>>>>>>>>>>>> that we don't store the actual min and max for string or binary >>>>>>>>>>>>>>> columns, >>>>>>>>>>>>>>> but instead a truncated value that is just above or just below. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> The second case is when you have a lot of columns. Each >>>>>>>>>>>>>>> column stores both a lower and upper bound, so 1,000 columns >>>>>>>>>>>>>>> could easily >>>>>>>>>>>>>>> take 8k per file. If this is the problem, then maybe we want to >>>>>>>>>>>>>>> have a way >>>>>>>>>>>>>>> to turn off column stats. We could also think of ways to change >>>>>>>>>>>>>>> the way >>>>>>>>>>>>>>> stats are stored in the manifest files, but that only helps if >>>>>>>>>>>>>>> we move to a >>>>>>>>>>>>>>> columnar format to store manifests, so this is probably not a >>>>>>>>>>>>>>> short-term >>>>>>>>>>>>>>> fix. 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> If you can share a bit more information about this table, we >>>>>>>>>>>>>>> can probably tell which one is the problem. I'm guessing it is >>>>>>>>>>>>>>> the large >>>>>>>>>>>>>>> values problem. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam < >>>>>>>>>>>>>>> gautamkows...@gmail.com> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hello folks, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I have been testing Iceberg reading with and without stats >>>>>>>>>>>>>>>> built into Iceberg dataset manifest and found that there's a >>>>>>>>>>>>>>>> huge jump in >>>>>>>>>>>>>>>> network traffic with the latter.. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> In my test I am comparing two Iceberg datasets, both >>>>>>>>>>>>>>>> written in Iceberg format. One with and the other without >>>>>>>>>>>>>>>> stats collected >>>>>>>>>>>>>>>> in Iceberg manifests. In particular the difference between the >>>>>>>>>>>>>>>> writers used >>>>>>>>>>>>>>>> for the two datasets is this PR: >>>>>>>>>>>>>>>> https://github.com/apache/incubator-iceberg/pull/63/files which >>>>>>>>>>>>>>>> uses Iceberg's writers for writing Parquet data. I captured >>>>>>>>>>>>>>>> tcpdump from >>>>>>>>>>>>>>>> query scans run on these two datasets. The partition being >>>>>>>>>>>>>>>> scanned >>>>>>>>>>>>>>>> contains 1 manifest, 1 parquet data file and ~3700 rows in >>>>>>>>>>>>>>>> both datasets. >>>>>>>>>>>>>>>> There's a 30x jump in network traffic to the remote filesystem >>>>>>>>>>>>>>>> (ADLS) when >>>>>>>>>>>>>>>> i switch to stats based Iceberg dataset. Both queries used the >>>>>>>>>>>>>>>> same Iceberg >>>>>>>>>>>>>>>> reader code to access both datasets. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> ``` >>>>>>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r >>>>>>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep >>>>>>>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc >>>>>>>>>>>>>>>> -l >>>>>>>>>>>>>>>> reading from file >>>>>>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type >>>>>>>>>>>>>>>> EN10MB >>>>>>>>>>>>>>>> (Ethernet) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> *8844* >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r >>>>>>>>>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap | grep >>>>>>>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc >>>>>>>>>>>>>>>> -l >>>>>>>>>>>>>>>> reading from file >>>>>>>>>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap, link-type EN10MB >>>>>>>>>>>>>>>> (Ethernet) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> *269708* >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> ``` >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> As a consequence of this the query response times get >>>>>>>>>>>>>>>> affected drastically (illustrated below). I must confess that >>>>>>>>>>>>>>>> I am on a >>>>>>>>>>>>>>>> slow internet connection via VPN connecting to the remote FS. >>>>>>>>>>>>>>>> But the >>>>>>>>>>>>>>>> dataset without stats took just 1m 49s while the dataset with >>>>>>>>>>>>>>>> stats took >>>>>>>>>>>>>>>> 26m 48s to read the same sized data. Most of that time in the >>>>>>>>>>>>>>>> latter >>>>>>>>>>>>>>>> dataset was spent split planning in Manifest reading and stats >>>>>>>>>>>>>>>> evaluation. 
>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> ``` >>>>>>>>>>>>>>>> all=> select count(*) from >>>>>>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues where batchId = >>>>>>>>>>>>>>>> '4a6f95abac924159bb3d7075373395c9'; >>>>>>>>>>>>>>>> count(1) >>>>>>>>>>>>>>>> ---------- >>>>>>>>>>>>>>>> 3627 >>>>>>>>>>>>>>>> (1 row) >>>>>>>>>>>>>>>> *Time: 109673.202 ms (01:49.673)* >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> all=> select count(*) from iceberg_scratch_pad_demo_11 >>>>>>>>>>>>>>>> where _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and >>>>>>>>>>>>>>>> batchId = >>>>>>>>>>>>>>>> '6d50eeb3e7d74b4f99eea91a27fc8f15'; >>>>>>>>>>>>>>>> count(1) >>>>>>>>>>>>>>>> ---------- >>>>>>>>>>>>>>>> 3808 >>>>>>>>>>>>>>>> (1 row) >>>>>>>>>>>>>>>> *Time: 1608058.616 ms (26:48.059)* >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> ``` >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Has anyone faced this? I'm wondering if there's some >>>>>>>>>>>>>>>> caching or parallelism option here that can be leveraged. >>>>>>>>>>>>>>>> Would appreciate >>>>>>>>>>>>>>>> some guidance. If there isn't a straightforward fix and others >>>>>>>>>>>>>>>> feel this is >>>>>>>>>>>>>>>> an issue I can raise an issue and look into it further. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>> -Gautam. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>> Ryan Blue >>>>>>>>>>>>>>> Software Engineer >>>>>>>>>>>>>>> Netflix >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> -- >>>>>>>>>> Ryan Blue >>>>>>>>>> Software Engineer >>>>>>>>>> Netflix >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Ryan Blue >>>>>>> Software Engineer >>>>>>> Netflix >>>>>>> >>>>>> >>>> >>>> -- >>>> Ryan Blue >>>> Software Engineer >>>> Netflix >>>> >>> > > -- > Ryan Blue > Software Engineer > Netflix > > > -- Ryan Blue Software Engineer Netflix