Hello Community,

I am working on handling late-arriving data in one of our systems. Currently,
we wait 8 hours for the late data to arrive before we start processing the
current hour's data.

We have three stages in our pipeline, A -> B -> C, where B waits 8 hours for
A's hourly data to complete, and C likewise waits 8 hours for B's data to be
complete.

A writes Avro data to GCS (in batches of 1000 files)
B writes Parquet data to GCS
C writes Parquet data to GCS

A current sample path for A, B, and C looks like
gs://data/year=2021/month=1/day=27/hour=0/user_bucket=22/filename.deflate.avro

The current solution we are considering is:
a) A continues to write Avro data to GCS but also records the filename +
stats in Iceberg
b) B runs an initial run R1 as soon as A's on-time data is complete, without
waiting for 8 hours. B queries Iceberg for hour H1 data and writes the
on-time output to GCS/Iceberg (a rough Spark sketch of this read follows
this list)
c) Similarly, C runs an initial run R1 to consume B's on-time data for hour
H1 and writes its on-time output to GCS/Iceberg
d) Consumers of C run an initial run R1 to consume C's on-time data as well
e) After 8 hours, B runs a second run R2 to process A's late data. B queries
Iceberg for hour H1 data committed after R1's runtime and writes the late
data to GCS/Iceberg
f) C and its consumers repeat step (e)
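
For concreteness, here is roughly what we imagine the R1 read in step (b)
looking like. This is only a sketch: the table name db.a_files and the
event_date/event_hour column names are placeholders of ours, not the real
schema, and the date/hour values just mirror the sample path above.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().appName("B-R1").getOrCreate()

    // R1: read only A's on-time files for hour H1; the hope is that Iceberg
    // prunes data files using partition/column stats rather than path regexes.
    val onTime = spark.read
      .format("iceberg")
      .load("db.a_files")                     // placeholder table name
      .where(col("event_date") === "2021-01-27" && col("event_hour") === 0)

R2 in step (e) would be the same read restricted to data committed after R1,
which is what question 4 below is about.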

Could you please help clarify the following questions?

*1) Table-level question*
What is the best practice for creating the Iceberg table: should we create
one table covering all days, or one table per day? One day's worth of data is
around 100 TB, and I am concerned we would run into scalability issues if we
keep one year's worth of data in a single Iceberg table. So far we have never
needed to query across multiple days, since all of our queries span a single
day, but that may change. Currently, we pre-create the schema 2 days ahead of
the current day, and the schema is locked for that day; we create a new
schema for our data every day. If we keep one table for all days, how hard is
it to update the schema every day? (A sketch of the kind of daily update we
have in mind follows.)
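
To make the "update the schema every day" part concrete, this is the kind of
daily evolution we have in mind if we keep a single long-lived table. The
warehouse path, table name, and column name are placeholders.

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.catalog.TableIdentifier
    import org.apache.iceberg.hadoop.HadoopCatalog
    import org.apache.iceberg.types.Types

    // Placeholder warehouse path and table name
    val catalog = new HadoopCatalog(new Configuration(), "gs://data/warehouse")
    val table = catalog.loadTable(TableIdentifier.of("db", "a_files"))

    // Hypothetical daily schema change on the single table
    table.updateSchema()
      .addColumn("new_metric", Types.DoubleType.get())
      .commit()

Is doing this once per day on a table holding a year of data reasonable, or
does it create problems for readers of older snapshots?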

*2) Partition-level question with respect to scalability*

If we have one Iceberg table per day, when we query an Iceberg table for a
snapshot, can we also provide a filter/regex on the file path names, e.g.
passing hour H and a bucket number? If Iceberg supports this, then we don't
need any partitions on the event hour/bucket number. If partitioning on event
hour is a requirement for our use case, we have the following challenge:

a) Currently our data does not contain a bucket number, and our event hour is
not a field in the data; it is derived (if field f1 has a non-null value,
pick that value, otherwise fall back to f2's value). Can the partition spec
support these two cases out of the box? If not, is there a workaround other
than writing the bucket number and event hour into the data files? (Our
current understanding is sketched below.)
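
Our current understanding is that partition transforms apply to a single
source column, so we would presumably have to materialize the derived
timestamp first. The sketch below assumes hypothetical event_ts and user_id
columns already written into the data; please correct us if there is a better
way.

    import org.apache.iceberg.{PartitionSpec, Schema}
    import org.apache.iceberg.types.Types

    // Hypothetical schema with the coalesce(f1, f2) result materialized as event_ts
    val schema = new Schema(
      Types.NestedField.required(1, "event_ts", Types.TimestampType.withZone()),
      Types.NestedField.required(2, "user_id", Types.LongType.get()),
      Types.NestedField.optional(3, "payload", Types.StringType.get()))

    // hour() and bucket() are built-in transforms, but they need a real column to act on
    val spec = PartitionSpec.builderFor(schema)
      .hour("event_ts")
      .bucket("user_id", 32)
      .build()

Is this right, or can the spec express the f1/f2 fallback directly without us
writing the derived column into the files?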

*3) GCS-related question*
Does Iceberg support writing data to GCS? For Iceberg's atomicity to work
<https://iceberg.apache.org/java-api-quickstart/>, GCS should support atomic
rename; however, according to
<https://cloud.google.com/storage/docs/gsutil/commands/mv>, GCS renames are
not atomic. What are the workarounds to handle atomicity if Iceberg doesn't
support GCS?

*4) Confirming the delta between two snapshots*

Does Iceberg return different results if I query the table for the same
timestamp at two different points in time? Also, does Iceberg support
computing the delta between two timestamps/snapshots? If so, are the delta
files distributed across tasks in the Spark job, or are they only accessible
at the driver level? (A sketch of what we were hoping for follows.)
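
This is roughly what we were hoping exists, based on our reading of the docs;
the snapshot ids and table name are placeholders, and spark is the session
from the earlier sketch.

    // Placeholders: snapshot ids recorded after R1 and R2 committed
    val r1SnapshotId = 1234567890123456789L
    val r2SnapshotId = 2234567890123456789L

    // Incremental read between two snapshots, so late data from R2 can be
    // consumed without rereading R1's on-time data.
    val delta = spark.read
      .format("iceberg")
      .option("start-snapshot-id", r1SnapshotId)   // exclusive
      .option("end-snapshot-id", r2SnapshotId)     // inclusive
      .load("db.b_output")                         // placeholder table name

If that works, does the resulting DataFrame behave like any other read, i.e.
the delta files are planned into executor tasks rather than collected at the
driver?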

*5) Hadoop table vs hive table*
>From here <https://iceberg.apache.org/java-api-quickstart/>, iceberg
supports two catalogs i.e., hive catalog and hadoop catalog ? Can someone
explain like I am five about the differences between them? When to use one
vs other?
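
This is how we read the difference so far; the sketch only shows
construction, and the warehouse path and metastore setup are placeholders.
Please correct us if the comments are wrong.

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.catalog.TableIdentifier
    import org.apache.iceberg.hadoop.HadoopCatalog
    import org.apache.iceberg.hive.HiveCatalog

    val conf = new Configuration()

    // Hadoop catalog: table metadata lives purely under a warehouse path in the
    // file system / object store; no extra service, but it relies on the store
    // for atomic metadata file swaps.
    val hadoopCatalog = new HadoopCatalog(conf, "gs://data/warehouse")

    // Hive catalog: the pointer to the current metadata file is kept in the
    // Hive Metastore, which provides the atomic swap on commit.
    val hiveCatalog = new HiveCatalog(conf)

    val table = hadoopCatalog.loadTable(TableIdentifier.of("db", "a_files"))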

*6) Problem with Avro write*

Could someone please help with
https://stackoverflow.com/questions/65932414/iceberg-is-not-working-when-writing-avro-from-spark


*7) Idempotency*

Let's say we append 1000 GCS files to the Iceberg table from a job, but the
job is restarted after committing to Iceberg. Our jobs do not produce
strictly immutable data. During the retry, suppose the job tries to append
998 of the same files from the previous commit, plus two new files. What is
the behaviour here? (A sketch of a guard we could add on our side is below.)
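
If Iceberg does not deduplicate by file path on append, this is the kind of
guard we could add on our side. It is only a sketch: the DataFiles metrics
are placeholders (the real job would supply sizes/row counts), and a
partitioned table would also need the partition values set on the builder.

    import scala.collection.JavaConverters._
    import org.apache.iceberg.{DataFiles, FileFormat, Table}

    // Append only the files that the current snapshot does not already
    // reference, so a retried commit adds just the 2 genuinely new files.
    def appendNewFilesOnly(table: Table, candidatePaths: Seq[String]): Unit = {
      val alreadyReferenced = table.newScan().planFiles().asScala
        .map(_.file().path().toString)
        .toSet

      val append = table.newAppend()
      candidatePaths.filterNot(alreadyReferenced.contains).foreach { path =>
        append.appendFile(
          DataFiles.builder(table.spec())
            .withPath(path)
            .withFormat(FileFormat.AVRO)
            .withFileSizeInBytes(0L)   // placeholder: use the real file size
            .withRecordCount(0L)       // placeholder: use the real row count
            .build())
      }
      append.commit()
    }

Scanning all existing files on every retry is obviously not cheap at our
scale, so if there is a better pattern for idempotent appends we would love
to hear it.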


Kind regards,
Kishor
