Thanks Mich and Sean for your time

On Fri, 28 Jan 2022, 00:53 Mich Talebzadeh, <mich.talebza...@gmail.com>
wrote:

> Yes I believe so.
>
> Check this article of mine, dated early 2019; it still has some relevance
> to what I am implying.
>
>
> https://www.linkedin.com/pulse/real-time-data-streaming-big-typical-use-cases-talebzadeh-ph-d-/
>
> HTH
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 27 Jan 2022 at 18:46, Sid Kal <flinkbyhe...@gmail.com> wrote:
>
>> Okay sounds good.
>>
>> So, the below two options would help me capture CDC changes:
>>
>> 1) Delta lake
>> 2) Maintaining a snapshot of records with operation indicators and a timestamp.
>>
>> Correct me if I'm wrong
>>
>> Thanks,
>> Sid
>>
>> On Thu, 27 Jan 2022, 23:59 Mich Talebzadeh, <mich.talebza...@gmail.com>
>> wrote:
>>
>>> There are two ways of doing it.
>>>
>>>
>>>    1. Through the snapshot offered by Delta Lake, meaning an immutable
>>>    snapshot of the state of the table at a given version. For example, the
>>>    state <https://books.japila.pl/delta-lake-internals/Snapshot/#state> of
>>>    a Delta table
>>>    <https://books.japila.pl/delta-lake-internals/Snapshot/#deltaLog> at the
>>>    version <https://books.japila.pl/delta-lake-internals/Snapshot/#version>
>>>    (see the time-travel sketch after the SQL example below).
>>>    2. Creating your own versioning. Taking your example, you define the
>>>    target storage *with two added columns, namely* op_type INT
>>>    (1=insert, 2=update, 3=delete) and op_time TIMESTAMP <as of ingestion_time>.
>>>    Your example record will be
>>>
>>>
>>> id               op_type       op_time
>>> 1                1             <ingestion_time>
>>> 1                3             <ingestion_time>
>>>
>>>
>>>        from pyspark.sql.functions import lit, current_timestamp
>>>
>>>        # rename the positional columns and stamp every row with the
>>>        # operation type (1 = insert) and the ingestion timestamp
>>>        df = rdd.toDF(). \
>>>             withColumnRenamed("_1", "ID"). \
>>>             withColumnRenamed("_2", "CLUSTERED"). \
>>>             withColumnRenamed("_3", "SCATTERED"). \
>>>             withColumnRenamed("_4", "RANDOMISED"). \
>>>             withColumnRenamed("_5", "RANDOM_STRING"). \
>>>             withColumnRenamed("_6", "SMALL_VC"). \
>>>             withColumnRenamed("_7", "PADDING"). \
>>>             withColumn("op_type", lit(1)). \
>>>             withColumn("op_time", current_timestamp())
>>>
>>> Then you can look at all records that were created and subsequently
>>> deleted, and at what time:
>>>
>>>
>>> SELECT ID, op_time FROM my_table WHERE op_type IN (1, 3)
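>>>
>>> For option 1, a minimal sketch (not from the original thread) of reading an
>>> immutable snapshot of a Delta table at a given version or timestamp; the
>>> path /tmp/my_delta_table is only a placeholder, and it assumes Delta Lake
>>> is configured in your Spark session:
>>>
>>> from pyspark.sql import SparkSession
>>>
>>> spark = SparkSession.builder.getOrCreate()
>>>
>>> # placeholder location of the Delta table
>>> delta_path = "/tmp/my_delta_table"
>>>
>>> # read the table as it was at version 0 (Delta time travel)
>>> df_v0 = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
>>>
>>> # or as it was at a given point in time
>>> df_old = spark.read.format("delta"). \
>>>     option("timestampAsOf", "2022-01-27"). \
>>>     load(delta_path)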
>>>
>>>
>>> HTH
>>>
>>>
>>> On Thu, 27 Jan 2022 at 17:54, Sid Kal <flinkbyhe...@gmail.com> wrote:
>>>
>>>> Hi Sean,
>>>>
>>>> So do you mean that if I use those file formats, they will handle CDC
>>>> automatically, or would I have to handle it via code?
>>>>
>>>> Hi Mich,
>>>>
>>>> Not sure if I understood you. Let me try to explain my scenario.
>>>> Suppose there is an id "1" which is inserted today, so I transform and
>>>> ingest it. Now suppose this user id is deleted from the source itself.
>>>> How can I then delete it in my transformed DB?
>>>>
>>>>
>>>>
>>>> On Thu, 27 Jan 2022, 22:44 Sean Owen, <sro...@gmail.com> wrote:
>>>>
>>>>> This is what storage engines like Delta, Hudi, and Iceberg are for. No
>>>>> need to manage it manually or use a DBMS. These formats allow deletes,
>>>>> upserts, etc. of data, using Spark, on cloud storage.
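>>>>>
>>>>> As a rough sketch of that (not from this thread): with Delta Lake a
>>>>> source-side delete can be propagated with MERGE. The path, the "changes"
>>>>> DataFrame and its op_type column are assumptions for illustration:
>>>>>
>>>>> from delta.tables import DeltaTable
>>>>>
>>>>> # placeholder path of the target Delta table
>>>>> target = DeltaTable.forPath(spark, "/tmp/my_delta_table")
>>>>>
>>>>> # 'changes' is assumed to hold the latest CDC batch (ID plus op_type)
>>>>> target.alias("t").merge(changes.alias("s"), "t.ID = s.ID"). \
>>>>>     whenMatchedDelete(condition="s.op_type = 3"). \
>>>>>     whenMatchedUpdateAll(). \
>>>>>     whenNotMatchedInsertAll(). \
>>>>>     execute()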
>>>>>
>>>>> On Thu, Jan 27, 2022 at 10:56 AM Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> Where is the ETL data stored?
>>>>>>
>>>>>>
>>>>>>
>>>>>> *But now the main problem is that when a record is deleted at the
>>>>>> source, it should be deleted in my final transformed record too.*
>>>>>>
>>>>>>
>>>>>> If your final sink (storage) is a data warehouse, the record should be
>>>>>> soft-flagged with op_type (Insert/Update/Delete) and op_time (timestamp).
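>>>>>>
>>>>>> As a rough sketch of that soft-flag idea (the DataFrame and table names
>>>>>> here are only illustrative): compare today's source extract with the
>>>>>> warehouse copy and append a flagged row for anything that disappeared:
>>>>>>
>>>>>> from pyspark.sql.functions import lit, current_timestamp
>>>>>>
>>>>>> # IDs present in the warehouse but missing from today's source extract
>>>>>> deleted = warehouse_df.join(source_df, on="ID", how="left_anti")
>>>>>>
>>>>>> # soft-flag them as deletes (op_type = 3) stamped with the detection time
>>>>>> flagged = deleted.select("ID"). \
>>>>>>     withColumn("op_type", lit(3)). \
>>>>>>     withColumn("op_time", current_timestamp())
>>>>>>
>>>>>> flagged.write.mode("append").saveAsTable("warehouse.my_table_history")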
>>>>>>
>>>>>>
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>>
>>>>>> On Thu, 27 Jan 2022 at 15:48, Sid Kal <flinkbyhe...@gmail.com> wrote:
>>>>>>
>>>>>>> I am using a Spark incremental approach for bringing in the latest data
>>>>>>> every day. Everything works fine.
>>>>>>>
>>>>>>> But now the main problem is that when a record is deleted at the
>>>>>>> source, it should be deleted in my final transformed record too.
>>>>>>>
>>>>>>> How do I capture such changes and update my table too?
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Sid
>>>>>>>
>>>>>>>
