FYI .. The test Partition Spec is  :
[
  YEAR: identity(21)
  MONTH: identity(22)
  DAY: identity(23)
  batchId: identity(24)
]



On Thu, May 2, 2019 at 1:46 PM Gautam <gautamkows...@gmail.com> wrote:

> > Using those, you should be able to control parallelism. If you want to
> test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
> compact manifests.
>
> This is helpful. Thanks for the pointer on increasing parallelism. Will
> try this out. So I understand the behaviour, if a different dataset has
> >=5000  batches then the resultant # manifests would be (total_num_batches
> % 5000 ) ?
>
> > What surprises me is that you’re not getting much benefit from filtering
> out manifests that aren’t helpful. We see a lot of benefit from it.
>
> Pardon the verbose example but i think it'l help explain what i'm seeing
> ..
>
> Regarding manifest filtering:  I tested if partition filters in sql query
> actually reduce manifests being inspected. In my example, i have 16
> manifests that point to 4000 batch partitions ( each file is restricted to
> one partition as we'r using physical partitioning in the table ).  So when
> querying for .. WHERE  batchId = 'xyz'  .. at most 1 manifest should be
> read coz 1 batch == 1 file which should be tracked by 1 manifest (among the
> 16) , right? But i see that all 16 are being inspected in
> BaseTableScan.planFiles().  Correct me if i'm wrong, it's this call [1]
> that should be giving me the manifests that match a partition. When I
> inspect this  it says `matchingManifests = 16` ,  which is all the
> manifests in the table.  This *could* be due to the fact that our batch
> ids are random UUIDs so lower/upper bounds may not help coz there's no
> inherent ordering amongst batches.
> But then  i tried year = 2019 and month = 01 and day = 01 which also
> scanned all manifests. Could this be due to the way Iceberg manifests are
> re-grouped and merged? If so, shouldn't re-grouping honour partition
> boundaries and optimize for it?
>
>
> Cheers,
> -Gautam.
>
> [1] -
> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173
>
>
> On Thu, May 2, 2019 at 12:27 PM Ryan Blue <rb...@netflix.com> wrote:
>
>> Good questions. Grouping manifests is configurable at the table level.
>> There are 2 settings:
>>
>>    - commit.manifest.target-size-bytes defaults to 8MB, this is the
>>    target size that Iceberg will compact to
>>    - commit.manifest.min-count-to-merge defaults to 100, this is the
>>    minimum number of files before a compaction is triggered
>>
>> Using those, you should be able to control parallelism. If you want to
>> test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
>> compact manifests.
>>
>> What surprises me is that you’re not getting much benefit from filtering
>> out manifests that aren’t helpful. We see a lot of benefit from it. You
>> might try sorting the data files by partition before adding them to the
>> table. That will cluster data files in the same partition so you can read
>> fewer manifests.
>>
>> On Thu, May 2, 2019 at 12:09 PM Gautam <gautamkows...@gmail.com> wrote:
>>
>>> Hey Anton,
>>>             Sorry bout the delay on this. Been caught up with some other
>>> things. Thanks for raising issue#173 .
>>>
>>> So the root cause is indeed the density and size of the schema. While I
>>> agree the option to configure stats for columns is good (although i'm not
>>> fully convinced that this is purely due to lower/upper bounds). For
>>> instance, maybe it's just taking a while to iterate over manifest rows and
>>> deserialize the DataFile stats in each read?  The solution i'm using right
>>> now is to parallelize the manifest reading in split planning. We
>>> regenerated the Iceberg table with more manifests. Now the code enables the
>>> ParallelIterator which uses a worker pool of threads (1 thread per cpu by
>>> default, configurable using 'iceberg.worker.num-threads' ) to read
>>> manifests.
>>>
>>> On that note, the ability to parallelize is limited to how many
>>> manifests are in the table. So as a test, for a table with 4000 files we
>>> created one manifest per file (think of one file as a single batch commit
>>> in this case). So I was hoping to get a parallelism factor of 4000. But
>>> Iceberg summarizes manifests into fewer manifests with each commit so we
>>> instead ended up with 16 manifests. So now split planning is limited to
>>> reading at most 16 units of parallelism. Is this grouping of manifests into
>>> fewer configurable? if not should we allow making this configurable?
>>>
>>> Sorry if this is forking a different conversation. If so, I can start a
>>> separate conversation thread on this.
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <aokolnyc...@apple.com>
>>> wrote:
>>>
>>>> Hey Gautam,
>>>>
>>>> Out of my curiosity, did you manage to confirm the root cause of the
>>>> issue?
>>>>
>>>> P.S. I created [1] so that we can make collection of lower/upper bounds
>>>> configurable.
>>>>
>>>> Thanks,
>>>> Anton
>>>>
>>>> [1] - https://github.com/apache/incubator-iceberg/issues/173
>>>>
>>>> On 22 Apr 2019, at 09:15, Gautam <gautamkows...@gmail.com> wrote:
>>>>
>>>> Thanks guys for the insights ..
>>>>
>>>> > I like Anton's idea to have an optional list of columns for which we
>>>> keep stats. That would allow us to avoid storing stats for thousands of
>>>> columns that won't ever be used. Another option here is to add a flag to
>>>> keep stats only for top-level columns. That's much less configuration for
>>>> users and probably does the right thing in many cases. Simpler to use but
>>>> not as fast in all cases is sometimes a good compromise.
>>>>
>>>> This makes sense to me. It adds a variable that data pipelines can
>>>> tweak on to improve performance. I will add an issue on Github to add a
>>>> stats config/flag. Although, having said that, I would try to optimize
>>>> around this coz read patterns are hardly ever known a priori and adding a
>>>> column to this list means having to re-write the entire data again. So i'l
>>>> try the other suggestion which is parallelizing on multiple manifests.
>>>>
>>>> >  To clarify my comment on changing the storage: the idea is to use
>>>> separate columns instead of a map and then use a columnar storage format so
>>>> we can project those columns independently. Avro can't project columns
>>>> independently. This wouldn't help on the write side and may just cause a
>>>> lot of seeking on the read side that diminishes the benefits.
>>>>
>>>> Gotcha.
>>>>
>>>> > Also, now that we have more details, I think there is a second
>>>> problem. Because we expect several manifests in a table, we parallelize
>>>> split planning on manifests instead of splits of manifest files. This
>>>> planning operation is happening in a single thread instead of in parallel.
>>>> I think if you split the write across several manifests, you'd improve wall
>>>> time.
>>>>
>>>> This might actually be the issue here, this was a test bench dataset so
>>>> the writer job created a single manifest for all the data in the dataset
>>>> which isn't really how we will do things in prod. I'l try and create the
>>>> metadata based on productions expected commit pattern.
>>>>
>>>>
>>>> Regarding Iceberg not truncating large bounded column values
>>>> https://github.com/apache/incubator-iceberg/issues/113 .. I didn't
>>>> consider this with our dataset. The current evidence is leading towards the
>>>> number of columns and the sheer number of files that the manifest is
>>>> maintaining but this is a good thing to look into.
>>>>
>>>> Thanks again guys.
>>>>
>>>> -Gautam.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>
>>>>> I like Anton's idea to have an optional list of columns for which we
>>>>> keep stats. That would allow us to avoid storing stats for thousands of
>>>>> columns that won't ever be used. Another option here is to add a flag to
>>>>> keep stats only for top-level columns. That's much less configuration for
>>>>> users and probably does the right thing in many cases. Simpler to use but
>>>>> not as fast in all cases is sometimes a good compromise.
>>>>>
>>>>> To clarify my comment on changing the storage: the idea is to use
>>>>> separate columns instead of a map and then use a columnar storage format 
>>>>> so
>>>>> we can project those columns independently. Avro can't project columns
>>>>> independently. This wouldn't help on the write side and may just cause a
>>>>> lot of seeking on the read side that diminishes the benefits.
>>>>>
>>>>> Also, now that we have more details, I think there is a second
>>>>> problem. Because we expect several manifests in a table, we parallelize
>>>>> split planning on manifests instead of splits of manifest files. This
>>>>> planning operation is happening in a single thread instead of in parallel.
>>>>> I think if you split the write across several manifests, you'd improve 
>>>>> wall
>>>>> time.
>>>>>
>>>>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <
>>>>> aokolnyc...@apple.com> wrote:
>>>>>
>>>>>> No, we haven’t experienced it yet. The manifest size is huge in your
>>>>>> case. To me, Ryan is correct: it might be either big lower/upper bounds
>>>>>> (then truncation will help) or a big number columns (then collecting
>>>>>> lower/upper bounds only for specific columns will help). I think both
>>>>>> optimizations are needed and will reduce the manifest size.
>>>>>>
>>>>>> Since you mentioned you have a lot of columns and we collect bounds
>>>>>> for nested struct fields, I am wondering if you could revert [1] locally
>>>>>> and compare the manifest size.
>>>>>>
>>>>>> [1] -
>>>>>> https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f
>>>>>>
>>>>>> On 19 Apr 2019, at 15:42, Gautam <gautamkows...@gmail.com> wrote:
>>>>>>
>>>>>> Thanks for responding Anton! Do we think the delay is mainly due to
>>>>>> lower/upper bound filtering? have you faced this? I haven't exactly found
>>>>>> where the slowness is yet. It's generally due to the stats filtering but
>>>>>> what part of it is causing this much network traffic. There's
>>>>>> CloseableIteratable  that takes a ton of time on the next() and hasNext()
>>>>>> calls. My guess is the expression evaluation on each manifest entry is
>>>>>> what's doing it.
>>>>>>
>>>>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <
>>>>>> aokolnyc...@apple.com> wrote:
>>>>>>
>>>>>>> I think we need to have a list of columns for which we want to
>>>>>>> collect stats and that should be configurable by the user. Maybe, this
>>>>>>> config should be applicable only to lower/upper bounds. As we now 
>>>>>>> collect
>>>>>>> stats even for nested struct fields, this might generate a lot of data. 
>>>>>>> In
>>>>>>> most cases, users cluster/sort their data by a subset of data columns to
>>>>>>> have fast queries with predicates on those columns. So, being able to
>>>>>>> configure columns for which to collect lower/upper bounds seems 
>>>>>>> reasonable.
>>>>>>>
>>>>>>> On 19 Apr 2019, at 08:03, Gautam <gautamkows...@gmail.com> wrote:
>>>>>>>
>>>>>>> >  The length in bytes of the schema is 109M as compared to 687K of
>>>>>>> the non-stats dataset.
>>>>>>>
>>>>>>> Typo, length in bytes of *manifest*. schema is the same.
>>>>>>>
>>>>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <gautamkows...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Correction, partition count = 4308.
>>>>>>>>
>>>>>>>> > Re: Changing the way we keep stats. Avro is a block splittable
>>>>>>>> format and is friendly with parallel compute frameworks like Spark.
>>>>>>>>
>>>>>>>> Here I am trying to say that we don't need to change the format to
>>>>>>>> columnar right? The current format is already friendly for 
>>>>>>>> parallelization.
>>>>>>>>
>>>>>>>> thanks.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <gautamkows...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Ah, my bad. I missed adding in the schema details .. Here are some
>>>>>>>>> details on the dataset with stats :
>>>>>>>>>
>>>>>>>>>  Iceberg Schema Columns : 20
>>>>>>>>>  Spark Schema fields : 20
>>>>>>>>>  Snapshot Summary :{added-data-files=4308, added-records=11494037,
>>>>>>>>> changed-partition-count=4308, total-records=11494037, 
>>>>>>>>> total-data-files=4308}
>>>>>>>>>  Manifest files :1
>>>>>>>>>  Manifest details:
>>>>>>>>>      => manifest file path:
>>>>>>>>> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>>>>>>>      => manifest file length: 109,028,885
>>>>>>>>>      => existing files count: 0
>>>>>>>>>      => added files count: 4308
>>>>>>>>>      => deleted files count: 0
>>>>>>>>>      => partitions count: 4
>>>>>>>>>      => partition fields count: 4
>>>>>>>>>
>>>>>>>>> Re: Num data files. It has a single manifest keep track of 4308
>>>>>>>>> files. Total record count is 11.4 Million.
>>>>>>>>>
>>>>>>>>> Re: Columns. You are right that this table has many columns..
>>>>>>>>> although it has only 20 top-level columns,  num leaf columns are in 
>>>>>>>>> order
>>>>>>>>> of thousands. This Schema is heavy on structs (in the thousands) and 
>>>>>>>>> has
>>>>>>>>> deep levels of nesting.  I know Iceberg keeps
>>>>>>>>> *column_sizes, value_counts, null_value_counts* for all leaf
>>>>>>>>> fields and additionally *lower-bounds, upper-bounds* for native,
>>>>>>>>> struct types (not yet for map KVs and arrays).  The length in bytes 
>>>>>>>>> of the
>>>>>>>>> schema is 109M as compared to 687K of the non-stats dataset.
>>>>>>>>>
>>>>>>>>> Re: Turning off stats. I am looking to leverage stats coz for our
>>>>>>>>> datasets with much larger number of data files we want to leverage
>>>>>>>>> iceberg's ability to skip entire files based on these stats. This is 
>>>>>>>>> one of
>>>>>>>>> the big incentives for us to use Iceberg.
>>>>>>>>>
>>>>>>>>> Re: Changing the way we keep stats. Avro is a block splittable
>>>>>>>>> format and is friendly with parallel compute frameworks like Spark. So
>>>>>>>>> would it make sense for instance to have add an option to have Spark 
>>>>>>>>> job /
>>>>>>>>> Futures  handle split planning?   In a larger context, 109M is not 
>>>>>>>>> that
>>>>>>>>> much metadata given that Iceberg is meant for datasets where the 
>>>>>>>>> metadata
>>>>>>>>> itself is Bigdata scale.  I'm curious on how folks with larger sized
>>>>>>>>> metadata (in GB) are optimizing this today.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> -Gautam.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <
>>>>>>>>> rb...@netflix.com.invalid> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for bringing this up! My initial theory is that this table
>>>>>>>>>> has a ton of stats data that you have to read. That could happen in a
>>>>>>>>>> couple of cases.
>>>>>>>>>>
>>>>>>>>>> First, you might have large values in some columns. Parquet will
>>>>>>>>>> suppress its stats if values are larger than 4k and those are what 
>>>>>>>>>> Iceberg
>>>>>>>>>> uses. But that could still cause you to store two 1k+ objects for 
>>>>>>>>>> each
>>>>>>>>>> large column (lower and upper bounds). With a lot of data files, 
>>>>>>>>>> that could
>>>>>>>>>> add up quickly. The solution here is to implement #113
>>>>>>>>>> <https://github.com/apache/incubator-iceberg/issues/113> so that
>>>>>>>>>> we don't store the actual min and max for string or binary columns, 
>>>>>>>>>> but
>>>>>>>>>> instead a truncated value that is just above or just below.
>>>>>>>>>>
>>>>>>>>>> The second case is when you have a lot of columns. Each column
>>>>>>>>>> stores both a lower and upper bound, so 1,000 columns could easily 
>>>>>>>>>> take 8k
>>>>>>>>>> per file. If this is the problem, then maybe we want to have a way 
>>>>>>>>>> to turn
>>>>>>>>>> off column stats. We could also think of ways to change the way 
>>>>>>>>>> stats are
>>>>>>>>>> stored in the manifest files, but that only helps if we move to a 
>>>>>>>>>> columnar
>>>>>>>>>> format to store manifests, so this is probably not a short-term fix.
>>>>>>>>>>
>>>>>>>>>> If you can share a bit more information about this table, we can
>>>>>>>>>> probably tell which one is the problem. I'm guessing it is the large 
>>>>>>>>>> values
>>>>>>>>>> problem.
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <gautamkows...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello folks,
>>>>>>>>>>>
>>>>>>>>>>> I have been testing Iceberg reading with and without stats built
>>>>>>>>>>> into Iceberg dataset manifest and found that there's a huge jump in 
>>>>>>>>>>> network
>>>>>>>>>>> traffic with the latter..
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> In my test I am comparing two Iceberg datasets, both written in
>>>>>>>>>>> Iceberg format. One with and the other without stats collected in 
>>>>>>>>>>> Iceberg
>>>>>>>>>>> manifests. In particular the difference between the writers used 
>>>>>>>>>>> for the
>>>>>>>>>>> two datasets is this PR:
>>>>>>>>>>> https://github.com/apache/incubator-iceberg/pull/63/files which
>>>>>>>>>>> uses Iceberg's writers for writing Parquet data. I captured tcpdump 
>>>>>>>>>>> from
>>>>>>>>>>> query scans run on these two datasets.  The partition being scanned
>>>>>>>>>>> contains 1 manifest, 1 parquet data file and ~3700 rows in both 
>>>>>>>>>>> datasets.
>>>>>>>>>>> There's a 30x jump in network traffic to the remote filesystem 
>>>>>>>>>>> (ADLS) when
>>>>>>>>>>> i switch to stats based Iceberg dataset. Both queries used the same 
>>>>>>>>>>> Iceberg
>>>>>>>>>>> reader code to access both datasets.
>>>>>>>>>>>
>>>>>>>>>>> ```
>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark#  tcpdump -r
>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep
>>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>>> reading from file
>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type 
>>>>>>>>>>> EN10MB
>>>>>>>>>>> (Ethernet)
>>>>>>>>>>>
>>>>>>>>>>> *8844*
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r
>>>>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap | grep
>>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>>> reading from file iceberg_scratch_pad_demo_11_batch_query.pcap,
>>>>>>>>>>> link-type EN10MB (Ethernet)
>>>>>>>>>>>
>>>>>>>>>>> *269708*
>>>>>>>>>>>
>>>>>>>>>>> ```
>>>>>>>>>>>
>>>>>>>>>>> As a consequence of this the query response times get affected
>>>>>>>>>>> drastically (illustrated below). I must confess that I am on a slow
>>>>>>>>>>> internet connection via VPN connecting to the remote FS. But the 
>>>>>>>>>>> dataset
>>>>>>>>>>> without stats took just 1m 49s while the dataset with stats took 
>>>>>>>>>>> 26m 48s to
>>>>>>>>>>> read the same sized data. Most of that time in the latter dataset 
>>>>>>>>>>> was spent
>>>>>>>>>>> split planning in Manifest reading and stats evaluation.
>>>>>>>>>>>
>>>>>>>>>>> ```
>>>>>>>>>>> all=> select count(*)  from iceberg_geo1_metrixx_qc_postvalues
>>>>>>>>>>> where batchId = '4a6f95abac924159bb3d7075373395c9';
>>>>>>>>>>>  count(1)
>>>>>>>>>>> ----------
>>>>>>>>>>>      3627
>>>>>>>>>>> (1 row)
>>>>>>>>>>> *Time: 109673.202 ms (01:49.673)*
>>>>>>>>>>>
>>>>>>>>>>> all=>  select count(*) from iceberg_scratch_pad_demo_11  where
>>>>>>>>>>> _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId =
>>>>>>>>>>> '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>>>>>>>>>  count(1)
>>>>>>>>>>> ----------
>>>>>>>>>>>      3808
>>>>>>>>>>> (1 row)
>>>>>>>>>>> *Time: 1608058.616 ms (26:48.059)*
>>>>>>>>>>>
>>>>>>>>>>> ```
>>>>>>>>>>>
>>>>>>>>>>> Has anyone faced this? I'm wondering if there's some caching or
>>>>>>>>>>> parallelism option here that can be leveraged.  Would appreciate 
>>>>>>>>>>> some
>>>>>>>>>>> guidance. If there isn't a straightforward fix and others feel this 
>>>>>>>>>>> is an
>>>>>>>>>>> issue I can raise an issue and look into it further.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> -Gautam.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Ryan Blue
>>>>>>>>>> Software Engineer
>>>>>>>>>> Netflix
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

Reply via email to