Yeah, the moment I sent my email I realized that the table was partitioned by another column :) My bad, it works correctly and filters out manifests.
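To make Ryan's point below concrete, here is a minimal sketch against the Iceberg Java expressions API (the schema, field IDs, and column names are made up for illustration): an inclusive projection of a predicate on a non-partition column collapses to `true`, so only partition-column predicates can prune manifests.

```
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.types.Types;

public class InclusiveProjectionDemo {
  public static void main(String[] args) {
    // Illustrative schema: "id" is a data column, "day" is the partition column.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "day", Types.StringType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("day").build();

    // Predicate on a non-partition column: the inclusive projection is always true,
    // so it cannot eliminate any manifests on its own.
    System.out.println(Projections.inclusive(spec)
        .project(Expressions.and(Expressions.notNull("id"), Expressions.equal("id", 1))));

    // Predicate on the partition column: the projection survives and can prune manifests.
    System.out.println(Projections.inclusive(spec)
        .project(Expressions.equal("day", "2019-01-01")));
  }
}
```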
> On 2 May 2019, at 16:12, Ryan Blue <rb...@netflix.com> wrote:
>
> Is "id" a partition column? The inclusive manifest evaluator will use an inclusive projection of the filter, which is true for all non-partition column predicates.
>
> On Thu, May 2, 2019 at 4:08 PM Anton Okolnychyi <aokolnyc...@apple.com> wrote:
> Hm, but why don't we filter out manifests using stats for partition columns? Is there a bug in InclusiveManifestEvaluator? I just tested it locally. InclusiveManifestEvaluator gets '(not_null(ref(name="id")) and ref(name="id") == 1)' as rowFilter, which is transformed into 'true' after that expression is bound. Apparently, the bound expression doesn't allow us to filter any manifests.
>
>> On 2 May 2019, at 15:33, Ryan Blue <rb...@netflix.com> wrote:
>>
>> Yes, the append code tries to keep the existing order. That way, if you have a time-based append pattern, it just works. Similarly, if you've spent the time to optimize the order, the append doesn't rewrite and ruin it.
>>
>> Sounds like you just need to sort the file list by partition to group as much as possible, then append. We do this when we convert tables to Iceberg to start with a good split across manifests.
>>
>> On Thu, May 2, 2019 at 3:17 PM Gautam <gautamkows...@gmail.com> wrote:
>> Ah, looks like MergingSnapshotUpdate.mergeGroup() has the relevant logic. It preserves the natural order of manifests, so I guess it groups based on when manifests were created, meaning the answer is whatever order the commits were done in. If batches within multiple days were committed out of order, then a manifest could end up with multiple days.
>>
>> On Thu, May 2, 2019 at 2:23 PM Gautam <gautamkows...@gmail.com> wrote:
>> Ok, thanks for the tip on not having to be tied to a hierarchical partition spec.
>>
>> Although this still doesn't explain why all the manifests are scanned, because it should be pruning the list of manifests and it's not. Is my understanding correct that the manifest grouping might be re-shuffling the days, so a query on one day might map to all manifests? Does manifest merging optimize for partition boundaries, or is it based on the manifests' natural order?
>>
>> On Thu, May 2, 2019 at 2:06 PM Ryan Blue <rb...@netflix.com> wrote:
>> You also don't need to use year, month, and day. You can just use day.
>>
>> The time-based partition functions all produce ordinals, not local values: month(Jan 1970) = 0 and month(Jan 1972) = 24. Same thing with day and hour. In fact, I should open a PR to throw an exception when there are duplicate partition functions...
>>
>> On Thu, May 2, 2019 at 1:52 PM Gautam <gautamkows...@gmail.com> wrote:
>> FYI, the test Partition Spec is:
>> [
>>   YEAR: identity(21)
>>   MONTH: identity(22)
>>   DAY: identity(23)
>>   batchId: identity(24)
>> ]
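A rough sketch of the alternative Ryan describes just above (a single day transform plus batchId, instead of separate YEAR/MONTH/DAY identity columns); the timestamp column name and field IDs here are placeholders:

```
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class DayPartitionSpecDemo {
  public static void main(String[] args) {
    // Placeholder schema: a timestamp column plus the batch id column.
    Schema schema = new Schema(
        Types.NestedField.required(1, "event_ts", Types.TimestampType.withZone()),
        Types.NestedField.required(2, "batchId", Types.StringType.get()));

    // day() produces an ordinal (days since 1970-01-01), so it orders and filters
    // correctly across month and year boundaries; no separate year/month fields needed.
    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .day("event_ts")
        .identity("batchId")
        .build();

    System.out.println(spec);
  }
}
```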
>> On Thu, May 2, 2019 at 1:46 PM Gautam <gautamkows...@gmail.com> wrote:
>> > Using those, you should be able to control parallelism. If you want to test with 4,000, then you can set the min count to 5,000 so Iceberg won't compact manifests.
>>
>> This is helpful. Thanks for the pointer on increasing parallelism. Will try this out.
>>
>> So, just so I understand the behaviour: if a different dataset has >= 5000 batches, then the resultant number of manifests would be (total_num_batches % 5000)?
>>
>> > What surprises me is that you're not getting much benefit from filtering out manifests that aren't helpful. We see a lot of benefit from it.
>>
>> Pardon the verbose example, but I think it'll help explain what I'm seeing.
>>
>> Regarding manifest filtering: I tested whether partition filters in the SQL query actually reduce the manifests being inspected. In my example, I have 16 manifests that point to 4000 batch partitions (each file is restricted to one partition, as we're using physical partitioning in the table). So when querying with WHERE batchId = 'xyz', at most 1 manifest should be read, because 1 batch == 1 file, which should be tracked by 1 manifest (among the 16), right? But I see that all 16 are being inspected in BaseTableScan.planFiles(). Correct me if I'm wrong, but it's this call [1] that should be giving me the manifests that match a partition. When I inspect it, it says `matchingManifests = 16`, which is all the manifests in the table. This could be due to the fact that our batch ids are random UUIDs, so lower/upper bounds may not help because there's no inherent ordering amongst batches. But then I tried year = 2019 and month = 01 and day = 01, which also scanned all manifests. Could this be due to the way Iceberg manifests are re-grouped and merged? If so, shouldn't re-grouping honour partition boundaries and optimize for them?
>>
>> Cheers,
>> -Gautam.
>>
>> [1] - https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173
>>
>> On Thu, May 2, 2019 at 12:27 PM Ryan Blue <rb...@netflix.com> wrote:
>> Good questions. Grouping manifests is configurable at the table level. There are 2 settings:
>>
>> - commit.manifest.target-size-bytes defaults to 8MB; this is the target size that Iceberg will compact to.
>> - commit.manifest.min-count-to-merge defaults to 100; this is the minimum number of files before a compaction is triggered.
>>
>> Using those, you should be able to control parallelism. If you want to test with 4,000, then you can set the min count to 5,000 so Iceberg won't compact manifests.
>>
>> What surprises me is that you're not getting much benefit from filtering out manifests that aren't helpful. We see a lot of benefit from it. You might try sorting the data files by partition before adding them to the table. That will cluster data files in the same partition so you can read fewer manifests.
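A minimal sketch of applying the two settings Ryan lists above so a test table keeps its manifests un-compacted; the property names come from the message, while the table-loading code and path are only illustrative:

```
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

public class ManifestMergeSettingsDemo {
  public static void main(String[] args) {
    // Illustrative: load the table however your catalog normally does it.
    Table table = new HadoopTables(new Configuration()).load("adl://.../path/to/table");

    table.updateProperties()
        // target manifest size Iceberg compacts to (default 8 MB)
        .set("commit.manifest.target-size-bytes", String.valueOf(8L * 1024 * 1024))
        // raise the merge trigger above the expected manifest count (e.g. 5,000 > 4,000)
        // so appends do not compact the per-batch manifests during the test
        .set("commit.manifest.min-count-to-merge", "5000")
        .commit();
  }
}
```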
>> On Thu, May 2, 2019 at 12:09 PM Gautam <gautamkows...@gmail.com> wrote:
>> Hey Anton,
>> Sorry about the delay on this. Been caught up with some other things. Thanks for raising issue #173.
>>
>> So the root cause is indeed the density and size of the schema. While I agree the option to configure stats for columns is good, I'm not fully convinced that this is purely due to lower/upper bounds. For instance, maybe it's just taking a while to iterate over manifest rows and deserialize the DataFile stats in each read? The solution I'm using right now is to parallelize the manifest reading in split planning. We regenerated the Iceberg table with more manifests. Now the code enables the ParallelIterator, which uses a worker pool of threads (one thread per CPU by default, configurable using 'iceberg.worker.num-threads') to read manifests.
>>
>> On that note, the ability to parallelize is limited to how many manifests are in the table. So as a test, for a table with 4000 files we created one manifest per file (think of one file as a single batch commit in this case). I was hoping to get a parallelism factor of 4000, but Iceberg summarizes manifests into fewer manifests with each commit, so we instead ended up with 16 manifests. So now split planning is limited to at most 16 units of parallelism. Is this grouping into fewer manifests configurable? If not, should we make it configurable?
>>
>> Sorry if this is forking a different conversation. If so, I can start a separate thread on this.
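The worker-pool size mentioned above is read once when the pool is created, so it has to be set before planning starts; a small sketch, assuming it is picked up as a JVM system property (e.g. via -D on the driver):

```
public class PlanningParallelismDemo {
  public static void main(String[] args) {
    // Assumed: Iceberg sizes its shared worker pool from this JVM system property
    // the first time the pool is used (default: one thread per available core),
    // so it must be set before any scan planning happens.
    System.setProperty("iceberg.worker.num-threads", "16");
  }
}
```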
>> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <aokolnyc...@apple.com> wrote:
>> Hey Gautam,
>>
>> Out of curiosity, did you manage to confirm the root cause of the issue?
>>
>> P.S. I created [1] so that we can make collection of lower/upper bounds configurable.
>>
>> Thanks,
>> Anton
>>
>> [1] - https://github.com/apache/incubator-iceberg/issues/173
>>
>>> On 22 Apr 2019, at 09:15, Gautam <gautamkows...@gmail.com> wrote:
>>>
>>> Thanks guys for the insights ..
>>>
>>> > I like Anton's idea to have an optional list of columns for which we keep stats. That would allow us to avoid storing stats for thousands of columns that won't ever be used. Another option here is to add a flag to keep stats only for top-level columns. That's much less configuration for users and probably does the right thing in many cases. Simpler to use but not as fast in all cases is sometimes a good compromise.
>>>
>>> This makes sense to me. It adds a variable that data pipelines can tweak to improve performance. I will add an issue on GitHub to add a stats config/flag. Having said that, I would try to optimize around this because read patterns are hardly ever known a priori, and adding a column to this list means having to re-write the entire dataset again. So I'll try the other suggestion, which is parallelizing over multiple manifests.
>>>
>>> > To clarify my comment on changing the storage: the idea is to use separate columns instead of a map and then use a columnar storage format so we can project those columns independently. Avro can't project columns independently. This wouldn't help on the write side and may just cause a lot of seeking on the read side that diminishes the benefits.
>>>
>>> Gotcha.
>>>
>>> > Also, now that we have more details, I think there is a second problem. Because we expect several manifests in a table, we parallelize split planning on manifests instead of splits of manifest files. This planning operation is happening in a single thread instead of in parallel. I think if you split the write across several manifests, you'd improve wall time.
>>>
>>> This might actually be the issue here: this was a test-bench dataset, so the writer job created a single manifest for all the data in the dataset, which isn't really how we will do things in prod. I'll try and create the metadata based on production's expected commit pattern.
>>>
>>> Regarding Iceberg not truncating large bounded column values (https://github.com/apache/incubator-iceberg/issues/113) .. I didn't consider this with our dataset. The current evidence is leading towards the number of columns and the sheer number of files that the manifest is maintaining, but this is a good thing to look into.
>>>
>>> Thanks again guys.
>>>
>>> -Gautam.
>>>
>>> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rb...@netflix.com> wrote:
>>> I like Anton's idea to have an optional list of columns for which we keep stats. That would allow us to avoid storing stats for thousands of columns that won't ever be used. Another option here is to add a flag to keep stats only for top-level columns. That's much less configuration for users and probably does the right thing in many cases. Simpler to use but not as fast in all cases is sometimes a good compromise.
>>>
>>> To clarify my comment on changing the storage: the idea is to use separate columns instead of a map and then use a columnar storage format so we can project those columns independently. Avro can't project columns independently. This wouldn't help on the write side and may just cause a lot of seeking on the read side that diminishes the benefits.
>>>
>>> Also, now that we have more details, I think there is a second problem. Because we expect several manifests in a table, we parallelize split planning on manifests instead of splits of manifest files. This planning operation is happening in a single thread instead of in parallel. I think if you split the write across several manifests, you'd improve wall time.
>>>
>>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <aokolnyc...@apple.com> wrote:
>>> No, we haven't experienced it yet. The manifest size is huge in your case. To me, Ryan is correct: it might be either big lower/upper bounds (then truncation will help) or a big number of columns (then collecting lower/upper bounds only for specific columns will help). I think both optimizations are needed and will reduce the manifest size.
>>>
>>> Since you mentioned you have a lot of columns and we collect bounds for nested struct fields, I am wondering if you could revert [1] locally and compare the manifest size.
>>>
>>> [1] - https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f
>>>
>>>> On 19 Apr 2019, at 15:42, Gautam <gautamkows...@gmail.com> wrote:
>>>>
>>>> Thanks for responding, Anton! Do we think the delay is mainly due to lower/upper bound filtering? Have you faced this? I haven't exactly found where the slowness is yet. It's generally due to the stats filtering, but I'm not sure which part of it is causing this much network traffic. There's a CloseableIterable that takes a ton of time on the next() and hasNext() calls. My guess is the expression evaluation on each manifest entry is what's doing it.
>>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <aokolnyc...@apple.com> wrote:
>>>> I think we need to have a list of columns for which we want to collect stats, and that should be configurable by the user. Maybe this config should be applicable only to lower/upper bounds. As we now collect stats even for nested struct fields, this might generate a lot of data. In most cases, users cluster/sort their data by a subset of data columns to have fast queries with predicates on those columns. So, being able to configure the columns for which to collect lower/upper bounds seems reasonable.
>>>>
>>>>> On 19 Apr 2019, at 08:03, Gautam <gautamkows...@gmail.com> wrote:
>>>>>
>>>>> > The length in bytes of the schema is 109M as compared to 687K of the non-stats dataset.
>>>>>
>>>>> Typo: length in bytes of the *manifest*. The schema is the same.
>>>>>
>>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <gautamkows...@gmail.com> wrote:
>>>>> Correction, partition count = 4308.
>>>>>
>>>>> > Re: Changing the way we keep stats. Avro is a block splittable format and is friendly with parallel compute frameworks like Spark.
>>>>>
>>>>> Here I am trying to say that we don't need to change the format to columnar, right? The current format is already friendly for parallelization.
>>>>>
>>>>> thanks.
>>>>>
>>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <gautamkows...@gmail.com> wrote:
>>>>> Ah, my bad, I missed adding the schema details. Here are some details on the dataset with stats:
>>>>>
>>>>> Iceberg Schema Columns : 20
>>>>> Spark Schema fields    : 20
>>>>> Snapshot Summary       : {added-data-files=4308, added-records=11494037, changed-partition-count=4308, total-records=11494037, total-data-files=4308}
>>>>> Manifest files         : 1
>>>>> Manifest details:
>>>>>   => manifest file path: adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>>>   => manifest file length: 109,028,885
>>>>>   => existing files count: 0
>>>>>   => added files count: 4308
>>>>>   => deleted files count: 0
>>>>>   => partitions count: 4
>>>>>   => partition fields count: 4
>>>>>
>>>>> Re: Num data files. It has a single manifest keeping track of 4308 files. Total record count is 11.4 million.
>>>>>
>>>>> Re: Columns. You are right that this table has many columns: although it has only 20 top-level columns, the number of leaf columns is in the order of thousands. This schema is heavy on structs (in the thousands) and has deep levels of nesting. I know Iceberg keeps column_sizes, value_counts, and null_value_counts for all leaf fields, and additionally lower-bounds and upper-bounds for native and struct types (not yet for map KVs and arrays). The length in bytes of the schema is 109M as compared to 687K of the non-stats dataset.
>>>>>
>>>>> Re: Turning off stats. I am looking to leverage stats because, for our datasets with a much larger number of data files, we want to leverage Iceberg's ability to skip entire files based on these stats. This is one of the big incentives for us to use Iceberg.
>>>>>
>>>>> Re: Changing the way we keep stats. Avro is a block-splittable format and is friendly with parallel compute frameworks like Spark. So would it make sense, for instance, to add an option to have a Spark job / Futures handle split planning? In a larger context, 109M is not that much metadata given that Iceberg is meant for datasets where the metadata itself is big-data scale. I'm curious how folks with larger-sized metadata (in GB) are optimizing this today.
>>>>>
>>>>> Cheers,
>>>>> -Gautam.
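For a sense of scale, the numbers above work out to roughly 109,028,885 bytes / 4,308 tracked files, which is about 25 KB of manifest metadata per data file, consistent with thousands of leaf columns each carrying bounds and counts in every manifest entry.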
>>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <rb...@netflix.com.invalid> wrote:
>>>>> Thanks for bringing this up! My initial theory is that this table has a ton of stats data that you have to read. That could happen in a couple of cases.
>>>>>
>>>>> First, you might have large values in some columns. Parquet will suppress its stats if values are larger than 4k, and those are what Iceberg uses. But that could still cause you to store two 1k+ objects for each large column (lower and upper bounds). With a lot of data files, that could add up quickly. The solution here is to implement #113 (https://github.com/apache/incubator-iceberg/issues/113) so that we don't store the actual min and max for string or binary columns, but instead a truncated value that is just above or just below.
>>>>>
>>>>> The second case is when you have a lot of columns. Each column stores both a lower and upper bound, so 1,000 columns could easily take 8k per file. If this is the problem, then maybe we want to have a way to turn off column stats. We could also think of ways to change the way stats are stored in the manifest files, but that only helps if we move to a columnar format to store manifests, so this is probably not a short-term fix.
>>>>>
>>>>> If you can share a bit more information about this table, we can probably tell which one is the problem. I'm guessing it is the large values problem.
>>>>>
>>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <gautamkows...@gmail.com> wrote:
>>>>> Hello folks,
>>>>>
>>>>> I have been testing Iceberg reading with and without stats built into the Iceberg dataset manifest, and found that there's a huge jump in network traffic with the latter.
>>>>>
>>>>> In my test I am comparing two Iceberg datasets, both written in Iceberg format, one with and the other without stats collected in the Iceberg manifests. In particular, the difference between the writers used for the two datasets is this PR: https://github.com/apache/incubator-iceberg/pull/63/files, which uses Iceberg's writers for writing Parquet data. I captured tcpdump from query scans run on these two datasets. The partition being scanned contains 1 manifest, 1 Parquet data file, and ~3700 rows in both datasets. There's a 30x jump in network traffic to the remote filesystem (ADLS) when I switch to the stats-based Iceberg dataset. Both queries used the same Iceberg reader code to access both datasets.
>>>>> ```
>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>> reading from file iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type EN10MB (Ethernet)
>>>>> 8844
>>>>>
>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r iceberg_scratch_pad_demo_11_batch_query.pcap | grep perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>> reading from file iceberg_scratch_pad_demo_11_batch_query.pcap, link-type EN10MB (Ethernet)
>>>>> 269708
>>>>> ```
>>>>>
>>>>> As a consequence, the query response times are affected drastically (illustrated below). I must confess that I am on a slow internet connection via VPN connecting to the remote FS, but the dataset without stats took just 1m 49s while the dataset with stats took 26m 48s to read the same-sized data. Most of that time in the latter dataset was spent in split planning, i.e. manifest reading and stats evaluation.
>>>>>
>>>>> ```
>>>>> all=> select count(*) from iceberg_geo1_metrixx_qc_postvalues where batchId = '4a6f95abac924159bb3d7075373395c9';
>>>>>  count(1)
>>>>> ----------
>>>>>      3627
>>>>> (1 row)
>>>>> Time: 109673.202 ms (01:49.673)
>>>>>
>>>>> all=> select count(*) from iceberg_scratch_pad_demo_11 where _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId = '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>>>  count(1)
>>>>> ----------
>>>>>      3808
>>>>> (1 row)
>>>>> Time: 1608058.616 ms (26:48.059)
>>>>> ```
>>>>>
>>>>> Has anyone faced this? I'm wondering if there's some caching or parallelism option here that can be leveraged. Would appreciate some guidance. If there isn't a straightforward fix and others feel this is an issue, I can raise an issue and look into it further.
>>>>>
>>>>> Cheers,
>>>>> -Gautam.
>
> --
> Ryan Blue
> Software Engineer
> Netflix
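Ryan's suggestion earlier in the thread (sort the data files by partition before appending, so files from the same partition land next to each other and tend to end up in the same manifest) might look roughly like the following sketch; the file list and the partition-to-string ordering are placeholders:

```
import java.util.Comparator;
import java.util.List;

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

public class SortedAppendDemo {
  // Appends a batch of files in partition order so that files from the same
  // partition sit next to each other in the commit.
  static void appendSortedByPartition(Table table, List<DataFile> files) {
    // Placeholder ordering: the partition tuple's string form is enough to group
    // identical partitions; a real job might compare the actual partition values.
    files.sort(Comparator.comparing(file -> file.partition().toString()));

    AppendFiles append = table.newAppend();
    for (DataFile file : files) {
      append.appendFile(file);
    }
    append.commit();
  }
}
```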