Correction, partition count = 4308. > Re: Changing the way we keep stats. Avro is a block splittable format and is friendly with parallel compute frameworks like Spark.
Here I am trying to say that we don't need to change the format to columnar right? The current format is already friendly for parallelization. thanks. On Fri, Apr 19, 2019 at 12:12 PM Gautam <[email protected]> wrote: > Ah, my bad. I missed adding in the schema details .. Here are some details > on the dataset with stats : > > Iceberg Schema Columns : 20 > Spark Schema fields : 20 > Snapshot Summary :{added-data-files=4308, added-records=11494037, > changed-partition-count=4308, total-records=11494037, total-data-files=4308} > Manifest files :1 > Manifest details: > => manifest file path: > adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro > => manifest file length: 109,028,885 > => existing files count: 0 > => added files count: 4308 > => deleted files count: 0 > => partitions count: 4 > => partition fields count: 4 > > Re: Num data files. It has a single manifest keep track of 4308 files. > Total record count is 11.4 Million. > > Re: Columns. You are right that this table has many columns.. although it > has only 20 top-level columns, num leaf columns are in order of thousands. > This Schema is heavy on structs (in the thousands) and has deep levels of > nesting. I know Iceberg keeps > *column_sizes, value_counts, null_value_counts* for all leaf fields and > additionally *lower-bounds, upper-bounds* for native, struct types (not > yet for map KVs and arrays). The length in bytes of the schema is 109M as > compared to 687K of the non-stats dataset. > > Re: Turning off stats. I am looking to leverage stats coz for our datasets > with much larger number of data files we want to leverage iceberg's ability > to skip entire files based on these stats. This is one of the big > incentives for us to use Iceberg. > > Re: Changing the way we keep stats. Avro is a block splittable format and > is friendly with parallel compute frameworks like Spark. So would it make > sense for instance to have add an option to have Spark job / Futures > handle split planning? In a larger context, 109M is not that much > metadata given that Iceberg is meant for datasets where the metadata itself > is Bigdata scale. I'm curious on how folks with larger sized metadata (in > GB) are optimizing this today. > > > Cheers, > -Gautam. > > > > > On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <[email protected]> > wrote: > >> Thanks for bringing this up! My initial theory is that this table has a >> ton of stats data that you have to read. That could happen in a couple of >> cases. >> >> First, you might have large values in some columns. Parquet will suppress >> its stats if values are larger than 4k and those are what Iceberg uses. But >> that could still cause you to store two 1k+ objects for each large column >> (lower and upper bounds). With a lot of data files, that could add up >> quickly. The solution here is to implement #113 >> <https://github.com/apache/incubator-iceberg/issues/113> so that we >> don't store the actual min and max for string or binary columns, but >> instead a truncated value that is just above or just below. >> >> The second case is when you have a lot of columns. Each column stores >> both a lower and upper bound, so 1,000 columns could easily take 8k per >> file. If this is the problem, then maybe we want to have a way to turn off >> column stats. We could also think of ways to change the way stats are >> stored in the manifest files, but that only helps if we move to a columnar >> format to store manifests, so this is probably not a short-term fix. >> >> If you can share a bit more information about this table, we can probably >> tell which one is the problem. I'm guessing it is the large values problem. >> >> On Thu, Apr 18, 2019 at 11:52 AM Gautam <[email protected]> wrote: >> >>> Hello folks, >>> >>> I have been testing Iceberg reading with and without stats built into >>> Iceberg dataset manifest and found that there's a huge jump in network >>> traffic with the latter.. >>> >>> >>> In my test I am comparing two Iceberg datasets, both written in Iceberg >>> format. One with and the other without stats collected in Iceberg >>> manifests. In particular the difference between the writers used for the >>> two datasets is this PR: >>> https://github.com/apache/incubator-iceberg/pull/63/files which uses >>> Iceberg's writers for writing Parquet data. I captured tcpdump from query >>> scans run on these two datasets. The partition being scanned contains 1 >>> manifest, 1 parquet data file and ~3700 rows in both datasets. There's a >>> 30x jump in network traffic to the remote filesystem (ADLS) when i switch >>> to stats based Iceberg dataset. Both queries used the same Iceberg reader >>> code to access both datasets. >>> >>> ``` >>> root@d69e104e7d40:/usr/local/spark# tcpdump -r >>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep >>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l >>> reading from file iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, >>> link-type EN10MB (Ethernet) >>> >>> *8844* >>> >>> >>> root@d69e104e7d40:/usr/local/spark# tcpdump -r >>> iceberg_scratch_pad_demo_11_batch_query.pcap | grep >>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l >>> reading from file iceberg_scratch_pad_demo_11_batch_query.pcap, >>> link-type EN10MB (Ethernet) >>> >>> *269708* >>> >>> ``` >>> >>> As a consequence of this the query response times get affected >>> drastically (illustrated below). I must confess that I am on a slow >>> internet connection via VPN connecting to the remote FS. But the dataset >>> without stats took just 1m 49s while the dataset with stats took 26m 48s to >>> read the same sized data. Most of that time in the latter dataset was spent >>> split planning in Manifest reading and stats evaluation. >>> >>> ``` >>> all=> select count(*) from iceberg_geo1_metrixx_qc_postvalues where >>> batchId = '4a6f95abac924159bb3d7075373395c9'; >>> count(1) >>> ---------- >>> 3627 >>> (1 row) >>> *Time: 109673.202 ms (01:49.673)* >>> >>> all=> select count(*) from iceberg_scratch_pad_demo_11 where >>> _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId = >>> '6d50eeb3e7d74b4f99eea91a27fc8f15'; >>> count(1) >>> ---------- >>> 3808 >>> (1 row) >>> *Time: 1608058.616 ms (26:48.059)* >>> >>> ``` >>> >>> Has anyone faced this? I'm wondering if there's some caching or >>> parallelism option here that can be leveraged. Would appreciate some >>> guidance. If there isn't a straightforward fix and others feel this is an >>> issue I can raise an issue and look into it further. >>> >>> >>> Cheers, >>> -Gautam. >>> >>> >>> >>> >>> >> >> -- >> Ryan Blue >> Software Engineer >> Netflix >> >
