Only SQL and DataFrame for now. We are thinking about how to apply that to a more general distributed collection-based API, but it's not in 1.5.
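To make the distinction concrete, here is a minimal Scala sketch (hypothetical schema, RDD names and key column, not taken from the thread) of moving an RDD-based join onto the DataFrame API, which is where the 1.5 execution improvements apply:

    import org.apache.spark.sql.SQLContext

    // Hypothetical row types standing in for the real schema.
    case class LeftRow(key: Long, value: Float)
    case class RightRow(key: Long, attr: Int)

    val sqlContext = new SQLContext(sc)   // sc: existing SparkContext
    import sqlContext.implicits._

    // leftRdd: RDD[LeftRow] and rightRdd: RDD[RightRow] are assumed to exist.
    val leftDF  = leftRdd.toDF()
    val rightDF = rightRdd.toDF()

    // An equi-join expressed on DataFrames is planned by the SQL engine
    // (sort-merge join in 1.5) instead of going through RDD cogroup.
    val joined = leftDF.join(rightDF, "key")
    joined.write.parquet("hdfs:///tmp/joined")   // placeholder output path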
On Sat, Sep 5, 2015 at 11:56 AM, Gurvinder Singh <gurvinder.si...@uninett.no> wrote:

> On 09/05/2015 11:22 AM, Reynold Xin wrote:
> > Try increasing the shuffle memory fraction (by default it is only 16%).
> > Again, if you run Spark 1.5, this will probably run a lot faster,
> > especially if you increase the shuffle memory fraction ...
>
> Hi Reynold,
>
> Does 1.5 have better join/cogroup performance for the RDD case too, or
> only for SQL?
>
> - Gurvinder
>
> > On Tue, Sep 1, 2015 at 8:13 AM, Thomas Dudziak <tom...@gmail.com> wrote:
> >
> > While it works with sort-merge join, it takes about 12h to finish
> > (with 10000 shuffle partitions). My hunch is that the reason for that
> > is this:
> >
> > INFO ExternalSorter: Thread 3733 spilling in-memory map of 174.9 MB
> > to disk (62 times so far)
> >
> > (and lots more where this comes from).
> >
> > On Sat, Aug 29, 2015 at 7:17 PM, Reynold Xin <r...@databricks.com> wrote:
> >
> > Can you try 1.5? This should work much, much better in 1.5 out of
> > the box.
> >
> > For 1.4, I think you'd want to turn on sort-merge join, which is off
> > by default. However, the sort-merge join in 1.4 can still trigger a
> > lot of garbage, making it slower. SMJ performance is probably 5x -
> > 1000x better in 1.5 for your case.
> >
> > On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak <tom...@gmail.com> wrote:
> >
> > I'm getting errors like "Removing executor with no recent heartbeats"
> > and "Missing an output location for shuffle" for a large Spark SQL
> > join (1bn rows/2.5TB joined with 1bn rows/30GB), and I'm not sure how
> > to configure the job to avoid them.
> >
> > The initial stage completes fine with some 30k tasks on a cluster
> > with 70 machines/10TB memory, generating about 6.5TB of shuffle
> > writes, but then the shuffle stage first waits 30min in the
> > scheduling phase according to the UI, and then dies with the
> > mentioned errors.
> >
> > I can see in the GC logs that the executors reach their memory limits
> > (32g per executor, 2 workers per machine) and can't allocate any more
> > stuff in the heap. Fwiw, the top 10 in the memory use histogram are:
> >
> >  num     #instances         #bytes  class name
> > ----------------------------------------------
> >    1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
> >    2:     251085327     8034730464  scala.Tuple2
> >    3:     243694737     5848673688  java.lang.Float
> >    4:     231198778     5548770672  java.lang.Integer
> >    5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
> >    6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
> >    7:      74114058     1778737392  java.lang.Long
> >    8:       6059103      779203840  [Ljava.lang.Object;
> >    9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
> >   10:         34749       70122104  [B
> >
> > Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
> >
> > spark.core.connection.ack.wait.timeout 600
> > spark.executor.heartbeatInterval 60s
> > spark.executor.memory 32g
> > spark.mesos.coarse false
> > spark.network.timeout 600s
> > spark.shuffle.blockTransferService netty
> > spark.shuffle.consolidateFiles true
> > spark.shuffle.file.buffer 1m
> > spark.shuffle.io.maxRetries 6
> > spark.shuffle.manager sort
> >
> > The join is currently configured with spark.sql.shuffle.partitions=1000,
> > but that doesn't seem to help. Would increasing the partitions help? Is
> > there a formula to determine an approximate partition count for a join?
> >
> > Any help with this job would be appreciated!
> >
> > cheers,
> > Tom
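As a rough illustration of the 1.4-era knobs discussed above, a Scala sketch follows; the specific values are assumptions for illustration, not recommendations from the thread:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Give the shuffle more of the heap than the ~16% effective default
    // (spark.shuffle.memoryFraction 0.2 * safety fraction 0.8), e.g. at the
    // expense of the storage fraction if the job caches little.
    val conf = new SparkConf()
      .set("spark.shuffle.memoryFraction", "0.4")
      .set("spark.storage.memoryFraction", "0.3")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Spark 1.4: sort-merge join is off by default and has to be enabled.
    sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")

    // Rule of thumb (not from the thread): aim for post-shuffle partitions
    // of roughly 100-200 MB. With ~6.5 TB of shuffle writes, 6.5e12 / 128e6
    // is on the order of 50,000 partitions rather than 1,000.
    sqlContext.setConf("spark.sql.shuffle.partitions", "50000")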