Spark - Hadoop custom filesystem service loading

2019-03-18 Thread Jhon Anderson Cardenas Diaz
Hi everyone,

On Spark 2.2.0, if you wanted to create a custom file system
implementation, you just created an extension of
org.apache.hadoop.fs.FileSystem and put the canonical name of the custom
class in the file
src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem.

Once you imported that jar dependency in your spark-submit application, the
custom scheme was automatically loaded, and you could start using it just
like ds.load("customfs://path").

But on Spark 2.4.0 this no longer seems to work the same way. If you do
exactly the same thing, you get an error like "No FileSystem for customfs".

The only way I got this working on 2.4.0 was by specifying the Spark
property spark.hadoop.fs.customfs.impl.
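
In other words, something along these lines (the implementation class name
is the illustrative one from the sketch above):

spark-submit \
  --conf spark.hadoop.fs.customfs.impl=com.example.fs.CustomFileSystem \
  ...

Spark copies any spark.hadoop.* property into the Hadoop Configuration, so
this ends up there as fs.customfs.impl.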

Do you guys consider this a bug, or is it an intentional change that
should be documented somewhere?

Btw, digging a little into this, it seems that the cause is that the
FileSystem is now initialized before the actual dependencies are downloaded
from the Maven repo (see here). And since that initialization loads the
available filesystems only once, at that point, the filesystems in the
downloaded jars are not taken into account.
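
A quick way to check from the driver whether the scheme was registered at
all is a small diagnostic like this, assuming an existing SparkSession named
spark (run inside any method that may throw IOException):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

Configuration hadoopConf = spark.sparkContext().hadoopConfiguration();
// Throws when no implementation is known for the scheme; resolves the class
// when either the service file or fs.customfs.impl was picked up.
Class<? extends FileSystem> fsClass = FileSystem.getFileSystemClass("customfs", hadoopConf);
System.out.println("customfs resolves to " + fsClass.getName());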

Thanks.


Re: Spark is only using one worker machine when more are available

2018-04-12 Thread Jhon Anderson Cardenas Diaz
Hi.

On Spark standalone I think you cannot specify the number of worker
machines to use directly, but you can achieve the same effect this way:
https://stackoverflow.com/questions/39399205/spark-standalone-number-executors-cores-control

For example, if you want your jobs to run on the 10 machines using all
their cores (10 executors, each one on a different machine and with 40
cores), you can use this configuration:

spark.cores.max       = 400
spark.executor.cores  = 40

If you want more executors with fewer cores each (let's say 20 executors,
each one with 20 cores):

spark.cores.max       = 400
spark.executor.cores  = 20

Note that in the last case each worker machine will run two executors.

In summary, use this trick:

number-of-executors = spark.cores.max / spark.executor.cores.

And keep in mind that the executors will be distributed among the available
workers.
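
For example, with the first configuration above, the submit could look
roughly like this (the master URL and jar name are placeholders):

spark-submit \
  --master spark://<master-host>:7077 \
  --conf spark.cores.max=400 \
  --conf spark.executor.cores=40 \
  your-app.jar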

Regards.


2018-04-11 21:39 GMT-05:00 宋源栋 :

> Hi
>  1. Spark version : 2.3.0
>  2. jdk: oracle jdk 1.8
>  3. os version: centos 6.8
>  4. spark-env.sh: null
>  5. spark session config:
>
>
> SparkSession.builder().appName("DBScale")
> .config("spark.sql.crossJoin.enabled", "true")
> .config("spark.sql.adaptive.enabled", "true")
> .config("spark.scheduler.mode", "FAIR")
> .config("spark.executor.memory", "1g")
> .config("spark.executor.cores", 1)
> .config("spark.driver.memory", "20")
> .config("spark.serializer", 
> "org.apache.spark.serializer.KryoSerializer")
> .config("spark.executor.extraJavaOptions",
> "-XX:+UseG1GC -XX:+PrintFlagsFinal 
> -XX:+PrintReferenceGC " +
> "-verbose:gc -XX:+PrintGCDetails " +
> "-XX:+PrintGCTimeStamps 
> -XX:+PrintAdaptiveSizePolicy")
> .master(this.spark_master)
> .getOrCreate();
>
>   6. core code:
>
>
> // this loop reads data from mysql
> for (SparksqlTableInfo tableInfo : this.dbtable) {
>     String dt = "(" + tableInfo.sql + ")" + tableInfo.tmp_table_name;
>     String[] pred = new String[tableInfo.partition_num];
>     if (tableInfo.partition_num > 0) {
>         for (int j = 0; j < tableInfo.partition_num; j++) {
>             String str = "some where clause to split mysql table into many partitions";
>             pred[j] = str;
>         }
>         // this.url is the mysql jdbc url (mysql://XX.XX.XX.XX:)
>         Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, pred, connProp);
>         jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
>     } else {
>         logger.warn("[\033[32m" + "partition_num == 0" + "\033[0m]");
>         Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, connProp);
>         jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
>     }
> }
>
>
> // Then run a query and write the result set to mysql
>
> Dataset<Row> result = ss.sql(this.sql);
> result.explain(true);
> connProp.put("rewriteBatchedStatements", "true");
> connProp.put("sessionVariables", "sql_log_bin=off");
> result.write().jdbc(this.dst_url, this.dst_table, connProp);
>
>
>
> --
> From: Jhon Anderson Cardenas Diaz
> Sent: Wednesday, April 11, 2018, 22:42
> To: 宋源栋
> Cc: user
> Subject: Re: Spark is only using one worker machine when more are available
>
> Hi, could you please share more details: the environment variable values
> you are setting when you run the jobs, the Spark version, etc.?
> Btw, you should take a look at SPARK_WORKER_INSTANCES and
> SPARK_WORKER_CORES if you are using spark 2.0.0
> <https://spark.apache.org/docs/preview/spark-standalone.html>.
>
> Regards.
>
> 2018-04-11 4:10 GMT-05:00 宋源栋 :
>
>
> Hi all,
>
> I have a standalone-mode Spark cluster without HDFS, with 10 machines that
> each have 40 CPU cores and 128G RAM.
>
> My application is a Spark SQL application that reads data from the database
> "tpch_100g" in MySQL and runs TPC-H queries. When loading tables from MySQL
> into Spark, I split the biggest table, "lineitem", into 600 partitions.
>
> When my application runs, there are only 40 executors (spark.executor.memory
> = 1g, spark.executor.cores = 1) on the Executors page of the Spark web UI,
> and all of them are on the same machine. It is too slow because all the
> tasks run in parallel on only one machine.
>
>
>
>
>
>


Re: Spark is only using one worker machine when more are available

2018-04-11 Thread Jhon Anderson Cardenas Diaz
Hi, could you please share more details: the environment variable values
you are setting when you run the jobs, the Spark version, etc.?
Btw, you should take a look at SPARK_WORKER_INSTANCES and SPARK_WORKER_CORES
if you are using spark 2.0.0
<https://spark.apache.org/docs/preview/spark-standalone.html>.
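
For example, on a standalone cluster those are set per worker machine in
conf/spark-env.sh; a sketch with illustrative values for 40-core machines:

# conf/spark-env.sh on each worker machine (illustrative values)
export SPARK_WORKER_INSTANCES=1   # worker processes per machine
export SPARK_WORKER_CORES=40      # cores this worker offers to executors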

Regards.

2018-04-11 4:10 GMT-05:00 宋源栋 :

>
>
> Hi all,
>
> I have a standalone-mode Spark cluster without HDFS, with 10 machines that
> each have 40 CPU cores and 128G RAM.
>
> My application is a Spark SQL application that reads data from the database
> "tpch_100g" in MySQL and runs TPC-H queries. When loading tables from MySQL
> into Spark, I split the biggest table, "lineitem", into 600 partitions.
>
> When my application runs, there are only 40 executors (spark.executor.memory
> = 1g, spark.executor.cores = 1) on the Executors page of the Spark web UI,
> and all of them are on the same machine. It is too slow because all the
> tasks run in parallel on only one machine.
>
>
>
>


How to create security filter for Spark UI in Spark on YARN

2018-01-09 Thread Jhon Anderson Cardenas Diaz
*Environment*:
AWS EMR, yarn cluster.

*Description*:
I am trying to use a Java filter to protect access to the Spark UI by using
the property spark.ui.filters; the problem is that when Spark runs in YARN
mode, that property is always overridden by Hadoop with the filter
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter:

*spark.ui.filters:
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter*
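
For context, the intended setup is roughly the following; the filter class,
its package, and its logic are only an illustrative sketch of a standard
javax.servlet.Filter, not the actual implementation:

// Illustrative servlet filter; the real credential check goes in doFilter.
package com.example.security;

import java.io.IOException;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;

public class SparkUiAuthFilter implements Filter {
    @Override
    public void init(FilterConfig filterConfig) throws ServletException { }

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        // ... validate the request here (e.g. check a header or a session) ...
        chain.doFilter(request, response);  // let allowed requests through
    }

    @Override
    public void destroy() { }
}

configured with:

spark.ui.filters=com.example.security.SparkUiAuthFilter

which is exactly the property that ends up being replaced by AmIpFilter on YARN.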

And these properties are automatically added:


*spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS:
ip-x-x-x-226.eu-west-1.compute.internal*

*spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES:
http://ip-x-x-x-226.eu-west-1.compute.internal:20888/proxy/application_x_*

Any suggestion on how to add a Java security filter so Hadoop does not
override it, or maybe how to configure security from the Hadoop side?

Thanks.


Spark UI stdout/stderr links point to executors internal address

2018-01-09 Thread Jhon Anderson Cardenas Diaz
*Environment:*

AWS EMR, yarn cluster.



*Description:*

On the Spark UI, in the Environment and Executors tabs, the stdout and
stderr links point to the internal addresses of the executors. This would
imply exposing the executors so that the links can be accessed. Shouldn't
those links point to the master instead, with the master acting as a proxy
that serves these files internally, rather than exposing the internal
machines?



I have tried setting the SPARK_PUBLIC_DNS and SPARK_LOCAL_IP variables so
that they contain the master IP address. I also tried the properties
spark.yarn.appMasterEnv.SPARK_LOCAL_IP and
spark.yarn.appMasterEnv.SPARK_PUBLIC_DNS, but it does not seem to work.
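
Concretely, the attempts were along these lines (the host values are
placeholders):

# environment variables (e.g. in spark-env.sh)
export SPARK_PUBLIC_DNS=<master-public-dns>
export SPARK_LOCAL_IP=<master-ip>

# and, for the YARN application master, as spark-submit options
--conf spark.yarn.appMasterEnv.SPARK_PUBLIC_DNS=<master-public-dns> \
--conf spark.yarn.appMasterEnv.SPARK_LOCAL_IP=<master-ip>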


Any suggestion?