Is "id" a partition column? The inclusive manifest evaluator will use an
inclusive projection of the filter, which is true for all non-partition
column predicates.
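
To illustrate, here is a rough sketch in Python of what an inclusive
projection does to a filter. This is not Iceberg's code: the Pred and And
classes and the inclusive_project function are made up for this example,
and it assumes identity partitioning only.

```python
from dataclasses import dataclass
from typing import Union

TRUE = "true"  # stands in for an always-true expression


@dataclass(frozen=True)
class Pred:
    """Hypothetical unbound predicate, e.g. Pred("eq", "id", 1)."""
    op: str
    column: str
    value: object = None


@dataclass(frozen=True)
class And:
    left: "Expr"
    right: "Expr"


Expr = Union[str, Pred, And]


def inclusive_project(expr, partition_columns):
    """Project a row filter onto partition columns, inclusively.

    A predicate on a non-partition column cannot rule out any partition,
    so it becomes TRUE. Assumes identity partitioning for simplicity.
    """
    if isinstance(expr, And):
        left = inclusive_project(expr.left, partition_columns)
        right = inclusive_project(expr.right, partition_columns)
        # Simplify: (true AND x) is x, (x AND true) is x.
        if left == TRUE:
            return right
        if right == TRUE:
            return left
        return And(left, right)
    if isinstance(expr, Pred):
        return expr if expr.column in partition_columns else TRUE
    return expr  # already TRUE


# Partition columns taken from the test spec quoted further down the thread.
partition_columns = {"YEAR", "MONTH", "DAY", "batchId"}
row_filter = And(Pred("not_null", "id"), Pred("eq", "id", 1))
projected = inclusive_project(row_filter, partition_columns)
print(projected)
```

With that spec, "id" is not a partition column, so the whole filter
projects to true and no manifest can be skipped. A predicate on batchId
would survive the projection and could be checked against the partition
summaries in each manifest.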

On Thu, May 2, 2019 at 4:08 PM Anton Okolnychyi <aokolnyc...@apple.com>
wrote:

> Hm, but why don’t we filter out manifests using stats for partition
> columns? Is there a bug in InclusiveManifestEvaluator? I just tested it
> locally. InclusiveManifestEvaluator gets '(not_null(ref(name="id")) and
> ref(name="id") == 1)' as rowFilter, which is transformed into ‘true’ after
> that expression is bound. Apparently, the bound expression doesn’t allow us
> to filter any manifests.
>
>
> On 2 May 2019, at 15:33, Ryan Blue <rb...@netflix.com> wrote:
>
> Yes, the append code tries to keep the existing order. That way, if you
> have a time-based append pattern, it just works. Similarly, if you've spent
> the time to optimize the order, the append doesn't rewrite and ruin it.
>
> Sounds like you just need to sort the file list by partition to group as
> much as possible, then append. We do this when we convert tables to iceberg
> to start with a good split across manifests.
>
> On Thu, May 2, 2019 at 3:17 PM Gautam <gautamkows...@gmail.com> wrote:
>
>> Ah looks like MergingSnapshotUpdate.mergeGroup() has the relevant logic.
>> It preserves the natural order of manifests, so I guess it groups based
>> on when manifests were created, and the answer is whatever order the commits
>> were done in. If batches within multiple days were committed out of order then
>> a manifest could end up with multiple days.
>>
>>
>> On Thu, May 2, 2019 at 2:23 PM Gautam <gautamkows...@gmail.com> wrote:
>>
>>> Ok, thanks for the tip on not having to be tied to a hierarchical
>>> partition spec.
>>>
>>> Although this still doesn't explain why all the manifests are scanned,
>>> coz it should be pruning the list of manifests and it's not. Is my
>>> understanding correct that the manifest grouping might be reshuffling
>>> the days, so a query on 1 day might map to all manifests? Does manifest
>>> merging optimize for partition boundaries or is it based on the manifests'
>>> natural order?
>>>
>>> On Thu, May 2, 2019 at 2:06 PM Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> You also don't need to use year, month, and day. You can just use day.
>>>>
>>>> The time-based partition functions all produce ordinals, not local
>>>> values: month(Jan 1970) = 0 and month(Jan 1972) = 24. Same thing with day
>>>> and hour. In fact, I should open a PR to throw an exception when there are
>>>> duplicate partition functions...
>>>>
>>>> On Thu, May 2, 2019 at 1:52 PM Gautam <gautamkows...@gmail.com> wrote:
>>>>
>>>>> FYI .. The test Partition Spec is  :
>>>>> [
>>>>>   YEAR: identity(21)
>>>>>   MONTH: identity(22)
>>>>>   DAY: identity(23)
>>>>>   batchId: identity(24)
>>>>> ]
>>>>>
>>>>>
>>>>>
>>>>> On Thu, May 2, 2019 at 1:46 PM Gautam <gautamkows...@gmail.com> wrote:
>>>>>
>>>>>> > Using those, you should be able to control parallelism. If you want
>>>>>> to test with 4,000, then you can set the min count to 5,000 so Iceberg
>>>>>> won’t compact manifests.
>>>>>>
>>>>>> This is helpful. Thanks for the pointer on increasing parallelism.
>>>>>> Will try this out. So I understand the behaviour, if a different dataset
>>>>>> has >=5000  batches then the resultant # manifests would be
>>>>>> (total_num_batches % 5000 ) ?
>>>>>>
>>>>>> > What surprises me is that you’re not getting much benefit from
>>>>>> filtering out manifests that aren’t helpful. We see a lot of benefit from
>>>>>> it.
>>>>>>
>>>>>> Pardon the verbose example but I think it'll help explain what I'm
>>>>>> seeing ..
>>>>>>
>>>>>> Regarding manifest filtering: I tested whether partition filters in a SQL
>>>>>> query actually reduce the manifests being inspected. In my example, I have 16
>>>>>> manifests that point to 4000 batch partitions (each file is restricted to
>>>>>> one partition as we're using physical partitioning in the table). So when
>>>>>> querying for .. WHERE batchId = 'xyz' .. at most 1 manifest should be
>>>>>> read coz 1 batch == 1 file, which should be tracked by 1 manifest (among the
>>>>>> 16), right? But I see that all 16 are being inspected in
>>>>>> BaseTableScan.planFiles(). Correct me if I'm wrong, it's this call [1]
>>>>>> that should be giving me the manifests that match a partition. When I
>>>>>> inspect this it says `matchingManifests = 16`, which is all the
>>>>>> manifests in the table. This *could* be due to the fact that our
>>>>>> batch ids are random UUIDs so lower/upper bounds may not help coz there's
>>>>>> no inherent ordering amongst batches.
>>>>>> But then I tried year = 2019 and month = 01 and day = 01 which also
>>>>>> scanned all manifests. Could this be due to the way Iceberg manifests are
>>>>>> re-grouped and merged? If so, shouldn't re-grouping honour partition
>>>>>> boundaries and optimize for it?
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>> -Gautam.
>>>>>>
>>>>>> [1] -
>>>>>> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173
>>>>>>
>>>>>>
>>>>>> On Thu, May 2, 2019 at 12:27 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>>>
>>>>>>> Good questions. Grouping manifests is configurable at the table
>>>>>>> level. There are 2 settings:
>>>>>>>
>>>>>>>    - commit.manifest.target-size-bytes defaults to 8MB, this is the
>>>>>>>    target size that Iceberg will compact to
>>>>>>>    - commit.manifest.min-count-to-merge defaults to 100, this is
>>>>>>>    the minimum number of manifest files before a compaction is triggered
>>>>>>>
>>>>>>> Using those, you should be able to control parallelism. If you want
>>>>>>> to test with 4,000, then you can set the min count to 5,000 so Iceberg
>>>>>>> won’t compact manifests.
>>>>>>>
>>>>>>> What surprises me is that you’re not getting much benefit from
>>>>>>> filtering out manifests that aren’t helpful. We see a lot of benefit from
>>>>>>> it. You might try sorting the data files by partition before adding them to
>>>>>>> the table. That will cluster data files in the same partition so you can
>>>>>>> read fewer manifests.
>>>>>>>
>>>>>>> On Thu, May 2, 2019 at 12:09 PM Gautam <gautamkows...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey Anton,
>>>>>>>>             Sorry bout the delay on this. Been caught up with some
>>>>>>>> other things. Thanks for raising issue#173 .
>>>>>>>>
>>>>>>>> So the root cause is indeed the density and size of the schema.
>>>>>>>> I agree the option to configure stats for columns is good, although
>>>>>>>> I'm not fully convinced that this is purely due to lower/upper bounds. For
>>>>>>>> instance, maybe it's just taking a while to iterate over manifest rows and
>>>>>>>> deserialize the DataFile stats in each read? The solution I'm using right
>>>>>>>> now is to parallelize the manifest reading in split planning. We
>>>>>>>> regenerated the Iceberg table with more manifests. Now the code enables the
>>>>>>>> ParallelIterator which uses a worker pool of threads (1 thread per cpu by
>>>>>>>> default, configurable using 'iceberg.worker.num-threads') to read
>>>>>>>> manifests.
>>>>>>>>
>>>>>>>> On that note, the ability to parallelize is limited by how many
>>>>>>>> manifests are in the table. So as a test, for a table with 4000 files we
>>>>>>>> created one manifest per file (think of one file as a single batch commit
>>>>>>>> in this case). So I was hoping to get a parallelism factor of 4000. But
>>>>>>>> Iceberg summarizes manifests into fewer manifests with each commit so we
>>>>>>>> instead ended up with 16 manifests. So now split planning is limited to
>>>>>>>> at most 16 units of parallelism. Is this grouping of manifests into
>>>>>>>> fewer manifests configurable? If not, should we allow making this configurable?
>>>>>>>>
>>>>>>>> Sorry if this is forking a different conversation. If so, I can
>>>>>>>> start a separate conversation thread on this.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <
>>>>>>>> aokolnyc...@apple.com> wrote:
>>>>>>>>
>>>>>>>>> Hey Gautam,
>>>>>>>>>
>>>>>>>>> Out of my curiosity, did you manage to confirm the root cause of
>>>>>>>>> the issue?
>>>>>>>>>
>>>>>>>>> P.S. I created [1] so that we can make collection of lower/upper
>>>>>>>>> bounds configurable.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Anton
>>>>>>>>>
>>>>>>>>> [1] - https://github.com/apache/incubator-iceberg/issues/173
>>>>>>>>>
>>>>>>>>> On 22 Apr 2019, at 09:15, Gautam <gautamkows...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Thanks guys for the insights ..
>>>>>>>>>
>>>>>>>>> > I like Anton's idea to have an optional list of columns for
>>>>>>>>> which we keep stats. That would allow us to avoid storing stats for
>>>>>>>>> thousands of columns that won't ever be used. Another option here is to add
>>>>>>>>> a flag to keep stats only for top-level columns. That's much less
>>>>>>>>> configuration for users and probably does the right thing in many cases.
>>>>>>>>> Simpler to use but not as fast in all cases is sometimes a good compromise.
>>>>>>>>>
>>>>>>>>> This makes sense to me. It adds a variable that data pipelines can
>>>>>>>>> tweak to improve performance. I will add an issue on Github to add a
>>>>>>>>> stats config/flag. Although, having said that, I would try to optimize
>>>>>>>>> around this coz read patterns are hardly ever known a priori and adding a
>>>>>>>>> column to this list means having to re-write the entire data again. So I'll
>>>>>>>>> try the other suggestion which is parallelizing on multiple manifests.
>>>>>>>>>
>>>>>>>>> >  To clarify my comment on changing the storage: the idea is to
>>>>>>>>> use separate columns instead of a map and then use a columnar storage
>>>>>>>>> format so we can project those columns independently. Avro can't project
>>>>>>>>> columns independently. This wouldn't help on the write side and may just
>>>>>>>>> cause a lot of seeking on the read side that diminishes the benefits.
>>>>>>>>>
>>>>>>>>> Gotcha.
>>>>>>>>>
>>>>>>>>> > Also, now that we have more details, I think there is a second
>>>>>>>>> problem. Because we expect several manifests in a table, we parallelize
>>>>>>>>> split planning on manifests instead of splits of manifest files. This
>>>>>>>>> planning operation is happening in a single thread instead of in parallel.
>>>>>>>>> I think if you split the write across several manifests, you'd improve wall
>>>>>>>>> time.
>>>>>>>>>
>>>>>>>>> This might actually be the issue here. This was a test bench
>>>>>>>>> dataset so the writer job created a single manifest for all the data in the
>>>>>>>>> dataset, which isn't really how we will do things in prod. I'll try and
>>>>>>>>> create the metadata based on production's expected commit pattern.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regarding Iceberg not truncating large bounded column values
>>>>>>>>> https://github.com/apache/incubator-iceberg/issues/113 .. I
>>>>>>>>> didn't consider this with our dataset. The current evidence is leading
>>>>>>>>> towards the number of columns and the sheer number of files that the
>>>>>>>>> manifest is maintaining but this is a good thing to look into.
>>>>>>>>>
>>>>>>>>> Thanks again guys.
>>>>>>>>>
>>>>>>>>> -Gautam.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rb...@netflix.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I like Anton's idea to have an optional list of columns for which
>>>>>>>>>> we keep stats. That would allow us to avoid storing stats for thousands of
>>>>>>>>>> columns that won't ever be used. Another option here is to add a flag to
>>>>>>>>>> keep stats only for top-level columns. That's much less configuration for
>>>>>>>>>> users and probably does the right thing in many cases. Simpler to use but
>>>>>>>>>> not as fast in all cases is sometimes a good compromise.
>>>>>>>>>>
>>>>>>>>>> To clarify my comment on changing the storage: the idea is to use
>>>>>>>>>> separate columns instead of a map and then use a columnar storage format so
>>>>>>>>>> we can project those columns independently. Avro can't project columns
>>>>>>>>>> independently. This wouldn't help on the write side and may just cause a
>>>>>>>>>> lot of seeking on the read side that diminishes the benefits.
>>>>>>>>>>
>>>>>>>>>> Also, now that we have more details, I think there is a second
>>>>>>>>>> problem. Because we expect several manifests in a table, we parallelize
>>>>>>>>>> split planning on manifests instead of splits of manifest files. This
>>>>>>>>>> planning operation is happening in a single thread instead of in parallel.
>>>>>>>>>> I think if you split the write across several manifests, you'd improve wall
>>>>>>>>>> time.
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <
>>>>>>>>>> aokolnyc...@apple.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> No, we haven’t experienced it yet. The manifest size is huge in
>>>>>>>>>>> your case. To me, Ryan is correct: it might be either big lower/upper
>>>>>>>>>>> bounds (then truncation will help) or a big number of columns (then collecting
>>>>>>>>>>> lower/upper bounds only for specific columns will help). I think both
>>>>>>>>>>> optimizations are needed and will reduce the manifest size.
>>>>>>>>>>>
>>>>>>>>>>> Since you mentioned you have a lot of columns and we collect
>>>>>>>>>>> bounds for nested struct fields, I am wondering if you could revert [1]
>>>>>>>>>>> locally and compare the manifest size.
>>>>>>>>>>>
>>>>>>>>>>> [1] -
>>>>>>>>>>> https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f
>>>>>>>>>>>
>>>>>>>>>>> On 19 Apr 2019, at 15:42, Gautam <gautamkows...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thanks for responding Anton! Do we think the delay is mainly due
>>>>>>>>>>> to lower/upper bound filtering? Have you faced this? I haven't exactly
>>>>>>>>>>> found where the slowness is yet. It's generally due to the stats filtering
>>>>>>>>>>> but what part of it is causing this much network traffic? There's
>>>>>>>>>>> CloseableIterable that takes a ton of time on the next() and hasNext()
>>>>>>>>>>> calls. My guess is the expression evaluation on each manifest entry is
>>>>>>>>>>> what's doing it.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <
>>>>>>>>>>> aokolnyc...@apple.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I think we need to have a list of columns for which we want to
>>>>>>>>>>>> collect stats and that should be configurable by the user. Maybe this
>>>>>>>>>>>> config should be applicable only to lower/upper bounds. As we now collect
>>>>>>>>>>>> stats even for nested struct fields, this might generate a lot of data. In
>>>>>>>>>>>> most cases, users cluster/sort their data by a subset of data columns to
>>>>>>>>>>>> have fast queries with predicates on those columns. So, being able to
>>>>>>>>>>>> configure columns for which to collect lower/upper bounds seems reasonable.
>>>>>>>>>>>>
>>>>>>>>>>>> On 19 Apr 2019, at 08:03, Gautam <gautamkows...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> >  The length in bytes of the schema is 109M as compared to
>>>>>>>>>>>> 687K of the non-stats dataset.
>>>>>>>>>>>>
>>>>>>>>>>>> Typo, length in bytes of *manifest*. schema is the same.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <
>>>>>>>>>>>> gautamkows...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Correction, partition count = 4308.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > Re: Changing the way we keep stats. Avro is a block
>>>>>>>>>>>>> splittable format and is friendly with parallel compute frameworks like
>>>>>>>>>>>>> Spark.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here I am trying to say that we don't need to change the
>>>>>>>>>>>>> format to columnar, right? The current format is already friendly for
>>>>>>>>>>>>> parallelization.
>>>>>>>>>>>>>
>>>>>>>>>>>>> thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <
>>>>>>>>>>>>> gautamkows...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ah, my bad. I missed adding in the schema details .. Here are
>>>>>>>>>>>>>> some details on the dataset with stats :
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  Iceberg Schema Columns : 20
>>>>>>>>>>>>>>  Spark Schema fields : 20
>>>>>>>>>>>>>>  Snapshot Summary :{added-data-files=4308,
>>>>>>>>>>>>>> added-records=11494037, changed-partition-count=4308,
>>>>>>>>>>>>>> total-records=11494037, total-data-files=4308}
>>>>>>>>>>>>>>  Manifest files :1
>>>>>>>>>>>>>>  Manifest details:
>>>>>>>>>>>>>>      => manifest file path:
>>>>>>>>>>>>>> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>>>>>>>>>>>>      => manifest file length: 109,028,885
>>>>>>>>>>>>>>      => existing files count: 0
>>>>>>>>>>>>>>      => added files count: 4308
>>>>>>>>>>>>>>      => deleted files count: 0
>>>>>>>>>>>>>>      => partitions count: 4
>>>>>>>>>>>>>>      => partition fields count: 4
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Re: Num data files. It has a single manifest keeping track of
>>>>>>>>>>>>>> 4308 files. Total record count is 11.4 Million.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Re: Columns. You are right that this table has many columns..
>>>>>>>>>>>>>> although it has only 20 top-level columns, the number of leaf columns is in the
>>>>>>>>>>>>>> order of thousands. This Schema is heavy on structs (in the thousands) and has
>>>>>>>>>>>>>> deep levels of nesting. I know Iceberg keeps
>>>>>>>>>>>>>> *column_sizes, value_counts, null_value_counts* for all leaf
>>>>>>>>>>>>>> fields and additionally *lower-bounds, upper-bounds* for
>>>>>>>>>>>>>> native, struct types (not yet for map KVs and arrays). The length in bytes
>>>>>>>>>>>>>> of the schema is 109M as compared to 687K of the non-stats dataset.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Re: Turning off stats. I am looking to leverage stats coz for
>>>>>>>>>>>>>> our datasets with a much larger number of data files we want to leverage
>>>>>>>>>>>>>> Iceberg's ability to skip entire files based on these stats. This is one of
>>>>>>>>>>>>>> the big incentives for us to use Iceberg.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Re: Changing the way we keep stats. Avro is a block
>>>>>>>>>>>>>> splittable format and is friendly with parallel compute frameworks like
>>>>>>>>>>>>>> Spark. So would it make sense for instance to add an option to have a
>>>>>>>>>>>>>> Spark job / Futures handle split planning? In a larger context, 109M is
>>>>>>>>>>>>>> not that much metadata given that Iceberg is meant for datasets where the
>>>>>>>>>>>>>> metadata itself is Bigdata scale. I'm curious how folks with larger
>>>>>>>>>>>>>> sized metadata (in GB) are optimizing this today.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> -Gautam.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <
>>>>>>>>>>>>>> rb...@netflix.com.invalid> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for bringing this up! My initial theory is that this
>>>>>>>>>>>>>>> table has a ton of stats data that you have to read. That could happen in a
>>>>>>>>>>>>>>> couple of cases.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> First, you might have large values in some columns. Parquet
>>>>>>>>>>>>>>> will suppress its stats if values are larger than 4k and those are what
>>>>>>>>>>>>>>> Iceberg uses. But that could still cause you to store two 1k+ objects for
>>>>>>>>>>>>>>> each large column (lower and upper bounds). With a lot of data files, that
>>>>>>>>>>>>>>> could add up quickly. The solution here is to implement #113
>>>>>>>>>>>>>>> <https://github.com/apache/incubator-iceberg/issues/113> so
>>>>>>>>>>>>>>> that we don't store the actual min and max for string or binary columns,
>>>>>>>>>>>>>>> but instead a truncated value that is just above or just below.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The second case is when you have a lot of columns. Each
>>>>>>>>>>>>>>> column stores both a lower and upper bound, so 1,000 columns could easily
>>>>>>>>>>>>>>> take 8k per file. If this is the problem, then maybe we want to have a way
>>>>>>>>>>>>>>> to turn off column stats. We could also think of ways to change the way
>>>>>>>>>>>>>>> stats are stored in the manifest files, but that only helps if we move to a
>>>>>>>>>>>>>>> columnar format to store manifests, so this is probably not a short-term
>>>>>>>>>>>>>>> fix.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If you can share a bit more information about this table, we
>>>>>>>>>>>>>>> can probably tell which one is the problem. I'm guessing it is the large
>>>>>>>>>>>>>>> values problem.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <
>>>>>>>>>>>>>>> gautamkows...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have been testing Iceberg reading with and without stats
>>>>>>>>>>>>>>>> built into the Iceberg dataset manifest and found that there's a huge jump in
>>>>>>>>>>>>>>>> network traffic with the former..
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In my test I am comparing two Iceberg datasets, both
>>>>>>>>>>>>>>>> written in Iceberg format. One with and the other without stats collected
>>>>>>>>>>>>>>>> in Iceberg manifests. In particular the difference between the writers used
>>>>>>>>>>>>>>>> for the two datasets is this PR:
>>>>>>>>>>>>>>>> https://github.com/apache/incubator-iceberg/pull/63/files which
>>>>>>>>>>>>>>>> uses Iceberg's writers for writing Parquet data. I captured tcpdump from
>>>>>>>>>>>>>>>> query scans run on these two datasets. The partition being scanned
>>>>>>>>>>>>>>>> contains 1 manifest, 1 parquet data file and ~3700 rows in both datasets.
>>>>>>>>>>>>>>>> There's a 30x jump in network traffic to the remote filesystem (ADLS) when
>>>>>>>>>>>>>>>> I switch to the stats based Iceberg dataset. Both queries used the same Iceberg
>>>>>>>>>>>>>>>> reader code to access both datasets.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark#  tcpdump -r iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>>>>>>>> reading from file iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type EN10MB (Ethernet)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *8844*
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r iceberg_scratch_pad_demo_11_batch_query.pcap | grep perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>>>>>>>> reading from file iceberg_scratch_pad_demo_11_batch_query.pcap, link-type EN10MB (Ethernet)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *269708*
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> As a consequence of this the query response times get
>>>>>>>>>>>>>>>> affected drastically (illustrated below). I must confess that I am on a
>>>>>>>>>>>>>>>> slow internet connection via VPN connecting to the remote FS. But the
>>>>>>>>>>>>>>>> dataset without stats took just 1m 49s while the dataset with stats took
>>>>>>>>>>>>>>>> 26m 48s to read the same sized data. Most of that time in the latter
>>>>>>>>>>>>>>>> dataset was spent split planning in Manifest reading and stats evaluation.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>>> all=> select count(*)  from iceberg_geo1_metrixx_qc_postvalues where batchId = '4a6f95abac924159bb3d7075373395c9';
>>>>>>>>>>>>>>>>  count(1)
>>>>>>>>>>>>>>>> ----------
>>>>>>>>>>>>>>>>      3627
>>>>>>>>>>>>>>>> (1 row)
>>>>>>>>>>>>>>>> *Time: 109673.202 ms (01:49.673)*
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> all=>  select count(*) from iceberg_scratch_pad_demo_11 where _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId = '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>>>>>>>>>>>>>>  count(1)
>>>>>>>>>>>>>>>> ----------
>>>>>>>>>>>>>>>>      3808
>>>>>>>>>>>>>>>> (1 row)
>>>>>>>>>>>>>>>> *Time: 1608058.616 ms (26:48.059)*
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Has anyone faced this? I'm wondering if there's some
>>>>>>>>>>>>>>>> caching or parallelism option here that can be leveraged. Would appreciate
>>>>>>>>>>>>>>>> some guidance. If there isn't a straightforward fix and others feel this is
>>>>>>>>>>>>>>>> an issue I can raise an issue and look into it further.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> -Gautam.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Ryan Blue
>>>>>>>>>>>>>>> Software Engineer
>>>>>>>>>>>>>>> Netflix
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Ryan Blue
>>>>>>>>>> Software Engineer
>>>>>>>>>> Netflix
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
>
>

-- 
Ryan Blue
Software Engineer
Netflix
