Thanks Mich and Sean for your time

On Fri, 28 Jan 2022, 00:53 Mich Talebzadeh, <mich.talebza...@gmail.com> wrote:
> Yes I believe so.
>
> Check this article of mine dated early 2019; it will have some relevance to what I am implying.
>
> https://www.linkedin.com/pulse/real-time-data-streaming-big-typical-use-cases-talebzadeh-ph-d-/
>
> HTH
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>
> On Thu, 27 Jan 2022 at 18:46, Sid Kal <flinkbyhe...@gmail.com> wrote:
>
>> Okay, sounds good.
>>
>> So, the two options below would help me to capture CDC changes:
>>
>> 1) Delta lake
>> 2) Maintaining a snapshot of records with some indicators and a timestamp.
>>
>> Correct me if I'm wrong.
>>
>> Thanks,
>> Sid
>>
>> On Thu, 27 Jan 2022, 23:59 Mich Talebzadeh, <mich.talebza...@gmail.com> wrote:
>>
>>> There are two ways of doing it.
>>>
>>> 1. Through the snapshot offered by the storage layer, meaning an immutable snapshot of the state of the table at a given version. For example, the state <https://books.japila.pl/delta-lake-internals/Snapshot/#state> of a Delta table <https://books.japila.pl/delta-lake-internals/Snapshot/#deltaLog> at a given version <https://books.japila.pl/delta-lake-internals/Snapshot/#version>.
>>> 2. Creating your own versioning. Taking your example, you define the target storage *with two added columns, namely:* op_type INT (1-insert, 2-update, 3-delete) and op_time TIMESTAMP <as of ingestion_time>. Your example records will be
>>>
>>> id  op_type  op_time
>>> 1   1        <ingestion_time>
>>> 1   3        <ingestion_time>
>>>
>>> from pyspark.sql.functions import lit, current_timestamp
>>>
>>> # rdd holds the source rows as 7-column tuples (_1 .. _7)
>>> df = rdd.toDF(). \
>>>     withColumnRenamed("_1", "ID"). \
>>>     withColumnRenamed("_2", "CLUSTERED"). \
>>>     withColumnRenamed("_3", "SCATTERED"). \
>>>     withColumnRenamed("_4", "RANDOMISED"). \
>>>     withColumnRenamed("_5", "RANDOM_STRING"). \
>>>     withColumnRenamed("_6", "SMALL_VC"). \
>>>     withColumnRenamed("_7", "PADDING"). \
>>>     withColumn("op_type", lit(1)). \
>>>     withColumn("op_time", current_timestamp())
>>>
>>> Then you can look at all records that were created and subsequently deleted, and at what time:
>>>
>>> SELECT ID, op_time FROM my_table WHERE op_type IN (1, 3)
>>>
>>> HTH
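To make option 2 above concrete, here is a minimal sketch (not part of the original exchange) of how source-side deletes could be flagged with op_type = 3. It assumes the target table my_table built as above, and uses a placeholder table name staging_latest_extract for today's full extract from the source, keyed by ID:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp

spark = SparkSession.builder.appName("cdc_soft_delete").getOrCreate()

# my_table: the target built above, carrying the op_type / op_time columns
# staging_latest_extract: placeholder name for today's full source extract
target_df = spark.read.table("my_table")
source_df = spark.read.table("staging_latest_extract")

# rows whose ID is still in the target but no longer appears in the source
missing = target_df.join(source_df.select("ID"), on="ID", how="left_anti")

# re-append the last known row of each such ID as a soft-delete marker
delete_markers = missing. \
    withColumn("op_type", lit(3)). \
    withColumn("op_time", current_timestamp())

delete_markers.write.mode("append").saveAsTable("my_table")

If my_table keeps several historical rows per ID, you would first reduce missing to the latest row per ID (for example with a window over op_time) before appending, otherwise every old version gets its own delete marker. After that, the query above (op_type IN (1, 3)) shows what was created and later deleted.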
>>> On Thu, 27 Jan 2022 at 17:54, Sid Kal <flinkbyhe...@gmail.com> wrote:
>>>
>>>> Hi Sean,
>>>>
>>>> So you mean that if I use those file formats they will do the work of CDC automatically, or would I have to handle it via code?
>>>>
>>>> Hi Mich,
>>>>
>>>> Not sure if I understood you. Let me try to explain my scenario. Suppose there is an ID "1" which was inserted today, so I transformed and ingested it. Now suppose this ID is deleted from the source itself. Then how can I delete it in my transformed db?
>>>>
>>>> On Thu, 27 Jan 2022, 22:44 Sean Owen, <sro...@gmail.com> wrote:
>>>>
>>>>> This is what storage engines like Delta, Hudi and Iceberg are for. No need to manage it manually or use a DBMS. These formats allow deletes, upserts, etc. of data, using Spark, on cloud storage.
>>>>>
>>>>> On Thu, Jan 27, 2022 at 10:56 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> Where is the ETL data stored?
>>>>>>
>>>>>> *But now the main problem is that when a record at the source is deleted, it should be deleted in my final transformed record too.*
>>>>>>
>>>>>> If your final sink (storage) is a data warehouse, it should be soft flagged with op_type (insert/update/delete) and op_time (timestamp).
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>> On Thu, 27 Jan 2022 at 15:48, Sid Kal <flinkbyhe...@gmail.com> wrote:
>>>>>>
>>>>>>> I am using a Spark incremental approach to bring in the latest data every day. Everything works fine.
>>>>>>>
>>>>>>> But now the main problem is that when a record at the source is deleted, it should be deleted in my final transformed record too.
>>>>>>>
>>>>>>> How do I capture such changes and change my table too?
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Sid
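For completeness, a minimal sketch (again, not from the thread) of Sean's suggestion using Delta Lake's Python API (delta-spark). It assumes an active SparkSession configured for Delta, an existing Delta table at an illustrative path, and a placeholder DataFrame changes_df carrying the incoming CDC rows with an ID column and the same op_type convention (1-insert, 2-update, 3-delete):

from delta.tables import DeltaTable

# spark: an existing SparkSession with the delta-spark package configured
# changes_df: placeholder DataFrame of incoming CDC rows (ID, op_type, ...)
target = DeltaTable.forPath(spark, "/data/delta/my_table")  # illustrative path

target.alias("t"). \
    merge(changes_df.alias("c"), "t.ID = c.ID"). \
    whenMatchedDelete(condition="c.op_type = 3"). \
    whenMatchedUpdateAll(condition="c.op_type = 2"). \
    whenNotMatchedInsertAll(condition="c.op_type = 1"). \
    execute()

Here the delete clause actually removes the row from the target, which is what Sid asked for, whereas the op_type/op_time approach keeps the history and soft-flags it. Hudi and Iceberg offer comparable row-level merge/upsert support through Spark.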