The Spark job has the correct functions and logic. However, after running for several hours, it becomes slower and slower. Are there any pitfalls in the code below? Thanks!
```scala
val query = "(select * from meta_table) as meta_data"

val meta_schema = new StructType()
  .add("config_id", BooleanType)
  .add("threshold", LongType)

var meta_df = spark.read.jdbc(url, query, connectionProperties)

var meta_df_explode = meta_df
  .select(col("id"), from_json(col("config"), meta_schema).as("config"))
  .select("config_id", "threshold", "config.*")

// rules_imsi_df: join of the Kafka ingestion stream with meta_df_explode
// rules_monitoring_df: static dataframe for monitoring purposes

val rules_monitoring_stream = rules_imsi_df.writeStream
  .outputMode("append")
  .format("memory")
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if (!batchDF.isEmpty) {
      printf("At %d, the microbatch has %d records\n", Instant.now.getEpochSecond, batchDF.count())
      batchDF.show()
      batchDF.persist()

      var batchDF_group = batchDF
        .groupBy("id")
        .sum("volume")
        .withColumnRenamed("sum(volume)", "total_volume_id")

      rules_monitoring_df = rules_monitoring_df
        .join(batchDF_group, rules_monitoring_df("id") === batchDF_group("id"), "left")
        .select(rules_monitoring_df("id"), batchDF_group("total_volume_id"))
        .na.fill(0)

      rules_monitoring_df = rules_monitoring_df
        .withColumn("volume", rules_monitoring_df("volume") + rules_monitoring_df("total_volume_id"))

      batchDF.unpersist()
    }
  }.start()

while (rules_monitoring_stream.isActive) {
  Thread.sleep(240000)
  ... // Periodically reload the metadata from the database
  meta_df = spark.read.jdbc(url, query, connectionProperties)
  meta_df_explode = meta_df
    .select(col("id"), from_json(col("config"), meta_schema).as("config"))
    .select("config_id", "threshold", "config.*")
}
```

In addition to the code, yarn-site.xml is configured as below.
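For context, `rules_imsi_df` only appears in the comments above, so here is a minimal sketch of how such a stream-static join might be constructed. The Kafka bootstrap server, topic name, and value schema (`id`, `volume`) are illustrative assumptions, not taken from the actual job:

```scala
// Hypothetical construction of rules_imsi_df: a Kafka source stream whose
// JSON payload is parsed and joined with the static meta_df_explode dataframe.
// Broker address, topic, and value schema are assumptions for illustration.
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val kafka_schema = new StructType()
  .add("id", StringType)
  .add("volume", LongType)

val rules_imsi_df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // assumed endpoint
  .option("subscribe", "imsi_topic")                // assumed topic name
  .load()
  .select(from_json(col("value").cast("string"), kafka_schema).as("data"))
  .select("data.*")
  .join(meta_df_explode, Seq("id")) // stream-static inner join on id
```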
```
yarn.nodemanager.pmem-check-enabled, false
yarn.nodemanager.localizer.cache.target-size-mb, 5120
yarn.nodemanager.localizer.cache.cleanup.interval-ms, 400000
yarn.nodemanager.vmem-check-enabled, false
yarn.nodemanager.disk-health-checker.enable, true
yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage, 95.0
yarn.log-aggregation.retain-seconds, 36000
```

The spark-submit command is as below.

```shell
spark-submit \
  --driver-memory 5G \
  --num-executors 3 \
  --executor-memory 6G \
  --executor-cores 2 \
  --files client_jaas.conf,cacerts,krb5.conf,service.keytab \
  --driver-java-options "-Djava.security.auth.login.config=./client_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./client_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./client_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
  sparktest-1.0-SNAPSHOT-jar-with-dependencies.jar
```

I am running the job on AWS EMR with two m4.xlarge instances. Thanks!