While it works with sort-merge-join, it takes about 12h to finish (with
10000 shuffle partitions). My hunch is that the reason for that is this:

INFO ExternalSorter: Thread 3733 spilling in-memory map of 174.9 MB to disk
(62 times so far)

(and lots more where this comes from).

On Sat, Aug 29, 2015 at 7:17 PM, Reynold Xin <r...@databricks.com> wrote:

> Can you try 1.5? This should work much, much better in 1.5 out of the box.
>
> For 1.4, I think you'd want to turn on sort-merge-join, which is off by
> default. However, the sort-merge join in 1.4 can still trigger a lot of
> garbage, making it slower. SMJ performance is probably 5x - 1000x better in
> 1.5 for your case.
>
>
> On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak <tom...@gmail.com> wrote:
>
>> I'm getting errors like "Removing executor with no recent heartbeats" &
>> "Missing an output location for shuffle" errors for a large SparkSql join
>> (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
>> configure the job to avoid them.
>>
>> The initial stage completes fine with some 30k tasks on a cluster with 70
>> machines/10TB memory, generating about 6.5TB of shuffle writes, but then
>> the shuffle stage first waits 30min in the scheduling phase according to
>> the UI, and then dies with the mentioned errors.
>>
>> I can see in the GC logs that the executors reach their memory limits
>> (32g per executor, 2 workers per machine) and can't allocate any more stuff
>> in the heap. Fwiw, the top 10 in the memory use histogram are:
>>
>> num     #instances         #bytes  class name
>> ----------------------------------------------
>>    1:     249139595    11958700560
>>  scala.collection.immutable.HashMap$HashMap1
>>    2:     251085327     8034730464  scala.Tuple2
>>    3:     243694737     5848673688  java.lang.Float
>>    4:     231198778     5548770672  java.lang.Integer
>>    5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
>>    6:      72191582     2310130624
>>  scala.collection.immutable.HashMap$HashTrieMap
>>    7:      74114058     1778737392  java.lang.Long
>>    8:       6059103      779203840  [Ljava.lang.Object;
>>    9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
>>   10:         34749       70122104  [B
>>
>> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
>>
>> spark.core.connection.ack.wait.timeout 600
>> spark.executor.heartbeatInterval       60s
>> spark.executor.memory                  32g
>> spark.mesos.coarse                     false
>> spark.network.timeout                  600s
>> spark.shuffle.blockTransferService     netty
>> spark.shuffle.consolidateFiles         true
>> spark.shuffle.file.buffer              1m
>> spark.shuffle.io.maxRetries            6
>> spark.shuffle.manager                  sort
>>
>> The join is currently configured with spark.sql.shuffle.partitions=1000
>> but that doesn't seem to help. Would increasing the partitions help ? Is
>> there a formula to determine an approximate partitions number value for a
>> join ?
>> Any help with this job would be appreciated !
>>
>> cheers,
>> Tom
>>
>
>

Reply via email to