Great! glad you got it working!

On Fri, Aug 14, 2020 at 6:46 PM tanu dua <[email protected]> wrote:

> Thanks Vinoth for detailed explanation and I was about to reply you that it
> worked and followed most of the steps that you mentioned below.
> Used forEachBatch() of stream to process the batch data from kafka and then
> finding out the partitions using aggregate functions on Kafka Dataset and
> then feed those partitions using Glob Pattern to Hudi to get hudiDs
> Then performed join on both Ds , I had some complex logic to deduce from
> both kafkaDs and hudiDs and hence using flatMap but I am now able to remove
> flatMap and could use Dataset joins.
>
> Thanks again for all your help as always !!
>
>
>
>
> On Thu, Aug 13, 2020 at 1:42 PM Vinoth Chandar <[email protected]> wrote:
>
> > Hi Tanuj,
> >
> > From this example, it appears as if you are trying to use sparkSession
> from
> > within the executor? This will be problematic. Can you please open a
> > support ticket with the full stack trace?
> >
> > I think what you are describing is a join between Kafka and Hudi tables.
> So
> > I'd read from Kafka first, cache the 2K messages in memory, find out what
> > partitions they belong to, and only load those affected partitions
> instead
> > of the entire table.
> > At this point, you will have two datasets : kafkaDF and hudiDF (or RDD or
> > DataSet.. my suggestion remains valid)
> > And instead of hand crafting the join at the record level, like you have.
> > you can just use RDD/DataSet level join operations and then get a
> resultDF
> >
> > then you do a resultDF.write.format("hudi") and you are done?
> >
> > On Tue, Aug 11, 2020 at 2:33 AM Tanuj <[email protected]> wrote:
> >
> > > Hi,
> > > I have a problem statement where I am consuming messages from Kafka and
> > > then depending upon that Kafka message (2K records) I need to query
> Hudi
> > > table and create a dataset (with both updates and inserts) and push
> them
> > > back to Hudi table.
> > >
> > > I tried following but it threw NP exception from sparkSession scala
> code
> > > and rightly so as sparkSession was used in Executor.
> > >
> > >  Dataset<KafkaRecord> hudiDs = companyStatusDf.flatMap(new
> > > FlatMapFunction<KafkaRecord, HudiRecord>() {
> > >             @Override
> > >             public Iterator<HudiRecord> call(KafkaRecord kafkaRecord)
> > > throws Exception {
> > >                 String prop1= kafkaRecord.getProp1();
> > >                 String prop2= kafkaRecord.getProp2();
> > >                 HudiRecord hudiRecord =  sparkSession.read()
> > >                         .format(HUDI_DATASOURCE)
> > >                         .schema(<schema>)
> > >                         .load(<path>)
> > >                         .as(Encoders.bean((HudiRecord.class)))
> > >                         .filter(<FILTER_ON_KAFKA_RECORD> say prop1);
> > >                 hudiRecord = tranform();
> > >                 // Modificiation in hudi record
> > >                 return Arrays.asList(kafkaRecord,
> hudiRecord).iterator();
> > >                 }
> > >
> > >             }
> > >         }, Encoders.bean(CompanyStatusGoldenRecord.class));
> > >
> > > In HUDI, I have 2 level of partitions (year and month) so for eg if I
> get
> > > 2K records from Kafka which will be spanned across multiple partitions
> -
> > > what is advisable load first the full table like "/*/*/*" or first read
> > > kafka record, find out which partitions need to be hit and then load
> only
> > > those HUDI tables as per partitions .I believe 2nd option would be
> faster
> > > i.e. loading the specific partitions and thats what I was trying in
> above
> > > snippet of code. So if have to leverage partitions, is collect() on
> Kafka
> > > Dataset to get the list of partitions  and then supply to HUDI is the
> > only
> > > option or I can do it just with the spark datasets ?
> > >
> >
>

Reply via email to