The Spark job has the correct functions and logic. However, after running for several hours it becomes slower and slower. Are there any pitfalls in the code below? Thanks!


import java.time.Instant
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{BooleanType, LongType, StructType}

val query = "(select * from meta_table) as meta_data"
val meta_schema = new StructType()
  .add("config_id", BooleanType)
  .add("threshold", LongType)
var meta_df = spark.read.jdbc(url, query, connectionProperties)
var meta_df_explode = meta_df
  .select(col("id"), from_json(col("config"), meta_schema).as("config"))
  .select("config_id", "threshold", "config.*")
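Given the schema above, each row's config column is expected to hold a JSON string of roughly this shape (the values are made up for illustration):

{"config_id": true, "threshold": 1000}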

// rules_imsi_df: join of the Kafka ingestion stream with meta_df_explode (see the sketch below)

// rules_monitoring_df: static dataframe used for monitoring purposes
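
For reference, rules_imsi_df is built roughly like the sketch below; the broker address, topic name, payload schema (imsi_schema) and the join key "id" are placeholders for illustration, not the actual code.

// Sketch only: read the IMSI records from Kafka and join them with the meta data
val kafka_df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")  // assumed broker address
  .option("subscribe", "imsi_topic")                  // assumed topic name
  .load()
  .select(from_json(col("value").cast("string"), imsi_schema).as("payload"))  // imsi_schema is assumed
  .select("payload.*")
val rules_imsi_df = kafka_df.join(meta_df_explode, Seq("id"), "inner")  // assumed join key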

val rules_monitoring_stream = rules_imsi_df.writeStream
  .outputMode("append")
  .format("memory")
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if (!batchDF.isEmpty) {
      printf("At %d, the microbatch has %d records \n",
        Instant.now.getEpochSecond, batchDF.count())
      batchDF.show()
      batchDF.persist()
      var batchDF_group = batchDF.groupBy("id").sum("volume")
        .withColumnRenamed("sum(volume)", "total_volume_id")
      rules_monitoring_df = rules_monitoring_df
        .join(batchDF_group, rules_monitoring_df("id") === batchDF_group("id"), "left")
        .select(rules_monitoring_df("id"), batchDF_group("total_volume_id"))
        .na.fill(0)
      rules_monitoring_df = rules_monitoring_df.withColumn("volume",
        rules_monitoring_df("volume") + rules_monitoring_df("total_volume_id"))
      batchDF.unpersist()
    }
  }.start()


while (rules_monitoring_stream.isActive) {
  Thread.sleep(240000)
  ... // periodically reload the meta data from the database
  meta_df = spark.read.jdbc(url, query, connectionProperties)
  meta_df_explode = meta_df
    .select(col("id"), from_json(col("config"), meta_schema).as("config"))
    .select("config_id", "threshold", "config.*")
}




In addition to the code, yarn-site.xml is configured as below.

yarn.nodemanager.pmem-check-enabled, false
yarn.nodemanager.localizer.cache.target-size-mb, 5120
yarn.nodemanager.localizer.cache.cleanup.interval-ms, 400000
yarn.nodemanager.vmem-check-enabled, false
yarn.nodemanager.disk-health-checker.enable, true
yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage, 95.0
yarn.log-aggregation.retain-seconds, 36000



The spark-submit command is as below.

spark-submit \
  --driver-memory 5G \
  --num-executors 3 \
  --executor-memory 6G \
  --executor-cores 2 \
  --files client_jaas.conf,cacerts,krb5.conf,service.keytab \
  --driver-java-options "-Djava.security.auth.login.config=./client_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./client_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./client_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
  sparktest-1.0-SNAPSHOT-jar-with-dependencies.jar


I am running the job on AWS EMR with 2 m4.xlarge instances.

Thanks!
