Correction, partition count = 4308.

> Re: Changing the way we keep stats. Avro is a block-splittable format and
> is friendly with parallel compute frameworks like Spark.

Here I am trying to say that we don't need to change the format to columnar,
right? The current format is already friendly to parallelization.
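
To make that concrete, here is a minimal sketch of reading a manifest's Avro entries directly with Spark. The path and the nested field names are my assumptions, so treat it as an illustration of "Avro is already parallel-friendly" rather than working tooling:

```
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ManifestAsDataFrame {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("manifest-as-dataframe")
        .getOrCreate();

    // Requires spark-avro on the classpath; the manifest path is hypothetical.
    Dataset<Row> entries = spark.read()
        .format("avro")
        .load("adl://.../metadata/some-manifest-m0.avro");

    // Each row is one manifest entry with a nested data_file struct
    // (field names assumed from the manifest layout).
    entries.selectExpr("data_file.file_path", "data_file.record_count")
        .show(10, false);
  }
}
```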

thanks.





On Fri, Apr 19, 2019 at 12:12 PM Gautam <[email protected]> wrote:

> Ah, my bad. I missed adding the schema details. Here are some details
> on the dataset with stats:
>
>  Iceberg Schema Columns: 20
>  Spark Schema fields: 20
>  Snapshot Summary: {added-data-files=4308, added-records=11494037,
> changed-partition-count=4308, total-records=11494037, total-data-files=4308}
>  Manifest files: 1
>  Manifest details:
>      => manifest file path:
> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>      => manifest file length: 109,028,885
>      => existing files count: 0
>      => added files count: 4308
>      => deleted files count: 0
>      => partitions count: 4
>      => partition fields count: 4
>
> Re: Num data files. It has a single manifest keeping track of 4308 files.
> Total record count is 11.4 million.
>
> Re: Columns. You are right that this table has many columns. Although it
> has only 20 top-level columns, the number of leaf columns is in the order
> of thousands. The schema is heavy on structs (in the thousands) and has
> deep levels of nesting. I know Iceberg keeps
> *column_sizes, value_counts, null_value_counts* for all leaf fields and
> additionally *lower-bounds, upper-bounds* for native and struct types (not
> yet for map KVs and arrays). The manifest is 109M in length as compared to
> 687K for the non-stats dataset.
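>
> For reference, here is a minimal sketch of pulling those per-file stats
> through the Iceberg API (the table location is hypothetical):
>
> ```
> import org.apache.hadoop.conf.Configuration;
> import org.apache.iceberg.DataFile;
> import org.apache.iceberg.FileScanTask;
> import org.apache.iceberg.Table;
> import org.apache.iceberg.hadoop.HadoopTables;
> import org.apache.iceberg.io.CloseableIterable;
>
> public class DumpStats {
>   public static void main(String[] args) throws Exception {
>     // hypothetical ADLS location of the table written with stats
>     Table table = new HadoopTables(new Configuration())
>         .load("adl://.../iceberg_scratch_pad_demo_11");
>
>     // plan the scan and print the stats Iceberg tracked for each data file
>     try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
>       for (FileScanTask task : tasks) {
>         DataFile file = task.file();
>         System.out.println(file.path());
>         System.out.println("  column sizes:      " + file.columnSizes());
>         System.out.println("  value counts:      " + file.valueCounts());
>         System.out.println("  null value counts: " + file.nullValueCounts());
>         // bounds are raw ByteBuffers keyed by field id
>         System.out.println("  lower bounds:      " + file.lowerBounds());
>         System.out.println("  upper bounds:      " + file.upperBounds());
>       }
>     }
>   }
> }
> ```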
>
> Re: Turning off stats. I am looking to leverage stats because, for our
> datasets with a much larger number of data files, we want to use Iceberg's
> ability to skip entire files based on these stats. This is one of the big
> incentives for us to use Iceberg.
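>
> As a sketch of what we are after (table location hypothetical), a filtered
> scan should prune whole data files using the bounds in the manifest before
> Spark reads anything:
>
> ```
> import org.apache.hadoop.conf.Configuration;
> import org.apache.iceberg.FileScanTask;
> import org.apache.iceberg.Table;
> import org.apache.iceberg.expressions.Expressions;
> import org.apache.iceberg.hadoop.HadoopTables;
> import org.apache.iceberg.io.CloseableIterable;
>
> public class FilteredScan {
>   public static void main(String[] args) throws Exception {
>     Table table = new HadoopTables(new Configuration())
>         .load("adl://.../iceberg_geo1_metrixx_qc_postvalues");  // hypothetical
>
>     // only files whose batchId bounds can match should survive planning
>     try (CloseableIterable<FileScanTask> tasks = table.newScan()
>         .filter(Expressions.equal("batchId", "4a6f95abac924159bb3d7075373395c9"))
>         .planFiles()) {
>       for (FileScanTask task : tasks) {
>         System.out.println("will read: " + task.file().path());
>       }
>     }
>   }
> }
> ```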
>
> Re: Changing the way we keep stats. Avro is a block-splittable format and
> is friendly with parallel compute frameworks like Spark. So would it make
> sense, for instance, to add an option to have a Spark job / Futures handle
> split planning? In a larger context, 109M is not that much metadata given
> that Iceberg is meant for datasets where the metadata itself is big-data
> scale. I'm curious how folks with larger amounts of metadata (in the GBs)
> are optimizing this today.
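>
> To illustrate the Futures idea (the paths are hypothetical and a real
> version would go through Iceberg's FileIO rather than java.io.File),
> something along these lines could read several manifests concurrently:
>
> ```
> import java.io.File;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.List;
> import java.util.concurrent.Callable;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.Future;
> import org.apache.avro.file.DataFileReader;
> import org.apache.avro.generic.GenericDatumReader;
> import org.apache.avro.generic.GenericRecord;
>
> public class ParallelManifestRead {
>   public static void main(String[] args) throws Exception {
>     // hypothetical local copies of manifest files
>     List<String> manifests = Arrays.asList("/tmp/m0.avro", "/tmp/m1.avro");
>
>     ExecutorService pool = Executors.newFixedThreadPool(4);
>     List<Future<Long>> counts = new ArrayList<>();
>     for (String path : manifests) {
>       Callable<Long> task = () -> countEntries(path);
>       counts.add(pool.submit(task));
>     }
>
>     for (Future<Long> count : counts) {
>       System.out.println("manifest entries: " + count.get());
>     }
>     pool.shutdown();
>   }
>
>   // count manifest entries by iterating the Avro records in one file
>   private static long countEntries(String path) throws Exception {
>     long entries = 0;
>     try (DataFileReader<GenericRecord> reader =
>              new DataFileReader<>(new File(path), new GenericDatumReader<>())) {
>       for (GenericRecord entry : reader) {
>         entries++;
>       }
>     }
>     return entries;
>   }
> }
> ```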
>
>
> Cheers,
> -Gautam.
>
>
>
>
> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <[email protected]>
> wrote:
>
>> Thanks for bringing this up! My initial theory is that this table has a
>> ton of stats data that you have to read. That could happen in a couple of
>> cases.
>>
>> First, you might have large values in some columns. Parquet will suppress
>> its stats if values are larger than 4k, and those stats are what Iceberg
>> uses. But that could still cause you to store two 1k+ objects for each large column
>> (lower and upper bounds). With a lot of data files, that could add up
>> quickly. The solution here is to implement #113
>> <https://github.com/apache/incubator-iceberg/issues/113> so that we
>> don't store the actual min and max for string or binary columns, but
>> instead a truncated value that is just above or just below.
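>>
>> To sketch the idea behind #113 (this is an illustration, not the actual
>> implementation), string bounds could be truncated like this:
>>
>> ```
>> public class BoundTruncation {
>>   static String truncatedLowerBound(String lower, int len) {
>>     // any prefix of the real minimum is <= the real minimum, so it is safe
>>     return lower.length() <= len ? lower : lower.substring(0, len);
>>   }
>>
>>   static String truncatedUpperBound(String upper, int len) {
>>     if (upper.length() <= len) {
>>       return upper;
>>     }
>>     char[] prefix = upper.substring(0, len).toCharArray();
>>     // bump the last character that can be incremented so the truncated
>>     // value sorts just above the real maximum
>>     for (int i = prefix.length - 1; i >= 0; i--) {
>>       if (prefix[i] < Character.MAX_VALUE) {
>>         prefix[i]++;
>>         return new String(prefix, 0, i + 1);
>>       }
>>     }
>>     return upper;  // nothing could be bumped; keep the full value
>>   }
>>
>>   public static void main(String[] args) {
>>     System.out.println(truncatedLowerBound("4a6f95abac924159bb3d7075373395c9", 8));  // 4a6f95ab
>>     System.out.println(truncatedUpperBound("4a6f95abac924159bb3d7075373395c9", 8));  // 4a6f95ac
>>   }
>> }
>> ```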
>>
>> The second case is when you have a lot of columns. Each column stores
>> both a lower and upper bound, so 1,000 columns could easily take 8k per
>> file. If this is the problem, then maybe we want to have a way to turn off
>> column stats. We could also think of ways to change the way stats are
>> stored in the manifest files, but that only helps if we move to a columnar
>> format to store manifests, so this is probably not a short-term fix.
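>>
>> As a rough sense of scale: 1,000 leaf columns with two bounds of ~4 bytes
>> each is about 8k per data file, so a table with a few thousand data files
>> would carry tens of megabytes of bounds alone, before value counts and
>> column sizes.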
>>
>> If you can share a bit more information about this table, we can probably
>> tell which one is the problem. I'm guessing it is the large values problem.
>>
>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <[email protected]> wrote:
>>
>>> Hello folks,
>>>
>>> I have been testing Iceberg reads with and without stats built into the
>>> Iceberg dataset manifests and found that there's a huge jump in network
>>> traffic with the latter.
>>>
>>>
>>> In my test I am comparing two Iceberg datasets, one written with and the
>>> other without stats collected in the Iceberg manifests. In particular,
>>> the difference between the writers used for the two datasets is this PR:
>>> https://github.com/apache/incubator-iceberg/pull/63/files which uses
>>> Iceberg's writers for writing Parquet data. I captured tcpdump from query
>>> scans run on these two datasets. The partition being scanned contains 1
>>> manifest, 1 Parquet data file, and ~3700 rows in both datasets. There's a
>>> 30x jump in network traffic to the remote filesystem (ADLS) when I switch
>>> to the stats-based Iceberg dataset. The same Iceberg reader code was used
>>> to access both datasets.
>>>
>>> ```
>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>> reading from file iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type EN10MB (Ethernet)
>>> 8844
>>>
>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r iceberg_scratch_pad_demo_11_batch_query.pcap | grep perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>> reading from file iceberg_scratch_pad_demo_11_batch_query.pcap, link-type EN10MB (Ethernet)
>>> 269708
>>> ```
>>>
>>> As a consequence, query response times are affected drastically
>>> (illustrated below). I must confess that I am on a slow internet
>>> connection over VPN to the remote FS, but the dataset without stats took
>>> just 1m 49s while the dataset with stats took 26m 48s to read the same
>>> amount of data. Most of that time for the latter dataset was spent in
>>> split planning: manifest reading and stats evaluation.
>>>
>>> ```
>>> all=> select count(*) from iceberg_geo1_metrixx_qc_postvalues where batchId = '4a6f95abac924159bb3d7075373395c9';
>>>  count(1)
>>> ----------
>>>      3627
>>> (1 row)
>>> Time: 109673.202 ms (01:49.673)
>>>
>>> all=> select count(*) from iceberg_scratch_pad_demo_11 where _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId = '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>  count(1)
>>> ----------
>>>      3808
>>> (1 row)
>>> Time: 1608058.616 ms (26:48.059)
>>> ```
>>>
>>> Has anyone faced this? I'm wondering if there's some caching or
>>> parallelism option here that can be leveraged. I would appreciate some
>>> guidance. If there isn't a straightforward fix and others feel this is a
>>> problem, I can raise an issue and look into it further.
>>>
>>>
>>> Cheers,
>>> -Gautam.
>>>
>>>
>>>
>>>
>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
