Hi Taher,

I think most of your questions are answered in the Scan Planning section of the Iceberg spec: https://iceberg.apache.org/spec/#scan-planning

To give you some specific answers as well:

Equality Deletes: data files and delete files have sequence numbers from which readers can infer the relative age of the data. Delete files are only applied to older data files. This means that if you insert data again with a key that was deleted earlier, Iceberg should show the newly inserted record.

Position Deletes: when reading data files, the reader must keep track of the file position and only return rows that do not have a matching entry in the delete files. Alternatively, you can do a big ANTI JOIN between the data files and the delete files. The latter was our approach in Impala: https://docs.google.com/document/d/1WF_UOanQ61RUuQlM4LaiRWI0YXpPKZ2VEJ8gyJdDyoY/edit#heading=h.5bmfhbmb4qdk
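
To make those two rules concrete, here is a minimal, self-contained sketch in plain Java (illustration only, not Iceberg's reader code; the class, record, and method names are invented): the first helper shows the sequence-number check that decides whether an equality delete applies to a data file, and the second shows how a reader can skip the (file_path, pos) entries listed in position delete files while scanning a data file.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustration only: how sequence numbers gate equality deletes, and how
// (file_path, pos) pairs from position delete files are applied during a scan.
class DeleteApplySketch {

  // Per the scan-planning rules in the spec linked above, an equality delete
  // applies only to data files that are strictly older (smaller sequence number).
  static boolean equalityDeleteApplies(long deleteSequenceNumber, long dataSequenceNumber) {
    return dataSequenceNumber < deleteSequenceNumber;
  }

  // One (file_path, pos) entry read from a position delete file.
  record PositionDelete(String filePath, long position) {}

  // Return the rows of one data file with deleted positions skipped. The rows
  // are assumed to be in file order, so the list index matches the file position.
  static <T> List<T> applyPositionDeletes(
      String dataFilePath, List<T> rowsInFileOrder, List<PositionDelete> deletes) {
    // Collect the deleted positions that target this particular data file.
    Set<Long> deletedPositions = new HashSet<>();
    for (PositionDelete delete : deletes) {
      if (delete.filePath().equals(dataFilePath)) {
        deletedPositions.add(delete.position());
      }
    }

    // Keep track of the current position while reading and drop deleted rows.
    List<T> remaining = new ArrayList<>();
    for (int pos = 0; pos < rowsInFileOrder.size(); pos++) {
      if (!deletedPositions.contains((long) pos)) {
        remaining.add(rowsInFileOrder.get(pos));
      }
    }
    return remaining;
  }
}

The ANTI JOIN mentioned above is the same idea expressed relationally: join each data row (tagged with its file and position) against the position delete entries and keep only the rows with no match.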

Cheers,
Zoltan

On Thu, Sep 1, 2022 at 7:34 AM Taher Koitawala <taher...@gmail.com> wrote:

> Thank you, Ryan and the Iceberg community; the suggestions really helped
> progress a lot of development. On the same use case, I hit another block
> doing CDC updates and deletes.
>
> I see two options for managing deletes for now, EqualityDeletes and
> PositionalDeletes:
>
> 1. EqualityDeletes need me to delete a particular key that Iceberg then
> matches at scan time to skip those records.
>    1. The problem here is that when a record with DeleteKey 1 is inserted,
>    then deleted, and then inserted again with key 1, Iceberg shows no
>    records. That is the intended way it has to work, I guess, but it means
>    I need to be more careful when writing to Iceberg.
> 2. PositionalDeletes are amazing because I can give an offset and the file
> name of where I want the record to be deleted, and I can handle updates
> here by delete and insert, so the above case is handled.
>    1. What I am stuck on here is: after records have been inserted into a
>    file, if I see a delete or update request, how do I find the offset of
>    the record that needs to be deleted in the inserts file?
>    2. Do I need to do a table scan every time I get a delete request? That
>    means I will do a lot of IO and the CDC implementation will be crazy
>    slow.
>
> Can you please suggest the correct way of applying CDC log files with a
> JVM task?
>
> Regards,
> Taher Koitawala
>
> On Thu, Aug 25, 2022 at 9:39 AM Taher Koitawala <taher...@gmail.com> wrote:
>
>> Thank you for your response, Ryan. We will evaluate your suggestion of
>> sticking with a query engine, and I will also try the code you shared
>> with me.
>>
>> On Thu, 25 Aug, 2022, 2:25 am Ryan Blue, <b...@tabular.io> wrote:
>>
>>> Hi Taher,
>>>
>>> It looks like you're writing something in Java to work with the data
>>> directly. That's well supported, but you may want to consider using a
>>> compute engine to make this process a bit easier. Most of the issues
>>> that you're hitting would probably be solved automatically, because
>>> those engines translate correctly to and from Iceberg and other formats.
>>>
>>> Assuming that you want to move forward with Java, I think the issue
>>> you're hitting is that you're not using the same in-memory object model
>>> to read and write, and are then trying to translate values by hand.
>>> Instead, I recommend using Iceberg readers for both reading and writing
>>> so that you get consistent in-memory records.
>>>
>>> To do that, you should use the Parquet class to instantiate both the
>>> reader and writer. You'll need to use an Iceberg schema from the table
>>> you're writing into (which has assigned the field IDs). For the reader,
>>> you'll also need to pass a name mapping (withNameMapping) that you can
>>> generate from the schema with MappingUtil.create(icebergSchema). That
>>> name mapping enables you to read Parquet files without field IDs.
>>>
>>> Once you have it set up, it should look something like this:
>>>
>>> schema = ParquetSchemaUtil.convert(parquetSchema)
>>> table = catalog.createTable(identifier, schema)
>>> nameMapping = MappingUtil.create(table.schema())
>>>
>>> try (CloseableIterable<Record> reader = Parquet.read(io.newInputFile("file.parquet"))
>>>     .project(table.schema())
>>>     .withNameMapping(nameMapping)
>>>     .build()) {
>>>   try (FileAppender<Record> writer = Parquet.writeData(io.newOutputFile("new_file.parquet"))
>>>       .forTable(table)
>>>       .build()) {
>>>     for (Record record : reader) {
>>>       writer.add(record);
>>>     }
>>>   }
>>> }
>>>
>>> Ryan
>>>
>>> On Wed, Aug 24, 2022 at 6:49 AM Taher Koitawala <taher...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>> Please can someone guide me regarding the above email?
>>>>
>>>> Regards,
>>>> Taher Koitawala
>>>>
>>>> On Tue, Aug 23, 2022 at 5:46 PM Taher Koitawala <taher...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>> I am creating an Iceberg writer over a Temporal service that converts
>>>>> CDC parquet files to Iceberg format. That means the files will have a
>>>>> record and corresponding timestamp flags like `inserted_at`,
>>>>> `deleted_at` and `updated_at`, each of which will have a value
>>>>> defining the action.
>>>>>
>>>>> Initially, when there is no table in the Iceberg catalog, the plan is
>>>>> to use the Parquet footer schema and map that directly to the Iceberg
>>>>> schema using
>>>>> *org.apache.iceberg.parquet.ParquetSchemaUtil.convert(MessageType
>>>>> parquetSchema)*. However, the issue I am facing is that I also have to
>>>>> convert Parquet datatypes to Iceberg datatypes, specifically the
>>>>> timestamp types, when inserting into the table.
>>>>>
>>>>> When using the Parquet reader with the simple group, I see the
>>>>> timestamp as a long, but when inserting into Iceberg it expects a
>>>>> *java.time.OffsetDateTime*; the specific error I get is `Long cannot
>>>>> be cast to OffsetDateTime`.
>>>>>
>>>>> I have 2 questions on this use case:
>>>>> 1. Is there an easy way to write Parquet records to Iceberg directly,
>>>>> without me having to do the type conversion, since the goal is to make
>>>>> it all happen within Temporal?
>>>>> 2. I need suggestions on handling updates. For updates I am currently
>>>>> having to commit inserts, then commit deletes, and then create a new
>>>>> writer again to proceed.
>>>>>
>>>>> Regards,
>>>>> Taher Koitawala
>>>>
>>>
>>> --
>>> Ryan Blue
>>> Tabular
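
As a footnote to the `Long cannot be cast to OffsetDateTime` error quoted above: Iceberg's generic records represent timestamp-with-time-zone values as java.time.OffsetDateTime, so a raw Parquet long has to be converted somewhere if you read with the plain Parquet Group API. Below is a minimal, standalone sketch of that conversion (the class and method names are invented here, and it assumes the column stores epoch microseconds; columns written as MILLIS need a different scale factor).

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Illustration only: convert an epoch-microsecond long (a common physical
// representation for Parquet TIMESTAMP columns) into the OffsetDateTime that
// Iceberg's generic records use for timestamp-with-time-zone fields.
class TimestampConversionSketch {

  static OffsetDateTime fromEpochMicros(long micros) {
    long seconds = Math.floorDiv(micros, 1_000_000L);
    long nanoAdjustment = Math.floorMod(micros, 1_000_000L) * 1_000L;
    return OffsetDateTime.ofInstant(
        Instant.ofEpochSecond(seconds, nanoAdjustment), ZoneOffset.UTC);
  }

  public static void main(String[] args) {
    // 2022-08-23T12:00Z expressed as epoch microseconds.
    System.out.println(fromEpochMicros(1_661_256_000_000_000L));
  }
}

As Ryan suggests above, reading the Parquet file with the Iceberg Parquet reader instead should avoid doing this conversion by hand, since it already produces the in-memory types Iceberg expects.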