Hi,

For the sake of clarity, can you please give the full statement for
reading the data from the largest table? I mean the complete, literal
statement rather than the programmatically built one.


Regards,
Gourav Sengupta




On Thu, Apr 12, 2018 at 7:19 AM, Jhon Anderson Cardenas Diaz <
jhonderson2...@gmail.com> wrote:

> Hi.
>
> On Spark standalone, I think you cannot specify the number of worker
> machines to use, but you can achieve the same effect this way:
> https://stackoverflow.com/questions/39399205/spark-standalone-number-executors-cores-control
>
> For example, if you want your jobs to run on all 10 machines using all
> their cores (10 executors, each on a different machine with 40
> cores), you can use this configuration:
>
> spark.cores.max        = 400
> spark.executor.cores  = 40
>
> If you want more executors with fewer cores each (say 20
> executors with 20 cores each):
>
> spark.cores.max        = 400
> spark.executor.cores  = 20
>
> Note that in the last case each worker machine will run two executors.
>
> In summary, use this trick:
>
> number-of-executors = spark.cores.max / spark.executor.cores.
>
> Keep in mind that the executors will be distributed among the available
> workers.
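
The arithmetic in the rule above can be sketched in a few lines of Java. This is only an illustration of the division described in the message: the class and method names (`ExecutorPlan`, `executorCount`, `executorsPerWorker`) are hypothetical and not part of Spark, and the per-worker figure assumes cores are the only limiting resource (memory is ignored).

```java
// Hypothetical helper, not a Spark API: it only reproduces the
// arithmetic number-of-executors = spark.cores.max / spark.executor.cores.
final class ExecutorPlan {

    /** Executors the application can get in total (integer division). */
    static int executorCount(int coresMax, int executorCores) {
        if (executorCores <= 0) {
            throw new IllegalArgumentException("executorCores must be positive");
        }
        return coresMax / executorCores;
    }

    /** Upper bound on executors of this size that fit on one worker, by cores only. */
    static int executorsPerWorker(int workerCores, int executorCores) {
        if (executorCores <= 0) {
            throw new IllegalArgumentException("executorCores must be positive");
        }
        return workerCores / executorCores;
    }

    public static void main(String[] args) {
        // The two configurations from the message, on 40-core workers.
        System.out.println(executorCount(400, 40));     // 10
        System.out.println(executorCount(400, 20));     // 20
        System.out.println(executorsPerWorker(40, 20)); // 2
    }
}
```

With 40-core workers, the second configuration fits two 20-core executors per machine, which matches the note about each worker running two executors.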
>
> Regards.
>
>
> 2018-04-11 21:39 GMT-05:00 宋源栋 <yuandong.s...@greatopensource.com>:
>
>> Hi
>>  1. Spark version : 2.3.0
>>  2. jdk: oracle jdk 1.8
>>  3. os version: centos 6.8
>>  4. spark-env.sh: null
>>  5. spark session config:
>>
>>
>> SparkSession.builder().appName("DBScale")
>>                 .config("spark.sql.crossJoin.enabled", "true")
>>                 .config("spark.sql.adaptive.enabled", "true")
>>                 .config("spark.scheduler.mode", "FAIR")
>>                 .config("spark.executor.memory", "1g")
>>                 .config("spark.executor.cores", 1)
>>                 .config("spark.driver.memory", "20")
>>                 .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>                 .config("spark.executor.extraJavaOptions",
>>                         "-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC " +
>>                                 "-verbose:gc -XX:+PrintGCDetails " +
>>                                 "-XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy")
>>                 .master(this.spark_master)
>>                 .getOrCreate();
>>
>>   6. core code:
>>
>>
>>          for (SparksqlTableInfo tableInfo: this.dbtable){ // this loop reads data from mysql
>>             String dt = "(" + tableInfo.sql + ")" + tableInfo.tmp_table_name;
>>             String[] pred = new String[tableInfo.partition_num];
>>             if (tableInfo.partition_num > 0) {
>>                 for (int j = 0; j < tableInfo.partition_num; j++) {
>>                     String str = "some where clause to split mysql table into many partitions";
>>                     pred[j] = str;
>>                 }
>>                 // this.url is mysql-jdbc-url (mysql://XX.XX.XX.XX:XXXX)
>>                 Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, pred, connProp);
>>                 jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
>>             } else {
>>                 logger.warn("[\033[32m" + "partition_num == 0" + "\033[0m]");
>>                 Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, connProp);
>>                 jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
>>             }
>>         }
>>
>>
>>         // Then run a query and write the result set to mysql
>>
>>         Dataset<Row> result = ss.sql(this.sql);
>>         result.explain(true);
>>         connProp.put("rewriteBatchedStatements", "true");
>>         connProp.put("sessionVariables", "sql_log_bin=off");
>>         result.write().jdbc(this.dst_url, this.dst_table, connProp);
>>
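
The WHERE clauses used to split the table are elided in the code above, so the following is only a sketch of one common approach, not the poster's actual predicates. It assumes the table has a numeric column with known minimum and maximum values; the column name `l_orderkey`, the class `JdbcPredicates`, and the method `rangePredicates` are all hypothetical. The resulting array has the shape expected by the `predicates` argument of `DataFrameReader.jdbc(url, table, predicates, connectionProperties)`.

```java
// Hypothetical helper, not part of Spark: builds non-overlapping range
// predicates over a numeric column, one predicate per JDBC partition.
final class JdbcPredicates {

    static String[] rangePredicates(String column, long min, long max, int parts) {
        if (parts <= 0 || max < min) {
            throw new IllegalArgumentException("need parts > 0 and max >= min");
        }
        long span = max - min + 1;              // number of distinct values covered
        long step = (span + parts - 1) / parts; // ceiling division
        String[] preds = new String[parts];
        for (int i = 0; i < parts; i++) {
            long lo = min + (long) i * step;        // inclusive lower bound
            long hi = Math.min(lo + step, max + 1); // exclusive upper bound
            // If parts exceeds the value range, trailing predicates match no rows.
            preds[i] = column + " >= " + lo + " AND " + column + " < " + hi;
        }
        return preds;
    }

    public static void main(String[] args) {
        for (String p : rangePredicates("l_orderkey", 1L, 100L, 4)) {
            System.out.println(p);
        }
    }
}
```

Rows where the column is NULL match none of these predicates, so this sketch only suits a NOT NULL column such as a key. Evenly sized value ranges also give evenly sized partitions only if the values are roughly uniformly distributed.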
>>
>>
>> ------------------------------------------------------------------
>> From: Jhon Anderson Cardenas Diaz <jhonderson2...@gmail.com>
>> Sent: Wednesday, April 11, 2018, 22:42
>> To: 宋源栋 <yuandong.s...@greatopensource.com>
>> Cc: user <user@spark.apache.org>
>> Subject: Re: Spark is only using one worker machine when more are available
>>
>> Hi, could you please share more details: the environment variable values
>> you set when you run the jobs, the Spark version, etc.
>> By the way, you should take a look at SPARK_WORKER_INSTANCES and
>> SPARK_WORKER_CORES if you are using Spark 2.0.0
>> <https://spark.apache.org/docs/preview/spark-standalone.html>.
>>
>> Regards.
>>
>> 2018-04-11 4:10 GMT-05:00 宋源栋 <yuandong.s...@greatopensource.com>:
>>
>>
>> Hi all,
>>
>> I have a standalone-mode Spark cluster without HDFS, with 10 machines that
>> each have 40 CPU cores and 128 GB of RAM.
>>
>> My application is a Spark SQL application that reads data from the database
>> "tpch_100g" in MySQL and runs TPC-H queries. When loading tables from MySQL
>> into Spark, I split the biggest table, "lineitem", into 600 partitions.
>>
>> When my application runs, there are only 40 executors
>> (spark.executor.memory = 1g, spark.executor.cores = 1) on the executor
>> page of the Spark application web UI, and all executors are on the same
>> machine. It is too slow, because all tasks run in parallel on only one machine.
>>
>>
>>
>>
>>
>>
>
