Hi Tanuj,
From this example, it appears as if you are trying to use sparkSession from
within the executor? This will be problematic. Can you please open a
support ticket with the full stack trace?
I think what you are describing is a join between the Kafka data and the Hudi
table. So I'd read from Kafka first, cache the 2K messages in memory, find out
which partitions they belong to, and only load those affected partitions instead
of the entire table.
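Roughly, the pruning step could look like this (a minimal sketch in Java; it
assumes the Kafka messages already carry the year/month values the Hudi table
is partitioned by, and kafkaDF, basePath and the column names are placeholders):

import java.util.List;
import java.util.stream.Collectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load only the Hudi partitions touched by the current Kafka batch.
// "year" and "month" are assumed to mirror the Hudi partition columns.
static Dataset<Row> loadAffectedPartitions(SparkSession spark,
                                           Dataset<Row> kafkaDF,
                                           String basePath) {
  // The batch is small (~2K records), so collecting the distinct
  // partition values on the driver is cheap.
  List<String> partitionPaths = kafkaDF
      .select("year", "month")
      .distinct()
      .collectAsList()
      .stream()
      .map(r -> basePath + "/" + r.get(0) + "/" + r.get(1))
      .collect(Collectors.toList());

  // Read just those partition paths instead of basePath + "/*/*/*".
  return spark.read()
      .format("hudi")
      .load(partitionPaths.toArray(new String[0]));
}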
At this point, you will have two datasets: kafkaDF and hudiDF (or RDDs or
Datasets; my suggestion remains valid either way). And instead of hand-crafting
the join at the record level like you have, you can just use RDD/Dataset-level
join operations to get a resultDF, then do a resultDF.write.format("hudi") and
you are done?
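Something along these lines (again just a sketch; the join key, column names
and write options are illustrative placeholders, and you'd still set the usual
record key / precombine / key generator configs for your table):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import static org.apache.spark.sql.functions.broadcast;

// kafkaDF: the ~2K consumed messages; hudiDF: only the affected partitions.
// "recordKey" and "currentStatus" are placeholders for your actual columns.
static void upsertBatch(Dataset<Row> kafkaDF, Dataset<Row> hudiDF, String tablePath) {
  // Plain Dataset-level join; broadcasting the small Kafka side keeps it cheap.
  // How you merge old vs new column values is application-specific.
  Dataset<Row> resultDF = broadcast(kafkaDF)
      .join(hudiDF.select("recordKey", "currentStatus"), "recordKey");

  // Hand the merged rows back to Hudi (upsert is the default write operation).
  resultDF.write()
      .format("hudi")
      .option("hoodie.datasource.write.recordkey.field", "recordKey")
      .option("hoodie.datasource.write.precombine.field", "ts") // placeholder field
      .option("hoodie.datasource.write.partitionpath.field", "year,month")
      .option("hoodie.table.name", "company_status")
      .mode(SaveMode.Append)
      .save(tablePath);
}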
On Tue, Aug 11, 2020 at 2:33 AM Tanuj <[email protected]> wrote:
> Hi,
> I have a problem statement where I am consuming messages from Kafka, and
> then, depending on those Kafka messages (2K records), I need to query the Hudi
> table, create a dataset (with both updates and inserts), and push it back to
> the Hudi table.
>
> I tried the following, but it threw a NullPointerException from the SparkSession
> (Scala) code, and rightly so, as sparkSession was used in the executor.
>
> Dataset<HudiRecord> hudiDs = companyStatusDf.flatMap(
>         new FlatMapFunction<KafkaRecord, HudiRecord>() {
>             @Override
>             public Iterator<HudiRecord> call(KafkaRecord kafkaRecord) throws Exception {
>                 String prop1 = kafkaRecord.getProp1();
>                 String prop2 = kafkaRecord.getProp2();
>                 // NPE here: sparkSession is referenced inside the executor
>                 HudiRecord hudiRecord = sparkSession.read()
>                         .format(HUDI_DATASOURCE)
>                         .schema(<schema>)
>                         .load(<path>)
>                         .as(Encoders.bean(HudiRecord.class))
>                         .filter(<FILTER_ON_KAFKA_RECORD>, say prop1);
>                 hudiRecord = transform(hudiRecord); // modification of the hudi record
>                 return Arrays.asList(kafkaRecord, hudiRecord).iterator();
>             }
>         }, Encoders.bean(CompanyStatusGoldenRecord.class));
>
> In Hudi, I have 2 levels of partitioning (year and month), so for example the
> 2K records I get from Kafka will span multiple partitions. What is advisable:
> load the full table first, like "/*/*/*", or first read the Kafka records,
> find out which partitions need to be hit, and then load only those Hudi
> partitions? I believe the 2nd option would be faster, i.e. loading the
> specific partitions, and that's what I was trying in the above snippet of
> code. So if I have to leverage partitions, is a collect() on the Kafka
> Dataset to get the list of partitions, which is then supplied to Hudi, the
> only option, or can I do it just with Spark datasets?
>