There are two ways of doing it.

   1. Through the snapshot offered by Delta Lake, meaning an immutable
   snapshot of the state of the table at a given version. For example, the
   state <https://books.japila.pl/delta-lake-internals/Snapshot/#state> of a
   Delta table <https://books.japila.pl/delta-lake-internals/Snapshot/#deltaLog>
   at a given version
   <https://books.japila.pl/delta-lake-internals/Snapshot/#version>. (A short
   read sketch follows the SQL example below.)
   2. Creating your own versioning. Taking your example, you define the
   target storage *with two added columns, namely:* op_type INT
   (1-insert, 2-update, 3-delete) and op_time TIMESTAMP <as of
   ingestion_time>. Your example record will be


id      op_type      op_time
1       1            <ingestion_time>
1       3            <ingestion_time>
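
For concreteness, a hypothetical sketch of such a target table; the column
names match the ingestion code below, but the types are assumptions:

       # Hypothetical target definition -- column types are assumptions
       spark.sql("""
         CREATE TABLE IF NOT EXISTS my_table (
           ID            INT,
           CLUSTERED     DOUBLE,
           SCATTERED     DOUBLE,
           RANDOMISED    DOUBLE,
           RANDOM_STRING STRING,
           SMALL_VC      STRING,
           PADDING       STRING,
           op_type       INT,       -- 1 = insert, 2 = update, 3 = delete
           op_time       TIMESTAMP  -- as of ingestion time
         )
       """)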


       from pyspark.sql.functions import lit, current_timestamp

       # rename the positional tuple columns and stamp each row as an insert
       df = rdd.toDF(). \
            withColumnRenamed("_1", "ID"). \
            withColumnRenamed("_2", "CLUSTERED"). \
            withColumnRenamed("_3", "SCATTERED"). \
            withColumnRenamed("_4", "RANDOMISED"). \
            withColumnRenamed("_5", "RANDOM_STRING"). \
            withColumnRenamed("_6", "SMALL_VC"). \
            withColumnRenamed("_7", "PADDING"). \
            withColumn("op_type", lit(1)). \
            withColumn("op_time", current_timestamp())

Then you can look at all records that were created and subsequently
deleted, and at what time:


SELECT ID, op_time FROM my_table WHERE op_type IN (1, 3)
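
That query lists the insert and delete events for every ID. If you want
only the IDs whose most recent operation is a delete, i.e. rows that no
longer exist at the source, a window over op_time does it. A sketch against
the same hypothetical my_table:

       # IDs whose latest operation is a delete, i.e. gone from the source
       spark.sql("""
         SELECT ID, op_time
         FROM (
           SELECT ID, op_type, op_time,
                  ROW_NUMBER() OVER (PARTITION BY ID ORDER BY op_time DESC) AS rn
           FROM my_table
         ) t
         WHERE rn = 1 AND op_type = 3
       """).show()

As for option 1, Delta Lake exposes those immutable snapshots through time
travel: you can read the table as it stood at an earlier version and diff
it against the current state. A minimal read sketch, assuming the table was
written in delta format at a hypothetical path /data/my_table:

       # current state vs the immutable snapshot at version 0
       current = spark.read.format("delta").load("/data/my_table")
       v0 = spark.read.format("delta").option("versionAsOf", 0).load("/data/my_table")

       # IDs present at version 0 but absent now, i.e. deleted in between
       v0.select("ID").subtract(current.select("ID")).show()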


HTH






On Thu, 27 Jan 2022 at 17:54, Sid Kal <flinkbyhe...@gmail.com> wrote:

> Hi Sean,
>
> So you mean if I use those file formats, will they do the work of CDC
> automatically, or would I have to handle it via code?
>
> Hi Mich,
>
> Not sure if I understood you. Let me try to explain my scenario. Suppose
> there is an id "1" which is inserted today, so I transformed and ingested
> it. Now suppose this user id is deleted from the source itself. How can
> I delete it in my transformed db too?
>
>
>
> On Thu, 27 Jan 2022, 22:44 Sean Owen, <sro...@gmail.com> wrote:
>
>> This is what storage engines like Delta, Hudi, and Iceberg are for. No need
>> to manage it manually or use a DBMS. These formats allow deletes, upserts,
>> etc. of data, using Spark, on cloud storage.
>>
>> On Thu, Jan 27, 2022 at 10:56 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Where is the ETL data stored?
>>>
>>>
>>>
>>> *But now the main problem is when the record at the source is deleted,
>>> it should be deleted in my final transformed record too.*
>>>
>>>
>>> If your final sink (storage) is a data warehouse, deletes should be soft
>>> flagged with op_type (Insert/Update/Delete) and op_time (timestamp).
>>>
>>>
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>> On Thu, 27 Jan 2022 at 15:48, Sid Kal <flinkbyhe...@gmail.com> wrote:
>>>
>>>> I am using a Spark incremental approach for bringing in the latest
>>>> data every day. Everything works fine.
>>>>
>>>> But now the main problem is when the record at the source is deleted,
>>>> it should be deleted in my final transformed record too.
>>>>
>>>> How do I capture such changes and change my table too ?
>>>>
>>>> Best regards,
>>>> Sid
>>>>
>>>>
