Shuffle Write Size

2015-12-24 Thread gsvic
Is there any formula with which I could determine the shuffle write size before
execution?

For example, in a Sort Merge join, in the stage in which the first table is
being loaded, the shuffle write is 429.2 MB. The table is 5.5 GB in HDFS with a
block size of 128 MB, so it is loaded in 45 tasks/partitions. How does this
5.5 GB result in 429.2 MB of shuffle write? Could I determine it before execution?

Environment:
#Workers = 2
#Cores/Worker = 4
#Assigned Memory / Worker = 512M

spark.shuffle.partitions=200
spark.shuffle.compress=false
spark.shuffle.memoryFraction=0.1
spark.shuffle.spill=true
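
For what it's worth, a very rough back-of-the-envelope estimate (not an exact
formula, and assuming spark.shuffle.compress=false as in the environment above)
is that the shuffle write of one side of the join is roughly the row count times
the serialized size of the projected row (the join key plus the columns the
query actually needs), which is usually much smaller than the table's on-disk
text size. A hedged sketch:

// Hedged sketch: estimate the uncompressed shuffle write for one relation.
// Assumes only the projected columns are serialized and adds a guessed
// per-record framing overhead; real numbers depend on the serializer and
// the internal row format.
def estimateShuffleWriteBytes(numRows: Long, avgProjectedRowBytes: Long): Long = {
  val perRecordOverheadBytes = 8L  // rough guess, serializer-dependent
  numRows * (avgProjectedRowBytes + perRecordOverheadBytes)
}

// Example: ~40M rows with ~10 bytes of projected data each:
// estimateShuffleWriteBytes(40000000L, 10L)  // = 720,000,000 bytes, i.e. ~720 MB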






Hash Partitioning & Sort Merge Join

2015-11-18 Thread gsvic
In the case of a Sort Merge join in which a shuffle (exchange) will be performed,
I have the following questions (please correct me if my understanding is not
correct):

Let's say that relation A is a JSONRelation (640 MB) on HDFS where the block
size is 64 MB. This will produce a Scan JSONRelation() of 10 partitions
(640 / 64), where each of these partitions will contain |A| / 10 rows.

The second step will be a hashPartitioning(#key, 200), where #key is the
equi-join condition and 200 is the default number of shuffle partitions
(spark.sql.shuffle.partitions). Each partition will be computed in an
individual task, in which every row will be hashed on the #key and then
written to the corresponding chunk (of the 200 resulting chunks) directly on
disk.
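
As a side note, a minimal sketch of how a row's target chunk could be chosen
under hash partitioning, in the spirit of Spark's HashPartitioner
(key.hashCode modulo the number of partitions, adjusted to be non-negative):

// Sketch of choosing the target shuffle partition for a row, mirroring the
// HashPartitioner idea: nonNegativeMod(key.hashCode, numPartitions).
def targetPartition(joinKey: Any, numPartitions: Int = 200): Int = {
  val mod = joinKey.hashCode % numPartitions
  if (mod < 0) mod + numPartitions else mod  // keep the result in [0, numPartitions)
}

// Rows of A and B with equal join keys land in the same partition id, so
// chunk i of A only ever needs to be merged with chunk i of B.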

Q1: What happens if a resulting hashed row on executor A must be written to a
chunk which is stored on executor B? Does it use the HashShuffleManager to
transfer it over the network?

Q2: After the Sort (3rd) step there will be 200 and 200 resulting
partitions/chunks for relations A and B respectively, which will be
concatenated into 200 SortMergeJoin tasks, each of which will contain
(|A|/200 + |B|/200) rows. For each pair (chunkOfA, chunkOfB), will chunkOfA
and chunkOfB contain rows for the same hash key?

Q3: In the SortMergeJoin of Q2, I suppose that each of the 200 SortMergeJoin
tasks joins two partitions/chunks with the same hash key. So, if a task
corresponds to a hash key X, does it use ShuffleBlockFetcherIterator to fetch
the two shuffle chunks (of relations A and B) with hash key X?

Q4: Which sorting algorithm is being used?






Map Tasks - Disk Spill (?)

2015-11-15 Thread gsvic
According to this paper, Spark's map tasks write their results to disk.

My actual question is: in BroadcastHashJoin's doExecute() method, at line 109,
the mapPartitions method is called. At this step, Spark will schedule a number
of tasks for execution in order to perform the hash join operation. Will the
results of these tasks be written to each worker's disk?
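
For context, a simplified sketch (not the actual BroadcastHashJoin source) of
the pattern the question refers to: the build side is broadcast as a hash
table and mapPartitions probes it on each partition of the streamed side,
which by itself is a narrow transformation:

// Simplified sketch of the broadcast-hash-join pattern (illustrative only,
// not the real BroadcastHashJoin implementation): broadcast the build side
// as a hash map and probe it inside mapPartitions on the streamed side.
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

def broadcastHashJoin[K, V, W](
    streamed: RDD[(K, V)],
    buildSide: Broadcast[Map[K, W]]): RDD[(K, (V, W))] = {
  streamed.mapPartitions { iter =>
    val hashTable = buildSide.value           // local copy on every executor
    iter.flatMap { case (k, v) =>
      hashTable.get(k).map(w => (k, (v, w)))  // probe the hash table; emit matches
    }
  }
}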



Task Execution

2015-09-30 Thread gsvic
Concerning task execution, does a worker execute its assigned tasks in
parallel or sequentially?
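
For reference, per-executor concurrency is bounded by the executor's core
count divided by spark.task.cpus; an illustrative (not prescriptive)
configuration sketch:

// Illustrative settings only: each executor runs up to
// (executor cores / spark.task.cpus) tasks at the same time,
// so tasks on one worker can run in parallel up to that slot count.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.cores", "4")  // 4 cores per executor
  .set("spark.task.cpus", "1")       // 1 core per task => 4 concurrent tasks per executor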






Re: RDD: Execution and Scheduling

2015-09-20 Thread gsvic
Concerning answers 1 and 2:

1) How does Spark determine that a node is a "slow node", and how slow is that?

2) How does an RDD choose a location as its preferred location, and with which
criteria?

Could you please also include links to the source files for the two questions
above?
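
For context on (1), slow tasks are typically flagged by speculative execution
in the TaskSetManager; a sketch of the relevant settings (the values shown are
believed to be the defaults):

// Sketch of the speculation settings that drive slow-task detection
// (values shown are believed to be the defaults; adjust as needed).
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")            // speculation is off by default
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish first
  .set("spark.speculation.multiplier", "1.5")  // a task slower than 1.5x the median is "slow"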






RDD: Execution and Scheduling

2015-09-17 Thread gsvic
After reading some parts of the Spark source code, I would like to ask some
questions about RDD execution and scheduling.

At first, please correct me if I am wrong about the following:
1) The number of partitions equals the number of tasks that will be executed
in parallel (e.g., when an RDD is repartitioned into 30 partitions, a count
aggregate will be executed as 30 tasks distributed across the cluster; see the
sketch after this list).

2) A task concerns only one partition (partitionId: Int), and this partition
maps to an RDD block.

3) If an RDD is cached, then the preferred location for the execution of a
partition (and the corresponding RDD block) will be the node the data is
cached on.
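
A tiny illustration of point (1), using a hypothetical Parquet path:

// Illustration of (1): after repartitioning to 30 partitions, the stage that
// computes the partial counts runs as 30 tasks. The path below is hypothetical.
val df = sqlContext.read.parquet("hdfs://namenode:9000/tpch/lineitem")
val repartitioned = df.repartition(30)
println(repartitioned.rdd.partitions.length)  // 30
repartitioned.count()                         // that stage is executed as 30 tasks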

The questions are the following:

I run some SQL aggregate functions on a TPCH dataset. The cluster consists of
7 executors (and one driver), each with 8 GB RAM and 4 VCPUs. The dataset is
in Parquet format in an external Hadoop cluster, that is, Spark workers and
Hadoop DataNodes run on different VMs.

1) For a count aggregate, I repartitioned the DataFrame into 24 partitions
and each executor took 2 partitions (tasks) for execution. Does it always
happen the same way (is the number of tasks per node equal to
#Partitions/#Workers)?

2) How does Spark choose the executor for each task if the data is not cached?
It's clear what happens if the data is cached in DAGScheduler.scala, but what
if it is not? Is it possible to determine that before execution?

3) In the case of an SQL join operation, is it possible to determine how many
tasks/partitions will be generated and on which worker each task will be
submitted?
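
Regarding (3), for shuffle-based (non-broadcast) joins the number of
post-shuffle tasks is governed by spark.sql.shuffle.partitions; a minimal
sketch, where dfA and dfB stand for any two DataFrames sharing a "key" column:

// Sketch for (3): the reduce side of a shuffle-based SQL join runs
// spark.sql.shuffle.partitions tasks (200 by default at the time).
// dfA and dfB are placeholders for two DataFrames with a common "key" column.
sqlContext.setConf("spark.sql.shuffle.partitions", "200")
val joined = dfA.join(dfB, dfA("key") === dfB("key"))
joined.rdd.partitions.length  // typically 200 for a shuffle-based join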






Re: SQLContext.read.json(path) throws java.io.IOException

2015-08-26 Thread gsvic
Yes, it contains one line.

On Wed, Aug 26, 2015 at 8:20 PM, Yin Huai wrote:

 The JSON support in Spark SQL handles a file with one JSON object per line
 or one JSON array of objects per line. What is the format of your file? Does
 it only contain a single line?

 On Wed, Aug 26, 2015 at 6:47 AM, gsvic [hidden email]
 http:///user/SendEmail.jtp?type=nodenode=13852i=0 wrote:

 Hi,

 I have the following issue. I am trying to load a 2.5G JSON file from a
 10-node Hadoop Cluster. Actually, I am trying to create a DataFrame, using
 sqlContext.read.json(hdfs://master:9000/path/file.json).

 The JSON file contains a parsed table(relation) from the TPCH benchmark.

 After finishing some tasks, the job fails by throwing several
 java.io.IOExceptions. For smaller files (eg 700M it works fine). I am
 posting a part of the log and the whole stack trace below:

 15/08/26 16:31:44 INFO TaskSetManager: Starting task 10.1 in stage 1.0
 (TID
 47, 192.168.5.146, ANY, 1416 bytes)
 15/08/26 16:31:44 INFO TaskSetManager: Starting task 11.1 in stage 1.0
 (TID
 48, 192.168.5.150, ANY, 1416 bytes)
 15/08/26 16:31:44 INFO TaskSetManager: Starting task 4.1 in stage 1.0 (TID
 49, 192.168.5.149, ANY, 1416 bytes)
 15/08/26 16:31:44 INFO TaskSetManager: Starting task 8.1 in stage 1.0 (TID
 50, 192.168.5.246, ANY, 1416 bytes)
 15/08/26 16:31:53 INFO TaskSetManager: Finished task 10.0 in stage 1.0
 (TID
 17) in 104681 ms on 192.168.5.243 (27/35)
 15/08/26 16:31:53 INFO TaskSetManager: Finished task 8.0 in stage 1.0 (TID
 15) in 105541 ms on 192.168.5.193 (28/35)
 15/08/26 16:31:55 INFO TaskSetManager: Finished task 11.0 in stage 1.0
 (TID
 18) in 107122 ms on 192.168.5.167 (29/35)
 15/08/26 16:31:57 INFO TaskSetManager: Finished task 5.0 in stage 1.0 (TID
 12) in 109583 ms on 192.168.5.245 (30/35)
 15/08/26 16:32:08 INFO TaskSetManager: Finished task 4.1 in stage 1.0 (TID
 49) in 24135 ms on 192.168.5.149 (31/35)
 15/08/26 16:32:13 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 9,
 192.168.5.246): java.io.IOException: Too many bytes before newline:
 2147483648
 at
 org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:249)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
 at

 org.apache.hadoop.mapred.LineRecordReader.init(LineRecordReader.java:134)
 at

 org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
 at
 org.apache.spark.rdd.HadoopRDD$$anon$1.init(HadoopRDD.scala:239)
 at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:216)
 at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)



 --
 View this message in context:
 http://apache-spark-developers-list.1001551.n3.nabble.com/SQLContext-read-json-path-throws-java-io-IOException-tp13841.html
 Sent from the Apache Spark Developers List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: [hidden email]
 http:///user/SendEmail.jtp?type=nodenode=13852i=1
 For additional commands, e-mail: [hidden email]
 http:///user/SendEmail.jtp?type=nodenode=13852i=2




 --
 If you reply to this email, your message will be added to the discussion
 below:

 http://apache-spark-developers-list.1001551.n3.nabble.com/SQLContext

SQLContext.read.json(path) throws java.io.IOException

2015-08-26 Thread gsvic
Hi,

I have the following issue. I am trying to load a 2.5 GB JSON file from a
10-node Hadoop cluster. Actually, I am trying to create a DataFrame using
sqlContext.read.json("hdfs://master:9000/path/file.json").

The JSON file contains a parsed table (relation) from the TPCH benchmark.

After finishing some tasks, the job fails by throwing several
java.io.IOExceptions. For smaller files (e.g. 700 MB) it works fine. I am
posting a part of the log and the whole stack trace below:

15/08/26 16:31:44 INFO TaskSetManager: Starting task 10.1 in stage 1.0 (TID
47, 192.168.5.146, ANY, 1416 bytes)
15/08/26 16:31:44 INFO TaskSetManager: Starting task 11.1 in stage 1.0 (TID
48, 192.168.5.150, ANY, 1416 bytes)
15/08/26 16:31:44 INFO TaskSetManager: Starting task 4.1 in stage 1.0 (TID
49, 192.168.5.149, ANY, 1416 bytes)
15/08/26 16:31:44 INFO TaskSetManager: Starting task 8.1 in stage 1.0 (TID
50, 192.168.5.246, ANY, 1416 bytes)
15/08/26 16:31:53 INFO TaskSetManager: Finished task 10.0 in stage 1.0 (TID
17) in 104681 ms on 192.168.5.243 (27/35)
15/08/26 16:31:53 INFO TaskSetManager: Finished task 8.0 in stage 1.0 (TID
15) in 105541 ms on 192.168.5.193 (28/35)
15/08/26 16:31:55 INFO TaskSetManager: Finished task 11.0 in stage 1.0 (TID
18) in 107122 ms on 192.168.5.167 (29/35)
15/08/26 16:31:57 INFO TaskSetManager: Finished task 5.0 in stage 1.0 (TID
12) in 109583 ms on 192.168.5.245 (30/35)
15/08/26 16:32:08 INFO TaskSetManager: Finished task 4.1 in stage 1.0 (TID
49) in 24135 ms on 192.168.5.149 (31/35)
15/08/26 16:32:13 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 9,
192.168.5.246): java.io.IOException: Too many bytes before newline:
2147483648
at 
org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:249)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at
org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:134)
at
org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:239)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:216)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)






Re: SQLContext.read.json(path) throws java.io.IOException

2015-08-26 Thread gsvic
No, I created the file by appending each JSON record in a loop without
starting a new line for each record. I've just changed that and now it works
fine. Thank you very much for your support.
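
For anyone hitting the same issue, a minimal sketch of writing the file in
the format the JSON reader expects (one complete JSON object per line), using
made-up TPCH-like records:

// Sketch of the fix described above: emit one complete JSON object per line
// ("JSON Lines"), so the input can be split on newlines. Records are made up.
import java.io.PrintWriter

val records = Seq(
  """{"orderkey": 1, "custkey": 370, "totalprice": 172799.49}""",
  """{"orderkey": 2, "custkey": 781, "totalprice": 38426.09}"""
)

val out = new PrintWriter("orders.json")
records.foreach(r => out.println(r))  // println ends each record with a newline
out.close()

// Afterwards: sqlContext.read.json(...) on wherever the file is uploaded.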


