I need it cached to improve throughput; I'd just like it to be refreshed once a day rather than on every batch.
> On Nov 13, 2017, at 4:49 PM, Burak Yavuz <brk...@gmail.com> wrote:
>
> I think if you don't cache the JDBC table, then it should auto-refresh.
>
> On Mon, Nov 13, 2017 at 1:21 PM, spark receiver <spark.recei...@gmail.com> wrote:
> Hi,
>
> I'm using Structured Streaming (Spark 2.2) to receive Kafka messages, and it works great.
> The thing is, I need to join the Kafka messages with a relatively static table
> stored in a MySQL database (let's call it metadata here).
>
> Is it possible to reload the metadata table at some interval (e.g. daily)
> without restarting the running Structured Streaming query?
>
> Here is a snippet:
>
> // df_meta contains important information to join with the DataFrame read from Kafka
> val df_meta = spark.read.format("jdbc").option("url", mysql_url).option("dbtable", "v_entity_ap_rel").load()
> df_meta.cache()
> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "x.x.x.x:9092")
>   .option("fetch.message.max.bytes", "50000000")
>   .option("kafka.max.partition.fetch.bytes", "50000000")
>   .option("subscribe", "rawdb.raw_data")
>   .option("failOnDataLoss", true)
>   .option("startingOffsets", "latest")
>   .load()
>   .select($"value".as[Array[Byte]])
>   .map(avroDeserialize(_))
>   .as[ApRawData].select("APMAC", "RSSI", "sourceMacAddress", "updatingTime")
>   .as("a") // alias the stream side so $"a.apmac" resolves in the join
>   .join(df_meta.as("b"), $"a.apmac" === $"b.apmac")
>
> df.selectExpr("ENTITYID", "CLIENTMAC", "STIME",
>     "case when a.rrssi >= b.rssi then '1' when a.rrssi < b.nearbyrssi then '3' else '2' end FLAG",
>     "substring(stime,1,13) STIME_HOUR")
>   .distinct()
>   .writeStream
>   .format("parquet")
>   .partitionBy("STIME_HOUR")
>   .option("checkpointLocation", "/user/root/t_cf_table_chpt")
>   .trigger(ProcessingTime("5 minutes"))
>   .start("T_CF_TABLE")
>   .awaitTermination()
>
> Mason
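A minimal sketch of "cached, but refreshed once a day", assuming the commonly used workaround of unpersisting and re-caching the static DataFrame on a timer. Structured Streaming re-plans the query for every micro-batch, and in Spark 2.x each new plan is matched against the cache, so batches after a refresh should pick up a fresh read of `v_entity_ap_rel` (worth verifying on your Spark version). `PeriodicRefresher` is a hypothetical helper, not a Spark API; it only uses the JDK scheduler, and the Spark calls are passed in as a function (shown in the comments).

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

// Hypothetical helper (not part of Spark): runs `refresh` every `period`
// on a single background thread until stop() is called.
final class PeriodicRefresher(refresh: () => Unit, period: Long, unit: TimeUnit) {
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor()

  def start(): Unit = {
    val task = new Runnable {
      def run(): Unit =
        // Catch failures so one failed run does not cancel all later runs.
        try refresh()
        catch { case e: Exception => Console.err.println(s"refresh failed: $e") }
    }
    // First run after one full period, then once per period.
    scheduler.scheduleAtFixedRate(task, period, period, unit)
    ()
  }

  def stop(): Unit = {
    scheduler.shutdownNow()
    ()
  }
}

// Usage in the streaming job (assumption: df_meta is the cached JDBC
// DataFrame from the snippet above, in scope on the driver):
//
//   val refresher = new PeriodicRefresher(() => {
//     df_meta.unpersist() // drop the stale cached copy
//     df_meta.cache()     // re-register; the next read goes back to MySQL
//     df_meta.count()     // optional: materialize now rather than inside a batch
//   }, 1, TimeUnit.DAYS)
//   refresher.start()
//   // ... then start the streaming query and call awaitTermination() ...
```

The try/catch matters because `scheduleAtFixedRate` suppresses all subsequent executions once a run throws, so without it a single failed refresh would silently stop the daily reload for the life of the query. If re-reading the table on every batch is acceptable, Burak's suggestion of simply not calling `df_meta.cache()` avoids the extra thread entirely.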