Persistent tables in DataSource API V2

2020-07-18 Thread fansparker
1. In DataSource API V1, we were able to create persistent tables over custom
data sources via SQL DDL by implementing "createRelation", "buildScan",
"schema", etc. (a minimal sketch follows below). Is there a way to achieve
this in DataSource API V2?

2. In DataSource API V1, schema changes in the underlying custom data source
are not reflected in already persisted tables, even if "schema()" is
re-invoked with the updated schema. Is there a way to get the persisted
table's schema updated? Thanks.
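
For reference, question 1 refers to a pattern like the following minimal V1
sketch. The provider and column names (CustomSource, "id", "name") are
illustrative assumptions, not taken from the original question:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical DataSource V1 provider: createRelation wires the source into
// Spark, schema declares the columns, and buildScan produces the rows.
class CustomSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new CustomRelation(sqlContext)
}

class CustomRelation(val sqlContext: SQLContext) extends BaseRelation with TableScan {
  override def schema: StructType =
    StructType(Seq(StructField("id", LongType), StructField("name", StringType)))

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(Row(1L, "a"), Row(2L, "b")))
}

Such a class can then be registered as a persistent table through SQL DDL,
e.g. CREATE TABLE my_table USING <fully qualified provider class name>.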









Re: Are there some pitfalls in my Spark structured streaming code that cause slow response after several hours of running?

2020-07-18 Thread Jörn Franke
It depends a bit on the data as well, but have you investigated in the Spark UI
which executor/task becomes slow?

Could it also be the database from which you load the data?

> On 18.07.2020, at 17:00, Yong Yuan wrote:
> 
> 
> The Spark job has the correct functions and logic. However, after several
> hours of running, it becomes slower and slower. Are there some pitfalls in
> the code below? Thanks!
> [...]




Are there some pitfalls in my Spark structured streaming code that cause slow response after several hours of running?

2020-07-18 Thread Yong Yuan
The Spark job has the correct functions and logic. However, after several
hours of running, it becomes slower and slower. Are there some pitfalls in
the code below? Thanks!


// Load the rule metadata over JDBC and extract the JSON "config" column
val query = "(select * from meta_table) as meta_data"
val meta_schema = new StructType()
  .add("config_id", BooleanType)
  .add("threshold", LongType)
var meta_df = spark.read.jdbc(url, query, connectionProperties)
var meta_df_explode = meta_df
  .select(col("id"), from_json(col("config"), meta_schema).as("config"))
  .select("config_id", "threshold", "config.*")

// rules_imsi_df: join of the Kafka ingestion stream with meta_df_explode

// rules_monitoring_df: static dataframe for monitoring purposes
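
// Neither rules_imsi_df nor rules_monitoring_df is shown in the original
// message. A purely hypothetical sketch of rules_imsi_df, assuming a Kafka
// topic named "imsi_events" whose JSON value carries the "id" and "volume"
// columns used below (the topic, broker, schema, and join key are guesses):
//
//   val event_schema = new StructType()
//     .add("id", StringType)
//     .add("volume", LongType)
//   val rules_imsi_df = spark.readStream
//     .format("kafka")
//     .option("kafka.bootstrap.servers", "localhost:9092")
//     .option("subscribe", "imsi_events")
//     .load()
//     .select(from_json(col("value").cast("string"), event_schema).as("e"))
//     .select("e.*")
//     .join(meta_df_explode, "id")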

// Accumulate the per-id volume of each micro-batch into the monitoring dataframe
val rules_monitoring_stream = rules_imsi_df.writeStream
  .outputMode("append")
  .format("memory")
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if (!batchDF.isEmpty) {
      printf("At %d, the microbatch has %d records \n",
        Instant.now.getEpochSecond, batchDF.count())
      batchDF.show()
      batchDF.persist()
      var batchDF_group = batchDF.groupBy("id").sum("volume")
        .withColumnRenamed("sum(volume)", "total_volume_id")
      rules_monitoring_df = rules_monitoring_df
        .join(batchDF_group, rules_monitoring_df("id") === batchDF_group("id"), "left")
        .select(rules_monitoring_df("id"), batchDF_group("total_volume_id"))
        .na.fill(0)
      rules_monitoring_df = rules_monitoring_df.withColumn("volume",
        rules_monitoring_df("volume") + rules_monitoring_df("total_volume_id"))
      batchDF.unpersist()
    }
  }.start()


while (rules_monitoring_stream.isActive) {
  Thread.sleep(24)
  ... // periodically reload the metadata from the database
  meta_df = spark.read.jdbc(url, query, connectionProperties)
  meta_df_explode = meta_df
    .select(col("id"), from_json(col("config"), meta_schema).as("config"))
    .select("config_id", "threshold", "config.*")
}




In addition to the code, yarn-site.xml is configured as below.

yarn.nodemanager.pmem-check-enabled, false
yarn.nodemanager.localizer.cache.target-size-mb, 5120
yarn.nodemanager.localizer.cache.cleanup.interval-ms, 40
yarn.nodemanager.vmem-check-enabled, false
yarn.nodemanager.disk-health-checker.enable, true
yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage, 95.0
yarn.log-aggregation.retain-seconds, 36000



The spark-submit command is as below.

spark-submit --driver-memory 5G --num-executors 3 --executor-memory 6G \
  --executor-cores 2 \
  --files client_jaas.conf,cacerts,krb5.conf,service.keytab \
  --driver-java-options "-Djava.security.auth.login.config=./client_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./client_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./client_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
  sparktest-1.0-SNAPSHOT-jar-with-dependencies.jar


I am running the job on AWS EMR with 2 m4.xlarge instances.

Thanks!