Great! Glad you got it working!

On Fri, Aug 14, 2020 at 6:46 PM tanu dua <[email protected]> wrote:
> Thanks Vinoth for the detailed explanation. I was about to reply that it
> worked, and I followed most of the steps you mentioned below:
> used foreachBatch() on the stream to process the batch data from Kafka,
> then found the affected partitions using aggregate functions on the Kafka
> Dataset, and fed those partitions to Hudi as a glob pattern to get hudiDs.
> I then joined both Datasets. I had some complex logic to deduce from both
> kafkaDs and hudiDs and was hence using flatMap, but I am now able to
> remove the flatMap and use Dataset joins instead.
>
> Thanks again for all your help, as always!
>
> On Thu, Aug 13, 2020 at 1:42 PM Vinoth Chandar <[email protected]> wrote:
>
> > Hi Tanuj,
> >
> > From this example, it appears as if you are trying to use sparkSession
> > from within the executor? This will be problematic. Can you please open
> > a support ticket with the full stack trace?
> >
> > I think what you are describing is a join between the Kafka and Hudi
> > tables. So I'd read from Kafka first, cache the 2K messages in memory,
> > find out what partitions they belong to, and only load those affected
> > partitions instead of the entire table.
> > At this point, you will have two datasets: kafkaDF and hudiDF (or RDD
> > or DataSet; my suggestion remains valid).
> > And instead of hand-crafting the join at the record level, like you
> > have, you can just use RDD/DataSet level join operations and then get
> > a resultDF.
> >
> > Then you do a resultDF.write.format("hudi") and you are done?
> >
> > On Tue, Aug 11, 2020 at 2:33 AM Tanuj <[email protected]> wrote:
> >
> > > Hi,
> > > I have a problem statement where I am consuming messages from Kafka,
> > > and then, depending on those Kafka messages (2K records), I need to
> > > query the Hudi table, create a dataset (with both updates and
> > > inserts), and push it back to the Hudi table.
> > >
> > > I tried the following, but it threw an NPE from the sparkSession
> > > Scala code, and rightly so, as sparkSession was used in the executor.
> > >
> > > Dataset<KafkaRecord> hudiDs = companyStatusDf.flatMap(
> > >         new FlatMapFunction<KafkaRecord, HudiRecord>() {
> > >     @Override
> > >     public Iterator<HudiRecord> call(KafkaRecord kafkaRecord)
> > >             throws Exception {
> > >         String prop1 = kafkaRecord.getProp1();
> > >         String prop2 = kafkaRecord.getProp2();
> > >         HudiRecord hudiRecord = sparkSession.read()
> > >                 .format(HUDI_DATASOURCE)
> > >                 .schema(<schema>)
> > >                 .load(<path>)
> > >                 .as(Encoders.bean(HudiRecord.class))
> > >                 .filter(<FILTER_ON_KAFKA_RECORD> say prop1);
> > >         hudiRecord = transform(); // modification of the hudi record
> > >         return Arrays.asList(kafkaRecord, hudiRecord).iterator();
> > >     }
> > > }, Encoders.bean(CompanyStatusGoldenRecord.class));
> > >
> > > In Hudi, I have two levels of partitions (year and month). So, for
> > > example, if I get 2K records from Kafka that span multiple
> > > partitions, what is advisable: load the full table first, like
> > > "/*/*/*", or first read the Kafka records, find out which partitions
> > > need to be hit, and then load only those Hudi partitions? I believe
> > > the second option would be faster, i.e. loading the specific
> > > partitions, and that's what I was trying in the above snippet of
> > > code. So, to leverage partitions, is collect() on the Kafka Dataset
> > > to get the list of partitions and then supplying it to Hudi the only
> > > option, or can I do it just with Spark Datasets?
> > >
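
For reference, a minimal sketch of the per-batch logic this thread converges on
(derive the affected year/month partitions from the Kafka batch, load only those
Hudi partitions, join at the Dataset level, and write the result back with the
Hudi data source) could look like the following Java snippet. The column names
(id, year, month, ts), the table name, and the base path are hypothetical
placeholders rather than values from the thread, and the update/insert merge
logic is elided.

    import static org.apache.spark.sql.functions.col;

    import java.util.List;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class HudiBatchJoinSketch {

        // Sketch only: processes one micro-batch of already-parsed Kafka records.
        // Assumes the batch exposes hypothetical "id", "year" and "month" columns.
        static void processBatch(SparkSession spark, Dataset<Row> kafkaBatchDf,
                                 String hudiBasePath) {
            // 1. Find which year/month partitions this batch touches. The distinct
            //    list of partitions is tiny, so collecting it on the driver is fine.
            List<Row> partitions = kafkaBatchDf
                    .select(col("year"), col("month"))
                    .distinct()
                    .collectAsList();

            // 2. Build partition paths and load only those partitions of the table.
            String[] paths = partitions.stream()
                    .map(r -> hudiBasePath + "/" + r.getAs("year")
                            + "/" + r.getAs("month") + "/*")
                    .toArray(String[]::new);
            Dataset<Row> hudiDf = spark.read().format("hudi").load(paths);

            // 3. Join at the Dataset level instead of per-record lookups in a flatMap.
            Dataset<Row> resultDf = kafkaBatchDf
                    .join(hudiDf, kafkaBatchDf.col("id").equalTo(hudiDf.col("id")),
                            "left_outer");
            // ... apply the update/insert merge logic here and project the final
            //     columns for the target table ...

            // 4. Upsert the result back into the Hudi table. A multi-field partition
            //    path also needs a complex key generator configured (omitted here).
            resultDf.write().format("hudi")
                    .option("hoodie.table.name", "company_status")
                    .option("hoodie.datasource.write.recordkey.field", "id")
                    .option("hoodie.datasource.write.partitionpath.field", "year,month")
                    .option("hoodie.datasource.write.precombine.field", "ts")
                    .mode(SaveMode.Append)
                    .save(hudiBasePath);
        }
    }

On the question of whether collect() is needed at all: only the distinct
(year, month) pairs are collected here, which keeps the driver-side data small;
the full Kafka batch itself never needs to be collected.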

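And a sketch of how that per-batch method might be wired into a Structured
Streaming query with foreachBatch(), as described in the reply at the top of
the thread. The topic name, bootstrap servers, checkpoint location, and base
path are again placeholders, and parsing of the Kafka value column into typed
fields is omitted.

    import org.apache.spark.api.java.function.VoidFunction2;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;

    public class HudiStreamingSketch {

        public static void main(String[] args) throws Exception {
            SparkSession spark = SparkSession.builder()
                    .appName("kafka-to-hudi-sketch")
                    .getOrCreate();

            // Read the raw Kafka stream; parsing the value column into typed
            // fields (e.g. with from_json) is omitted from this sketch.
            Dataset<Row> kafkaStream = spark.readStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "broker:9092")  // placeholder
                    .option("subscribe", "company-status")             // placeholder
                    .load();

            // foreachBatch hands each micro-batch to the driver-side method above,
            // so sparkSession is only ever used on the driver, never in an executor.
            StreamingQuery query = kafkaStream
                    .writeStream()
                    .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDf, batchId) ->
                            HudiBatchJoinSketch.processBatch(spark, batchDf,
                                    "/data/hudi/company_status"))      // placeholder path
                    .option("checkpointLocation", "/data/checkpoints/company_status")
                    .start();

            query.awaitTermination();
        }
    }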