Hm, but why don't we filter out manifests using stats for partition columns? Is there a bug in InclusiveManifestEvaluator? I just tested this locally: InclusiveManifestEvaluator gets '(not_null(ref(name="id")) and ref(name="id") == 1)' as the rowFilter, which is transformed into 'true' once that expression is bound. Apparently, the bound expression doesn't let us filter out any manifests.
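For anyone who wants to poke at this outside a full scan, here is a minimal sketch (Java, against the incubator-iceberg API referenced in this thread) of projecting that same row filter into partition space, which is roughly what the manifest evaluator does before checking a manifest's partition summaries. The schema, field id, and the use of Projections.inclusive here are illustrative assumptions, not the exact code path:

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.types.Types;

public class PartitionProjectionCheck {
  public static void main(String[] args) {
    // Single identity-partitioned column, mirroring the rowFilter above.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.IntegerType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("id").build();

    // not_null(id) and id == 1, the expression InclusiveManifestEvaluator receives.
    Expression rowFilter = Expressions.and(
        Expressions.notNull("id"),
        Expressions.equal("id", 1));

    // Inclusive projection maps the row filter into partition space. If this prints
    // an always-true expression, no manifest can ever be filtered out by it.
    Expression partitionFilter = Projections.inclusive(spec).project(rowFilter);
    System.out.println(partitionFilter);
  }
}
```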
> On 2 May 2019, at 15:33, Ryan Blue <rb...@netflix.com> wrote:

> Yes, the append code tries to keep the existing order. That way, if you have a time-based append pattern, it just works. Similarly, if you've spent the time to optimize the order, the append doesn't rewrite and ruin it.

> Sounds like you just need to sort the file list by partition to group as much as possible, then append. We do this when we convert tables to Iceberg to start with a good split across manifests.

> On Thu, May 2, 2019 at 3:17 PM Gautam <gautamkows...@gmail.com> wrote:

> Ah, looks like MergingSnapshotUpdate.mergeGroup() has the relevant logic. It preserves the natural order of manifests, so I guess it groups based on when manifests were created, and the answer is whatever order the commits were done in. If batches within multiple days were committed out of order, then a manifest could end up with multiple days.

> On Thu, May 2, 2019 at 2:23 PM Gautam <gautamkows...@gmail.com> wrote:

> Ok, thanks for the tip on not having to be tied to a hierarchical partition spec.

> Although this still doesn't explain why all the manifests are scanned, because it should be pruning the list of manifests and it's not. Is my understanding correct that the manifest grouping might be re-shuffling the days, so a query on 1 day might map to all manifests? Does manifest merging optimize for partition boundaries, or is it based on the manifests' natural order?

> On Thu, May 2, 2019 at 2:06 PM Ryan Blue <rb...@netflix.com> wrote:

> You also don't need to use year, month, and day. You can just use day.

> The time-based partition functions all produce ordinals, not local values: month(Jan 1970) = 0 and month(Jan 1972) = 24. Same thing with day and hour. In fact, I should open a PR to throw an exception when there are duplicate partition functions...

> On Thu, May 2, 2019 at 1:52 PM Gautam <gautamkows...@gmail.com> wrote:

> FYI, the test partition spec is:
> [
>   YEAR: identity(21)
>   MONTH: identity(22)
>   DAY: identity(23)
>   batchId: identity(24)
> ]

> On Thu, May 2, 2019 at 1:46 PM Gautam <gautamkows...@gmail.com> wrote:

> > Using those, you should be able to control parallelism. If you want to test with 4,000, then you can set the min count to 5,000 so Iceberg won't compact manifests.

> This is helpful. Thanks for the pointer on increasing parallelism. Will try this out. So I understand the behaviour: if a different dataset has >= 5,000 batches, would the resulting number of manifests be (total_num_batches % 5000)?

> > What surprises me is that you're not getting much benefit from filtering out manifests that aren't helpful. We see a lot of benefit from it.

> Pardon the verbose example, but I think it'll help explain what I'm seeing.

> Regarding manifest filtering: I tested whether partition filters in the SQL query actually reduce the manifests being inspected. In my example, I have 16 manifests that point to 4000 batch partitions (each file is restricted to one partition, as we're using physical partitioning in the table). So when querying with .. WHERE batchId = 'xyz' .. at most 1 manifest should be read, because 1 batch == 1 file, which should be tracked by 1 manifest (among the 16), right? But I see that all 16 are being inspected in BaseTableScan.planFiles().
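As a side note, a minimal sketch of the kind of check described just above: plan a scan with the batchId predicate and count the resulting file tasks. This assumes the TableScan API behind BaseTableScan.planFiles(); the table handle and the 'xyz' batch id are placeholders:

```java
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expressions;

public class ScanPlanningCheck {
  public static void countTasks(Table table) {
    // Filter on the identity-partitioned batchId column, as in the WHERE clause above.
    TableScan scan = table.newScan()
        .filter(Expressions.equal("batchId", "xyz"));

    int tasks = 0;
    for (FileScanTask task : scan.planFiles()) {
      tasks++;
    }
    // With 1 batch == 1 file, a single matching manifest should yield one task here.
    System.out.println("planned file tasks: " + tasks);
  }
}
```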
> Correct me if I'm wrong, it's this call [1] that should be giving me the manifests that match a partition. When I inspect this it says `matchingManifests = 16`, which is all the manifests in the table. This could be due to the fact that our batch ids are random UUIDs, so lower/upper bounds may not help because there's no inherent ordering amongst batches. But then I tried year = 2019 and month = 01 and day = 01, which also scanned all manifests. Could this be due to the way Iceberg manifests are re-grouped and merged? If so, shouldn't re-grouping honour partition boundaries and optimize for them?

> Cheers,
> -Gautam.

> [1] - https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173

> On Thu, May 2, 2019 at 12:27 PM Ryan Blue <rb...@netflix.com> wrote:

> Good questions. Grouping manifests is configurable at the table level. There are 2 settings:
>   - commit.manifest.target-size-bytes defaults to 8 MB; this is the target size that Iceberg will compact to
>   - commit.manifest.min-count-to-merge defaults to 100; this is the minimum number of manifest files before a compaction is triggered

> Using those, you should be able to control parallelism. If you want to test with 4,000, then you can set the min count to 5,000 so Iceberg won't compact manifests.

> What surprises me is that you're not getting much benefit from filtering out manifests that aren't helpful. We see a lot of benefit from it. You might try sorting the data files by partition before adding them to the table. That will cluster data files in the same partition so you can read fewer manifests.

> On Thu, May 2, 2019 at 12:09 PM Gautam <gautamkows...@gmail.com> wrote:

> Hey Anton,
> Sorry about the delay on this. Been caught up with some other things. Thanks for raising issue #173.

> So the root cause is indeed the density and size of the schema. While I agree the option to configure stats for columns is good, I'm not fully convinced that this is purely due to lower/upper bounds. For instance, maybe it's just taking a while to iterate over manifest rows and deserialize the DataFile stats in each read? The solution I'm using right now is to parallelize the manifest reading in split planning. We regenerated the Iceberg table with more manifests. Now the code enables the ParallelIterator, which uses a worker pool of threads (1 thread per CPU by default, configurable using 'iceberg.worker.num-threads') to read manifests.

> On that note, the ability to parallelize is limited to how many manifests are in the table. So as a test, for a table with 4000 files we created one manifest per file (think of one file as a single batch commit in this case). So I was hoping to get a parallelism factor of 4000. But Iceberg merges manifests into fewer manifests with each commit, so we instead ended up with 16 manifests. So now split planning is limited to at most 16 units of parallelism. Is this grouping of manifests into fewer files configurable? If not, should we make it configurable?

> Sorry if this is forking a different conversation. If so, I can start a separate thread on this.
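For reference, a minimal sketch of wiring up the two table properties Ryan lists above, plus the 'iceberg.worker.num-threads' setting Gautam mentions. The UpdateProperties call and reading the worker-pool size from a JVM system property are assumptions about the API, so treat this as illustrative rather than the exact mechanism:

```java
import org.apache.iceberg.Table;

public class ManifestTuning {
  public static void configure(Table table) {
    table.updateProperties()
        // Target size Iceberg compacts manifests to (default 8 MB).
        .set("commit.manifest.target-size-bytes", String.valueOf(8L * 1024 * 1024))
        // Minimum number of manifests before a compaction is triggered (default 100).
        // Setting it above the expected manifest count (e.g. 5,000) effectively
        // disables merging, as suggested in the thread.
        .set("commit.manifest.min-count-to-merge", "5000")
        .commit();

    // Worker pool used by ParallelIterator for manifest reading during split planning;
    // defaults to one thread per CPU.
    System.setProperty("iceberg.worker.num-threads", "16");
  }
}
```

The other suggestion in the same reply, sorting data files by partition before appending, would amount to ordering the DataFile list on its partition value before the appendFile() calls, so files from the same partition cluster into the same manifests and more manifests can be skipped at scan time.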
> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <aokolnyc...@apple.com> wrote:

> Hey Gautam,

> Out of my curiosity, did you manage to confirm the root cause of the issue?

> P.S. I created [1] so that we can make collection of lower/upper bounds configurable.

> Thanks,
> Anton

> [1] - https://github.com/apache/incubator-iceberg/issues/173

>> On 22 Apr 2019, at 09:15, Gautam <gautamkows...@gmail.com> wrote:

>> Thanks guys for the insights ..

>> > I like Anton's idea to have an optional list of columns for which we keep stats. That would allow us to avoid storing stats for thousands of columns that won't ever be used. Another option here is to add a flag to keep stats only for top-level columns. That's much less configuration for users and probably does the right thing in many cases. Simpler to use but not as fast in all cases is sometimes a good compromise.

>> This makes sense to me. It adds a variable that data pipelines can tweak to improve performance. I will add an issue on GitHub to add a stats config/flag. Having said that, I would try to optimize around this, because read patterns are hardly ever known a priori and adding a column to this list means having to re-write the entire dataset again. So I'll try the other suggestion, which is parallelizing over multiple manifests.

>> > To clarify my comment on changing the storage: the idea is to use separate columns instead of a map and then use a columnar storage format so we can project those columns independently. Avro can't project columns independently. This wouldn't help on the write side and may just cause a lot of seeking on the read side that diminishes the benefits.

>> Gotcha.

>> > Also, now that we have more details, I think there is a second problem. Because we expect several manifests in a table, we parallelize split planning on manifests instead of splits of manifest files. This planning operation is happening in a single thread instead of in parallel. I think if you split the write across several manifests, you'd improve wall time.

>> This might actually be the issue here: this was a test-bench dataset, so the writer job created a single manifest for all the data in the dataset, which isn't really how we will do things in prod. I'll try and create the metadata based on production's expected commit pattern.

>> Regarding Iceberg not truncating large bounded column values (https://github.com/apache/incubator-iceberg/issues/113): I didn't consider this with our dataset. The current evidence is leading towards the number of columns and the sheer number of files that the manifest is maintaining, but this is a good thing to look into.

>> Thanks again guys.

>> -Gautam.

>> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rb...@netflix.com> wrote:

>> I like Anton's idea to have an optional list of columns for which we keep stats. That would allow us to avoid storing stats for thousands of columns that won't ever be used. Another option here is to add a flag to keep stats only for top-level columns. That's much less configuration for users and probably does the right thing in many cases. Simpler to use but not as fast in all cases is sometimes a good compromise.

>> To clarify my comment on changing the storage: the idea is to use separate columns instead of a map and then use a columnar storage format so we can project those columns independently. Avro can't project columns independently. This wouldn't help on the write side and may just cause a lot of seeking on the read side that diminishes the benefits.

>> Also, now that we have more details, I think there is a second problem. Because we expect several manifests in a table, we parallelize split planning on manifests instead of splits of manifest files. This planning operation is happening in a single thread instead of in parallel. I think if you split the write across several manifests, you'd improve wall time.

>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <aokolnyc...@apple.com> wrote:

>> No, we haven't experienced it yet. The manifest size is huge in your case. To me, Ryan is correct: it might be either big lower/upper bounds (then truncation will help) or a big number of columns (then collecting lower/upper bounds only for specific columns will help). I think both optimizations are needed and will reduce the manifest size.

>> Since you mentioned you have a lot of columns and we collect bounds for nested struct fields, I am wondering if you could revert [1] locally and compare the manifest size.

>> [1] - https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f

>>> On 19 Apr 2019, at 15:42, Gautam <gautamkows...@gmail.com> wrote:

>>> Thanks for responding Anton! Do we think the delay is mainly due to lower/upper bound filtering? Have you faced this? I haven't exactly found where the slowness is yet. It's generally due to the stats filtering, but I'm not sure which part of it is causing this much network traffic. There's a CloseableIterable that takes a ton of time on the next() and hasNext() calls. My guess is the expression evaluation on each manifest entry is what's doing it.

>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <aokolnyc...@apple.com> wrote:

>>> I think we need to have a list of columns for which we want to collect stats, and that should be configurable by the user. Maybe this config should be applicable only to lower/upper bounds. As we now collect stats even for nested struct fields, this might generate a lot of data. In most cases, users cluster/sort their data by a subset of data columns to have fast queries with predicates on those columns. So, being able to configure the columns for which to collect lower/upper bounds seems reasonable.

>>>> On 19 Apr 2019, at 08:03, Gautam <gautamkows...@gmail.com> wrote:

>>>> > The length in bytes of the schema is 109M as compared to 687K of the non-stats dataset.

>>>> Typo: length in bytes of the *manifest*. The schema is the same.

>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <gautamkows...@gmail.com> wrote:

>>>> Correction, partition count = 4308.

>>>> > Re: Changing the way we keep stats. Avro is a block-splittable format and is friendly with parallel compute frameworks like Spark.
>>>> Here I am trying to say that we don't need to change the format to columnar, right? The current format is already friendly for parallelization.

>>>> Thanks.

>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <gautamkows...@gmail.com> wrote:

>>>> Ah, my bad. I missed adding in the schema details .. Here are some details on the dataset with stats:

>>>> Iceberg Schema Columns : 20
>>>> Spark Schema fields : 20
>>>> Snapshot Summary : {added-data-files=4308, added-records=11494037, changed-partition-count=4308, total-records=11494037, total-data-files=4308}
>>>> Manifest files : 1
>>>> Manifest details:
>>>>   => manifest file path: adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>>   => manifest file length: 109,028,885
>>>>   => existing files count: 0
>>>>   => added files count: 4308
>>>>   => deleted files count: 0
>>>>   => partitions count: 4
>>>>   => partition fields count: 4

>>>> Re: Num data files. It has a single manifest keeping track of 4308 files. Total record count is 11.4 million.

>>>> Re: Columns. You are right that this table has many columns: although it has only 20 top-level columns, the number of leaf columns is in the order of thousands. This schema is heavy on structs (in the thousands) and has deep levels of nesting. I know Iceberg keeps column_sizes, value_counts, and null_value_counts for all leaf fields, and additionally lower bounds and upper bounds for native and struct types (not yet for map KVs and arrays). The length in bytes of the manifest is 109M as compared to 687K for the non-stats dataset.

>>>> Re: Turning off stats. I am looking to leverage stats because for our datasets with a much larger number of data files we want to use Iceberg's ability to skip entire files based on these stats. This is one of the big incentives for us to use Iceberg.

>>>> Re: Changing the way we keep stats. Avro is a block-splittable format and is friendly with parallel compute frameworks like Spark. So would it make sense, for instance, to add an option to have a Spark job / Futures handle split planning? In a larger context, 109M is not that much metadata given that Iceberg is meant for datasets where the metadata itself is big-data scale. I'm curious how folks with larger metadata (in GBs) are optimizing this today.

>>>> Cheers,
>>>> -Gautam.

>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <rb...@netflix.com.invalid> wrote:

>>>> Thanks for bringing this up! My initial theory is that this table has a ton of stats data that you have to read. That could happen in a couple of cases.

>>>> First, you might have large values in some columns. Parquet will suppress its stats if values are larger than 4k, and those are what Iceberg uses. But that could still cause you to store two 1k+ objects for each large column (lower and upper bounds). With a lot of data files, that could add up quickly. The solution here is to implement #113 (https://github.com/apache/incubator-iceberg/issues/113) so that we don't store the actual min and max for string or binary columns, but instead a truncated value that is just above or just below.

>>>> The second case is when you have a lot of columns. Each column stores both a lower and upper bound, so 1,000 columns could easily take 8k per file. If this is the problem, then maybe we want to have a way to turn off column stats. We could also think of ways to change the way stats are stored in the manifest files, but that only helps if we move to a columnar format to store manifests, so this is probably not a short-term fix.

>>>> If you can share a bit more information about this table, we can probably tell which one is the problem. I'm guessing it is the large values problem.

>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <gautamkows...@gmail.com> wrote:

>>>> Hello folks,

>>>> I have been testing Iceberg reading with and without stats built into the Iceberg dataset manifests and found that there's a huge jump in network traffic with the latter.

>>>> In my test I am comparing two Iceberg datasets, both written in Iceberg format: one with and the other without stats collected in the Iceberg manifests. In particular, the difference between the writers used for the two datasets is this PR: https://github.com/apache/incubator-iceberg/pull/63/files, which uses Iceberg's writers for writing Parquet data. I captured tcpdump from query scans run on these two datasets. The partition being scanned contains 1 manifest, 1 Parquet data file, and ~3700 rows in both datasets. There's a 30x jump in network traffic to the remote filesystem (ADLS) when I switch to the stats-based Iceberg dataset. Both queries used the same Iceberg reader code to access both datasets.

>>>> ```
>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>> reading from file iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type EN10MB (Ethernet)
>>>> 8844
>>>>
>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r iceberg_scratch_pad_demo_11_batch_query.pcap | grep perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>> reading from file iceberg_scratch_pad_demo_11_batch_query.pcap, link-type EN10MB (Ethernet)
>>>> 269708
>>>> ```

>>>> As a consequence, the query response times are affected drastically (illustrated below). I must confess that I am on a slow internet connection via VPN connecting to the remote FS, but the dataset without stats took just 1m 49s while the dataset with stats took 26m 48s to read the same amount of data. Most of that time for the latter dataset was spent in split planning: manifest reading and stats evaluation.

>>>> ```
>>>> all=> select count(*) from iceberg_geo1_metrixx_qc_postvalues where batchId = '4a6f95abac924159bb3d7075373395c9';
>>>>  count(1)
>>>> ----------
>>>>      3627
>>>> (1 row)
>>>> Time: 109673.202 ms (01:49.673)
>>>>
>>>> all=> select count(*) from iceberg_scratch_pad_demo_11 where _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId = '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>>  count(1)
>>>> ----------
>>>>      3808
>>>> (1 row)
>>>> Time: 1608058.616 ms (26:48.059)
>>>> ```

>>>> Has anyone faced this? I'm wondering if there's some caching or parallelism option here that can be leveraged. Would appreciate some guidance. If there isn't a straightforward fix and others feel this is an issue, I can raise one and look into it further.

>>>> Cheers,
>>>> -Gautam.

> --
> Ryan Blue
> Software Engineer
> Netflix
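Finally, for anyone reproducing Gautam's numbers, a small sketch that prints the per-manifest counts and sizes discussed earlier in the thread; the Snapshot/ManifestFile accessor names are assumptions based on the fields listed above:

```java
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Table;

public class ManifestReport {
  public static void print(Table table) {
    // One line per manifest in the current snapshot; the number of manifests also
    // bounds how far split planning can be parallelized.
    for (ManifestFile manifest : table.currentSnapshot().manifests()) {
      System.out.println(manifest.path()
          + " length=" + manifest.length()
          + " added=" + manifest.addedFilesCount()
          + " existing=" + manifest.existingFilesCount()
          + " deleted=" + manifest.deletedFilesCount());
    }
  }
}
```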