Error running sbt package on Windows 7 for Spark 1.3.1 and SimpleApp.scala
Hi all, I'm trying to run the standalone application SimpleApp.scala following the instructions at http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala. I was able to create a .jar file with sbt package. However, when I ran $YOUR_SPARK_HOME/bin/spark-submit --class SimpleApp --master local[4] c:/myproject/target/scala-2.10/simple-project_2.10-1.0.jar I didn't get the desired result. There is a lot of output, and in several places it says ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) [image: Inline image 2] Furthermore, running sbt run and sbt compile from the myproject folder gives this error: [image: Inline image 1] Any ideas?
SparkSQL: using Hive UDF returning Map throws error: scala.MatchError: interface java.util.Map (of class java.lang.Class) (state=,code=0)
Hello, I tested some custom UDFs on Spark SQL's Thrift Server via Beeline (Spark 1.3.1). Some UDFs work fine (accessing an array parameter and returning an int or string type). But my UDF returning a map type throws an error: Error: scala.MatchError: interface java.util.Map (of class java.lang.Class) (state=,code=0) I converted the code into a Hive GenericUDF, since I thought that a complex type parameter (array of map) and a complex return type (map) might be supported by Hive's GenericUDF rather than a simple UDF. But Spark SQL doesn't seem to support GenericUDF (error message: Error: java.lang.IllegalAccessException: Class org.apache.spark.sql.hive.HiveFunctionWrapper can not access ..). Below is my example UDF code returning a MAP type. I appreciate any advice. Thanks --
public final class ArrayToMap extends UDF {
    public Map<String, String> evaluate(ArrayList<String> arrayOfString) {
        // add code to handle all index problems
        Map<String, String> map = new HashMap<String, String>();
        int count = 0;
        for (String element : arrayOfString) {
            map.put(count + "", element);
            count++;
        }
        return map;
    }
}
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-using-Hive-UDF-returning-Map-throws-rror-scala-MatchError-interface-java-util-Map-of-class--tp23164.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
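For reference, here is a minimal Scala sketch of how a jar containing such a UDF is typically registered and invoked through a HiveContext (the jar path, function name, and table/column names are made up for illustration, and this alone does not address the MatchError):

    // Scala sketch against a Spark 1.3.x HiveContext; all names below are illustrative
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    hiveContext.sql("ADD JAR /path/to/my-udfs.jar")
    hiveContext.sql("CREATE TEMPORARY FUNCTION array_to_map AS 'ArrayToMap'")
    // assumes a registered table 'events' with an array<string> column 'tags'
    hiveContext.sql("SELECT array_to_map(tags) FROM events").show()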
Spark terminology
I see these in my mapper-only task:
- Input Size / Records: 68.0 GB / 577195178
- Shuffle write: 95.1 GB / 282559291
- Shuffle spill (memory): 2.8 TB
- Shuffle spill (disk): 90.3 GB
I understand the first one; can someone give a one- or two-line explanation for each of the next three? Also, are these numbers good or bad? -- Deepak
RE: SparkSQL: using Hive UDF returning Map throws error: scala.MatchError: interface java.util.Map (of class java.lang.Class) (state=,code=0)
Which version of the Hive jar are you using? Hive 0.13.1 or Hive 0.12.0? -Original Message- From: ogoh [mailto:oke...@gmail.com] Sent: Friday, June 5, 2015 10:10 AM To: user@spark.apache.org Subject: SparkSQL: using Hive UDF returning Map throws error: scala.MatchError: interface java.util.Map (of class java.lang.Class) (state=,code=0) Hello, I tested some custom UDFs on Spark SQL's Thrift Server via Beeline (Spark 1.3.1). Some UDFs work fine (accessing an array parameter and returning an int or string type). But my UDF returning a map type throws an error: Error: scala.MatchError: interface java.util.Map (of class java.lang.Class) (state=,code=0) I converted the code into a Hive GenericUDF, since I thought that a complex type parameter (array of map) and a complex return type (map) might be supported by Hive's GenericUDF rather than a simple UDF. But Spark SQL doesn't seem to support GenericUDF (error message: Error: java.lang.IllegalAccessException: Class org.apache.spark.sql.hive.HiveFunctionWrapper can not access ..). Below is my example UDF code returning a MAP type. I appreciate any advice. Thanks --
public final class ArrayToMap extends UDF {
    public Map<String, String> evaluate(ArrayList<String> arrayOfString) {
        // add code to handle all index problems
        Map<String, String> map = new HashMap<String, String>();
        int count = 0;
        for (String element : arrayOfString) {
            map.put(count + "", element);
            count++;
        }
        return map;
    }
}
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-using-Hive-UDF-returning-Map-throws-rror-scala-MatchError-interface-java-util-Map-of-class--tp23164.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: TF-IDF Question
Hi, org.apache.spark.mllib.linalg.Vector = (1048576,[35587,884670],[3.458767233,3.458767233]) is the sparse vector representation of the terms: the first value (1048576) is the length of the vector, [35587,884670] are the indices of the words, and [3.458767233,3.458767233] are the tf-idf values of those terms. Thanks Somnath From: franco barrientos [mailto:franco.barrien...@exalitica.com] Sent: Thursday, June 04, 2015 11:17 PM To: user@spark.apache.org Subject: TF-IDF Question Hi all! I have a .txt file where each row is the collection of terms of a document, separated by spaces. For example: 1 Hola spark 2 .. I followed this example from the Spark site https://spark.apache.org/docs/latest/mllib-feature-extraction.html and I get something like this: tfidf.first() org.apache.spark.mllib.linalg.Vector = (1048576,[35587,884670],[3.458767233,3.458767233]) I think this: 1. The first parameter, 1048576: I don't know what it is, but it's always the same number (maybe the number of terms). 2. The second parameter, [35587,884670]: I think these are the terms of the first line in my .txt file. 3. The third parameter, [3.458767233,3.458767233]: I think these are the tf-idf values for my terms. Does anyone know the exact interpretation of this? And on the second point, if these values are the terms, how can I match these values with the original term values ([35587=Hola, 884670=spark])? Regards and thanks in advance. Franco Barrientos Data Scientist Málaga #115, Of. 1003, Las Condes. Santiago, Chile. (+562)-29699649 (+569)-76347893 franco.barrien...@exalitica.com www.exalitica.com
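A note on Franco's follow-up question: with the HashingTF used in that guide, term indices come from hashing the term itself, so one way to map an original term back to its index is to hash it the same way. A rough Scala sketch (assuming the tf-idf vectors were built with HashingTF and its default number of features, 2^20 = 1048576):

    import org.apache.spark.mllib.feature.HashingTF
    val hashingTF = new HashingTF()            // default numFeatures = 1048576, matching the first value above
    val idxHola  = hashingTF.indexOf("Hola")   // index this term occupies in the sparse vector
    val idxSpark = hashingTF.indexOf("spark")
    // Comparing these indices with the vector's index array (e.g. 35587, 884670) links values back to terms.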
FetchFailed Exception
I see this Is this a problem with my code or the cluster ? Is there any way to fix it ? FetchFailed(BlockManagerId(2, phxdpehdc9dn2441.stratus.phx.ebay.com, 59574), shuffleId=1, mapId=80, reduceId=20, message= org.apache.spark.shuffle.FetchFailedException: Failed to connect to phxdpehdc9dn2441.stratus.phx.ebay.com/10.115.81.47:59574 at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: Failed to connect to phxdpehdc9dn2441.stratus.phx.ebay.com/10.115.81.47:59574 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) ... 3 more Caused by: java.net.ConnectException: Connection refused: phxdpehdc9dn2441.stratus.phx.ebay.com/10.115.81.47:59574 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) ... 1 more ) -- Deepak
Why the default Params.copy doesn't work for Model.copy?
Hello, I have a question about the Spark 1.4 ml library. In the copy function, it is stated that the default implementation from Params doesn't work for models. ( https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/Model.scala#L49 ) As a result, some feature-generation transformers like StringIndexerModel cannot be used in a Pipeline. Maybe it is due to my limited knowledge of ML pipelines, but can anyone give me some hints on why Model.copy behaves differently from other Params? Thanks! Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-the-default-Params-copy-doesn-t-work-for-Model-copy-tp23169.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Column operation on Spark RDDs.
Hi, I have an RDD with MANY columns (e.g., hundreds), and most of my operations are on columns, e.g., I need to create many intermediate variables from different columns. What is the most efficient way to do this? For example, if my dataRDD[Array[String]] is like below: 123, 523, 534, ..., 893 536, 98, 1623, ..., 98472 537, 89, 83640, ..., 9265 7297, 98364, 9, ..., 735 .. 29, 94, 956, ..., 758 I will need to create a new column or a variable as newCol1 = 2ndCol + 19thCol, and another new column based on newCol1 and the existing columns: newCol2 = function(newCol1, 34thCol). What is the best way of doing this? I have been thinking of using an index for the intermediate variables and the dataRDD, and then joining them together on the index to do my calculation:
var dataRDD = sc.textFile("/test.csv").map(_.split(","))
val dt = dataRDD.zipWithIndex.map(_.swap)
val newCol1 = dataRDD.map(x => x(1) + x(18)).zipWithIndex.map(_.swap)
val newCol2 = newCol1.join(dt).map(x => function(...))
Is there a better way of doing this? Thank you very much! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Column-operation-on-Spark-RDDs-tp23165.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
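Since both derived values depend only on fields of the same row, one alternative that avoids the zipWithIndex/join round-trip is to compute them in a single map over each row. A rough Scala sketch (column positions follow the question; myFunction is a placeholder for the unspecified function, and the columns are assumed to be numeric):

    def myFunction(a: Double, b: Double): Double = a * b      // placeholder for the real function
    val dataRDD = sc.textFile("/test.csv").map(_.split(","))
    val withNewCols = dataRDD.map { row =>
      val newCol1 = row(1).toDouble + row(18).toDouble        // 2nd col + 19th col
      val newCol2 = myFunction(newCol1, row(33).toDouble)     // combine with the 34th col
      (row, newCol1, newCol2)                                 // keep the original row plus the derived values
    }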
Re: Deduping events using Spark
Hi Lee, You should be able to create a PairRDD using the nonce as the key and the AnalyticsEvent as the value. I'm very new to Spark, but here is some uncompilable pseudo code that may or may not help: events.map(event => (event.getNonce, event)).reduceByKey((a, b) => a).map(_._2) The above code is more Scala-like, since that's the syntax with which I have more familiarity - it looks like the Spark Java 8 API is similar, but you won't get implicit conversion to a PairRDD when you use a 2-tuple as the mapped value. Instead, you will need to use the mapToPair function - there's a good example in the Spark Programming Guide under Working With Key-Value Pairs https://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs . Hope this helps! Regards, Will On Thu, Jun 4, 2015 at 1:10 PM, lbierman leebier...@gmail.com wrote: I'm still a bit new to Spark and am struggling to figure out the best way to dedupe my events. I load my Avro files from HDFS and then I want to dedupe events that have the same nonce. For example my code so far:
JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>) context.newAPIHadoopRDD(
    context.hadoopConfiguration(),
    AvroKeyInputFormat.class,
    AvroKey.class,
    NullWritable.class
).keys())
    .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
    .filter(key -> { return Optional.ofNullable(key.getStepEventKey()).isPresent(); })
Now I want to get back an RDD of AnalyticsEvents that are unique. So I basically want to do: if AnalyticsEvent.getNonce() == AnalyticsEvent2.getNonce() only return 1 of them. I'm not sure how to do this? If I do reduceByKey it reduces by AnalyticsEvent not by the values inside? Any guidance would be much appreciated on how I can walk this list of events and only return a filtered version with unique nonces. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Deduping-events-using-Spark-tp23153.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Compute Median in Spark Dataframe
My current example doesn't use a Hive UDAF, but you would do something pretty similar (it calls a new user-defined UDAF, and there are wrappers to make Spark SQL UDAFs from Hive UDAFs but they are private). So this is doable, but since it pokes at internals it will likely break between versions of Spark. If you want to see the WIP PR I have with Sparkling Pandas, it's at https://github.com/sparklingpandas/sparklingpandas/pull/90/files . If you're doing this in the JVM and just want to know how to wrap the Hive UDAF, you can grep/look in sql/hive/ in Spark, but I'd encourage you to see if there is another way to accomplish what you want (since poking at the internals is kind of dangerous). On Thu, Jun 4, 2015 at 6:28 AM, Deenar Toraskar deenar.toras...@gmail.com wrote: Hi Holden, Olivier So for Column you need to pass in a Java function, I have some sample code which does this but it does terrible things to access Spark internals. I also need to call a Hive UDAF in a dataframe agg function. Are there any examples of what Column expects? Deenar On 2 June 2015 at 21:13, Holden Karau hol...@pigscanfly.ca wrote: So for Column you need to pass in a Java function, I have some sample code which does this but it does terrible things to access Spark internals. On Tuesday, June 2, 2015, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Nice to hear from you Holden! I ended up trying exactly that (Column) - but I may have done it wrong: In [5]: g.agg(Column("percentile(value, 0.5)")) Py4JError: An error occurred while calling o97.agg. Trace: py4j.Py4JException: Method agg([class java.lang.String, class scala.collection.immutable.Nil$]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333) Any idea? Olivier. On Tue, Jun 2, 2015 at 18:02, Holden Karau hol...@pigscanfly.ca wrote: Not super easily, the GroupedData class uses a strToExpr function which has a pretty limited set of functions, so we can't pass in the name of an arbitrary Hive UDAF (unless I'm missing something). We can instead construct a Column with the expression you want and then pass it in to agg() that way (although then you need to call the Hive UDAF there). There are some private classes in hiveUdfs.scala which expose hiveUdafs as Spark SQL AggregateExpressions, but they are private. On Tue, Jun 2, 2015 at 8:28 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: I've finally come to the same conclusion, but isn't there any way to call these Hive UDAFs from agg(percentile(key,0.5))?? On Tue, Jun 2, 2015 at 15:37, Yana Kadiyska yana.kadiy...@gmail.com wrote: Like this... sqlContext should be a HiveContext instance
case class KeyValue(key: Int, value: String)
val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF
df.registerTempTable("table")
sqlContext.sql("select percentile(key,0.5) from table").show()
On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, Is there any way to compute a median on a column using Spark's DataFrame? I know you can use stats in an RDD but I'd rather stay within a dataframe. Hive seems to imply that using ntile one can compute percentiles, quartiles and therefore a median. Does anyone have experience with this? Regards, Olivier.
-- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau
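Pulling the working approach from this thread together, a self-contained Scala sketch of computing a median through the Hive percentile UDAF (sqlContext must be a HiveContext; the table and column names are illustrative):

    import org.apache.spark.sql.hive.HiveContext
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    case class KeyValue(key: Int, value: String)
    val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF()
    df.registerTempTable("kv")
    sqlContext.sql("SELECT percentile(key, 0.5) FROM kv").show()   // median of key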
Re: Roadmap for Spark with Kafka on Scala 2.11?
But compile scope is supposed to be added to the assembly. https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Scope On Thu, Jun 4, 2015 at 1:24 PM, algermissen1971 algermissen1...@icloud.com wrote: Hi Iulian, On 26 May 2015, at 13:04, Iulian Dragoș iulian.dra...@typesafe.com wrote: On Tue, May 26, 2015 at 10:09 AM, algermissen1971 algermissen1...@icloud.com wrote: Hi, I am setting up a project that requires Kafka support and I wonder what the roadmap is for Scala 2.11 support (including Kafka). Can we expect to see 2.11 support anytime soon? The upcoming 1.4 release (now at RC2) includes support for Kafka and Scala 2.11.6. It'd be great if you could give it a try. You can find the binaries (and staging repository including 2.11 artifacts) here: https://www.mail-archive.com/dev@spark.apache.org/msg09347.html Feedback after a couple of days: - I am using 1.4.0-rc4 now without problems - Not used Kafka support yet - I am using this with akka-2.3.11 and akka-http 1.0-RC3 (and sbt-assembly) and this has produced a dependency nightmare. I am even adding guava manually to the assembly because I just could not get sbt-assembly to not complain. I am far from a good understanding of sbt / maven internals, but it seems that the ‘compile’ scope set in the Spark POM for a lot of dependencies is somehow not honored and the libs end up causing conflicts in sbt-assembly. (I am writing this to share experience, not to complain. Thanks for the great work!!) onward... Jan iulian Jan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- -- Iulian Dragos -- Reactive Apps on the JVM www.typesafe.com - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
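For what it's worth, one common way to keep Spark's own dependencies out of an sbt-assembly fat jar (and avoid many of these conflicts) is to mark the Spark artifacts as provided and give assembly an explicit merge strategy. A rough build.sbt sketch, assuming sbt-assembly 0.13.x; the versions and merge choices are illustrative rather than a drop-in fix:

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"            % "1.4.0" % "provided",
      "org.apache.spark" %% "spark-streaming"       % "1.4.0" % "provided",
      "org.apache.spark" %% "spark-streaming-kafka" % "1.4.0"   // shipped inside the assembly
    )
    assemblyMergeStrategy in assembly := {
      case PathList("META-INF", xs @ _*) => MergeStrategy.discard   // drop conflicting metadata
      case x =>
        val oldStrategy = (assemblyMergeStrategy in assembly).value
        oldStrategy(x)
    }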
Re: How to share large resources like dictionaries while processing data with Spark ?
You can use it as a broadcast variable, but if it's too large (more than 1 GB, I guess), you may need to share it by turning it into an RDD and joining it to the other RDDs on some kind of key. But this is the kind of thing broadcast variables were designed for. Regards, Olivier. On Thu, Jun 4, 2015 at 11:50 PM, dgoldenberg dgoldenberg...@gmail.com wrote: We have some pipelines defined where sometimes we need to load potentially large resources such as dictionaries. What would be the best strategy for sharing such resources among the transformations/actions within a consumer? Can they be shared somehow across the RDD's? I'm looking for a way to load such a resource once into the cluster memory and have it be available throughout the lifecycle of a consumer... Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
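A minimal Scala sketch of the broadcast-variable approach Olivier describes (loadDictionary and the lookup are placeholders for whatever the pipeline actually needs):

    // Ship a read-only dictionary to every executor once, then reuse it across tasks
    def loadDictionary(): Map[String, String] = Map("foo" -> "bar")   // placeholder loader
    val dictBroadcast = sc.broadcast(loadDictionary())
    val words = sc.parallelize(Seq("foo", "baz"))
    val enriched = words.map { word =>
      // dictBroadcast.value is the local copy cached on each executor
      (word, dictBroadcast.value.getOrElse(word, "unknown"))
    }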
sqlCtx.load a single big csv file from s3 in parallel
Hi there! I'm trying to read a large .csv file (14GB) into a dataframe from S3 via the spark-csv package. I want to load this data in parallel, utilizing all 20 executors that I have; however, by default only 3 executors are being used (which downloaded 5gb/5gb/4gb). Here is my script (I'm using pyspark): lol_file = sqlCtx.load(source="com.databricks.spark.csv", header="false", path=lol_file_path) I have tried adding the option flags 1) minSplits=120, 2) minPartitions=120, but neither worked. I tried reading the source code, but I'm a noob at Scala and could not figure out how the options are being used :( Thank you for reading and any help is much appreciated! Guang -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sqlCtx-load-a-single-big-csv-file-from-s3-in-parallel-tp23163.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
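One workaround, sketched below in Scala (the same repartition call exists on PySpark DataFrames): repartition the DataFrame right after loading so that downstream work is spread across all executors. This doesn't change how the initial S3 read is split, but it stops the few initial partitions from dominating the rest of the job. The path and partition count are illustrative:

    val df = sqlContext.load("com.databricks.spark.csv",
      Map("header" -> "false", "path" -> "s3n://my-bucket/lol_file.csv"))
    val spread = df.repartition(120)   // one shuffle, then all 20 executors share the work
    spread.registerTempTable("lol_data")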
RE: How to share large resources like dictionaries while processing data with Spark ?
Is the dictionary read-only? Did you look at http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables ? -Original Message- From: dgoldenberg [mailto:dgoldenberg...@gmail.com] Sent: Thursday, June 04, 2015 4:50 PM To: user@spark.apache.org Subject: How to share large resources like dictionaries while processing data with Spark ? We have some pipelines defined where sometimes we need to load potentially large resources such as dictionaries. What would be the best strategy for sharing such resources among the transformations/actions within a consumer? Can they be shared somehow across the RDD's? I'm looking for a way to load such a resource once into the cluster memory and have it be available throughout the lifecycle of a consumer... Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: TreeReduce Functionality in Spark
For the first round, you will have 16 reducers working since you have 32 partitions. Every two of the 32 partitions will know which reducer they go to by sharing the same key in the reduceByKey. After this step is done, you will have 16 partitions, so the next round will be 8 reducers. Sincerely, DB Tsai --- Blog: https://www.dbtsai.com On Thu, Jun 4, 2015 at 12:06 PM, Raghav Shankar raghav0110...@gmail.com wrote: Hey DB, Thanks for the reply! I still don't think this answers my question. For example, if I have a top() action being executed and I have 32 workers (32 partitions), and I choose a depth of 4, what does the overlay of intermediate reducers look like? How many reducers are there excluding the master and the worker? How many partitions get sent to each of these intermediate reducers? Does this number vary at each level? Thanks! On Thursday, June 4, 2015, DB Tsai dbt...@dbtsai.com wrote: By default, the depth of the tree is 2. Each partition will be one node. Sincerely, DB Tsai --- Blog: https://www.dbtsai.com On Thu, Jun 4, 2015 at 10:46 AM, Raghav Shankar raghav0110...@gmail.com wrote: Hey Reza, Thanks for your response! Your response clarifies some of my initial thoughts. However, what I don't understand is how the depth of the tree is used to identify how many intermediate reducers there will be, and how many partitions are sent to the intermediate reducers. Could you provide some insight into this? Thanks, Raghav On Thursday, June 4, 2015, Reza Zadeh r...@databricks.com wrote: In a regular reduce, all partitions have to send their reduced value to a single machine, and that machine can become a bottleneck. In a treeReduce, the partitions talk to each other in a logarithmic number of rounds. Imagine a binary tree that has all the partitions at its leaves and the root will contain the final reduced value. This way there is no single bottleneck machine. It remains to decide the number of children each node should have and how deep the tree should be, which is some of the logic in the method you pasted. On Wed, Jun 3, 2015 at 7:10 PM, raggy raghav0110...@gmail.com wrote: I am trying to understand what the treeReduce function for an RDD does, and how it is different from the normal reduce function. My current understanding is that treeReduce tries to split up the reduce into multiple steps. We do a partial reduce on different nodes, and then a final reduce is done to get the final result. Is this correct? If so, I guess what I am curious about is, how does Spark decide how many nodes will be on each level, and how many partitions will be sent to a given node? The bulk of the implementation is within this function:
partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
  .getOrElse(throw new UnsupportedOperationException("empty collection"))
The above function is expanded to
val cleanSeqOp = context.clean(seqOp)
val cleanCombOp = context.clean(combOp)
val aggregatePartition =
  (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
var numPartitions = partiallyAggregated.partitions.length
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
// If creating an extra level doesn't help reduce
// the wall-clock time, we stop tree aggregation.
while (numPartitions > scale + numPartitions / scale) {
  numPartitions /= scale
  val curNumPartitions = numPartitions
  partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { (i, iter) =>
    iter.map((i % curNumPartitions, _))
  }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
}
partiallyAggregated.reduce(cleanCombOp)
I am completely lost about what is happening in this function. I would greatly appreciate some sort of explanation. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
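For concreteness, a small usage sketch of treeReduce versus a flat reduce (the numbers and depth are illustrative; depth controls how many rounds of partial aggregation happen on the executors before the final combine reaches the driver):

    val rdd = sc.parallelize(1L to 1000000L, 32)      // 32 partitions, as in the example above
    val flatSum = rdd.reduce(_ + _)                   // every partition's result goes straight to the driver
    val treeSum = rdd.treeReduce(_ + _, depth = 4)    // partial combines happen on executors first
    assert(flatSum == treeSum)                        // same result, different communication pattern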
Re: How to share large resources like dictionaries while processing data with Spark ?
Hi there, I would recommend checking out https://github.com/spark-jobserver/spark-jobserver which I think gives the functionality you are looking for. I haven't tested it though. BR On 5 June 2015 at 01:35, Olivier Girardot ssab...@gmail.com wrote: You can use it as a broadcast variable, but if it's too large (more than 1Gb I guess), you may need to share it joining this using some kind of key to the other RDDs. But this is the kind of thing broadcast variables were designed for. Regards, Olivier. Le jeu. 4 juin 2015 à 23:50, dgoldenberg dgoldenberg...@gmail.com a écrit : We have some pipelines defined where sometimes we need to load potentially large resources such as dictionaries. What would be the best strategy for sharing such resources among the transformations/actions within a consumer? Can they be shared somehow across the RDD's? I'm looking for a way to load such a resource once into the cluster memory and have it be available throughout the lifecycle of a consumer... Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to share large resources like dictionaries while processing data with Spark ?
Thanks so much, Yiannis, Olivier, Huang! On Thu, Jun 4, 2015 at 6:44 PM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi there, I would recommend checking out https://github.com/spark-jobserver/spark-jobserver which I think gives the functionality you are looking for. I haven't tested it though. BR On 5 June 2015 at 01:35, Olivier Girardot ssab...@gmail.com wrote: You can use it as a broadcast variable, but if it's too large (more than 1Gb I guess), you may need to share it joining this using some kind of key to the other RDDs. But this is the kind of thing broadcast variables were designed for. Regards, Olivier. Le jeu. 4 juin 2015 à 23:50, dgoldenberg dgoldenberg...@gmail.com a écrit : We have some pipelines defined where sometimes we need to load potentially large resources such as dictionaries. What would be the best strategy for sharing such resources among the transformations/actions within a consumer? Can they be shared somehow across the RDD's? I'm looking for a way to load such a resource once into the cluster memory and have it be available throughout the lifecycle of a consumer... Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to share large resources like dictionaries while processing data with Spark ?
We have some pipelines defined where sometimes we need to load potentially large resources such as dictionaries. What would be the best strategy for sharing such resources among the transformations/actions within a consumer? Can they be shared somehow across the RDD's? I'm looking for a way to load such a resource once into the cluster memory and have it be available throughout the lifecycle of a consumer... Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Deduping events using Spark
I think if you create a bidirectional mapping from AnalyticsEvent to another type that wraps it and uses the nonce as its equality, you could then do something like reduceByKey to group by nonce and map back to AnalyticsEvent after. On Thu, Jun 4, 2015 at 1:10 PM, lbierman leebier...@gmail.com wrote: I'm still a bit new to Spark and am struggling to figure out the best way to dedupe my events. I load my Avro files from HDFS and then I want to dedupe events that have the same nonce. For example my code so far:
JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>) context.newAPIHadoopRDD(
    context.hadoopConfiguration(),
    AvroKeyInputFormat.class,
    AvroKey.class,
    NullWritable.class
).keys())
    .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
    .filter(key -> { return Optional.ofNullable(key.getStepEventKey()).isPresent(); })
Now I want to get back an RDD of AnalyticsEvents that are unique. So I basically want to do: if AnalyticsEvent.getNonce() == AnalyticsEvent2.getNonce() only return 1 of them. I'm not sure how to do this? If I do reduceByKey it reduces by AnalyticsEvent not by the values inside? Any guidance would be much appreciated on how I can walk this list of events and only return a filtered version with unique nonces. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Deduping-events-using-Spark-tp23153.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: inlcudePackage() deprecated?
Yeah - We don't have support for running UDFs on DataFrames yet. There is an open issue to track this https://issues.apache.org/jira/browse/SPARK-6817 Thanks Shivaram On Thu, Jun 4, 2015 at 3:10 AM, Daniel Emaasit daniel.emaa...@gmail.com wrote: Hello Shivaram, Was the includePackage() function deprecated in SparkR 1.4.0? I don't see it in the documentation? If it was, does that mean that we can use R packages on Spark DataFrames the usual way we do for local R dataframes? Daniel -- Daniel Emaasit Ph.D. Research Assistant Transportation Research Center (TRC) University of Nevada, Las Vegas Las Vegas, NV 89154-4015 Cell: 615-649-2489 www.danielemaasit.com http://www.danielemaasit.com/
Re: TreeReduce Functionality in Spark
By default, the depth of the tree is 2. Each partition will be one node. Sincerely, DB Tsai --- Blog: https://www.dbtsai.com On Thu, Jun 4, 2015 at 10:46 AM, Raghav Shankar raghav0110...@gmail.com wrote: Hey Reza, Thanks for your response! Your response clarifies some of my initial thoughts. However, what I don't understand is how the depth of the tree is used to identify how many intermediate reducers there will be, and how many partitions are sent to the intermediate reducers. Could you provide some insight into this? Thanks, Raghav On Thursday, June 4, 2015, Reza Zadeh r...@databricks.com wrote: In a regular reduce, all partitions have to send their reduced value to a single machine, and that machine can become a bottleneck. In a treeReduce, the partitions talk to each other in a logarithmic number of rounds. Imagine a binary tree that has all the partitions at its leaves and the root will contain the final reduced value. This way there is no single bottleneck machine. It remains to decide the number of children each node should have and how deep the tree should be, which is some of the logic in the method you pasted. On Wed, Jun 3, 2015 at 7:10 PM, raggy raghav0110...@gmail.com wrote: I am trying to understand what the treeReduce function for an RDD does, and how it is different from the normal reduce function. My current understanding is that treeReduce tries to split up the reduce into multiple steps. We do a partial reduce on different nodes, and then a final reduce is done to get the final result. Is this correct? If so, I guess what I am curious about is, how does spark decide how many nodes will be on each level, and how many partitions will be sent to a given node? The bulk of the implementation is within this function: partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth) .getOrElse(throw new UnsupportedOperationException(empty collection)) The above function is expanded to val cleanSeqOp = context.clean(seqOp) val cleanCombOp = context.clean(combOp) val aggregatePartition = (it: Iterator[T]) = it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp) var partiallyAggregated = mapPartitions(it = Iterator(aggregatePartition(it))) var numPartitions = partiallyAggregated.partitions.length val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2) // If creating an extra level doesn't help reduce // the wall-clock time, we stop tree aggregation. while (numPartitions scale + numPartitions / scale) { numPartitions /= scale val curNumPartitions = numPartitions partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { (i, iter) = iter.map((i % curNumPartitions, _)) }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values } partiallyAggregated.reduce(cleanCombOp) I am completely lost about what is happening in this function. I would greatly appreciate some sort of explanation. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Anybody using Spark SQL JDBC server with DSE Cassandra?
Deenar, Thanks for the suggestion. That is one of the ideas that I have, but I didn't get a chance to try it out yet. One of the things that could potentially cause problems is that we use wide rows. In addition, the schema is dynamic, with new columns getting added on a regular basis. That is why I am considering DSE, which has integrated the Spark SQL Thrift/JDBC server with Cassandra. Mohammed From: Deenar Toraskar [mailto:deenar.toras...@gmail.com] Sent: Thursday, June 4, 2015 7:42 AM To: Mohammed Guller Cc: user@spark.apache.org Subject: Re: Anybody using Spark SQL JDBC server with DSE Cassandra? Mohammed Have you tried registering your Cassandra tables in Hive/Spark SQL using the data frames API? These should then be available to query via the Spark SQL/Thrift JDBC server. Deenar On 1 June 2015 at 19:33, Mohammed Guller moham...@glassbeam.com wrote: Nobody using Spark SQL JDBC/Thrift server with DSE Cassandra? Mohammed From: Mohammed Guller [mailto:moham...@glassbeam.com] Sent: Friday, May 29, 2015 11:49 AM To: user@spark.apache.org Subject: Anybody using Spark SQL JDBC server with DSE Cassandra? Hi – We have successfully integrated Spark SQL with Cassandra. We have a backend that provides a REST API that allows users to execute SQL queries on data in C*. Now we would like to also support JDBC/ODBC connectivity, so that users can use tools like Tableau to query data in C* through the Spark SQL JDBC server. However, I have been unable to find a driver that would allow the Spark SQL Thrift/JDBC server to connect with Cassandra. DataStax provides a closed-source driver that comes only with the DSE version of Cassandra. I would like to find out how many people are using the Spark SQL JDBC server + DSE Cassandra combination. If you do use Spark SQL JDBC server + DSE, I would appreciate it if you could share your experience. For example, what kind of issues have you run into? How is the performance? What reporting tools are you using? Thank you. Mohammed
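For anyone trying Deenar's suggestion, a rough Scala sketch of registering a Cassandra table as a temporary table in Spark SQL through the data source API (this assumes the open-source DataStax spark-cassandra-connector is on the classpath; the source name and option keys should be checked against that connector's docs, and the keyspace/table names are made up):

    val df = sqlContext.load("org.apache.spark.sql.cassandra",
      Map("keyspace" -> "my_keyspace", "table" -> "my_table"))
    df.registerTempTable("my_table")
    sqlContext.sql("SELECT count(*) FROM my_table").show()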
Re: TreeReduce Functionality in Spark
Hey Reza, Thanks for your response! Your response clarifies some of my initial thoughts. However, what I don't understand is how the depth of the tree is used to identify how many intermediate reducers there will be, and how many partitions are sent to the intermediate reducers. Could you provide some insight into this? Thanks, Raghav On Thursday, June 4, 2015, Reza Zadeh r...@databricks.com wrote: In a regular reduce, all partitions have to send their reduced value to a single machine, and that machine can become a bottleneck. In a treeReduce, the partitions talk to each other in a logarithmic number of rounds. Imagine a binary tree that has all the partitions at its leaves and the root will contain the final reduced value. This way there is no single bottleneck machine. It remains to decide the number of children each node should have and how deep the tree should be, which is some of the logic in the method you pasted. On Wed, Jun 3, 2015 at 7:10 PM, raggy raghav0110...@gmail.com javascript:_e(%7B%7D,'cvml','raghav0110...@gmail.com'); wrote: I am trying to understand what the treeReduce function for an RDD does, and how it is different from the normal reduce function. My current understanding is that treeReduce tries to split up the reduce into multiple steps. We do a partial reduce on different nodes, and then a final reduce is done to get the final result. Is this correct? If so, I guess what I am curious about is, how does spark decide how many nodes will be on each level, and how many partitions will be sent to a given node? The bulk of the implementation is within this function: partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth) .getOrElse(throw new UnsupportedOperationException(empty collection)) The above function is expanded to val cleanSeqOp = context.clean(seqOp) val cleanCombOp = context.clean(combOp) val aggregatePartition = (it: Iterator[T]) = it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp) var partiallyAggregated = mapPartitions(it = Iterator(aggregatePartition(it))) var numPartitions = partiallyAggregated.partitions.length val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2) // If creating an extra level doesn't help reduce // the wall-clock time, we stop tree aggregation. while (numPartitions scale + numPartitions / scale) { numPartitions /= scale val curNumPartitions = numPartitions partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { (i, iter) = iter.map((i % curNumPartitions, _)) }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values } partiallyAggregated.reduce(cleanCombOp) I am completely lost about what is happening in this function. I would greatly appreciate some sort of explanation. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org javascript:_e(%7B%7D,'cvml','user-unsubscr...@spark.apache.org'); For additional commands, e-mail: user-h...@spark.apache.org javascript:_e(%7B%7D,'cvml','user-h...@spark.apache.org');
Re: Spark Job always cause a node to reboot
Is vm.swappiness=0? Some vendors recommend setting this to 0 (zero), although I've seen this cause even the kernel to fail to allocate memory. It may cause a node reboot. If that's the case, set vm.swappiness to 5-10 and decrease spark.*.memory. Make sure spark.driver.memory + spark.executor.memory + OS overhead etc. fit within the amount of memory the node has. -- Ruslan Dautkhanov On Thu, Jun 4, 2015 at 8:59 AM, Chao Chen kandy...@gmail.com wrote: Hi all, I am new to Spark. I am trying to deploy HDFS (hadoop-2.6.0) and Spark-1.3.1 with four nodes, and each node has 8 cores and 8GB memory. One is configured as the headnode running the masters, and the 3 others are workers. But when I try to run the PageRank workload from HiBench, it always causes a node to reboot in the middle of the job, for all of the Scala, Java, and Python versions. But it works fine with the MapReduce version from the same benchmark. I also tried a standalone deployment and got the same issue. My spark-defaults.conf: spark.master yarn-client spark.driver.memory 4g spark.executor.memory 4g spark.rdd.compress false The job submit script is: bin/spark-submit --properties-file HiBench/report/pagerank/spark/scala/conf/sparkbench/spark.conf --class org.apache.spark.examples.SparkPageRank --master yarn-client --num-executors 2 --executor-cores 4 --executor-memory 4G --driver-memory 4G HiBench/src/sparkbench/target/sparkbench-4.0-SNAPSHOT-MR2-spark1.3-jar-with-dependencies.jar hdfs://discfarm:9000/HiBench/Pagerank/Input/edges hdfs://discfarm:9000/HiBench/Pagerank/Output 3 What is the problem with my configuration? And how can I find the cause? Any help is welcome! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Adding new Spark workers on AWS EC2 - access error
The issue was that SSH key generated on Spark Master was not transferred to this new slave. Spark-ec2 script with `start` command omits this step. The solution is to use `launch` command with `--resume` options. Then the SSH key is transferred to the new slave and everything goes smooth. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Adding-new-Spark-workers-on-AWS-EC2-access-error-tp23143p23155.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to run spark streaming application on YARN?
Thanks! It is working fine now with spark-submit. Just out of curiosity, how would you use org.apache.spark.deploy.yarn.Client? By adding the spark_yarn jar to the configuration inside the application? On Thu, Jun 4, 2015 at 6:37 PM, Vova Shelgunov vvs...@gmail.com wrote: You should run it with spark-submit or using org.apache.spark.deploy.yarn.Client. 2015-06-04 20:30 GMT+03:00 Saiph Kappa saiph.ka...@gmail.com: No, I am not. I run it with sbt «sbt run-main Branchmark». I thought it was the same thing since I am passing all the configurations through the application code. Is that the problem? On Thu, Jun 4, 2015 at 6:26 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Saiph, Are you launching using spark-submit? -Sandy On Thu, Jun 4, 2015 at 10:20 AM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, I've been running my spark streaming application in standalone mode without any worries. Now, I've been trying to run it on YARN (hadoop 2.7.0) but I am having some problems. Here are the config parameters of my application: «
val sparkConf = new SparkConf()
sparkConf.setMaster("yarn-client")
sparkConf.set("spark.yarn.am.memory", "2g")
sparkConf.set("spark.executor.instances", "2")
sparkConf.setAppName("Benchmark")
sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar"))
sparkConf.set("spark.executor.memory", "4g")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops -XX:+UseConcMarkSweepGC " +
  "-XX:+AggressiveOpts -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ")
if (sparkConf.getOption("spark.master") == None) {
  sparkConf.setMaster("local[*]")
}
» The jar I'm including there only contains the application classes. Here is the log of the application: http://pastebin.com/7RSktezA Here is the userlog on hadoop/YARN: « Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/Logging at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:596) at org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala) Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 14 more » I tried to add the spark core jar to ${HADOOP_HOME}/lib but the error persists. Am I doing something wrong? Thanks.
Re: How to run spark streaming application on YARN?
That might work, but there might also be other steps that are required. -Sandy On Thu, Jun 4, 2015 at 11:13 AM, Saiph Kappa saiph.ka...@gmail.com wrote: Thanks! It is working fine now with spark-submit. Just out of curiosity, how would you use org.apache.spark.deploy.yarn.Client? Adding that spark_yarn jar to the configuration inside the application? On Thu, Jun 4, 2015 at 6:37 PM, Vova Shelgunov vvs...@gmail.com wrote: You should run it with spark-submit or using org .apache.spark.deploy.yarn.Client. 2015-06-04 20:30 GMT+03:00 Saiph Kappa saiph.ka...@gmail.com: No, I am not. I run it with sbt «sbt run-main Branchmark». I thought it was the same thing since I am passing all the configurations through the application code. Is that the problem? On Thu, Jun 4, 2015 at 6:26 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Saiph, Are you launching using spark-submit? -Sandy On Thu, Jun 4, 2015 at 10:20 AM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, I've been running my spark streaming application in standalone mode without any worries. Now, I've been trying to run it on YARN (hadoop 2.7.0) but I am having some problems. Here are the config parameters of my application: « val sparkConf = new SparkConf() sparkConf.setMaster(yarn-client) sparkConf.set(spark.yarn.am.memory, 2g) sparkConf.set(spark.executor.instances, 2) sparkConf.setAppName(Benchmark) sparkConf.setJars(Array(target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar)) sparkConf.set(spark.executor.memory, 4g) sparkConf.set(spark.serializer, org.apache.spark.serializer.KryoSerializer) sparkConf.set(spark.executor.extraJavaOptions, -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC + -XX:+AggressiveOpts -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ) if (sparkConf.getOption(spark.master) == None) { sparkConf.setMaster(local[*]) } » The jar I'm including there only contains the application classes. Here is the log of the application: http://pastebin.com/7RSktezA Here is the userlog on hadoop/YARN: « Exception in thread main java.lang.NoClassDefFoundError: org/apache/spark/Logging at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:596) at org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala) Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 14 more » I tried to add the spark core jar to ${HADOOP_HOME}/lib but the error persists. 
Am I doing something wrong? Thanks.
Re: How to run spark streaming application on YARN?
Additionally, I think this document ( https://spark.apache.org/docs/latest/building-spark.html ) should mention that the protobuf.version might need to be changed to match the one used in the chosen hadoop version. For instance, with hadoop 2.7.0 I had to change protobuf.version to 1.5.0 to be able to run my application. On Thu, Jun 4, 2015 at 7:14 PM, Sandy Ryza sandy.r...@cloudera.com wrote: That might work, but there might also be other steps that are required. -Sandy On Thu, Jun 4, 2015 at 11:13 AM, Saiph Kappa saiph.ka...@gmail.com wrote: Thanks! It is working fine now with spark-submit. Just out of curiosity, how would you use org.apache.spark.deploy.yarn.Client? Adding that spark_yarn jar to the configuration inside the application? On Thu, Jun 4, 2015 at 6:37 PM, Vova Shelgunov vvs...@gmail.com wrote: You should run it with spark-submit or using org .apache.spark.deploy.yarn.Client. 2015-06-04 20:30 GMT+03:00 Saiph Kappa saiph.ka...@gmail.com: No, I am not. I run it with sbt «sbt run-main Branchmark». I thought it was the same thing since I am passing all the configurations through the application code. Is that the problem? On Thu, Jun 4, 2015 at 6:26 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Saiph, Are you launching using spark-submit? -Sandy On Thu, Jun 4, 2015 at 10:20 AM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, I've been running my spark streaming application in standalone mode without any worries. Now, I've been trying to run it on YARN (hadoop 2.7.0) but I am having some problems. Here are the config parameters of my application: « val sparkConf = new SparkConf() sparkConf.setMaster(yarn-client) sparkConf.set(spark.yarn.am.memory, 2g) sparkConf.set(spark.executor.instances, 2) sparkConf.setAppName(Benchmark) sparkConf.setJars(Array(target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar)) sparkConf.set(spark.executor.memory, 4g) sparkConf.set(spark.serializer, org.apache.spark.serializer.KryoSerializer) sparkConf.set(spark.executor.extraJavaOptions, -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC + -XX:+AggressiveOpts -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ) if (sparkConf.getOption(spark.master) == None) { sparkConf.setMaster(local[*]) } » The jar I'm including there only contains the application classes. 
Here is the log of the application: http://pastebin.com/7RSktezA Here is the userlog on hadoop/YARN: « Exception in thread main java.lang.NoClassDefFoundError: org/apache/spark/Logging at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:596) at org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala) Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 14 more » I tried to add the spark core jar to ${HADOOP_HOME}/lib but the error persists. Am I doing something wrong? Thanks.
Re: Required settings for permanent HDFS Spark on EC2
Hi - I'm having a similar problem with switching from ephemeral to persistent HDFS - it always looks for port 9000 regardless of the options I set for persistent HDFS on port 9010. Have you figured out a solution? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Required-settings-for-permanent-HDFS-Spark-on-EC2-tp22860p23157.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Deduping events using Spark
I'm still a bit new to Spark and am struggling to figure out the best way to dedupe my events. I load my Avro files from HDFS and then I want to dedupe events that have the same nonce. For example my code so far: JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>) context.newAPIHadoopRDD( context.hadoopConfiguration(), AvroKeyInputFormat.class, AvroKey.class, NullWritable.class ).keys()) .map(event -> AnalyticsEvent.newBuilder(event.datum()).build()) .filter(key -> { return Optional.ofNullable(key.getStepEventKey()).isPresent(); }) Now I want to get back an RDD of AnalyticsEvents that are unique. So I basically want to do: if AnalyticsEvent.getNonce() == AnalyticsEvent2.getNonce() only return 1 of them. I'm not sure how to do this? If I do reduceByKey it reduces by AnalyticsEvent not by the values inside? Any guidance would be much appreciated on how I can walk this list of events and only return a filtered version of unique nonces. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Deduping-events-using-Spark-tp23153.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
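One way to express the dedupe, sketched here in Scala rather than the poster's Java: key each event by its nonce and let reduceByKey keep one representative per key. The case class below is only a stand-in for the Avro-generated AnalyticsEvent.

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Hypothetical stand-in for the Avro-generated AnalyticsEvent class.
case class AnalyticsEvent(nonce: String, stepEventKey: String)

def dedupeByNonce(events: RDD[AnalyticsEvent]): RDD[AnalyticsEvent] =
  events
    .keyBy(_.nonce)            // key each event by its nonce
    .reduceByKey((a, b) => a)  // keep one arbitrary event per nonce
    .values

The same shape works in Java with mapToPair and reduceByKey; the point is that the key has to be the nonce, not the event object itself.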
RE: Standard Scaler taking 1.5hrs
Hi DB, Yes I am running count() operations on the previous steps and it appears that something is slow prior to the scaler. I thought that running take(5) and print the results would execute the command at each step and materialize the RDD, but is that not the case? That’s how I was testing each step. Thanks, Piero From: DB Tsai [mailto:dbt...@dbtsai.com] Sent: Wednesday, June 03, 2015 10:33 PM To: Piero Cinquegrana Cc: user@spark.apache.org Subject: Re: Standard Scaler taking 1.5hrs Can you do count() before fit to force materialize the RDD? I think something before fit is slow. On Wednesday, June 3, 2015, Piero Cinquegrana pcinquegr...@marketshare.com wrote: The fit part is very slow, transform not at all. The number of partitions was 210 vs number of executors 80. Spark 1.4 sounds great but as my company is using Qubole we are dependent upon them to upgrade from version 1.3.1. Until that happens, can you think of any other reasons as to why it could be slow. Sparse vectors? Excessive number of columns? Sent from my mobile device. Please excuse any typos. On Jun 3, 2015, at 9:53 PM, DB Tsai dbt...@dbtsai.com wrote: Which part of StandardScaler is slow? Fit or transform? Fit has shuffle but very small, and transform doesn't do shuffle. I guess you don't have enough partitions, so please repartition your input dataset to a number at least larger than the # of executors you have. In Spark 1.4's new ML pipeline api, we have Linear Regression with elastic net, and in that version, we use quasi newton for optimization, so it will be way faster than the SGD implementation. Also, in that implementation, StandardScaler is not required since in computing the loss function, we implicitly do this for you. https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef Please try this out and give us feedback. Thanks. On Wednesday, June 3, 2015, Piero Cinquegrana pcinquegr...@marketshare.com wrote: Hello User group, I have an RDD of LabeledPoint composed of sparse vectors as shown below. In the next step, I am standardizing the columns with the Standard Scaler. The data has 2450 columns and ~110M rows. It took 1.5hrs to complete the standardization with 10 nodes and 80 executors. The spark.executor.memory was set to 2g and the driver memory to 5g. scala> val parsedData = stack_sorted.mapPartitions( partition => partition.map{row => LabeledPoint(row._2._1.getDouble(4), sparseVectorCat(row._2, CategoriesIdx, InteractionIds, tupleMap, vecLength)) }, preservesPartitioning=true).cache() CategoriesIdx: Array[Int] = Array(3, 8, 12) InteractionIds: Array[(Int, Int)] = Array((13,12)) vecLength: Int = 2450 parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[93] at mapPartitions at <console>:111 (1.0,(2450,[1322,1361,2430],[1.0,1.0,1.0])) (0.0,(2450,[1322,1781,2430],[1.0,1.0,1.0])) (2.0,(2450,[1322,2193,2430],[1.0,1.0,1.0])) (1.0,(2450,[297,1322,2430],[1.0,1.0,1.0])) (0.0,(2450,[898,1322,2430],[1.0,1.0,1.0])) My suspicion is that because the data is partitioned using a custom partitioner the Standard Scaler is causing a major shuffle operation. Any suggestion on how to improve the performance of this step and a LinearRegressionWithSGD() which is also taking a very long time?
scala> parsedData.partitioner res72: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@d2) scala> val scaler = new StandardScaler(withMean = false, withStd = true).fit(parsedData.map( row => row.features)) scala> val scaledData = parsedData.mapPartitions(partition => partition.map{row => LabeledPoint(row.label, scaler.transform(row.features))}).cache() scala> val numIterations = 100 scala> val stepSize = 0.1 scala> val miniBatchFraction = 0.1 scala> val algorithm = new LinearRegressionWithSGD() scala> algorithm.setIntercept(false) scala> algorithm.optimizer.setNumIterations(numIterations) scala> algorithm.optimizer.setStepSize(stepSize) scala> algorithm.optimizer.setMiniBatchFraction(miniBatchFraction) scala> val model = algorithm.run(scaledData) Best, Piero Cinquegrana Marketing Scientist | MarketShare 11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025 P: 310.914.5677 x242 M: 323.377.9197 www.marketshare.com twitter.com/marketsharep -- - DB Sent from my iPhone
Re: importerror using external library with pyspark
I would try setting PYSPARK_DRIVER_PYTHON environment variable to the location of your python binary, especially if you are using a virtual environment. -Don On Wed, Jun 3, 2015 at 8:24 PM, AlexG swift...@gmail.com wrote: I have libskylark installed on both machines in my two node cluster in the same locations, and checked that the following code, which calls libskylark, works on both nodes with 'pyspark rfmtest.py': import re import numpy import skylark.ml.kernels import random import os from pyspark import SparkContext sc = SparkContext(appName=test) SIGMA = 10 NUM_RF = 500 numfeatures = 100 numpoints = 1000 kernel = skylark.ml.kernels.Gaussian(numfeatures, SIGMA) S = kernel.rft(NUM_RF) rows = sc.parallelize(numpy.random.rand(numpoints, numfeatures).tolist(), 6) sketched_rows = rows.map(lambda row : S / numpy.ndarray(shape=(1,numfeatures), buffer=numpy.array(row)).copy()) os.system(rm -rf spark_out) sketched_rows.saveAsTextFile('spark_out') However, when I try to run the same code on the cluster with 'spark-submit --master spark://master:7077 rfmtest.py', I get an ImportError saying that skylark.sketch does not exist: 15/06/04 01:21:50 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on master:40244 (size: 67.5 KB, free: 265.3 MB) 15/06/04 01:21:51 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on node001:45690 (size: 67.5 KB, free: 265.3 MB) 15/06/04 01:21:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, master): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File /opt/Spark/python/pyspark/worker.py, line 88, in main command = pickleSer._read_with_length(infile) File /opt/Spark/python/pyspark/serializers.py, line 156, in _read_with_length return self.loads(obj) File /opt/Spark/python/pyspark/serializers.py, line 405, in loads return cPickle.loads(obj) ImportError: No module named skylark.sketch at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135) at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:176) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Any ideas what might be going on? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/importerror-using-external-library-with-pyspark-tp23145.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Donald Drake Drake Consulting http://www.drakeconsulting.com/ http://www.MailLaunder.com/ 800-733-2143
Re: Standard Scaler taking 1.5hrs
take(5) will only evaluate enough partitions to provide 5 elements (sometimes a few more but you get the idea), so it won't trigger a full evaluation of all partitions unlike count(). On Thursday, June 4, 2015, Piero Cinquegrana pcinquegr...@marketshare.com wrote: Hi DB, Yes I am running count() operations on the previous steps and it appears that something is slow prior to the scaler. I thought that running take(5) and print the results would execute the command at each step and materialize the RDD, but is that not the case? That’s how I was testing each step. Thanks, Piero *From:* DB Tsai [mailto:dbt...@dbtsai.com] *Sent:* Wednesday, June 03, 2015 10:33 PM *To:* Piero Cinquegrana *Cc:* user@spark.apache.org *Subject:* Re: Standard Scaler taking 1.5hrs Can you do count() before fit to force materialize the RDD? I think something before fit is slow. On Wednesday, June 3, 2015, Piero Cinquegrana pcinquegr...@marketshare.com wrote: The fit part is very slow, transform not at all. The number of partitions was 210 vs number of executors 80. Spark 1.4 sounds great but as my company is using Qubole we are dependent upon them to upgrade from version 1.3.1. Until that happens, can you think of any other reasons as to why it could be slow. Sparse vectors? Excessive number of columns? Sent from my mobile device. Please excuse any typos. On Jun 3, 2015, at 9:53 PM, DB Tsai dbt...@dbtsai.com wrote: Which part of StandardScaler is slow? Fit or transform? Fit has shuffle but very small, and transform doesn't do shuffle. I guess you don't have enough partitions, so please repartition your input dataset to a number at least larger than the # of executors you have. In Spark 1.4's new ML pipeline api, we have Linear Regression with elastic net, and in that version, we use quasi newton for optimization, so it will be way faster than the SGD implementation. Also, in that implementation, StandardScaler is not required since in computing the loss function, we implicitly do this for you. https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef Please try this out and give us feedback. Thanks. On Wednesday, June 3, 2015, Piero Cinquegrana pcinquegr...@marketshare.com wrote: Hello User group, I have an RDD of LabeledPoint composed of sparse vectors as shown below. In the next step, I am standardizing the columns with the Standard Scaler. The data has 2450 columns and ~110M rows. It took 1.5hrs to complete the standardization with 10 nodes and 80 executors. The spark.executor.memory was set to 2g and the driver memory to 5g.
scala> val parsedData = stack_sorted.mapPartitions( partition => partition.map{row => LabeledPoint(row._2._1.getDouble(4), sparseVectorCat(row._2, CategoriesIdx, InteractionIds, tupleMap, vecLength)) }, preservesPartitioning=true).cache() CategoriesIdx: Array[Int] = Array(3, 8, 12) InteractionIds: Array[(Int, Int)] = Array((13,12)) vecLength: Int = 2450 parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[93] at mapPartitions at <console>:111 (1.0,(2450,[1322,1361,2430],[1.0,1.0,1.0])) (0.0,(2450,[1322,1781,2430],[1.0,1.0,1.0])) (2.0,(2450,[1322,2193,2430],[1.0,1.0,1.0])) (1.0,(2450,[297,1322,2430],[1.0,1.0,1.0])) (0.0,(2450,[898,1322,2430],[1.0,1.0,1.0])) My suspicion is that because the data is partitioned using a custom partitioner the Standard Scaler is causing a major shuffle operation. Any suggestion on how to improve the performance of this step and a LinearRegressionWithSGD() which is also taking a very long time? scala> parsedData.partitioner res72: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@d2) scala> val scaler = new StandardScaler(withMean = false, withStd = true).fit(parsedData.map( row => row.features)) scala> val scaledData = parsedData.mapPartitions(partition => partition.map{row => LabeledPoint(row.label, scaler.transform(row.features))}).cache() scala> val numIterations = 100 scala> val stepSize = 0.1 scala> val miniBatchFraction = 0.1 scala> val algorithm = new LinearRegressionWithSGD() scala> algorithm.setIntercept(false) scala> algorithm.optimizer.setNumIterations(numIterations) scala> algorithm.optimizer.setStepSize(stepSize) scala> algorithm.optimizer.setMiniBatchFraction(miniBatchFraction) scala> val model = algorithm.run(scaledData) Best, Piero Cinquegrana Marketing Scientist | MarketShare 11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025 P: 310.914.5677
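Putting the two suggestions from this thread together, a minimal sketch of the measuring/repartitioning step might look as follows; the partition count of 800 is only an example of "more partitions than the 80 executors", and variable names follow the thread.

// Repartition to more partitions than executors and force a full materialization
// with count() so the slow stage shows up before StandardScaler.fit is timed.
val repartitioned = parsedData.repartition(800).cache()
repartitioned.count()   // count() evaluates every partition; take(5) does not

val scaler = new StandardScaler(withMean = false, withStd = true)
  .fit(repartitioned.map(_.features))
val scaledData = repartitioned.map(row => LabeledPoint(row.label, scaler.transform(row.features)))

Note that repartition shuffles once up front and discards the custom partitioner, which is the trade-off being discussed above.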
Re: Scaling spark jobs returning large amount of data
It is possible to start multiple concurrent drivers, Spark dynamically allocates ports per spark application on driver, master, and workers from a port range. When you collect results back to the driver, they do not go through the master. The master is mostly there as a coordinator between the driver and the cluster of worker nodes, but otherwise the workers and driver communicate directly for the underlying workload. A spark application relates to one instance of a SparkContext programmatically or to one call to one of the spark submit scripts. Assuming you don't have dynamic resource allocation setup, each application takes a fixed amount of the cluster resources to run. So as long as you subdivide your cluster resources properly you can run multiple concurrent applications against it. We are doing this in production presently. Alternately, as Igor suggests, you can share a spark application and launch different jobs within it. They will share the resources allocated to the application in this case. An effect of this is you will only have a finite amount of concurrent spark tasks (roughly translates to 1 task can execute 1 partition of a job at a time). If you launch multiple independent jobs within the same application you will likely want to enable fair job scheduling, otherwise stages between independent jobs will run in a FIFO order instead of interleaving execution. Hope this helps, Richard On Thu, Jun 4, 2015 at 11:20 AM, Igor Berman igor.ber...@gmail.com wrote: Hi, as far as I understand you shouldn't send data to driver. Suppose you have file in hdfs/s3 or cassandra partitioning, you should create your job such that every executor/worker of spark will handle part of your input, transform, filter it and at the end write back to cassandra as output(once again every executor/core inside worker will write part of the output, in your case they will write part of report) In general I find that submitting multiple jobs in same spark context(aka driver) is more performant(you don't pay startup-shutdown time), for this some use rest server for submitting jobs to long running spark context(driver) I'm not sure you can run multiple concurrent drivers because of ports On 4 June 2015 at 17:30, Giuseppe Sarno giuseppesa...@fico.com wrote: Hello, I am relatively new to spark and I am currently trying to understand how to scale large numbers of jobs with spark. I understand that spark architecture is split in “Driver”, “Master” and “Workers”. Master has a standby node in case of failure and workers can scale out. All the examples I have seen show Spark been able to distribute the load to the workers and returning small amount of data to the Driver. In my case I would like to explore the scenario where I need to generate a large report on data stored on Cassandra and understand how Spark architecture will handle this case when multiple report jobs will be running in parallel. According to this presentation https://trongkhoanguyenblog.wordpress.com/2015/01/07/understand-the-spark-deployment-modes/ responses from workers go through the Master and finally to the Driver. Does this mean that the Driver and/ or Master is a single point for all the responses coming back from workers ? Is it possible to start multiple concurrent Drivers ? Regards, Giuseppe. Fair Isaac Services Limited (Co. No. 01998476) and Fair Isaac (Adeptra) Limited (Co. No. 03295455) are registered in England and Wales and have a registered office address of Cottons Centre, 5th Floor, Hays Lane, London, SE1 2QP. 
This email and any files transmitted with it are confidential, proprietary and intended solely for the individual or entity to whom they are addressed. If you have received this email in error please delete it immediately.
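As a concrete illustration of the fair-scheduling point made above (for the case of many report jobs sharing one application), a minimal sketch might look like the following; the application name and pool naming are assumptions, not part of the thread.

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: with the FAIR scheduler, jobs submitted from different
// threads of one application interleave instead of queueing FIFO.
val conf = new SparkConf()
  .setAppName("report-server")            // name is illustrative
  .set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

// One thread per report request; each request can optionally use its own pool.
def runReport(reportId: Int): Unit = {
  sc.setLocalProperty("spark.scheduler.pool", "report-" + reportId)
  // build the report's RDD pipeline and write the result back to Cassandra here
}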
Re: Problem reading Parquet from 1.2 to 1.3
I talked to Don outside the list and he says that he's seeing this issue with Apache Spark 1.3 too (not just CDH Spark), so it seems like there is a real issue here. On Wed, Jun 3, 2015 at 1:39 PM, Don Drake dondr...@gmail.com wrote: As part of upgrading a cluster from CDH 5.3.x to CDH 5.4.x I noticed that Spark is behaving differently when reading Parquet directories that contain a .metadata directory. It seems that in spark 1.2.x, it would just ignore the .metadata directory, but now that I'm using Spark 1.3, reading these files causes the following exceptions: scala val d = sqlContext.parquetFile(/user/ddrak/parq_dir) SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder. SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown during a parallel computation: java.lang.RuntimeException: hdfs://nameservice1/user/ddrak/parq_dir/.metadata/schema.avsc is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [116, 34, 10, 125] parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427) parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398) org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276) org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275) scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56) scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650) . . . java.lang.RuntimeException: hdfs://nameservice1/user/ddrak/parq_dir/.metadata/schemas/1.avsc is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [116, 34, 10, 125] parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427) parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398) org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276) org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275) scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56) scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650) . . . java.lang.RuntimeException: hdfs://nameservice1/user/ddrak/parq_dir/.metadata/descriptor.properties is not a Parquet file. 
expected magic number at tail [80, 65, 82, 49] but found [117, 101, 116, 10] parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427) parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398) org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276) org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275) scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56) scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650) . . . at scala.collection.parallel.package$$anon$1.alongWith(package.scala:87) at scala.collection.parallel.Task$class.mergeThrowables(Tasks.scala:86) at scala.collection.parallel.mutable.ParArray$Map.mergeThrowables(ParArray.scala:650) at scala.collection.parallel.Task$class.tryMerge(Tasks.scala:72) at scala.collection.parallel.mutable.ParArray$Map.tryMerge(ParArray.scala:650) at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:190) at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514) at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162) at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514) at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
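A workaround sketch (not a confirmed fix) for the .metadata problem above: hand parquetFile only the data files so the metadata directory is never offered to the Parquet footer reader. This assumes the Spark 1.3 varargs parquetFile(paths: String*) and the usual convention that non-data entries start with "." or "_".

import org.apache.hadoop.fs.{FileSystem, Path}

// List the directory and drop hidden/underscore entries such as .metadata.
val fs = FileSystem.get(sc.hadoopConfiguration)
val dataFiles = fs.listStatus(new Path("/user/ddrak/parq_dir"))
  .map(_.getPath)
  .filterNot(p => p.getName.startsWith(".") || p.getName.startsWith("_"))
  .map(_.toString)
val d = sqlContext.parquetFile(dataFiles: _*)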
TF-IDF Question
Hi all! I have a .txt file where each row of it is a collection of terms of a document separated by space. For example: 1 "Hola spark" 2 .. I followed this example from the spark site https://spark.apache.org/docs/latest/mllib-feature-extraction.html and I get something like this: tfidf.first() org.apache.spark.mllib.linalg.Vector = (1048576,[35587,884670],[3.458767233,3.458767233]) I think this: 1. First parameter "1048576": I don't know what it is but it's always the same number (maybe the number of terms). 2. Second parameter "[35587,884670]": I think these are the terms of the first line in my .txt file. 3. Third parameter "[3.458767233,3.458767233]": I think these are the tfidf values for my terms. Does anyone know the exact interpretation of this, and on the second point, if these values are the terms, how can I match these values with the original term values ("[35587=Hola,884670=spark]")? Regards and thanks in advance. Franco Barrientos Data Scientist Málaga #115, Of. 1003, Las Condes. Santiago, Chile. (+562)-29699649 (+569)-76347893 franco.barrien...@exalitica.com www.exalitica.com
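A short sketch of how the numbers can be interpreted, assuming the HashingTF/IDF pipeline from the linked guide was used unchanged:

import org.apache.spark.mllib.feature.HashingTF

// 1048576 is the size of the feature vector: HashingTF hashes each term into a
// fixed number of buckets, 2^20 = 1,048,576 by default, so it is not the number
// of distinct terms. The second array holds the hashed indices of the line's
// terms and the third their TF-IDF weights. To map a term to its index, hash it
// the same way the pipeline did:
val hashingTF = new HashingTF()   // same default numFeatures = 1 << 20
Seq("Hola", "spark").foreach(term => println(term + " -> " + hashingTF.indexOf(term)))

Going the other way (index back to term) is not possible from the vector alone, because hashing is one-way; keeping a map of term -> indexOf(term) built from the original documents is the usual workaround.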
How to speed up Spark Job?
I have a spark app that reads avro sequence file data and performs join and reduceByKey operations. Results: Command for all runs: ./bin/spark-submit -v --master yarn-cluster --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting_dep_only-1.0-SNAPSHOT-jar-with-dependencies.jar *--num-executors 9973 --driver-memory 14g --driver-java-options -XX:MaxPermSize=512M --executor-memory 14g --executor-cores 1 --queue xy* --class com.ebay.ep.poc.spark.reporting.SparkApp /home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting-1.0-SNAPSHOT.jar startDate=2015-04-29 endDate=2015-04-29 input=/apps/hdmi-prod/b_um/epdatasets/exptsession subcommand=viewItem output=/user/dvasthimal/epdatasets/viewItem buffersize=128 maxbuffersize=1068 maxResultSize=200G I) Input: (View) RDD1: 2 Days = 20 Files = 17,328,796,820 bytes = PARTIAL (Listing) RDD2: 100 Files = 267,748,253,700 bytes = PARTIAL (SPS) RDD3: 63 Files = 142,687,611,360 bytes = FULL Output: hadoop fs -count epdatasets/viewItem 1 101 342246603 epdatasets/viewItem Runtime: 26mins, 36sec II) Input: (View) RDD1: 2 Days = 40 Files = 34,657,593,640 bytes = PARTIAL (Listing) RDD2: 100 Files = 267,748,253,700 bytes = PARTIAL (SPS) RDD3: 63 Files = 142,687,611,360 bytes = FULL Output: hadoop fs -count epdatasets/viewItem 1 101 667790782 epdatasets/viewItem Runtime: 40mins, 49sec I cannot increase memory as 14G is the limit. I can increase the number of executors and cores. Please suggest how to make this app run faster. -- Deepak
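For jobs shaped like this (a join followed by a reduceByKey), one knob that usually matters is the shuffle partition count, which otherwise just inherits the number of input splits. The sketch below is illustrative only; the RDD names, the dummy data, and the partition count are assumptions, not the poster's code.

// Stand-ins for the View and Listing pair RDDs described above.
val viewEvents = sc.parallelize(Seq(("item1", "view-a"), ("item2", "view-b")))
val listings   = sc.parallelize(Seq(("item1", "listing-a")))

val shufflePartitions = 2000   // assumption: a few partitions per executor core

val joined = viewEvents.join(listings, shufflePartitions)
val counts = joined.mapValues(_ => 1L).reduceByKey(_ + _, shufflePartitions)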
Re: Optimisation advice for Avro-Parquet merge job
Thanks for the confirmation! We're quite new to Spark, so a little reassurance is a good thing to have sometimes :-) The thing that's concerning me at the moment is that my job doesn't seem to run any faster with more compute resources added to the cluster, and this is proving a little tricky to debug. There are a lot of variables, so here's what we've tried already and the apparent impact. If anyone has any further suggestions, we'd love to hear! * Increase the minimum number of output files (targetPartitions above), so that input groups smaller than our minimum chunk size can still be worked on by more than one executor. This does measurably speed things up, but obviously it's a trade-off, as the original goal for this job is to merge our data into fewer, larger files. * Submit many jobs in parallel, by running the above code in a Callable, on an executor pool. This seems to help, to some extent, but I'm not sure what else needs to be configured alongside it -- driver threads, scheduling policy, etc. We set scheduling to FAIR when doing this, as that seemed like the right approach, but we're not 100% confident. It seemed to help quite substantially anyway, so perhaps this just needs further tuning? * Increasing executors, RAM, etc. This doesn't make a difference by itself for this job, so I'm thinking we're already not fully utilising the resources we have in a smaller cluster. Again, any recommendations appreciated. Thanks for the help! James. On 4 June 2015 at 15:00, Eugen Cepoi cepoi.eu...@gmail.com wrote: Hi 2015-06-04 15:29 GMT+02:00 James Aley james.a...@swiftkey.com: Hi, We have a load of Avro data coming into our data systems in the form of relatively small files, which we're merging into larger Parquet files with Spark. I've been following the docs and the approach I'm taking seemed fairly obvious, and pleasingly simple, but I'm wondering if perhaps it's not the most optimal approach. I was wondering if anyone on this list might have some advice to make this job as efficient as possible. Here's some code: DataFrame dfInput = sqlContext.load(inputPaths.get(0), "com.databricks.spark.avro"); long totalSize = getDirSize(inputPaths.get(0)); for (int i = 1; i < inputs.size(); ++i) { dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i), "com.databricks.spark.avro")); totalSize += getDirSize(inputPaths.get(i)); } int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES); DataFrame outputFrame; // Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence // the synchronize block below. Suggestions welcome here too! :-) synchronized (this) { RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null); outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema()); } outputFrame.save(outputPath, "parquet", SaveMode.Overwrite); Here are some things bothering me: - Conversion to an RDD and back again so that we can use coalesce() to reduce the number of partitions. This is because we read that repartition() is not as efficient as coalesce(), and local micro benchmarks seemed to somewhat confirm that this was faster. Is this really a good idea though? Should we be doing something else? Repartition uses coalesce but with a forced shuffle step. It's just a shortcut for coalesce(xxx, true) Doing a coalesce sounds correct, I'd do the same :) Note that if you add the shuffle step, then your partitions should be better balanced.
- Usage of unionAll() - this is the only way I could find to join the separate data sets into a single data frame to save as Parquet. Is there a better way? When using directly the inputformats you can do this FileInputFormat.addInputPath, it should perform at least as good as union. - Do I need to be using the DataFrame API at all? I'm not querying any data here, so the nice API for SQL-like transformations of the data isn't being used. The DataFrame API just seemed like the path of least resistance for working with Avro and Parquet. Would there be any advantage to using hadoopRDD() with the appropriate Input/Output formats? Using directly the input/outputformats sounds viable. But the snippet you show seems clean enough and I am not sure there would be much value in making something (maybe) slightly faster but harder to understand. Eugen Any advice or tips greatly appreciated! James.
Re: Spark 1.3.1 On Mesos Issues.
So a few updates. When I run local as stated before, it works fine. When I run in Yarn (via Apache Myriad on Mesos) it also runs fine. The only issue is specifically with Mesos. I wonder if there is some sort of class path goodness I need to fix or something along those lines. Any tips would be appreciated. Thanks! John On Mon, Jun 1, 2015 at 6:14 PM, Dean Wampler deanwamp...@gmail.com wrote: It would be nice to see the code for the MapR FS Java API, but my google foo failed me (assuming it's open source)... So, shooting in the dark ;) there are a few things I would check, if you haven't already: 1. Could there be 1.2 versions of some Spark jars that get picked up at run time (but apparently not in local mode) on one or more nodes? (Side question: Does your node experiment fail on all nodes?) Put another way, are the classpaths good for all JVM tasks? 2. Can you use just MapR and Spark 1.3.1 successfully, bypassing Mesos? Incidentally, how are you combining Mesos and MapR? Are you running Spark in Mesos, but accessing data in MapR-FS? Perhaps the MapR shim library doesn't support Spark 1.3.1. HTH, dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Jun 1, 2015 at 2:49 PM, John Omernik j...@omernik.com wrote: All - I am facing an odd issue and I am not really sure where to go for support at this point. I am running MapR which complicates things as it relates to Mesos, however this HAS worked in the past with no issues so I am stumped here. So for starters, here is what I am trying to run. This is a simple show tables using the Hive Context: from pyspark import SparkContext, SparkConf from pyspark.sql import SQLContext, Row, HiveContext sparkhc = HiveContext(sc) test = sparkhc.sql("show tables") for r in test.collect(): print r When I run it on 1.3.1 using ./bin/pyspark --master local, this works with no issues. When I run it using Mesos with all the settings configured (as they had worked in the past) I get lost tasks, and when I zoom in on them, the error that is being reported is below. Basically it's a NullPointerException on the com.mapr.fs.ShimLoader. What's weird to me is that I took each instance and compared both together - the class path, everything is exactly the same. Yet running in local mode works, and running in mesos fails. Also of note, when the task is scheduled to run on the same node as when I run locally, that fails too! (Baffling). Ok, for comparison, how I configured Mesos was to download the mapr4 package from spark.apache.org. Using the exact same configuration file (except for changing the executor tgz from 1.2.0 to 1.3.1) from the 1.2.0 setup. When I run this example with the mapr4 for 1.2.0 there is no issue in Mesos, everything runs as intended. Using the same package for 1.3.1 it fails. (Also of note, 1.2.1 gives a 404 error, 1.2.2 fails, and 1.3.0 fails as well). So basically, when I used 1.2.0 and followed a set of steps, it worked on Mesos, and 1.3.1 fails. Since this is a current version of Spark, MapR supports 1.2.1 only. (Still working on that). I guess I am at a loss right now on why this would be happening, any pointers on where I could look or what I could tweak would be greatly appreciated. Additionally, if there is something I could specifically draw to the attention of MapR on this problem please let me know, I am perplexed on the change from 1.2.0 to 1.3.1.
Thank you, John Full Error on 1.3.1 on Mesos: 15/05/19 09:31:26 INFO MemoryStore: MemoryStore started with capacity 1060.3 MB java.lang.NullPointerException at com.mapr.fs.ShimLoader.getRootClassLoader(ShimLoader.java:96) at com.mapr.fs.ShimLoader.injectNativeLoader(ShimLoader.java:232) at com.mapr.fs.ShimLoader.load(ShimLoader.java:194) at org.apache.hadoop.conf.CoreDefaultProperties.(CoreDefaultProperties.java:60) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:274) at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1847) at org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2062) at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2272) at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2224) at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2141) at org.apache.hadoop.conf.Configuration.set(Configuration.java:992) at org.apache.hadoop.conf.Configuration.set(Configuration.java:966) at org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:98) at org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:43) at org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala:220) at org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala) at
Re: Optimisation advice for Avro-Parquet merge job
Hi 2015-06-04 15:29 GMT+02:00 James Aley james.a...@swiftkey.com: Hi, We have a load of Avro data coming into our data systems in the form of relatively small files, which we're merging into larger Parquet files with Spark. I've been following the docs and the approach I'm taking seemed fairly obvious, and pleasingly simple, but I'm wondering if perhaps it's not the most optimal approach. I was wondering if anyone on this list might have some advice to make this job as efficient as possible. Here's some code: DataFrame dfInput = sqlContext.load(inputPaths.get(0), "com.databricks.spark.avro"); long totalSize = getDirSize(inputPaths.get(0)); for (int i = 1; i < inputs.size(); ++i) { dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i), "com.databricks.spark.avro")); totalSize += getDirSize(inputPaths.get(i)); } int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES); DataFrame outputFrame; // Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence // the synchronize block below. Suggestions welcome here too! :-) synchronized (this) { RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null); outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema()); } outputFrame.save(outputPath, "parquet", SaveMode.Overwrite); Here are some things bothering me: - Conversion to an RDD and back again so that we can use coalesce() to reduce the number of partitions. This is because we read that repartition() is not as efficient as coalesce(), and local micro benchmarks seemed to somewhat confirm that this was faster. Is this really a good idea though? Should we be doing something else? Repartition uses coalesce but with a forced shuffle step. It's just a shortcut for coalesce(xxx, true) Doing a coalesce sounds correct, I'd do the same :) Note that if you add the shuffle step, then your partitions should be better balanced. - Usage of unionAll() - this is the only way I could find to join the separate data sets into a single data frame to save as Parquet. Is there a better way? When using directly the inputformats you can do this FileInputFormat.addInputPath, it should perform at least as good as union. - Do I need to be using the DataFrame API at all? I'm not querying any data here, so the nice API for SQL-like transformations of the data isn't being used. The DataFrame API just seemed like the path of least resistance for working with Avro and Parquet. Would there be any advantage to using hadoopRDD() with the appropriate Input/Output formats? Using directly the input/outputformats sounds viable. But the snippet you show seems clean enough and I am not sure there would be much value in making something (maybe) slightly faster but harder to understand. Eugen Any advice or tips greatly appreciated! James.
Big performance difference when joining 3 tables in different order
Hi, I encountered a performance issue when joining 3 tables in SparkSQL. Here is the query: SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt FROM t_category c, t_zipcode z, click_meter_site_grouped g WHERE c.refCategoryID = g.category AND z.regionCode = g.region I need to pay a lot of attention to the table order in the FROM clause; if not, some orders break the driver, some orders make the job extremely slow, and only one order makes the job finish quickly. For the slow one, I noticed a table is loaded 56 times (!) from its CSV file. I would like to know more about the join implementation in SparkSQL to understand the issue (auto broadcast, etc). For those who want to know more about the details, here is the jira: https://issues.apache.org/jira/browse/SPARK-8102 Any help is welcome. =) Thx Hao -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Big-performance-difference-when-joining-3-tables-in-different-order-tp23150.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
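A possible workaround to experiment with, sketched below and not a verified fix: spell the joins out explicitly so the two small dimension tables always attach to the fact table, and check the broadcast threshold so they can be broadcast regardless of FROM-clause order. The threshold value shown is only an example.

// Raise the auto-broadcast threshold (~50 MB here) so small tables are broadcast.
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold=52428800")

val result = sqlContext.sql(
  """SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
    |FROM click_meter_site_grouped g
    |JOIN t_category c ON c.refCategoryID = g.category
    |JOIN t_zipcode z ON z.regionCode = g.region""".stripMargin)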
Scaling spark jobs returning large amount of data
Hello, I am relatively new to spark and I am currently trying to understand how to scale large numbers of jobs with spark. I understand that the spark architecture is split into Driver, Master and Workers. The Master has a standby node in case of failure and workers can scale out. All the examples I have seen show Spark being able to distribute the load to the workers and returning a small amount of data to the Driver. In my case I would like to explore the scenario where I need to generate a large report on data stored on Cassandra and understand how the Spark architecture will handle this case when multiple report jobs will be running in parallel. According to this presentation https://trongkhoanguyenblog.wordpress.com/2015/01/07/understand-the-spark-deployment-modes/ responses from workers go through the Master and finally to the Driver. Does this mean that the Driver and/or Master is a single point for all the responses coming back from workers? Is it possible to start multiple concurrent Drivers? Regards, Giuseppe. Fair Isaac Services Limited (Co. No. 01998476) and Fair Isaac (Adeptra) Limited (Co. No. 03295455) are registered in England and Wales and have a registered office address of Cottons Centre, 5th Floor, Hays Lane, London, SE1 2QP. This email and any files transmitted with it are confidential, proprietary and intended solely for the individual or entity to whom they are addressed. If you have received this email in error please delete it immediately.
Re: Anybody using Spark SQL JDBC server with DSE Cassandra?
Mohammed Have you tried registering your Cassandra tables in Hive/Spark SQL using the data frames API? These should then be available to query via the Spark SQL/Thrift JDBC Server. Deenar On 1 June 2015 at 19:33, Mohammed Guller moham...@glassbeam.com wrote: Nobody using Spark SQL JDBC/Thrift server with DSE Cassandra? Mohammed *From:* Mohammed Guller [mailto:moham...@glassbeam.com] *Sent:* Friday, May 29, 2015 11:49 AM *To:* user@spark.apache.org *Subject:* Anybody using Spark SQL JDBC server with DSE Cassandra? Hi – We have successfully integrated Spark SQL with Cassandra. We have a backend that provides a REST API that allows users to execute SQL queries on data in C*. Now we would like to also support JDBC/ODBC connectivity, so that users can use tools like Tableau to query data in C* through the Spark SQL JDBC server. However, I have been unable to find a driver that would allow the Spark SQL Thrift/JDBC server to connect with Cassandra. DataStax provides a closed-source driver that comes only with the DSE version of Cassandra. I would like to find out how many people are using the Spark SQL JDBC server + DSE Cassandra combination. If you do use Spark SQL JDBC server + DSE, I would appreciate it if you could share your experience. For example, what kind of issues have you run into? How is the performance? What reporting tools are you using? Thank you. Mohammed
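A sketch of what the registration suggested above might look like, assuming the open-source DataStax spark-cassandra-connector is on the classpath; the data source name and option keys below belong to that connector, and the keyspace/table names are placeholders.

// Load a Cassandra table as a DataFrame through the connector's data source,
// then register it so the Thrift server's HiveContext can see it.
val df = sqlContext.load(
  "org.apache.spark.sql.cassandra",
  Map("keyspace" -> "my_keyspace", "table" -> "my_table"))

df.registerTempTable("my_table")

A table registered this way against the same HiveContext the Thrift/JDBC server uses can then be queried from JDBC/ODBC clients such as Tableau.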
Re: Spark 1.4.0-rc4 HiveContext.table(db.tbl) NoSuchTableException
Hi Yin, I’m very surprised to hear that it's not supported in 1.3 because I’ve been using it since 1.3.0. It worked great up until SPARK-6908 was merged into master. What is the supported way to get a DF for a table that is not in the default database? IMHO, if you are not going to support “databaseName.tableName”, sqlContext.table() should have a version that takes a database and a table, i.e. def table(databaseName: String, tableName: String): DataFrame = DataFrame(this, catalog.lookupRelation(Seq(databaseName,tableName))) The handling of databases in Spark (sqlContext, hiveContext, Catalog) could be better. Thanks, Doug On Jun 3, 2015, at 8:21 PM, Yin Huai yh...@databricks.com wrote: Hi Doug, Actually, sqlContext.table does not support database name in both Spark 1.3 and Spark 1.4. We will support it in a future version. Thanks, Yin On Wed, Jun 3, 2015 at 10:45 AM, Doug Balog doug.sparku...@dugos.com wrote: Hi, sqlContext.table(“db.tbl”) isn’t working for me, I get a NoSuchTableException. But I can access the table via sqlContext.sql(“select * from db.tbl”) So I know it has the table info from the metastore. Anyone else see this? I’ll keep digging. I compiled via make-distribution -Pyarn -Phadoop-2.4 -Phive -Phive-thriftserver It worked for me in 1.3.1 Cheers, Doug - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
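Two interim workarounds, sketched from what is already mentioned in the thread (going through SQL) plus the standard Hive USE statement; db and tbl are placeholders:

// 1. Query through SQL, which does accept database-qualified names.
val df1 = sqlContext.sql("SELECT * FROM db.tbl")

// 2. Switch the session's current database first (HiveContext only), then use table().
sqlContext.sql("USE db")
val df2 = sqlContext.table("tbl")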
Setting S3 output file grantees for spark output files
Hi all, I'm running Spark on AWS EMR and I'm having some issues getting the correct permissions on the output files using rdd.saveAsTextFile('file_dir_name'). In hive, I would add a line in the beginning of the script with set fs.s3.canned.acl=BucketOwnerFullControl and that would set the correct grantees for the files. For Spark, I tried adding the permissions as a --conf option: hadoop jar /mnt/var/lib/hadoop/steps/s-3HIRLHJJXV3SJ/script-runner.jar \ /home/hadoop/spark/bin/spark-submit --deploy-mode cluster --master yarn-cluster \ --conf spark.driver.extraJavaOptions -Dfs.s3.canned.acl=BucketOwnerFullControl \ hdfs:///user/hadoop/spark.py But the permissions do not get set properly on the output files. What is the proper way to pass in the 'fs.s3.canned.acl=BucketOwnerFullControl' or any of the S3 canned permissions to the spark job? Thanks in advance
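One thing to try, shown here as a sketch and not a confirmed fix: set the same property the Hive script used, but on the Hadoop configuration that the saveAsTextFile output committer will read. The snippet is in Scala for illustration (the job in this thread is PySpark, where the same Hadoop configuration is reachable through the JavaSparkContext underlying the Python SparkContext); the bucket, path, and data are placeholders.

// Apply the canned ACL at the Hadoop FileSystem level for this job's output.
sc.hadoopConfiguration.set("fs.s3.canned.acl", "BucketOwnerFullControl")

val rdd = sc.parallelize(Seq("line1", "line2"))       // placeholder data
rdd.saveAsTextFile("s3://my-bucket/file_dir_name")    // placeholder output path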
Re: Spark 1.4 HiveContext fails to initialise with native libs
Are you using RC4? On Wed, Jun 3, 2015 at 10:58 PM, Night Wolf nightwolf...@gmail.com wrote: Thanks Yin, that seems to work with the Shell. But on a compiled application with Spark-submit it still fails with the same exception. On Thu, Jun 4, 2015 at 2:46 PM, Yin Huai yh...@databricks.com wrote: Can you put the following setting in spark-defaults.conf and try again? spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc,com.mapr.fs.shim.LibraryLoader,com.mapr.security.JNISecurity,com.mapr.fs.jni https://issues.apache.org/jira/browse/SPARK-7819 has more context about it. On Wed, Jun 3, 2015 at 9:38 PM, Night Wolf nightwolf...@gmail.com wrote: Hi all, Trying out Spark 1.4 RC4 on MapR4/Hadoop 2.5.1 running in yarn-client mode with Hive support. *Build command;* ./make-distribution.sh --name mapr4.0.2_yarn_j6_2.10 --tgz -Pyarn -Pmapr4 -Phadoop-2.4 -Pmapr4 -Phive -Phadoop-provided -Dhadoop.version=2.5.1-mapr-1501 -Dyarn.version=2.5.1-mapr-1501 -DskipTests -e -X When trying to run a hive query in the spark shell *sqlContext.sql(show tables)* I get the following exception; scala sqlContext.sql(show tables) 15/06/04 04:33:16 INFO hive.HiveContext: Initializing HiveMetastoreConnection version 0.13.1 using Spark classes. java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.mapr.fs.ShimLoader.loadNativeLibrary(ShimLoader.java:323) at com.mapr.fs.ShimLoader.load(ShimLoader.java:198) at org.apache.hadoop.conf.CoreDefaultProperties.clinit(CoreDefaultProperties.java:59) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:274) at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1857) at org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2072) at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2282) at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2234) at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2151) at org.apache.hadoop.conf.Configuration.set(Configuration.java:1002) at org.apache.hadoop.conf.Configuration.set(Configuration.java:974) at org.apache.hadoop.mapred.JobConf.setJar(JobConf.java:518) at org.apache.hadoop.mapred.JobConf.setJarByClass(JobConf.java:536) at org.apache.hadoop.mapred.JobConf.init(JobConf.java:430) at org.apache.hadoop.hive.conf.HiveConf.initialize(HiveConf.java:1366) at org.apache.hadoop.hive.conf.HiveConf.init(HiveConf.java:1332) at org.apache.spark.sql.hive.client.ClientWrapper.init(ClientWrapper.scala:99) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:170) at org.apache.spark.sql.hive.client.IsolatedClientLoader.init(IsolatedClientLoader.scala:166) at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:212) at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:175) at 
org.apache.spark.sql.hive.HiveContext$$anon$2.init(HiveContext.scala:370) at org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:370) at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:369) at org.apache.spark.sql.hive.HiveContext$$anon$1.init(HiveContext.scala:382) at org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:382) at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:381) at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:901) at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131) at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:21) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:26) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:28) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:30) at $line37.$read$$iwC$$iwC$$iwC$$iwC.init(console:32) at $line37.$read$$iwC$$iwC$$iwC.init(console:34) at $line37.$read$$iwC$$iwC.init(console:36) at $line37.$read$$iwC.init(console:38) at $line37.$read.init(console:40) at $line37.$read$.init(console:44) at $line37.$read$.clinit(console) at $line37.$eval$.init(console:7) at
Spark Job always cause a node to reboot
Hi all, I am new to spark. I am trying to deploy HDFS (hadoop-2.6.0) and Spark-1.3.1 with four nodes, and each node has 8 cores and 8GB memory. One is configured as the headnode running the masters, and the 3 others are workers. But when I try to run the Pagerank from HiBench, it always causes a node to reboot during the middle of the work, for all scala, java, and python versions. But it works fine with the MapReduce version from the same benchmark. I also tried a standalone deployment and got the same issue. My spark-defaults.conf: spark.master yarn-client spark.driver.memory 4g spark.executor.memory 4g spark.rdd.compress false The job submit script is: bin/spark-submit --properties-file HiBench/report/pagerank/spark/scala/conf/sparkbench/spark.conf --class org.apache.spark.examples.SparkPageRank --master yarn-client --num-executors 2 --executor-cores 4 --executor-memory 4G --driver-memory 4G HiBench/src/sparkbench/target/sparkbench-4.0-SNAPSHOT-MR2-spark1.3-jar-with-dependencies.jar hdfs://discfarm:9000/HiBench/Pagerank/Input/edges hdfs://discfarm:9000/HiBench/Pagerank/Output 3 What is the problem with my configuration? And how can I find the cause? Any help is welcome! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: StreamingListener, anyone?
Hi Here's a working example: https://gist.github.com/akhld/b10dc491aad1a2007183 [image: Inline image 1] Thanks Best Regards On Wed, Jun 3, 2015 at 10:09 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, I've got a Spark Streaming driver job implemented and in it, I register a streaming listener, like so: JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(params.getBatchDurationMillis())); jssc.addStreamingListener(new JobListener(jssc)); where JobListener is defined like so private static class JobListener implements StreamingListener { private JavaStreamingContext jssc; JobListener(JavaStreamingContext jssc) { this.jssc = jssc; } @Override public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) { System.out.println( Batch completed.); jssc.stop(true); System.out.println( The job has been stopped.); } I do not seem to be seeing onBatchCompleted being triggered. Am I doing something wrong? In this particular case, I was trying to implement a bulk ingest type of logic where the first batch is all we're interested in (reading out of a Kafka topic with offset reset set to smallest). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StreamingListener-anyone-tp23140.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Hi, I set spark.shuffle.io.preferDirectBufs to false in SparkConf and this setting can be seen in web ui's environment tab. But, it still eats memory, i.e. -Xmx set to 512M but RES grows to 1.5G in half a day. On Wed, Jun 3, 2015 at 12:02 PM, Shixiong Zhu zsxw...@gmail.com wrote: Could you set spark.shuffle.io.preferDirectBufs to false to turn off the off-heap allocation of netty? Best Regards, Shixiong Zhu 2015-06-03 11:58 GMT+08:00 Ji ZHANG zhangj...@gmail.com: Hi, Thanks for you information. I'll give spark1.4 a try when it's released. On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das t...@databricks.com wrote: Could you try it out with Spark 1.4 RC3? Also pinging, Cloudera folks, they may be aware of something. BTW, the way I have debugged memory leaks in the past is as follows. Run with a small driver memory, say 1 GB. Periodically (maybe a script), take snapshots of histogram and also do memory dumps. Say every hour. And then compare the difference between two histo/dumps that are few hours separated (more the better). Diffing histo is easy. Diff two dumps can be done in JVisualVM, it will show the diff in the objects that got added in the later dump. That makes it easy to debug what is not getting cleaned. TD On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote: Hi, Thanks for you reply. Here's the top 30 entries of jmap -histo:live result: num #instances #bytes class name -- 1: 40802 145083848 [B 2: 99264 12716112 methodKlass 3: 99264 12291480 constMethodKlass 4: 84729144816 constantPoolKlass 5: 84727625192 instanceKlassKlass 6: 1866097824 [Lscala.concurrent.forkjoin.ForkJoinTask; 7: 70454804832 constantPoolCacheKlass 8:1391684453376 java.util.HashMap$Entry 9: 94273542512 methodDataKlass 10:1413123391488 io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry 11:1354913251784 java.lang.Long 12: 261922765496 [C 13: 8131140560 [Ljava.util.HashMap$Entry; 14: 89971061936 java.lang.Class 15: 16022 851384 [[I 16: 16447 789456 java.util.zip.Inflater 17: 13855 723376 [S 18: 17282 691280 java.lang.ref.Finalizer 19: 25725 617400 java.lang.String 20: 320 570368 [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry; 21: 16066 514112 java.util.concurrent.ConcurrentHashMap$HashEntry 22: 12288 491520 org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment 23: 13343 426976 java.util.concurrent.locks.ReentrantLock$NonfairSync 24: 12288 396416 [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry; 25: 16447 394728 java.util.zip.ZStreamRef 26: 565 370080 [I 27: 508 272288 objArrayKlassKlass 28: 16233 259728 java.lang.Object 29: 771 209232 [Ljava.util.concurrent.ConcurrentHashMap$HashEntry; 30: 2524 192312 [Ljava.lang.Object; But as I mentioned above, the heap memory seems OK, the extra memory is consumed by some off-heap data. I can't find a way to figure out what is in there. Besides, I did some extra experiments, i.e. run the same program in difference environments to test whether it has off-heap memory issue: spark1.0 + standalone = no spark1.0 + yarn = no spark1.3 + standalone = no spark1.3 + yarn = yes I'm using CDH5.1, so the spark1.0 is provided by cdh, and spark-1.3.1-bin-hadoop2.3 is downloaded from the official website. I could use spark1.0 + yarn, but I can't find a way to handle the logs, level and rolling, so it'll explode the harddrive. Currently I'll stick to spark1.0 + standalone, until our ops team decides to upgrade cdh. 
On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com wrote: While you are running is it possible for you login into the YARN node and get histograms of live objects using jmap -histo:live. That may reveal something. On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote: Hi, Unfortunately, they're still growing, both driver and executors. I run the same job with local mode, everything is fine. On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you replace your counting part with this? logs.filter(_.s_id 0).foreachRDD(rdd = logger.info(rdd.count())) Thanks Best Regards On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com wrote: Hi, I wrote a simple test job, it only does very basic operations. for example:
Re: StreamingListener, anyone?
You should not call `jssc.stop(true);` in a StreamingListener. It will cause a dead-lock: `jssc.stop` won't return until `listenerBus` exits. But since `jssc.stop` blocks `StreamingListener`, `listenerBus` cannot exit. Best Regards, Shixiong Zhu 2015-06-04 0:39 GMT+08:00 dgoldenberg dgoldenberg...@gmail.com: Hi, I've got a Spark Streaming driver job implemented and in it, I register a streaming listener, like so: JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(params.getBatchDurationMillis())); jssc.addStreamingListener(new JobListener(jssc)); where JobListener is defined like so private static class JobListener implements StreamingListener { private JavaStreamingContext jssc; JobListener(JavaStreamingContext jssc) { this.jssc = jssc; } @Override public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) { System.out.println( Batch completed.); jssc.stop(true); System.out.println( The job has been stopped.); } I do not seem to be seeing onBatchCompleted being triggered. Am I doing something wrong? In this particular case, I was trying to implement a bulk ingest type of logic where the first batch is all we're interested in (reading out of a Kafka topic with offset reset set to smallest). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StreamingListener-anyone-tp23140.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
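A minimal sketch of one way to act on a completed batch without calling stop() inside the listener callback, given the dead-lock Shixiong describes: the listener only signals, and the driver thread (which would otherwise sit in awaitTermination) performs the stop. Sketched in Scala with a StreamingContext called ssc assumed to be set up elsewhere; names are illustrative.

    import java.util.concurrent.CountDownLatch
    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    val firstBatchDone = new CountDownLatch(1)

    ssc.addStreamingListener(new StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        // signal only; calling ssc.stop() here would dead-lock the listener bus as described above
        firstBatchDone.countDown()
      }
    })

    ssc.start()
    firstBatchDone.await()                 // the driver thread waits here instead of in awaitTermination()
    ssc.stop(stopSparkContext = true, stopGracefully = true)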
NullPointerException SQLConf.setConf
Hi, I am using Hive 0.14 and spark 0.13. I got java.lang.NullPointerException when inserted into hive. Any suggestions please. hiveContext.sql(INSERT OVERWRITE table 4dim partition (zone= + ZONE + ,z= + zz + ,year= + YEAR + ,month= + MONTH + ) + select date, hh, x, y, height, u, v, w, ph, phb, t, p, pb, qvapor, qgraup, qnice, qnrain, tke_pbl, el_pbl from table_4Dim where z= + zz); java.lang.NullPointerException at org.apache.spark.sql.SQLConf.setConf(SQLConf.scala:196) at org.apache.spark.sql.SQLContext.setConf(SQLContext.scala:74) at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:251) at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:250) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:95) at no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$main$3$$anonfun$apply$1.apply(LoadWrfIntoHiveOptReduce1.scala:110) at no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$main$3$$anonfun$apply$1.apply(LoadWrfIntoHiveOptReduce1.scala:107) at scala.collection.immutable.Range.foreach(Range.scala:141) at no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$main$3.apply(LoadWrfIntoHiveOptReduce1.scala:107) at no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$main$3.apply(LoadWrfIntoHiveOptReduce1.scala:107) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1511) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1511) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark 1.4 HiveContext fails to initialise with native libs
Thanks Yin, that seems to work with the Shell. But on a compiled application with Spark-submit it still fails with the same exception. On Thu, Jun 4, 2015 at 2:46 PM, Yin Huai yh...@databricks.com wrote: Can you put the following setting in spark-defaults.conf and try again? spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc,com.mapr.fs.shim.LibraryLoader,com.mapr.security.JNISecurity,com.mapr.fs.jni https://issues.apache.org/jira/browse/SPARK-7819 has more context about it. On Wed, Jun 3, 2015 at 9:38 PM, Night Wolf nightwolf...@gmail.com wrote: Hi all, Trying out Spark 1.4 RC4 on MapR4/Hadoop 2.5.1 running in yarn-client mode with Hive support. *Build command;* ./make-distribution.sh --name mapr4.0.2_yarn_j6_2.10 --tgz -Pyarn -Pmapr4 -Phadoop-2.4 -Pmapr4 -Phive -Phadoop-provided -Dhadoop.version=2.5.1-mapr-1501 -Dyarn.version=2.5.1-mapr-1501 -DskipTests -e -X When trying to run a hive query in the spark shell *sqlContext.sql(show tables)* I get the following exception; scala sqlContext.sql(show tables) 15/06/04 04:33:16 INFO hive.HiveContext: Initializing HiveMetastoreConnection version 0.13.1 using Spark classes. java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.mapr.fs.ShimLoader.loadNativeLibrary(ShimLoader.java:323) at com.mapr.fs.ShimLoader.load(ShimLoader.java:198) at org.apache.hadoop.conf.CoreDefaultProperties.clinit(CoreDefaultProperties.java:59) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:274) at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1857) at org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2072) at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2282) at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2234) at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2151) at org.apache.hadoop.conf.Configuration.set(Configuration.java:1002) at org.apache.hadoop.conf.Configuration.set(Configuration.java:974) at org.apache.hadoop.mapred.JobConf.setJar(JobConf.java:518) at org.apache.hadoop.mapred.JobConf.setJarByClass(JobConf.java:536) at org.apache.hadoop.mapred.JobConf.init(JobConf.java:430) at org.apache.hadoop.hive.conf.HiveConf.initialize(HiveConf.java:1366) at org.apache.hadoop.hive.conf.HiveConf.init(HiveConf.java:1332) at org.apache.spark.sql.hive.client.ClientWrapper.init(ClientWrapper.scala:99) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:170) at org.apache.spark.sql.hive.client.IsolatedClientLoader.init(IsolatedClientLoader.scala:166) at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:212) at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:175) at org.apache.spark.sql.hive.HiveContext$$anon$2.init(HiveContext.scala:370) at 
org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:370) at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:369) at org.apache.spark.sql.hive.HiveContext$$anon$1.init(HiveContext.scala:382) at org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:382) at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:381) at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:901) at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131) at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:21) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:26) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:28) at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:30) at $line37.$read$$iwC$$iwC$$iwC$$iwC.init(console:32) at $line37.$read$$iwC$$iwC$$iwC.init(console:34) at $line37.$read$$iwC$$iwC.init(console:36) at $line37.$read$$iwC.init(console:38) at $line37.$read.init(console:40) at $line37.$read$.init(console:44) at $line37.$read$.clinit(console) at $line37.$eval$.init(console:7) at $line37.$eval$.clinit(console) at $line37.$eval.$print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
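For reference, Yin's setting as it would appear in spark-defaults.conf, and the equivalent --conf form for a spark-submit run (class and jar names are placeholders). Note that the follow-up above reports the compiled application still failing, so this shows the configuration under discussion rather than a confirmed fix.

    # conf/spark-defaults.conf
    spark.sql.hive.metastore.sharedPrefixes  com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc,com.mapr.fs.shim.LibraryLoader,com.mapr.security.JNISecurity,com.mapr.fs.jni

    # or on the spark-submit command line
    spark-submit --class com.example.MyApp \
      --conf spark.sql.hive.metastore.sharedPrefixes=com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc,com.mapr.fs.shim.LibraryLoader,com.mapr.security.JNISecurity,com.mapr.fs.jni \
      my-app-assembly.jar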
SparkSQL DF.explode with Nulls
Hi, I've worked out how to use explode on my input avro dataset with the following structure root |-- pageViewId: string (nullable = false) |-- components: array (nullable = true) ||-- element: struct (containsNull = false) |||-- name: string (nullable = false) |||-- loadTimeMs: long (nullable = true) I'm trying to turn this into this layout with repeated pageViewIds for each row of my components: root |-- pageViewId: string (nullable = false) |-- name: string (nullable = false) |-- loadTimeMs: long (nullable = true) Explode words fine for the first 10 records using this bit of code, but my big problem is that loadTimeMs has nulls in it, which I think is causing the error. Any ideas how I can trap those nulls? Perhaps by converting to zeros and then I can deal with them later? I tried writing a udf which just takes the loadTimeMs column and swaps nulls for zeros, but this separates the struct and then I don't know how to use explode. avroFile.filter($lazyComponents.components.isNotNull) .explode($lazyComponents.components) { case Row(lazyComponents: Seq[Row]) = lazyComponents .map(x = x.getString(0) - x.getLong(1))} .select('pageViewId, '_1, '_2) .take(10).foreach(println) 15/06/04 12:01:21 ERROR Executor: Exception in task 0.0 in stage 19.0 (TID 65) java.lang.RuntimeException: Failed to check null bit for primitive long value. at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:87) at $line127.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1$$anonfun$apply$1.apply(console:33) at $line127.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1$$anonfun$apply$1.apply(console:33) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at $line127.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:33) at $line127.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:33) at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55) at org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:89) at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:71) at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744)
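One way to trap the nulls before getLong is to test the null bit with Row.isNullAt and substitute a zero, staying close to the original snippet. A hedged sketch, assuming the same column names and implicits as the code above; the positional indexes and the zero default are assumptions to adjust.

    avroFile.filter($"lazyComponents.components".isNotNull)
      .explode($"lazyComponents.components") { case Row(lazyComponents: Seq[Row]) =>
        lazyComponents.map { x =>
          // getLong fails when the null bit is set, so check first and fall back to 0
          val loadTimeMs = if (x.isNullAt(1)) 0L else x.getLong(1)
          (x.getString(0), loadTimeMs)
        }
      }
      .select('pageViewId, '_1, '_2)
      .take(10).foreach(println)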
Re: Embedding your own transformer in Spark.ml Pipeline
Hi Brandon, they are available, but private to ml package. They are now public in 1.4. For 1.3.1 you can define your transformer in org.apache.spark.ml package - then you could use these traits. Thanks, Peter Rudenko On 2015-06-04 20:28, Brandon Plaster wrote: Is HasInputCol and HasOutputCol available in 1.3.1? I'm getting the following message when I'm trying to implement a Transformer and importing org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}: error: object shared is not a member of package org.apache.spark.ml.param and error: trait HasInputCol in package param cannot be accessed in package org.apache.spark.ml.param On Tue, Jun 2, 2015 at 1:51 PM, Peter Rudenko petro.rude...@gmail.com mailto:petro.rude...@gmail.com wrote: Hi Dimple, take a look to existing transformers: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala (*it's for spark-1.4) The idea is just to implement class that extends Transformer withHasInputColwithHasOutputCol (if your transformer 1:1 column transformer) and has deftransform(dataset: DataFrame):DataFrame method. Thanks, Peter On 2015-06-02 20:19, dimple wrote: Hi, I would like to embed my own transformer in the Spark.ml Pipleline but do not see an example of it. Can someone share an example of which classes/interfaces I need to extend/implement in order to do so. Thanks. Dimple -- View this message in context:http://apache-spark-user-list.1001560.n3.nabble.com/Embedding-your-own-transformer-in-Spark-ml-Pipleline-tp23112.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail:user-unsubscr...@spark.apache.org mailto:user-unsubscr...@spark.apache.org For additional commands, e-mail:user-h...@spark.apache.org mailto:user-h...@spark.apache.org
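For reference, a minimal 1:1 column transformer along the lines Peter describes, modeled on the Tokenizer example linked above. This is a sketch against the Spark 1.4 API (under 1.3.1 it would additionally have to live under org.apache.spark.ml, as noted, and the signatures differ slightly); the class name and the lower-casing logic are purely illustrative.

    import org.apache.spark.ml.UnaryTransformer
    import org.apache.spark.ml.util.Identifiable
    import org.apache.spark.sql.types.{ArrayType, DataType, StringType}

    // UnaryTransformer already mixes in HasInputCol and HasOutputCol
    class SimpleTokenizer(override val uid: String)
      extends UnaryTransformer[String, Seq[String], SimpleTokenizer] {

      def this() = this(Identifiable.randomUID("simpleTok"))

      // the actual column-to-column function
      override protected def createTransformFunc: String => Seq[String] =
        _.toLowerCase.split("\\s+").toSeq

      override protected def validateInputType(inputType: DataType): Unit =
        require(inputType == StringType, s"Input type must be StringType but got $inputType")

      override protected def outputDataType: DataType = ArrayType(StringType, containsNull = true)
    }

It can then be dropped into a Pipeline or used directly via setInputCol/setOutputCol like the built-in feature transformers.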
Re: Spark 1.4.0-rc4 HiveContext.table(db.tbl) NoSuchTableException
Hi Doug, sqlContext.table does not officially support database name. It only supports table name as the parameter. We will add a method to support database name in future. Thanks, Yin On Thu, Jun 4, 2015 at 8:10 AM, Doug Balog doug.sparku...@dugos.com wrote: Hi Yin, I’m very surprised to hear that its not supported in 1.3 because I’ve been using it since 1.3.0. It worked great up until SPARK-6908 was merged into master. What is the supported way to get DF for a table that is not in the default database ? IMHO, If you are not going to support “databaseName.tableName”, sqlContext.table() should have a version that takes a database and a table, ie def table(databaseName: String, tableName: String): DataFrame = DataFrame(this, catalog.lookupRelation(Seq(databaseName,tableName))) The handling of databases in Spark(sqlContext, hiveContext, Catalog) could be better. Thanks, Doug On Jun 3, 2015, at 8:21 PM, Yin Huai yh...@databricks.com wrote: Hi Doug, Actually, sqlContext.table does not support database name in both Spark 1.3 and Spark 1.4. We will support it in future version. Thanks, Yin On Wed, Jun 3, 2015 at 10:45 AM, Doug Balog doug.sparku...@dugos.com wrote: Hi, sqlContext.table(“db.tbl”) isn’t working for me, I get a NoSuchTableException. But I can access the table via sqlContext.sql(“select * from db.tbl”) So I know it has the table info from the metastore. Anyone else see this ? I’ll keep digging. I compiled via make-distribution -Pyarn -phadoop-2.4 -Phive -Phive-thriftserver It worked for me in 1.3.1 Cheers, Doug - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
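In the meantime, the workaround confirmed earlier in the thread is to go through SQL rather than the table() call; a one-line sketch with placeholder names:

    // works even though sqlContext.table("db.tbl") does not yet accept a database name
    val df = sqlContext.sql("SELECT * FROM mydb.mytable")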
Re: includePackage() deprecated?
Got it. Ignore my similar question on Github comments. On Thu, Jun 4, 2015 at 11:48 AM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: Yeah - We don't have support for running UDFs on DataFrames yet. There is an open issue to track this https://issues.apache.org/jira/browse/SPARK-6817 Thanks Shivaram On Thu, Jun 4, 2015 at 3:10 AM, Daniel Emaasit daniel.emaa...@gmail.com wrote: Hello Shivaram, Was the includePackage() function deprecated in SparkR 1.4.0? I don't see it in the documentation? If it was, does that mean that we can use R packages on Spark DataFrames the usual way we do for local R dataframes? Daniel -- Daniel Emaasit Ph.D. Research Assistant Transportation Research Center (TRC) University of Nevada, Las Vegas Las Vegas, NV 89154-4015 Cell: 615-649-2489 www.danielemaasit.com http://www.danielemaasit.com/ -- Daniel Emaasit Ph.D. Research Assistant Transportation Research Center (TRC) University of Nevada, Las Vegas Las Vegas, NV 89154-4015 Cell: 615-649-2489 www.danielemaasit.com http://www.danielemaasit.com/
Re: TreeReduce Functionality in Spark
Hey DB, Thanks for the reply! I still don't think this answers my question. For example, if I have a top() action being executed and I have 32 workers(32 partitions), and I choose a depth of 4, what does the overlay of intermediate reducers look like? How many reducers are there excluding the master and the worker? How many partitions get sent to each of these intermediate reducers? Does this number vary at each level? Thanks! On Thursday, June 4, 2015, DB Tsai dbt...@dbtsai.com wrote: By default, the depth of the tree is 2. Each partition will be one node. Sincerely, DB Tsai --- Blog: https://www.dbtsai.com On Thu, Jun 4, 2015 at 10:46 AM, Raghav Shankar raghav0110...@gmail.com javascript:; wrote: Hey Reza, Thanks for your response! Your response clarifies some of my initial thoughts. However, what I don't understand is how the depth of the tree is used to identify how many intermediate reducers there will be, and how many partitions are sent to the intermediate reducers. Could you provide some insight into this? Thanks, Raghav On Thursday, June 4, 2015, Reza Zadeh r...@databricks.com javascript:; wrote: In a regular reduce, all partitions have to send their reduced value to a single machine, and that machine can become a bottleneck. In a treeReduce, the partitions talk to each other in a logarithmic number of rounds. Imagine a binary tree that has all the partitions at its leaves and the root will contain the final reduced value. This way there is no single bottleneck machine. It remains to decide the number of children each node should have and how deep the tree should be, which is some of the logic in the method you pasted. On Wed, Jun 3, 2015 at 7:10 PM, raggy raghav0110...@gmail.com javascript:; wrote: I am trying to understand what the treeReduce function for an RDD does, and how it is different from the normal reduce function. My current understanding is that treeReduce tries to split up the reduce into multiple steps. We do a partial reduce on different nodes, and then a final reduce is done to get the final result. Is this correct? If so, I guess what I am curious about is, how does spark decide how many nodes will be on each level, and how many partitions will be sent to a given node? The bulk of the implementation is within this function: partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth) .getOrElse(throw new UnsupportedOperationException(empty collection)) The above function is expanded to val cleanSeqOp = context.clean(seqOp) val cleanCombOp = context.clean(combOp) val aggregatePartition = (it: Iterator[T]) = it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp) var partiallyAggregated = mapPartitions(it = Iterator(aggregatePartition(it))) var numPartitions = partiallyAggregated.partitions.length val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2) // If creating an extra level doesn't help reduce // the wall-clock time, we stop tree aggregation. while (numPartitions scale + numPartitions / scale) { numPartitions /= scale val curNumPartitions = numPartitions partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { (i, iter) = iter.map((i % curNumPartitions, _)) }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values } partiallyAggregated.reduce(cleanCombOp) I am completely lost about what is happening in this function. I would greatly appreciate some sort of explanation. 
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org javascript:; For additional commands, e-mail: user-h...@spark.apache.org javascript:;
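To make the arithmetic concrete for the 32-partition, depth-4 case asked about above, here is the pasted loop traced by hand (integer division, as in the Scala code); the numbers follow directly from that snippet rather than from any additional Spark behaviour.

    val scale = math.max(math.ceil(math.pow(32, 1.0 / 4)).toInt, 2)   // = 3
    // round 1: 32 > 3 + 32/3 (= 13)  -> reduceByKey into 32/3 = 10 partitions
    // round 2: 10 > 3 + 10/3 (= 6)   -> reduceByKey into 10/3 = 3 partitions
    // round 3: 3 > 3 + 3/3 (= 4) is false -> stop; the final reduce pulls those 3 values to the driver

So a requested depth of 4 is only an upper bound here: 32 partitions collapse to 10, then 3, then one value on the driver. The "intermediate reducers" are ordinary partitions, and at each level a partition receives roughly numPartitions / curNumPartitions of the previous level's partial results via the i % curNumPartitions keying shown above.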
Re: Python Image Library and Spark
Replace this line: img_data = sc.parallelize( list(im.getdata()) ) With: img_data = sc.parallelize( list(im.getdata()), 3 * No cores you have ) Thanks Best Regards On Thu, Jun 4, 2015 at 1:57 AM, Justin Spargur jmspar...@gmail.com wrote: Hi all, I'm playing around with manipulating images via Python and want to utilize Spark for scalability. That said, I'm just learing Spark and my Python is a bit rusty (been doing PHP coding for the last few years). I think I have most of the process figured out. However, the script fails on larger images and Spark is sending out the following warning for smaller images: Stage 0 contains a task of very large size (1151 KB). The maximum recommended task size is 100 KB. My code is as follows: import Image from pyspark import SparkContext if __name__ == __main__: imageFile = sample.jpg outFile = sample.gray.jpg sc = SparkContext(appName=Grayscale) im = Image.open(imageFile) # Create an RDD for the data from the image file img_data = sc.parallelize( list(im.getdata()) ) # Create an RDD for the grayscale value gValue = img_data.map( lambda x: int(x[0]*0.21 + x[1]*0.72 + x[2]*0.07) ) # Put our grayscale value into the RGR channels grayscale = gValue.map( lambda x: (x,x,x) ) # Save the output in a new image. im.putdata( grayscale.collect() ) im.save(outFile) Obviously, something is amiss. However, I can't figure out where I'm off track with this. Any help is appreciated! Thanks in advance!!!
Spark ML decision list
Hi, I have used the Weka machine learning library to generate a model for my training set, using the PART algorithm (decision lists) from Weka. Now I would like to use Spark ML to run PART on my training set, but I cannot seem to find a parallel. Could anyone point out the corresponding algorithm, or say whether it is available in Spark ML at all? Thanks, Sateesh
Re: Adding new Spark workers on AWS EC2 - access error
That's because you need to add the master's public key (~/.ssh/id_rsa.pub) to the newly added slaves ~/.ssh/authorized_keys. I add slaves this way: - Launch a new instance by clicking on the slave instance and choose *launch more like this * *- *Once its launched, ssh into it and add the master public key to .ssh/authorized_keys - Add the slaves internal IP to the master's conf/slaves file - Rsync spark directory to the slave machine (*rsync -za ~/spark SLAVES-IP:* ) - do sbin/start-all.sh and it will show up along with other slaves. Thanks Best Regards On Thu, Jun 4, 2015 at 6:45 AM, barmaley o...@solver.com wrote: I have the existing operating Spark cluster that was launched with spark-ec2 script. I'm trying to add new slave by following the instructions: Stop the cluster On AWS console launch more like this on one of the slaves Start the cluster Although the new instance is added to the same security group and I can successfully SSH to it with the same private key, spark-ec2 ... start call can't access this machine for some reason: Running setup-slave on all cluster nodes to mount filesystems, etc... [1] 00:59:59 [FAILURE] ec2-52-25-53-64.us-west-2.compute.amazonaws.com Exited with error code 255 Stderr: Permission denied (publickey). , obviously, followed by tons of other errors while trying to deploy Spark stuff on this instance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Adding-new-Spark-workers-on-AWS-EC2-access-error-tp23143.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Equivalent to Storm's 'field grouping' in Spark.
Hi Matei, thank you for answering. Accordingly to what you said, am I mistaken when I say that tuples with the same key might eventually be spread across more than one node in case an overloaded worker can no longer accept tuples? In other words, suppose a worker (processing key K) cannot accept more tuples: how does Spark Streaming handle the other K-keyed tuples? Systems like Storm do not provide any mechanism to handle such a situation. I am pretty new to Spark, and I apologize if the question sounds too naive, but I am experiencing some troubles in understanding Spark Internals! Thank you, again! 2015-06-03 19:34 GMT+02:00 Matei Zaharia matei.zaha...@gmail.com: This happens automatically when you use the byKey operations, e.g. reduceByKey, updateStateByKey, etc. Spark Streaming keeps the state for a given set of keys on a specific node and sends new tuples with that key to that. Matei On Jun 3, 2015, at 6:31 AM, allonsy luke1...@gmail.com wrote: Hi everybody, is there in Spark anything sharing the philosophy of Storm's field grouping? I'd like to manage data partitioning across the workers by sending tuples sharing the same key to the very same worker in the cluster, but I did not find any method to do that. Suggestions? :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Equivalent-to-Storm-s-field-grouping-in-Spark-tp23135.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
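A small sketch of what Matei describes, assuming a DStream[(String, Long)] of key/value pairs called pairs (names illustrative): the byKey operations hash-partition by key, so all tuples for a given key end up in the same partition, and updateStateByKey keeps that key's state alongside it.

    // checkpointing is required for stateful operations
    ssc.checkpoint("hdfs:///tmp/checkpoints")   // path is a placeholder

    val perBatchCounts = pairs.reduceByKey(_ + _)   // per-batch aggregation, grouped by key

    val runningTotals = pairs.updateStateByKey[Long] { (newValues: Seq[Long], state: Option[Long]) =>
      Some(state.getOrElse(0L) + newValues.sum)
    }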
Difference between library dependency versions
Hello, *(Before everything: I use IntelliJ IDEA 14.0.1, SBT and Scala 2.11.6)* This morning, I was trying to resolve the "Failed to locate the winutils binary in the hadoop binary path" error. I noticed that I can solve it by configuring my build.sbt as follows: libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "1.0.4" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" excludeAll( ExclusionRule(organization = "org.apache.hadoop") ) libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.3.1" excludeAll( ExclusionRule(organization = "org.apache.hadoop") ) but if I change the line libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "1.0.4" to libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.0" the error is back. What does that mean? Is Spark built for an old version of Hadoop? I really want to understand. *Also, a bonus question:* As you can see I am using the Spark 1.3.1 and spark-mllib APIs. I am using the latest version, but my APIs do not correspond to the latest official APIs (https://spark.apache.org/docs/latest/api/scala/#package). For example, to run a KMeans algo I have to use KMeans.train(), whereas it does not exist in the latest API. This is the first time I ask something on the mailing list, I hope I am using it well. Sorry for my bad English. Thank you and have a good day, JC
large shuffling = executor lost?
Hi, I am running my GraphX application with Spark 1.3.1 on a small cluster. It failed with this exception: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1 But I found it is actually caused by an “ExecutorLostFailure”, and someone told me it might be because there was a large shuffle… Does anyone have an idea how to fix it? Thanks in advance! Best, Yifan LI
Re: How to create fewer output files for Spark job ?
It worked. On Thu, Jun 4, 2015 at 5:14 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Try using coalesce Thanks Regards, Meethu M On Wednesday, 3 June 2015 11:26 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I am running a series of spark functions with 9000 executors and its resulting in 9000+ files that is execeeding the namespace file count qutota. How can Spark be configured to use CombinedOutputFormat. {code} protected def writeOutputRecords(detailRecords: RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) { val writeJob = new Job() val schema = SchemaUtil.outputSchema(_detail) AvroJob.setOutputKeySchema(writeJob, schema) detailRecords.saveAsNewAPIHadoopFile(outputDir, classOf[AvroKey[GenericRecord]], classOf[org.apache.hadoop.io.NullWritable], classOf[AvroKeyOutputFormat[GenericRecord]], writeJob.getConfiguration) } {code} -- Deepak -- Deepak
Re: How to create fewer output files for Spark job ?
Try using coalesce Thanks Regards, Meethu M On Wednesday, 3 June 2015 11:26 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I am running a series of spark functions with 9000 executors and its resulting in 9000+ files that is execeeding the namespace file count qutota. How can Spark be configured to use CombinedOutputFormat. {code}protected def writeOutputRecords(detailRecords: RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) { val writeJob = new Job() val schema = SchemaUtil.outputSchema(_detail) AvroJob.setOutputKeySchema(writeJob, schema) detailRecords.saveAsNewAPIHadoopFile(outputDir, classOf[AvroKey[GenericRecord]], classOf[org.apache.hadoop.io.NullWritable], classOf[AvroKeyOutputFormat[GenericRecord]], writeJob.getConfiguration) }{code} -- Deepak
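For the record, the change amounts to coalescing the RDD before the save. A sketch that keeps Deepak's method and helper classes as given (their imports assumed); the target of 200 partitions/files is only illustrative.

    protected def writeOutputRecords(detailRecords: RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) {
      val writeJob = new Job()
      val schema = SchemaUtil.outputSchema("_detail")
      AvroJob.setOutputKeySchema(writeJob, schema)
      detailRecords
        .coalesce(200)   // cap the number of output files instead of one per task
        .saveAsNewAPIHadoopFile(outputDir,
          classOf[AvroKey[GenericRecord]],
          classOf[org.apache.hadoop.io.NullWritable],
          classOf[AvroKeyOutputFormat[GenericRecord]],
          writeJob.getConfiguration)
    }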
Re: StreamingListener, anyone?
Shixiong, Thanks, interesting point. So if we want to only process one batch then terminate the consumer, what's the best way to achieve that? Presumably the listener could set a flag on the driver notifying it that it can terminate. But the driver is not in a loop, it's basically blocked in awaitTermination. So what would be a way to trigger the termination in the driver? context.awaitTermination() allows the current thread to wait for the termination of a context by stop() or by an exception - presumably, we need to call stop() somewhere or perhaps throw. Cheers, - Dmitry On Thu, Jun 4, 2015 at 3:55 AM, Shixiong Zhu zsxw...@gmail.com wrote: You should not call `jssc.stop(true);` in a StreamingListener. It will cause a dead-lock: `jssc.stop` won't return until `listenerBus` exits. But since `jssc.stop` blocks `StreamingListener`, `listenerBus` cannot exit. Best Regards, Shixiong Zhu 2015-06-04 0:39 GMT+08:00 dgoldenberg dgoldenberg...@gmail.com: Hi, I've got a Spark Streaming driver job implemented and in it, I register a streaming listener, like so: JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(params.getBatchDurationMillis())); jssc.addStreamingListener(new JobListener(jssc)); where JobListener is defined like so private static class JobListener implements StreamingListener { private JavaStreamingContext jssc; JobListener(JavaStreamingContext jssc) { this.jssc = jssc; } @Override public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) { System.out.println( Batch completed.); jssc.stop(true); System.out.println( The job has been stopped.); } I do not seem to be seeing onBatchCompleted being triggered. Am I doing something wrong? In this particular case, I was trying to implement a bulk ingest type of logic where the first batch is all we're interested in (reading out of a Kafka topic with offset reset set to smallest). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StreamingListener-anyone-tp23140.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Roadmap for Spark with Kafka on Scala 2.11?
Hi Iulian, On 26 May 2015, at 13:04, Iulian Dragoș iulian.dra...@typesafe.com wrote: On Tue, May 26, 2015 at 10:09 AM, algermissen1971 algermissen1...@icloud.com wrote: Hi, I am setting up a project that requires Kafka support and I wonder what the roadmap is for Scala 2.11 Support (including Kafka). Can we expect to see 2.11 support anytime soon? The upcoming 1.4 release (now at RC2) includes support for Kafka and Scala 2.11.6. It'd be great if you could give it a try. You can find the binaries (and staging repository including 2.11 artifacts) here: https://www.mail-archive.com/dev@spark.apache.org/msg09347.html Feedback after a coupl eof days: - I am using 1.4.0-rc4 now without problems - Not used Kafka support yet - I am using this with akka-2.3.11 and akka-http 1.0-RC3 (and sbt-assembly) and this has produced a dependency nightmare. I am even adding guava manually to the assembly because I just could not get sbt-assembly to not complain. I am far from a good understanding of sbt / maven internals, but it seems that the ‘compile’ scope set in the spark POM for a lot of dependencies is somehow not honored and the libs end up causing conflicts in sbt-assembly. (I am writing this to share experience, not to complain. Thanks for the great work!!) onward... Jan iulian Jan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- -- Iulian Dragos -- Reactive Apps on the JVM www.typesafe.com - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
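On the dependency-nightmare point, one common way to keep sbt-assembly quiet (an assumption, not something confirmed in this thread) is to mark Spark itself as provided, since it is on the cluster's classpath anyway, and give the remaining conflicts an explicit merge strategy. A hedged sketch for sbt-assembly 0.13.x syntax, with versions and artifact names to be adapted:

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"            % "1.4.0" % "provided",
      "org.apache.spark" %% "spark-streaming"       % "1.4.0" % "provided",
      "org.apache.spark" %% "spark-streaming-kafka" % "1.4.0"   // bundled into the assembly
    )

    assemblyMergeStrategy in assembly := {
      case PathList("META-INF", xs @ _*) => MergeStrategy.discard
      case x                             => MergeStrategy.first
    }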
Re: Transactional guarantee while saving DataFrame into a DB
Hi Tariq You need to handle the transaction semantics yourself. You could for example save from the dataframe to a staging table and then write to the final table using a single atomic INSERT INTO finalTable from stagingTable call. Remember to clear the staging table first to recover from previous failures if any. Deenar On 2 June 2015 at 16:01, Mohammad Tariq donta...@gmail.com wrote: Hi list, With the help of Spark DataFrame API we can save a DataFrame into a database table through insertIntoJDBC() call. However, I could not find any info about how it handles the transactional guarantee. What if my program gets killed during the processing? Would it end up in partial load? Is it somehow possible to handle these kind of scenarios? Rollback or something of that sort? Many thanks. P.S : I am using spark-1.3.1-bin-hadoop2.4 with java 1.7 [image: http://] Tariq, Mohammad about.me/mti [image: http://] http://about.me/mti
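A rough sketch of the staging-table pattern Deenar describes, using the insertIntoJDBC call from the original question; the URL, table names and SQL are placeholders, and the exact atomicity guarantees depend on the target database.

    val jdbcUrl = "jdbc:postgresql://dbhost:5432/mydb?user=etl&password=secret"  // placeholder

    // 1. clear and load the staging table (a failed run only leaves partial data here)
    df.insertIntoJDBC(jdbcUrl, "staging_table", overwrite = true)

    // 2. move staging -> final in a single transaction
    val conn = java.sql.DriverManager.getConnection(jdbcUrl)
    try {
      conn.setAutoCommit(false)
      val st = conn.createStatement()
      st.executeUpdate("INSERT INTO final_table SELECT * FROM staging_table")
      st.executeUpdate("DELETE FROM staging_table")
      conn.commit()
    } catch {
      case e: Exception => conn.rollback(); throw e
    } finally {
      conn.close()
    }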
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
set the storage policy for the DStream RDDs to MEMORY AND DISK - it appears the storage level can be specified in the createStream methods but not createDirectStream... On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com wrote: You can also try Dynamic Resource Allocation https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation Also re the Feedback Loop for automatic message consumption rate adjustment – there is a “dumb” solution option – simply set the storage policy for the DStream RDDs to MEMORY AND DISK – when the memory gets exhausted spark streaming will resort to keeping new RDDs on disk which will prevent it from crashing and hence loosing them. Then some memory will get freed and it will resort back to RAM and so on and so forth Sent from Samsung Mobile Original message From: Evo Eftimov Date:2015/05/28 13:22 (GMT+00:00) To: Dmitry Goldenberg Cc: Gerard Maas ,spark users Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? You can always spin new boxes in the background and bring them into the cluster fold when fully operational and time that with job relaunch and param change Kafka offsets are mabaged automatically for you by the kafka clients which keep them in zoomeeper dont worry about that ad long as you shut down your job gracefuly. Besides msnaging the offsets explicitly is not a big deal if necessary Sent from Samsung Mobile Original message From: Dmitry Goldenberg Date:2015/05/28 13:16 (GMT+00:00) To: Evo Eftimov Cc: Gerard Maas ,spark users Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? Thanks, Evo. Per the last part of your comment, it sounds like we will need to implement a job manager which will be in control of starting the jobs, monitoring the status of the Kafka topic(s), shutting jobs down and marking them as ones to relaunch, scaling the cluster up/down by adding/removing machines, and relaunching the 'suspended' (shut down) jobs. I suspect that relaunching the jobs may be tricky since that means keeping track of the starter offsets in Kafka topic(s) from which the jobs started working on. Ideally, we'd want to avoid a re-launch. The 'suspension' and relaunching of jobs, coupled with the wait for the new machines to come online may turn out quite time-consuming which will make for lengthy request times, and our requests are not asynchronous. Ideally, the currently running jobs would continue to run on the machines currently available in the cluster. In the scale-down case, the job manager would want to signal to Spark's job scheduler not to send work to the node being taken out, find out when the last job has finished running on the node, then take the node out. This is somewhat like changing the number of cylinders in a car engine while the car is running... Sounds like a great candidate for a set of enhancements in Spark... 
On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov evo.efti...@isecc.com wrote: @DG; The key metrics should be - Scheduling delay – its ideal state is to remain constant over time and ideally be less than the time of the microbatch window - The average job processing time should remain less than the micro-batch window - Number of Lost Jobs – even if there is a single Job lost that means that you have lost all messages for the DStream RDD processed by that job due to the previously described spark streaming memory leak condition and subsequent crash – described in previous postings submitted by me You can even go one step further and periodically issue “get/check free memory” to see whether it is decreasing relentlessly at a constant rate – if it touches a predetermined RAM threshold that should be your third metric Re the “back pressure” mechanism – this is a Feedback Loop mechanism and you can implement one on your own without waiting for Jiras and new features whenever they might be implemented by the Spark dev team – moreover you can avoid using slow mechanisms such as ZooKeeper and even incorporate some Machine Learning in your Feedback Loop to make it handle the message consumption rate more intelligently and benefit from ongoing online learning – BUT this is STILL about voluntarily sacrificing your performance in the name of keeping your system stable – it is not about scaling your system/solution In terms of how to scale the Spark Framework Dynamically – even though this is not supported at the moment out of the box I guess you can have a sys management framework spin dynamically a few more boxes (spark worker nodes), stop dynamically your currently running Spark Streaming Job, relaunch it with new params e.g. more Receivers, larger number of Partitions (hence tasks), more RAM per executor
Re: Adding an indexed column
or you could 1) convert dataframe to RDD 2) use mapPartitions and zipWithIndex within each partition 3) convert RDD back to dataframe you will need to make sure you preserve partitioning Deenar On 1 June 2015 at 02:23, ayan guha guha.a...@gmail.com wrote: If you are on spark 1.3, use repartitionandSort followed by mappartition. In 1.4, window functions will be supported, it seems On 1 Jun 2015 04:10, Ricardo Almeida ricardo.alme...@actnowib.com wrote: That's great and how would you create an ordered index by partition (by product in this example)? Assuming now a dataframe like: flag | product | price -- 1| a |47.808764653746 1| b |47.808764653746 1| a |31.9869279512204 1| b |47.7907893713564 1| a |16.7599200038239 1| b |16.7599200038239 1| b |20.3916014172137 get a new dataframe such as: flag | product | price | index -- 1| a |47.808764653746 | 0 1| a |31.9869279512204 | 1 1| a |16.7599200038239 | 2 1| b |47.808764653746 | 0 1| b |47.7907893713564 | 1 1| b |20.3916014172137 | 2 1| b |16.7599200038239 | 3 On 29 May 2015 at 12:25, Wesley Miao wesley.mi...@gmail.com wrote: One way I can see is to - 1. get rdd from your df 2. call rdd.zipWithIndex to get a new rdd 3. turn your new rdd to a new df On Fri, May 29, 2015 at 5:43 AM, Cesar Flores ces...@gmail.com wrote: Assuming that I have the next data frame: flag | price -- 1|47.808764653746 1|47.808764653746 1|31.9869279512204 1|47.7907893713564 1|16.7599200038239 1|16.7599200038239 1|20.3916014172137 How can I create a data frame with an extra indexed column as the next one: flag | price | index --|--- 1|47.808764653746 | 0 1|47.808764653746 | 1 1|31.9869279512204| 2 1|47.7907893713564| 3 1|16.7599200038239| 4 1|16.7599200038239| 5 1|20.3916014172137| 6 -- Cesar Flores
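Spelling out those three steps against Ricardo's example frame, as a hedged sketch: the column names and types are taken from his example, descending price order is assumed, and the partition count is purely illustrative. The partitioner keys on product only while the sort key is (product, price), i.e. a secondary sort, so zipping with an index inside each partition can restart at 0 whenever the product changes.

    import org.apache.spark.{HashPartitioner, Partitioner}
    import org.apache.spark.sql.Row

    val numParts = 4                         // illustrative
    val byProduct = new Partitioner {
      private val hash = new HashPartitioner(numParts)
      override def numPartitions: Int = numParts
      override def getPartition(key: Any): Int =
        hash.getPartition(key.asInstanceOf[(String, Double)]._1)
    }
    implicit val ord: Ordering[(String, Double)] =
      Ordering.Tuple2(Ordering.String, Ordering.Double.reverse)

    val indexedRows = df.rdd
      .map(r => ((r.getAs[String]("product"), r.getAs[Double]("price")), r))
      .repartitionAndSortWithinPartitions(byProduct)
      .mapPartitions { it =>
        var current: String = null
        var i = -1L
        it.map { case ((product, _), row) =>
          i = if (product == current) i + 1 else { current = product; 0L }
          Row.fromSeq(row.toSeq :+ i)   // original row plus the per-product index
        }
      }
    // indexedRows can be turned back into a DataFrame with createDataFrame and the
    // original schema extended by a LongType "index" field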
Re: Compute Median in Spark Dataframe
Hi Holden, Olivier So for column you need to pass in a Java function, I have some sample code which does this but it does terrible things to access Spark internals. I also need to call a Hive UDAF in a dataframe agg function. Are there any examples of what Column expects? Deenar On 2 June 2015 at 21:13, Holden Karau hol...@pigscanfly.ca wrote: So for column you need to pass in a Java function, I have some sample code which does this but it does terrible things to access Spark internals. On Tuesday, June 2, 2015, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Nice to hear from you Holden ! I ended up trying exactly that (Column) - but I may have done it wrong : In [*5*]: g.agg(Column(percentile(value, 0.5))) Py4JError: An error occurred while calling o97.agg. Trace: py4j.Py4JException: Method agg([class java.lang.String, class scala.collection.immutable.Nil$]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333) Any idea ? Olivier. Le mar. 2 juin 2015 à 18:02, Holden Karau hol...@pigscanfly.ca a écrit : Not super easily, the GroupedData class uses a strToExpr function which has a pretty limited set of functions so we cant pass in the name of an arbitrary hive UDAF (unless I'm missing something). We can instead construct an column with the expression you want and then pass it in to agg() that way (although then you need to call the hive UDAF there). There are some private classes in hiveUdfs.scala which expose hiveUdaf's as Spark SQL AggregateExpressions, but they are private. On Tue, Jun 2, 2015 at 8:28 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: I've finally come to the same conclusion, but isn't there any way to call this Hive UDAFs from the agg(percentile(key,0.5)) ?? Le mar. 2 juin 2015 à 15:37, Yana Kadiyska yana.kadiy...@gmail.com a écrit : Like this...sqlContext should be a HiveContext instance case class KeyValue(key: Int, value: String) val df=sc.parallelize(1 to 50).map(i=KeyValue(i, i.toString)).toDF df.registerTempTable(table) sqlContext.sql(select percentile(key,0.5) from table).show() On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, Is there any way to compute a median on a column using Spark's Dataframe. I know you can use stats in a RDD but I'd rather stay within a dataframe. Hive seems to imply that using ntile one can compute percentiles, quartiles and therefore a median. Does anyone have experience with this ? Regards, Olivier. -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau
Optimisation advice for Avro-Parquet merge job
Hi, We have a load of Avro data coming into our data systems in the form of relatively small files, which we're merging into larger Parquet files with Spark. I've been following the docs and the approach I'm taking seemed fairly obvious, and pleasingly simple, but I'm wondering if perhaps it's not the most optimal approach. I was wondering if anyone on this list might have some advice to make to make this job as efficient as possible. Here's some code: DataFrame dfInput = sqlContext.load(inputPaths.get(0), com.databricks.spark.avro); long totalSize = getDirSize(inputPaths.get(0)); for (int i = 1; i inputs.size(); ++i) { dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i), com.databricks.spark.avro)); totalSize += getDirSize(inputPaths.get(i)); } int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES); DataFrame outputFrame; // Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence // the synchronize block below. Suggestions welcome here too! :-) synchronized (this) { RDDRow inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null); outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema()); } outputFrame.save(outputPath, parquet, SaveMode.Overwrite); Here are some things bothering me: - Conversion to an RDD and back again so that we can use coalesce() to reduce the number of partitions. This is because we read that repartition() is not as efficient as coalesce(), and local micro benchmarks seemed to somewhat confirm that this was faster. Is this really a good idea though? Should we be doing something else? - Usage of unionAll() - this is the only way I could find to join the separate data sets into a single data frame to save as Parquet. Is there a better way? - Do I need to be using the DataFrame API at all? I'm not querying any data here, so the nice API for SQL-like transformations of the data isn't being used. The DataFrame API just seemed like the path of least resistance for working with Avro and Parquet. Would there be any advantage to using hadoopRDD() with the appropriate Input/Output formats? Any advice or tips greatly appreciated! James.
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
direct stream isn't a receiver, it isn't required to cache data anywhere unless you want it to. If you want it, just call cache. On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: set the storage policy for the DStream RDDs to MEMORY AND DISK - it appears the storage level can be specified in the createStream methods but not createDirectStream... On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com wrote: You can also try Dynamic Resource Allocation https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation Also re the Feedback Loop for automatic message consumption rate adjustment – there is a “dumb” solution option – simply set the storage policy for the DStream RDDs to MEMORY AND DISK – when the memory gets exhausted spark streaming will resort to keeping new RDDs on disk which will prevent it from crashing and hence loosing them. Then some memory will get freed and it will resort back to RAM and so on and so forth Sent from Samsung Mobile Original message From: Evo Eftimov Date:2015/05/28 13:22 (GMT+00:00) To: Dmitry Goldenberg Cc: Gerard Maas ,spark users Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? You can always spin new boxes in the background and bring them into the cluster fold when fully operational and time that with job relaunch and param change Kafka offsets are mabaged automatically for you by the kafka clients which keep them in zoomeeper dont worry about that ad long as you shut down your job gracefuly. Besides msnaging the offsets explicitly is not a big deal if necessary Sent from Samsung Mobile Original message From: Dmitry Goldenberg Date:2015/05/28 13:16 (GMT+00:00) To: Evo Eftimov Cc: Gerard Maas ,spark users Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? Thanks, Evo. Per the last part of your comment, it sounds like we will need to implement a job manager which will be in control of starting the jobs, monitoring the status of the Kafka topic(s), shutting jobs down and marking them as ones to relaunch, scaling the cluster up/down by adding/removing machines, and relaunching the 'suspended' (shut down) jobs. I suspect that relaunching the jobs may be tricky since that means keeping track of the starter offsets in Kafka topic(s) from which the jobs started working on. Ideally, we'd want to avoid a re-launch. The 'suspension' and relaunching of jobs, coupled with the wait for the new machines to come online may turn out quite time-consuming which will make for lengthy request times, and our requests are not asynchronous. Ideally, the currently running jobs would continue to run on the machines currently available in the cluster. In the scale-down case, the job manager would want to signal to Spark's job scheduler not to send work to the node being taken out, find out when the last job has finished running on the node, then take the node out. This is somewhat like changing the number of cylinders in a car engine while the car is running... Sounds like a great candidate for a set of enhancements in Spark... 
On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov evo.efti...@isecc.com wrote: @DG; The key metrics should be - Scheduling delay – its ideal state is to remain constant over time and ideally be less than the time of the microbatch window - The average job processing time should remain less than the micro-batch window - Number of Lost Jobs – even if there is a single Job lost that means that you have lost all messages for the DStream RDD processed by that job due to the previously described spark streaming memory leak condition and subsequent crash – described in previous postings submitted by me You can even go one step further and periodically issue “get/check free memory” to see whether it is decreasing relentlessly at a constant rate – if it touches a predetermined RAM threshold that should be your third metric Re the “back pressure” mechanism – this is a Feedback Loop mechanism and you can implement one on your own without waiting for Jiras and new features whenever they might be implemented by the Spark dev team – moreover you can avoid using slow mechanisms such as ZooKeeper and even incorporate some Machine Learning in your Feedback Loop to make it handle the message consumption rate more intelligently and benefit from ongoing online learning – BUT this is STILL about voluntarily sacrificing your performance in the name of keeping your system stable – it is not about scaling your system/solution In terms of how to scale the Spark Framework Dynamically – even though this is not supported at the moment out of the box I guess you can have a sys management framework spin
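Putting the two remarks above into code, assuming a StreamingContext ssc and the usual Kafka connection parameters (all placeholder names): the receiver-based createStream takes a storage level argument, while the direct stream keeps nothing unless you persist it yourself.

    import kafka.serializer.StringDecoder
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    // receiver-based stream: the storage level is an explicit argument
    val receiverStream = KafkaUtils.createStream(
      ssc, "zk1:2181", "my-consumer-group", Map("mytopic" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER)

    // direct stream: nothing is cached unless you ask for it
    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("mytopic"))
    directStream.persist(StorageLevel.MEMORY_AND_DISK_SER)   // or simply .cache()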
Re: Difference between library dependency versions
For your first question, please take a look at HADOOP-9922. The fix is in hadoop-common module. Cheers On Thu, Jun 4, 2015 at 2:53 AM, Jean-Charles RISCH risch.jeanchar...@gmail.com wrote: Hello, *(Before everything : I use IntellijIdea 14.0.1, SBT and Scala 2.11.6)* This morning, I was looking to resolve the Failed to locate the winutils binary in the hadoop binary path error. I noticed that I can solve it configuring my build.sbt to ... libraryDependencies += org.apache.hadoop % hadoop-client % 1.0.4 libraryDependencies += org.apache.spark %% spark-core % 1.3.1 excludeAll( ExclusionRule(organization = org.apache.hadoop) ) libraryDependencies += org.apache.spark %% spark-mllib % 1.3.1 excludeAll( ExclusionRule(organization = org.apache.hadoop) ) but if i change the line libraryDependencies += org.apache.hadoop % hadoop-client % 1.0.4 to libraryDependencies += org.apache.hadoop % hadoop-client % 2.7.0 the error is back. What does it mean? Spark is build for an old version of hadoop? I really want to understand. *Also, a bonus question : * As you can see I am using spark 1.3.1 and spark-mllib APIs. I am using the last version, but my APIs are not corresponding to the latest official APIs (https://spark.apache.org/docs/*latest*/api/scala/#package) For example, to run a KMeans algo, I have to use KMeans.train() whereas it does not exist in the latest API. First time, I ask something in the mailing list, I hope I use it well. Sorry for my bad english. Thank you and have a good day, JC ᐧ