Re: Where does the Driver run?

2019-03-23 Thread Akhil Das
If you are starting your "my-app" on your local machine, that's where the
driver is running.
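To make this concrete, here is a minimal sketch (not your exact code): the JVM that constructs the SparkContext is the driver, whatever the master URL says. Note that "deployMode" is not a recognized config key; the real key is spark.submit.deployMode, and even that only takes effect when the application is launched through spark-submit. The memory value below is a hypothetical placeholder.

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: the process that builds this SparkContext is the driver.
val conf = new SparkConf()
  .setAppName("my-job")
  .setMaster("spark://master-address:7077")   // your standalone master
  .set("spark.executor.memory", "8g")         // hypothetical value
// Note: spark.driver.memory must be set before the driver JVM starts (e.g. via
// spark-submit --driver-memory or the JVM options of "my-app"); setting it on
// the conf at this point has no effect.

val sc = new SparkContext(conf)
// Broadcasts and everything outside RDD/Dataset closures run here, on the driver,
// i.e. on the machine where "my-app" was started.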

[image: image.png]

Hope this helps.


On Sun, Mar 24, 2019 at 4:13 AM Pat Ferrel  wrote:

> I have researched this for a significant amount of time and find answers
> that seem to be for a slightly different question than mine.
>
> The Spark 2.3.3 cluster is running fine. I see the GUI on
> "http://master-address:8080", there are 2 idle workers, as configured.
>
> I have a Scala application that creates a context and starts execution of
> a Job. I *do not use spark-submit*; I start the Job programmatically, and
> this is where many explanations fork from my question.
>
> In "my-app" I create a new SparkConf, with the following code (slightly
> abbreviated):
>
>   conf.setAppName("my-job")
>   conf.setMaster("spark://master-address:7077")
>   conf.set("deployMode", "cluster")
>   // other settings like driver and executor memory requests
>   // the driver and executor memory requests are for all mem on the slaves,
>   // more than mem available on the launching machine with "my-app"
>   val jars = listJars("/path/to/lib")
>   conf.setJars(jars)
>   …
>
> When I launch the job I see 2 executors running on the 2 workers/slaves.
> Everything seems to run fine and sometimes completes successfully. Frequent
> failures are the reason for this question.
>
> Where is the Driver running? I don't see it in the GUI; I see 2 Executors
> taking all cluster resources. With a Yarn cluster I would expect the
> "Driver" to run on/in the Yarn Master, but I am using the Spark Standalone
> Master, so where is the Driver part of the Job running?
>
> If it is running in the Master, we are in trouble, because I start the
> Master on one of my 2 Workers, sharing resources with one of the Executors.
> Executor mem + driver mem is > available mem on a Worker. I can change this
> but need to understand where the Driver part of the Spark Job runs. Is it
> in the Spark Master, or inside an Executor, or ???
>
> The "Driver" creates and broadcasts some large data structures, so the need
> for an answer is more critical than with more typical tiny Drivers.
>
> Thanks for your help!
>


-- 
Cheers!


Re: Spark - Hadoop custom filesystem service loading

2019-03-23 Thread Felix Cheung
Hmm thanks. Do you have a proposed solution?



From: Jhon Anderson Cardenas Diaz 
Sent: Monday, March 18, 2019 1:24 PM
To: user
Subject: Spark - Hadoop custom filesystem service loading

Hi everyone,

On Spark 2.2.0, if you wanted to create a custom file system implementation,
you just created an extension of org.apache.hadoop.fs.FileSystem and put the
canonical name of the custom class in the file
src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem.

Once you added that jar as a dependency of your spark-submit application, the
custom scheme was loaded automatically, and you could start to use it just like
ds.load("customfs://path").

But on Spark 2.4.0 this does not seem to work the same way. If you do exactly
the same thing, you get an error like "No FileSystem for customfs".

The only way I got this working on 2.4.0 was by specifying the Spark property
spark.hadoop.fs.customfs.impl.
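For reference, a minimal sketch of that workaround as I understand it (the class name com.example.fs.CustomFileSystem and the path are just placeholders):

import org.apache.spark.sql.SparkSession

// Register the scheme explicitly instead of relying on ServiceLoader discovery
// through META-INF/services/org.apache.hadoop.fs.FileSystem.
val spark = SparkSession.builder()
  .appName("customfs-example")
  .config("spark.hadoop.fs.customfs.impl", "com.example.fs.CustomFileSystem")
  .getOrCreate()

// "customfs://" now resolves through the property above.
val ds = spark.read.load("customfs://path")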

Do you consider this a bug, or is it an intentional change that should be
documented somewhere?

By the way, digging a little into this, it seems that the cause is that the
FileSystem implementations are now initialized before the actual dependencies
are downloaded from the Maven repo (see here). And since that initialization
loads the available filesystems only once, at that point, the filesystems in
the jars downloaded later are not taken into account.

Thanks.


Apache Spark Newsletter Issue 2

2019-03-23 Thread Ankur Gupta
Hello,

Issue two of the newsletter

https://newsletterspot.com/apache-spark/2/

Feel free to submit articles to the newsletter
https://newsletterspot.com/apache-spark/submit/

From the next issue onwards, we will be adding:

  *   Spark Events / User Meetups
  *   Tags to identify content, e.g. videos, GitHub projects, etc.
  *   A more regular schedule

Thanks
Ankur
Sent from Mail for Windows 10



Where does the Driver run?

2019-03-23 Thread Pat Ferrel
I have researched this for a significant amount of time and find answers
that seem to be for a slightly different question than mine.

The Spark 2.3.3 cluster is running fine. I see the GUI on
"http://master-address:8080", there are 2 idle workers, as configured.

I have a Scala application that creates a context and starts execution of a
Job. I *do not use spark-submit*; I start the Job programmatically, and this
is where many explanations fork from my question.

In "my-app" I create a new SparkConf, with the following code (slightly
abbreviated):

  conf.setAppName("my-job")
  conf.setMaster("spark://master-address:7077")
  conf.set("deployMode", "cluster")
  // other settings like driver and executor memory requests
  // the driver and executor memory requests are for all mem on the slaves,
  // more than mem available on the launching machine with "my-app"
  val jars = listJars("/path/to/lib")
  conf.setJars(jars)
  …

When I launch the job I see 2 executors running on the 2 workers/slaves.
Everything seems to run fine and sometimes completes successfully. Frequent
failures are the reason for this question.

Where is the Driver running? I don't see it in the GUI; I see 2 Executors
taking all cluster resources. With a Yarn cluster I would expect the
"Driver" to run on/in the Yarn Master, but I am using the Spark Standalone
Master, so where is the Driver part of the Job running?

If it is running in the Master, we are in trouble, because I start the
Master on one of my 2 Workers, sharing resources with one of the Executors.
Executor mem + driver mem is > available mem on a Worker. I can change this
but need to understand where the Driver part of the Spark Job runs. Is it
in the Spark Master, or inside an Executor, or ???

The "Driver" creates and broadcasts some large data structures, so the need
for an answer is more critical than with more typical tiny Drivers.

Thanks for your help!
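A hedged sketch of one possible middle ground (not something from this thread): the spark-launcher module drives spark-submit from code, so the launch stays programmatic but --deploy-mode cluster is honored and the standalone Master places the driver on a worker. The jar path, main class, and memory sizes below are hypothetical placeholders.

import org.apache.spark.launcher.SparkLauncher

// Sketch only: launches the app through the spark-submit machinery from code.
val handle = new SparkLauncher()
  .setAppName("my-job")
  .setMaster("spark://master-address:7077")
  .setDeployMode("cluster")                  // honored here, unlike conf.set("deployMode", ...)
  .setAppResource("/path/to/my-app.jar")     // hypothetical application jar
  .setMainClass("com.example.MyJob")         // hypothetical main class
  .setConf("spark.driver.memory", "4g")      // hypothetical sizes
  .setConf("spark.executor.memory", "8g")
  .startApplication()                        // returns a SparkAppHandle for monitoring

In standalone cluster mode the driver then runs on one of the workers, so its memory has to fit on that worker alongside (or instead of) an executor, and the application jar has to be reachable from the workers (same path on every node, or an HDFS/HTTP URL).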


JavaRDD and WrappedArrays type iterate

2019-03-23 Thread 1266
Hi everyone,
I have run into some problems when using Spark.
I have encountered a value of type WrappedArray() and I don't know how to
iterate over this type, or how to convert it to a List or an Array.


Also, after I convert the dataframe into a JavaRDD, the data type is
JavaRDD[Row]. I don't know how to traverse this JavaRDD either. I use
foreach(x => println(x)) for output, but it throws an exception:
<console>:30: error: missing parameter type


If anyone knows how to solve this, I would be really grateful. After all,
these problems have bothered me for two whole days.
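A minimal sketch of the conversions in question (the column names here are hypothetical):

import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().appName("wrapped-array-example").getOrCreate()
import spark.implicits._

// "tags" is a hypothetical array<string> column.
val df = Seq((1, Seq("a", "b")), (2, Seq("c"))).toDF("id", "tags")

// A WrappedArray is a Seq, so getAs[Seq[...]] yields an ordinary Scala
// collection that can be converted with toList or toArray.
df.collect().foreach { row: Row =>
  val tags: Seq[String] = row.getAs[Seq[String]]("tags")
  println(tags.toList)                  // as a List
  println(tags.toArray.mkString(","))   // as an Array
}

// A JavaRDD[Row] expects Java functional interfaces, which is why a bare Scala
// lambda fails with "missing parameter type"; staying on the Scala RDD avoids it:
df.rdd.foreach(row => println(row))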


 Warm regards,
 Sanders Wang

spark core / spark sql -- unexpected disk IO activity after all the spark tasks finished but spark context has not stopped.

2019-03-23 Thread Chenghao
Hi, 

I detected an unexpected disk IO spike (DISKBUSY) after all my spark tasks
finished but before the spark context stopped -- as shown in the figure for
case 2 at 21:56:47. Could anyone help explain it and give suggestions on how
to avoid or postpone it? Or does the spark context have some periodic async
IO activities that might lead to the spikes? Thanks!

Here is an example of running a SparkSQL batch job in two cases. In the first
one, I execute the sql workload and stop the spark context immediately after
the `.show()` action finishes. In the second case, I add a 1-minute sleep
after `.show()` using `Thread.sleep(60 * 1000)`, then stop the spark context.
The result shows that the time costs for executing the sql workload are
similar in the two cases, but in the second case there is an unexpected
DISKBUSY spike on the disk that holds the local storage for shuffle writes.
See the spike in the figure for case 2.

Here are more details.

System Background:
1. Spark 2.3.1, Hadoop 2.9.1, Hive 2.3.4 for metadata storage.
2. One master and two worker nodes. Each node has enough available resources
(32 cores, 750G memory and eight 8-TB disks, disk1 to disk8).
3. HDFS is deployed on disk8; disk1 is used as the local storage for spark
shuffle writes.
4. I use Yarn client mode for resource management.
5. I use the system monitor tool "nmon" to observe disk activity.
6. There is no other big application running in the background.

Current Analysis:
1. The spike is not caused by the disk itself or by other background
processes. I tried disk2, disk3, disk4, and disk8 for the yarn local storage
to test whether the spike is related to the program, and the answer is yes:
the same spike shows up every time I execute case 2 (see the configuration
sketch after this list).
2. The spike is caused by Spark itself. I tried the standalone deploy mode
and the spike still exists.
3. It might be related to the shuffle size. The total shuffle write size of
the target batch job is close to 2GB. I also tried workloads with shuffle
write sizes close to 1MB, 250MB, and 1GB. The DISKBUSY becomes negligible
for the batch job with a 1MB shuffle write size and reaches up to 80% for
the batch job with a total shuffle write size of 250MB.
4. The disk spike might be disk swapping. I traced the size of the local
storage files: when the spike appears, disk writing is detected but the
directory size does not increase -- so it might be doing some disk swap?
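For what it's worth, a minimal sketch of how the per-disk test above can be pointed at a specific disk when running standalone or local (the mount path is hypothetical). On YARN the NodeManager's yarn.nodemanager.local-dirs governs this instead, and SPARK_LOCAL_DIRS set on standalone workers overrides the property when present:

import org.apache.spark.sql.SparkSession

// Sketch only: direct shuffle and spill files to one chosen disk.
val spark = SparkSession.builder()
  .appName("diskbusy-test")
  .config("spark.local.dir", "/mnt/disk2/spark-local")   // hypothetical mount point
  .enableHiveSupport()
  .getOrCreate()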

Case 1: [figure: DISKBUSY over time, case 1]

Case 2: [figure: DISKBUSY over time, case 2]

To be clearer about the figures, /worker1 node local/ and /worker2 node
local/ stand for disk1 in worker1 and worker2 respectively; /worker1 node
dfs/ and /worker2 node dfs/ stand for disk8 in worker1 and worker2
respectively, where HDFS is located. The left y-axis is the DISKBUSY (from 0%
to 100%) reported by nmon and the right y-axis is the size of the HDFS
directory on disk8 (which can be ignored for this problem).

Here is the code for the workload.

import org.apache.spark.sql.SparkSession

object Q16 {
  def main(args: Array[String]): Unit = {
    val db = s"bigbench_sf_100"

    val spark = SparkSession
      .builder()
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    spark.sql(s"use $db")

    val t1 = System.currentTimeMillis()
    spark.sql(
      s"""
         |SELECT w_state, i_item_id,
         |  SUM(
         |    CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') <
         |      unix_timestamp('2001-03-16','yyyy-MM-dd'))
         |    THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
         |    ELSE 0.0 END
         |  ) AS sales_before,
         |  SUM(
         |    CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') >=
         |      unix_timestamp('2001-03-16','yyyy-MM-dd'))
         |    THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
         |    ELSE 0.0 END
         |  ) AS sales_after
         |FROM (
         |  SELECT *
         |  FROM web_sales ws
         |  LEFT OUTER JOIN web_returns wr ON (ws.ws_order_number = wr.wr_order_number
         |    AND ws.ws_item_sk = wr.wr_item_sk)
         |) a1
         |JOIN item i ON a1.ws_item_sk = i.i_item_sk
         |JOIN warehouse w ON a1.ws_warehouse_sk = w.w_warehouse_sk
         |JOIN date_dim d ON a1.ws_sold_date_sk = d.d_date_sk
         |  AND unix_timestamp(d.d_date, 'yyyy-MM-dd') >=
         |    unix_timestamp('2001-03-16', 'yyyy-MM-dd') - 30*24*60*60 --subtract 30 days in seconds
         |  AND unix_timestamp(d.d_date, 'yyyy-MM-dd') <=
         |    unix_timestamp('2001-03-16', 'yyyy-MM-dd') + 30*24*60*60 --add 30 days in seconds
         |GROUP BY w_state, i_item_id
         |--original was ORDER BY w_state,i_item_id, but CLUSTER BY is Hive's cluster-scale counterpart
         |ORDER BY w_state, i_item_id
         |LIMIT 100
       """.stripMargin).show
    val t2 = System.currentTimeMillis()

    // For case 2
    // Thread.sleep(60 * 1000)

    spark.stop()
  }
}



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

[spark context / spark sql] unexpected disk IO activity after spark job finished but spark context has not

2019-03-23 Thread Chenghao
Hi, 

I have a SparkSQL workload and ran it as a batch job in two cases. In the
first case, I execute the workload and stop the batch job after `.show()`
finishes. In the second case, I execute the same workload and add a 1-minute
sleep, `Thread.sleep(60 * 1000)`, before stopping the spark context and the
batch job. The time costs for the workload are similar in the two cases, but
in the second case I detected an unexpected DISKBUSY spike on the spark local
storage disk, using the system monitor tool "nmon", as shown in the case 2
figure.

Could anyone help explain the reason for the disk spike and how to avoid it?
Does the spark context have some periodic async IO activities that lead to
the spikes?

System Background:
1. Spark 2.3.1, Hadoop 2.9.1, Hive 2.3.4 for metadata storage.
2. One master and two worker nodes. Each node has enough available resources
(32 cores, 750G memory and eight 8-TB disks, disk1 to disk8).
3. HDFS is deployed on disk8; disk1 is used as the local storage for spark
shuffle writes.
4. I use Yarn client mode for resource management.
5. There is no other big application running in the background.

Current Analysis:
1. The spike is not caused by the disk itself or by other background
processes. I tried disk2, disk3, disk4, and disk8 for the yarn local storage
to test whether the spike is related to the program, and the answer is yes:
the same spike shows up every time I execute case 2.
2. The spike is caused by Spark itself. I tried the standalone deploy mode
and the spike still exists.
3. It might be related to the shuffle size. The total shuffle write size of
the target batch job is close to 2GB. I also tried workloads with shuffle
write sizes close to 1MB, 250MB, and 1GB. The DISKBUSY becomes negligible
for the batch job with a 1MB shuffle write size and reaches up to 80% for
the batch job with a total shuffle write size of 250MB.
4. The disk spike might be disk swapping. I traced the size of the local
storage files: when the spike appears, disk writing is detected but the
directory size does not increase -- so it might be doing some disk swap?

Case 1: [figure: DISKBUSY over time, case 1]

Case 2: [figure: DISKBUSY over time, case 2]

To be clearer about the figures, /worker1 node local/ and /worker2 node
local/ stand for disk1 in worker1 and worker2 respectively; /worker1 node
dfs/ and /worker2 node dfs/ stand for disk8 in worker1 and worker2
respectively, where HDFS is located. The left y-axis is the DISKBUSY (from 0%
to 100%) reported by nmon and the right y-axis is the size of the HDFS
directory on disk8 (which can be ignored for this problem).

Here is the code for the workload.

import org.apache.spark.sql.SparkSession

object Q16 {
  def main(args: Array[String]): Unit = {
    val db = s"bigbench_sf_100"

    val spark = SparkSession
      .builder()
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    spark.sql(s"use $db")

    val t1 = System.currentTimeMillis()
    spark.sql(
      s"""
         |SELECT w_state, i_item_id,
         |  SUM(
         |    CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') <
         |      unix_timestamp('2001-03-16','yyyy-MM-dd'))
         |    THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
         |    ELSE 0.0 END
         |  ) AS sales_before,
         |  SUM(
         |    CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') >=
         |      unix_timestamp('2001-03-16','yyyy-MM-dd'))
         |    THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
         |    ELSE 0.0 END
         |  ) AS sales_after
         |FROM (
         |  SELECT *
         |  FROM web_sales ws
         |  LEFT OUTER JOIN web_returns wr ON (ws.ws_order_number = wr.wr_order_number
         |    AND ws.ws_item_sk = wr.wr_item_sk)
         |) a1
         |JOIN item i ON a1.ws_item_sk = i.i_item_sk
         |JOIN warehouse w ON a1.ws_warehouse_sk = w.w_warehouse_sk
         |JOIN date_dim d ON a1.ws_sold_date_sk = d.d_date_sk
         |  AND unix_timestamp(d.d_date, 'yyyy-MM-dd') >=
         |    unix_timestamp('2001-03-16', 'yyyy-MM-dd') - 30*24*60*60 --subtract 30 days in seconds
         |  AND unix_timestamp(d.d_date, 'yyyy-MM-dd') <=
         |    unix_timestamp('2001-03-16', 'yyyy-MM-dd') + 30*24*60*60 --add 30 days in seconds
         |GROUP BY w_state, i_item_id
         |--original was ORDER BY w_state,i_item_id, but CLUSTER BY is Hive's cluster-scale counterpart
         |ORDER BY w_state, i_item_id
         |LIMIT 100
       """.stripMargin).show
    val t2 = System.currentTimeMillis()

    // For case 2
    // Thread.sleep(60 * 1000)

    spark.stop()
  }
}



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


