Re: SQLContext.applySchema strictness

2015-02-15 Thread Michael Armbrust
Applying schema is a pretty low-level operation, and I would expect most
users would use the type safe interfaces.  If you are unsure you can always
run:

import org.apache.spark.sql.execution.debug._
schemaRDD.typeCheck()

and it will tell you if you have made any mistakes.
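
For illustration, here is a minimal end-to-end sketch of that check (Spark 1.2-era API; the rows and schema below are made up, and the second row deliberately violates the schema):

import org.apache.spark.sql._
import org.apache.spark.sql.execution.debug._

val sqlContext = new SQLContext(sc)

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

// applySchema itself performs no validation, so the bad second row is accepted here
val rowRDD = sc.parallelize(Seq(Row("alice", 30), Row("bob", "not-an-int")))
val schemaRDD = sqlContext.applySchema(rowRDD, schema)

// the debug helper walks the data and reports values that do not match the schema
schemaRDD.typeCheck()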

Michael

On Sat, Feb 14, 2015 at 1:05 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Would it make sense to add an optional validate parameter to applySchema()
 which defaults to False, both to give users the option to check the schema
 immediately and to make the default behavior clearer?
 ​

 On Sat Feb 14 2015 at 9:18:59 AM Michael Armbrust mich...@databricks.com
 wrote:

 Doing runtime type checking is very expensive, so we only do it when
 necessary (i.e. you perform an operation like adding two columns together)

 On Sat, Feb 14, 2015 at 2:19 AM, nitin nitin2go...@gmail.com wrote:

 AFAIK, this is the expected behavior. You have to make sure that the
 schema
 matches the row. It won't give any error when you apply the schema as it
 doesn't validate the nature of data.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SQLContext-applySchema-strictness-tp21650p21653.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Shuffle write increases in spark 1.2

2015-02-15 Thread Aaron Davidson
I think Xuefeng Wu's suggestion is likely correct. This difference is more
likely explained by the compression library changing versions than by sort vs
hash shuffle (which should not affect output size significantly). Others
have reported that switching to lz4 fixed their issue.
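
For anyone who wants to try that, a minimal sketch (spark.io.compression.codec is a
standard Spark setting; switching it is not a confirmed fix, just the experiment
suggested above):

import org.apache.spark.SparkConf

// use lz4 instead of the default snappy for shuffle/spill compression
val conf = new SparkConf().set("spark.io.compression.codec", "lz4")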

We should document this if this is the case. I wonder if we're asking
Snappy to be super-low-overhead and as a result the new version does a
better job of it (less overhead, less compression).

On Sat, Feb 14, 2015 at 9:32 AM, Peng Cheng pc...@uow.edu.au wrote:

 I double-checked the 1.2 feature list and found out that the new sort-based
 shuffle manager has nothing to do with HashPartitioner. Sorry for the
 misinformation.

 On the other hand, this may explain the increase in shuffle spill as a side
 effect of the new shuffle manager; let me revert spark.shuffle.manager to hash and
 see if it makes things better (or worse, as the benchmark in
 https://issues.apache.org/jira/browse/SPARK-3280 indicates).



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-write-increases-in-spark-1-2-tp20894p21657.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Specifying AMI when using Spark EC-2 scripts

2015-02-15 Thread gen tang
Hi,

You can use -a or --ami <your ami id> to launch the cluster using a specific
AMI.
If I remember well, the default system is Amazon Linux.

Hope it will help

Cheers
Gen


On Sun, Feb 15, 2015 at 6:20 AM, olegshirokikh o...@solver.com wrote:

 Hi there,

 Is there a way to specify the AWS AMI with particular OS (say Ubuntu) when
 launching Spark on Amazon cloud with provided scripts?

 What is the default AMI, operating system that is launched by EC-2 script?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Specifying-AMI-when-using-Spark-EC-2-scripts-tp21658.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Percentile example

2015-02-15 Thread SiMaYunRui
Hello, 
I am a newbie to Spark and trying to figure out how to compute a percentile over a 
big data set. I googled this topic but did not find any very useful code 
example or explanation. It seems that I can use the sortByKey transformation to get my 
data set in order, but I am not sure how to get the value of, for example, the 
66th percentile. 
Should I use take() to pick up the value at the 66th percentile? I don't believe any 
single machine can load my data set in memory, so there must be more efficient 
approaches. 
Can anyone shed some light on this problem? 
Regards
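
For illustration, a rough sketch of one way to read off a percentile without
collecting the data set to a single machine (the input path is a placeholder and
the rank arithmetic uses the simple nearest-rank definition):

import org.apache.spark.SparkContext._   // pair RDD functions (lookup) in pre-1.3 Spark

val values = sc.textFile("hdfs:///data/values").map(_.toDouble)   // hypothetical input

val n = values.count()
val targetRank = math.max(math.ceil(0.66 * n).toLong - 1, 0L)     // 0-based rank of the 66th percentile

// sort, attach a global index, then look up the value at the target rank
val indexed = values.sortBy(identity).zipWithIndex().map(_.swap)
val p66 = indexed.lookup(targetRank).head
println("66th percentile = " + p66)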
  

Re: Array in broadcast can't be serialized

2015-02-15 Thread Ted Yu
I was looking at https://github.com/twitter/chill

It seems this would achieve what you want:
chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala

Cheers
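
If chill is not an option, below is a rough, untested sketch of writing and
registering an explicit Kryo serializer for the array type; the Kryo and HBase
calls are standard APIs, while the class names are made up for illustration:

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.serializer.KryoRegistrator

class IBWArraySerializer extends Serializer[Array[ImmutableBytesWritable]] {
  override def write(kryo: Kryo, output: Output, arr: Array[ImmutableBytesWritable]): Unit = {
    output.writeInt(arr.length)
    arr.foreach { ibw =>
      // write only the referenced slice of the backing byte array
      output.writeInt(ibw.getLength)
      output.writeBytes(ibw.get(), ibw.getOffset, ibw.getLength)
    }
  }

  override def read(kryo: Kryo, input: Input,
                    clazz: Class[Array[ImmutableBytesWritable]]): Array[ImmutableBytesWritable] = {
    Array.fill(input.readInt()) {
      val bytes = input.readBytes(input.readInt())
      new ImmutableBytesWritable(bytes)
    }
  }
}

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[ImmutableBytesWritable])
    kryo.register(classOf[Array[ImmutableBytesWritable]], new IBWArraySerializer)
  }
}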

On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao xiaotao.cs@gmail.com wrote:

 I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be
 serialized by Kryo but *Array[ImmutableBytesWritable] *can't be
 serialized even when I registered both of them in Kryo.

 The code is as follows:

 val conf = new SparkConf()
     .setAppName("Hello Spark")
     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     .set("spark.kryo.registrator", "xt.MyKryoRegistrator")

 val sc = new SparkContext(conf)

 val rdd = sc.parallelize(List(
     (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new KeyValue()),
     (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new KeyValue()),
     (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new KeyValue()),
     (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new KeyValue())), 4)

 // snippet 1: a single object of ImmutableBytesWritable can be serialized in a broadcast
 val partitioner = new SingleElementPartitioner(sc.broadcast(new ImmutableBytesWritable(Bytes.toBytes(3))))
 val ret = rdd.aggregateByKey(List[KeyValue](), partitioner)((xs: List[KeyValue], y: KeyValue) => y :: xs,
     (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys).persist()
 println("\n\n\nret.count = " + ret.count + ",  partition size = " + ret.partitions.size)

 // snippet 2: an array of ImmutableBytesWritable can not be serialized in a broadcast
 val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)), new ImmutableBytesWritable(Bytes.toBytes(2)), new ImmutableBytesWritable(Bytes.toBytes(3)))
 val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
 val ret1 = rdd.aggregateByKey(List[KeyValue](), newPartitioner)((xs: List[KeyValue], y: KeyValue) => y :: xs,
     (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys)
 println("\n\n\nrdd2.count = " + ret1.count)

 sc.stop


 // the following are the kryo registrator and partitioners
 class MyKryoRegistrator extends KryoRegistrator {
   override def registerClasses(kryo: Kryo): Unit = {
     kryo.register(classOf[ImmutableBytesWritable])          // register ImmutableBytesWritable
     kryo.register(classOf[Array[ImmutableBytesWritable]])   // register Array[ImmutableBytesWritable]
   }
 }

 class SingleElementPartitioner(bc: Broadcast[ImmutableBytesWritable]) extends Partitioner {
   override def numPartitions: Int = 5
   def v = Bytes.toInt(bc.value.get)
   override def getPartition(key: Any): Int = v - 1
 }

 class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
   val arr = bc.value
   override def numPartitions: Int = arr.length
   override def getPartition(key: Any): Int = Bytes.toInt(arr(0).get)
 }



 In the code above, snippet 1 can work as expected. But snippet 2 throws
 Task not serializable: java.io.NotSerializableException:
 org.apache.hadoop.hbase.io.ImmutableBytesWritable  .


 So do I have to implement a Kryo serializer for Array[T] if it is used in
 broadcast ?

 Thanks







Inconsistent execution times for same application.

2015-02-15 Thread Kartheek.R
Hi,
My spark cluster contains machines like Pentium-4, dual core and quad-core
machines. I am trying to run a character frequency count application. The
application contains several threads, each submitting a job(action) that
counts the frequency of a single character. But, my problem is, I get
different execution times each time I run the same application with same
data (1G text data). Sometimes the difference is as large as 10-15 minutes. I
think this pertains to scheduling when the cluster is heterogeneous in
nature. Can someone please tell me how to tackle this issue? I need to get
consistent results. Any suggestions, please!

I cache() the rdd. Total 7 slave nodes. Executor memory=2500m.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Inconsistent-execution-times-for-same-application-tp21662.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Multidimensional K-Means

2015-02-15 Thread Sean Owen
Clustering operates on a large number of n-dimensional vectors. That
seems to be what you are describing, and that is what the MLlib API
accepts. What are you expecting that you don't find?

Did you have a look at the KMeansModel that this method returns? It has
a clusterCenters method that gives you what you're looking for.
Explore the API a bit more first.
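
For example, a minimal multi-dimensional sketch (toy data; k and the iteration
count are arbitrary):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val points = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),   // each Vector is one n-dimensional point
  Vectors.dense(1.1, 2.1, 2.9),
  Vectors.dense(9.0, 8.0, 7.0)))

val model = KMeans.train(points, 2, 20)   // k = 2, maxIterations = 20
model.clusterCenters.foreach(println)     // the centres are themselves n-dimensional Vectors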

On Sun, Feb 15, 2015 at 4:26 PM, Attila Tóth atez...@gmail.com wrote:
 Dear Spark User List,

 I'm fairly new to Spark, trying to use it for multi-dimensional clustering
 (using the k-means clustering from MLib). However, based on the examples the
 clustering seems to work only for a single dimension (KMeans.train() accepts
 an RDD[Vector], which is a vector of doubles - I have a list of array of
 doubles, eg. a list of n-dimensional coordinates).

 Is there any way with which, given a list of arrays (or vectors) of doubles,
 I can get out the list of cluster centres (as a list of n-dimensional
 coordinates) in Spark?

 I'm using Scala.

 Thanks in advance,
 Attila




Re: SparkStreaming Low Performance

2015-02-15 Thread Akhil Das
Thanks Enno, let me have a look at the streaming parser in Jackson.

Thanks
Best Regards

On Sat, Feb 14, 2015 at 9:30 PM, Enno Shioji eshi...@gmail.com wrote:

 Huh, that would come to 6.5ms per one JSON. That does feel like a lot but
 if your JSON file is big enough, I guess you could get that sort of
 processing time.

 Jackson is more or less the most efficient JSON parser out there, so
 unless the Scala API is somehow affecting it, I don't see any better way.
 If you only need to read parts of the JSON, you could look into exploiting
 Jackson's stream parsing API
 http://wiki.fasterxml.com/JacksonStreamingApi.
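
For illustration, a rough sketch of what that could look like here, pulling a
single field out of each JSON string instead of binding the whole document (the
field name "userId" is made up):

import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

object JsonStream extends Serializable {
  @transient lazy val factory = new JsonFactory()

  def extractField(json: String, field: String): Option[String] = {
    val parser = factory.createParser(json)
    try {
      var result: Option[String] = None
      while (result.isEmpty && parser.nextToken() != null) {
        if (parser.getCurrentToken == JsonToken.FIELD_NAME && parser.getCurrentName == field) {
          parser.nextToken()                       // advance to the value token
          result = Option(parser.getValueAsString)
        }
      }
      result
    } finally parser.close()
  }
}

// val userIds = myDStream.map(JsonStream.extractField(_, "userId"))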

 I guess the good news is you can throw machines at it. You could also look
 into other serialization frameworks.




 On Sat, Feb 14, 2015 at 2:49 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Thanks again!
 It's the parser only: I just tried the parser
 https://gist.github.com/akhld/3948a5d91d218eaf809d without Spark, and
 it took 52 seconds to process 8k JSON records. Not sure if there's a more
 efficient way to do this in Spark; I know that if I use Spark SQL with a SchemaRDD
 it will be much faster, but I need this in Spark Streaming.

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 8:04 PM, Enno Shioji eshi...@gmail.com wrote:

 I see. I'd really benchmark how the parsing performs outside Spark (in a
 tight loop or something). If *that* is slow, you know it's the parsing. If
 not, it's not the parsing.

 Another thing you want to look at is CPU usage. If the actual parsing
 really is the bottleneck, you should see very high CPU utilization. If not,
 it's not the parsing per se but rather the ability to feed the messages to
 the parsing library.



 On Sat, Feb 14, 2015 at 2:30 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Ah, my bad, it works without the serializable exception. But there is not much
 performance difference though.

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Thanks for the suggestion, but doing that gives me this exception:

 http://pastebin.com/ni80NqKn

 Over this piece of code:

object Holder extends Serializable {
  @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
}

val jsonStream = myDStream.map(x => {
  Holder.mapper.readValue[Map[String, Any]](x)
})

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji eshi...@gmail.com
 wrote:

 (adding back user)

 Fair enough. Regarding the serialization exception, the hack I use is to
 have an object with a transient lazy field, like so:


 object Holder extends Serializable {
   @transient lazy val mapper = new ObjectMapper()
 }

 This way, the ObjectMapper will be instantiated at the destination
 and you can share the instance.




 On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 Thanks for the reply Enno. In my case the rate from the stream is not
 the bottleneck, as I'm able to consume all those records at a time (I have
 tested it). And regarding the ObjectMapper, if I take it outside of my map
 operation it throws a serialization exception (Caused by:
 java.io.NotSerializableException:
 com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com
 wrote:

 If I were you I'd first parse some test JSONs in isolation (outside
 Spark) to determine if the bottleneck is really the parsing. There are
 plenty of other places that could be affecting your performance, like the
 rate you are able to read from your stream source, etc.

 Apart from that, I notice that you are instantiating the
 ObjectMapper every time. This is quite expensive, and Jackson recommends
 sharing the instance. However, if you tried other parsers / mapPartitions
 without success, this probably won't fix your problem either.





 On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 I'm getting low performance while parsing JSON data. My cluster
 setup is Spark 1.2.0 with 10 nodes, each having 15 GB of memory
 and 4 cores.

 I tried both scala.util.parsing.json.JSON and fasterxml's
 Jackson parser.

 This is what i basically do:

 // Approach 1:
 val jsonStream = myDStream.map(x => {
   val mapper = new ObjectMapper() with ScalaObjectMapper
   mapper.registerModule(DefaultScalaModule)
   mapper.readValue[Map[String, Any]](x)
 })

 jsonStream.count().print()


 // Approach 2:
 val jsonStream2 = myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]])

 jsonStream2.count().print()



 It takes around 15-20 seconds to process/parse 35k JSON documents
 (containing nested documents and arrays) which I put in the stream.

 Is there any better approach/parser to process it faster? I also
 tried it with mapPartitions but it did not make any difference.




 Thanks
 

Re: shark queries failed

2015-02-15 Thread Akhil Das
I'd suggest updating your Spark to the latest version and trying Spark SQL
instead of Shark.

Thanks
Best Regards

On Sun, Feb 15, 2015 at 7:36 AM, Grandl Robert rgra...@yahoo.com.invalid
wrote:

 Hi guys,

 I deployed BlinkDB(built atop Shark) and running Spark 0.9.

 I tried to run several TPCDS shark queries taken from
 https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries/shark.
 However, the following exceptions are encountered. Do you have any idea why
 that might happen ?

 Thanks,
 Robert

 2015-02-14 17:58:29,358 WARN  util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(52)) - Unable to load
 native-hadoop library for your platform... using builtin-java classes where applicable
 2015-02-14 17:58:29,360 WARN  snappy.LoadSnappy (LoadSnappy.java:<clinit>(46)) - Snappy native library not loaded
 2015-02-14 17:58:34,963 WARN  scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 6 (task 5.0:2)
 2015-02-14 17:58:34,970 WARN  scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Loss was due to java.lang.ClassCastException
 java.lang.ClassCastException: org.apache.hadoop.io.NullWritable cannot be cast to org.apache.hadoop.io.FloatWritable
 at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableFloatObjectInspector.get(WritableFloatObjectInspector.java:35)
 at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:331)
 at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serializeStruct(LazyBinarySerDe.java:257)
 at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:204)
 at shark.execution.ReduceSinkOperator$$anonfun$processPartitionNoDistinct$1.apply(ReduceSinkOperator.scala:188)
 at shark.execution.ReduceSinkOperator$$anonfun$processPartitionNoDistinct$1.apply(ReduceSinkOperator.scala:153)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
 at org.apache.spark.scheduler.Task.run(Task.scala:53)
 at
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
 at
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 at java.lang.Thread.run(Thread.java:722)
 2015-02-14 17:58:34,983 WARN  scheduler.TaskSetManager
 (Logging.scala:logWarning(61)) - Lost TID 8 (task 5.0:4)
 2015-02-14 17:58:35,075 WARN  scheduler.TaskSetManager
 (Logging.scala:logWarning(61)) - Lost TID 12 (task 5.0:8)
 2015-02-14 17:58:35,119 WARN  scheduler.TaskSetManager
 (Logging.scala:logWarning(61)) - Lost TID 15 (task 5.0:2)
 2015-02-14 17:58:35,134 WARN  scheduler.TaskSetManager
 (Logging.scala:logWarning(61)) - Lost TID 9 (task 5.0:5)
 2015-02-14 17:58:35,187 WARN  scheduler.TaskSetManager
 (Logging.scala:logWarning(61)) - Lost TID 16 (task 5.0:4)
 2015-02-14 17:58:35,203 WARN  scheduler.TaskSetManager
 (Logging.scala:logWarning(61)) - Lost TID 11 (task 5.0:7)
 2015-02-14 17:58:35,214 WARN  scheduler.TaskSetManager
 (Logging.scala:logWarning(61)) - Lost TID 13 (task 5.0:9)
 2015-02-14 17:58:35,265 WARN  scheduler.TaskSetManager
 (Logging.scala:logWarning(61)) - Lost TID 4 (task 5.0:0)
 2015-02-14 17:58:35,274 WARN  scheduler.TaskSetManager
 (Logging.scala:logWarning(61)) - Lost TID 18 (task 5.0:2)
 2015-02-14 17:58:35,304 WARN  scheduler.TaskSetManager
 (Logging.scala:logWarning(61)) - Lost TID 17 (task 5.0:8)
 2015-02-14 17:58:35,330 WARN  scheduler.TaskSetManager
 (Logging.scala:logWarning(61)) - Lost TID 5 (task 5.0:1)
 2015-02-14 17:58:35,354 WARN  scheduler.TaskSetManager
 (Logging.scala:logWarning(61)) - Lost TID 20 (task 5.0:4)
 2015-02-14 17:58:35,387 WARN  scheduler.TaskSetManager
 (Logging.scala:logWarning(61)) - Lost TID 19 (task 5.0:5)
 2015-02-14 17:58:35,430 WARN  scheduler.TaskSetManager
 (Logging.scala:logWarning(61)) - Lost TID 7 (task 5.0:3)
 2015-02-14 17:58:35,432 WARN  scheduler.TaskSetManager
 (Logging.scala:logWarning(61)) - Lost TID 24 (task 5.0:2)
 2015-02-14 17:58:35,433 ERROR scheduler.TaskSetManager
 (Logging.scala:logError(65)) - Task 5.0:2 failed 4 times;
 aborting job
 2015-02-14 17:58:35,438 ERROR ql.Driver
 (SessionState.java:printError(400)) - FAILED: Execution Error, return code -101 from shark.execution.SparkTask
 2015-02-14 17:58:35,552 WARN  scheduler.TaskSetManager
 

Re: Multidimensional K-Means

2015-02-15 Thread Attila Tóth
Hi Sean,

Thanks for the quick answer. I had not realized that I can make an
RDD[Vector] with, e.g.,

val dataSet = sparkContext.makeRDD(List(Vectors.dense(10.0, 20.0),
Vectors.dense(20.0, 30.0)))

Using this, KMeans.train works as it should.

So my bad. Thanks again!

Attila

2015-02-15 17:29 GMT+01:00 Sean Owen so...@cloudera.com:

 Clustering operates on a large number of n-dimensional vectors. That
 seems to be what you are describing, and that is what the MLlib API
 accepts. What are you expecting that you don't find?

 Did you have a look at the KMeansModel that this method returns? it
 has a clusterCenters method that gives you what you're looking for.
 Explore the API a bit more first.

 On Sun, Feb 15, 2015 at 4:26 PM, Attila Tóth atez...@gmail.com wrote:
  Dear Spark User List,
 
  I'm fairly new to Spark, trying to use it for multi-dimensional
 clustering
  (using the k-means clustering from MLib). However, based on the examples
 the
  clustering seems to work only for a single dimension (KMeans.train()
 accepts
  an RDD[Vector], which is a vector of doubles - I have a list of array of
  doubles, eg. a list of n-dimensional coordinates).
 
  Is there any way with which, given a list of arrays (or vectors) of
 doubles,
  I can get out the list of cluster centres (as a list of n-dimensional
  coordinates) in Spark?
 
  I'm using Scala.
 
  Thanks in advance,
  Attila



spark-local dir running out of space during long ALS run

2015-02-15 Thread Antony Mayi
Hi,
I am running a bigger ALS job on Spark 1.2.0 on YARN (CDH 5.3.0) - the ALS is using about 
3 billion ratings and I am doing several trainImplicit() runs in a loop 
within one Spark session. I have a four-node cluster with 3TB of disk space on each. 
Before starting the job, less than 8% of the disk space is used. While the 
ALS is running I can see the disk usage rapidly growing, mainly because of files 
being stored under 
yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. 
After about 10 hours the disk usage hits 90% and YARN kills the particular 
containers.
Am I missing some cleanup somewhere while looping over the several 
trainImplicit() calls? Taking 4*3TB of disk space seems immense.
Thanks for any help, Antony. 

Dynamic partition pattern support

2015-02-15 Thread Jianshi Huang
Hi,

HCatalog allows you to specify the pattern of paths for partitions, which
will be used by dynamic partition loading.


https://cwiki.apache.org/confluence/display/Hive/HCatalog+DynamicPartitions#HCatalogDynamicPartitions-ExternalTables

Can we have similar feature in SparkSQL?

Jira is here: https://issues.apache.org/jira/browse/SPARK-5828

Thanks,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Multidimensional K-Means

2015-02-15 Thread Attila Tóth
Dear Spark User List,

I'm fairly new to Spark, trying to use it for multi-dimensional clustering
(using the k-means clustering from MLib). However, based on the examples
the clustering seems to work only for a single dimension (KMeans.train()
accepts an RDD[Vector], which is a vector of doubles - I have a list of
array of doubles, eg. a list of n-dimensional coordinates).

Is there any way with which, given a list of arrays (or vectors) of
doubles, I can get out the list of cluster centres (as a list of
n-dimensional coordinates) in Spark?

I'm using Scala.

Thanks in advance,
Attila


Re: New ColumnType For Decimal Caching

2015-02-15 Thread Michael Armbrust
That sound right to me.  Cheng could elaborate if you are missing something.

On Fri, Feb 13, 2015 at 11:36 AM, Manoj Samel manojsamelt...@gmail.com
wrote:

 Thanks Michael for the pointer  Sorry for the delayed reply.

 Taking a quick inventory of scope of change - Is the column type for
 Decimal caching needed only in the caching layer (4 files
 in org.apache.spark.sql.columnar - ColumnAccessor.scala,
 ColumnBuilder.scala, ColumnStats.scala, ColumnType.scala)

 Or do other SQL components also need to be touched ?

 Hoping for a quick feedback of top of your head ...

 Thanks,



 On Mon, Feb 9, 2015 at 3:16 PM, Michael Armbrust mich...@databricks.com
 wrote:

 You could add a new ColumnType
 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
 .

 PRs welcome :)

 On Mon, Feb 9, 2015 at 3:01 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Hi Michael,

 As a test, I have the same data loaded as another parquet - except with the
 2 decimal(14,4) columns replaced by double. With this, the on-disk size is ~345MB,
 the in-memory size is 2GB (vs. 12 GB) and the cached query runs in 1/2 the
 time of the uncached query.

 Would it be possible for Spark to store in-memory decimal in some form
 of long with decoration ?

 For the immediate future, is there any hook that we can use to provide
 custom caching / processing for the decimal type in RDD so other semantic
 does not changes ?

 Thanks,




 On Mon, Feb 9, 2015 at 2:41 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Could you share which data types are optimized in the in-memory storage
 and how are they optimized ?

 On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 You'll probably only get good compression for strings when dictionary
 encoding works.  We don't optimize decimals in the in-memory columnar
 storage, so you are paying expensive serialization there likely.

 On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Flat data of types String, Int and couple of decimal(14,4)

 On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 Is this nested data or flat data?

 On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel 
 manojsamelt...@gmail.com wrote:

 Hi Michael,

 The storage tab shows the RDD resides fully in memory (10
 partitions) with zero disk usage. Tasks for subsequent select on this 
 table
 in cache shows minimal overheads (GC, queueing, shuffle write etc. 
 etc.),
 so overhead is not issue. However, it is still twice as slow as reading
 uncached table.

 I have spark.rdd.compress = true, 
 spark.sql.inMemoryColumnarStorage.compressed
 = true, spark.serializer =
 org.apache.spark.serializer.KryoSerializer

 Something that may be of relevance ...

 The underlying table is Parquet, 10 partitions totaling ~350 MB.
 For mapPartition phase of query on uncached table shows input size of 
 351
 MB. However, after the table is cached, the storage shows the cache 
 size as
 12GB. So the in-memory representation seems much bigger than on-disk, 
 even
 with the compression options turned on. Any thoughts on this ?

 mapPartition phase same query for cache table shows input size of
 12GB (full size of cache table) and takes twice the time as 
 mapPartition
 for uncached query.

 Thanks,






 On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 Check the storage tab.  Does the table actually fit in memory?
 Otherwise you are rebuilding column buffers in addition to reading 
 the data
 off of the disk.

 On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel 
 manojsamelt...@gmail.com wrote:

 Spark 1.2

 Data stored in parquet table (large number of rows)

 Test 1

 select a, sum(b), sum(c) from table

 Test

 sqlContext.cacheTable()
 select a, sum(b), sum(c) from table  - seed cache First time
 slow since loading cache ?
 select a, sum(b), sum(c) from table  - Second time it should be
 faster as it should be reading from cache, not HDFS. But it is 
 slower than
 test1

 Any thoughts? Should a different query be used to seed cache ?

 Thanks,













shark queries failed

2015-02-15 Thread Grandl Robert
Hi guys,
I deployed BlinkDB (built atop Shark) and am running Spark 0.9. 
I tried to run several TPCDS shark queries taken from 
https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries/shark.
 However, the following exceptions are encountered. Do you have any idea why 
that might happen ? 

Thanks,Robert

2015-02-14 17:58:29,358 WARN  util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(52)) - Unable to load 
native-hadoop library for your platform... using builtin-java classes where applicable
2015-02-14 17:58:29,360 WARN  snappy.LoadSnappy (LoadSnappy.java:<clinit>(46)) - Snappy native library not loaded
2015-02-14 17:58:34,963 WARN  scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 6 (task 5.0:2)
2015-02-14 17:58:34,970 WARN  scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Loss was due to java.lang.ClassCastException
java.lang.ClassCastException: org.apache.hadoop.io.NullWritable cannot be cast to org.apache.hadoop.io.FloatWritable
    at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableFloatObjectInspector.get(WritableFloatObjectInspector.java:35)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:331)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serializeStruct(LazyBinarySerDe.java:257)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:204)
    at shark.execution.ReduceSinkOperator$$anonfun$processPartitionNoDistinct$1.apply(ReduceSinkOperator.scala:188)
    at shark.execution.ReduceSinkOperator$$anonfun$processPartitionNoDistinct$1.apply(ReduceSinkOperator.scala:153)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
    at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at 
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
2015-02-14 17:58:34,983 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 8 (task 5.0:4)
2015-02-14 17:58:35,075 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 12 (task 5.0:8)
2015-02-14 17:58:35,119 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 15 (task 5.0:2)
2015-02-14 17:58:35,134 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 9 (task 5.0:5)
2015-02-14 17:58:35,187 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 16 (task 5.0:4)
2015-02-14 17:58:35,203 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 11 (task 5.0:7)
2015-02-14 17:58:35,214 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 13 (task 5.0:9)
2015-02-14 17:58:35,265 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 4 (task 5.0:0)
2015-02-14 17:58:35,274 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 18 (task 5.0:2)
2015-02-14 17:58:35,304 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 17 (task 5.0:8)
2015-02-14 17:58:35,330 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 5 (task 5.0:1)
2015-02-14 17:58:35,354 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 20 (task 5.0:4)
2015-02-14 17:58:35,387 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 19 (task 5.0:5)
2015-02-14 17:58:35,430 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 7 (task 5.0:3)
2015-02-14 17:58:35,432 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 24 (task 5.0:2)
2015-02-14 17:58:35,433 ERROR scheduler.TaskSetManager 
(Logging.scala:logError(65)) - Task 5.0:2 failed 4 times; 
aborting job
2015-02-14 17:58:35,438 ERROR ql.Driver (SessionState.java:printError(400)) - 
FAILED: Execution Error, return code -101 from shark.execution.SparkTask
2015-02-14 17:58:35,552 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 30 (task 6.0:0)
2015-02-14 17:58:35,565 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Loss was due to java.io.FileNotFoundException
java.io.FileNotFoundException: http://10.200.146.12:46812/broadcast_4
    at 

Re: Loading JSON dataset with Spark Mllib

2015-02-15 Thread gen tang
Hi,

In fact, you can use sqlCtx.jsonFile(), which loads a text file storing one
JSON object per line as a SchemaRDD.
Or you can use sc.textFile() to load the text file into an RDD and then use
sqlCtx.jsonRDD(), which loads an RDD storing one JSON object per string as a
SchemaRDD.
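
For example, a minimal sketch of both approaches (Spark 1.2-era API; the path,
table and query are placeholders):

import org.apache.spark.sql.SQLContext

val sqlCtx = new SQLContext(sc)

// 1. load a file containing one JSON object per line directly
val people = sqlCtx.jsonFile("hdfs:///data/people.json")

// 2. or go through an RDD[String] first
val raw = sc.textFile("hdfs:///data/people.json")
val people2 = sqlCtx.jsonRDD(raw)

people.registerTempTable("people")
val rows = sqlCtx.sql("SELECT age, income FROM people")   // feed these rows into MLlib from here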

Hope it could help
Cheers
Gen


On Mon, Feb 16, 2015 at 12:39 AM, pankaj channe pankajc...@gmail.com
wrote:

 Hi,

 I am new to spark and planning on writing a machine learning application
 with Spark mllib. My dataset is in json format. Is it possible to load data
 into spark without using any external json libraries? I have explored the
 option of SparkSql but I believe that is only for interactive use or
 loading data into hive tables.

 Thanks,
 Pankaj



Re: shark queries failed

2015-02-15 Thread Grandl Robert
Thanks for the reply, Akhil. I cannot update the Spark version and run Spark SQL due 
to some old dependencies and a specific project I want to run. 

I was wondering if you have any clue why that exception might be triggered, or 
if you have seen it before. 

Thanks,
Robert
 

 On Sunday, February 15, 2015 9:18 AM, Akhil Das 
ak...@sigmoidanalytics.com wrote:
   

 I'd suggest you updating your spark to the latest version and try SparkSQL 
instead of Shark.
Thanks
Best Regards
On Sun, Feb 15, 2015 at 7:36 AM, Grandl Robert rgra...@yahoo.com.invalid 
wrote:

Hi guys,
I deployed BlinkDB(built atop Shark) and running Spark 0.9. 
I tried to run several TPCDS shark queries taken from 
https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries/shark.
 However, the following exceptions are encountered. Do you have any idea why 
that might happen ? 

Thanks,Robert

2015-02-14 17:58:29,358 WARN  util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(52)) - Unable to load 
native-hadoop library for your platform... using builtin-java classes where applicable
2015-02-14 17:58:29,360 WARN  snappy.LoadSnappy (LoadSnappy.java:<clinit>(46)) - Snappy native library not loaded
2015-02-14 17:58:34,963 WARN  scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Lost TID 6 (task 5.0:2)
2015-02-14 17:58:34,970 WARN  scheduler.TaskSetManager (Logging.scala:logWarning(61)) - Loss was due to java.lang.ClassCastException
java.lang.ClassCastException: org.apache.hadoop.io.NullWritable cannot be cast to org.apache.hadoop.io.FloatWritable
    at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableFloatObjectInspector.get(WritableFloatObjectInspector.java:35)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:331)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serializeStruct(LazyBinarySerDe.java:257)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:204)
    at shark.execution.ReduceSinkOperator$$anonfun$processPartitionNoDistinct$1.apply(ReduceSinkOperator.scala:188)
    at shark.execution.ReduceSinkOperator$$anonfun$processPartitionNoDistinct$1.apply(ReduceSinkOperator.scala:153)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
    at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at 
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
2015-02-14 17:58:34,983 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 8 (task 5.0:4)
2015-02-14 17:58:35,075 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 12 (task 5.0:8)
2015-02-14 17:58:35,119 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 15 (task 5.0:2)
2015-02-14 17:58:35,134 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 9 (task 5.0:5)
2015-02-14 17:58:35,187 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 16 (task 5.0:4)
2015-02-14 17:58:35,203 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 11 (task 5.0:7)
2015-02-14 17:58:35,214 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 13 (task 5.0:9)
2015-02-14 17:58:35,265 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 4 (task 5.0:0)
2015-02-14 17:58:35,274 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 18 (task 5.0:2)
2015-02-14 17:58:35,304 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 17 (task 5.0:8)
2015-02-14 17:58:35,330 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 5 (task 5.0:1)
2015-02-14 17:58:35,354 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 20 (task 5.0:4)
2015-02-14 17:58:35,387 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 19 (task 5.0:5)
2015-02-14 17:58:35,430 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 7 (task 5.0:3)
2015-02-14 17:58:35,432 WARN  scheduler.TaskSetManager 
(Logging.scala:logWarning(61)) - Lost TID 24 (task 5.0:2)
2015-02-14 17:58:35,433 ERROR scheduler.TaskSetManager 

Re: Unable to query hive tables from spark

2015-02-15 Thread Todd Nist
What does your hive-site.xml look like?  Do you actually have a directory
at the location shown in the error?  i.e. does /user/hive/warehouse/src
exist?  You should be able to override this by specifying the following:

--hiveconf hive.metastore.warehouse.dir=/location/where/your/warehouse/exists
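
Or, as a rough equivalent from inside the shell (the path is a placeholder):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.setConf("hive.metastore.warehouse.dir", "/location/where/your/warehouse/exists")
hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")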

HTH.

-Todd

On Thu, Feb 12, 2015 at 1:16 AM, kundan kumar iitr.kun...@gmail.com wrote:

 I want to create/access the hive tables from spark.

 I have placed the hive-site.xml inside the spark/conf directory. Even
 though it creates a local metastore in the directory where I run the spark
 shell and exists with an error.

 I am getting this error when I try to create a new hive table. Even on
 querying a existing table error appears.

 sqlContext.sql(CREATE TABLE IF NOT EXISTS src (key INT, value STRING))

 Please suggest what wrong I am doing and a way to resolve this.

 15/02/12 10:35:58 ERROR RetryingHMSHandler: 
 MetaException(message:file:/user/hive/warehouse/src is not a directory or 
 unable to create one)
 at 
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
 at 
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)




Re: New ColumnType For Decimal Caching

2015-02-15 Thread Cheng Lian
Hi Manoj,

Yes, you've hit the point. I think the timestamp type support in the
in-memory columnar storage can be a good reference for you. Also, you may
want to enable compression support for the decimal type by adding the DECIMAL
column type to RunLengthEncoding.supports and DictionaryEncoding.supports.
Thanks for working on this!

Best,
Cheng

On Sat, Feb 14, 2015 at 5:32 PM, Michael Armbrust mich...@databricks.com
wrote:

 That sound right to me.  Cheng could elaborate if you are missing
 something.

 On Fri, Feb 13, 2015 at 11:36 AM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Thanks Michael for the pointer  Sorry for the delayed reply.

 Taking a quick inventory of scope of change - Is the column type for
 Decimal caching needed only in the caching layer (4 files
 in org.apache.spark.sql.columnar - ColumnAccessor.scala,
 ColumnBuilder.scala, ColumnStats.scala, ColumnType.scala)

 Or do other SQL components also need to be touched ?

 Hoping for a quick feedback of top of your head ...

 Thanks,



 On Mon, Feb 9, 2015 at 3:16 PM, Michael Armbrust mich...@databricks.com
 wrote:

 You could add a new ColumnType
 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
 .

 PRs welcome :)

 On Mon, Feb 9, 2015 at 3:01 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Hi Michael,

 As a test, I have same data loaded as another parquet - except with the
 2 decimal(14,4) replaced by double. With this, the  on disk size is ~345MB,
 the in-memory size is 2GB (v.s. 12 GB) and the cached query runs in 1/2 the
 time of uncached query.

 Would it be possible for Spark to store in-memory decimal in some form
 of long with decoration ?

 For the immediate future, is there any hook that we can use to provide
 custom caching / processing for the decimal type in RDD so other semantic
 does not changes ?

 Thanks,




 On Mon, Feb 9, 2015 at 2:41 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Could you share which data types are optimized in the in-memory
 storage and how are they optimized ?

 On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 You'll probably only get good compression for strings when dictionary
 encoding works.  We don't optimize decimals in the in-memory columnar
 storage, so you are paying expensive serialization there likely.

 On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com
  wrote:

 Flat data of types String, Int and couple of decimal(14,4)

 On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 Is this nested data or flat data?

 On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel 
 manojsamelt...@gmail.com wrote:

 Hi Michael,

 The storage tab shows the RDD resides fully in memory (10
 partitions) with zero disk usage. Tasks for subsequent select on this 
 table
 in cache shows minimal overheads (GC, queueing, shuffle write etc. 
 etc.),
 so overhead is not issue. However, it is still twice as slow as 
 reading
 uncached table.

 I have spark.rdd.compress = true, 
 spark.sql.inMemoryColumnarStorage.compressed
 = true, spark.serializer =
 org.apache.spark.serializer.KryoSerializer

 Something that may be of relevance ...

 The underlying table is Parquet, 10 partitions totaling ~350 MB.
 For mapPartition phase of query on uncached table shows input size of 
 351
 MB. However, after the table is cached, the storage shows the cache 
 size as
 12GB. So the in-memory representation seems much bigger than on-disk, 
 even
 with the compression options turned on. Any thoughts on this ?

 mapPartition phase same query for cache table shows input size of
 12GB (full size of cache table) and takes twice the time as 
 mapPartition
 for uncached query.

 Thanks,






 On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 Check the storage tab.  Does the table actually fit in memory?
 Otherwise you are rebuilding column buffers in addition to reading 
 the data
 off of the disk.

 On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel 
 manojsamelt...@gmail.com wrote:

 Spark 1.2

 Data stored in parquet table (large number of rows)

 Test 1

 select a, sum(b), sum(c) from table

 Test

 sqlContext.cacheTable()
 select a, sum(b), sum(c) from table  - seed cache First time
 slow since loading cache ?
 select a, sum(b), sum(c) from table  - Second time it should be
 faster as it should be reading from cache, not HDFS. But it is 
 slower than
 test1

 Any thoughts? Should a different query be used to seed cache ?

 Thanks,














Re: [GraphX] Excessive value recalculations during aggregateMessages cycles

2015-02-15 Thread Takeshi Yamamuro
Hi,

I tried some quick and simple tests, and ISTM the vertices below were
correctly cached.
Could you tell me the differences between my code and yours?

import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

object Prog {
  def processInt(d: Int) = d * 2
}

val g = GraphLoader.edgeListFile(sc, "../temp/graph.txt")
  .cache

val g2 = g.outerJoinVertices(g.degrees)(
    (vid, old, msg) => Prog.processInt(msg.getOrElse(0)))
  .cache

g2.vertices.count

val g3 = g.outerJoinVertices(g.degrees)(
    (vid, old, msg) => msg.getOrElse(0))
  .mapVertices((vid, d) => Prog.processInt(d))
  .cache

g3.vertices.count

'g2.vertices.toDebugString' outputs;

(2) VertexRDDImpl[16] at RDD at VertexRDD.scala:57 []
 |  VertexRDD ZippedPartitionsRDD2[15] at zipPartitions at
VertexRDDImpl.scala:121 []
 |  CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
 |  VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at
VertexRDD.scala:319 []
 |  CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
 |  MapPartitionsRDD[7] at mapPartitions at VertexRDD.scala:335 []
 |  ShuffledRDD[6] at partitionBy at VertexRDD.scala:335 []
 +-(2) VertexRDD.createRoutingTables - vid2pid (aggregation)
MapPartitionsRDD[5] at mapPartitions at VertexRDD.scala:330 []
|  GraphLoader.edgeListFile - edges (../temp/graph.txt), EdgeRDD,
EdgeRDD MapPartitionsRDD[2] at mapPartitionsWithIndex at Graph...


'g3.vertices.toDebugString' outputs;

(2) VertexRDDImpl[33] at RDD at VertexRDD.scala:57 []
 |  VertexRDD MapPartitionsRDD[32] at mapPartitions at
VertexRDDImpl.scala:96 []
 |  CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
 |  VertexRDD ZippedPartitionsRDD2[24] at zipPartitions at
VertexRDDImpl.scala:121 []
 |  CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
 |  VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at
VertexRDD.scala:319 []
 |  CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
 |  MapPartitionsRDD[7] at mapPartitions at VertexRDD.scala:335 []
 |  ShuffledRDD[6] at partitionBy at VertexRDD.scala:335 []
 +-(2) VertexRDD.createRoutingTables - vid2pid (aggregation)
MapPartitionsRDD[5] at mapPar...

-- maropu

On Mon, Feb 9, 2015 at 5:47 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

 I changed the

 curGraph = curGraph.outerJoinVertices(curMessages)(
   (vid, vertex, message) =>
     vertex.process(message.getOrElse(List[Message]()), ti)
 ).cache()

 to

 curGraph = curGraph.outerJoinVertices(curMessages)(
   (vid, vertex, message) => (vertex, message.getOrElse(List[Message]()))
 ).mapVertices( (x, y) => y._1.process( y._2, ti ) ).cache()

 So the call to the 'process' method was moved out of the outerJoinVertices
 and into a separate mapVertices call, and the problem went away. Now,
 'process' is only called once during the correct cycle.
 So it would appear that outerJoinVertices caches the closure to be
 recalculated if needed again while mapVertices actually caches the
 derived values.

 Is this a bug or a feature?

 Kyle



 On Sat, Feb 7, 2015 at 11:44 PM, Kyle Ellrott kellr...@soe.ucsc.edu
 wrote:

 I'm trying to setup a simple iterative message/update problem in GraphX
 (spark 1.2.0), but I'm running into issues with the caching and
 re-calculation of data. I'm trying to follow the example found in the
 Pregel implementation of materializing and cacheing messages and graphs and
 then unpersisting them after the next cycle has been done.
 It doesn't seem to be working, because every cycle gets progressively
 slower and it seems as if more and more of the values are being
 re-calculated despite my attempts to cache them.

 The code:
 ```
  var oldMessages: VertexRDD[List[Message]] = null
  var oldGraph: Graph[MyVertex, MyEdge] = null
  curGraph = curGraph.mapVertices((x, y) => y.init())
  for (i <- 0 to cycle_count) {
    val curMessages = curGraph.aggregateMessages[List[Message]](x => {
      //send messages
      .
    },
    (x, y) => {
      //collect messages into lists
      val out = x ++ y
      out
    }
    ).cache()
    curMessages.count()
    val ti = i
    oldGraph = curGraph
    curGraph = curGraph.outerJoinVertices(curMessages)(
      (vid, vertex, message) =>
        vertex.process(message.getOrElse(List[Message]()), ti)
    ).cache()
    curGraph.vertices.count()
    oldGraph.unpersistVertices(blocking = false)
    oldGraph.edges.unpersist(blocking = false)
    oldGraph = curGraph
    if (oldMessages != null) {
      oldMessages.unpersist(blocking = false)
    }
    oldMessages = curMessages
  }
 ```

 The MyVertex.process method takes the list of incoming messages, averages
 them and returns a new MyVertex object. I've also set it up to 

Re: Shuffle write increases in spark 1.2

2015-02-15 Thread Ami Khandeshi
I have seen the same behavior! I would love to hear an update on this...

Thanks,

Ami

On Thu, Feb 5, 2015 at 8:26 AM, Anubhav Srivastav 
anubhav.srivas...@gmail.com wrote:

 Hi Kevin,
 We seem to be facing the same problem as well. Were you able to find
 anything after that? The ticket does not seem to have progressed anywhere.

 Regards,
 Anubhav

 On 5 January 2015 at 10:37, 정재부 itsjb.j...@samsung.com wrote:

  Sure, here is a ticket. https://issues.apache.org/jira/browse/SPARK-5081



 --- *Original Message* ---

 *Sender* : Josh Rosenrosenvi...@gmail.com

 *Date* : 2015-01-05 06:14 (GMT+09:00)

 *Title* : Re: Shuffle write increases in spark 1.2


 If you have a small reproduction for this issue, can you open a ticket at
 https://issues.apache.org/jira/browse/SPARK ?



 On December 29, 2014 at 7:10:02 PM, Kevin Jung (itsjb.j...@samsung.com)
 wrote:

  Hi all,
 The size of shuffle write shown in the Spark web UI is much different when I
 execute the same Spark job on the same input data (100GB) in both Spark 1.1 and
 Spark 1.2.
 At the same sortBy stage, the size of shuffle write is 39.7GB in Spark 1.1
 but 91.0GB in Spark 1.2.
 I set the spark.shuffle.manager option to hash because its default value has
 changed, but Spark 1.2 writes larger files than Spark 1.1.
 Can anyone tell me why this happened?

 Thanks
 Kevin



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-write-increases-in-spark-1-2-tp20894.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Array in broadcast can't be serialized

2015-02-15 Thread Tao Xiao
I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be
serialized by Kryo but *Array[ImmutableBytesWritable] *can't be serialized
even when I registered both of them in Kryo.

The code is as follows:

    val conf = new SparkConf()
        .setAppName("Hello Spark")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "xt.MyKryoRegistrator")

    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(List(
        (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new KeyValue()),
        (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new KeyValue()),
        (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new KeyValue()),
        (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new KeyValue())), 4)

    // snippet 1: a single object of ImmutableBytesWritable can be serialized in a broadcast
    val partitioner = new SingleElementPartitioner(sc.broadcast(new ImmutableBytesWritable(Bytes.toBytes(3))))
    val ret = rdd.aggregateByKey(List[KeyValue](), partitioner)((xs: List[KeyValue], y: KeyValue) => y :: xs,
        (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys).persist()
    println("\n\n\nret.count = " + ret.count + ",  partition size = " + ret.partitions.size)

    // snippet 2: an array of ImmutableBytesWritable can not be serialized in a broadcast
    val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)), new ImmutableBytesWritable(Bytes.toBytes(2)), new ImmutableBytesWritable(Bytes.toBytes(3)))
    val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
    val ret1 = rdd.aggregateByKey(List[KeyValue](), newPartitioner)((xs: List[KeyValue], y: KeyValue) => y :: xs,
        (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys)
    println("\n\n\nrdd2.count = " + ret1.count)

    sc.stop


    // the following are the kryo registrator and partitioners
    class MyKryoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[ImmutableBytesWritable])          // register ImmutableBytesWritable
        kryo.register(classOf[Array[ImmutableBytesWritable]])   // register Array[ImmutableBytesWritable]
      }
    }

    class SingleElementPartitioner(bc: Broadcast[ImmutableBytesWritable]) extends Partitioner {
      override def numPartitions: Int = 5
      def v = Bytes.toInt(bc.value.get)
      override def getPartition(key: Any): Int = v - 1
    }

    class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
      val arr = bc.value
      override def numPartitions: Int = arr.length
      override def getPartition(key: Any): Int = Bytes.toInt(arr(0).get)
    }



In the code above, snippet 1 can work as expected. But snippet 2 throws
Task not serializable: java.io.NotSerializableException:
org.apache.hadoop.hbase.io.ImmutableBytesWritable  .


So do I have to implement a Kryo serializer for Array[T] if it is used in
broadcast ?

Thanks


Specifying AMI when using Spark EC-2 scripts

2015-02-15 Thread olegshirokikh
Hi there,

Is there a way to specify the AWS AMI with particular OS (say Ubuntu) when
launching Spark on Amazon cloud with provided scripts?

What is the default AMI, operating system that is launched by EC-2 script?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Specifying-AMI-when-using-Spark-EC-2-scripts-tp21658.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: Extract hour from Timestamp in Spark SQL

2015-02-15 Thread Cheng, Hao
Are you using the SQLContext? I think the HiveContext is recommended.
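
For example, a minimal sketch (table and column names are placeholders):

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)
// hour() is a Hive UDF, so it is available through the HiveContext
hiveCtx.sql("SELECT hour(event_time) FROM events").collect()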

Cheng Hao

From: Wush Wu [mailto:w...@bridgewell.com]
Sent: Thursday, February 12, 2015 2:24 PM
To: u...@spark.incubator.apache.org
Subject: Extract hour from Timestamp in Spark SQL

Dear all,

I am new to Spark SQL and have no experience with Hive.
I tried to use the built-in Hive function to extract the hour from a timestamp in 
Spark SQL, but got: java.util.NoSuchElementException: key not found: hour
How should I extract the hour from a timestamp?
And I am very confused about which functions I can use in Spark SQL. Is 
there any list of available functions other than 
http://spark.apache.org/docs/1.2.0/sql-programming-guide.html#compatibility-with-apache-hive
 ?
Thanks,
Wush




monit with spark

2015-02-15 Thread Mike Sam
We want to monitor the Spark master and Spark slaves using monit, but we want to
use the sbin scripts to do so. The scripts start the Spark master and
slave processes independently of themselves, so monit would not know the
started process's PID to watch. Is this correct? Should we watch the ports?

How should we configure monit to run and monitor spark standalone
processes?

-- 
Thanks,
Mike


Loading JSON dataset with Spark Mllib

2015-02-15 Thread pankaj channe
Hi,

I am new to spark and planning on writing a machine learning application
with Spark mllib. My dataset is in json format. Is it possible to load data
into spark without using any external json libraries? I have explored the
option of SparkSql but I believe that is only for interactive use or
loading data into hive tables.

Thanks,
Pankaj


Re: spark-local dir running out of space during long ALS run

2015-02-15 Thread Antony Mayi
spark.cleaner.ttl ? 
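
For reference, a rough sketch of the kind of cleanup this points at (not a
confirmed fix; the ttl value, paths, loop and rating parsing below are placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

val conf = new SparkConf()
  .setAppName("ALS loop")
  .set("spark.cleaner.ttl", "3600")                    // drop old metadata/shuffle data after 1h (assumed value)

val sc = new SparkContext(conf)
sc.setCheckpointDir("hdfs:///tmp/als-checkpoints")     // hypothetical path

val ratings = sc.textFile("hdfs:///data/ratings")      // hypothetical input
  .map(_.split(','))
  .map(a => Rating(a(0).toInt, a(1).toInt, a(2).toDouble))
  .cache()

for (rank <- Seq(10, 20, 50)) {
  val model = ALS.trainImplicit(ratings, rank, 10)
  // ... evaluate the model ...
  // unpersist the factor RDDs so their local files can be cleaned between runs
  model.userFeatures.unpersist()
  model.productFeatures.unpersist()
}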

 On Sunday, 15 February 2015, 18:23, Antony Mayi antonym...@yahoo.com 
wrote:
   
 

 Hi,
I am running bigger ALS on spark 1.2.0 on yarn (cdh 5.3.0) - ALS is using about 
3 billions of ratings and I am doing several trainImplicit() runs in loop 
within one spark session. I have four node cluster with 3TB disk space on each. 
before starting the job there is less then 8% of the disk space used. while the 
ALS is running I can see the disk usage rapidly growing mainly because of files 
being stored under 
yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. 
after about 10 hours the disk usage hits 90% and yarn kills the particular 
containers.
am I missing doing some cleanup somewhere while looping over the several 
trainImplicit() calls? taking 4*3TB of disk space seems immense.
thanks for any help,Antony. 

 
   

Re: HiveContext created SchemaRDD's saveAsTable is not working on 1.2.0

2015-02-15 Thread matroyd
It works now using 1.2.1. Thanks for all the help. Spark rocks !!



-
Thanks,
Roy
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-HiveContext-created-SchemaRDD-s-saveAsTable-is-not-working-on-1-2-0-tp21442p21664.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Writing to HDFS from spark Streaming

2015-02-15 Thread Bahubali Jain
I used the latest assembly jar and the below as suggested by Akhil to fix
this problem...
temp.saveAsHadoopFiles("DailyCSV", ".txt", String.class, String.class,
(Class) TextOutputFormat.class);

Thanks All for the help !

On Wed, Feb 11, 2015 at 1:38 PM, Sean Owen so...@cloudera.com wrote:

 That kinda dodges the problem by ignoring generic types. But it may be
 simpler than the 'real' solution, which is a bit ugly.

 (But first, to double check, are you importing the correct
 TextOutputFormat? there are two versions. You use .mapred. with the
 old API and .mapreduce. with the new API.)

 Here's how I've formally casted around it in similar code:

 @SuppressWarnings("unchecked")
 Class<? extends OutputFormat<?,?>> outputFormatClass =
 (Class<? extends OutputFormat<?,?>>) (Class<?>) TextOutputFormat.class;

 and then pass that as the final argument.

 On Wed, Feb 11, 2015 at 6:35 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:
  Did you try :
 
  temp.saveAsHadoopFiles("DailyCSV", ".txt", String.class, String.class, (Class)
  TextOutputFormat.class);
 
  Thanks
  Best Regards
 
  On Wed, Feb 11, 2015 at 9:40 AM, Bahubali Jain bahub...@gmail.com
 wrote:
 
  Hi,
  I am facing issues while writing data from a streaming rdd to hdfs..
 
  JavaPairDStream<String, String> temp;
  ...
  ...
  temp.saveAsHadoopFiles("DailyCSV", ".txt", String.class,
  String.class, TextOutputFormat.class);
 
 
  I see compilation issues as below...
  The method saveAsHadoopFiles(String, String, Class<?>, Class<?>, Class<?
  extends OutputFormat<?,?>>) in the type JavaPairDStream<String,String> is
  not applicable for the arguments (String, String, Class<String>,
  Class<String>, Class<TextOutputFormat>)
 
  I see same kind of problem even with saveAsNewAPIHadoopFiles API .
 
  Thanks,
  Baahu
 
 




-- 
Twitter:http://twitter.com/Baahu


WARN from Similarity Calculation

2015-02-15 Thread Debasish Das
Hi,

I am sometimes getting WARN from running Similarity calculation:

15/02/15 23:07:55 WARN BlockManagerMasterActor: Removing BlockManager
BlockManagerId(7, abc.com, 48419, 0) with no recent heart beats: 66435ms
exceeds 45000ms

Do I need to increase the default 45 s to a larger value for cases where we
are doing blocked operations or long computations in mapPartitions?

Thanks.
Deb