Hey Anton,
            Sorry bout the delay on this. Been caught up with some other
things. Thanks for raising issue#173 .

So the root cause is indeed the density and size of the schema. While I
agree the option to configure stats for columns is good (although i'm not
fully convinced that this is purely due to lower/upper bounds). For
instance, maybe it's just taking a while to iterate over manifest rows and
deserialize the DataFile stats in each read?  The solution i'm using right
now is to parallelize the manifest reading in split planning. We
regenerated the Iceberg table with more manifests. Now the code enables the
ParallelIterator which uses a worker pool of threads (1 thread per cpu by
default, configurable using 'iceberg.worker.num-threads' ) to read
manifests.

On that note, the ability to parallelize is limited to how many manifests
are in the table. So as a test, for a table with 4000 files we created one
manifest per file (think of one file as a single batch commit in this
case). So I was hoping to get a parallelism factor of 4000. But Iceberg
summarizes manifests into fewer manifests with each commit so we instead
ended up with 16 manifests. So now split planning is limited to reading at
most 16 units of parallelism. Is this grouping of manifests into fewer
configurable? if not should we allow making this configurable?

Sorry if this is forking a different conversation. If so, I can start a
separate conversation thread on this.






On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <aokolnyc...@apple.com>
wrote:

> Hey Gautam,
>
> Out of my curiosity, did you manage to confirm the root cause of the issue?
>
> P.S. I created [1] so that we can make collection of lower/upper bounds
> configurable.
>
> Thanks,
> Anton
>
> [1] - https://github.com/apache/incubator-iceberg/issues/173
>
> On 22 Apr 2019, at 09:15, Gautam <gautamkows...@gmail.com> wrote:
>
> Thanks guys for the insights ..
>
> > I like Anton's idea to have an optional list of columns for which we
> keep stats. That would allow us to avoid storing stats for thousands of
> columns that won't ever be used. Another option here is to add a flag to
> keep stats only for top-level columns. That's much less configuration for
> users and probably does the right thing in many cases. Simpler to use but
> not as fast in all cases is sometimes a good compromise.
>
> This makes sense to me. It adds a variable that data pipelines can tweak
> on to improve performance. I will add an issue on Github to add a stats
> config/flag. Although, having said that, I would try to optimize around
> this coz read patterns are hardly ever known a priori and adding a column
> to this list means having to re-write the entire data again. So i'l try the
> other suggestion which is parallelizing on multiple manifests.
>
> >  To clarify my comment on changing the storage: the idea is to use
> separate columns instead of a map and then use a columnar storage format so
> we can project those columns independently. Avro can't project columns
> independently. This wouldn't help on the write side and may just cause a
> lot of seeking on the read side that diminishes the benefits.
>
> Gotcha.
>
> > Also, now that we have more details, I think there is a second problem.
> Because we expect several manifests in a table, we parallelize split
> planning on manifests instead of splits of manifest files. This planning
> operation is happening in a single thread instead of in parallel. I think
> if you split the write across several manifests, you'd improve wall time.
>
> This might actually be the issue here, this was a test bench dataset so
> the writer job created a single manifest for all the data in the dataset
> which isn't really how we will do things in prod. I'l try and create the
> metadata based on productions expected commit pattern.
>
>
> Regarding Iceberg not truncating large bounded column values
> https://github.com/apache/incubator-iceberg/issues/113 .. I didn't
> consider this with our dataset. The current evidence is leading towards the
> number of columns and the sheer number of files that the manifest is
> maintaining but this is a good thing to look into.
>
> Thanks again guys.
>
> -Gautam.
>
>
>
>
>
>
>
> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rb...@netflix.com> wrote:
>
>> I like Anton's idea to have an optional list of columns for which we keep
>> stats. That would allow us to avoid storing stats for thousands of columns
>> that won't ever be used. Another option here is to add a flag to keep stats
>> only for top-level columns. That's much less configuration for users and
>> probably does the right thing in many cases. Simpler to use but not as fast
>> in all cases is sometimes a good compromise.
>>
>> To clarify my comment on changing the storage: the idea is to use
>> separate columns instead of a map and then use a columnar storage format so
>> we can project those columns independently. Avro can't project columns
>> independently. This wouldn't help on the write side and may just cause a
>> lot of seeking on the read side that diminishes the benefits.
>>
>> Also, now that we have more details, I think there is a second problem.
>> Because we expect several manifests in a table, we parallelize split
>> planning on manifests instead of splits of manifest files. This planning
>> operation is happening in a single thread instead of in parallel. I think
>> if you split the write across several manifests, you'd improve wall time.
>>
>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <aokolnyc...@apple.com>
>> wrote:
>>
>>> No, we haven’t experienced it yet. The manifest size is huge in your
>>> case. To me, Ryan is correct: it might be either big lower/upper bounds
>>> (then truncation will help) or a big number columns (then collecting
>>> lower/upper bounds only for specific columns will help). I think both
>>> optimizations are needed and will reduce the manifest size.
>>>
>>> Since you mentioned you have a lot of columns and we collect bounds for
>>> nested struct fields, I am wondering if you could revert [1] locally and
>>> compare the manifest size.
>>>
>>> [1] -
>>> https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f
>>>
>>> On 19 Apr 2019, at 15:42, Gautam <gautamkows...@gmail.com> wrote:
>>>
>>> Thanks for responding Anton! Do we think the delay is mainly due to
>>> lower/upper bound filtering? have you faced this? I haven't exactly found
>>> where the slowness is yet. It's generally due to the stats filtering but
>>> what part of it is causing this much network traffic. There's
>>> CloseableIteratable  that takes a ton of time on the next() and hasNext()
>>> calls. My guess is the expression evaluation on each manifest entry is
>>> what's doing it.
>>>
>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <aokolnyc...@apple.com>
>>> wrote:
>>>
>>>> I think we need to have a list of columns for which we want to collect
>>>> stats and that should be configurable by the user. Maybe, this config
>>>> should be applicable only to lower/upper bounds. As we now collect stats
>>>> even for nested struct fields, this might generate a lot of data. In most
>>>> cases, users cluster/sort their data by a subset of data columns to have
>>>> fast queries with predicates on those columns. So, being able to configure
>>>> columns for which to collect lower/upper bounds seems reasonable.
>>>>
>>>> On 19 Apr 2019, at 08:03, Gautam <gautamkows...@gmail.com> wrote:
>>>>
>>>> >  The length in bytes of the schema is 109M as compared to 687K of the
>>>> non-stats dataset.
>>>>
>>>> Typo, length in bytes of *manifest*. schema is the same.
>>>>
>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <gautamkows...@gmail.com>
>>>> wrote:
>>>>
>>>>> Correction, partition count = 4308.
>>>>>
>>>>> > Re: Changing the way we keep stats. Avro is a block splittable
>>>>> format and is friendly with parallel compute frameworks like Spark.
>>>>>
>>>>> Here I am trying to say that we don't need to change the format to
>>>>> columnar right? The current format is already friendly for 
>>>>> parallelization.
>>>>>
>>>>> thanks.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <gautamkows...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Ah, my bad. I missed adding in the schema details .. Here are some
>>>>>> details on the dataset with stats :
>>>>>>
>>>>>>  Iceberg Schema Columns : 20
>>>>>>  Spark Schema fields : 20
>>>>>>  Snapshot Summary :{added-data-files=4308, added-records=11494037,
>>>>>> changed-partition-count=4308, total-records=11494037, 
>>>>>> total-data-files=4308}
>>>>>>  Manifest files :1
>>>>>>  Manifest details:
>>>>>>      => manifest file path:
>>>>>> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>>>>      => manifest file length: 109,028,885
>>>>>>      => existing files count: 0
>>>>>>      => added files count: 4308
>>>>>>      => deleted files count: 0
>>>>>>      => partitions count: 4
>>>>>>      => partition fields count: 4
>>>>>>
>>>>>> Re: Num data files. It has a single manifest keep track of 4308
>>>>>> files. Total record count is 11.4 Million.
>>>>>>
>>>>>> Re: Columns. You are right that this table has many columns..
>>>>>> although it has only 20 top-level columns,  num leaf columns are in order
>>>>>> of thousands. This Schema is heavy on structs (in the thousands) and has
>>>>>> deep levels of nesting.  I know Iceberg keeps
>>>>>> *column_sizes, value_counts, null_value_counts* for all leaf fields
>>>>>> and additionally *lower-bounds, upper-bounds* for native, struct
>>>>>> types (not yet for map KVs and arrays).  The length in bytes of the 
>>>>>> schema
>>>>>> is 109M as compared to 687K of the non-stats dataset.
>>>>>>
>>>>>> Re: Turning off stats. I am looking to leverage stats coz for our
>>>>>> datasets with much larger number of data files we want to leverage
>>>>>> iceberg's ability to skip entire files based on these stats. This is one 
>>>>>> of
>>>>>> the big incentives for us to use Iceberg.
>>>>>>
>>>>>> Re: Changing the way we keep stats. Avro is a block splittable format
>>>>>> and is friendly with parallel compute frameworks like Spark. So would it
>>>>>> make sense for instance to have add an option to have Spark job / Futures
>>>>>> handle split planning?   In a larger context, 109M is not that much
>>>>>> metadata given that Iceberg is meant for datasets where the metadata 
>>>>>> itself
>>>>>> is Bigdata scale.  I'm curious on how folks with larger sized metadata 
>>>>>> (in
>>>>>> GB) are optimizing this today.
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>> -Gautam.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <rb...@netflix.com.invalid>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for bringing this up! My initial theory is that this table
>>>>>>> has a ton of stats data that you have to read. That could happen in a
>>>>>>> couple of cases.
>>>>>>>
>>>>>>> First, you might have large values in some columns. Parquet will
>>>>>>> suppress its stats if values are larger than 4k and those are what 
>>>>>>> Iceberg
>>>>>>> uses. But that could still cause you to store two 1k+ objects for each
>>>>>>> large column (lower and upper bounds). With a lot of data files, that 
>>>>>>> could
>>>>>>> add up quickly. The solution here is to implement #113
>>>>>>> <https://github.com/apache/incubator-iceberg/issues/113> so that we
>>>>>>> don't store the actual min and max for string or binary columns, but
>>>>>>> instead a truncated value that is just above or just below.
>>>>>>>
>>>>>>> The second case is when you have a lot of columns. Each column
>>>>>>> stores both a lower and upper bound, so 1,000 columns could easily take 
>>>>>>> 8k
>>>>>>> per file. If this is the problem, then maybe we want to have a way to 
>>>>>>> turn
>>>>>>> off column stats. We could also think of ways to change the way stats 
>>>>>>> are
>>>>>>> stored in the manifest files, but that only helps if we move to a 
>>>>>>> columnar
>>>>>>> format to store manifests, so this is probably not a short-term fix.
>>>>>>>
>>>>>>> If you can share a bit more information about this table, we can
>>>>>>> probably tell which one is the problem. I'm guessing it is the large 
>>>>>>> values
>>>>>>> problem.
>>>>>>>
>>>>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <gautamkows...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello folks,
>>>>>>>>
>>>>>>>> I have been testing Iceberg reading with and without stats built
>>>>>>>> into Iceberg dataset manifest and found that there's a huge jump in 
>>>>>>>> network
>>>>>>>> traffic with the latter..
>>>>>>>>
>>>>>>>>
>>>>>>>> In my test I am comparing two Iceberg datasets, both written in
>>>>>>>> Iceberg format. One with and the other without stats collected in 
>>>>>>>> Iceberg
>>>>>>>> manifests. In particular the difference between the writers used for 
>>>>>>>> the
>>>>>>>> two datasets is this PR:
>>>>>>>> https://github.com/apache/incubator-iceberg/pull/63/files which
>>>>>>>> uses Iceberg's writers for writing Parquet data. I captured tcpdump 
>>>>>>>> from
>>>>>>>> query scans run on these two datasets.  The partition being scanned
>>>>>>>> contains 1 manifest, 1 parquet data file and ~3700 rows in both 
>>>>>>>> datasets.
>>>>>>>> There's a 30x jump in network traffic to the remote filesystem (ADLS) 
>>>>>>>> when
>>>>>>>> i switch to stats based Iceberg dataset. Both queries used the same 
>>>>>>>> Iceberg
>>>>>>>> reader code to access both datasets.
>>>>>>>>
>>>>>>>> ```
>>>>>>>> root@d69e104e7d40:/usr/local/spark#  tcpdump -r
>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep
>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>> reading from file
>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type EN10MB
>>>>>>>> (Ethernet)
>>>>>>>>
>>>>>>>> *8844*
>>>>>>>>
>>>>>>>>
>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r
>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap | grep
>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>> reading from file iceberg_scratch_pad_demo_11_batch_query.pcap,
>>>>>>>> link-type EN10MB (Ethernet)
>>>>>>>>
>>>>>>>> *269708*
>>>>>>>>
>>>>>>>> ```
>>>>>>>>
>>>>>>>> As a consequence of this the query response times get affected
>>>>>>>> drastically (illustrated below). I must confess that I am on a slow
>>>>>>>> internet connection via VPN connecting to the remote FS. But the 
>>>>>>>> dataset
>>>>>>>> without stats took just 1m 49s while the dataset with stats took 26m 
>>>>>>>> 48s to
>>>>>>>> read the same sized data. Most of that time in the latter dataset was 
>>>>>>>> spent
>>>>>>>> split planning in Manifest reading and stats evaluation.
>>>>>>>>
>>>>>>>> ```
>>>>>>>> all=> select count(*)  from iceberg_geo1_metrixx_qc_postvalues
>>>>>>>> where batchId = '4a6f95abac924159bb3d7075373395c9';
>>>>>>>>  count(1)
>>>>>>>> ----------
>>>>>>>>      3627
>>>>>>>> (1 row)
>>>>>>>> *Time: 109673.202 ms (01:49.673)*
>>>>>>>>
>>>>>>>> all=>  select count(*) from iceberg_scratch_pad_demo_11  where
>>>>>>>> _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId =
>>>>>>>> '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>>>>>>  count(1)
>>>>>>>> ----------
>>>>>>>>      3808
>>>>>>>> (1 row)
>>>>>>>> *Time: 1608058.616 ms (26:48.059)*
>>>>>>>>
>>>>>>>> ```
>>>>>>>>
>>>>>>>> Has anyone faced this? I'm wondering if there's some caching or
>>>>>>>> parallelism option here that can be leveraged.  Would appreciate some
>>>>>>>> guidance. If there isn't a straightforward fix and others feel this is 
>>>>>>>> an
>>>>>>>> issue I can raise an issue and look into it further.
>>>>>>>>
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> -Gautam.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>
>>>>
>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>

Reply via email to