IPv6 regression in Spark 1.5.1

2015-10-14 Thread Thomas Dudziak
It looks like Spark 1.5.1 does not work with IPv6. When I add
-Djava.net.preferIPv6Addresses=true on my dual-stack server, the
driver fails with:

15/10/14 14:36:01 ERROR SparkContext: Error initializing SparkContext.
java.lang.AssertionError: assertion failed: Expected hostname
at scala.Predef$.assert(Predef.scala:179)
at org.apache.spark.util.Utils$.checkHost(Utils.scala:805)
at org.apache.spark.storage.BlockManagerId.<init>(BlockManagerId.scala:48)
at org.apache.spark.storage.BlockManagerId$.apply(BlockManagerId.scala:107)
at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:190)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:528)

Looking at the checkHost method, it clearly does not work for IPv6, as it
assumes ':' is never part of a hostname. I think this code should use
Guava's HostAndPort or related classes to properly handle both IPv4 and IPv6
(other parts of Utils already use Guava).


Re: IPv6 regression in Spark 1.5.1

2015-10-14 Thread Thomas Dudziak
Specifically, something like this should probably do the trick:

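  import com.google.common.net.HostAndPort

  def checkHost(host: String, message: String = ""): Unit = {
    // A bare IPv6 literal such as fe80::1 parses as a host without a port
    assert(!HostAndPort.fromString(host).hasPort, message)
  }

  def checkHostPort(hostPort: String, message: String = ""): Unit = {
    assert(HostAndPort.fromString(hostPort).hasPort, message)
  }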
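For readers who want to see why HostAndPort copes with IPv6 where a plain "contains ':'" test does not, here is a stdlib-only sketch of the parsing rules it applies: a leading '[' means a bracketed IPv6 literal optionally followed by ":port", exactly one colon means host:port, and several unbracketed colons mean a bare IPv6 host with no port. `HostPortSketch` is a hypothetical helper for illustration, not Spark or Guava code.

```scala
// Hypothetical helper mirroring the parsing rules of Guava's HostAndPort.fromString.
object HostPortSketch {
  /** Splits "host", "host:port", a bare IPv6 literal, or "[ipv6]:port" into (host, optional port). */
  def split(s: String): (String, Option[Int]) =
    if (s.startsWith("[")) {
      // Bracketed IPv6 literal, optionally followed by ":port".
      val close = s.indexOf(']')
      require(close > 0, s"missing ']' in $s")
      val host = s.substring(1, close)
      val rest = s.substring(close + 1)
      if (rest.isEmpty) (host, None)
      else {
        require(rest.startsWith(":") && rest.length > 1, s"bad port in $s")
        (host, Some(rest.substring(1).toInt))
      }
    } else {
      val first = s.indexOf(':')
      if (first >= 0 && s.indexOf(':', first + 1) == -1)
        (s.substring(0, first), Some(s.substring(first + 1).toInt)) // exactly one colon: host:port
      else
        (s, None) // no colon, or several colons: hostname/IPv4 or a bare IPv6 literal
    }

  def hasPort(s: String): Boolean = split(s)._2.isDefined
}
```

With these rules "fe80::1" is a host without a port, while "[::1]:7077" is host "::1" with port 7077, which is exactly the distinction checkHost and checkHostPort need.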

On Wed, Oct 14, 2015 at 2:40 PM, Thomas Dudziak <tom...@gmail.com> wrote:

Yahoo's Caffe-on-Spark project

2015-09-29 Thread Thomas Dudziak

I would be curious to learn what the Spark developers' plans are in this
area (neural networks, GPUs) and what they think of integration with existing
neural network frameworks like Caffe or Torch.


Accumulator with non-java-serializable value ?

2015-09-09 Thread Thomas Dudziak
I want to use t-digest with foreachPartition and accumulators (essentially,
create a t-digest per partition and add it to the accumulator, leveraging
the fact that t-digests can be merged with each other). I can easily make
t-digests Kryo-serializable, but making them Java-serializable is not easy.
Now, when running it (1.4.1), I get this error:

Exception in thread "main" org.apache.spark.SparkException: Task not
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:877)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:876)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:876)
at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1255)

which makes sense, because the suggested way to use accumulators is via
closures. But since in my case I can't easily make the value type
Java-serializable, that won't work. Is there another way to pass the
accumulator to the tasks that doesn't involve closures and hence Java
serialization?
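One general JVM workaround (not something confirmed in this thread) is to make the accumulator's value type a Java-serializable wrapper whose private writeObject/readObject hooks write the wrapped object in some other encoding, so Java serialization only ever sees a byte array. In the sketch below, `Digest` is a hypothetical, deliberately non-serializable stand-in for a t-digest and `DigestCodec` is a hand-written encoding; in a real job those two methods could delegate to Kryo instead.

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Hypothetical stand-in for a t-digest: mergeable, but NOT java.io.Serializable.
final class Digest(val counts: Array[Long]) {
  // Element-wise sum; assumes both digests use the same number of buckets.
  def merge(other: Digest): Digest =
    new Digest(counts.zip(other.counts).map { case (a, b) => a + b })
}

// Hypothetical codec; a real job might call Kryo here instead.
object DigestCodec {
  def encode(d: Digest): Array[Byte] = {
    val buf = ByteBuffer.allocate(4 + 8 * d.counts.length)
    buf.putInt(d.counts.length)
    d.counts.foreach(c => buf.putLong(c))
    buf.array()
  }

  def decode(bytes: Array[Byte]): Digest = {
    val buf = ByteBuffer.wrap(bytes)
    val n = buf.getInt()
    new Digest(Array.fill(n)(buf.getLong()))
  }
}

// Java-serializable wrapper: the transient field is written through the codec.
final class SerializableDigest(@transient var value: Digest) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    val bytes = DigestCodec.encode(value)
    out.writeInt(bytes.length)
    out.write(bytes)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    val bytes = new Array[Byte](in.readInt())
    in.readFully(bytes)
    value = DigestCodec.decode(bytes)
  }
}
```

An AccumulatorParam over SerializableDigest would then merge the wrapped `value`s in `addInPlace`; the closure that captures the accumulator stays Java-serializable because only the wrapper is visible to Java serialization.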

Re: How to avoid shuffle errors for a large join ?

2015-09-01 Thread Thomas Dudziak
While it works with sort-merge-join, it takes about 12h to finish (with
1 shuffle partitions). My hunch is that the reason for that is this:

INFO ExternalSorter: Thread 3733 spilling in-memory map of 174.9 MB to disk
(62 times so far)

(and lots more where this comes from).

On Sat, Aug 29, 2015 at 7:17 PM, Reynold Xin <r...@databricks.com> wrote:

> Can you try 1.5? This should work much, much better in 1.5 out of the box.
> For 1.4, I think you'd want to turn on sort-merge-join, which is off by
> default. However, the sort-merge join in 1.4 can still trigger a lot of
> garbage, making it slower. SMJ performance is probably 5x - 1000x better in
> 1.5 for your case.
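On 1.4, the suggestion in the reply above amounts to switching sort-merge join on explicitly. A minimal sketch, assuming an existing SQLContext or HiveContext named `sqlContext` and that the 1.4 key is `spark.sql.planner.sortMergeJoin` (worth double-checking against the 1.4 SQL configuration docs):

```scala
// Spark 1.4: enable sort-merge join for this SQLContext (off by default in 1.4, per the reply above).
// Assumes `sqlContext` already exists; the same setting can be issued in SQL via SET.
sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")
```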

Re: How to avoid shuffle errors for a large join ?

2015-08-28 Thread Thomas Dudziak
I'm curious where the factor of 6-8 comes from ? Is this assuming snappy
(or lzf) compression ? The sizes I mentioned are what the Spark UI reports,
not sure if those are before or after compression (for the shuffle

Re: How to avoid shuffle errors for a large join ?

2015-08-28 Thread Thomas Dudziak
Yeah, I tried with 10k and 30k partitions and these still failed; I'll try
with more then. That is a little disappointing, though: it only writes ~7TB of
shuffle data, which in theory shouldn't require more than 1000 reducers on my
cluster with 10TB of memory (~7GB of spill per reducer).
I'm now wondering if my shuffle partitions are uneven and I should use a
custom partitioner. Is there a way to get stats on the partition sizes from
Spark?
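Inside Spark, record counts per partition can be collected with something like `rdd.mapPartitionsWithIndex((i, it) => Iterator((i, it.size))).collect()`. To estimate skew offline from a sample of join keys, a stdlib-only sketch can reproduce the mapping Spark's HashPartitioner uses (null keys to partition 0, otherwise a non-negative `hashCode` mod the partition count) and report how much larger the biggest partition is than the mean. `PartitionSkewSketch` is a hypothetical helper.

```scala
object PartitionSkewSketch {
  // Same key-to-partition mapping as Spark's HashPartitioner:
  // null keys go to partition 0, others to a non-negative hashCode mod n.
  def partitionOf(key: Any, numPartitions: Int): Int =
    if (key == null) 0
    else {
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
    }

  /** Number of records that would land in each partition. */
  def partitionSizes(keys: Iterator[Any], numPartitions: Int): Array[Long] = {
    val sizes = new Array[Long](numPartitions)
    keys.foreach(k => sizes(partitionOf(k, numPartitions)) += 1)
    sizes
  }

  /** Largest partition divided by the mean partition size; 1.0 means perfectly even. */
  def skew(sizes: Array[Long]): Double = {
    val mean = sizes.sum.toDouble / sizes.length
    if (mean == 0.0) 1.0 else sizes.max / mean
  }
}
```

A skew ratio well above 1 means a few reducers receive far more than the average share, in which case adding partitions helps less than a custom partitioner or splitting hot keys.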

How to avoid shuffle errors for a large join ?

2015-08-27 Thread Thomas Dudziak
I'm getting errors like "Removing executor with no recent heartbeats" and
"Missing an output location for shuffle" for a large Spark SQL join
(1bn rows/2.5TB joined with 1bn rows/30GB), and I'm not sure how to
configure the job to avoid them.

The initial stage completes fine with some 30k tasks on a cluster with 70
machines/10TB memory, generating about 6.5TB of shuffle writes, but then
the shuffle stage first waits 30min in the scheduling phase according to
the UI, and then dies with the mentioned errors.

I can see in the GC logs that the executors reach their memory limits (32g
per executor, 2 workers per machine) and can't allocate any more objects on
the heap. FWIW, the top 10 entries in the memory use histogram are:

 num     #instances         #bytes  class name
   1:     249139595    11958700560
   2:     251085327     8034730464  scala.Tuple2
   3:     243694737     5848673688  java.lang.Float
   4:     231198778     5548770672  java.lang.Integer
   5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
   6:      72191582     2310130624
   7:      74114058     1778737392  java.lang.Long
   8:       6059103      779203840  [Ljava.lang.Object;
   9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
  10:         34749       70122104  [B

Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

spark.core.connection.ack.wait.timeout 600
spark.executor.heartbeatInterval   60s
spark.executor.memory  32g
spark.mesos.coarse false
spark.network.timeout  600s
spark.shuffle.blockTransferService netty
spark.shuffle.consolidateFiles true
spark.shuffle.file.buffer  1m
spark.shuffle.manager  sort

The join is currently configured with spark.sql.shuffle.partitions=1000, but
that doesn't seem to help. Would increasing the number of partitions help? Is
there a formula to estimate a suitable number of partitions for a join?
Any help with this job would be appreciated!
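Spark itself does not provide such a formula, but a common back-of-the-envelope rule is to divide the expected shuffle size by a per-partition target that fits comfortably in an executor's memory. With the numbers above, 6.5TB of shuffle writes over 1000 partitions is about 6.7GB per reducer (6.5 x 1024GB / 1000), before accounting for the in-memory representation usually being larger than the serialized shuffle data. The 256MB target below is an assumption for illustration, not a Spark default, and the rule presumes evenly sized partitions.

```scala
object PartitionSizing {
  /** Hypothetical rule of thumb: the smallest partition count keeping each partition at or below the target size. */
  def suggestedPartitions(shuffleBytes: Long, targetBytesPerPartition: Long): Int = {
    require(shuffleBytes >= 0 && targetBytesPerPartition > 0)
    // Ceiling division, with at least one partition.
    val n = (shuffleBytes + targetBytesPerPartition - 1) / targetBytesPerPartition
    math.max(1L, n).toInt
  }
}
```

For example, 6.5TB (6656L << 30 bytes) with a 256MB target (256L << 20 bytes) gives 26624 partitions.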


Re: Efficient sampling from a Hive table

2015-08-26 Thread Thomas Dudziak
Using TABLESAMPLE(0.1) is actually way worse. Spark first spends 12 minutes
discovering all split files on all hosts (for some reason) before it even
starts the job, and then it creates 3.5 million tasks (the partition has
~32k split files).

On Wed, Aug 26, 2015 at 9:36 AM, Jörn Franke jornfra...@gmail.com wrote:

 Have you tried tablesample? You can find the exact syntax in the
 documentation, but it does exactly what you want

Efficient sampling from a Hive table

2015-08-26 Thread Thomas Dudziak
I have a sizeable table (2.5TB, 1b rows) that I want to get ~100m rows from,
and I don't particularly care which rows. Doing a LIMIT unfortunately
results in two stages where the first stage reads the whole table, and the
second then performs the limit with a single worker, which is not very
Is there a better way to sample a subset of rows in Spark, ideally in a
single stage and without reading all partitions?


Re: Efficient sampling from a Hive table

2015-08-26 Thread Thomas Dudziak
Sorry, I meant without reading from all splits. This is a single partition
in the table.
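Since the goal is ~100m of 1b rows (about 10%) and the partition has ~32k split files, one alternative to row-level sampling is to sample at the file level: pick roughly 10% of the files (about 3.2k) and read only those. This is cluster sampling rather than a uniform row sample, and it assumes rows are spread roughly evenly across files. The sketch only covers choosing the files; the file paths and the `SplitSampling` helper are hypothetical.

```scala
import scala.util.Random

object SplitSampling {
  /** Picks about `fraction` of the split files at random, reproducibly for a given seed. */
  def chooseSplits(paths: Seq[String], fraction: Double, seed: Long): Seq[String] = {
    require(fraction > 0.0 && fraction <= 1.0, "fraction must be in (0, 1]")
    val k = math.max(1, math.ceil(paths.size * fraction).toInt)
    new Random(seed).shuffle(paths).take(k).sorted
  }
}
```

Fixing the seed makes the same subset come back on every run, which helps when the sample feeds repeated experiments.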

Re: Wish for 1.4: upper bound on # tasks in Mesos

2015-05-19 Thread Thomas Dudziak
I'm using fine-grained mode in a multi-tenant environment, which is why I
would welcome a limit on the number of tasks per job :)


On Tue, May 19, 2015 at 10:05 AM, Matei Zaharia matei.zaha...@gmail.com

 Hey Tom,

 Are you using the fine-grained or coarse-grained scheduler? For the
 coarse-grained scheduler, there is a spark.cores.max config setting that
 will limit the total # of cores it grabs. This was there in earlier
 versions too.


  On May 19, 2015, at 12:39 PM, Thomas Dudziak tom...@gmail.com wrote:
  I read the other day that there will be a fair number of improvements in
 1.4 for Mesos. Could I ask for one more (if it isn't already in there): a
 configurable limit for the number of tasks for jobs run on Mesos ? This
 would be a very simple yet effective way to prevent a job dominating the

Wish for 1.4: upper bound on # tasks in Mesos

2015-05-19 Thread Thomas Dudziak
I read the other day that there will be a fair number of improvements in
1.4 for Mesos. Could I ask for one more (if it isn't already in there): a
configurable limit for the number of tasks for jobs run on Mesos ? This
would be a very simple yet effective way to prevent a job dominating the


Exception when using CLUSTER BY or ORDER BY

2015-05-19 Thread Thomas Dudziak
Under certain circumstances that I haven't yet been able to isolate, I get
the following error when running an HQL query using HiveContext (Spark 1.3.1
on Mesos, fine-grained mode). Is this a known problem or should I file a
JIRA for it?

org.apache.spark.SparkException: Can only zip RDDs with same number of
elements in each partition
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.lang.Thread.run(Thread.java:745)
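The message comes from the iterator behind RDD.zip, which checks on every `hasNext` that both inputs still agree, so it only surfaces while iterating and only when some partition has a different number of rows on each side. The stdlib-only sketch below is modeled on that check (it is not Spark's code, and it throws IllegalStateException rather than SparkException). One possible cause, offered as an assumption rather than a diagnosis, is an input that does not produce exactly the same rows when it is recomputed.

```scala
object ZipSketch {
  /** Zips two partitions element by element, failing as soon as their lengths diverge. */
  def zipStrict[A, B](left: Iterator[A], right: Iterator[B]): Iterator[(A, B)] =
    new Iterator[(A, B)] {
      def hasNext: Boolean = (left.hasNext, right.hasNext) match {
        case (true, true)   => true
        case (false, false) => false
        case _ => throw new IllegalStateException(
          "Can only zip RDDs with same number of elements in each partition")
      }
      def next(): (A, B) = (left.next(), right.next())
    }
}
```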

Re: Spark's Guava pieces cause exceptions in non-trivial deployments

2015-05-15 Thread Thomas Dudziak
This is still a problem in 1.3. Optional is used in several shaded classes
within Guava (e.g. the Immutable* classes) and itself uses shaded classes
(e.g. AbstractIterator), which causes problems in application code. The only
reliable workaround we've found is to shade Guava ourselves in our application
code, which avoids the problem altogether.
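For sbt builds, shading Guava in the application's fat jar can be expressed as an sbt-assembly shade rule. A minimal fragment, assuming sbt 1.x with the sbt-assembly plugin (0.14 or later) enabled; `myapp.shaded.guava` is a hypothetical package prefix. Maven builds would use the maven-shade-plugin's relocations for the same effect.

```scala
// build.sbt: rename Guava's packages inside the application jar so the
// application's Guava cannot clash with the Guava classes Spark exposes.
assembly / assemblyShadeRules := Seq(
  ShadeRule.rename("com.google.common.**" -> "myapp.shaded.guava.@1").inAll
)
```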


Re: Spark's Guava pieces cause exceptions in non-trivial deployments

2015-05-15 Thread Thomas Dudziak
Actually the extraClassPath settings put the extra jars at the end of the
classpath so they won't help. Only the deprecated SPARK_CLASSPATH puts them
at the front.


