How to avoid the delay associated with Hive Metastore when loading parquet?
Hi,

I'm loading Parquet files via Spark, and the first time a file is loaded I see a 5-10s delay related to the Hive Metastore, with metastore-related messages in the console. How can I avoid this delay and keep the metadata around? I want the data to persist even after killing the JVM/SparkSession.

I have configured hive-site.xml to use a MySQL DB as the metastore. I thought that would solve the problem by giving it a persistent metastore, but it did not help, so I don't quite understand what's going on. How do I keep the metadata around and avoid the delay?

Here is the relevant code and config:

*Initializing the SparkSession, storing and reading data via parquet*

*hive-site.xml*

*Console output*

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-the-delay-associated-with-Hive-Metastore-when-loading-parquet-tp27948.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

- To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Partition parquet data by ENUM column
Hi,

I am using a custom build of Spark 1.4 with the Parquet dependency upgraded to 1.7. I have Thrift data encoded with Parquet that I want to partition by a column of type ENUM. The Spark programming guide says partition discovery is only supported for string and numeric columns, so it seems partition discovery won't work out of the box here.

Is there any workaround that will allow me to partition by ENUMs? Will Hive partitioning help here? I am unfamiliar with Hive and how it plays into Parquet, Thrift and Spark, so I would appreciate any pointers in the right direction.

Thanks.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Partition-parquet-data-by-ENUM-column-tp23939.html
Limit # of parallel parquet decompresses
My jobs frequently run out of memory if the number of cores on an executor is too high, because each core launches a new Parquet decompressor thread, which allocates memory off heap to decompress. Consequently, even with, say, 12 cores on an executor, I can only use 2-3 of them (depending on the memory) to avoid OOMs when reading Parquet files.

Ideally I would want to use all 12 cores, but limit the number of concurrent Parquet decompressions to 2-3 per executor. Is there some way to do this?

Thanks,
Ankit

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Limit-of-parallel-parquet-decompresses-tp22022.html
Why are task results large in this case?
I am running a job, part of which is to add some null values to the rows of a SchemaRDD. The job fails with:

Total size of serialized results of 2692 tasks (1024.1 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

This is the code:

    val in = sqc.parquetFile(...)
    ..
    val presentColProj: SchemaRDD = in.select(symbolList : _*)
    val nullSeq: Broadcast[Seq[_]] = sc.broadcast(Seq.fill(missingColNames.size)(null))
    // append one null per missing column to every row
    val nullPaddedProj: RDD[Row] = presentColProj.map { row =>
      Row.fromSeq(Row.unapplySeq(row).get ++ nullSeq.value)
    }
    ..
    sqc.applySchema(nullPaddedProj, newSchema)

I believe it is failing on the map. Is the size of the serialized result large because of the rows in the map? Is there a better way to add some null columns to a SchemaRDD? Any insight would be appreciated.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-are-task-results-large-in-this-case-tp21503.html
Serialized task result size exceeded
This is on Spark 1.2. I am loading ~6k Parquet files, roughly 500 MB each, into a SchemaRDD and calling count() on it. After about 2705 tasks have finished (there is one task per file), the job crashes with this error:

Total size of serialized results of 2705 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

This indicates that the result of each task is about 1024 MB / 2705 ≈ 0.38 MB. Is that normal? I don't know exactly what the result of each task would be, but roughly 390 KB each seems too high for a count(). Can anyone explain what the normal size should be if this is too high, or suggest ways to reduce it?

Thanks.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialized-task-result-size-exceeded-tp21449.html
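The per-task figure follows from dividing the reported total by the task count, not the other way round. The sketch below only reproduces that arithmetic; the object and method names are made up for illustration, and the 6000-task total is an assumption derived from the "~6k files" mentioned in the message.

```scala
object TaskResultSize {
  // Average serialized result size per task, in MB.
  def avgResultMb(totalMb: Double, tasks: Int): Double = totalMb / tasks

  // Projected total result size if every task returns the same average amount.
  def projectedTotalMb(avgMb: Double, tasks: Int): Double = avgMb * tasks

  def main(args: Array[String]): Unit = {
    // Figures taken from the error message: 1024.0 MB after 2705 tasks.
    val avg = avgResultMb(1024.0, 2705)
    println(f"average result per task: $avg%.3f MB")

    // Assumption: about 6000 tasks in total, one per input file.
    val total = projectedTotalMb(avg, 6000)
    println(f"projected total for 6000 tasks: $total%.0f MB")
  }
}
```

If the per-task size cannot be reduced, the projected total gives a rough idea of how far spark.driver.maxResultSize would have to be raised for the job to finish.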
StackOverflowError with SchemaRDD
Hi,

I am getting a stack overflow error when querying a SchemaRDD comprised of Parquet files. This is (part of) the stack trace:

Caused by: java.lang.StackOverflowError
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:89)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:60)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10.applyOrElse(Analyzer.scala:218)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10.applyOrElse(Analyzer.scala:216)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at
Re: saveAsTextFile
I have seen this happen when the RDD contains null values. Essentially, saveAsTextFile calls toString() on each element of the RDD, so a null element ends up as a call to null.toString, which results in an NPE.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-tp20951p21178.html
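The failure mode is easy to reproduce without Spark. The following is a minimal plain-Scala sketch that uses an ordinary Seq as a stand-in for the RDD; the object and method names are made up for illustration, and the same filter or mapping would have to be applied to the real RDD before saving.

```scala
import scala.util.Try

object NullSafeLines {
  // Drop null elements, then render each remaining element as a line of text.
  def toLines[A <: AnyRef](elems: Seq[A]): Seq[String] =
    elems.filter(_ != null).map(_.toString)

  // Alternative: keep null elements but render them as the literal string "null".
  def toLinesKeepingNulls[A <: AnyRef](elems: Seq[A]): Seq[String] =
    elems.map(e => if (e == null) "null" else e.toString)

  def main(args: Array[String]): Unit = {
    val records: Seq[String] = Seq("a", null, "b")

    // Calling toString on every element fails as soon as the null is reached.
    println(Try(records.map(_.toString)).isFailure)

    println(toLines(records))
    println(toLinesKeepingNulls(records))
  }
}
```

Whether nulls should be dropped or written out as a placeholder depends on what the consumer of the text files expects.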
spark.cleaner questions
I am using Spark 1.1 with the ooyala job server (which basically creates long-running Spark contexts to execute jobs in). These contexts have cached RDDs in memory (via RDD.persist()).

I want to enable the worker cleanup so that it cleans up the /spark/work directories that are created for each app, but does not touch cached RDDs, like so:

spark.worker.cleanup.enabled = true
spark.worker.cleanup.interval = 1800
spark.worker.cleanup.appDataTtl = 604800 #7 days

A few questions:

1. Will these settings affect cleanup of cached RDDs? (I want those to be persisted forever.)
2. Is there a way to force the cleaner to run, and how can I see when the cleaner has run?
3. After setting these options, I still see the data for apps older than 7 days on the worker nodes. Why is that happening?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-cleaner-questions-tp21128.html
RDDs being cleaned too fast
I'm using Spark 1.1.0 and am seeing persisted RDDs being cleaned up too fast. How can I inspect the size of an RDD in memory and get more information about why it was cleaned up? There should be more than enough memory available on the cluster to store them, and by default spark.cleaner.ttl is infinite, so I want more information about why this is happening and how to prevent it.

Spark just logs this when removing RDDs:

[2014-12-11 01:19:34,006] INFO spark.storage.BlockManager [] [] - Removing RDD 33
[2014-12-11 01:19:34,010] INFO pache.spark.ContextCleaner [] [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33
[2014-12-11 01:19:34,012] INFO spark.storage.BlockManager [] [] - Removing RDD 33
[2014-12-11 01:19:34,016] INFO pache.spark.ContextCleaner [] [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-being-cleaned-too-fast-tp20613.html
Remove added jar from spark context
Hi,

Is there a way to REMOVE a jar (added via addJar) from a Spark context? We have a long-running context used by the Spark jobserver, but after trying to update versions of classes already on the classpath via addJar, the context still runs the old versions. It would be helpful if I could remove the old jar from the context when adding the new one, to prevent running stale code.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Remove-added-jar-from-spark-context-tp20121.html
Re: Imbalanced shuffle read
Adding a call to rdd.repartition() after randomizing the keys has no effect either.

Code:

    // partitioning is done like partitionIdx = f(key) % numPartitions
    // we use random keys to get even partitioning
    val uniform = other_stream.transform(rdd => {
      rdd.map({ kv =>
        val k = kv._1
        val v = kv._2
        (UUID.randomUUID().toString, v)
      })
      rdd.repartition(20)
    })
    uniform.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Imbalanced-shuffle-read-tp18648p18791.html
Re: Imbalanced shuffle read
I have made some progress: the partitioning is very uneven, and everything goes to one partition. I see that Spark partitions by key, so I tried this:

    // partitioning is done like partitionIdx = f(key) % numPartitions
    // we use random keys to get even partitioning
    val uniform = other_stream.transform(rdd => {
      rdd.map({ kv =>
        val k = kv._1
        val v = kv._2
        (UUID.randomUUID().toString, v)
      })
    })
    uniform.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        ...

As you can see, I'm using random keys. Even in this case, when running with 2 nodes, I verified that one partition is completely empty and the other contains all the records. What is going wrong with the partitioning here?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Imbalanced-shuffle-read-tp18648p18790.html
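For reference, the rule quoted in the code comment (partitionIdx = f(key) % numPartitions) can be checked in isolation. The sketch below is plain Scala, not Spark code: it assumes f is the key's hashCode with a non-negative modulo, which is how a hash partitioner is usually described, and all names in it are made up for illustration.

```scala
import java.util.UUID

object HashPartitionSketch {
  // Non-negative modulo, so negative hash codes still map to a valid index.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // partitionIdx = f(key) % numPartitions, with f assumed to be hashCode.
  def partitionFor(key: Any, numPartitions: Int): Int =
    nonNegativeMod(key.hashCode, numPartitions)

  // Count how many keys land in each partition index.
  def histogram(keys: Seq[String], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(partitionFor(_, numPartitions)).map { case (p, ks) => p -> ks.size }

  def main(args: Array[String]): Unit = {
    val keys = Seq.fill(10000)(UUID.randomUUID().toString)
    histogram(keys, 2).toSeq.sorted.foreach { case (p, n) =>
      println(s"partition $p: $n keys")
    }
  }
}
```

Random UUID keys spread roughly evenly under this rule, which suggests the keys themselves are not the problem. One thing worth checking is that the rule only comes into play when records are actually shuffled: map is a narrow transformation that leaves every record in the partition it already occupies, so rewriting the keys alone does not move any data.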
Imbalanced shuffle read
I'm running a job that uses groupByKey(), so it generates a lot of shuffle data. It then processes this data and writes files to HDFS in a foreachPartition block. Looking at the foreachPartition stage details in the web console, all but one of the executors are idle (SUCCESS in 50-60ms), while one is RUNNING with a huge shuffle read and takes a long time to finish. Can someone explain why the read is all on one node and how to parallelize this better?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Imbalanced-shuffle-read-tp18648.html
How to trace/debug serialization?
In my Spark job, I have a loop something like this:

    bla.foreachRDD(rdd => {
      // init some vars
      rdd.foreachPartition(partition => {
        // init some vars
        partition.foreach(kv => {
          ...

I am seeing serialization errors (unread block data), I think because Spark is trying to serialize the whole containing class. But I have been careful not to reference instance vars in the block. Is there a way to see exactly which class is failing serialization, and maybe how Spark decided it needs to be serialized?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-trace-debug-serialization-tp18230.html
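One way to narrow this down outside of Spark is to push a suspect object through plain Java serialization and look at which class the JVM rejects. The sketch below is only an illustration under that assumption: SerializationProbe, NotSer and Holder are hypothetical names, and the probe covers the writing side only, so it may not explain an "unread block data" error that is raised while reading.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationProbe {
  // Try to Java-serialize `obj`.
  // Right(number of bytes written) on success, Left(exception message) on failure;
  // for NotSerializableException the message names the offending class.
  def probe(obj: AnyRef): Either[String, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(obj)
      out.flush()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    } finally out.close()
  }

  // Hypothetical classes: Holder is Serializable, but the field it carries is not.
  class NotSer(val n: Int)
  class Holder(val inner: NotSer) extends Serializable

  def main(args: Array[String]): Unit = {
    println(probe("a plain string"))
    println(probe(new Holder(new NotSer(1))))
  }
}
```

Starting the JVM with -Dsun.io.serialization.extendedDebugInfo=true additionally makes the exception report the chain of fields that led to the offending object, which helps answer the question of why something was pulled into the closure.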
Creating a SchemaRDD from RDD of thrift classes
I have one Spark job that creates some RDDs of type X and persists them in memory. The type X is an auto-generated Thrift Java class (not a case class, though). Now, in another job, I want to convert the RDD to a SchemaRDD using sqlContext.applySchema(). Can I derive a schema from the Thrift definitions to convert RDD[X] to a SchemaRDD?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Creating-a-SchemaRDD-from-RDD-of-thrift-classes-tp17766.html
Is SparkSQL + JDBC server a good approach for caching?
I want to set up Spark SQL to allow ad hoc querying over the last X days of processed data, where the data is processed through Spark. This would also have to cache data (in memory only), so the approach I was thinking of was to build a layer that persists the appropriate RDDs and stores them in memory.

I see that Spark SQL allows ad hoc querying through JDBC, though I have never used that before. Will using JDBC offer any advantages (e.g., does it have built-in support for caching?) over rolling my own solution for this use case?

Thanks!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-SparkSQL-JDBC-server-a-good-approach-for-caching-tp17196.html
Re: Exceptions not caught?
I am simply catching all exceptions (like case e: Throwable => println("caught: " + e)).

Here is the stack trace:

2014-10-23 15:51:10,766 ERROR [] Exception in task 1.0 in stage 1.0 (TID 1)
java.io.IOException: org.apache.thrift.protocol.TProtocolException: Required field 'X' is unset! Struct:Y(id:, ts:1409094360004, type:NON, response_time:2, now:1409094360, env_type:PROD,)
    at com.A.thrift.Y.writeObject(Y.java:8489)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
    at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
    at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.thrift.protocol.TProtocolException: Required field 'X' is unset! Struct:Y(id:, ts:1409094360004, type:NON, response_time:2, now:1409094360, env_type:PROD, ...)
    at com.A.thrift.Y.validate(Y:8428)
    at com.A.thrift.Y$YStandardScheme.write(Y.java:9359)
    at com.A.thrift.Y$FlatAdserverEventStandardScheme.write(Y.java:8509)
    at com.A.thrift.Y.write(Y.java:7646)
    at com.A.thrift.Y.writeObject(Y.java:8487)
    ... 27 more
2014-10-23 15:51:10,766 11234 ERROR [] Exception in task 0.0 in stage 1.0 (TID 0)
java.io.IOException: org.apache.thrift.protocol.TProtocolException: Required field 'X' is unset! Struct:Y(id:, ts:1409094360004, type:NON, response_time:2, now:1409094360, ...)
    at com.A.thrift.Y.writeObject(Y.java:8489)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
    at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
    at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
    at
Re: Exceptions not caught?
Also, everything is running locally on my box, driver and workers.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exceptions-not-caught-tp17157p17160.html
Re: Exceptions not caught?
> Can you check your class Y and fix the above?

I can, but this is about catching the exception should it be thrown by any class in the Spark job. Why is the exception not being caught?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exceptions-not-caught-tp17157p17163.html
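One possible reading, judging only from the stack trace posted earlier in this thread: the exception is raised inside Executor$TaskRunner while the shuffle writer serializes records, which is not necessarily where the try/catch sits. The code containing the catch was not posted, so this is an assumption. The sketch below is a plain-Scala analogy (not Spark code, with made-up names) that uses a lazy Iterator to show how a try/catch around the definition of a pipeline misses a failure that only happens when the pipeline is evaluated.

```scala
import scala.util.Try

object LazyFailureSketch {
  // Building a lazy pipeline does not run the element function yet.
  def buildPipeline(input: Iterator[Int]): Iterator[Int] =
    input.map(x => if (x == 2) throw new IllegalStateException(s"bad record: $x") else x)

  def main(args: Array[String]): Unit = {
    // This try/catch surrounds only the definition of the pipeline,
    // so nothing is thrown (and nothing is caught) here.
    val pipeline =
      try buildPipeline(Iterator(1, 2, 3))
      catch { case e: Throwable => println("caught: " + e); Iterator.empty }

    // The failure surfaces only when the pipeline is consumed, outside that try block.
    val result = Try(pipeline.toList)
    println(result.isFailure)
  }
}
```

If the same thing is happening in the Spark job, the catch would need to wrap the action that triggers the computation, or sit inside the function applied to each record.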