Btw, the two options Mich explained are not mutually exclusive. Option 2 can and should be implemented over a Delta Lake table anyway, especially if you need to do hard deletes eventually (e.g. for regulatory needs).
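As a rough, hedged illustration of that combination (soft-delete flags kept in a Delta table, plus an occasional hard delete for regulatory purges), something along these lines should work with PySpark and the delta-spark package; the table path and column names are only placeholders:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import lit, current_timestamp
    from delta.tables import DeltaTable  # pip install delta-spark

    spark = (SparkSession.builder
             .appName("cdc-soft-and-hard-deletes")
             .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
             .config("spark.sql.catalog.spark_catalog",
                     "org.apache.spark.sql.delta.catalog.DeltaCatalog")
             .getOrCreate())

    # Option 2: record every change as a row with op_type/op_time (soft flags) ...
    deleted_ids = spark.createDataFrame([(1,)], ["ID"])
    changes = (deleted_ids
               .withColumn("op_type", lit(3))               # 3 = deleted at the source
               .withColumn("op_time", current_timestamp()))
    changes.write.format("delta").mode("append").save("/data/my_table")

    # ... and, when regulation requires it, hard-delete the flagged rows from the same Delta table
    DeltaTable.forPath(spark, "/data/my_table").delete("op_type = 3")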
On Fri, 28 Jan 2022 at 6:50 am, Sid Kal <flinkbyhe...@gmail.com> wrote:

> Thanks Mich and Sean for your time
>
> On Fri, 28 Jan 2022, 00:53 Mich Talebzadeh, <mich.talebza...@gmail.com> wrote:
>
>> Yes, I believe so.
>>
>> Check this article of mine, dated early 2019, but it will have some relevance to what I am implying.
>>
>> https://www.linkedin.com/pulse/real-time-data-streaming-big-typical-use-cases-talebzadeh-ph-d-/
>>
>> HTH
>>
>> On Thu, 27 Jan 2022 at 18:46, Sid Kal <flinkbyhe...@gmail.com> wrote:
>>
>>> Okay, sounds good.
>>>
>>> So, the two options below would help me to capture CDC changes:
>>>
>>> 1) Delta Lake
>>> 2) Maintaining a snapshot of records with some indicators and a timestamp.
>>>
>>> Correct me if I'm wrong.
>>>
>>> Thanks,
>>> Sid
>>>
>>> On Thu, 27 Jan 2022, 23:59 Mich Talebzadeh, <mich.talebza...@gmail.com> wrote:
>>>
>>>> There are two ways of doing it.
>>>>
>>>> 1. Through the snapshot offered, meaning an immutable snapshot of the state of the table at a given version. For example, the state <https://books.japila.pl/delta-lake-internals/Snapshot/#state> of a Delta table <https://books.japila.pl/delta-lake-internals/Snapshot/#deltaLog> at a version <https://books.japila.pl/delta-lake-internals/Snapshot/#version>.
>>>> 2. Creating your own versioning. Taking your example, you define the target storage with two added columns, namely op_type INT (1 = insert, 2 = update, 3 = delete) and op_time TIMESTAMP (as of ingestion time). Your example record will be:
>>>>
>>>> id    op_type    op_time
>>>> 1     1          <ingestion_time>
>>>> 1     3          <ingestion_time>
>>>>
>>>> df = rdd.toDF(). \
>>>>      withColumnRenamed("_1", "ID"). \
>>>>      withColumnRenamed("_2", "CLUSTERED"). \
>>>>      withColumnRenamed("_3", "SCATTERED"). \
>>>>      withColumnRenamed("_4", "RANDOMISED"). \
>>>>      withColumnRenamed("_5", "RANDOM_STRING"). \
>>>>      withColumnRenamed("_6", "SMALL_VC"). \
>>>>      withColumnRenamed("_7", "PADDING"). \
>>>>      withColumn("op_type", lit(1)). \
>>>>      withColumn("op_time", current_timestamp())
>>>>
>>>> Then you can look at all records that were created and subsequently deleted, and at what time:
>>>>
>>>> SELECT ID, op_time FROM my_table WHERE op_type IN (1, 3)
>>>>
>>>> HTH
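A minimal sketch of what reading such an immutable snapshot looks like with Delta Lake time travel; the path, version number and timestamp here are hypothetical:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()  # Delta Lake configured as in the sketch further up

    # Current state of the table
    current = spark.read.format("delta").load("/data/my_table")

    # Immutable snapshot of the same table at an earlier version ...
    as_of_version = (spark.read.format("delta")
                     .option("versionAsOf", 5)
                     .load("/data/my_table"))

    # ... or at an earlier point in time
    as_of_time = (spark.read.format("delta")
                  .option("timestampAsOf", "2022-01-27 00:00:00")
                  .load("/data/my_table"))

    # IDs present in the old snapshot but not in the current state were deleted in between
    deleted_since = as_of_version.select("ID").subtract(current.select("ID"))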
>>>> On Thu, 27 Jan 2022 at 17:54, Sid Kal <flinkbyhe...@gmail.com> wrote:
>>>>
>>>>> Hi Sean,
>>>>>
>>>>> So you mean if I use those file formats it will do the work of CDC automatically, or would I have to handle it via code?
>>>>>
>>>>> Hi Mich,
>>>>>
>>>>> Not sure if I understood you. Let me try to explain my scenario. Suppose there is an id "1" which was inserted today, so I transformed and ingested it. Now suppose this user id is deleted from the source itself. Then how can I delete it in my transformed db?
>>>>>
>>>>> On Thu, 27 Jan 2022, 22:44 Sean Owen, <sro...@gmail.com> wrote:
>>>>>
>>>>>> This is what storage engines like Delta, Hudi and Iceberg are for. No need to manage it manually or use a DBMS. These formats allow deletes, upserts, etc. of data, using Spark, on cloud storage.
>>>>>>
>>>>>> On Thu, Jan 27, 2022 at 10:56 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Where is the ETL data stored?
>>>>>>>
>>>>>>> *But now the main problem is when the record at the source is deleted, it should be deleted in my final transformed record too.*
>>>>>>>
>>>>>>> If your final sink (storage) is a data warehouse, it should be soft-flagged with op_type (Insert/Update/Delete) and op_time (timestamp).
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>> On Thu, 27 Jan 2022 at 15:48, Sid Kal <flinkbyhe...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I am using the Spark incremental approach for bringing in the latest data every day. Everything works fine.
>>>>>>>>
>>>>>>>> But now the main problem is that when a record at the source is deleted, it should be deleted in my final transformed record too.
>>>>>>>>
>>>>>>>> How do I capture such changes and change my table too?
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> Sid

--
Best Regards,
Ayan Guha
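To make the storage-engine route concrete, a hedged sketch of applying a captured batch of changes to a Delta target with MERGE, so that source deletes propagate to the transformed table. The staging path and the op_type convention (1 = insert, 2 = update, 3 = delete) follow the thread's example but are otherwise assumptions:

    from pyspark.sql import SparkSession
    from delta.tables import DeltaTable

    spark = SparkSession.builder.getOrCreate()  # Delta Lake configured as in the first sketch

    target = DeltaTable.forPath(spark, "/data/my_table")
    # Today's captured changes: one row per changed ID, with op_type and op_time columns
    cdc_batch = spark.read.format("delta").load("/staging/today_changes")

    (target.alias("t")
     .merge(cdc_batch.alias("s"), "t.ID = s.ID")
     .whenMatchedDelete(condition="s.op_type = 3")        # deleted at the source -> remove here
     .whenMatchedUpdateAll(condition="s.op_type = 2")     # updated at the source -> overwrite the row
     .whenNotMatchedInsertAll(condition="s.op_type = 1")  # new at the source -> insert
     .execute())

Hudi and Iceberg expose analogous upsert/delete primitives, so the same idea carries over if one of those formats is chosen instead.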