> Using those, you should be able to control parallelism. If you want to
test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
compact manifests.

This is helpful. Thanks for the pointer on increasing parallelism. Will try
this out. So I understand the behaviour, if a different dataset has >=5000
batches then the resultant # manifests would be (total_num_batches % 5000 )
?

> What surprises me is that you’re not getting much benefit from filtering
out manifests that aren’t helpful. We see a lot of benefit from it.

Pardon the verbose example but i think it'l help explain what i'm seeing ..

Regarding manifest filtering:  I tested if partition filters in sql query
actually reduce manifests being inspected. In my example, i have 16
manifests that point to 4000 batch partitions ( each file is restricted to
one partition as we'r using physical partitioning in the table ).  So when
querying for .. WHERE  batchId = 'xyz'  .. at most 1 manifest should be
read coz 1 batch == 1 file which should be tracked by 1 manifest (among the
16) , right? But i see that all 16 are being inspected in
BaseTableScan.planFiles().  Correct me if i'm wrong, it's this call [1]
that should be giving me the manifests that match a partition. When I
inspect this  it says `matchingManifests = 16` ,  which is all the
manifests in the table.  This *could* be due to the fact that our batch ids
are random UUIDs so lower/upper bounds may not help coz there's no inherent
ordering amongst batches.
But then  i tried year = 2019 and month = 01 and day = 01 which also
scanned all manifests. Could this be due to the way Iceberg manifests are
re-grouped and merged? If so, shouldn't re-grouping honour partition
boundaries and optimize for it?


Cheers,
-Gautam.

[1] -
https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173


On Thu, May 2, 2019 at 12:27 PM Ryan Blue <rb...@netflix.com> wrote:

> Good questions. Grouping manifests is configurable at the table level.
> There are 2 settings:
>
>    - commit.manifest.target-size-bytes defaults to 8MB, this is the
>    target size that Iceberg will compact to
>    - commit.manifest.min-count-to-merge defaults to 100, this is the
>    minimum number of files before a compaction is triggered
>
> Using those, you should be able to control parallelism. If you want to
> test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
> compact manifests.
>
> What surprises me is that you’re not getting much benefit from filtering
> out manifests that aren’t helpful. We see a lot of benefit from it. You
> might try sorting the data files by partition before adding them to the
> table. That will cluster data files in the same partition so you can read
> fewer manifests.
>
> On Thu, May 2, 2019 at 12:09 PM Gautam <gautamkows...@gmail.com> wrote:
>
>> Hey Anton,
>>             Sorry bout the delay on this. Been caught up with some other
>> things. Thanks for raising issue#173 .
>>
>> So the root cause is indeed the density and size of the schema. While I
>> agree the option to configure stats for columns is good (although i'm not
>> fully convinced that this is purely due to lower/upper bounds). For
>> instance, maybe it's just taking a while to iterate over manifest rows and
>> deserialize the DataFile stats in each read?  The solution i'm using right
>> now is to parallelize the manifest reading in split planning. We
>> regenerated the Iceberg table with more manifests. Now the code enables the
>> ParallelIterator which uses a worker pool of threads (1 thread per cpu by
>> default, configurable using 'iceberg.worker.num-threads' ) to read
>> manifests.
>>
>> On that note, the ability to parallelize is limited to how many manifests
>> are in the table. So as a test, for a table with 4000 files we created one
>> manifest per file (think of one file as a single batch commit in this
>> case). So I was hoping to get a parallelism factor of 4000. But Iceberg
>> summarizes manifests into fewer manifests with each commit so we instead
>> ended up with 16 manifests. So now split planning is limited to reading at
>> most 16 units of parallelism. Is this grouping of manifests into fewer
>> configurable? if not should we allow making this configurable?
>>
>> Sorry if this is forking a different conversation. If so, I can start a
>> separate conversation thread on this.
>>
>>
>>
>>
>>
>>
>> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <aokolnyc...@apple.com>
>> wrote:
>>
>>> Hey Gautam,
>>>
>>> Out of my curiosity, did you manage to confirm the root cause of the
>>> issue?
>>>
>>> P.S. I created [1] so that we can make collection of lower/upper bounds
>>> configurable.
>>>
>>> Thanks,
>>> Anton
>>>
>>> [1] - https://github.com/apache/incubator-iceberg/issues/173
>>>
>>> On 22 Apr 2019, at 09:15, Gautam <gautamkows...@gmail.com> wrote:
>>>
>>> Thanks guys for the insights ..
>>>
>>> > I like Anton's idea to have an optional list of columns for which we
>>> keep stats. That would allow us to avoid storing stats for thousands of
>>> columns that won't ever be used. Another option here is to add a flag to
>>> keep stats only for top-level columns. That's much less configuration for
>>> users and probably does the right thing in many cases. Simpler to use but
>>> not as fast in all cases is sometimes a good compromise.
>>>
>>> This makes sense to me. It adds a variable that data pipelines can tweak
>>> on to improve performance. I will add an issue on Github to add a stats
>>> config/flag. Although, having said that, I would try to optimize around
>>> this coz read patterns are hardly ever known a priori and adding a column
>>> to this list means having to re-write the entire data again. So i'l try the
>>> other suggestion which is parallelizing on multiple manifests.
>>>
>>> >  To clarify my comment on changing the storage: the idea is to use
>>> separate columns instead of a map and then use a columnar storage format so
>>> we can project those columns independently. Avro can't project columns
>>> independently. This wouldn't help on the write side and may just cause a
>>> lot of seeking on the read side that diminishes the benefits.
>>>
>>> Gotcha.
>>>
>>> > Also, now that we have more details, I think there is a second
>>> problem. Because we expect several manifests in a table, we parallelize
>>> split planning on manifests instead of splits of manifest files. This
>>> planning operation is happening in a single thread instead of in parallel.
>>> I think if you split the write across several manifests, you'd improve wall
>>> time.
>>>
>>> This might actually be the issue here, this was a test bench dataset so
>>> the writer job created a single manifest for all the data in the dataset
>>> which isn't really how we will do things in prod. I'l try and create the
>>> metadata based on productions expected commit pattern.
>>>
>>>
>>> Regarding Iceberg not truncating large bounded column values
>>> https://github.com/apache/incubator-iceberg/issues/113 .. I didn't
>>> consider this with our dataset. The current evidence is leading towards the
>>> number of columns and the sheer number of files that the manifest is
>>> maintaining but this is a good thing to look into.
>>>
>>> Thanks again guys.
>>>
>>> -Gautam.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> I like Anton's idea to have an optional list of columns for which we
>>>> keep stats. That would allow us to avoid storing stats for thousands of
>>>> columns that won't ever be used. Another option here is to add a flag to
>>>> keep stats only for top-level columns. That's much less configuration for
>>>> users and probably does the right thing in many cases. Simpler to use but
>>>> not as fast in all cases is sometimes a good compromise.
>>>>
>>>> To clarify my comment on changing the storage: the idea is to use
>>>> separate columns instead of a map and then use a columnar storage format so
>>>> we can project those columns independently. Avro can't project columns
>>>> independently. This wouldn't help on the write side and may just cause a
>>>> lot of seeking on the read side that diminishes the benefits.
>>>>
>>>> Also, now that we have more details, I think there is a second problem.
>>>> Because we expect several manifests in a table, we parallelize split
>>>> planning on manifests instead of splits of manifest files. This planning
>>>> operation is happening in a single thread instead of in parallel. I think
>>>> if you split the write across several manifests, you'd improve wall time.
>>>>
>>>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <aokolnyc...@apple.com>
>>>> wrote:
>>>>
>>>>> No, we haven’t experienced it yet. The manifest size is huge in your
>>>>> case. To me, Ryan is correct: it might be either big lower/upper bounds
>>>>> (then truncation will help) or a big number columns (then collecting
>>>>> lower/upper bounds only for specific columns will help). I think both
>>>>> optimizations are needed and will reduce the manifest size.
>>>>>
>>>>> Since you mentioned you have a lot of columns and we collect bounds
>>>>> for nested struct fields, I am wondering if you could revert [1] locally
>>>>> and compare the manifest size.
>>>>>
>>>>> [1] -
>>>>> https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f
>>>>>
>>>>> On 19 Apr 2019, at 15:42, Gautam <gautamkows...@gmail.com> wrote:
>>>>>
>>>>> Thanks for responding Anton! Do we think the delay is mainly due to
>>>>> lower/upper bound filtering? have you faced this? I haven't exactly found
>>>>> where the slowness is yet. It's generally due to the stats filtering but
>>>>> what part of it is causing this much network traffic. There's
>>>>> CloseableIteratable  that takes a ton of time on the next() and hasNext()
>>>>> calls. My guess is the expression evaluation on each manifest entry is
>>>>> what's doing it.
>>>>>
>>>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <
>>>>> aokolnyc...@apple.com> wrote:
>>>>>
>>>>>> I think we need to have a list of columns for which we want to
>>>>>> collect stats and that should be configurable by the user. Maybe, this
>>>>>> config should be applicable only to lower/upper bounds. As we now collect
>>>>>> stats even for nested struct fields, this might generate a lot of data. 
>>>>>> In
>>>>>> most cases, users cluster/sort their data by a subset of data columns to
>>>>>> have fast queries with predicates on those columns. So, being able to
>>>>>> configure columns for which to collect lower/upper bounds seems 
>>>>>> reasonable.
>>>>>>
>>>>>> On 19 Apr 2019, at 08:03, Gautam <gautamkows...@gmail.com> wrote:
>>>>>>
>>>>>> >  The length in bytes of the schema is 109M as compared to 687K of
>>>>>> the non-stats dataset.
>>>>>>
>>>>>> Typo, length in bytes of *manifest*. schema is the same.
>>>>>>
>>>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <gautamkows...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Correction, partition count = 4308.
>>>>>>>
>>>>>>> > Re: Changing the way we keep stats. Avro is a block splittable
>>>>>>> format and is friendly with parallel compute frameworks like Spark.
>>>>>>>
>>>>>>> Here I am trying to say that we don't need to change the format to
>>>>>>> columnar right? The current format is already friendly for 
>>>>>>> parallelization.
>>>>>>>
>>>>>>> thanks.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <gautamkows...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Ah, my bad. I missed adding in the schema details .. Here are some
>>>>>>>> details on the dataset with stats :
>>>>>>>>
>>>>>>>>  Iceberg Schema Columns : 20
>>>>>>>>  Spark Schema fields : 20
>>>>>>>>  Snapshot Summary :{added-data-files=4308, added-records=11494037,
>>>>>>>> changed-partition-count=4308, total-records=11494037, 
>>>>>>>> total-data-files=4308}
>>>>>>>>  Manifest files :1
>>>>>>>>  Manifest details:
>>>>>>>>      => manifest file path:
>>>>>>>> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>>>>>>      => manifest file length: 109,028,885
>>>>>>>>      => existing files count: 0
>>>>>>>>      => added files count: 4308
>>>>>>>>      => deleted files count: 0
>>>>>>>>      => partitions count: 4
>>>>>>>>      => partition fields count: 4
>>>>>>>>
>>>>>>>> Re: Num data files. It has a single manifest keep track of 4308
>>>>>>>> files. Total record count is 11.4 Million.
>>>>>>>>
>>>>>>>> Re: Columns. You are right that this table has many columns..
>>>>>>>> although it has only 20 top-level columns,  num leaf columns are in 
>>>>>>>> order
>>>>>>>> of thousands. This Schema is heavy on structs (in the thousands) and 
>>>>>>>> has
>>>>>>>> deep levels of nesting.  I know Iceberg keeps
>>>>>>>> *column_sizes, value_counts, null_value_counts* for all leaf
>>>>>>>> fields and additionally *lower-bounds, upper-bounds* for native,
>>>>>>>> struct types (not yet for map KVs and arrays).  The length in bytes of 
>>>>>>>> the
>>>>>>>> schema is 109M as compared to 687K of the non-stats dataset.
>>>>>>>>
>>>>>>>> Re: Turning off stats. I am looking to leverage stats coz for our
>>>>>>>> datasets with much larger number of data files we want to leverage
>>>>>>>> iceberg's ability to skip entire files based on these stats. This is 
>>>>>>>> one of
>>>>>>>> the big incentives for us to use Iceberg.
>>>>>>>>
>>>>>>>> Re: Changing the way we keep stats. Avro is a block splittable
>>>>>>>> format and is friendly with parallel compute frameworks like Spark. So
>>>>>>>> would it make sense for instance to have add an option to have Spark 
>>>>>>>> job /
>>>>>>>> Futures  handle split planning?   In a larger context, 109M is not that
>>>>>>>> much metadata given that Iceberg is meant for datasets where the 
>>>>>>>> metadata
>>>>>>>> itself is Bigdata scale.  I'm curious on how folks with larger sized
>>>>>>>> metadata (in GB) are optimizing this today.
>>>>>>>>
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> -Gautam.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <
>>>>>>>> rb...@netflix.com.invalid> wrote:
>>>>>>>>
>>>>>>>>> Thanks for bringing this up! My initial theory is that this table
>>>>>>>>> has a ton of stats data that you have to read. That could happen in a
>>>>>>>>> couple of cases.
>>>>>>>>>
>>>>>>>>> First, you might have large values in some columns. Parquet will
>>>>>>>>> suppress its stats if values are larger than 4k and those are what 
>>>>>>>>> Iceberg
>>>>>>>>> uses. But that could still cause you to store two 1k+ objects for each
>>>>>>>>> large column (lower and upper bounds). With a lot of data files, that 
>>>>>>>>> could
>>>>>>>>> add up quickly. The solution here is to implement #113
>>>>>>>>> <https://github.com/apache/incubator-iceberg/issues/113> so that
>>>>>>>>> we don't store the actual min and max for string or binary columns, 
>>>>>>>>> but
>>>>>>>>> instead a truncated value that is just above or just below.
>>>>>>>>>
>>>>>>>>> The second case is when you have a lot of columns. Each column
>>>>>>>>> stores both a lower and upper bound, so 1,000 columns could easily 
>>>>>>>>> take 8k
>>>>>>>>> per file. If this is the problem, then maybe we want to have a way to 
>>>>>>>>> turn
>>>>>>>>> off column stats. We could also think of ways to change the way stats 
>>>>>>>>> are
>>>>>>>>> stored in the manifest files, but that only helps if we move to a 
>>>>>>>>> columnar
>>>>>>>>> format to store manifests, so this is probably not a short-term fix.
>>>>>>>>>
>>>>>>>>> If you can share a bit more information about this table, we can
>>>>>>>>> probably tell which one is the problem. I'm guessing it is the large 
>>>>>>>>> values
>>>>>>>>> problem.
>>>>>>>>>
>>>>>>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <gautamkows...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello folks,
>>>>>>>>>>
>>>>>>>>>> I have been testing Iceberg reading with and without stats built
>>>>>>>>>> into Iceberg dataset manifest and found that there's a huge jump in 
>>>>>>>>>> network
>>>>>>>>>> traffic with the latter..
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> In my test I am comparing two Iceberg datasets, both written in
>>>>>>>>>> Iceberg format. One with and the other without stats collected in 
>>>>>>>>>> Iceberg
>>>>>>>>>> manifests. In particular the difference between the writers used for 
>>>>>>>>>> the
>>>>>>>>>> two datasets is this PR:
>>>>>>>>>> https://github.com/apache/incubator-iceberg/pull/63/files which
>>>>>>>>>> uses Iceberg's writers for writing Parquet data. I captured tcpdump 
>>>>>>>>>> from
>>>>>>>>>> query scans run on these two datasets.  The partition being scanned
>>>>>>>>>> contains 1 manifest, 1 parquet data file and ~3700 rows in both 
>>>>>>>>>> datasets.
>>>>>>>>>> There's a 30x jump in network traffic to the remote filesystem 
>>>>>>>>>> (ADLS) when
>>>>>>>>>> i switch to stats based Iceberg dataset. Both queries used the same 
>>>>>>>>>> Iceberg
>>>>>>>>>> reader code to access both datasets.
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> root@d69e104e7d40:/usr/local/spark#  tcpdump -r
>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep
>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>> reading from file
>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type EN10MB
>>>>>>>>>> (Ethernet)
>>>>>>>>>>
>>>>>>>>>> *8844*
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r
>>>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap | grep
>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>> reading from file iceberg_scratch_pad_demo_11_batch_query.pcap,
>>>>>>>>>> link-type EN10MB (Ethernet)
>>>>>>>>>>
>>>>>>>>>> *269708*
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> As a consequence of this the query response times get affected
>>>>>>>>>> drastically (illustrated below). I must confess that I am on a slow
>>>>>>>>>> internet connection via VPN connecting to the remote FS. But the 
>>>>>>>>>> dataset
>>>>>>>>>> without stats took just 1m 49s while the dataset with stats took 26m 
>>>>>>>>>> 48s to
>>>>>>>>>> read the same sized data. Most of that time in the latter dataset 
>>>>>>>>>> was spent
>>>>>>>>>> split planning in Manifest reading and stats evaluation.
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> all=> select count(*)  from iceberg_geo1_metrixx_qc_postvalues
>>>>>>>>>> where batchId = '4a6f95abac924159bb3d7075373395c9';
>>>>>>>>>>  count(1)
>>>>>>>>>> ----------
>>>>>>>>>>      3627
>>>>>>>>>> (1 row)
>>>>>>>>>> *Time: 109673.202 ms (01:49.673)*
>>>>>>>>>>
>>>>>>>>>> all=>  select count(*) from iceberg_scratch_pad_demo_11  where
>>>>>>>>>> _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId =
>>>>>>>>>> '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>>>>>>>>  count(1)
>>>>>>>>>> ----------
>>>>>>>>>>      3808
>>>>>>>>>> (1 row)
>>>>>>>>>> *Time: 1608058.616 ms (26:48.059)*
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> Has anyone faced this? I'm wondering if there's some caching or
>>>>>>>>>> parallelism option here that can be leveraged.  Would appreciate some
>>>>>>>>>> guidance. If there isn't a straightforward fix and others feel this 
>>>>>>>>>> is an
>>>>>>>>>> issue I can raise an issue and look into it further.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> -Gautam.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Ryan Blue
>>>>>>>>> Software Engineer
>>>>>>>>> Netflix
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
>>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Reply via email to