IPv6 regression in Spark 1.5.1

2015-10-14 Thread Thomas Dudziak
It looks like Spark 1.5.1 does not work with IPv6. When
adding -Djava.net.preferIPv6Addresses=true on my dual stack server, the
driver fails with:

15/10/14 14:36:01 ERROR SparkContext: Error initializing SparkContext.
java.lang.AssertionError: assertion failed: Expected hostname
at scala.Predef$.assert(Predef.scala:179)
at org.apache.spark.util.Utils$.checkHost(Utils.scala:805)
at org.apache.spark.storage.BlockManagerId.<init>(BlockManagerId.scala:48)
at org.apache.spark.storage.BlockManagerId$.apply(BlockManagerId.scala:107)
at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:190)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:528)
at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:1017)

Looking at the checkHost method, it clearly does not work for IPv6, as it
assumes that ':' cannot appear in a valid hostname. I think this code should
use Guava's HostAndPort or related classes to deal properly with both IPv4
and IPv6 (other parts of Utils already use Guava).

cheers,
Tom


Re: IPv6 regression in Spark 1.5.1

2015-10-14 Thread Thomas Dudziak
Specifically, something like this should probably do the trick:

  import com.google.common.net.HostAndPort

  def checkHost(host: String, message: String = "") {
    assert(!HostAndPort.fromString(host).hasPort, message)
  }

  def checkHostPort(hostPort: String, message: String = "") {
    assert(HostAndPort.fromString(hostPort).hasPort, message)
  }
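
For illustration, this is roughly how Guava parses the relevant cases (my
reading of HostAndPort's documented behavior; worth double-checking against
the exact strings Spark passes in):

  HostAndPort.fromString("example.com:7077").hasPort     // true
  HostAndPort.fromString("192.168.0.1").hasPort          // false
  HostAndPort.fromString("2001:db8::1").hasPort          // false -- bare IPv6 literal, no port
  HostAndPort.fromString("[2001:db8::1]:7077").hasPort   // true  -- bracketed IPv6 literal with port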


On Wed, Oct 14, 2015 at 2:40 PM, Thomas Dudziak <tom...@gmail.com> wrote:

> It looks like Spark 1.5.1 does not work with IPv6. When
> adding -Djava.net.preferIPv6Addresses=true on my dual stack server, the
> driver fails with:
>
> 15/10/14 14:36:01 ERROR SparkContext: Error initializing SparkContext.
> java.lang.AssertionError: assertion failed: Expected hostname
> at scala.Predef$.assert(Predef.scala:179)
> at org.apache.spark.util.Utils$.checkHost(Utils.scala:805)
> at org.apache.spark.storage.BlockManagerId.<init>(BlockManagerId.scala:48)
> at org.apache.spark.storage.BlockManagerId$.apply(BlockManagerId.scala:107)
> at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:190)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:528)
> at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:1017)
>
> Looking at the checkHost method, it clearly does not work for IPv6, as it
> assumes that ':' cannot appear in a valid hostname. I think this code should
> use Guava's HostAndPort or related classes to deal properly with both IPv4
> and IPv6 (other parts of Utils already use Guava).
>
> cheers,
> Tom
>


Yahoo's Caffe-on-Spark project

2015-09-29 Thread Thomas Dudziak
http://yahoohadoop.tumblr.com/post/129872361846/large-scale-distributed-deep-learning-on-hadoop

I would be curious to learn what the Spark developer's plans are in this
area (NNs, GPUs) and what they think of integration with existing NN
frameworks like Caffe or Torch.

cheers,
Tom


Accumulator with non-java-serializable value ?

2015-09-09 Thread Thomas Dudziak
I want to use t-digest with foreachPartition and accumulators (essentially,
create a t-digest per partition and add that to the accumulator leveraging
the fact that t-digests can be added to each other). I can make t-digests
kryo-serializable easily but java-serializable is not very easy.
Now, when running it (1.4.1), I get this error:

Exception in thread "main" org.apache.spark.SparkException: Task not
serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:877)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:876)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:876)
at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1255)

which makes sense because the suggested way to use accumulators is via
closures. But since in my case I can't easily make the value type
java-serializable, that won't work. Is there another way to pass the
accumulator to the tasks that doesn't involve closures and hence java
serialization ?
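
For reference, a minimal sketch of the approach I'm describing (the
TDigestParam object, the AVLTreeDigest compression value of 100.0 and the
column access are just illustrative assumptions; this is the shape that
fails, because the accumulator gets captured in the closure):

  import com.tdunning.math.stats.{AVLTreeDigest, TDigest}
  import org.apache.spark.AccumulatorParam

  // Illustrative accumulator param that merges per-partition digests;
  // assumes the t-digest library's add(TDigest) merge mentioned above.
  object TDigestParam extends AccumulatorParam[TDigest] {
    def zero(initial: TDigest): TDigest = new AVLTreeDigest(100.0)
    def addInPlace(a: TDigest, b: TDigest): TDigest = { a.add(b); a }
  }

  val digestAcc = sc.accumulator(new AVLTreeDigest(100.0): TDigest)(TDigestParam)

  df.foreachPartition { rows =>
    val local = new AVLTreeDigest(100.0)
    rows.foreach(row => local.add(row.getDouble(0)))
    digestAcc += local  // the closure (and the accumulator) is java-serialized here
  }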


Re: How to avoid shuffle errors for a large join ?

2015-09-01 Thread Thomas Dudziak
While it works with sort-merge-join, it takes about 12h to finish (with
1 shuffle partitions). My hunch is that the reason for that is this:

INFO ExternalSorter: Thread 3733 spilling in-memory map of 174.9 MB to disk
(62 times so far)

(and lots more where this comes from).
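
For reference, this is roughly how the job is configured on 1.4 (a sketch;
the sort-merge-join key is my reading of SQLConf and worth double-checking
for your version, and the partition count is just an example):

  // Turn on sort-merge join and bump the number of shuffle partitions.
  sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")
  sqlContext.setConf("spark.sql.shuffle.partitions", "10000")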

On Sat, Aug 29, 2015 at 7:17 PM, Reynold Xin <r...@databricks.com> wrote:

> Can you try 1.5? This should work much, much better in 1.5 out of the box.
>
> For 1.4, I think you'd want to turn on sort-merge-join, which is off by
> default. However, the sort-merge join in 1.4 can still trigger a lot of
> garbage, making it slower. SMJ performance is probably 5x - 1000x better in
> 1.5 for your case.
>
>
> On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak <tom...@gmail.com> wrote:
>
>> I'm getting errors like "Removing executor with no recent heartbeats" &
>> "Missing an output location for shuffle" errors for a large SparkSql join
>> (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
>> configure the job to avoid them.
>>
>> The initial stage completes fine with some 30k tasks on a cluster with 70
>> machines/10TB memory, generating about 6.5TB of shuffle writes, but then
>> the shuffle stage first waits 30min in the scheduling phase according to
>> the UI, and then dies with the mentioned errors.
>>
>> I can see in the GC logs that the executors reach their memory limits
>> (32g per executor, 2 workers per machine) and can't allocate any more stuff
>> in the heap. Fwiw, the top 10 in the memory use histogram are:
>>
>>  num   #instances       #bytes  class name
>> ------------------------------------------
>>    1:   249139595  11958700560  scala.collection.immutable.HashMap$HashMap1
>>    2:   251085327   8034730464  scala.Tuple2
>>    3:   243694737   5848673688  java.lang.Float
>>    4:   231198778   5548770672  java.lang.Integer
>>    5:    72191585   4298521576  [Lscala.collection.immutable.HashMap;
>>    6:    72191582   2310130624  scala.collection.immutable.HashMap$HashTrieMap
>>    7:    74114058   1778737392  java.lang.Long
>>    8:     6059103    779203840  [Ljava.lang.Object;
>>    9:     5461096    174755072  scala.collection.mutable.ArrayBuffer
>>   10:       34749     70122104  [B
>>
>> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
>>
>> spark.core.connection.ack.wait.timeout 600
>> spark.executor.heartbeatInterval   60s
>> spark.executor.memory  32g
>> spark.mesos.coarse false
>> spark.network.timeout  600s
>> spark.shuffle.blockTransferService netty
>> spark.shuffle.consolidateFiles true
>> spark.shuffle.file.buffer  1m
>> spark.shuffle.io.maxRetries6
>> spark.shuffle.manager  sort
>>
>> The join is currently configured with spark.sql.shuffle.partitions=1000
>> but that doesn't seem to help. Would increasing the partitions help ? Is
>> there a formula to determine an approximate partitions number value for a
>> join ?
>> Any help with this job would be appreciated !
>>
>> cheers,
>> Tom
>>
>
>


Re: How to avoid shuffle errors for a large join ?

2015-08-28 Thread Thomas Dudziak
I'm curious where the factor of 6-8 comes from ? Is this assuming snappy
(or lzf) compression ? The sizes I mentioned are what the Spark UI reports,
not sure if those are before or after compression (for the shuffle
read/write).

On Fri, Aug 28, 2015 at 4:41 PM, java8964 java8...@hotmail.com wrote:

 There are several possibilities here.

 1) Keep in mind that 7GB of data will need far more than 7GB of heap, since
 deserialized Java objects take much more space than the raw data. A rule of
 thumb is a factor of 6 to 8, so 7GB of data needs roughly 50GB of heap space.
 2) You should monitor the Spark UI to check how many records each task
 processes and whether the failed tasks have more data than the rest. Even
 when some tasks fail, others will succeed; compare them. Do the failed tasks
 process far more records than the succeeded ones? If so, it indicates a data
 skew problem.
 3) If the failed tasks get a similar number of records as the succeeded
 ones, then just add more partitions so that each task processes less data.
 You should always monitor the GC output in these cases.
 4) If most of your tasks fail due to memory, then your settings are too
 small for your data: add partitions or memory.


 Yong

 --
 From: tom...@gmail.com
 Date: Fri, 28 Aug 2015 13:55:52 -0700
 Subject: Re: How to avoid shuffle errors for a large join ?
 To: ja...@jasonknight.us
 CC: user@spark.apache.org


 Yeah, I tried with 10k and 30k and these still failed, will try with more
 then. Though that is a little disappointing, it only writes ~7TB of shuffle
 data which shouldn't in theory require more than 1000 reducers on my 10TB
 memory cluster (~7GB of spill per reducer).
 I'm now wondering if my shuffle partitions are uneven and I should use a
 custom partitioner, is there a way to get stats on the partition sizes from
 Spark ?

 On Fri, Aug 28, 2015 at 12:46 PM, Jason ja...@jasonknight.us wrote:

 I had similar problems to this (reduce-side failures for large joins (25bn
 rows with 9bn)), and found the answer was to raise
 spark.sql.shuffle.partitions well beyond 1000. In my case, 16k partitions
 worked for me, but your tables look a little denser, so you may want to go
 even higher.

 On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak tom...@gmail.com wrote:

 I'm getting errors like "Removing executor with no recent heartbeats" &
 "Missing an output location for shuffle" errors for a large SparkSql join
 (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
 configure the job to avoid them.

 The initial stage completes fine with some 30k tasks on a cluster with 70
 machines/10TB memory, generating about 6.5TB of shuffle writes, but then
 the shuffle stage first waits 30min in the scheduling phase according to
 the UI, and then dies with the mentioned errors.

 I can see in the GC logs that the executors reach their memory limits (32g
 per executor, 2 workers per machine) and can't allocate any more stuff in
 the heap. Fwiw, the top 10 in the memory use histogram are:

  num   #instances       #bytes  class name
 ------------------------------------------
    1:   249139595  11958700560  scala.collection.immutable.HashMap$HashMap1
    2:   251085327   8034730464  scala.Tuple2
    3:   243694737   5848673688  java.lang.Float
    4:   231198778   5548770672  java.lang.Integer
    5:    72191585   4298521576  [Lscala.collection.immutable.HashMap;
    6:    72191582   2310130624  scala.collection.immutable.HashMap$HashTrieMap
    7:    74114058   1778737392  java.lang.Long
    8:     6059103    779203840  [Ljava.lang.Object;
    9:     5461096    174755072  scala.collection.mutable.ArrayBuffer
   10:       34749     70122104  [B

 Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

 spark.core.connection.ack.wait.timeout 600
 spark.executor.heartbeatInterval   60s
 spark.executor.memory  32g
 spark.mesos.coarse false
 spark.network.timeout  600s
 spark.shuffle.blockTransferService netty
 spark.shuffle.consolidateFiles true
 spark.shuffle.file.buffer  1m
 spark.shuffle.io.maxRetries6
 spark.shuffle.manager  sort

 The join is currently configured with spark.sql.shuffle.partitions=1000
 but that doesn't seem to help. Would increasing the partitions help ? Is
 there a formula to determine an approximate partitions number value for a
 join ?
 Any help with this job would be appreciated !

 cheers,
 Tom





Re: How to avoid shuffle errors for a large join ?

2015-08-28 Thread Thomas Dudziak
Yeah, I tried with 10k and 30k and these still failed, will try with more
then. Though that is a little disappointing, it only writes ~7TB of shuffle
data which shouldn't in theory require more than 1000 reducers on my 10TB
memory cluster (~7GB of spill per reducer).
I'm now wondering if my shuffle partitions are uneven and I should use a
custom partitioner, is there a way to get stats on the partition sizes from
Spark ?
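
In the meantime, a rough way to eyeball per-partition sizes (a sketch;
`joined` stands in for the shuffled DataFrame):

  // Count rows per partition of the shuffled result and print the largest ones.
  val sizes = joined.rdd
    .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
    .collect()

  sizes.sortBy(-_._2).take(20).foreach { case (idx, n) =>
    println(s"partition $idx: $n rows")
  }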

On Fri, Aug 28, 2015 at 12:46 PM, Jason ja...@jasonknight.us wrote:

 I had similar problems to this (reduce-side failures for large joins (25bn
 rows with 9bn)), and found the answer was to raise
 spark.sql.shuffle.partitions well beyond 1000. In my case, 16k partitions
 worked for me, but your tables look a little denser, so you may want to go
 even higher.

 On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak tom...@gmail.com wrote:

 I'm getting errors like "Removing executor with no recent heartbeats" &
 "Missing an output location for shuffle" errors for a large SparkSql join
 (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
 configure the job to avoid them.

 The initial stage completes fine with some 30k tasks on a cluster with 70
 machines/10TB memory, generating about 6.5TB of shuffle writes, but then
 the shuffle stage first waits 30min in the scheduling phase according to
 the UI, and then dies with the mentioned errors.

 I can see in the GC logs that the executors reach their memory limits
 (32g per executor, 2 workers per machine) and can't allocate any more stuff
 in the heap. Fwiw, the top 10 in the memory use histogram are:

  num   #instances       #bytes  class name
 ------------------------------------------
    1:   249139595  11958700560  scala.collection.immutable.HashMap$HashMap1
    2:   251085327   8034730464  scala.Tuple2
    3:   243694737   5848673688  java.lang.Float
    4:   231198778   5548770672  java.lang.Integer
    5:    72191585   4298521576  [Lscala.collection.immutable.HashMap;
    6:    72191582   2310130624  scala.collection.immutable.HashMap$HashTrieMap
    7:    74114058   1778737392  java.lang.Long
    8:     6059103    779203840  [Ljava.lang.Object;
    9:     5461096    174755072  scala.collection.mutable.ArrayBuffer
   10:       34749     70122104  [B

 Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

 spark.core.connection.ack.wait.timeout 600
 spark.executor.heartbeatInterval   60s
 spark.executor.memory  32g
 spark.mesos.coarse false
 spark.network.timeout  600s
 spark.shuffle.blockTransferService netty
 spark.shuffle.consolidateFiles true
 spark.shuffle.file.buffer  1m
 spark.shuffle.io.maxRetries6
 spark.shuffle.manager  sort

 The join is currently configured with spark.sql.shuffle.partitions=1000
 but that doesn't seem to help. Would increasing the partitions help ? Is
 there a formula to determine an approximate partitions number value for a
 join ?
 Any help with this job would be appreciated !

 cheers,
 Tom




How to avoid shuffle errors for a large join ?

2015-08-27 Thread Thomas Dudziak
I'm getting errors like "Removing executor with no recent heartbeats" &
"Missing an output location for shuffle" errors for a large SparkSql join
(1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
configure the job to avoid them.

The initial stage completes fine with some 30k tasks on a cluster with 70
machines/10TB memory, generating about 6.5TB of shuffle writes, but then
the shuffle stage first waits 30min in the scheduling phase according to
the UI, and then dies with the mentioned errors.

I can see in the GC logs that the executors reach their memory limits (32g
per executor, 2 workers per machine) and can't allocate any more stuff in
the heap. Fwiw, the top 10 in the memory use histogram are:

 num   #instances       #bytes  class name
------------------------------------------
   1:   249139595  11958700560  scala.collection.immutable.HashMap$HashMap1
   2:   251085327   8034730464  scala.Tuple2
   3:   243694737   5848673688  java.lang.Float
   4:   231198778   5548770672  java.lang.Integer
   5:    72191585   4298521576  [Lscala.collection.immutable.HashMap;
   6:    72191582   2310130624  scala.collection.immutable.HashMap$HashTrieMap
   7:    74114058   1778737392  java.lang.Long
   8:     6059103    779203840  [Ljava.lang.Object;
   9:     5461096    174755072  scala.collection.mutable.ArrayBuffer
  10:       34749     70122104  [B

Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

spark.core.connection.ack.wait.timeout 600
spark.executor.heartbeatInterval   60s
spark.executor.memory  32g
spark.mesos.coarse false
spark.network.timeout  600s
spark.shuffle.blockTransferService netty
spark.shuffle.consolidateFiles true
spark.shuffle.file.buffer  1m
spark.shuffle.io.maxRetries6
spark.shuffle.manager  sort

The join is currently configured with spark.sql.shuffle.partitions=1000 but
that doesn't seem to help. Would increasing the partitions help ? Is there
a formula to determine an approximate partitions number value for a join ?
Any help with this job would be appreciated !

cheers,
Tom


Re: Efficient sampling from a Hive table

2015-08-26 Thread Thomas Dudziak
Using TABLESAMPLE(0.1) is actually way worse. Spark first spends 12 minutes
to discover all split files on all hosts (for some reason) before it even
starts the job, and then it creates 3.5 million tasks (the partition has
~32k split files).

On Wed, Aug 26, 2015 at 9:36 AM, Jörn Franke jornfra...@gmail.com wrote:


 Have you tried TABLESAMPLE? You will find the exact syntax in the
 documentation, but it does exactly what you want.

 On Wed, Aug 26, 2015 at 6:12 PM, Thomas Dudziak tom...@gmail.com wrote:

 Sorry, I meant without reading from all splits. This is a single
 partition in the table.

 On Wed, Aug 26, 2015 at 8:53 AM, Thomas Dudziak tom...@gmail.com wrote:

 I have a sizeable table (2.5T, 1b rows) that I want to get ~100m rows
 from and I don't particularly care which rows. Doing a LIMIT unfortunately
 results in two stages where the first stage reads the whole table, and the
 second then performs the limit with a single worker, which is not very
 efficient.
 Is there a better way to sample a subset of rows in Spark, ideally in a
 single stage and without reading all partitions?

 cheers,
 Tom





Efficient sampling from a Hive table

2015-08-26 Thread Thomas Dudziak
I have a sizeable table (2.5T, 1b rows) that I want to get ~100m rows from
and I don't particularly care which rows. Doing a LIMIT unfortunately
results in two stages where the first stage reads the whole table, and the
second then performs the limit with a single worker, which is not very
efficient.
Is there a better way to sample a subset of rows in Spark, ideally in a
single stage and without reading all partitions?
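
One possible direction (a sketch only; PartitionPruningRDD is a DeveloperApi,
and `df` and the 1-in-10 fraction are placeholders -- I haven't validated this
against a Hive-backed table):

  import org.apache.spark.rdd.PartitionPruningRDD

  // Read only every 10th underlying split instead of the whole partition.
  val pruned = PartitionPruningRDD.create(df.rdd, splitIndex => splitIndex % 10 == 0)
  println(pruned.count())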

cheers,
Tom


Re: Efficient sampling from a Hive table

2015-08-26 Thread Thomas Dudziak
Sorry, I meant without reading from all splits. This is a single partition
in the table.

On Wed, Aug 26, 2015 at 8:53 AM, Thomas Dudziak tom...@gmail.com wrote:

 I have a sizeable table (2.5T, 1b rows) that I want to get ~100m rows from
 and I don't particularly care which rows. Doing a LIMIT unfortunately
 results in two stages where the first stage reads the whole table, and the
 second then performs the limit with a single worker, which is not very
 efficient.
 Is there a better way to sample a subset of rows in Spark, ideally in a
 single stage and without reading all partitions?

 cheers,
 Tom



Re: Wish for 1.4: upper bound on # tasks in Mesos

2015-05-19 Thread Thomas Dudziak
I'm using fine-grained mode for a multi-tenant environment, which is why I
would welcome a limit on tasks per job :)

cheers,
Tom

On Tue, May 19, 2015 at 10:05 AM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Hey Tom,

 Are you using the fine-grained or coarse-grained scheduler? For the
 coarse-grained scheduler, there is a spark.cores.max config setting that
 will limit the total # of cores it grabs. This was there in earlier
 versions too.
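
For reference, in coarse-grained mode that cap is a single setting in
spark-defaults.conf or on the job's SparkConf (64 is just an example value):

  spark.cores.max  64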

 Matei

  On May 19, 2015, at 12:39 PM, Thomas Dudziak tom...@gmail.com wrote:
 
  I read the other day that there will be a fair number of improvements in
 1.4 for Mesos. Could I ask for one more (if it isn't already in there): a
 configurable limit for the number of tasks for jobs run on Mesos ? This
 would be a very simple yet effective way to prevent a job dominating the
 cluster.
 
  cheers,
  Tom
 




Wish for 1.4: upper bound on # tasks in Mesos

2015-05-19 Thread Thomas Dudziak
I read the other day that there will be a fair number of improvements in
1.4 for Mesos. Could I ask for one more (if it isn't already in there): a
configurable limit for the number of tasks for jobs run on Mesos ? This
would be a very simple yet effective way to prevent a job dominating the
cluster.

cheers,
Tom


Exception when using CLUSTER BY or ORDER BY

2015-05-19 Thread Thomas Dudziak
Under certain circumstances that I haven't yet been able to isolate, I get
the following error when doing a HQL query using HiveContext (Spark 1.3.1
on Mesos, fine-grained mode). Is this a known problem or should I file a
JIRA for it ?


org.apache.spark.SparkException: Can only zip RDDs with same number of
elements in each partition
at 
org.apache.spark.rdd.RDD$$anonfun$zip$1$$anon$1.hasNext(RDD.scala:746)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:56)
at 
org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:259)
at 
org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:257)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Re: Spark's Guava pieces cause exceptions in non-trivial deployments

2015-05-15 Thread Thomas Dudziak
This is still a problem in 1.3. Optional is both used in several shaded
classes within Guava (e.g. the Immutable* classes) and itself uses shaded
classes (e.g. AbstractIterator). This causes problems in application code.
The only reliable way we've found around this is to shade Guava ourselves
for application code and thus avoid the problem altogether.
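
For anyone hitting the same issue, that shading looks roughly like this with
sbt-assembly (a sketch; it assumes a plugin version with shade-rule support,
and the rename target is arbitrary):

  // build.sbt (sketch): relocate Guava inside the application assembly so it
  // no longer collides with the Guava pieces Spark exposes.
  assemblyShadeRules in assembly := Seq(
    ShadeRule.rename("com.google.common.**" -> "myapp.shaded.guava.@1").inAll
  )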

cheers,
Tom

On Thu, May 14, 2015 at 7:38 PM, Anton Brazhnyk anton.brazh...@genesys.com
wrote:

  The problem is with 1.3.1

 It has Function class (mentioned in exception) in
 spark-network-common_2.10-1.3.1.jar.

 Our current resolution is actually backport to 1.2.2, which is working
 fine.





 *From:* Marcelo Vanzin [mailto:van...@cloudera.com]
 *Sent:* Thursday, May 14, 2015 6:27 PM
 *To:* Anton Brazhnyk
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark's Guava pieces cause exceptions in non-trivial
 deployments



 What version of Spark are you using?

 The bug you mention is only about the Optional class (and a handful of
 others, but none of the classes you're having problems with). All other
 Guava classes should be shaded since Spark 1.2, so you should be able to
 use your own version of Guava with no problems (aside from the Optional
 classes).

 Also, Spark 1.3 added some improvements to how shading is done, so if
 you're using 1.2 I'd recommend trying 1.3 before declaring defeat.



 On Thu, May 14, 2015 at 4:52 PM, Anton Brazhnyk 
 anton.brazh...@genesys.com wrote:

  Greetings,



 I have a relatively complex application with Spark, Jetty and Guava (16)
 not fitting together.

 Exception happens when some components try to use “mix” of Guava classes
 (including Spark’s pieces) that are loaded by different classloaders:

 java.lang.LinkageError: loader constraint violation: when resolving method
 com.google.common.collect.Iterables.transform(Ljava/lang/Iterable;Lcom/google/common/base/Function;)Ljava/lang/Iterable;
 the class loader (instance of org/eclipse/jetty/webapp/WebAppClassLoader)
 of the current class, org/apache/cassandra/db/ColumnFamilyStore, and the
 class loader (instance of java/net/URLClassLoader) for resolved class,
 com/google/common/collect/Iterables, have different Class objects for the
 type e;Lcom/google/common/base/Function;)Ljava/lang/Iterable; used in the
 signature



 According to https://issues.apache.org/jira/browse/SPARK-4819 it’s not
 going to be fixed at least until Spark 2.0, but maybe some workaround is
 possible?

 Those classes are pretty simple and have low chances to be changed in
 Guava significantly, so any “external” Guava can provide them.



 So, could such problems be fixed if those Spark’s pieces of Guava would be
 in separate jar and could be excluded from the mix (substituted by
 “external” Guava)?





 Thanks,

 Anton




 --

 Marcelo



Re: Spark's Guava pieces cause exceptions in non-trivial deployments

2015-05-15 Thread Thomas Dudziak
Actually the extraClassPath settings put the extra jars at the end of the
classpath so they won't help. Only the deprecated SPARK_CLASSPATH puts them
at the front.

cheers,
Tom

On Fri, May 15, 2015 at 11:54 AM, Marcelo Vanzin van...@cloudera.com
wrote:

 Ah, I see. yeah, it sucks that Spark has to expose Optional (and things it
 depends on), but removing that would break the public API, so...

 One last thing you could try is to add your newer Guava jar to
 spark.driver.extraClassPath and spark.executor.extraClassPath. Those
 settings will place your jars before Spark's in the classpath, so you'd
 actually be using the newer versions of the conflicting classes everywhere.

 It does require manually distributing the Guava jar to the same location
 on all nodes in the cluster, though.
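
For concreteness, that would be something like the following in
spark-defaults.conf or as --conf flags to spark-submit (the path is a
placeholder and has to exist on every node):

  spark.driver.extraClassPath    /opt/libs/guava-16.0.1.jar
  spark.executor.extraClassPath  /opt/libs/guava-16.0.1.jar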

 If that doesn't work. Thomas's suggestion of shading Guava in your app can
 be used as a last resort.


 On Thu, May 14, 2015 at 7:38 PM, Anton Brazhnyk 
 anton.brazh...@genesys.com wrote:

  The problem is with 1.3.1

 It has Function class (mentioned in exception) in
 spark-network-common_2.10-1.3.1.jar.

 Our current resolution is actually backport to 1.2.2, which is working
 fine.





 *From:* Marcelo Vanzin [mailto:van...@cloudera.com]
 *Sent:* Thursday, May 14, 2015 6:27 PM
 *To:* Anton Brazhnyk
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark's Guava pieces cause exceptions in non-trivial
 deployments



 What version of Spark are you using?

 The bug you mention is only about the Optional class (and a handful of
 others, but none of the classes you're having problems with). All other
 Guava classes should be shaded since Spark 1.2, so you should be able to
 use your own version of Guava with no problems (aside from the Optional
 classes).

 Also, Spark 1.3 added some improvements to how shading is done, so if
 you're using 1.2 I'd recommend trying 1.3 before declaring defeat.



 On Thu, May 14, 2015 at 4:52 PM, Anton Brazhnyk 
 anton.brazh...@genesys.com wrote:

  Greetings,



 I have a relatively complex application with Spark, Jetty and Guava (16)
 not fitting together.

 Exception happens when some components try to use “mix” of Guava classes
 (including Spark’s pieces) that are loaded by different classloaders:

 java.lang.LinkageError: loader constraint violation: when resolving
 method
 com.google.common.collect.Iterables.transform(Ljava/lang/Iterable;Lcom/google/common/base/Function;)Ljava/lang/Iterable;
 the class loader (instance of org/eclipse/jetty/webapp/WebAppClassLoader)
 of the current class, org/apache/cassandra/db/ColumnFamilyStore, and the
 class loader (instance of java/net/URLClassLoader) for resolved class,
 com/google/common/collect/Iterables, have different Class objects for the
 type e;Lcom/google/common/base/Function;)Ljava/lang/Iterable; used in the
 signature



 According to https://issues.apache.org/jira/browse/SPARK-4819 it’s not
 going to be fixed at least until Spark 2.0, but maybe some workaround is
 possible?

 Those classes are pretty simple and have low chances to be changed in
 Guava significantly, so any “external” Guava can provide them.



 So, could such problems be fixed if those Spark’s pieces of Guava would
 be in separate jar and could be excluded from the mix (substituted by
 “external” Guava)?





 Thanks,

 Anton




 --

 Marcelo




 --
 Marcelo



Re: Spark's Guava pieces cause exceptions in non-trivial deployments

2015-05-15 Thread Thomas Dudziak
I've just been through this exact case with shaded guava in our Mesos setup
and that is how it behaves there (with Spark 1.3.1).

cheers,
Tom

On Fri, May 15, 2015 at 12:04 PM, Marcelo Vanzin van...@cloudera.com
wrote:

 On Fri, May 15, 2015 at 11:56 AM, Thomas Dudziak tom...@gmail.com wrote:

 Actually the extraClassPath settings put the extra jars at the end of the
 classpath so they won't help. Only the deprecated SPARK_CLASSPATH puts them
 at the front.


 That's definitely not the case for YARN:

 https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L1013

 And it's been like that for as far as I remember.

 I'm almost sure that's also the case for standalone, at least in master /
 1.4, since I touched a lot of that code recently.

 It would be really weird if those options worked differently from
 SPARK_CLASSPATH, since they were meant to replace it.


 On Fri, May 15, 2015 at 11:54 AM, Marcelo Vanzin van...@cloudera.com
 wrote:

 Ah, I see. yeah, it sucks that Spark has to expose Optional (and things
 it depends on), but removing that would break the public API, so...

  One last thing you could try is to add your newer Guava jar to
 spark.driver.extraClassPath and spark.executor.extraClassPath. Those
 settings will place your jars before Spark's in the classpath, so you'd
 actually be using the newer versions of the conflicting classes everywhere.

 It does require manually distributing the Guava jar to the same location
 on all nodes in the cluster, though.

 If that doesn't work. Thomas's suggestion of shading Guava in your app
 can be used as a last resort.


 On Thu, May 14, 2015 at 7:38 PM, Anton Brazhnyk 
 anton.brazh...@genesys.com wrote:

  The problem is with 1.3.1

 It has Function class (mentioned in exception) in
 spark-network-common_2.10-1.3.1.jar.

 Our current resolution is actually backport to 1.2.2, which is working
 fine.





 *From:* Marcelo Vanzin [mailto:van...@cloudera.com]
 *Sent:* Thursday, May 14, 2015 6:27 PM
 *To:* Anton Brazhnyk
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark's Guava pieces cause exceptions in non-trivial
 deployments



 What version of Spark are you using?

 The bug you mention is only about the Optional class (and a handful of
 others, but none of the classes you're having problems with). All other
 Guava classes should be shaded since Spark 1.2, so you should be able to
 use your own version of Guava with no problems (aside from the Optional
 classes).

 Also, Spark 1.3 added some improvements to how shading is done, so if
 you're using 1.2 I'd recommend trying 1.3 before declaring defeat.



 On Thu, May 14, 2015 at 4:52 PM, Anton Brazhnyk 
 anton.brazh...@genesys.com wrote:

  Greetings,



 I have a relatively complex application with Spark, Jetty and Guava
 (16) not fitting together.

 Exception happens when some components try to use “mix” of Guava
 classes (including Spark’s pieces) that are loaded by different
 classloaders:

 java.lang.LinkageError: loader constraint violation: when resolving
 method
 com.google.common.collect.Iterables.transform(Ljava/lang/Iterable;Lcom/google/common/base/Function;)Ljava/lang/Iterable;
 the class loader (instance of org/eclipse/jetty/webapp/WebAppClassLoader)
 of the current class, org/apache/cassandra/db/ColumnFamilyStore, and the
 class loader (instance of java/net/URLClassLoader) for resolved class,
 com/google/common/collect/Iterables, have different Class objects for the
 type e;Lcom/google/common/base/Function;)Ljava/lang/Iterable; used in the
 signature



 According to https://issues.apache.org/jira/browse/SPARK-4819 it’s not
 going to be fixed at least until Spark 2.0, but maybe some workaround is
 possible?

 Those classes are pretty simple and have low chances to be changed in
 Guava significantly, so any “external” Guava can provide them.



 So, could such problems be fixed if those Spark’s pieces of Guava would
 be in separate jar and could be excluded from the mix (substituted by
 “external” Guava)?





 Thanks,

 Anton




 --

 Marcelo




 --
 Marcelo





 --
 Marcelo