IPv6 regression in Spark 1.5.1
It looks like Spark 1.5.1 does not work with IPv6. When adding -Djava.net.preferIPv6Addresses=true on my dual-stack server, the driver fails with:

15/10/14 14:36:01 ERROR SparkContext: Error initializing SparkContext.
java.lang.AssertionError: assertion failed: Expected hostname
        at scala.Predef$.assert(Predef.scala:179)
        at org.apache.spark.util.Utils$.checkHost(Utils.scala:805)
        at org.apache.spark.storage.BlockManagerId.<init>(BlockManagerId.scala:48)
        at org.apache.spark.storage.BlockManagerId$.apply(BlockManagerId.scala:107)
        at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:190)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:528)
        at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:1017)

Looking at the checkHost method, it clearly does not work for IPv6 as it assumes that ':' is not a valid part of a hostname. I think this code should use Guava's HostAndPort or related classes to properly deal with both IPv4 and IPv6 (and other parts of Utils already use Guava).

cheers,
Tom
Re: IPv6 regression in Spark 1.5.1
Specifically, something like this should probably do the trick:

    import com.google.common.net.HostAndPort

    def checkHost(host: String, message: String = "") {
      assert(!HostAndPort.fromString(host).hasPort, message)
    }

    def checkHostPort(hostPort: String, message: String = "") {
      assert(HostAndPort.fromString(hostPort).hasPort, message)
    }

On Wed, Oct 14, 2015 at 2:40 PM, Thomas Dudziak <tom...@gmail.com> wrote:
> It looks like Spark 1.5.1 does not work with IPv6. When
> adding -Djava.net.preferIPv6Addresses=true on my dual-stack server, the
> driver fails with:
>
> 15/10/14 14:36:01 ERROR SparkContext: Error initializing SparkContext.
> java.lang.AssertionError: assertion failed: Expected hostname
> at scala.Predef$.assert(Predef.scala:179)
> at org.apache.spark.util.Utils$.checkHost(Utils.scala:805)
> at org.apache.spark.storage.BlockManagerId.<init>(BlockManagerId.scala:48)
> at org.apache.spark.storage.BlockManagerId$.apply(BlockManagerId.scala:107)
> at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:190)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:528)
> at
> org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:1017)
>
> Looking at the checkHost method, it clearly does not work for IPv6 as it
> assumes that ':' is not a valid part of a hostname. I think this code should use
> Guava's HostAndPort or related classes to properly deal with both IPv4 and IPv6
> (and other parts of Utils already use Guava).
>
> cheers,
> Tom
>
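For reference, a small sketch of how HostAndPort treats the relevant host forms (the host strings are made up for illustration):

    import com.google.common.net.HostAndPort

    // Plain hostname: no port detected.
    assert(!HostAndPort.fromString("spark-master.example.com").hasPort)
    // Bracketed IPv6 literal with a port: the colons inside the brackets
    // are not mistaken for a port separator.
    val hp = HostAndPort.fromString("[2001:db8::1]:7077")
    assert(hp.hasPort && hp.getPort == 7077)
    // Bare IPv6 literal without brackets: parsed as a host without a port.
    assert(!HostAndPort.fromString("2001:db8::1").hasPort)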
Yahoo's Caffe-on-Spark project
http://yahoohadoop.tumblr.com/post/129872361846/large-scale-distributed-deep-learning-on-hadoop

I would be curious to learn what the Spark developers' plans are in this area (NNs, GPUs) and what they think of integration with existing NN frameworks like Caffe or Torch.

cheers,
Tom
Accumulator with non-java-serializable value ?
I want to use t-digest with foreachPartition and accumulators (essentially, create a t-digest per partition and add that to the accumulator, leveraging the fact that t-digests can be added to each other). I can make t-digests kryo-serializable easily, but java-serializable is not very easy. Now, when running it (1.4.1), I get this error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:877)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:876)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:876)
        at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1255)

which makes sense because the suggested way to use accumulators is via closures. But since in my case I can't easily make the value type java-serializable, that won't work. Is there another way to pass the accumulator to the tasks that doesn't involve closures and hence java serialization ?
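One possible workaround, sketched below (not from this thread): wrap the digest in a small class that implements Java serialization by delegating to t-digest's own byte encoding, and register an AccumulatorParam for the wrapper. The t-digest class and method names (createAvlTreeDigest, byteSize, asBytes, fromBytes, add(TDigest)) are assumed from the 3.x API, so double-check them against the version in use; df stands for the DataFrame being processed.

    import java.io.{ObjectInputStream, ObjectOutputStream}
    import java.nio.ByteBuffer
    import com.tdunning.math.stats.{AVLTreeDigest, TDigest}
    import org.apache.spark.AccumulatorParam

    // Hypothetical wrapper: round-trips the digest through t-digest's own
    // byte encoding in writeObject/readObject so Java serialization works.
    class SerializableTDigest(@transient var digest: TDigest) extends Serializable {
      private def writeObject(out: ObjectOutputStream): Unit = {
        val buf = ByteBuffer.allocate(digest.byteSize())
        digest.asBytes(buf)
        out.writeInt(buf.position())
        out.write(buf.array(), 0, buf.position())
      }
      private def readObject(in: ObjectInputStream): Unit = {
        val bytes = new Array[Byte](in.readInt())
        in.readFully(bytes)
        digest = AVLTreeDigest.fromBytes(ByteBuffer.wrap(bytes))
      }
    }

    object TDigestAccumulatorParam extends AccumulatorParam[SerializableTDigest] {
      override def zero(v: SerializableTDigest) =
        new SerializableTDigest(TDigest.createAvlTreeDigest(100.0))
      override def addInPlace(a: SerializableTDigest, b: SerializableTDigest) = {
        a.digest.add(b.digest) // t-digests merge by adding one to the other
        a
      }
    }

    // Usage: one local digest per partition, merged through the accumulator.
    val acc = sc.accumulator(new SerializableTDigest(
      TDigest.createAvlTreeDigest(100.0)))(TDigestAccumulatorParam)
    df.foreachPartition { rows =>
      val local = TDigest.createAvlTreeDigest(100.0)
      rows.foreach(r => local.add(r.getDouble(0)))
      acc += new SerializableTDigest(local)
    }

Since the closure now only captures a Java-serializable wrapper, the ClosureCleaner check should pass, while the actual byte shuffling stays in t-digest's compact encoding.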
Re: How to avoid shuffle errors for a large join ?
While it works with sort-merge-join, it takes about 12h to finish (with 1 shuffle partitions). My hunch is that the reason for that is this:

INFO ExternalSorter: Thread 3733 spilling in-memory map of 174.9 MB to disk (62 times so far)

(and lots more where this comes from).

On Sat, Aug 29, 2015 at 7:17 PM, Reynold Xin <r...@databricks.com> wrote:
> Can you try 1.5? This should work much, much better in 1.5 out of the box.
>
> For 1.4, I think you'd want to turn on sort-merge-join, which is off by
> default. However, the sort-merge join in 1.4 can still trigger a lot of
> garbage, making it slower. SMJ performance is probably 5x - 1000x better in
> 1.5 for your case.
>
> On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak <tom...@gmail.com> wrote:
>
>> I'm getting errors like "Removing executor with no recent heartbeats" &
>> "Missing an output location for shuffle" for a large SparkSql join
>> (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
>> configure the job to avoid them.
>>
>> The initial stage completes fine with some 30k tasks on a cluster with 70
>> machines/10TB memory, generating about 6.5TB of shuffle writes, but then
>> the shuffle stage first waits 30min in the scheduling phase according to
>> the UI, and then dies with the mentioned errors.
>>
>> I can see in the GC logs that the executors reach their memory limits
>> (32g per executor, 2 workers per machine) and can't allocate any more stuff
>> in the heap. Fwiw, the top 10 in the memory use histogram are:
>>
>>  num     #instances         #bytes  class name
>> ----------------------------------------------
>>    1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
>>    2:     251085327     8034730464  scala.Tuple2
>>    3:     243694737     5848673688  java.lang.Float
>>    4:     231198778     5548770672  java.lang.Integer
>>    5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
>>    6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
>>    7:      74114058     1778737392  java.lang.Long
>>    8:       6059103      779203840  [Ljava.lang.Object;
>>    9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
>>   10:         34749       70122104  [B
>>
>> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
>>
>> spark.core.connection.ack.wait.timeout 600
>> spark.executor.heartbeatInterval 60s
>> spark.executor.memory 32g
>> spark.mesos.coarse false
>> spark.network.timeout 600s
>> spark.shuffle.blockTransferService netty
>> spark.shuffle.consolidateFiles true
>> spark.shuffle.file.buffer 1m
>> spark.shuffle.io.maxRetries 6
>> spark.shuffle.manager sort
>>
>> The join is currently configured with spark.sql.shuffle.partitions=1000
>> but that doesn't seem to help. Would increasing the partitions help ? Is
>> there a formula to determine an approximate partition count for a join ?
>>
>> Any help with this job would be appreciated !
>>
>> cheers,
>> Tom
>>
>
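For 1.4, the toggle Reynold refers to would be set roughly like this; the flag name spark.sql.planner.sortMergeJoin is my assumption from the 1.4 SQLConf, so verify it against the actual source:

    // Spark 1.4: sort-merge join is off by default.
    sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")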
Re: How to avoid shuffle errors for a large join ?
I'm curious where the factor of 6-8 comes from ? Is this assuming snappy (or lzf) compression ? The sizes I mentioned are what the Spark UI reports; I'm not sure whether those are before or after compression (for the shuffle read/write).

On Fri, Aug 28, 2015 at 4:41 PM, java8964 <java8...@hotmail.com> wrote:

There are several possibilities here.

1) Keep in mind that 7GB of data will need way more than 7G of heap, as deserialized Java objects need much more space than the raw data. A rough rule of thumb is to multiply by 6 to 8, so 7G of data needs about 50G of heap space.

2) You should monitor the Spark UI to check how many records are being processed per task, and whether the failed tasks have more data than the rest. Even though some of your tasks failed, others succeeded. Compare them: do the failed tasks process far more records than the succeeded ones? If so, it indicates you have a data skew problem.

3) If the failed tasks were allocated a similar number of records as the succeeded ones, then just add more partitions to make each task process less data. You should always monitor the GC output in these cases.

4) If most of your tasks fail due to memory, then your settings are too small for your data; add partitions or memory.

Yong

--
From: tom...@gmail.com
Date: Fri, 28 Aug 2015 13:55:52 -0700
Subject: Re: How to avoid shuffle errors for a large join ?
To: ja...@jasonknight.us
CC: user@spark.apache.org

Yeah, I tried with 10k and 30k and these still failed, will try with more then. Though that is a little disappointing, it only writes ~7TB of shuffle data, which shouldn't in theory require more than 1000 reducers on my 10TB memory cluster (~7GB of spill per reducer). I'm now wondering if my shuffle partitions are uneven and I should use a custom partitioner, is there a way to get stats on the partition sizes from Spark ?

On Fri, Aug 28, 2015 at 12:46 PM, Jason <ja...@jasonknight.us> wrote:

I had similar problems to this (reduce-side failures for large joins (25bn rows with 9bn)), and found the answer was to push spark.sql.shuffle.partitions up further from 1000. In my case, 16k partitions worked for me, but your tables look a little denser, so you may want to go even higher.

On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak <tom...@gmail.com> wrote:

I'm getting errors like "Removing executor with no recent heartbeats" & "Missing an output location for shuffle" for a large SparkSql join (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to configure the job to avoid them.

The initial stage completes fine with some 30k tasks on a cluster with 70 machines/10TB memory, generating about 6.5TB of shuffle writes, but then the shuffle stage first waits 30min in the scheduling phase according to the UI, and then dies with the mentioned errors.

I can see in the GC logs that the executors reach their memory limits (32g per executor, 2 workers per machine) and can't allocate any more stuff in the heap. Fwiw, the top 10 in the memory use histogram are:

 num     #instances         #bytes  class name
----------------------------------------------
   1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
   2:     251085327     8034730464  scala.Tuple2
   3:     243694737     5848673688  java.lang.Float
   4:     231198778     5548770672  java.lang.Integer
   5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
   6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
   7:      74114058     1778737392  java.lang.Long
   8:       6059103      779203840  [Ljava.lang.Object;
   9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
  10:         34749       70122104  [B

Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

spark.core.connection.ack.wait.timeout 600
spark.executor.heartbeatInterval 60s
spark.executor.memory 32g
spark.mesos.coarse false
spark.network.timeout 600s
spark.shuffle.blockTransferService netty
spark.shuffle.consolidateFiles true
spark.shuffle.file.buffer 1m
spark.shuffle.io.maxRetries 6
spark.shuffle.manager sort

The join is currently configured with spark.sql.shuffle.partitions=1000 but that doesn't seem to help. Would increasing the partitions help ? Is there a formula to determine an approximate partition count for a join ?

Any help with this job would be appreciated !

cheers,
Tom
Re: How to avoid shuffle errors for a large join ?
Yeah, I tried with 10k and 30k and these still failed, will try with more then. Though that is a little disappointing, it only writes ~7TB of shuffle data, which shouldn't in theory require more than 1000 reducers on my 10TB memory cluster (~7GB of spill per reducer). I'm now wondering if my shuffle partitions are uneven and I should use a custom partitioner, is there a way to get stats on the partition sizes from Spark ?

On Fri, Aug 28, 2015 at 12:46 PM, Jason <ja...@jasonknight.us> wrote:

I had similar problems to this (reduce-side failures for large joins (25bn rows with 9bn)), and found the answer was to push spark.sql.shuffle.partitions up further from 1000. In my case, 16k partitions worked for me, but your tables look a little denser, so you may want to go even higher.

On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak <tom...@gmail.com> wrote:

I'm getting errors like "Removing executor with no recent heartbeats" & "Missing an output location for shuffle" for a large SparkSql join (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to configure the job to avoid them.

The initial stage completes fine with some 30k tasks on a cluster with 70 machines/10TB memory, generating about 6.5TB of shuffle writes, but then the shuffle stage first waits 30min in the scheduling phase according to the UI, and then dies with the mentioned errors.

I can see in the GC logs that the executors reach their memory limits (32g per executor, 2 workers per machine) and can't allocate any more stuff in the heap. Fwiw, the top 10 in the memory use histogram are:

 num     #instances         #bytes  class name
----------------------------------------------
   1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
   2:     251085327     8034730464  scala.Tuple2
   3:     243694737     5848673688  java.lang.Float
   4:     231198778     5548770672  java.lang.Integer
   5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
   6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
   7:      74114058     1778737392  java.lang.Long
   8:       6059103      779203840  [Ljava.lang.Object;
   9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
  10:         34749       70122104  [B

Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

spark.core.connection.ack.wait.timeout 600
spark.executor.heartbeatInterval 60s
spark.executor.memory 32g
spark.mesos.coarse false
spark.network.timeout 600s
spark.shuffle.blockTransferService netty
spark.shuffle.consolidateFiles true
spark.shuffle.file.buffer 1m
spark.shuffle.io.maxRetries 6
spark.shuffle.manager sort

The join is currently configured with spark.sql.shuffle.partitions=1000 but that doesn't seem to help. Would increasing the partitions help ? Is there a formula to determine an approximate partition count for a join ?

Any help with this job would be appreciated !

cheers,
Tom
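As for getting stats on partition sizes, a quick sketch (df stands for the joined DataFrame):

    // Counts rows per partition to spot skew; this consumes each
    // partition's iterator, so it costs a full pass over the data.
    val counts = df.rdd
      .mapPartitionsWithIndex((i, rows) => Iterator((i, rows.size)))
      .collect()
    counts.sortBy(-_._2).take(20).foreach { case (i, n) =>
      println(s"partition $i: $n rows")
    }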
How to avoid shuffle errors for a large join ?
I'm getting errors like "Removing executor with no recent heartbeats" & "Missing an output location for shuffle" for a large SparkSql join (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to configure the job to avoid them.

The initial stage completes fine with some 30k tasks on a cluster with 70 machines/10TB memory, generating about 6.5TB of shuffle writes, but then the shuffle stage first waits 30min in the scheduling phase according to the UI, and then dies with the mentioned errors.

I can see in the GC logs that the executors reach their memory limits (32g per executor, 2 workers per machine) and can't allocate any more stuff in the heap. Fwiw, the top 10 in the memory use histogram are:

 num     #instances         #bytes  class name
----------------------------------------------
   1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
   2:     251085327     8034730464  scala.Tuple2
   3:     243694737     5848673688  java.lang.Float
   4:     231198778     5548770672  java.lang.Integer
   5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
   6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
   7:      74114058     1778737392  java.lang.Long
   8:       6059103      779203840  [Ljava.lang.Object;
   9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
  10:         34749       70122104  [B

Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

spark.core.connection.ack.wait.timeout 600
spark.executor.heartbeatInterval 60s
spark.executor.memory 32g
spark.mesos.coarse false
spark.network.timeout 600s
spark.shuffle.blockTransferService netty
spark.shuffle.consolidateFiles true
spark.shuffle.file.buffer 1m
spark.shuffle.io.maxRetries 6
spark.shuffle.manager sort

The join is currently configured with spark.sql.shuffle.partitions=1000 but that doesn't seem to help. Would increasing the partitions help ? Is there a formula to determine an approximate partition count for a join ?

Any help with this job would be appreciated !

cheers,
Tom
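For reference, the setting discussed in this thread can be changed per SQLContext; a minimal sketch (10000 is an arbitrary starting point, not a recommendation from the thread):

    // Default is 200; raising it shrinks the data each reduce task must hold.
    sqlContext.setConf("spark.sql.shuffle.partitions", "10000")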
Re: Efficient sampling from a Hive table
Using TABLESAMPLE(0.1) is actually way worse. Spark first spends 12 minutes discovering all split files on all hosts (for some reason) before it even starts the job, and then it creates 3.5 million tasks (the partition has ~32k split files).

On Wed, Aug 26, 2015 at 9:36 AM, Jörn Franke <jornfra...@gmail.com> wrote:

Have you tried TABLESAMPLE? You'll find the exact syntax in the documentation, but it does exactly what you want.

On Wed, Aug 26, 2015 at 18:12, Thomas Dudziak <tom...@gmail.com> wrote:

Sorry, I meant without reading from all splits. This is a single partition in the table.

On Wed, Aug 26, 2015 at 8:53 AM, Thomas Dudziak <tom...@gmail.com> wrote:

I have a sizeable table (2.5T, 1b rows) that I want to get ~100m rows from, and I don't particularly care which rows. Doing a LIMIT unfortunately results in two stages where the first stage reads the whole table and the second then performs the limit with a single worker, which is not very efficient. Is there a better way to sample a subset of rows in Spark, ideally in a single stage and without reading all partitions ?

cheers,
Tom
Efficient sampling from a Hive table
I have a sizeable table (2.5T, 1b rows) that I want to get ~100m rows from, and I don't particularly care which rows. Doing a LIMIT unfortunately results in two stages where the first stage reads the whole table and the second then performs the limit with a single worker, which is not very efficient. Is there a better way to sample a subset of rows in Spark, ideally in a single stage and without reading all partitions ?

cheers,
Tom
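One candidate is DataFrame.sample, which samples inside each task and so avoids the single-worker LIMIT stage, though it still reads every split; a minimal sketch with a made-up table name:

    // ~10% of 1bn rows is roughly 100m; each task samples its own partition,
    // so there is no single-reducer bottleneck (but all splits are scanned).
    val sampled = sqlContext.table("big_table")
      .sample(withReplacement = false, fraction = 0.1)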
Re: Efficient sampling from a Hive table
Sorry, I meant without reading from all splits. This is a single partition in the table.

On Wed, Aug 26, 2015 at 8:53 AM, Thomas Dudziak <tom...@gmail.com> wrote:

I have a sizeable table (2.5T, 1b rows) that I want to get ~100m rows from, and I don't particularly care which rows. Doing a LIMIT unfortunately results in two stages where the first stage reads the whole table and the second then performs the limit with a single worker, which is not very efficient. Is there a better way to sample a subset of rows in Spark, ideally in a single stage and without reading all partitions ?

cheers,
Tom
Re: Wish for 1.4: upper bound on # tasks in Mesos
I'm using fine-grained mode for a multi-tenant environment, which is why I would welcome a limit on tasks per job :)

cheers,
Tom

On Tue, May 19, 2015 at 10:05 AM, Matei Zaharia <matei.zaha...@gmail.com> wrote:

Hey Tom,

Are you using the fine-grained or coarse-grained scheduler? For the coarse-grained scheduler, there is a spark.cores.max config setting that will limit the total # of cores it grabs. This was there in earlier versions too.

Matei

On May 19, 2015, at 12:39 PM, Thomas Dudziak <tom...@gmail.com> wrote:

I read the other day that there will be a fair number of improvements in 1.4 for Mesos. Could I ask for one more (if it isn't already in there): a configurable limit on the number of tasks for jobs run on Mesos ? This would be a very simple yet effective way to prevent a job from dominating the cluster.

cheers,
Tom
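For reference, a minimal sketch of the coarse-grained cap Matei mentions (the app name is made up):

    import org.apache.spark.{SparkConf, SparkContext}

    // Coarse-grained mode: caps the total number of cores this job grabs
    // across the cluster; fine-grained mode has no equivalent per-job cap.
    val conf = new SparkConf()
      .setAppName("capped-job")
      .set("spark.cores.max", "64")
    val sc = new SparkContext(conf)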
Wish for 1.4: upper bound on # tasks in Mesos
I read the other day that there will be a fair number of improvements in 1.4 for Mesos. Could I ask for one more (if it isn't already in there): a configurable limit on the number of tasks for jobs run on Mesos ? This would be a very simple yet effective way to prevent a job from dominating the cluster.

cheers,
Tom
Exception when using CLUSTER BY or ORDER BY
Under certain circumstances that I haven't yet been able to isolate, I get the following error when doing a HQL query using HiveContext (Spark 1.3.1 on Mesos, fine-grained mode). Is this a known problem or should I file a JIRA for it ?

org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
        at org.apache.spark.rdd.RDD$$anonfun$zip$1$$anon$1.hasNext(RDD.scala:746)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:56)
        at org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:259)
        at org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:257)
        at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
        at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Re: Spark's Guava pieces cause exceptions in non-trivial deployments
This is still a problem in 1.3. Optional is both used in several shaded classes within Guava (e.g. the Immutable* classes) and itself uses shaded classes (e.g. AbstractIterator). This causes problems in application code. The only reliable way we've found around this is to shade Guava ourselves for application code and thus avoid the problem altogether.

cheers,
Tom

On Thu, May 14, 2015 at 7:38 PM, Anton Brazhnyk <anton.brazh...@genesys.com> wrote:

The problem is with 1.3.1. It has the Function class (mentioned in the exception) in spark-network-common_2.10-1.3.1.jar. Our current resolution is actually a backport to 1.2.2, which is working fine.

From: Marcelo Vanzin [mailto:van...@cloudera.com]
Sent: Thursday, May 14, 2015 6:27 PM
To: Anton Brazhnyk
Cc: user@spark.apache.org
Subject: Re: Spark's Guava pieces cause exceptions in non-trivial deployments

What version of Spark are you using? The bug you mention is only about the Optional class (and a handful of others, but none of the classes you're having problems with). All other Guava classes should be shaded since Spark 1.2, so you should be able to use your own version of Guava with no problems (aside from the Optional classes).

Also, Spark 1.3 added some improvements to how shading is done, so if you're using 1.2 I'd recommend trying 1.3 before declaring defeat.

On Thu, May 14, 2015 at 4:52 PM, Anton Brazhnyk <anton.brazh...@genesys.com> wrote:

Greetings,

I have a relatively complex application with Spark, Jetty and Guava (16) not fitting together. The exception happens when some components try to use a “mix” of Guava classes (including Spark’s pieces) that are loaded by different classloaders:

java.lang.LinkageError: loader constraint violation: when resolving method com.google.common.collect.Iterables.transform(Ljava/lang/Iterable;Lcom/google/common/base/Function;)Ljava/lang/Iterable; the class loader (instance of org/eclipse/jetty/webapp/WebAppClassLoader) of the current class, org/apache/cassandra/db/ColumnFamilyStore, and the class loader (instance of java/net/URLClassLoader) for resolved class, com/google/common/collect/Iterables, have different Class objects for the type e;Lcom/google/common/base/Function;)Ljava/lang/Iterable; used in the signature

According to https://issues.apache.org/jira/browse/SPARK-4819 it’s not going to be fixed at least until Spark 2.0, but maybe some workaround is possible? Those classes are pretty simple and have low chances of being changed in Guava significantly, so any “external” Guava can provide them. So, could such problems be fixed if those Spark pieces of Guava were in a separate jar that could be excluded from the mix (substituted by an “external” Guava)?

Thanks,
Anton

--
Marcelo
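The self-shading Tom describes can be done with sbt-assembly's shade rules; a sketch assuming sbt-assembly 0.14+, with an arbitrary relocation prefix:

    // build.sbt: relocate Guava into a private namespace so the application's
    // Guava 16 never collides with the Guava classes Spark exposes.
    assemblyShadeRules in assembly := Seq(
      ShadeRule.rename("com.google.common.**" -> "myapp.shaded.guava.@1").inAll
    )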
Re: Spark's Guava pieces cause exceptions in non-trivial deployments
Actually, the extraClassPath settings put the extra jars at the end of the classpath, so they won't help. Only the deprecated SPARK_CLASSPATH puts them at the front.

cheers,
Tom

On Fri, May 15, 2015 at 11:54 AM, Marcelo Vanzin <van...@cloudera.com> wrote:

Ah, I see. Yeah, it sucks that Spark has to expose Optional (and things it depends on), but removing that would break the public API, so...

One last thing you could try is to add your newer Guava jar to spark.driver.extraClassPath and spark.executor.extraClassPath. Those settings will place your jars before Spark's in the classpath, so you'd actually be using the newer versions of the conflicting classes everywhere. It does require manually distributing the Guava jar to the same location on all nodes in the cluster, though.

If that doesn't work, Thomas's suggestion of shading Guava in your app can be used as a last resort.

On Thu, May 14, 2015 at 7:38 PM, Anton Brazhnyk <anton.brazh...@genesys.com> wrote:

The problem is with 1.3.1. It has the Function class (mentioned in the exception) in spark-network-common_2.10-1.3.1.jar. Our current resolution is actually a backport to 1.2.2, which is working fine.

From: Marcelo Vanzin [mailto:van...@cloudera.com]
Sent: Thursday, May 14, 2015 6:27 PM
To: Anton Brazhnyk
Cc: user@spark.apache.org
Subject: Re: Spark's Guava pieces cause exceptions in non-trivial deployments

What version of Spark are you using? The bug you mention is only about the Optional class (and a handful of others, but none of the classes you're having problems with). All other Guava classes should be shaded since Spark 1.2, so you should be able to use your own version of Guava with no problems (aside from the Optional classes).

Also, Spark 1.3 added some improvements to how shading is done, so if you're using 1.2 I'd recommend trying 1.3 before declaring defeat.

On Thu, May 14, 2015 at 4:52 PM, Anton Brazhnyk <anton.brazh...@genesys.com> wrote:

Greetings,

I have a relatively complex application with Spark, Jetty and Guava (16) not fitting together. The exception happens when some components try to use a “mix” of Guava classes (including Spark’s pieces) that are loaded by different classloaders:

java.lang.LinkageError: loader constraint violation: when resolving method com.google.common.collect.Iterables.transform(Ljava/lang/Iterable;Lcom/google/common/base/Function;)Ljava/lang/Iterable; the class loader (instance of org/eclipse/jetty/webapp/WebAppClassLoader) of the current class, org/apache/cassandra/db/ColumnFamilyStore, and the class loader (instance of java/net/URLClassLoader) for resolved class, com/google/common/collect/Iterables, have different Class objects for the type e;Lcom/google/common/base/Function;)Ljava/lang/Iterable; used in the signature

According to https://issues.apache.org/jira/browse/SPARK-4819 it’s not going to be fixed at least until Spark 2.0, but maybe some workaround is possible? Those classes are pretty simple and have low chances of being changed in Guava significantly, so any “external” Guava can provide them. So, could such problems be fixed if those Spark pieces of Guava were in a separate jar that could be excluded from the mix (substituted by an “external” Guava)?

Thanks,
Anton

--
Marcelo

--
Marcelo
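For completeness, a sketch of the settings in question, with a made-up jar path; whether these land before or after Spark's own jars is exactly what's disputed in this thread, and may depend on the cluster manager and version:

    import org.apache.spark.SparkConf

    // Typically set in spark-defaults.conf or via spark-submit --conf;
    // setting spark.driver.extraClassPath programmatically is too late for
    // the already-running driver JVM, so prefer the config file or CLI.
    val conf = new SparkConf()
      .set("spark.driver.extraClassPath", "/opt/libs/guava-16.0.1.jar")   // hypothetical path
      .set("spark.executor.extraClassPath", "/opt/libs/guava-16.0.1.jar") // must exist on every node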
Re: Spark's Guava pieces cause exceptions in non-trivial deployments
I've just been through this exact case with shaded Guava in our Mesos setup, and that is how it behaves there (with Spark 1.3.1).

cheers,
Tom

On Fri, May 15, 2015 at 12:04 PM, Marcelo Vanzin <van...@cloudera.com> wrote:

On Fri, May 15, 2015 at 11:56 AM, Thomas Dudziak <tom...@gmail.com> wrote:

Actually, the extraClassPath settings put the extra jars at the end of the classpath, so they won't help. Only the deprecated SPARK_CLASSPATH puts them at the front.

That's definitely not the case for YARN:
https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L1013

And it's been like that for as far as I remember. I'm almost sure that's also the case for standalone, at least in master / 1.4, since I touched a lot of that code recently. It would be really weird if those options worked differently from SPARK_CLASSPATH, since they were meant to replace it.

On Fri, May 15, 2015 at 11:54 AM, Marcelo Vanzin <van...@cloudera.com> wrote:

Ah, I see. Yeah, it sucks that Spark has to expose Optional (and things it depends on), but removing that would break the public API, so...

One last thing you could try is to add your newer Guava jar to spark.driver.extraClassPath and spark.executor.extraClassPath. Those settings will place your jars before Spark's in the classpath, so you'd actually be using the newer versions of the conflicting classes everywhere. It does require manually distributing the Guava jar to the same location on all nodes in the cluster, though.

If that doesn't work, Thomas's suggestion of shading Guava in your app can be used as a last resort.

On Thu, May 14, 2015 at 7:38 PM, Anton Brazhnyk <anton.brazh...@genesys.com> wrote:

The problem is with 1.3.1. It has the Function class (mentioned in the exception) in spark-network-common_2.10-1.3.1.jar. Our current resolution is actually a backport to 1.2.2, which is working fine.

From: Marcelo Vanzin [mailto:van...@cloudera.com]
Sent: Thursday, May 14, 2015 6:27 PM
To: Anton Brazhnyk
Cc: user@spark.apache.org
Subject: Re: Spark's Guava pieces cause exceptions in non-trivial deployments

What version of Spark are you using? The bug you mention is only about the Optional class (and a handful of others, but none of the classes you're having problems with). All other Guava classes should be shaded since Spark 1.2, so you should be able to use your own version of Guava with no problems (aside from the Optional classes).

Also, Spark 1.3 added some improvements to how shading is done, so if you're using 1.2 I'd recommend trying 1.3 before declaring defeat.

On Thu, May 14, 2015 at 4:52 PM, Anton Brazhnyk <anton.brazh...@genesys.com> wrote:

Greetings,

I have a relatively complex application with Spark, Jetty and Guava (16) not fitting together. The exception happens when some components try to use a “mix” of Guava classes (including Spark’s pieces) that are loaded by different classloaders:

java.lang.LinkageError: loader constraint violation: when resolving method com.google.common.collect.Iterables.transform(Ljava/lang/Iterable;Lcom/google/common/base/Function;)Ljava/lang/Iterable; the class loader (instance of org/eclipse/jetty/webapp/WebAppClassLoader) of the current class, org/apache/cassandra/db/ColumnFamilyStore, and the class loader (instance of java/net/URLClassLoader) for resolved class, com/google/common/collect/Iterables, have different Class objects for the type e;Lcom/google/common/base/Function;)Ljava/lang/Iterable; used in the signature

According to https://issues.apache.org/jira/browse/SPARK-4819 it’s not going to be fixed at least until Spark 2.0, but maybe some workaround is possible? Those classes are pretty simple and have low chances of being changed in Guava significantly, so any “external” Guava can provide them. So, could such problems be fixed if those Spark pieces of Guava were in a separate jar that could be excluded from the mix (substituted by an “external” Guava)?

Thanks,
Anton

--
Marcelo

--
Marcelo

--
Marcelo