It depends a bit on the data as well, but have you checked in the Spark UI
which executor/task becomes slow?
Could it be also the database from which you load data?
> Am 18.07.2020 um 17:00 schrieb Yong Yuan :
>
>
> The spark job has the correct functions and logic. However, after several
> hours running, it becomes slower and slower. Are there some pitfalls in the
> below code? Thanks!
>
>
> val query = "(select * from meta_table) as meta_data"
> val meta_schema = new StructType()
>.add("config_id", BooleanType)
>.add("threshold", LongType)
> var meta_df = spark.read.jdbc(url, query, connectionProperties)
> var meta_df_explode=meta_df.select(col("id"), from_json(col("config"),
> meta_schema).as("config")).select("config_id", "thresold", "config.*")
>
> //rules_imsi_df: joining of kafka ingestion with the meta_df_explode
>
> //rules_monitoring_df: static dataframe for monitoring purpose
>
> val rules_monitoring_stream =
> rules_imsi_df.writeStream
> .outputMode("append")
> .format("memory")
> .trigger(Trigger.ProcessingTime("120 seconds"))
> .foreachBatch {
> (batchDF: DataFrame, batchId: Long) =>
> if(!batchDF.isEmpty)
>{
>
> printf("At %d, the microbatch has %d records \n", Instant.now.getEpochSecond,
> batchDF.count())
> batchDF.show()
> batchDF.persist()
> var batchDF_group =
> batchDF.groupBy("id").sum("volume").withColumnRenamed("sum(volume)",
> "total_volume_id")
> rules_monitoring_df = rules_monitoring_df.join(batchDF_group,
> rules_monitoring_df("id") === batchDF_group("id"),
> "left").select(rules_monitoring_df("id"),
> batchDF_group("total_volume_id")).na.fill(0)
> rules_monitoring_df = rules_monitoring_df.withColumn("volume",
> rules_monitoring_df("volume")+rules_monitoring_df("total_volume_id"))
> batchDF.unpersist()
> }
> }.start()
>
>
> while(rules_monitoring_stream.isActive){
> Thread.sleep(24)
> ... //Periodically load meta data from database
> meta_df = spark.read.jdbc(url, query, connectionProperties)
> meta_df_explode=meta_df.select(col("id"), from_json(col("config"),
> meta_schema).as("config")).select("config_id", "thresold", "config.*")
>
> }
>
>
>
>
> In addition to the code, the yarn-sites.xml is configured as below.
>
> yarn.nodemanager.pmem-check-enabled, false
> yarn.nodemanager.localizer.cache.target-size-mb, 5120
> yarn.nodemanager.localizer.cache.cleanup.interval-ms, 40
> yarn.nodemanager.vmem-check-enabled, false
> yarn.nodemanager.disk-health-checker.enable,true
> yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage,95.0
> yarn.log-aggregation.retain-seconds,36000
>
>
>
> The spark-submit command is as below.
>
> spark-submit --driver-memory 5G --num-executors 3 --executor-memory 6G
> --executor-cores 2 --files client_jaas.conf,cacerts,krb5.conf,service.keytab
> --driver-java-options "-Djava.security.auth.login.config=./client_jaas.conf
> -Djava.security.krb5.conf=./krb5.conf" --conf
> "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./client_jaas.conf
> -Djava.security.krb5.conf=./krb5.conf" --conf
> "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./client_jaas.conf
> -Djava.security.krb5.conf=./krb5.conf" --packages
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0
> sparktest-1.0-SNAPSHOT-jar-with-dependencies.jar
>
>
> I am running the job in AWS EMR with 2 m4.xlarge.
>
> Thanks!
-
To unsubscribe e-mail: user-unsubscribe@spark.apache.org