Hi, I’m using Structured Streaming (Spark 2.2) to receive Kafka messages, and it works great. The thing is, I need to join the Kafka messages with a relatively static table stored in a MySQL database (let’s call it metadata here).
So is it possible to reload the metadata table at some time interval (e.g. daily) without restarting the running Structured Streaming query? The code snippet is as follows: // df_meta contains important information to join with the dataframe read from kafka val df_meta = spark.read.format("jdbc").option("url", mysql_url).option("dbtable", "v_entity_ap_rel").load() df_meta.cache() val df = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", “x.x.x.x:9092").option("fetch.message.max.bytes", "50000000").option("kafka.max.partition.fetch.bytes", "50000000") .option("subscribe", "rawdb.raw_data") .option("failOnDataLoss", true) .option("startingOffsets", "latest") .load() .select($"value".as[Array[Byte]]) .map(avroDeserialize(_)) .as[ApRawData].select("APMAC", "RSSI", "sourceMacAddress", "updatingTime") .join(df_meta.as("b"), $"a.apmac" === $"b.apmac”) df.selectExpr("ENTITYID", "CLIENTMAC", "STIME", "case when a.rrssi>=b.rssi then '1' when a.rrssi < b.nearbyrssi then '3' else '2' end FLAG", "substring(stime,1,13) STIME_HOUR") .distinct().writeStream.format("parquet").partitionBy("STIME_HOUR") .option("checkpointLocation", "/user/root/t_cf_table_chpt").trigger(ProcessingTime("5 minutes")) .start("T_CF_TABLE") .awaitTermination() Mason