Re: SQLContext.applySchema strictness
Applying a schema is a pretty low-level operation, and I would expect most users to use the type-safe interfaces. If you are unsure, you can always run:

import org.apache.spark.sql.execution.debug._
schemaRDD.typeCheck()

and it will tell you if you have made any mistakes. Michael On Sat, Feb 14, 2015 at 1:05 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Would it make sense to add an optional validate parameter to applySchema() which defaults to False, both to give users the option to check the schema immediately and to make the default behavior clearer? On Sat Feb 14 2015 at 9:18:59 AM Michael Armbrust mich...@databricks.com wrote: Doing runtime type checking is very expensive, so we only do it when necessary (i.e., when you perform an operation such as adding two columns together). On Sat, Feb 14, 2015 at 2:19 AM, nitin nitin2go...@gmail.com wrote: AFAIK, this is the expected behavior. You have to make sure that the schema matches the rows. It won't give any error when you apply the schema, as it doesn't validate the nature of the data. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SQLContext-applySchema-strictness-tp21650p21653.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
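Nicholas's proposed validate option would amount to walking the rows once and comparing every value with its declared field type, which is the cost Michael says Spark skips by default. A toy sketch of such an eager check in plain Scala, assuming only three field types; FieldType, Field, and SchemaCheck are hypothetical names, not Spark's StructType or the typeCheck() debug helper.

```scala
// Hypothetical, simplified model of a schema: not Spark's StructType.
sealed trait FieldType
case object IntField extends FieldType
case object StringField extends FieldType
case object DoubleField extends FieldType

final case class Field(name: String, dataType: FieldType, nullable: Boolean = true)

object SchemaCheck {
  /** Eagerly checks every row; returns one message per problem (empty if all rows match). */
  def validateRows(schema: Seq[Field], rows: Seq[Seq[Any]]): Seq[String] =
    rows.zipWithIndex.flatMap { case (row, i) =>
      if (row.length != schema.length)
        Seq(s"row $i: expected ${schema.length} values, got ${row.length}")
      else
        schema.zip(row).collect {
          // null is only allowed for nullable fields
          case (f, null) if !f.nullable => s"row $i: ${f.name} is null but not nullable"
          // non-null values must match the declared type
          case (f, v) if v != null && !matches(f.dataType, v) =>
            s"row $i: ${f.name} expected ${f.dataType}, got ${v.getClass.getSimpleName}"
        }
    }

  private def matches(t: FieldType, v: Any): Boolean = (t, v) match {
    case (IntField, _: Int)       => true
    case (StringField, _: String) => true
    case (DoubleField, _: Double) => true
    case _                        => false
  }
}
```

Running this over a large RDD would touch every value once, which illustrates why such a check would be opt-in rather than the default.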
Re: Shuffle write increases in spark 1.2
I think Xuefeng Wu's suggestion is likely correct. This difference is more likely explained by the compression library changing versions than by sort vs. hash shuffle (which should not affect output size significantly). Others have reported that switching to lz4 fixed their issue. We should document this if this is the case. I wonder if we're asking Snappy to be super-low-overhead and, as a result, the new version does a better job of it (less overhead, less compression). On Sat, Feb 14, 2015 at 9:32 AM, Peng Cheng pc...@uow.edu.au wrote: I double-checked the 1.2 feature list and found out that the new sort-based shuffle manager has nothing to do with HashPartitioner. Sorry for the misinformation. On the other hand, this may explain the increase in shuffle spill as a side effect of the new shuffle manager. Let me revert spark.shuffle.manager to hash and see if it makes things better (or worse, as the benchmark in https://issues.apache.org/jira/browse/SPARK-3280 indicates). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-write-increases-in-spark-1-2-tp20894p21657.html
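To separate the two suspects discussed here, one could change the compression codec and the shuffle manager one at a time and compare "Shuffle Write" in the web UI. A minimal configuration sketch for Spark 1.2, assuming the documented keys spark.io.compression.codec and spark.shuffle.manager; the object and application names are placeholders, and the job body is left out.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ShuffleCodecExperiment {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("shuffle-codec-experiment") // placeholder name
      // Use LZ4 instead of the default Snappy for shuffle output, broadcasts, etc.
      .set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec")
      // Return to the hash-based shuffle ("sort" is the default in 1.2).
      .set("spark.shuffle.manager", "hash")
    val sc = new SparkContext(conf)
    try {
      // ... run the same job as before and compare shuffle write sizes ...
    } finally {
      sc.stop()
    }
  }
}
```

Changing only one of the two settings per run keeps the comparison meaningful.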
Re: Specifying AMI when using Spark EC-2 scripts
Hi, You can use -a or --ami <your AMI id> to launch the cluster using a specific AMI. If I remember correctly, the default system is Amazon Linux. Hope it helps. Cheers Gen On Sun, Feb 15, 2015 at 6:20 AM, olegshirokikh o...@solver.com wrote: Hi there, Is there a way to specify the AWS AMI with a particular OS (say Ubuntu) when launching Spark on the Amazon cloud with the provided scripts? What is the default AMI and operating system launched by the EC2 script? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Specifying-AMI-when-using-Spark-EC-2-scripts-tp21658.html
Percentile example
Hello, I am a newbie to Spark and am trying to figure out how to get a percentile from a big data set. I googled this topic but did not find any really useful code example or explanation. It seems that I can use the sortByKey transformation to get my data set in order, but I am not sure how I can then get the value of, for example, the 66th percentile. Should I use take() to pick up the value of the 66th percentile? I don't believe any single machine can load my data set in memory, so I believe there must be more efficient approaches. Can anyone shed some light on this problem? Regards
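One exact approach that avoids loading everything on one machine is to sort, attach a position to every element, and fetch only the element at the target rank. In Spark that would be roughly count(), then sortBy(identity), zipWithIndex(), and a filter on the index (an assumed outline, not tested here). The sketch below shows only the rank arithmetic in plain Scala, using the nearest-rank definition; other percentile definitions interpolate between neighbours and can give slightly different answers.

```scala
object Percentile {
  /**
   * Nearest-rank percentile: the smallest value such that at least p% of the data
   * is less than or equal to it. `sorted` must be ascending and 0 < p <= 100.
   */
  def nearestRank(sorted: IndexedSeq[Double], p: Double): Double = {
    require(sorted.nonEmpty, "need at least one value")
    require(p > 0 && p <= 100, "p must be in (0, 100]")
    // 1-based rank; multiplying before dividing keeps it exact for integer p and n.
    val rank = math.ceil(p * sorted.length / 100.0).toInt
    sorted(rank - 1)
  }

  /** 0-based index of the percentile element, given only the total count n. */
  def targetIndex(n: Long, p: Double): Long = math.ceil(p * n / 100.0).toLong - 1
}
```

With targetIndex, only the count is needed up front; the single matching element can then be selected from the sorted, indexed data instead of calling take() on a large prefix.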
Re: Array in broadcast can't be serialized
I was looking at https://github.com/twitter/chill and it seems this would achieve what you want: chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala Cheers On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao xiaotao.cs@gmail.com wrote: I'm using Spark 1.1.0 and find that ImmutableBytesWritable can be serialized by Kryo but Array[ImmutableBytesWritable] can't be serialized, even when I registered both of them in Kryo. The code is as follows:

val conf = new SparkConf()
  .setAppName("Hello Spark")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "xt.MyKryoRegistrator")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(List(
  (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new KeyValue()),
  (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new KeyValue()),
  (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new KeyValue()),
  (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new KeyValue())), 4)

// snippet 1: a single ImmutableBytesWritable object can be serialized in a broadcast
val partitioner = new SingleElementPartitioner(sc.broadcast(new ImmutableBytesWritable(Bytes.toBytes(3))))
val ret = rdd.aggregateByKey(List[KeyValue](), partitioner)(
  (xs: List[KeyValue], y: KeyValue) => y :: xs,
  (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys).persist()
println("\n\n\nret.count = " + ret.count + ", partition size = " + ret.partitions.size)

// snippet 2: an array of ImmutableBytesWritable cannot be serialized in a broadcast
val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)), new ImmutableBytesWritable(Bytes.toBytes(2)), new ImmutableBytesWritable(Bytes.toBytes(3)))
val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
val ret1 = rdd.aggregateByKey(List[KeyValue](), newPartitioner)(
  (xs: List[KeyValue], y: KeyValue) => y :: xs,
  (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys)
println("\n\n\nrdd2.count = " + ret1.count)
sc.stop

// the following are the Kryo registrator and the partitioners
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[ImmutableBytesWritable])        // register ImmutableBytesWritable
    kryo.register(classOf[Array[ImmutableBytesWritable]]) // register Array[ImmutableBytesWritable]
  }
}

class SingleElementPartitioner(bc: Broadcast[ImmutableBytesWritable]) extends Partitioner {
  override def numPartitions: Int = 5
  def v = Bytes.toInt(bc.value.get)
  override def getPartition(key: Any): Int = v - 1
}

class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
  val arr = bc.value
  override def numPartitions: Int = arr.length
  override def getPartition(key: Any): Int = Bytes.toInt(arr(0).get)
}

In the code above, snippet 1 works as expected, but snippet 2 throws "Task not serializable: java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable". So do I have to implement a Kryo serializer for Array[T] if it is used in a broadcast? Thanks
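One difference between the two partitioners may be worth checking, offered as a hypothesis rather than a confirmed diagnosis: ArrayPartitioner copies bc.value into a val field, while SingleElementPartitioner keeps only the Broadcast handle and reads it inside a def. As far as I know, a partitioner travels to executors with Java serialization as part of the task, so a val holding the array would drag the non-Serializable elements along regardless of Kryo registration. The plain-Scala sketch below reproduces that difference without Spark or HBase; Payload stands in for ImmutableBytesWritable and Handle is a hypothetical stand-in for a Broadcast.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for ImmutableBytesWritable: deliberately not java.io.Serializable.
final class Payload(val bytes: Array[Byte])

// Hypothetical stand-in for Broadcast[Array[Payload]]: the handle is Serializable,
// but its payload is @transient and therefore never written out with it.
final class Handle(init: Array[Payload]) extends Serializable {
  @transient private val payload: Array[Payload] = init
  def value: Array[Payload] = payload
}

// Mirrors ArrayPartitioner: `val` copies the array into a field of this object.
final class ValHolder(handle: Handle) extends Serializable {
  val arr: Array[Payload] = handle.value
}

// Mirrors SingleElementPartitioner: only the handle is stored; the value is read on use.
final class DefHolder(handle: Handle) extends Serializable {
  def arr: Array[Payload] = handle.value
}

object JavaSer {
  /** True if `obj` can be written with plain Java serialization. */
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

If this is the cause, changing `val arr = bc.value` to `def arr = bc.value` (or a `@transient lazy val`) in ArrayPartitioner would be the first thing to try; this is untested against the original HBase setup.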
Inconsistent execution times for same application.
Hi, My Spark cluster contains a mix of Pentium 4, dual-core, and quad-core machines. I am trying to run a character frequency count application. The application contains several threads, each submitting a job (action) that counts the frequency of a single character. My problem is that I get different execution times each time I run the same application with the same data (1 GB of text). Sometimes the difference is as large as 10-15 minutes. I think this relates to scheduling when the cluster is heterogeneous. Can someone please tell me how to tackle this issue? I need consistent results. Any suggestions are welcome. I cache() the RDD. There are 7 slave nodes in total. Executor memory=2500m. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Inconsistent-execution-times-for-same-application-tp21662.html
Re: Multidimensional K-Means
Clustering operates on a large number of n-dimensional vectors. That seems to be what you are describing, and that is what the MLlib API accepts. What are you expecting that you don't find? Did you have a look at the KMeansModel that this method returns? It has a clusterCenters method that gives you what you're looking for. Explore the API a bit more first. On Sun, Feb 15, 2015 at 4:26 PM, Attila Tóth atez...@gmail.com wrote: Dear Spark User List, I'm fairly new to Spark and trying to use it for multi-dimensional clustering (using the k-means clustering from MLlib). However, based on the examples, the clustering seems to work only for a single dimension (KMeans.train() accepts an RDD[Vector], which is a vector of doubles, whereas I have a list of arrays of doubles, e.g. a list of n-dimensional coordinates). Is there any way, given a list of arrays (or vectors) of doubles, to get the list of cluster centres (as a list of n-dimensional coordinates) in Spark? I'm using Scala. Thanks in advance, Attila
Re: SparkStreaming Low Performance
Thanks Enno, let me have a look at Jackson's streaming parser. Thanks Best Regards On Sat, Feb 14, 2015 at 9:30 PM, Enno Shioji eshi...@gmail.com wrote: Huh, that would come to 6.5 ms per JSON document. That does feel like a lot, but if your JSON file is big enough, I guess you could get that sort of processing time. Jackson is more or less the most efficient JSON parser out there, so unless the Scala API is somehow affecting it, I don't see any better way. If you only need to read parts of the JSON, you could look into exploiting Jackson's stream parsing API http://wiki.fasterxml.com/JacksonStreamingApi. I guess the good news is that you can throw machines at it. You could also look into other serialization frameworks. On Sat, Feb 14, 2015 at 2:49 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks again! It's the parser alone: I just tried the parser https://gist.github.com/akhld/3948a5d91d218eaf809d without Spark, and it took 52 seconds to process 8k JSON records. Not sure if there's an efficient way to do this in Spark. I know that if I use Spark SQL with a SchemaRDD it will be much faster, but I need this in Spark Streaming. Thanks Best Regards On Sat, Feb 14, 2015 at 8:04 PM, Enno Shioji eshi...@gmail.com wrote: I see. I'd really benchmark how the parsing performs outside Spark (in a tight loop or something). If *that* is slow, you know it's the parsing. If not, it's not the parsing. Another thing you want to look at is CPU usage. If the actual parsing really is the bottleneck, you should see very high CPU utilization. If not, it's not the parsing per se but rather the ability to feed the messages to the parsing library. On Sat, Feb 14, 2015 at 2:30 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Ah, my bad, it works without the serialization exception. But there isn't much of a performance difference, though.
Thanks Best Regards On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks for the suggestion, but doing that gives me this exception: http://pastebin.com/ni80NqKn with this piece of code:

object Holder extends Serializable {
  @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
}
val jsonStream = myDStream.map(x => {
  Holder.mapper.readValue[Map[String,Any]](x)
})

Thanks Best Regards On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji eshi...@gmail.com wrote: (adding back user) Fair enough. Regarding the serialization exception, the hack I use is to have an object with a transient lazy field, like so:

object Holder extends Serializable {
  @transient lazy val mapper = new ObjectMapper()
}

This way, the ObjectMapper will be instantiated at the destination and you can share the instance. On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks for the reply, Enno. In my case the rate from the stream is not the bottleneck, as I'm able to consume all those records at a time (I have tested it). And regarding the ObjectMapper, if I take it outside of my map operation then it throws serialization exceptions (Caused by: java.io.NotSerializableException: com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier). Thanks Best Regards On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com wrote: If I were you, I'd first parse some test JSONs in isolation (outside Spark) to determine whether the bottleneck is really the parsing. There are plenty of other places that could be affecting your performance, like the rate at which you are able to read from your stream source. Apart from that, I notice that you are instantiating the ObjectMapper every time. This is quite expensive, and Jackson recommends sharing the instance. However, if you tried other parsers / mapPartitions without success, this probably won't fix your problem either.
On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das ak...@sigmoidanalytics.com wrote: I'm getting low performance while parsing JSON data. My cluster setup is Spark 1.2.0 with 10 nodes, each having 15 GB of memory and 4 cores. I tried both scala.util.parsing.json.JSON and fasterxml's Jackson parser. This is basically what I do:

//Approach 1:
val jsonStream = myDStream.map(x => {
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue[Map[String,Any]](x)
})
jsonStream.count().print()

//Approach 2:
val jsonStream2 = myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
jsonStream2.count().print()

It takes around 15-20 seconds to process/parse 35k JSON documents (containing nested documents and arrays) which I put in the stream. Is there any better approach/parser to process them faster? I also tried it with mapPartitions, but it did not make any difference. Thanks
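The @transient lazy val trick Enno describes can be checked without Spark. In this sketch, ExpensiveParser is a hypothetical stand-in for Jackson's ObjectMapper (deliberately not Serializable), and ParserHolder is a class rather than an object so that Java serialization really has to handle it; a top-level Scala object referenced from a closure is typically not shipped at all, since each JVM initializes its own copy. After a serialize/deserialize round trip, the transient field has been skipped and the lazy val builds a fresh parser on first use.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a heavy, non-Serializable parser such as an ObjectMapper.
final class ExpensiveParser {
  def parse(s: String): Int = s.length
}

// The holder is Serializable; the parser field is skipped during serialization
// and rebuilt lazily on first access after deserialization.
class ParserHolder extends Serializable {
  @transient lazy val parser: ExpensiveParser = new ExpensiveParser
}

object RoundTrip {
  /** Serializes and deserializes `obj` with plain Java serialization. */
  def apply[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Forcing the parser before the round trip and still serializing successfully shows that the non-Serializable instance never leaves the original JVM.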
Re: shark queries failed
I'd suggest updating your Spark to the latest version and trying Spark SQL instead of Shark. Thanks Best Regards On Sun, Feb 15, 2015 at 7:36 AM, Grandl Robert rgra...@yahoo.com.invalid wrote: Hi guys, I deployed BlinkDB (built atop Shark) and am running Spark 0.9. I tried to run several TPC-DS Shark queries taken from https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries/shark. However, I encounter the following exceptions. Do you have any idea why that might happen? Thanks, Robert ... java.lang.ClassCastException: org.apache.hadoop.io.NullWritable cannot be cast to org.apache.hadoop.io.FloatWritable ...
Re: Multidimensional K-Means
Hi Sean, Thanks for the quick answer. I had not realized that I can make an RDD[Vector] with, e.g.:

val dataSet = sparkContext.makeRDD(List(Vectors.dense(10.0,20.0), Vectors.dense(20.0,30.0)))

Using this, KMeans.train works as it should. So my bad. Thanks again! Attila 2015-02-15 17:29 GMT+01:00 Sean Owen so...@cloudera.com: Clustering operates on a large number of n-dimensional vectors. That seems to be what you are describing, and that is what the MLlib API accepts. What are you expecting that you don't find? Did you have a look at the KMeansModel that this method returns? It has a clusterCenters method that gives you what you're looking for. Explore the API a bit more first. On Sun, Feb 15, 2015 at 4:26 PM, Attila Tóth atez...@gmail.com wrote: Dear Spark User List, I'm fairly new to Spark and trying to use it for multi-dimensional clustering (using the k-means clustering from MLlib). However, based on the examples, the clustering seems to work only for a single dimension (KMeans.train() accepts an RDD[Vector], which is a vector of doubles, whereas I have a list of arrays of doubles, e.g. a list of n-dimensional coordinates). Is there any way, given a list of arrays (or vectors) of doubles, to get the list of cluster centres (as a list of n-dimensional coordinates) in Spark? I'm using Scala. Thanks in advance, Attila
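The resolution above is that each Vector in the RDD[Vector] is already one n-dimensional point, and clusterCenters returns n-dimensional centres. To make that concrete without a cluster, here is a small single-machine Lloyd's k-means in plain Scala; it is a sketch of the general algorithm, not MLlib's implementation (MLlib adds k-means|| initialisation and distributed aggregation). In Spark the equivalent would be along the lines of KMeans.train(sc.parallelize(points.map(p => Vectors.dense(p))), k, maxIterations).clusterCenters.

```scala
// Minimal, single-machine Lloyd's k-means on n-dimensional points.
object TinyKMeans {
  type Point = Array[Double]

  private def sqDist(a: Point, b: Point): Double =
    a.indices.map(i => (a(i) - b(i)) * (a(i) - b(i))).sum

  // Index of the nearest center to p.
  private def closest(p: Point, centers: Seq[Point]): Int =
    centers.indices.minBy(i => sqDist(p, centers(i)))

  /** Runs `iterations` assign/update rounds from `initial` centers; returns the final centers. */
  def train(points: Seq[Point], initial: Seq[Point], iterations: Int): Seq[Point] =
    (1 to iterations).foldLeft(initial) { (centers, _) =>
      val groups = points.groupBy(p => closest(p, centers))
      centers.indices.map { i =>
        groups.get(i) match {
          case Some(members) =>
            // New center = coordinate-wise mean of the points assigned to it.
            val dim = members.head.length
            Array.tabulate(dim)(d => members.map(_(d)).sum / members.size)
          case None => centers(i) // an empty cluster keeps its previous center
        }
      }
    }
}
```

Every returned center has the same dimension as the input points, which is the shape of answer the original question was after.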
spark-local dir running out of space during long ALS run
Hi, I am running a large ALS job on Spark 1.2.0 on YARN (CDH 5.3.0). ALS is using about 3 billion ratings, and I am doing several trainImplicit() runs in a loop within one Spark session. I have a four-node cluster with 3 TB of disk space on each node. Before starting the job, less than 8% of the disk space is used. While ALS is running, I can see the disk usage growing rapidly, mainly because of files being stored under yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. After about 10 hours the disk usage hits 90% and YARN kills the affected containers. Am I missing some cleanup somewhere while looping over the several trainImplicit() calls? Taking 4 x 3 TB of disk space seems immense. Thanks for any help, Antony.
Dynamic partition pattern support
Hi, HCatalog allows you to specify the pattern of paths for partitions, which will be used by dynamic partition loading. https://cwiki.apache.org/confluence/display/Hive/HCatalog+DynamicPartitions#HCatalogDynamicPartitions-ExternalTables Can we have a similar feature in Spark SQL? The JIRA is here: https://issues.apache.org/jira/browse/SPARK-5828 Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
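To illustrate what the request is about: Hive's default dynamic-partition layout is one col=value directory per partition column, while a custom pattern lets the user choose the directory shape. The sketch below is purely hypothetical; the ${column} placeholder syntax is an assumption for illustration and is not claimed to be HCatalog's or Spark SQL's actual syntax.

```scala
import scala.util.matching.Regex

object PartitionPath {
  // Hypothetical placeholder syntax: ${columnName}
  private val Placeholder = """\$\{(\w+)\}""".r

  /** Fills every ${column} in `pattern` with that column's value; fails on unknown columns. */
  def render(pattern: String, values: Map[String, String]): String =
    Placeholder.replaceAllIn(pattern, m =>
      Regex.quoteReplacement(
        values.getOrElse(m.group(1), throw new IllegalArgumentException(s"no value for ${m.group(1)}"))))

  /** Hive's default layout for comparison: col1=v1/col2=v2 */
  def hiveDefault(columns: Seq[String], values: Map[String, String]): String =
    columns.map(c => s"$c=${values(c)}").mkString("/")
}
```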
Multidimensional K-Means
Dear Spark User List, I'm fairly new to Spark and trying to use it for multi-dimensional clustering (using the k-means clustering from MLlib). However, based on the examples, the clustering seems to work only for a single dimension (KMeans.train() accepts an RDD[Vector], which is a vector of doubles, whereas I have a list of arrays of doubles, e.g. a list of n-dimensional coordinates). Is there any way, given a list of arrays (or vectors) of doubles, to get the list of cluster centres (as a list of n-dimensional coordinates) in Spark? I'm using Scala. Thanks in advance, Attila
Re: New ColumnType For Decimal Caching
That sounds right to me. Cheng could elaborate if you are missing something. On Fri, Feb 13, 2015 at 11:36 AM, Manoj Samel manojsamelt...@gmail.com wrote: Thanks, Michael, for the pointer, and sorry for the delayed reply. Taking a quick inventory of the scope of the change: is the column type for Decimal caching needed only in the caching layer (4 files in org.apache.spark.sql.columnar: ColumnAccessor.scala, ColumnBuilder.scala, ColumnStats.scala, ColumnType.scala), or do other SQL components also need to be touched? Hoping for quick feedback off the top of your head... Thanks, On Mon, Feb 9, 2015 at 3:16 PM, Michael Armbrust mich...@databricks.com wrote: You could add a new ColumnType (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala). PRs welcome :) On Mon, Feb 9, 2015 at 3:01 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi Michael, As a test, I have the same data loaded as another Parquet table, except with the 2 decimal(14,4) columns replaced by double. With this, the on-disk size is ~345 MB, the in-memory size is 2 GB (vs. 12 GB), and the cached query runs in half the time of the uncached query. Would it be possible for Spark to store in-memory decimals in some form of long with decoration? For the immediate future, is there any hook that we can use to provide custom caching / processing for the decimal type in an RDD so that other semantics do not change? Thanks, On Mon, Feb 9, 2015 at 2:41 PM, Manoj Samel manojsamelt...@gmail.com wrote: Could you share which data types are optimized in the in-memory storage and how they are optimized? On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust mich...@databricks.com wrote: You'll probably only get good compression for strings when dictionary encoding works. We don't optimize decimals in the in-memory columnar storage, so you are likely paying for expensive serialization there.
On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com wrote: Flat data of types String, Int, and a couple of decimal(14,4). On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust mich...@databricks.com wrote: Is this nested data or flat data? On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi Michael, The storage tab shows that the RDD resides fully in memory (10 partitions) with zero disk usage. Tasks for subsequent selects on this cached table show minimal overheads (GC, queueing, shuffle write, etc.), so overhead is not the issue. However, it is still twice as slow as reading the uncached table. I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed = true, spark.serializer = org.apache.spark.serializer.KryoSerializer. Something that may be of relevance: the underlying table is Parquet, 10 partitions totaling ~350 MB. The mapPartition phase of the query on the uncached table shows an input size of 351 MB. However, after the table is cached, the storage tab shows the cache size as 12 GB. So the in-memory representation seems much bigger than on disk, even with the compression options turned on. Any thoughts on this? The mapPartition phase of the same query on the cached table shows an input size of 12 GB (the full size of the cached table) and takes twice as long as the mapPartition phase of the uncached query. Thanks, On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com wrote: Check the storage tab. Does the table actually fit in memory? Otherwise you are rebuilding column buffers in addition to reading the data off the disk. On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com wrote: Spark 1.2. Data stored in a Parquet table (large number of rows). Test 1: select a, sum(b), sum(c) from table. Test 2: sqlContext.cacheTable(); select a, sum(b), sum(c) from table (seeds the cache; the first run is slow since it is loading the cache?); select a, sum(b), sum(c) from table again (the second time it should be faster, as it should be reading from the cache, not HDFS).
But it is slower than Test 1. Any thoughts? Should a different query be used to seed the cache? Thanks,
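On Manoj's question about storing in-memory decimals "in some form of long": a decimal(14,4) value has at most 14 significant digits, so its unscaled value (the digits with the decimal point removed) always fits in a 64-bit Long, whose maximum is roughly 9.2 * 10^18. The sketch below shows only that encoding in plain Scala; it is not Spark's ColumnType interface, which is the extension point Michael points to, and the object name is hypothetical.

```scala
import scala.math.BigDecimal.RoundingMode

// Sketch: store decimal(14,4) values as their unscaled Long (value * 10^4).
object CompactDecimal {
  val Precision = 14
  val Scale = 4

  /** Encodes `d` as an unscaled Long; fails if `d` needs rounding or exceeds 14 digits. */
  def encode(d: BigDecimal): Long = {
    val scaled = d.setScale(Scale, RoundingMode.UNNECESSARY) // throws if rounding is needed
    require(scaled.precision <= Precision, s"$d does not fit decimal($Precision,$Scale)")
    scaled.bigDecimal.unscaledValue.longValueExact
  }

  /** Rebuilds the decimal from its unscaled Long. */
  def decode(unscaled: Long): BigDecimal = BigDecimal(unscaled, Scale)
}
```

A primitive Long per value avoids keeping a BigDecimal object per cell, which is one plausible reason the double-typed copy of the table cached so much smaller.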
shark queries failed
Hi guys, I deployed BlinkDB (built atop Shark) and am running Spark 0.9. I tried to run several TPC-DS Shark queries taken from https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries/shark. However, I encounter the following exceptions. Do you have any idea why that might happen? Thanks, Robert

2015-02-14 17:58:29,358 WARN util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(52)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2015-02-14 17:58:29,360 WARN snappy.LoadSnappy (LoadSnappy.java:<clinit>(46)) - Snappy native library not loaded
2015-02-14 17:58:34,963 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 6 (task 5.0:2)
2015-02-14 17:58:34,970 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Loss was due to java.lang.ClassCastException
java.lang.ClassCastException: org.apache.hadoop.io.NullWritable cannot be cast to org.apache.hadoop.io.FloatWritable
    at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableFloatObjectInspector.get(WritableFloatObjectInspector.java:35)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:331)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serializeStruct(LazyBinarySerDe.java:257)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:204)
    at shark.execution.ReduceSinkOperator$$anonfun$processPartitionNoDistinct$1.apply(ReduceSinkOperator.scala:188)
    at shark.execution.ReduceSinkOperator$$anonfun$processPartitionNoDistinct$1.apply(ReduceSinkOperator.scala:153)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
2015-02-14 17:58:34,983 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 8 (task 5.0:4)
2015-02-14 17:58:35,075 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 12 (task 5.0:8)
2015-02-14 17:58:35,119 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 15 (task 5.0:2)
2015-02-14 17:58:35,134 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 9 (task 5.0:5)
2015-02-14 17:58:35,187 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 16 (task 5.0:4)
2015-02-14 17:58:35,203 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 11 (task 5.0:7)
2015-02-14 17:58:35,214 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 13 (task 5.0:9)
2015-02-14 17:58:35,265 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 4 (task 5.0:0)
2015-02-14 17:58:35,274 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 18 (task 5.0:2)
2015-02-14 17:58:35,304 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 17 (task 5.0:8)
2015-02-14 17:58:35,330 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 5 (task 5.0:1)
2015-02-14 17:58:35,354 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 20 (task 5.0:4)
2015-02-14 17:58:35,387 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 19 (task 5.0:5)
2015-02-14 17:58:35,430 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 7 (task 5.0:3)
2015-02-14 17:58:35,432 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 24 (task 5.0:2)
2015-02-14 17:58:35,433 ERROR scheduler.TaskSetManager (Logging.scala:logError(65)) - Task 5.0:2 failed 4 times; aborting job
2015-02-14 17:58:35,438 ERROR ql.Driver (SessionState.java:printError(400)) - FAILED: Execution Error, return code -101 from shark.execution.SparkTask
2015-02-14 17:58:35,552 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 30 (task 6.0:0)
2015-02-14 17:58:35,565 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Loss was due to java.io.FileNotFoundException
java.io.FileNotFoundException: http://10.200.146.12:46812/broadcast_4
    at
Re: Loading JSON dataset with Spark Mllib
Hi, In fact, you can use sqlCtx.jsonFile(), which loads a text file storing one JSON object per line as a SchemaRDD. Or you can use sc.textFile() to load the text file into an RDD and then use sqlCtx.jsonRDD(), which loads an RDD storing one JSON object per string as a SchemaRDD. Hope it helps. Cheers Gen On Mon, Feb 16, 2015 at 12:39 AM, pankaj channe pankajc...@gmail.com wrote: Hi, I am new to Spark and planning on writing a machine learning application with Spark MLlib. My dataset is in JSON format. Is it possible to load data into Spark without using any external JSON libraries? I have explored the option of Spark SQL, but I believe that is only for interactive use or for loading data into Hive tables. Thanks, Pankaj
Re: shark queries failed
Thanks for reply, Akhil. I cannot update the spark version and run SparkSQL due to some old dependencies and a specific project I want to run. I was wondering if you have any clue, why that exception might be triggered, or if you saw it before. Thanks,Robert On Sunday, February 15, 2015 9:18 AM, Akhil Das ak...@sigmoidanalytics.com wrote: I'd suggest you updating your spark to the latest version and try SparkSQL instead of Shark. ThanksBest Regards On Sun, Feb 15, 2015 at 7:36 AM, Grandl Robert rgra...@yahoo.com.invalid wrote: Hi guys, I deployed BlinkDB(built atop Shark) and running Spark 0.9. I tried to run several TPCDS shark queries taken from https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries/shark. However, the following exceptions are encountered. Do you have any idea why that might happen ? Thanks,Robert 2015-02-14 17:58:29,358 WARN util.NativeCodeLoader (NativeCodeLoader.java:clinit(52)) - Unable to load native- hadoop library for your platform... 
using builtin-java classes where applicable 2015-02-14 17:58:29,360 WARN snappy.LoadSnappy (LoadSnappy.java:<clinit>(46)) - Snappy native library not loaded 2015-02-14 17:58:34,963 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 6 (task 5.0:2) 2015-02-14 17:58:34,970 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Loss was due to java.lang.ClassCastException java.lang.ClassCastException: org.apache.hadoop.io.NullWritable cannot be cast to org.apache.hadoop.io.FloatWritable at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableFloatObjectInspector.get(WritableFloatObjectInspector.java:35) at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:331) at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serializeStruct(LazyBinarySerDe.java:257) at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:204) at shark.execution.ReduceSinkOperator$$anonfun$processPartitionNoDistinct$1.apply(ReduceSinkOperator.scala:188) at shark.execution.ReduceSinkOperator$$anonfun$processPartitionNoDistinct$1.apply(ReduceSinkOperator.scala:153) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102) at org.apache.spark.scheduler.Task.run(Task.scala:53) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at 
java.lang.Thread.run(Thread.java:722) 2015-02-14 17:58:34,983 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 8 (task 5.0:4) 2015-02-14 17:58:35,075 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 12 (task 5.0:8) 2015-02-14 17:58:35,119 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 15 (task 5.0:2) 2015-02-14 17:58:35,134 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 9 (task 5.0:5) 2015-02-14 17:58:35,187 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 16 (task 5.0:4) 2015-02-14 17:58:35,203 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 11 (task 5.0:7) 2015-02-14 17:58:35,214 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 13 (task 5.0:9) 2015-02-14 17:58:35,265 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 4 (task 5.0:0) 2015-02-14 17:58:35,274 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 18 (task 5.0:2) 2015-02-14 17:58:35,304 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 17 (task 5.0:8) 2015-02-14 17:58:35,330 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 5 (task 5.0:1) 2015-02-14 17:58:35,354 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 20 (task 5.0:4) 2015-02-14 17:58:35,387 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 19 (task 5.0:5) 2015-02-14 17:58:35,430 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 7 (task 5.0:3) 2015-02-14 17:58:35,432 WARN scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 24 (task 5.0:2) 2015-02-14 17:58:35,433 ERROR scheduler.TaskSetManager
Re: Unable to query hive tables from spark
What does your hive-site.xml look like? Do you actually have a directory at the location shown in the error, i.e., does /user/hive/warehouse/src exist? You should be able to override this by specifying the following: --hiveconf hive.metastore.warehouse.dir=/location/where/your/warehouse/exists HTH. -Todd On Thu, Feb 12, 2015 at 1:16 AM, kundan kumar iitr.kun...@gmail.com wrote: I want to create/access the Hive tables from Spark. I have placed the hive-site.xml inside the spark/conf directory. Even so, it creates a local metastore in the directory where I run the Spark shell and exits with an error. I am getting this error when I try to create a new Hive table. Even when querying an existing table, the error appears. sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") Please suggest what I am doing wrong and a way to resolve this. 15/02/12 10:35:58 ERROR RetryingHMSHandler: MetaException(message:file:/user/hive/warehouse/src is not a directory or unable to create one) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606)
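Todd's question (does the directory exist?) can be checked directly. Because the path in the MetaException uses the file: scheme, the metastore is resolving the warehouse on the local filesystem of the machine running the shell rather than on HDFS. The sketch below uses only java.nio.file and mirrors the condition in the error message: the path must already be a writable directory or be creatable. The class and method names are hypothetical and are not part of Hive or Spark.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper (not part of Hive or Spark): checks the condition behind
// "... is not a directory or unable to create one" on the local filesystem.
final class WarehouseCheck {
    private WarehouseCheck() { }

    /**
     * Returns true if dir is already a writable directory, or if it (and any
     * missing parents) could be created. Creates the directory as a side effect.
     */
    static boolean ensureWarehouseDir(String dir) {
        Path path = Paths.get(dir);
        if (Files.isDirectory(path)) {
            return Files.isWritable(path);
        }
        if (Files.exists(path)) {
            // Something exists at this path, but it is not a directory.
            return false;
        }
        try {
            Files.createDirectories(path);
            return true;
        } catch (IOException | SecurityException e) {
            // Typically a permission problem on a parent such as /user.
            return false;
        }
    }
}
```

Run as the same OS user that starts the Spark shell, a false result for /user/hive/warehouse/src would point to a local permission or path problem; pointing hive.metastore.warehouse.dir at a writable location, as Todd suggests, is the alternative.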
Re: New ColumnType For Decimal Caching
Hi Manoj, Yes, you've already hit the point. I think timestamp type support in the in-memory columnar storage can be a good reference for you. Also, you may want to enable compression support for the decimal type by adding a DECIMAL column type to RunLengthEncoding.supports and DictionaryEncoding.supports. Thanks for working on this! Best, Cheng On Sat, Feb 14, 2015 at 5:32 PM, Michael Armbrust mich...@databricks.com wrote: That sounds right to me. Cheng could elaborate if you are missing something. On Fri, Feb 13, 2015 at 11:36 AM, Manoj Samel manojsamelt...@gmail.com wrote: Thanks Michael for the pointer. Sorry for the delayed reply. Taking a quick inventory of the scope of change: is the column type for Decimal caching needed only in the caching layer (4 files in org.apache.spark.sql.columnar - ColumnAccessor.scala, ColumnBuilder.scala, ColumnStats.scala, ColumnType.scala), or do other SQL components also need to be touched? Hoping for quick feedback off the top of your head... Thanks, On Mon, Feb 9, 2015 at 3:16 PM, Michael Armbrust mich...@databricks.com wrote: You could add a new ColumnType https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala . PRs welcome :) On Mon, Feb 9, 2015 at 3:01 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi Michael, As a test, I have the same data loaded as another Parquet table, except with the 2 decimal(14,4) columns replaced by double. With this, the on-disk size is ~345MB, the in-memory size is 2GB (vs. 12 GB) and the cached query runs in half the time of the uncached query. Would it be possible for Spark to store in-memory decimals in some form of long with decoration? For the immediate future, is there any hook that we can use to provide custom caching / processing for the decimal type in RDD so that other semantics do not change? 
Thanks, On Mon, Feb 9, 2015 at 2:41 PM, Manoj Samel manojsamelt...@gmail.com wrote: Could you share which data types are optimized in the in-memory storage and how they are optimized? On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust mich...@databricks.com wrote: You'll probably only get good compression for strings when dictionary encoding works. We don't optimize decimals in the in-memory columnar storage, so you are likely paying for expensive serialization there. On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com wrote: Flat data of types String, Int and a couple of decimal(14,4) On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust mich...@databricks.com wrote: Is this nested data or flat data? On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi Michael, The storage tab shows the RDD resides fully in memory (10 partitions) with zero disk usage. Tasks for subsequent selects on this cached table show minimal overheads (GC, queueing, shuffle write, etc.), so overhead is not the issue. However, it is still twice as slow as reading the uncached table. I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed = true, spark.serializer = org.apache.spark.serializer.KryoSerializer. Something that may be of relevance: the underlying table is Parquet, 10 partitions totaling ~350 MB. The mapPartition phase of the query on the uncached table shows an input size of 351 MB. However, after the table is cached, the storage tab shows the cache size as 12GB. So the in-memory representation seems much bigger than on-disk, even with the compression options turned on. Any thoughts on this? The mapPartition phase of the same query on the cached table shows an input size of 12GB (the full size of the cached table) and takes twice as long as the mapPartition phase of the uncached query. Thanks, On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com wrote: Check the storage tab. Does the table actually fit in memory? 
Otherwise you are rebuilding column buffers in addition to reading the data off of the disk. On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com wrote: Spark 1.2 Data stored in a Parquet table (large number of rows) Test 1 select a, sum(b), sum(c) from table Test sqlContext.cacheTable() select a, sum(b), sum(c) from table - seed cache First time slow since loading cache? select a, sum(b), sum(c) from table - Second time it should be faster, as it should be reading from cache, not HDFS. But it is slower than test1. Any thoughts? Should a different query be used to seed the cache? Thanks,
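Manoj's idea of storing a decimal "in some form of long with decoration" can be sketched in plain Java: for a fixed decimal(14,4) column, keep only the unscaled value in a long and treat the scale (4) as column metadata rather than per-value data. Fourteen digits always fit in a signed 64-bit long, which holds any 18-digit number. This is a hypothetical illustration of the encoding idea, not Spark's ColumnType API; the class name DecimalAsLong is made up.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical sketch: a decimal(14,4) value stored as its unscaled long,
// with the scale kept once per column instead of once per value.
final class DecimalAsLong {
    static final int PRECISION = 14; // total digits allowed by decimal(14,4)
    static final int SCALE = 4;      // digits after the decimal point

    private DecimalAsLong() { }

    /** Encodes a value as its unscaled long, e.g. 12.34 -> 123400. */
    static long encode(BigDecimal value) {
        // RoundingMode.UNNECESSARY throws ArithmeticException instead of rounding
        // when the value has more than SCALE fractional digits.
        BigDecimal scaled = value.setScale(SCALE, RoundingMode.UNNECESSARY);
        if (scaled.precision() > PRECISION) {
            throw new ArithmeticException("value does not fit decimal(14,4): " + value);
        }
        // At most 14 digits, so this always fits in a long.
        return scaled.unscaledValue().longValueExact();
    }

    /** Rebuilds the decimal from the stored long and the column's fixed scale. */
    static BigDecimal decode(long unscaled) {
        return BigDecimal.valueOf(unscaled, SCALE);
    }
}
```

The sketch rejects values with more than four fractional digits or more than 14 significant digits instead of silently rounding; a real column type would have to decide how to handle that case.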
Re: [GraphX] Excessive value recalculations during aggregateMessages cycles
Hi, I tried some quick and simple tests, and ISTM the vertices below were correctly cached. Could you tell me how my code differs from yours? import org.apache.spark.graphx._ import org.apache.spark.graphx.lib._ object Prog { def processInt(d: Int) = d * 2 } val g = GraphLoader.edgeListFile(sc, "../temp/graph.txt") .cache val g2 = g.outerJoinVertices(g.degrees)( (vid, old, msg) => Prog.processInt(msg.getOrElse(0))) .cache g2.vertices.count val g3 = g.outerJoinVertices(g.degrees)( (vid, old, msg) => msg.getOrElse(0)) .mapVertices((vid, d) => Prog.processInt(d)) .cache g3.vertices.count 'g2.vertices.toDebugString' outputs: (2) VertexRDDImpl[16] at RDD at VertexRDD.scala:57 [] | VertexRDD ZippedPartitionsRDD2[15] at zipPartitions at VertexRDDImpl.scala:121 [] | CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B | VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at VertexRDD.scala:319 [] | CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B | MapPartitionsRDD[7] at mapPartitions at VertexRDD.scala:335 [] | ShuffledRDD[6] at partitionBy at VertexRDD.scala:335 [] +-(2) VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[5] at mapPartitions at VertexRDD.scala:330 [] | GraphLoader.edgeListFile - edges (../temp/graph.txt), EdgeRDD, EdgeRDD MapPartitionsRDD[2] at mapPartitionsWithIndex at Graph... 
'g3.vertices.toDebugString' outputs: (2) VertexRDDImpl[33] at RDD at VertexRDD.scala:57 [] | VertexRDD MapPartitionsRDD[32] at mapPartitions at VertexRDDImpl.scala:96 [] | CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B | VertexRDD ZippedPartitionsRDD2[24] at zipPartitions at VertexRDDImpl.scala:121 [] | CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B | VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at VertexRDD.scala:319 [] | CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B | MapPartitionsRDD[7] at mapPartitions at VertexRDD.scala:335 [] | ShuffledRDD[6] at partitionBy at VertexRDD.scala:335 [] +-(2) VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[5] at mapPar... -- maropu On Mon, Feb 9, 2015 at 5:47 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: I changed curGraph = curGraph.outerJoinVertices(curMessages)( (vid, vertex, message) => vertex.process(message.getOrElse(List[Message]()), ti) ).cache() to curGraph = curGraph.outerJoinVertices(curMessages)( (vid, vertex, message) => (vertex, message.getOrElse(List[Message]())) ).mapVertices( (x, y) => y._1.process(y._2, ti) ).cache() So the call to the 'process' method was moved out of outerJoinVertices and into a separate mapVertices call, and the problem went away. Now 'process' is only called once, during the correct cycle. So it would appear that outerJoinVertices caches the closure to be recalculated if needed again, while mapVertices actually caches the derived values. Is this a bug or a feature? Kyle On Sat, Feb 7, 2015 at 11:44 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: I'm trying to set up a simple iterative message/update problem in GraphX (Spark 1.2.0), but I'm running into issues with the caching and re-calculation of data. 
I'm trying to follow the example found in the Pregel implementation of materializing and caching messages and graphs and then unpersisting them after the next cycle has been done. It doesn't seem to be working, because every cycle gets progressively slower and it seems as if more and more of the values are being re-calculated despite my attempts to cache them. The code:
```
var oldMessages : VertexRDD[List[Message]] = null
var oldGraph : Graph[MyVertex, MyEdge] = null
curGraph = curGraph.mapVertices((x, y) => y.init())
for (i <- 0 to cycle_count) {
  val curMessages = curGraph.aggregateMessages[List[Message]](x => {
      // send messages
      ...
    },
    (x, y) => {
      // collect messages into lists
      val out = x ++ y
      out
    }
  ).cache()
  curMessages.count()
  val ti = i
  oldGraph = curGraph
  curGraph = curGraph.outerJoinVertices(curMessages)(
    (vid, vertex, message) => vertex.process(message.getOrElse(List[Message]()), ti)
  ).cache()
  curGraph.vertices.count()
  oldGraph.unpersistVertices(blocking = false)
  oldGraph.edges.unpersist(blocking = false)
  oldGraph = curGraph
  if (oldMessages != null) {
    oldMessages.unpersist(blocking = false)
  }
  oldMessages = curMessages
}
```
The MyVertex.process method takes the list of incoming messages, averages them and returns a new MyVertex object. I've also set it up to
Re: Shuffle write increases in spark 1.2
I have seen the same behavior! I would love to hear an update on this... Thanks, Ami On Thu, Feb 5, 2015 at 8:26 AM, Anubhav Srivastav anubhav.srivas...@gmail.com wrote: Hi Kevin, We seem to be facing the same problem as well. Were you able to find anything after that? The ticket does not seem to have progressed anywhere. Regards, Anubhav On 5 January 2015 at 10:37, 정재부 itsjb.j...@samsung.com wrote: Sure, here is a ticket. https://issues.apache.org/jira/browse/SPARK-5081 --- *Original Message* --- *Sender* : Josh Rosen rosenvi...@gmail.com *Date* : 2015-01-05 06:14 (GMT+09:00) *Title* : Re: Shuffle write increases in spark 1.2 If you have a small reproduction for this issue, can you open a ticket at https://issues.apache.org/jira/browse/SPARK ? On December 29, 2014 at 7:10:02 PM, Kevin Jung (itsjb.j...@samsung.com) wrote: Hi all, The size of shuffle write shown in the Spark web UI is very different when I execute the same Spark job on the same input data (100GB) in Spark 1.1 and Spark 1.2. At the same sortBy stage, the size of shuffle write is 39.7GB in Spark 1.1 but 91.0GB in Spark 1.2. I set the spark.shuffle.manager option to hash because its default value changed, but Spark 1.2 still writes larger files than Spark 1.1. Can anyone tell me why this happened? Thanks Kevin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-write-increases-in-spark-1-2-tp20894.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Array in broadcast can't be serialized
I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be serialized by Kryo but *Array[ImmutableBytesWritable]* can't be serialized, even though I registered both of them with Kryo. The code is as follows: val conf = new SparkConf() .setAppName("Hello Spark") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.kryo.registrator", "xt.MyKryoRegistrator") val sc = new SparkContext(conf) val rdd = sc.parallelize(List( (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new KeyValue()), (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new KeyValue()), (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new KeyValue()), (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new KeyValue())), 4) // snippet 1: a single object of *ImmutableBytesWritable* can be serialized in broadcast val partitioner = new SingleElementPartitioner(sc.broadcast(new ImmutableBytesWritable(Bytes.toBytes(3)))) val ret = rdd.aggregateByKey(List[KeyValue](), partitioner)((xs: List[KeyValue], y: KeyValue) => y :: xs, (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys ).persist() println("\n\n\nret.count = " + ret.count + ", partition size = " + ret.partitions.size) // snippet 2: an array of *ImmutableBytesWritable* can not be serialized in broadcast val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)), new ImmutableBytesWritable(Bytes.toBytes(2)), new ImmutableBytesWritable(Bytes.toBytes(3))) val newPartitioner = new ArrayPartitioner(sc.broadcast(arr)) val ret1 = rdd.aggregateByKey(List[KeyValue](), newPartitioner)((xs: List[KeyValue], y: KeyValue) => y :: xs, (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys ) println("\n\n\nrdd2.count = " + ret1.count) sc.stop // the following are the Kryo registrator and partitioners class MyKryoRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo): Unit = { kryo.register(classOf[ImmutableBytesWritable]) // register ImmutableBytesWritable kryo.register(classOf[Array[ImmutableBytesWritable]]) // register Array[ImmutableBytesWritable] } } class 
SingleElementPartitioner(bc: Broadcast[ImmutableBytesWritable]) extends Partitioner { override def numPartitions: Int = 5 def v = Bytes.toInt(bc.value.get) override def getPartition(key: Any): Int = v - 1 } class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner { val arr = bc.value override def numPartitions: Int = arr.length override def getPartition(key: Any): Int = Bytes.toInt(arr(0).get) } In the code above, snippet 1 works as expected, but snippet 2 throws Task not serializable: java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable. So do I have to implement a Kryo serializer for Array[T] if it is used in a broadcast? Thanks
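One likely explanation, not confirmed anywhere in this thread: the partitioner object itself is shipped with the task using Java serialization, independently of the Kryo settings. SingleElementPartitioner stores only the Broadcast handle and reads bc.value inside a def, while ArrayPartitioner copies bc.value into a val field, so the partitioner now directly contains an Array[ImmutableBytesWritable], whose elements are not java.io.Serializable. The plain-Java sketch below reproduces that difference with hypothetical stand-in classes (Bytes, Handle, EagerPartitioner, LazyPartitioner); Handle only imitates a broadcast handle whose value is transient and is not Spark's Broadcast class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Plain-Java sketch with hypothetical stand-ins; no Spark or HBase classes involved.
final class PartitionerSerializationDemo {
    private PartitionerSerializationDemo() { }

    /** Stand-in for ImmutableBytesWritable: deliberately NOT java.io.Serializable. */
    static final class Bytes {
        final byte[] data;
        Bytes(byte[] data) { this.data = data; }
    }

    /** Stand-in for a broadcast handle: the value is transient, so only the handle is written. */
    static final class Handle<T> implements Serializable {
        private transient T value;
        Handle(T value) { this.value = value; }
        T value() { return value; }
    }

    /** Shaped like ArrayPartitioner: copies the broadcast value into a field. */
    static final class EagerPartitioner implements Serializable {
        final Bytes[] arr;
        EagerPartitioner(Handle<Bytes[]> bc) { this.arr = bc.value(); }
    }

    /** Shaped like SingleElementPartitioner: keeps only the handle and reads the value on use. */
    static final class LazyPartitioner implements Serializable {
        final Handle<Bytes[]> bc;
        LazyPartitioner(Handle<Bytes[]> bc) { this.bc = bc; }
        int numPartitions() { return bc.value().length; }
    }

    static Handle<Bytes[]> sampleHandle() {
        return new Handle<>(new Bytes[] {
            new Bytes(new byte[] {1}), new Bytes(new byte[] {2}), new Bytes(new byte[] {3})
        });
    }

    /** Returns true if Java serialization succeeds, false on NotSerializableException. */
    static boolean javaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If this is indeed the cause, giving ArrayPartitioner the same shape as SingleElementPartitioner (for example `def arr = bc.value`, or a `@transient lazy val`) would likely avoid the need for a custom Kryo serializer.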
Specifying AMI when using Spark EC-2 scripts
Hi there, Is there a way to specify the AWS AMI with a particular OS (say, Ubuntu) when launching Spark on Amazon's cloud with the provided scripts? What is the default AMI and operating system launched by the EC2 script? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Specifying-AMI-when-using-Spark-EC-2-scripts-tp21658.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Extract hour from Timestamp in Spark SQL
Are you using the SQLContext? I think the HiveContext is recommended. Cheng Hao From: Wush Wu [mailto:w...@bridgewell.com] Sent: Thursday, February 12, 2015 2:24 PM To: u...@spark.incubator.apache.org Subject: Extract hour from Timestamp in Spark SQL Dear all, I am new to Spark SQL and have no experience with Hive. I tried to use the built-in Hive function to extract the hour from a timestamp in Spark SQL, but got: java.util.NoSuchElementException: key not found: hour How should I extract the hour from a timestamp? I am also very confused about which functions I can use in Spark SQL. Is there any list of available functions other than http://spark.apache.org/docs/1.2.0/sql-programming-guide.html#compatibility-with-apache-hive ? Thanks, Wush
monit with spark
We want to monitor the Spark master and Spark slaves using monit, but we want to use the sbin scripts to do so. The scripts create the Spark master and slave processes independently of themselves, so monit would not know the PIDs of the started processes to watch. Is this correct? Should we watch the ports? How should we configure monit to run and monitor Spark standalone processes? -- Thanks, Mike
Loading JSON dataset with Spark Mllib
Hi, I am new to Spark and planning on writing a machine learning application with Spark MLlib. My dataset is in JSON format. Is it possible to load data into Spark without using any external JSON libraries? I have explored the option of Spark SQL, but I believe that is only for interactive use or loading data into Hive tables. Thanks, Pankaj
Re: spark-local dir running out of space during long ALS run
spark.cleaner.ttl? On Sunday, 15 February 2015, 18:23, Antony Mayi antonym...@yahoo.com wrote: Hi, I am running a bigger ALS on Spark 1.2.0 on YARN (CDH 5.3.0). ALS is using about 3 billion ratings, and I am doing several trainImplicit() runs in a loop within one Spark session. I have a four-node cluster with 3TB of disk space on each node. Before starting the job, less than 8% of the disk space is used. While the ALS is running, I can see the disk usage rapidly growing, mainly because of files being stored under yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. After about 10 hours the disk usage hits 90% and YARN kills the particular containers. Am I missing some cleanup somewhere while looping over the several trainImplicit() calls? Taking 4*3TB of disk space seems immense. Thanks for any help, Antony.
Re: HiveContext created SchemaRDD's saveAsTable is not working on 1.2.0
It works now using 1.2.1. Thanks for all the help. Spark rocks !! - Thanks, Roy -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-HiveContext-created-SchemaRDD-s-saveAsTable-is-not-working-on-1-2-0-tp21442p21664.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Writing to HDFS from spark Streaming
I used the latest assembly jar and the following, as suggested by Akhil, to fix this problem... temp.saveAsHadoopFiles("DailyCSV", ".txt", String.class, String.class, (Class) TextOutputFormat.class); Thanks all for the help! On Wed, Feb 11, 2015 at 1:38 PM, Sean Owen so...@cloudera.com wrote: That kinda dodges the problem by ignoring generic types. But it may be simpler than the 'real' solution, which is a bit ugly. (But first, to double check, are you importing the correct TextOutputFormat? There are two versions. You use .mapred. with the old API and .mapreduce. with the new API.) Here's how I've formally cast around it in similar code: @SuppressWarnings("unchecked") Class<? extends OutputFormat<?,?>> outputFormatClass = (Class<? extends OutputFormat<?,?>>) (Class<?>) TextOutputFormat.class; and then pass that as the final argument. On Wed, Feb 11, 2015 at 6:35 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you try: temp.saveAsHadoopFiles("DailyCSV", ".txt", String.class, String.class, (Class) TextOutputFormat.class); Thanks Best Regards On Wed, Feb 11, 2015 at 9:40 AM, Bahubali Jain bahub...@gmail.com wrote: Hi, I am facing issues while writing data from a streaming RDD to HDFS. JavaPairDStream<String,String> temp; ... ... temp.saveAsHadoopFiles("DailyCSV", ".txt", String.class, String.class, TextOutputFormat.class); I see compilation issues as below... The method saveAsHadoopFiles(String, String, Class<?>, Class<?>, Class<? extends OutputFormat<?,?>>) in the type JavaPairDStream<String,String> is not applicable for the arguments (String, String, Class<String>, Class<String>, Class<TextOutputFormat>) I see the same kind of problem even with the saveAsNewAPIHadoopFiles API. Thanks, Baahu -- Twitter:http://twitter.com/Baahu
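Sean's double cast can be tried out without Hadoop. The sketch below uses hypothetical stand-ins named after the Hadoop types (a generic OutputFormat<K, V> interface and a TextOutputFormat<K, V> class); they are not the real org.apache.hadoop classes. Passing TextOutputFormat.class directly fails for the reason visible in the compiler error in the thread: the literal has type Class<TextOutputFormat> with a raw type argument. Widening to Class<?> first leaves a single unchecked cast, which the annotation then suppresses.

```java
// Hypothetical stand-ins named after the Hadoop types; not org.apache.hadoop classes.
final class GenericClassCastDemo {
    private GenericClassCastDemo() { }

    interface OutputFormat<K, V> { }

    static class TextOutputFormat<K, V> implements OutputFormat<K, V> { }

    /** Same shape as the last parameter of saveAsHadoopFiles in the compiler error. */
    static String save(Class<? extends OutputFormat<?, ?>> outputFormatClass) {
        return outputFormatClass.getSimpleName();
    }

    static String saveText() {
        // save(TextOutputFormat.class) is rejected: the literal's type is
        // Class<TextOutputFormat> (raw), not Class<? extends OutputFormat<?, ?>>.
        // Casting through Class<?> first leaves one unchecked cast, suppressed here.
        @SuppressWarnings("unchecked")
        Class<? extends OutputFormat<?, ?>> outputFormatClass =
                (Class<? extends OutputFormat<?, ?>>) (Class<?>) TextOutputFormat.class;
        return save(outputFormatClass);
    }
}
```

Compared with the raw (Class) cast that resolved the thread, the double cast keeps the variable's declared type precise, so only this one line opts out of generic checking.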
WARN from Similarity Calculation
Hi, I sometimes get a WARN when running the similarity calculation: 15/02/15 23:07:55 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(7, abc.com, 48419, 0) with no recent heart beats: 66435ms exceeds 45000ms Do I need to increase the default of 45 s to a larger value for cases where we are doing blocking operations or long computations in mapPartitions? Thanks. Deb