Hi 1. Spark version : 2.3.0 2. jdk: oracle jdk 1.8 3. os version: centos 6.8 4. 
spark-env.sh: null 5. spark session config:    
SparkSession.builder().appName("DBScale")
                .config("spark.sql.crossJoin.enabled", "true")
                .config("spark.sql.adaptive.enabled", "true")
                .config("spark.scheduler.mode", "FAIR")
                .config("spark.executor.memory", "1g")
                .config("spark.executor.cores", 1)
                .config("spark.driver.memory", "20")
                .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
                .config("spark.executor.extraJavaOptions",
                        "-XX:+UseG1GC -XX:+PrintFlagsFinal 
-XX:+PrintReferenceGC " +
                                "-verbose:gc -XX:+PrintGCDetails " +
                                "-XX:+PrintGCTimeStamps 
-XX:+PrintAdaptiveSizePolicy")
                .master(this.spark_master)
                .getOrCreate();  6. core code:             for 
(SparksqlTableInfo tableInfo: this.dbtable){ // this loop reads data from mysql
            String dt = "(" + tableInfo.sql + ")" + tableInfo.tmp_table_name;
            String[] pred = new String[tableInfo.partition_num];
            if (tableInfo.partition_num > 0) {
                for (int j = 0; j < tableInfo.partition_num; j++) {
                    String str = "some where clause to split mysql table into 
many partitions";
                    pred[j] = str;
                }
                Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, pred, 
connProp); //this.url is mysql-jdbc-url (mysql://XX.XX.XX.XX:XXXX)
                jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
            } else {
                logger.warn("[\033[32m" + "partition_num == 0" + "\033[0m]");
                Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, connProp);
                jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
            }
        }
        // Then run a query and write the result set to mysql

        Dataset<Row> result = ss.sql(this.sql);
        result.explain(true);
        connProp.put("rewriteBatchedStatements", "true");
        connProp.put("sessionVariables", "sql_log_bin=off");
        result.write().jdbc(this.dst_url, this.dst_table, connProp);

------------------------------------------------------------------发件人:Jhon 
Anderson Cardenas Diaz <jhonderson2...@gmail.com>发送时间:2018年4月11日(星期三) 
22:42收件人:宋源栋 <yuandong.s...@greatopensource.com>抄 送:user 
<user@spark.apache.org>主 题:Re: Spark is only using one worker machine when more 
are available
Hi, could you please share the environment variables values that you are 
sending when you run the jobs, spark version, etc.. more details.
Btw, you should take a look on SPARK_WORKER_INSTANCES and SPARK_WORKER_CORES if 
you are using spark 2.0.0.

Regards.

2018-04-11 4:10 GMT-05:00 宋源栋 <yuandong.s...@greatopensource.com>:


Hi all,
I hava a standalone mode spark cluster without HDFS with 10 machines that each 
one has 40 cpu cores and 128G RAM.
My application is a sparksql application that reads data from database 
"tpch_100g" in mysql and run tpch queries. When loading tables from myql to 
spark, I spilts the biggest table "lineitem" into 600 partitions. 

When my application runs, there are only 40 executor(spark.executor.memory = 
1g, spark.executor.cores = 1) in executor page of spark application web and all 
executors are on the same mathine. It is too slowly that all tasks are 
parallelly running in only one mathine.





Reply via email to