Hm, but why don’t we filter out manifests using stats for partition columns? Is 
there a bug in InclusiveManifestEvaluator? I just tested it locally. 
InclusiveManifestEvaluator gets '(not_null(ref(name="id")) and ref(name="id") 
== 1)' as rowFilter, which is transformed into ‘true’ after that expression is 
bound. Apparently, the bound expression doesn’t allow us to filter any 
manifests.
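
A minimal sketch of the expected behaviour, in plain Python rather than Iceberg code (`ManifestSummary`, `might_contain_eq`, and the sample bounds are made up for illustration). It assumes each manifest carries a lower and upper bound for the partition column: an equality predicate can then skip any manifest whose range excludes the value, whereas a predicate that has collapsed to 'true' keeps every manifest.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class ManifestSummary:
    """Hypothetical per-manifest summary for one partition column."""
    path: str
    lower: int
    upper: int


def might_contain_eq(value: int) -> Callable[[ManifestSummary], bool]:
    """Inclusive check: keep a manifest only if value lies within its bounds."""
    return lambda m: m.lower <= value <= m.upper


def always_true(_: ManifestSummary) -> bool:
    """What a filter that was bound to 'true' amounts to: nothing is pruned."""
    return True


def matching(manifests: List[ManifestSummary],
             predicate: Callable[[ManifestSummary], bool]) -> List[str]:
    """Return the paths of manifests the predicate cannot rule out."""
    return [m.path for m in manifests if predicate(m)]


manifests = [
    ManifestSummary("m0.avro", 0, 0),
    ManifestSummary("m1.avro", 1, 3),
    ManifestSummary("m2.avro", 4, 9),
]

print(matching(manifests, might_contain_eq(1)))  # only the manifest covering id 1
print(matching(manifests, always_true))          # every manifest survives
```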
 

> On 2 May 2019, at 15:33, Ryan Blue <rb...@netflix.com> wrote:
> 
> Yes, the append code tries to keep the existing order. That way, if you have 
> a time-based append pattern, it just works. Similarly, if you've spent the 
> time to optimize the order, the append doesn't rewrite and ruin it.
> 
> Sounds like you just need to sort the file list by partition to group as much 
> as possible, then append. We do this when we convert tables to iceberg to 
> start with a good split across manifests.
> 
> On Thu, May 2, 2019 at 3:17 PM Gautam <gautamkows...@gmail.com 
> <mailto:gautamkows...@gmail.com>> wrote:
> Ah looks like MergingSnapshotUpdate.mergeGroup() has the relevant logic. So 
> it preserves the natural order of manifests, so I guess it groups based on 
> when manifests were created, so the answer is whatever order the commits were 
> done. If batches within multiple days were committed out of order then a 
> manifest could end up with multiple days. 
> 
> 
> On Thu, May 2, 2019 at 2:23 PM Gautam <gautamkows...@gmail.com 
> <mailto:gautamkows...@gmail.com>> wrote:
> Ok, thanks for the tip on not having to be tied to a hierarchical partition 
> spec. 
> 
> Although this still doesn't explain why all the manifests are scanned,  coz 
> it should be pruning the list of manifests and it's not. Is my understanding 
> correct that the manifest grouping might be re-shuffling up the days so query 
> on 1 day might map to all manifests even? Does manifest merging optimize for 
> partition boundaries or is it based on manifest's natural order?
> 
> On Thu, May 2, 2019 at 2:06 PM Ryan Blue <rb...@netflix.com 
> <mailto:rb...@netflix.com>> wrote:
> You also don't need to use year, month, and day. You can just use day.
> 
> The time-based partition functions all produce ordinals, not local values: 
> month(Jan 1970) = 0 and month(Jan 1972) = 24. Same thing with day and hour. 
> In fact, I should open a PR to throw an exception when there are duplicate 
> partition functions...
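
A minimal sketch of the ordinal idea in plain Python (not Iceberg's implementation; the function names are made up, and it only covers dates on or after 1970-01-01). It reproduces the two month values quoted above by counting whole months, or whole days, since the epoch.

```python
from datetime import date

EPOCH = date(1970, 1, 1)


def month_ordinal(d: date) -> int:
    """Months elapsed since January 1970 (January 1970 itself is 0)."""
    return (d.year - EPOCH.year) * 12 + (d.month - EPOCH.month)


def day_ordinal(d: date) -> int:
    """Days elapsed since 1970-01-01."""
    return (d - EPOCH).days


print(month_ordinal(date(1970, 1, 1)))  # 0
print(month_ordinal(date(1972, 1, 1)))  # 24
print(day_ordinal(date(1970, 1, 2)))    # 1
```

Because a day ordinal already identifies the year and month, partitioning by day alone carries the same information as year, month, and day together.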
> 
> On Thu, May 2, 2019 at 1:52 PM Gautam <gautamkows...@gmail.com 
> <mailto:gautamkows...@gmail.com>> wrote:
> FYI .. The test Partition Spec is  : 
> [
>   YEAR: identity(21)
>   MONTH: identity(22)
>   DAY: identity(23)
>   batchId: identity(24)
> ]
> 
> 
> 
> On Thu, May 2, 2019 at 1:46 PM Gautam <gautamkows...@gmail.com 
> <mailto:gautamkows...@gmail.com>> wrote:
> > Using those, you should be able to control parallelism. If you want to test 
> > with 4,000, then you can set the min count to 5,000 so Iceberg won’t 
> > compact manifests.
> 
> This is helpful. Thanks for the pointer on increasing parallelism. Will try 
> this out. So I understand the behaviour, if a different dataset has >=5000  
> batches then the resultant # manifests would be (total_num_batches % 5000 ) ? 
> 
> > What surprises me is that you’re not getting much benefit from filtering 
> > out manifests that aren’t helpful. We see a lot of benefit from it. 
> 
> Pardon the verbose example but I think it'll help explain what I'm seeing .. 
> 
> Regarding manifest filtering:  I tested if partition filters in sql query 
> actually reduce manifests being inspected. In my example, i have 16 manifests 
> that point to 4000 batch partitions (each file is restricted to one 
> partition as we're using physical partitioning in the table). So when 
> querying for .. WHERE  batchId = 'xyz'  .. at most 1 manifest should be read 
> coz 1 batch == 1 file which should be tracked by 1 manifest (among the 16) , 
> right? But i see that all 16 are being inspected in 
> BaseTableScan.planFiles().  Correct me if i'm wrong, it's this call [1] that 
> should be giving me the manifests that match a partition. When I inspect this 
>  it says `matchingManifests = 16` ,  which is all the manifests in the table. 
>  This could be due to the fact that our batch ids are random UUIDs so 
> lower/upper bounds may not help coz there's no inherent ordering amongst 
> batches. 
> But then  i tried year = 2019 and month = 01 and day = 01 which also scanned 
> all manifests. Could this be due to the way Iceberg manifests are re-grouped 
> and merged? If so, shouldn't re-grouping honour partition boundaries and 
> optimize for it?
> 
> 
> Cheers,
> -Gautam.
> 
> [1] - 
> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173
>  
> <https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173>
>  
> 
> On Thu, May 2, 2019 at 12:27 PM Ryan Blue <rb...@netflix.com 
> <mailto:rb...@netflix.com>> wrote:
> Good questions. Grouping manifests is configurable at the table level. There 
> are 2 settings:
> 
> commit.manifest.target-size-bytes defaults to 8MB, this is the target size 
> that Iceberg will compact to
> commit.manifest.min-count-to-merge defaults to 100, this is the minimum 
> number of files before a compaction is triggered
> Using those, you should be able to control parallelism. If you want to test 
> with 4,000, then you can set the min count to 5,000 so Iceberg won’t compact 
> manifests.
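
A minimal sketch of the threshold logic in plain Python (not Iceberg code; `will_merge` is a made-up name). It assumes merging kicks in once the number of manifests reaches the minimum count; the exact boundary condition is an assumption, not something stated in this thread.

```python
def will_merge(manifest_count: int, min_count_to_merge: int = 100) -> bool:
    """Return True when enough manifests have accumulated to trigger a merge.

    Assumption: a merge starts once the count reaches the threshold.
    The default of 100 is the value quoted for
    commit.manifest.min-count-to-merge.
    """
    return manifest_count >= min_count_to_merge


print(will_merge(4000))        # default threshold: manifests get merged
print(will_merge(4000, 5000))  # threshold raised to 5,000: no merge
```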
> 
> What surprises me is that you’re not getting much benefit from filtering out 
> manifests that aren’t helpful. We see a lot of benefit from it. You might try 
> sorting the data files by partition before adding them to the table. That 
> will cluster data files in the same partition so you can read fewer manifests.
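
A minimal sketch of why sorting helps, in plain Python (not Iceberg code; the file names, the four-files-per-manifest grouping, and the helper names are made up). It assumes manifests are filled in append order, so files that arrive interleaved by day spread each day across every manifest, while files sorted by partition keep each day in one manifest.

```python
from typing import List, Tuple

DataFile = Tuple[int, str]  # (day ordinal, file path)


def chunk_into_manifests(files: List[DataFile],
                         per_manifest: int) -> List[List[DataFile]]:
    """Group files into manifests in the order they were appended."""
    return [files[i:i + per_manifest]
            for i in range(0, len(files), per_manifest)]


def manifests_touched(manifests: List[List[DataFile]], day: int) -> int:
    """Count manifests that hold at least one file for the given day."""
    return sum(1 for m in manifests if any(f_day == day for f_day, _ in m))


# 16 files over 4 days, arriving interleaved: day 0, 1, 2, 3, 0, 1, ...
files = [(i % 4, f"file-{i:02d}.parquet") for i in range(16)]

interleaved = chunk_into_manifests(files, per_manifest=4)
clustered = chunk_into_manifests(sorted(files), per_manifest=4)

print(manifests_touched(interleaved, day=0))  # every manifest holds a day-0 file
print(manifests_touched(clustered, day=0))    # day 0 sits in a single manifest
```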
> 
> 
> On Thu, May 2, 2019 at 12:09 PM Gautam <gautamkows...@gmail.com 
> <mailto:gautamkows...@gmail.com>> wrote:
> Hey Anton,
>             Sorry about the delay on this. Been caught up with some other 
> things. Thanks for raising issue #173. 
> 
> So the root cause is indeed the density and size of the schema. I agree 
> the option to configure stats for columns is good, although I'm not fully 
> convinced that this is purely due to lower/upper bounds. For instance, maybe 
> it's just taking a while to iterate over manifest rows and deserialize the 
> DataFile stats in each read? The solution I'm using right now is to 
> parallelize the manifest reading in split planning. We regenerated the 
> Iceberg table with more manifests. Now the code enables the ParallelIterator, 
> which uses a worker pool of threads (1 thread per CPU by default, 
> configurable using 'iceberg.worker.num-threads') to read manifests. 
> 
> On that note, the ability to parallelize is limited to how many manifests are 
> in the table. So as a test, for a table with 4000 files we created one 
> manifest per file (think of one file as a single batch commit in this case). 
> So I was hoping to get a parallelism factor of 4000. But Iceberg summarizes 
> manifests into fewer manifests with each commit so we instead ended up with 
> 16 manifests. So now split planning is limited to reading at most 16 units of 
> parallelism. Is this grouping of manifests into fewer manifests configurable? 
> If not, should we allow making this configurable? 
> 
> Sorry if this is forking a different conversation. If so, I can start a 
> separate conversation thread on this. 
> 
> 
> 
> 
> 
> 
> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <aokolnyc...@apple.com 
> <mailto:aokolnyc...@apple.com>> wrote:
> Hey Gautam,
> 
> Out of my curiosity, did you manage to confirm the root cause of the issue?
> 
> P.S. I created [1] so that we can make collection of lower/upper bounds 
> configurable.
> 
> Thanks,
> Anton
> 
> [1] - https://github.com/apache/incubator-iceberg/issues/173 
> <https://github.com/apache/incubator-iceberg/issues/173>
> 
>> On 22 Apr 2019, at 09:15, Gautam <gautamkows...@gmail.com 
>> <mailto:gautamkows...@gmail.com>> wrote:
>> 
>> Thanks guys for the insights ..
>> 
>> > I like Anton's idea to have an optional list of columns for which we keep 
>> > stats. That would allow us to avoid storing stats for thousands of columns 
>> > that won't ever be used. Another option here is to add a flag to keep 
>> > stats only for top-level columns. That's much less configuration for users 
>> > and probably does the right thing in many cases. Simpler to use but not as 
>> > fast in all cases is sometimes a good compromise.
>> 
>> This makes sense to me. It adds a variable that data pipelines can tweak 
>> to improve performance. I will add an issue on Github to add a stats 
>> config/flag. Having said that, I would try to optimize around this 
>> coz read patterns are hardly ever known a priori and adding a column to this 
>> list means having to rewrite the entire data again. So I'll try the other 
>> suggestion, which is parallelizing on multiple manifests. 
>> 
>> >  To clarify my comment on changing the storage: the idea is to use 
>> > separate columns instead of a map and then use a columnar storage format 
>> > so we can project those columns independently. Avro can't project columns 
>> > independently. This wouldn't help on the write side and may just cause a 
>> > lot of seeking on the read side that diminishes the benefits.
>> 
>> Gotcha.
>> 
>> > Also, now that we have more details, I think there is a second problem. 
>> > Because we expect several manifests in a table, we parallelize split 
>> > planning on manifests instead of splits of manifest files. This planning 
>> > operation is happening in a single thread instead of in parallel. I think 
>> > if you split the write across several manifests, you'd improve wall time.
>> 
>> This might actually be the issue here. This was a test bench dataset, so the 
>> writer job created a single manifest for all the data in the dataset, which 
>> isn't really how we will do things in prod. I'll try to create the metadata 
>> based on production's expected commit pattern.
>> 
>> 
>> Regarding Iceberg not truncating large bounded column values 
>> https://github.com/apache/incubator-iceberg/issues/113 
>> <https://github.com/apache/incubator-iceberg/issues/113> .. I didn't 
>> consider this with our dataset. The current evidence is leading towards the 
>> number of columns and the sheer number of files that the manifest is 
>> maintaining but this is a good thing to look into.
>> 
>> Thanks again guys. 
>> 
>> -Gautam.
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rb...@netflix.com 
>> <mailto:rb...@netflix.com>> wrote:
>> I like Anton's idea to have an optional list of columns for which we keep 
>> stats. That would allow us to avoid storing stats for thousands of columns 
>> that won't ever be used. Another option here is to add a flag to keep stats 
>> only for top-level columns. That's much less configuration for users and 
>> probably does the right thing in many cases. Simpler to use but not as fast 
>> in all cases is sometimes a good compromise.
>> 
>> To clarify my comment on changing the storage: the idea is to use separate 
>> columns instead of a map and then use a columnar storage format so we can 
>> project those columns independently. Avro can't project columns 
>> independently. This wouldn't help on the write side and may just cause a lot 
>> of seeking on the read side that diminishes the benefits.
>> 
>> Also, now that we have more details, I think there is a second problem. 
>> Because we expect several manifests in a table, we parallelize split 
>> planning on manifests instead of splits of manifest files. This planning 
>> operation is happening in a single thread instead of in parallel. I think if 
>> you split the write across several manifests, you'd improve wall time.
>> 
>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <aokolnyc...@apple.com 
>> <mailto:aokolnyc...@apple.com>> wrote:
>> No, we haven’t experienced it yet. The manifest size is huge in your case. 
>> To me, Ryan is correct: it might be either big lower/upper bounds (then 
>> truncation will help) or a large number of columns (then collecting lower/upper 
>> bounds only for specific columns will help). I think both optimizations are 
>> needed and will reduce the manifest size.
>> 
>> Since you mentioned you have a lot of columns and we collect bounds for 
>> nested struct fields, I am wondering if you could revert [1] locally and 
>> compare the manifest size.
>> 
>> [1] - 
>> https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f
>>  
>> <https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f>
>> 
>>> On 19 Apr 2019, at 15:42, Gautam <gautamkows...@gmail.com 
>>> <mailto:gautamkows...@gmail.com>> wrote:
>>> 
>>> Thanks for responding Anton! Do we think the delay is mainly due to 
>>> lower/upper bound filtering? Have you faced this? I haven't exactly found 
>>> where the slowness is yet. It's generally due to the stats filtering, but 
>>> what part of it is causing this much network traffic? There's a 
>>> CloseableIterable that takes a ton of time on the next() and hasNext() 
>>> calls. My guess is the expression evaluation on each manifest entry is 
>>> what's doing it. 
>>> 
>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <aokolnyc...@apple.com 
>>> <mailto:aokolnyc...@apple.com>> wrote:
>>> I think we need to have a list of columns for which we want to collect 
>>> stats and that should be configurable by the user. Maybe, this config 
>>> should be applicable only to lower/upper bounds. As we now collect stats 
>>> even for nested struct fields, this might generate a lot of data. In most 
>>> cases, users cluster/sort their data by a subset of data columns to have 
>>> fast queries with predicates on those columns. So, being able to configure 
>>> columns for which to collect lower/upper bounds seems reasonable.
>>> 
>>>> On 19 Apr 2019, at 08:03, Gautam <gautamkows...@gmail.com 
>>>> <mailto:gautamkows...@gmail.com>> wrote:
>>>> 
>>>> >  The length in bytes of the schema is 109M as compared to 687K of the 
>>>> > non-stats dataset. 
>>>> 
>>>> Typo, length in bytes of *manifest*. schema is the same. 
>>>> 
>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <gautamkows...@gmail.com 
>>>> <mailto:gautamkows...@gmail.com>> wrote:
>>>> Correction, partition count = 4308.
>>>> 
>>>> > Re: Changing the way we keep stats. Avro is a block splittable format 
>>>> > and is friendly with parallel compute frameworks like Spark. 
>>>> 
>>>> Here I am trying to say that we don't need to change the format to 
>>>> columnar right? The current format is already friendly for 
>>>> parallelization. 
>>>> 
>>>> thanks.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <gautamkows...@gmail.com 
>>>> <mailto:gautamkows...@gmail.com>> wrote:
>>>> Ah, my bad. I missed adding in the schema details .. Here are some details 
>>>> on the dataset with stats :
>>>> 
>>>>  Iceberg Schema Columns : 20
>>>>  Spark Schema fields : 20
>>>>  Snapshot Summary :{added-data-files=4308, added-records=11494037, 
>>>> changed-partition-count=4308, total-records=11494037, 
>>>> total-data-files=4308}
>>>>  Manifest files :1
>>>>  Manifest details:
>>>>      => manifest file path: 
>>>> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>>      => manifest file length: 109,028,885
>>>>      => existing files count: 0
>>>>      => added files count: 4308
>>>>      => deleted files count: 0
>>>>      => partitions count: 4
>>>>      => partition fields count: 4
>>>> 
>>>> Re: Num data files. It has a single manifest keeping track of 4308 files. 
>>>> Total record count is 11.4 Million.
>>>> 
>>>> Re: Columns. You are right that this table has many columns.. although it 
>>>> has only 20 top-level columns,  num leaf columns are in order of 
>>>> thousands. This Schema is heavy on structs (in the thousands) and has deep 
>>>> levels of nesting.  I know Iceberg keeps  column_sizes, value_counts, 
>>>> null_value_counts for all leaf fields and additionally lower-bounds, 
>>>> upper-bounds for native, struct types (not yet for map KVs and arrays).  
>>>> The length in bytes of the schema is 109M as compared to 687K of the 
>>>> non-stats dataset. 
>>>> 
>>>> Re: Turning off stats. I am looking to leverage stats coz for our datasets 
>>>> with much larger number of data files we want to leverage iceberg's 
>>>> ability to skip entire files based on these stats. This is one of the big 
>>>> incentives for us to use Iceberg. 
>>>> 
>>>> Re: Changing the way we keep stats. Avro is a block splittable format and 
>>>> is friendly with parallel compute frameworks like Spark. So would it make 
>>>> sense, for instance, to add an option to have a Spark job / Futures 
>>>> handle split planning?   In a larger context, 109M is not that much 
>>>> metadata given that Iceberg is meant for datasets where the metadata 
>>>> itself is Bigdata scale.  I'm curious on how folks with larger sized 
>>>> metadata (in GB) are optimizing this today. 
>>>> 
>>>> 
>>>> Cheers,
>>>> -Gautam.
>>>> 
>>>>  
>>>> 
>>>> 
>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <rb...@netflix.com.invalid 
>>>> <mailto:rb...@netflix.com.invalid>> wrote:
>>>> Thanks for bringing this up! My initial theory is that this table has a 
>>>> ton of stats data that you have to read. That could happen in a couple of 
>>>> cases.
>>>> 
>>>> First, you might have large values in some columns. Parquet will suppress 
>>>> its stats if values are larger than 4k and those are what Iceberg uses. 
>>>> But that could still cause you to store two 1k+ objects for each large 
>>>> column (lower and upper bounds). With a lot of data files, that could add 
>>>> up quickly. The solution here is to implement #113 
>>>> <https://github.com/apache/incubator-iceberg/issues/113> so that we don't 
>>>> store the actual min and max for string or binary columns, but instead a 
>>>> truncated value that is just above or just below.
>>>> 
>>>> The second case is when you have a lot of columns. Each column stores both 
>>>> a lower and upper bound, so 1,000 columns could easily take 8k per file. 
>>>> If this is the problem, then maybe we want to have a way to turn off 
>>>> column stats. We could also think of ways to change the way stats are 
>>>> stored in the manifest files, but that only helps if we move to a columnar 
>>>> format to store manifests, so this is probably not a short-term fix.
>>>> 
>>>> If you can share a bit more information about this table, we can probably 
>>>> tell which one is the problem. I'm guessing it is the large values problem.
>>>> 
>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <gautamkows...@gmail.com 
>>>> <mailto:gautamkows...@gmail.com>> wrote:
>>>> Hello folks, 
>>>> 
>>>> I have been testing Iceberg reading with and without stats built into 
>>>> Iceberg dataset manifest and found that there's a huge jump in network 
>>>> traffic with the former (the dataset with stats).
>>>> 
>>>> 
>>>> In my test I am comparing two Iceberg datasets, both written in Iceberg 
>>>> format. One with and the other without stats collected in Iceberg 
>>>> manifests. In particular the difference between the writers used for the 
>>>> two datasets is this PR: 
>>>> https://github.com/apache/incubator-iceberg/pull/63/files 
>>>> <https://github.com/apache/incubator-iceberg/pull/63/files> which uses 
>>>> Iceberg's writers for writing Parquet data. I captured tcpdump from query 
>>>> scans run on these two datasets.  The partition being scanned contains 1 
>>>> manifest, 1 parquet data file and ~3700 rows in both datasets. There's a 
>>>> 30x jump in network traffic to the remote filesystem (ADLS) when i switch 
>>>> to stats based Iceberg dataset. Both queries used the same Iceberg reader 
>>>> code to access both datasets. 
>>>> 
>>>> ```
>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r 
>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep 
>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>> reading from file iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, 
>>>> link-type EN10MB (Ethernet)
>>>> 
>>>> 8844
>>>> 
>>>> 
>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r 
>>>> iceberg_scratch_pad_demo_11_batch_query.pcap | grep 
>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>> reading from file iceberg_scratch_pad_demo_11_batch_query.pcap, link-type 
>>>> EN10MB (Ethernet)
>>>> 
>>>> 269708
>>>> 
>>>> ```
>>>> 
>>>> As a consequence of this the query response times get affected drastically 
>>>> (illustrated below). I must confess that I am on a slow internet 
>>>> connection via VPN connecting to the remote FS. But the dataset without 
>>>> stats took just 1m 49s while the dataset with stats took 26m 48s to read 
>>>> the same sized data. Most of that time in the latter dataset was spent 
>>>> split planning in Manifest reading and stats evaluation.
>>>> 
>>>> ```
>>>> all=> select count(*)  from iceberg_geo1_metrixx_qc_postvalues where 
>>>> batchId = '4a6f95abac924159bb3d7075373395c9';
>>>>  count(1)
>>>> ----------
>>>>      3627
>>>> (1 row)
>>>> Time: 109673.202 ms (01:49.673)
>>>> 
>>>> all=>  select count(*) from iceberg_scratch_pad_demo_11  where 
>>>> _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId = 
>>>> '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>>  count(1)
>>>> ----------
>>>>      3808
>>>> (1 row)
>>>> Time: 1608058.616 ms (26:48.059)
>>>> 
>>>> ```
>>>> 
>>>> Has anyone faced this? I'm wondering if there's some caching or 
>>>> parallelism option here that can be leveraged.  Would appreciate some 
>>>> guidance. If there isn't a straightforward fix and others feel this is an 
>>>> issue I can raise an issue and look into it further. 
>>>> 
>>>> 
>>>> Cheers,
>>>> -Gautam.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> -- 
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix