Replies inline.

On Thu, Jan 28, 2021 at 9:42 AM kkishore iiith <kkishore.ii...@gmail.com>
wrote:

> Hello Community,
>
> I am solving the problem of handling late-arriving data in one of our
> systems. Currently, we wait 8 hours for late data to arrive before we
> start processing the current hour's data.
>
> We have three stages in our pipeline, A -> B -> C, where B waits 8 hours
> for A's hourly data to complete, and C likewise waits 8 hours for B's data
> to be complete.
>
> A writes Avro data in GCS (in a batch of 1000 files)
> B writes Parquet data in GCS
> C writes Parquet data in GCS
>
> A sample path for A, B, and C currently looks like
> gs://data/year=2021/month=1/day=27/hour=0/user_bucket=22/filename.deflate.avro
>
> The current solution we are considering is:
> a) A continues to write Avro data to GCS but also records the filenames
> + stats in Iceberg
> b) B runs an initial run R1 after A's on-time data is complete, without
> waiting for 8 hours. B queries Iceberg for hour H1 data and writes on-time
> data into GCS/Iceberg
> c) Similarly, C runs an initial run R1 to consume B's on-time data for
> hour H1 and writes on-time data into GCS/Iceberg
> d) Consumers of C run an initial run R1 to consume C's on-time data as well
> e) After 8 hours, B runs another run R2 to process A's late data. B
> queries Iceberg for hour H1 data that arrived after R1's runtime and writes
> the late data into GCS/Iceberg
> f) C and its consumers repeat step (e)
>
> Could you please clarify the following questions?
>
> *1) Table level question*
> What is the best practice for creating the Iceberg table: do we need to
> create one table for all days, or one table per day? One day's worth of
> data is around 100TB, and I am concerned that we may run into scalability
> issues if we keep a year's worth of data in one Iceberg table. So far we
> have never needed to query across multiple days, as all of our queries span
> a single day, but that may well change. Currently, we pre-create the schema
> 2 days before the current day and the schema is locked for a day; we create
> the schema for our data every day. If we keep one table for all days, how
> hard is it to update the schema every day?
>

A single partitioned Iceberg table can easily handle that much data. We
have tables that manage tens of petabytes.

Schemas can evolve, but are global for a table. So if you don't want to
read days from last year with the current schema, then you should create
separate tables.
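
If you do keep a single table, a daily schema change is just a normal
schema evolution commit. A minimal sketch with the Iceberg Java API,
assuming a table handle already loaded from a catalog and a hypothetical
new column name:

import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;

public class EvolveSchema {
  // Add a column in place; Iceberg tracks columns by field ID, so files
  // written before the change are still read correctly.
  static void addDailyColumn(Table table) {
    table.updateSchema()
        .addColumn("new_metric", Types.DoubleType.get())  // hypothetical column
        .commit();
  }
}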


> *2) Partition level question with respect to scalability*
>
> If we have one Iceberg table per day, when we query the table for a
> snapshot, can we also provide a filter/regex on the file path names, like
> passing hour H and a bucket number? If Iceberg supports this, then we don't
> need any partitions on the event hour/bucket number. If partitioning on
> event hour is a requirement for our use case, we have the following
> challenge:
>

You can either make year, month, day, and hour columns, or you can store
your data with a partition spec that carries the relationship between a
timestamp column and the partitioning. If you partition by `hour(ts)` then
Iceberg will automatically use filters for `ts`, like `ts > TIMESTAMP
'2021-01-28 10:00:00'` to select the right hourly partitions.
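
A minimal sketch of such a partition spec with the Java API, assuming a
hypothetical timestamp column named "ts":

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class HourlySpec {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "ts", Types.TimestampType.withZone()),
        Types.NestedField.required(2, "user_id", Types.LongType.get()));

    // Partition by the hour of "ts"; filters on "ts" prune hourly partitions.
    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .hour("ts")
        .build();
    System.out.println(spec);
  }
}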


> a) Currently our data does not contain a bucket number, and the event hour
> is not a field in our data; it is derived (if field f1 has a non-null
> value, pick that value, otherwise fall back to the f2 value). Can the
> partition spec support these two cases out of the box? If not, is there a
> workaround other than writing the bucket number and event hour into the
> data files?
>

Iceberg tables support bucketing.
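
For example, a spec can combine an hour transform with a bucket transform.
One caveat to verify for your case: each transform applies to a single
source column, so a value derived from f1/f2 would generally need to be
materialized as a column (here a hypothetical "event_ts") at write time
before it can drive partitioning. A sketch under those assumptions:

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class BucketedSpec {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "event_ts", Types.TimestampType.withZone()),
        Types.NestedField.required(2, "user_id", Types.LongType.get()));

    // hour() and bucket() are built-in transforms; 32 buckets is arbitrary.
    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .hour("event_ts")
        .bucket("user_id", 32)
        .build();
    System.out.println(spec);
  }
}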


> *3) GCS related question*
> Does Iceberg support writing data into GCS? For Iceberg's atomicity to
> work <https://iceberg.apache.org/java-api-quickstart/>, GCS should support
> atomic rename; however, from here
> <https://cloud.google.com/storage/docs/gsutil/commands/mv>, GCS renames
> are not atomic. What are the workarounds to handle atomicity if Iceberg
> doesn't support GCS?
>

If a file system does not support atomic renames, then you should use a
metastore to track tables. You can use Hive, Nessie, or Glue. We are also
working on a JDBC catalog.
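
For example, here is a minimal sketch of pointing Spark 3 at a
Hive-metastore-backed Iceberg catalog; the catalog name "hive_prod" and the
metastore URI are placeholders:

import org.apache.spark.sql.SparkSession;

public class HiveCatalogSession {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-on-gcs")
        // Register an Iceberg catalog backed by the Hive Metastore.
        .config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.hive_prod.type", "hive")
        .config("spark.sql.catalog.hive_prod.uri", "thrift://metastore-host:9083")
        .getOrCreate();

    spark.sql("SHOW TABLES IN hive_prod.db").show();
  }
}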


> *4) Confirming the delta between two snapshots*
>
> Does Iceberg return different results if I query the table for the same ts
> at two different points in time? Also, does Iceberg support computing the
> delta between two ts/snapshots? If so, are the delta files distributed
> across tasks in the Spark job, or are they only accessible at the driver
> level?
>

Iceberg will return consistent results when you query the same snapshot
(version) of a table. If you query twice and read a different snapshot
after a table has been updated, then you may get a different result if the
data for the query was changed.

Iceberg's API can tell you what files were added or removed in any given
snapshot. You can also use time travel to query the table at a given
snapshot and use SQL to find the row-level changes. We don't currently
support reading just the changes in a snapshot because there may be deletes
as well as inserts.
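
For example, a sketch of inspecting what the latest snapshot added through
the core API (method names may differ slightly by release; older versions
exposed this as snapshot.addedFiles()):

import org.apache.iceberg.DataFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

public class SnapshotDiff {
  // Print the data files added by the table's current snapshot. The same
  // information is available for historical snapshots via table.snapshots().
  static void printAddedFiles(Table table) {
    Snapshot current = table.currentSnapshot();
    for (DataFile file : current.addedDataFiles(table.io())) {
      System.out.println(file.path());
    }
  }
}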


> *5) Hadoop table vs hive table*
> From here <https://iceberg.apache.org/java-api-quickstart/>, Iceberg
> supports two catalogs, i.e., the Hive catalog and the Hadoop catalog. Can
> someone explain, like I am five, the differences between them? When should
> we use one vs. the other?
>

The Hadoop catalog can be used when the file system supports atomic rename.
It doesn't require a separate service. Iceberg also supports Hive, Glue,
and Nessie catalogs.

I generally recommend a real metastore instead of using a file system.
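
To make the difference concrete, here is a rough sketch of the Hadoop
catalog in the Java API (constructor details vary by release, and the
warehouse path and table name are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class CatalogChoice {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // HadoopCatalog: table state lives entirely in metadata files under a
    // warehouse path, so the file system must support atomic rename.
    Catalog hadoopCatalog = new HadoopCatalog(conf, "hdfs://nn:8020/warehouse");
    hadoopCatalog.loadTable(TableIdentifier.of("db", "events"));

    // A Hive catalog (or Glue/Nessie) instead keeps the pointer to the
    // current metadata file in the metastore, which provides the atomic commit.
  }
}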


> *6) Problem with Avro write*
>
> Could someone please help with
> https://stackoverflow.com/questions/65932414/iceberg-is-not-working-when-writing-avro-from-spark
>

You should write from Spark directly to the Iceberg table. Iceberg tracks
columns by ID, not by name, and you need to ensure that your Avro files
have field IDs.
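
A minimal sketch of the direct write path, assuming a Spark 3 session with
an Iceberg catalog configured and a placeholder table name:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DirectAppend {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("append").getOrCreate();

    // Read the hourly Avro input (requires the spark-avro package) and let
    // Iceberg write the files so field IDs are assigned correctly.
    Dataset<Row> df = spark.read().format("avro")
        .load("gs://data/year=2021/month=1/day=27/hour=0/");
    df.writeTo("hive_prod.db.events").append();
  }
}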


>
> *7) Idempotency*
>
> Let's say we append 1000 GCS files into the Iceberg table from a job, but
> the job was restarted after committing to Iceberg. Our jobs do not produce
> strictly immutable data. During the retry, what happens if the job tries to
> append 998 of the same files from the previous commit, but with two new
> files in the append? What is the behaviour here?
>

If you are appending files through the API, then it is up to you to remove
duplicates. If you append duplicate files, then you will get duplicate
data.
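
One way to make the retry idempotent at the API level is to skip files the
table already contains. A sketch, assuming the retried job writes files to
the same paths as the first attempt:

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public class IdempotentAppend {
  // Append only the candidate files whose paths are not already in the table.
  static void appendNewFiles(Table table, List<DataFile> candidates) throws Exception {
    Set<String> existing = new HashSet<>();
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (FileScanTask task : tasks) {
        existing.add(task.file().path().toString());
      }
    }

    AppendFiles append = table.newAppend();
    for (DataFile file : candidates) {
      if (!existing.contains(file.path().toString())) {
        append.appendFile(file);
      }
    }
    append.commit();
  }
}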


-- 
Ryan Blue
Software Engineer
Netflix
