Anyone else having trouble with replicated off heap RDD persistence?
Hello,

A coworker was having a problem with a big Spark job failing after several hours when one of the executors would segfault. That problem aside, I speculated that her job would be more robust against these kinds of executor crashes if she used replicated RDD storage. She's using off-heap storage (for good reason), so I asked her to try running her job with the following storage level: `StorageLevel(useDisk = true, useMemory = true, useOffHeap = true, deserialized = false, replication = 2)`. The job would immediately fail with a rather suspicious-looking exception. For example:

com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 9086
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

or

java.lang.IndexOutOfBoundsException: Index: 6, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.get(ArrayList.java:429)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:834)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:788)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executo
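For reference, here is a minimal sketch of persisting an RDD with that storage level. The data and application name are made up; only the storage level mirrors her job, and the cluster is assumed to be configured for off-heap storage (e.g. spark.memory.offHeap.enabled / spark.memory.offHeap.size):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    object OffHeapReplicatedPersist {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("off-heap-replicated-persist"))

        // Disk + memory + off-heap, serialized, replicated to two executors.
        val offHeap2x = StorageLevel(
          useDisk = true, useMemory = true, useOffHeap = true,
          deserialized = false, replication = 2)

        // Hypothetical data set standing in for the real job's RDD.
        val records = sc.parallelize(1 to 1000000).map(i => (i % 1024, i.toDouble))

        records.persist(offHeap2x)
        records.count() // materialize the cached, replicated blocks
        sc.stop()
      }
    }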
Re: External JARs not loading Spark Shell Scala 2.11
I actually just saw your comment on SPARK-6989 before this message. So I'll copy to the mailing list: I'm not sure I understand what you mean about running on 2.11.6. I'm just running the spark-shell command. It in turn is running java -cp /opt/spark/conf:/opt/spark/lib/spark-assembly-1.3.2-SNAPSHOT-hadoop2.5.0-cdh5.3.3.jar:/etc/hadoop/conf:/opt/spark/lib/jline-2.12.jar \ -Dscala.usejavacp=true -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main spark-shell I built Spark with the included build/mvn script. As far as I can tell, the only reference to a specific version of Scala is in the top-level pom file, and it says 2.11.2. > On Apr 17, 2015, at 9:57 PM, Sean Owen wrote: > > You are running on 2.11.6, right? of course, it seems like that should > all work, but it doesn't work for you. My point is that the shell you > are saying doesn't work is Scala's 2.11.2 shell -- with some light > modification. > > It's possible that the delta is the problem. I can't entirely make out > whether the errors are Spark-specific; they involve Spark classes in > some cases but they're assertion errors from Scala libraries. > > I don't know if this shell is supposed to work even across maintenance > releases as-is, though that would be very nice. It's not an "API" for > Scala. > > A good test of whether this idea has any merit would be to run with > Scala 2.11.2. I'll copy this to JIRA for continuation. > > On Fri, Apr 17, 2015 at 10:31 PM, Michael Allman wrote: >> H... I don't follow. The 2.11.x series is supposed to be binary >> compatible against user code. Anyway, I was building Spark against 2.11.2 >> and still saw the problems with the REPL. I've created a bug report: >> >> https://issues.apache.org/jira/browse/SPARK-6989 >> >> I hope this helps. >> >> Cheers, >> >> Michael >> >> On Apr 17, 2015, at 1:41 AM, Sean Owen wrote: >> >> Doesn't this reduce to "Scala isn't compatible with itself across >> maintenance releases"? Meaning, if this were "fixed" then Scala >> 2.11.{x < 6} would have similar failures. It's not not-ready; it's >> just not the Scala 2.11.6 REPL. Still, sure I'd favor breaking the >> unofficial support to at least make the latest Scala 2.11 the unbroken >> one. >> >> On Fri, Apr 17, 2015 at 7:58 AM, Michael Allman >> wrote: >> >> FWIW, this is an essential feature to our use of Spark, and I'm surprised >> it's not advertised clearly as a limitation in the documentation. All I've >> found about running Spark 1.3 on 2.11 is here: >> >> http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211 >> >> Also, I'm experiencing some serious stability problems simply trying to run >> the Spark 1.3 Scala 2.11 REPL. Most of the time it fails to load and spews a >> torrent of compiler assertion failures, etc. See attached. >> >> >> >> Unfortunately, it appears the Spark 1.3 Scala 2.11 REPL is simply not ready >> for production use. I was going to file a bug, but it seems clear that the >> current implementation is going to need to be forward-ported to Scala 2.11.6 >> anyway. We already have an issue for that: >> >> https://issues.apache.org/jira/browse/SPARK-6155 >> >> Michael >> >> >> On Apr 9, 2015, at 10:29 PM, Prashant Sharma wrote: >> >> You will have to go to this commit ID >> 191d7cf2a655d032f160b9fa181730364681d0e7 in Apache spark. 
[1] Once you are >> at that commit, you need to review the changes done to the repl code and >> look for the relevant occurrences of the same code in scala 2.11 repl source >> and somehow make it all work. >> >> >> Thanks, >> >> >> >> >> >> 1. http://githowto.com/getting_old_versions >> >> Prashant Sharma >> >> >> >> On Thu, Apr 9, 2015 at 4:40 PM, Alex Nakos wrote: >> >> >> Ok, what do i need to do in order to migrate the patch? >> >> Thanks >> Alex >> >> On Thu, Apr 9, 2015 at 11:54 AM, Prashant Sharma >> wrote: >> >> >> This is the jira I referred to >> https://issues.apache.org/jira/browse/SPARK-3256. Another reason for not >> working on it is evaluating priority between upgrading to scala 2.11.5(it is >> non trivial I suppose because repl has changed a bit) or migrating that >> patch is much simpler. >> >> Prashant Sharma >> >&
Re: External JARs not loading Spark Shell Scala 2.11
H... I don't follow. The 2.11.x series is supposed to be binary compatible against user code. Anyway, I was building Spark against 2.11.2 and still saw the problems with the REPL. I've created a bug report: https://issues.apache.org/jira/browse/SPARK-6989 <https://issues.apache.org/jira/browse/SPARK-6989> I hope this helps. Cheers, Michael > On Apr 17, 2015, at 1:41 AM, Sean Owen wrote: > > Doesn't this reduce to "Scala isn't compatible with itself across > maintenance releases"? Meaning, if this were "fixed" then Scala > 2.11.{x < 6} would have similar failures. It's not not-ready; it's > just not the Scala 2.11.6 REPL. Still, sure I'd favor breaking the > unofficial support to at least make the latest Scala 2.11 the unbroken > one. > > On Fri, Apr 17, 2015 at 7:58 AM, Michael Allman wrote: >> FWIW, this is an essential feature to our use of Spark, and I'm surprised >> it's not advertised clearly as a limitation in the documentation. All I've >> found about running Spark 1.3 on 2.11 is here: >> >> http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211 >> >> Also, I'm experiencing some serious stability problems simply trying to run >> the Spark 1.3 Scala 2.11 REPL. Most of the time it fails to load and spews a >> torrent of compiler assertion failures, etc. See attached. >> >> >> >> Unfortunately, it appears the Spark 1.3 Scala 2.11 REPL is simply not ready >> for production use. I was going to file a bug, but it seems clear that the >> current implementation is going to need to be forward-ported to Scala 2.11.6 >> anyway. We already have an issue for that: >> >> https://issues.apache.org/jira/browse/SPARK-6155 >> >> Michael >> >> >> On Apr 9, 2015, at 10:29 PM, Prashant Sharma wrote: >> >> You will have to go to this commit ID >> 191d7cf2a655d032f160b9fa181730364681d0e7 in Apache spark. [1] Once you are >> at that commit, you need to review the changes done to the repl code and >> look for the relevant occurrences of the same code in scala 2.11 repl source >> and somehow make it all work. >> >> >> Thanks, >> >> >> >> >> >> 1. http://githowto.com/getting_old_versions >> >> Prashant Sharma >> >> >> >> On Thu, Apr 9, 2015 at 4:40 PM, Alex Nakos wrote: >>> >>> Ok, what do i need to do in order to migrate the patch? >>> >>> Thanks >>> Alex >>> >>> On Thu, Apr 9, 2015 at 11:54 AM, Prashant Sharma >>> wrote: >>>> >>>> This is the jira I referred to >>>> https://issues.apache.org/jira/browse/SPARK-3256. Another reason for not >>>> working on it is evaluating priority between upgrading to scala 2.11.5(it >>>> is >>>> non trivial I suppose because repl has changed a bit) or migrating that >>>> patch is much simpler. >>>> >>>> Prashant Sharma >>>> >>>> >>>> >>>> On Thu, Apr 9, 2015 at 4:16 PM, Alex Nakos wrote: >>>>> >>>>> Hi- >>>>> >>>>> Was this the JIRA issue? >>>>> https://issues.apache.org/jira/browse/SPARK-2988 >>>>> >>>>> Any help in getting this working would be much appreciated! >>>>> >>>>> Thanks >>>>> Alex >>>>> >>>>> On Thu, Apr 9, 2015 at 11:32 AM, Prashant Sharma >>>>> wrote: >>>>>> >>>>>> You are right this needs to be done. I can work on it soon, I was not >>>>>> sure if there is any one even using scala 2.11 spark repl. Actually >>>>>> there is >>>>>> a patch in scala 2.10 shell to support adding jars (Lost the JIRA ID), >>>>>> which >>>>>> has to be ported for scala 2.11 too. If however, you(or anyone else) are >>>>>> planning to work, I can help you ? 
>>>>>> >>>>>> Prashant Sharma >>>>>> >>>>>> >>>>>> >>>>>> On Thu, Apr 9, 2015 at 3:08 PM, anakos wrote: >>>>>>> >>>>>>> Hi- >>>>>>> >>>>>>> I am having difficulty getting the 1.3.0 Spark shell to find an >>>>>>> external >>>>>>> jar. I have build Spark locally for Scala 2.11 and I am starting the
Re: External JARs not loading Spark Shell Scala 2.11
FWIW, this is an essential feature to our use of Spark, and I'm surprised it's not advertised clearly as a limitation in the documentation. All I've found about running Spark 1.3 on 2.11 is here:

http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211

Also, I'm experiencing some serious stability problems simply trying to run the Spark 1.3 Scala 2.11 REPL. Most of the time it fails to load and spews a torrent of compiler assertion failures, etc. See attached.

spark@dp-cluster-master-node-001:~/spark/bin$ spark-shell
Spark Command: java -cp /opt/spark/conf:/opt/spark/lib/spark-assembly-1.3.2-SNAPSHOT-hadoop2.5.0-cdh5.3.3.jar:/etc/hadoop/conf:/opt/spark/lib/jline-2.12.jar -Dscala.usejavacp=true -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main spark-shell

Welcome to
    [Spark ASCII-art banner]   version 1.3.1

Using Scala version 2.11.2 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
Exception in thread "main" java.lang.AssertionError: assertion failed: parser: (source: String, options: Map[String,String])org.apache.spark.sql.DataFrame, tailcalls: (source: String, options: scala.collection.immutable.Map[String,String])org.apache.spark.sql.DataFrame, tailcalls: (source: String, options: scala.collection.immutable.Map)org.apache.spark.sql.DataFrame
    at scala.reflect.internal.Symbols$TypeHistory.<init>(Symbols.scala:3601)
    at scala.reflect.internal.Symbols$Symbol.rawInfo(Symbols.scala:1521)
    at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1439)
    at scala.tools.nsc.transform.SpecializeTypes$$anonfun$23$$anonfun$apply$20.apply(SpecializeTypes.scala:775)
    at scala.tools.nsc.transform.SpecializeTypes$$anonfun$23$$anonfun$apply$20.apply(SpecializeTypes.scala:768)
    at scala.collection.immutable.List.flatMap(List.scala:327)
    at scala.tools.nsc.transform.SpecializeTypes$$anonfun$23.apply(SpecializeTypes.scala:768)
    at scala.tools.nsc.transform.SpecializeTypes$$anonfun$23.apply(SpecializeTypes.scala:766)
    at scala.collection.immutable.List.flatMap(List.scala:327)
    at scala.tools.nsc.transform.SpecializeTypes.specializeClass(SpecializeTypes.scala:766)
    at scala.tools.nsc.transform.SpecializeTypes.transformInfo(SpecializeTypes.scala:1187)
    at scala.tools.nsc.transform.InfoTransform$Phase$$anon$1.transform(InfoTransform.scala:38)
    at scala.reflect.internal.Symbols$Symbol.rawInfo(Symbols.scala:1519)
    at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1439)
    at scala.reflect.internal.Symbols$Symbol.isDerivedValueClass(Symbols.scala:775)
    at scala.reflect.internal.transform.Erasure$ErasureMap.apply(Erasure.scala:131)
    at scala.reflect.internal.transform.Erasure$ErasureMap.apply(Erasure.scala:144)
    at scala.reflect.internal.transform.Erasure$class.specialErasure(Erasure.scala:209)
    at scala.tools.nsc.transform.Erasure.specialErasure(Erasure.scala:15)
    at scala.reflect.internal.transform.Erasure$class.transformInfo(Erasure.scala:364)
    at scala.tools.nsc.transform.Erasure.transformInfo(Erasure.scala:348)
    at scala.tools.nsc.transform.InfoTransform$Phase$$anon$1.transform(InfoTransform.scala:38)
    at scala.reflect.internal.Symbols$Symbol.rawInfo(Symbols.scala:1519)
    at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1439)
    at scala.tools.nsc.transform.Erasure$ErasureTransformer$$anonfun$checkNoDeclaredDoubleDefs$1$$anonfun$apply$mcV$sp$2.apply(Erasure.scala:753)
    at scala.tools.nsc.transform.Erasure$ErasureTransformer$$anonfun$checkNoDeclaredDoubleDefs$1$$anonfun$apply$mcV$sp$2.apply(Erasure.scala:753)
    at scala.reflect.internal.Scopes$Scope.foreach(Scopes.scala:373)
    at scala.tools.nsc.transform.Erasure$ErasureTransformer$$anonfun$checkNoDeclaredDoubleDefs$1.apply(Erasure.scala:753)
    at scala.tools.nsc.transform.Erasure$ErasureTransformer$$anonfun$checkNoDeclaredDoubleDefs$1.apply(Erasure.scala:753)
    at scala.reflect.internal.SymbolTable.enteringPhase(SymbolTable.scala:235)
    at scala.reflect.internal.SymbolTable.exitingPhase(SymbolTable.scala:256)
    at scala.tools.nsc.transform.Erasure$ErasureTransformer.checkNoDeclaredDoubleDefs(Erasure.scala:753)
    at scala.tools.nsc.transform.Erasure$ErasureTransformer.scala$tools$nsc$transform$Erasure$ErasureTransformer$$checkNoDoubleDefs(Erasure.scala:780)
    at scala.tools.nsc.transform.Erasure$ErasureTransformer$$anon$1.preErase(Erasure.scala:1074)
    at scala.tools.nsc.transform.Erasure$ErasureTransformer$$anon$1.transform(Erasure.scala:1109)
    at scala.tools.nsc.transform.Erasure$ErasureTransformer$$anon$1.transform(Erasure.scala:841)
    at scala.reflect.api.Trees$Transformer.transformTemplate(Trees.scala:2563)
    at scala.reflect.internal.Trees$$anonfun$itransform$4.apply(Trees.scala:1401)
    at scala.reflect.internal.Trees$$anonfun$itransform$4.apply(Trees.scala:1400)
    at scala
independent user sessions with a multi-user spark sql thriftserver (Spark 1.1)
Hello,

We're running a Spark SQL thriftserver that several users connect to with beeline. One limitation we've run into is that the current working database (set with "use <database>") is shared across all connections, so changing the database on one connection changes the database for all connections. This is also the case for Spark SQL settings, but that's less of an issue.

Is there a way (or a hack) to make the current database selection independent for each beeline connection? I'm not afraid to hack into the source code if there's a straightforward fix/workaround, but I could use some guidance on what to hack on if that's required.

Thank you!

Michael

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: [SQL] Set Parquet block size?
Hi Pierre,

I'm setting the Parquet (and HDFS) block size as follows:

    val ONE_GB = 1024 * 1024 * 1024
    sc.hadoopConfiguration.setInt("dfs.blocksize", ONE_GB)
    sc.hadoopConfiguration.setInt("parquet.block.size", ONE_GB)

Here, sc is a reference to the Spark context. I've tested this and it works for me. Hopefully this helps resolve your memory issue.

Good luck!

Michael

On Oct 9, 2014, at 8:43 AM, Pierre B wrote:

> Hi there!
>
> Is there a way to modify default parquet block size?
>
> I didn't see any reference to ParquetOutputFormat.setBlockSize in Spark code
> so I was wondering if there was a way to provide this option?
>
> I'm asking because we are facing Out of Memory issues when writing parquet
> files.
> The rdd we are saving to parquet have a fairly high number of columns (in
> the thousands, around 3k for the moment).
>
> The only way we can get rid of this for the moment is by doing a .coalesce
> on the SchemaRDD before saving to parquet, but as we get more columns, even
> this approach is not working.
>
> Any help is appreciated!
>
> Thanks
>
> Pierre
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SQL-Set-Parquet-block-size-tp16039.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
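For completeness, here is a self-contained sketch of the same settings in the context of an actual Parquet write, roughly the Spark 1.1-era SchemaRDD API in use in this thread. The schema, data, and output path are made up; in Pierre's case the RDD would be the wide (~3k column) data set:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Hypothetical narrow schema standing in for the real, much wider one.
    case class Record(id: Int, value: String)

    object ParquetBlockSizeExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("parquet-block-size"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.createSchemaRDD

        val ONE_GB = 1024 * 1024 * 1024
        // Set both the HDFS block size and the Parquet row group ("block") size.
        sc.hadoopConfiguration.setInt("dfs.blocksize", ONE_GB)
        sc.hadoopConfiguration.setInt("parquet.block.size", ONE_GB)

        val records = sc.parallelize(1 to 1000000).map(i => Record(i, s"row-$i"))
        records.saveAsParquetFile("hdfs:///tmp/records.parquet") // hypothetical output path

        sc.stop()
      }
    }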
Re: Support for Parquet V2 in ParquetTableSupport?
I am planning to try upgrading spark sql to a newer version of parquet, too. I'll let you know if I make progress. Thanks, Michael On Oct 8, 2014, at 12:17 PM, Michael Armbrust wrote: > Thats a good question, I'm not sure if that will work. I will note that we > are hoping to do some upgrades of our parquet support in the near future. > > On Tue, Oct 7, 2014 at 10:33 PM, Michael Allman wrote: > Hello, > > I was interested in testing Parquet V2 with Spark SQL, but noticed after some > investigation that the parquet writer that Spark SQL uses is fixed at V1 > here: > https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala#L350. > Any particular reason Spark SQL is hard-coded to write Parquet V1? Should I > expect trouble if I write Parquet V2? > > Cheers, > > Michael > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
Re: Interactive interface tool for spark
Ummm... what's helium? Link, plz? On Oct 8, 2014, at 9:13 AM, Stephen Boesch wrote: > @kevin, Michael, > Second that: interested in seeing the zeppelin. pls use helium though .. > > 2014-10-08 7:57 GMT-07:00 Michael Allman : > Hi Andy, > > This sounds awesome. Please keep us posted. Meanwhile, can you share a link > to your project? I wasn't able to find it. > > Cheers, > > Michael > > On Oct 8, 2014, at 3:38 AM, andy petrella wrote: > >> Heya >> >> You can check Zeppellin or my fork of the Scala notebook. >> I'm going this week end to push some efforts on the doc, because it supports >> for realtime graphing, Scala, SQL, dynamic loading of dependencies and I >> started this morning a widget to track the progress of the jobs. >> I'm quite happy with it so far, I used it with graphx, mllib, ADAM and the >> Cassandra connector so far. >> However, its major drawback is that it is a one man (best) effort ftm! :-S >> Le 8 oct. 2014 11:16, "Dai, Kevin" a écrit : >> Hi, All >> >> >> >> We need an interactive interface tool for spark in which we can run spark >> job and plot graph to explorer the data interactively. >> >> Ipython notebook is good, but it only support python (we want one supporting >> scala)… >> >> >> >> BR, >> >> Kevin. >> >> >> >> >> > >
Re: Interactive interface tool for spark
Hi Andy, This sounds awesome. Please keep us posted. Meanwhile, can you share a link to your project? I wasn't able to find it. Cheers, Michael On Oct 8, 2014, at 3:38 AM, andy petrella wrote: > Heya > > You can check Zeppellin or my fork of the Scala notebook. > I'm going this week end to push some efforts on the doc, because it supports > for realtime graphing, Scala, SQL, dynamic loading of dependencies and I > started this morning a widget to track the progress of the jobs. > I'm quite happy with it so far, I used it with graphx, mllib, ADAM and the > Cassandra connector so far. > However, its major drawback is that it is a one man (best) effort ftm! :-S > Le 8 oct. 2014 11:16, "Dai, Kevin" a écrit : > Hi, All > > > > We need an interactive interface tool for spark in which we can run spark job > and plot graph to explorer the data interactively. > > Ipython notebook is good, but it only support python (we want one supporting > scala)… > > > > BR, > > Kevin. > > > > >
Re: window every n elements instead of time based
Yes, I meant batch interval. Thanks for clarifying. Cheers, Michael On Oct 7, 2014, at 11:14 PM, jayant [via Apache Spark User List] wrote: > Hi Michael, > > I think you are meaning batch interval instead of windowing. It can be > helpful for cases when you do not want to process very small batch sizes. > > HDFS sink in Flume has the concept of rolling files based on time, number of > events or size. > https://flume.apache.org/FlumeUserGuide.html#hdfs-sink > > The same could be applied to Spark if the use cases demand. The only major > catch would be that it breaks the concept of window operations which are in > Spark. > > Thanks, > Jayant > > > > > On Tue, Oct 7, 2014 at 10:19 PM, Michael Allman <[hidden email]> wrote: > Hi Andrew, > > The use case I have in mind is batch data serialization to HDFS, where sizing > files to a certain HDFS block size is desired. In my particular use case, I > want to process 10GB batches of data at a time. I'm not sure this is a > sensible use case for spark streaming, and I was trying to test it. However, > I had trouble getting it working and in the end I decided it was more trouble > than it was worth. So I decided to split my task into two: one streaming job > on small, time-defined batches of data, and a traditional Spark job > aggregating the smaller files into a larger whole. In retrospect, I think > this is the right way to go, even if a count-based window specification was > possible. Therefore, I can't suggest my use case for a count-based window > size. > > Cheers, > > Michael > > On Oct 5, 2014, at 4:03 PM, Andrew Ash <[hidden email]> wrote: > >> Hi Michael, >> >> I couldn't find anything in Jira for it -- >> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22window%22%20AND%20component%20%3D%20Streaming >> >> Could you or Adrian please file a Jira ticket explaining the functionality >> and maybe a proposed API? This will help people interested in count-based >> windowing to understand the state of the feature in Spark Streaming. >> >> Thanks! >> Andrew >> >> On Fri, Oct 3, 2014 at 4:09 PM, Michael Allman <[hidden email]> wrote: >> Hi, >> >> I also have a use for count-based windowing. I'd like to process data >> batches by size as opposed to time. Is this feature on the development >> roadmap? Is there a JIRA ticket for it? >> >> Thank you, >> >> Michael >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/window-every-n-elements-instead-of-time-based-tp2085p15701.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> - >> To unsubscribe, e-mail: [hidden email] >> For additional commands, e-mail: [hidden email] >> >> > > > > > If you reply to this email, your message will be added to the discussion > below: > http://apache-spark-user-list.1001560.n3.nabble.com/window-every-n-elements-instead-of-time-based-tp2085p15904.html > To unsubscribe from window every n elements instead of time based, click here. > NAML -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/window-every-n-elements-instead-of-time-based-tp2085p15905.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
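To make the terminology concrete, here is a minimal sketch of where the batch interval is specified in Spark Streaming. The app name, the 60-second interval, and the socket source are arbitrary illustrations:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object BatchIntervalExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("batch-interval-example")
        // The batch interval: each RDD produced by the stream covers 60 seconds of input,
        // regardless of how many records arrived in that window of time.
        val ssc = new StreamingContext(conf, Seconds(60))

        val lines = ssc.socketTextStream("localhost", 9999) // hypothetical source
        lines.count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }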
Support for Parquet V2 in ParquetTableSupport?
Hello, I was interested in testing Parquet V2 with Spark SQL, but noticed after some investigation that the parquet writer that Spark SQL uses is fixed at V1 here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala#L350. Any particular reason Spark SQL is hard-coded to write Parquet V1? Should I expect trouble if I write Parquet V2? Cheers, Michael - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: window every n elements instead of time based
Hi Andrew, The use case I have in mind is batch data serialization to HDFS, where sizing files to a certain HDFS block size is desired. In my particular use case, I want to process 10GB batches of data at a time. I'm not sure this is a sensible use case for spark streaming, and I was trying to test it. However, I had trouble getting it working and in the end I decided it was more trouble than it was worth. So I decided to split my task into two: one streaming job on small, time-defined batches of data, and a traditional Spark job aggregating the smaller files into a larger whole. In retrospect, I think this is the right way to go, even if a count-based window specification was possible. Therefore, I can't suggest my use case for a count-based window size. Cheers, Michael On Oct 5, 2014, at 4:03 PM, Andrew Ash wrote: > Hi Michael, > > I couldn't find anything in Jira for it -- > https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22window%22%20AND%20component%20%3D%20Streaming > > Could you or Adrian please file a Jira ticket explaining the functionality > and maybe a proposed API? This will help people interested in count-based > windowing to understand the state of the feature in Spark Streaming. > > Thanks! > Andrew > > On Fri, Oct 3, 2014 at 4:09 PM, Michael Allman wrote: > Hi, > > I also have a use for count-based windowing. I'd like to process data > batches by size as opposed to time. Is this feature on the development > roadmap? Is there a JIRA ticket for it? > > Thank you, > > Michael > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/window-every-n-elements-instead-of-time-based-tp2085p15701.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
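For anyone with a similar use case, a rough sketch of the "second job" described above follows. It simply reads the many small per-batch files and rewrites them as a handful of large files; the paths, the 10 GB input size, and the target partition count are made-up examples:

    import org.apache.spark.{SparkConf, SparkContext}

    object AggregateSmallFiles {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("aggregate-small-files"))

        // Hypothetical layout: many small files written by the streaming job for one day.
        val small = sc.textFile("hdfs:///data/events/2014-10-05/batch-*/part-*")

        // Assuming ~10 GB of input and a ~1 GB target file size, rewrite as ~10 files.
        small.coalesce(10).saveAsTextFile("hdfs:///data/events-aggregated/2014-10-05")

        sc.stop()
      }
    }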
Re: window every n elements instead of time based
Hi, I also have a use for count-based windowing. I'd like to process data batches by size as opposed to time. Is this feature on the development roadmap? Is there a JIRA ticket for it? Thank you, Michael -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/window-every-n-elements-instead-of-time-based-tp2085p15701.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: possible bug in Spark's ALS implementation...
Hi Nick, I don't have my spark clone in front of me, but OTOH the major differences are/were: 1. Oryx multiplies lambda by alpha. 2. Oryx uses a different matrix inverse algorithm. It maintains a certain symmetry which the Spark algo does not, however I don't think this difference has a real impact on the results. 3. Oryx supports the specification of a convergence threshold for termination of the algorithm, based on delta rmse on a subset of the training set if I recall correctly. I've been using that as the termination criterion instead of a fixed number of iterations. 4. Oryx uses the weighted regularization scheme you alluded to below, multiplying lambda by the number of ratings. I've patched the spark impl to support (4) but haven't pushed it to my clone on github. I think it would be a valuable feature to support officially. I'd also like to work on (3) but don't have time now. I've only been using Oryx the past couple of weeks. Cheers, Michael On Tue, 1 Apr 2014, Nick Pentreath [via Apache Spark User List] wrote: > Hi Michael > Would you mind setting out exactly what differences you did find between the > Spark and Oryx implementations? Would be good to be clear on them, and also > see if there are further tricks/enhancements from the Oryx one that can be > ported (such as the lambda * numRatings adjustment). > > N > > > On Sat, Mar 15, 2014 at 2:52 AM, Michael Allman <[hidden email]> wrote: > I've been thoroughly investigating this issue over the past > couple of days > and have discovered quite a bit. For one thing, there is > definitely (at > least) one issue/bug in the Spark implementation that leads to > incorrect > results for models generated with rank > 1 or a large number of > iterations. > I will post a bug report with a thorough explanation this > weekend or on > Monday. > > I believe I've been able to track down every difference between > the Spark > and Oryx implementations that lead to difference results. I made > some > adjustments to the spark implementation so that, given the same > initial > product/item vectors, the resulting model is identical to the > one produced > by Oryx within a small numerical tolerance. I've verified this > for small > data sets and am working on verifying this with some large data > sets. > > Aside from those already identified in this thread, another > significant > difference in the Spark implementation is that it begins the > factorization > process by computing the product matrix (Y) from the initial > user matrix > (X). Both of the papers on ALS referred to in this thread begin > the process > by computing the user matrix. I haven't done any testing > comparing the > models generated starting from Y or X, but they are very > different. Is there > a reason Spark begins the iteration by computing Y? > > Initializing both X and Y as is done in the Spark implementation > seems > unnecessary unless I'm overlooking some desired side-effect. > Only the factor > matrix which generates the other in the first iteration needs to > be > initialized. > > I also found that the product and user RDDs were being rebuilt > many times > over in my tests, even for tiny data sets. By persisting the RDD > returned > from updateFeatures() I was able to avoid a raft of duplicate > computations. > Is there a reason not to do this? > > Thanks. 
> > > > -- > View this message in > context:http://apache-spark-user-list.1001560.n3.nabble.com/possible-bug-in-Spark-s > -ALS-implementation-tp2567p2704.html > Sent from the Apache Spark User List mailing list archive at > Nabble.com. > > > > If you reply to this email, your message will be added to the discussion > below: > http://apache-spark-user-list.1001560.n3.nabble.com/possible-bug-in-Spark-s > -ALS-implementation-tp2567p3588.html > To unsubscribe from possible bug in Spark's ALS implementation..., click > here. NAML > > -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/possible-bug-in-Spark-s-ALS-implementation-tp2567p3619.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
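To spell out item (4) above in notation (this is my reading of the weighted-regularization scheme; n_u and n_i denote the number of observations for user u and item i, and the rest of the symbols follow the implicit-feedback ALS papers cited later in this thread):

\[
x_u = \bigl(Y^\top C_u Y + \lambda\, n_u\, I\bigr)^{-1} Y^\top C_u\, p(u),
\qquad
y_i = \bigl(X^\top C_i X + \lambda\, n_i\, I\bigr)^{-1} X^\top C_i\, p(i),
\]

i.e. the same normal equations as the unweighted case, but with the regularization term scaled per user/item rather than held constant.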
Re: possible bug in Spark's ALS implementation...
I just ran a runtime performance comparison between 0.9.0-incubating and your als branch. I saw a 1.5x improvement in performance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/possible-bug-in-Spark-s-ALS-implementation-tp2567p2823.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: possible bug in Spark's ALS implementation...
Hi Xiangrui, I don't see how https://github.com/apache/spark/pull/161 relates to ALS. Can you explain? Also, thanks for addressing the issue with factor matrix persistence in PR 165. I was probably not going to get to that for a while. I will try to test your changes today for speed improvements. Cheers, Michael -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/possible-bug-in-Spark-s-ALS-implementation-tp2567p2817.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: possible bug in Spark's ALS implementation...
I've created https://spark-project.atlassian.net/browse/SPARK-1263 to address the issue of the factor matrix recomputation. I'm planning to submit a related pull request shortly. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/possible-bug-in-Spark-s-ALS-implementation-tp2567p2785.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: possible bug in Spark's ALS implementation...
You are correct, in the long run it doesn't matter which matrix you begin the iterative process with. I was thinking in terms of doing a side-by-side comparison to Oryx. I've posted a bug report as SPARK-1262. I described the problem I found and the mitigation strategy I've used. I think that this problem has many possible solutions, so I'm omitting a patch to let the community hash out the best approach. However, I will suggest we move to a pure Java implementation of a linear system solver to provide better assurances of correctness across platforms (differences in java.lang.Math notwithstanding) and to make the implementation more transparent. It is not clear exactly what native code JBlas is linked to and using for its solver. I suggested the QR decomposition-based solvers provided by Colt and Commons Math as candidate replacements. Cheers. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/possible-bug-in-Spark-s-ALS-implementation-tp2567p2783.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
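To illustrate the kind of pure-JVM solver suggested above, here is a toy sketch using Commons Math's QR decomposition. The matrix and right-hand side are made-up values, not anything from the ALS code:

    import org.apache.commons.math3.linear.{Array2DRowRealMatrix, ArrayRealVector, QRDecomposition}

    object QrSolveExample {
      def main(args: Array[String]): Unit = {
        // Toy normal-equations-style system A x = b.
        val A = new Array2DRowRealMatrix(Array(
          Array(4.0, 1.0),
          Array(1.0, 3.0)))
        val b = new ArrayRealVector(Array(1.0, 2.0))

        // Solve via QR decomposition; no native (BLAS/LAPACK) code involved.
        val x = new QRDecomposition(A).getSolver.solve(b)

        println(x) // expected roughly (1/11, 7/11)
      }
    }

Colt exposes a very similar QR-based solve, which is the sort of drop-in interface being suggested.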
Re: possible bug in Spark's ALS implementation...
I've been thoroughly investigating this issue over the past couple of days and have discovered quite a bit. For one thing, there is definitely (at least) one issue/bug in the Spark implementation that leads to incorrect results for models generated with rank > 1 or a large number of iterations. I will post a bug report with a thorough explanation this weekend or on Monday. I believe I've been able to track down every difference between the Spark and Oryx implementations that lead to difference results. I made some adjustments to the spark implementation so that, given the same initial product/item vectors, the resulting model is identical to the one produced by Oryx within a small numerical tolerance. I've verified this for small data sets and am working on verifying this with some large data sets. Aside from those already identified in this thread, another significant difference in the Spark implementation is that it begins the factorization process by computing the product matrix (Y) from the initial user matrix (X). Both of the papers on ALS referred to in this thread begin the process by computing the user matrix. I haven't done any testing comparing the models generated starting from Y or X, but they are very different. Is there a reason Spark begins the iteration by computing Y? Initializing both X and Y as is done in the Spark implementation seems unnecessary unless I'm overlooking some desired side-effect. Only the factor matrix which generates the other in the first iteration needs to be initialized. I also found that the product and user RDDs were being rebuilt many times over in my tests, even for tiny data sets. By persisting the RDD returned from updateFeatures() I was able to avoid a raft of duplicate computations. Is there a reason not to do this? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/possible-bug-in-Spark-s-ALS-implementation-tp2567p2704.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
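On the last point, a generic sketch of the persist-between-iterations pattern described above follows. It is illustrative only, not the actual ALS code; the mapValues call is a stand-in for updateFeatures(), and the data is hypothetical:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    object IterativePersistSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("iterative-persist-sketch"))

        // Hypothetical factor data: (id, value) pairs.
        var factors: RDD[(Int, Double)] = sc.parallelize(0 until 10000).map(i => (i, 1.0))
        factors.persist(StorageLevel.MEMORY_AND_DISK)

        for (_ <- 1 to 10) {
          // Stand-in for the real update step; the point is the persist/unpersist dance.
          val updated = factors.mapValues(v => 0.9 * v + 0.1).persist(StorageLevel.MEMORY_AND_DISK)
          updated.count()    // materialize before releasing the previous iteration
          factors.unpersist()
          factors = updated
        }

        println(factors.count())
        sc.stop()
      }
    }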
Re: possible bug in Spark's ALS implementation...
Hi Sean, Digging deeper I've found another difference between Oryx's implementation and Spark's. Why do you adjust lambda here? https://github.com/cloudera/oryx/blob/master/als-common/src/main/java/com/cloudera/oryx/als/common/factorizer/als/AlternatingLeastSquares.java#L491 Cheers, Michael -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/possible-bug-in-Spark-s-ALS-implementation-tp2567p2636.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
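For readers joining the thread, the objective both implementations minimize, as given in the implicit-feedback paper referenced at the start of this thread (x_u and y_i are the user/item factors, r_ui the raw observations, and alpha, lambda the confidence and regularization parameters), is:

\[
\min_{x_*,\,y_*}\;\sum_{u,i} c_{ui}\,\bigl(p_{ui} - x_u^\top y_i\bigr)^2
\;+\;\lambda\Bigl(\sum_u \lVert x_u\rVert^2 + \sum_i \lVert y_i\rVert^2\Bigr),
\qquad c_{ui} = 1 + \alpha\, r_{ui},\quad p_{ui} = \mathbb{1}[r_{ui} > 0].
\]

So whether a given implementation scales lambda (by alpha, by a ratings count, or not at all) before plugging it into this expression is exactly the kind of convention difference being asked about here.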
Re: possible bug in Spark's ALS implementation...
Thank you everyone for your feedback. It's been very helpful, and though I still haven't found the cause of the difference between Spark and Oryx, I feel I'm making progress. Xiangrui asked me to create a ticket for this issue. The reason I didn't do this originally is because it's not clear to me yet that this is a bug or a mistake on my part. I'd like to see where this conversation goes and then file a more clearcut issue if applicable. Sean pointed out that Oryx differs in its use of the regularization parameter lambda. I'm aware of this and have been compensating for this difference from the start. Also, the handling of negative values is indeed irrelevant as I have none in my data. After reviewing Sean's analysis and running some calculations in the console, I agree that the Spark code does compute YtCuY correctly. Regarding testing, I'm computing EPR on a test set as outlined in the paper. I'm training on three weeks of data and testing on the following week. I recently updated my data sets and rebuilt and tested the new models. The results were inconclusive in that both models scored about the same. I'm continuing to investigate the source of the wide difference in recommendations between implementations. I will reply with my findings when I have something more definitive. Cheers and thanks again. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/possible-bug-in-Spark-s-ALS-implementation-tp2567p2632.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
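For reference, the test-set metric referred to as EPR above is, as I understand the paper, the expected percentile rank, where rank_ui is the percentile position of item i in user u's ranked recommendation list and r^t_ui is the observed value in the test period (lower is better; about 50% is what random recommendations would score):

\[
\overline{\mathrm{rank}} \;=\; \frac{\sum_{u,i} r^{t}_{ui}\,\mathrm{rank}_{ui}}{\sum_{u,i} r^{t}_{ui}}.
\]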
possible bug in Spark's ALS implementation...
Hi, I'm implementing a recommender based on the algorithm described in http://www2.research.att.com/~yifanhu/PUB/cf.pdf. This algorithm forms the basis for Spark's ALS implementation for data sets with implicit features. The data set I'm working with is proprietary and I cannot share it, however I can say that it's based on the same kind of data in the paper---relative viewing time of videos. (Specifically, the "rating" for each video is defined as total viewing time across all visitors divided by video duration). I'm seeing counterintuitive, sometimes nonsensical recommendations. For comparison, I've run the training data through Oryx's in-VM implementation of implicit ALS with the same parameters. Oryx uses the same algorithm. (Source in this file: https://github.com/cloudera/oryx/blob/master/als-common/src/main/java/com/cloudera/oryx/als/common/factorizer/als/AlternatingLeastSquares.java) The recommendations made by each system compared to one other are very different---moreso than I think could be explained by differences in initial state. The recommendations made by the Oryx models look much better, especially as I increase the number of latent factors and the iterations. The Spark models' recommendations don't improve with increases in either latent factors or iterations. Sometimes, they get worse. Because of the (understandably) highly-optimized and terse style of Spark's ALS implementation, I've had a very hard time following it well enough to debug the issue definitively. However, I have found a section of code that looks incorrect. As described in the paper, part of the implicit ALS algorithm involves computing a matrix product YtCuY (equation 4 in the paper). To optimize this computation, this expression is rewritten as YtY + Yt(Cu - I)Y. I believe that's what should be happening here: https://github.com/apache/incubator-spark/blob/v0.9.0-incubating/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L376 However, it looks like this code is in fact computing YtY + YtY(Cu - I), which is the same as YtYCu. If so, that's a bug. Can someone familiar with this code evaluate my claim? Cheers, Michael
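Restating the claim above in symbols (notation per the paper: C_u is the diagonal confidence matrix for user u, p(u) the binarized preference vector, Y the item-factor matrix, lambda the regularization parameter), the user-factor update is

\[
x_u \;=\; \bigl(Y^\top C_u Y + \lambda I\bigr)^{-1}\, Y^\top C_u\, p(u),
\qquad\text{with the optimization}\qquad
Y^\top C_u Y \;=\; Y^\top Y + Y^\top (C_u - I)\, Y .
\]

The expression the code appears to compute instead is

\[
Y^\top Y + Y^\top Y\,(C_u - I) \;=\; Y^\top Y\, C_u ,
\]

which is not equal to \(Y^\top C_u Y\) in general, since matrix multiplication does not commute.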
is spark.cleaner.ttl safe?
Hello, I've been trying to run an iterative spark job that spills 1+ GB to disk per iteration on a system with limited disk space. I believe there's enough space if spark would clean up unused data from previous iterations, but as it stands the number of iterations I can run is limited by available disk space. I found a thread on the usage of spark.cleaner.ttl on the old Spark Users Google group here: https://groups.google.com/forum/#!topic/spark-users/9ebKcNCDih4 I think this setting may be what I'm looking for, however the cleaner seems to delete data that's still in use. The effect is I get bizarre exceptions from Spark complaining about missing broadcast data or ArrayIndexOutOfBounds. When is spark.cleaner.ttl safe to use? Is it supposed to delete in-use data or is this a bug/shortcoming? Cheers, Michael
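For context, a minimal sketch of one way to supply the setting follows; the one-hour TTL is just an example value, not a recommendation:

    import org.apache.spark.{SparkConf, SparkContext}

    object CleanerTtlExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("cleaner-ttl-test")
          .set("spark.cleaner.ttl", "3600") // seconds: periodically drop metadata and cached data older than this

        val sc = new SparkContext(conf)
        // ... iterative job that spills 1+ GB to disk per iteration ...
        sc.stop()
      }
    }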