[SPARK-SQL] Requested array size exceeds VM limit
I am trying to run a query on a month of data. The data volume is small, but the table is partitioned by day and by hour. The table schema is heavily nested, with a total of 300 leaf fields. I am trying to run a simple select count(*) query on the table and am running into this exception:

SELECT COUNT(*)
FROM p_all_tx
WHERE date_prefix >= "20150500"
  AND date_prefix <= "20150700"
  AND sanitizeddetails.merchantaccountid = 'Rvr7StMZSTQj';

java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2003)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
    at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1.apply(SortBasedAggregate.scala:73)
    at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1.apply(SortBasedAggregate.scala:70)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
    at org.apache.spark.sql.execution.aggregate.SortBasedAggregate.doExecute(SortBasedAggregate.scala:70)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:142)
    at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
    at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:141)

The table is a parquet table. I am not sure why the closure should exceed the VM limit. Could somebody explain why this is happening? Is it because I have a lot of partitions and the table scan is essentially creating one RDD per partition?
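The frames in the trace above (JavaSerializerInstance.serialize writing through ObjectOutputStream into a ByteArrayOutputStream) suggest the closure is being Java-serialized into a single in-memory byte array. A minimal sketch of that step in plain Scala, without Spark, can help gauge how large a given object graph becomes when serialized. The object name SerializedSize, the helper serializedSize, and the sample strings are hypothetical and not part of Spark; this only mimics the serialize-to-byte-array step and is not Spark's own code.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical helper, not a Spark API: measures how many bytes an object
// graph occupies under plain Java serialization.
object SerializedSize {
  def serializedSize(value: AnyRef): Int = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    // Write the whole object graph into the in-memory buffer, then flush/close.
    try out.writeObject(value)
    finally out.close()
    buffer.size()
  }

  def main(args: Array[String]): Unit = {
    // Made-up stand-ins for per-partition metadata captured by a closure.
    val few = Array.tabulate(10)(i => s"date_prefix=20150501/hour=$i")
    val many = Array.tabulate(100000)(i => s"date_prefix=20150501/part=$i")
    println(s"few:  ${serializedSize(few)} bytes")
    println(s"many: ${serializedSize(many)} bytes")
  }
}
```

If whatever the closure references grows with the number of partitions, the serialized size grows with it, and the backing byte array cannot exceed the JVM's maximum array length no matter how much heap is available.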
hive thriftserver and fair scheduling
Hi All, Does anyone have fair scheduling working for them in a hive server? I have one hive thriftserver running and multiple users trying to run queries at the same time on that server using a beeline client. I see that a big query is stopping all other queries from making any progress. Is this supposed to be this way? Is there anything else that I need to be doing for fair scheduling to be working for the thriftserver?
Re: hive thriftserver and fair scheduling
Thanks Michael, I'll try it out.

Another quick but important question: how do I make UDFs available to all of the Hive thriftserver users? Right now, when I launch a spark-sql client, it reads the ~/.hiverc file and all UDFs get picked up, but this doesn't seem to work in the Hive thriftserver. Is there a way to make it work in a similar way for all thriftserver users?

Thanks again!

On Tue, Oct 20, 2015 at 12:34 PM, Michael Armbrust wrote:
> Not the most obvious place in the docs... but this is probably helpful:
> https://spark.apache.org/docs/latest/sql-programming-guide.html#scheduling
>
> You likely want to put each user in their own pool.
SPARK SQL- Parquet projection pushdown for nested data
I noticed when querying struct data in spark sql, we are requesting the whole column from parquet files. Is this intended or is there some kind of config to control this behaviour? Wouldn't it be better to request just the struct field?
Re: SPARK SQL- Parquet projection pushdown for nested data
Thanks Michael, I will upvote this.

On Thu, Oct 29, 2015 at 10:29 AM, Michael Armbrust wrote:
> Yeah, this is unfortunate. It would be good to fix this, but it's a
> non-trivial change.
>
> Tracked here if you'd like to vote on the issue:
> https://issues.apache.org/jira/browse/SPARK-4502
Spark cluster multi tenancy
Hi All,

We've set up our Spark cluster on AWS, running on YARN (on Hadoop 2.3) with fair scheduling and preemption turned on. The cluster is shared between prod and dev work: prod runs with a higher fair share and can preempt dev jobs if there are not enough resources available for it.

It appears that dev jobs which get preempted often become unstable after losing some executors. The whole job either gets stuck (without making any progress) or ends up getting restarted (and hence loses all the work done so far).

Has someone encountered this before? Is the solution just to set spark.task.maxFailures to a really high value to recover from task failures in such scenarios? Are there other approaches to Spark multi-tenancy that work better in such a scenario?

Thanks,
Sadhan
Re: Spark cluster multi tenancy
Attaching the log for when the dev job gets stuck (once all its executors are lost due to preemption). This is a spark-shell job running in yarn-client mode.

Attachment: spark_job_stuck.log (binary data)
Re: Spark cluster multi tenancy
Interestingly, if there is nothing running on the dev spark-shell, it recovers successfully and regains the lost executors. Attaching the log for that. Notice the "Registering block manager .." statements at the very end, after all executors were lost.

Attachment: spark_job_recovers.log (binary data)
SPARK-SQL parameter tuning for performance
Hi Spark users,

We are running Spark on YARN and often query table partitions as big as 100~200 GB from HDFS. HDFS is co-located on the same cluster on which Spark and YARN run. I've noticed much higher I/O read rates when I increase the number of executor cores from 2 to 8 while keeping the number of executors constant (most tasks run RACK_LOCAL and a few NODE_LOCAL). The RAM on each executor is around 24G. The problem is that any subsequent shuffle stage starts failing when I do that. It runs fine if I leave the number of executor cores at 2, but then the read is much slower. The error I get from YARN is:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)

This seems to indicate that YARN is killing executors for using too much memory, but I can't be sure. I tried increasing the Spark SQL shuffle partitions as well, but that didn't help much. Is there a way to run more partition read tasks per executor while keeping the number of shuffle tasks constant?

Thanks
Re: Error when cache partitioned Parquet table
Hi Xu-dong,

That's probably because your table's partition paths don't look like hdfs://somepath/key=value/*.parquet. Spark tries to extract the partition key's value from the path while caching, and the exception is thrown because it can't find one.

On Mon, Jan 26, 2015 at 10:45 AM, ZHENG, Xu-dong wrote:
> Hi all,
>
> I hit the error below when I cache a partitioned Parquet table. It seems
> that Spark is trying to extract the partition key from the Parquet file,
> where it is not found. But other queries run successfully, even ones that
> request the partition key. Is it a bug in SparkSQL? Is there any workaround
> for it? Thank you!
>
> java.util.NoSuchElementException: key not found: querydate
>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>     at scala.collection.AbstractMap.default(Map.scala:58)
>     at scala.collection.MapLike$class.apply(MapLike.scala:141)
>     at scala.collection.AbstractMap.apply(Map.scala:58)
>     at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anonfun$3.apply(ParquetTableOperations.scala:142)
>     at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anonfun$3.apply(ParquetTableOperations.scala:142)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>     at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4.apply(ParquetTableOperations.scala:142)
>     at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4.apply(ParquetTableOperations.scala:127)
>     at org.apache.spark.rdd.NewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD.compute(NewHadoopRDD.scala:247)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:724)
>
> --
> 郑旭东
> ZHENG, Xu-dong
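To illustrate the key=value path layout mentioned in the reply above, here is a small plain-Scala sketch that pulls partition keys and values out of a path string. It is not Spark's partition discovery code; the object PartitionPath, the helper partitionValues, and the sample paths are hypothetical and exist only to show why a lookup of a key such as querydate fails when the directory is not named key=value.

```scala
// Hypothetical helper, not a Spark API: collects key=value path segments.
object PartitionPath {
  def partitionValues(path: String): Map[String, String] =
    path.split('/').iterator
      .map(_.split("=", 2))                                   // "k=v" -> Array("k", "v")
      .collect { case Array(k, v) if k.nonEmpty => k -> v }   // skip segments without '='
      .toMap

  def main(args: Array[String]): Unit = {
    // Directory named key=value: the partition key can be recovered.
    println(partitionValues("hdfs://somepath/querydate=20150126/part-0.parquet"))
    // Directory named by value only: no key is found, so a lookup of
    // "querydate" on the result would fail.
    println(partitionValues("hdfs://somepath/20150126/part-0.parquet"))
  }
}
```

With the second layout the resulting map is empty, which matches the kind of "key not found" failure reported in the quoted trace.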
Fwd: how to find the sources for spark-project
---------- Forwarded message ----------
From: Sadhan Sood
Date: Sat, Oct 11, 2014 at 10:26 AM
Subject: Re: how to find the sources for spark-project
To: Stephen Boesch

Thanks, I still didn't find it. Is it under some particular branch? More specifically, I am looking to modify the file ParquetHiveSerDe.java, which is at this path:

org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java

and is being linked through this jar:

org.spark-project.hive hive-exec 0.12.0

It seems this is a modified version of the same jar in org.apache.hive:

org.apache.hive hive-exec 0.13.1

It'd be great if I could find the sources of this jar so I can modify it locally and rebuild the jar. I'm not sure whether I can rebuild Spark with the hive-exec artifact from org.apache.hive without breaking it.

On Fri, Oct 10, 2014 at 7:28 PM, Stephen Boesch wrote:
> Git clone the spark project: https://github.com/apache/spark.git
> The hive related sources are under sql/hive/src/main/scala
>
> 2014-10-10 16:24 GMT-07:00 sadhan:
>> We have our own customization on top of parquet serde that we've been using
>> for hive. In order to make it work with spark-sql, we need to be able to
>> re-build spark with this. It'll be much easier to rebuild spark with this
>> patch once I can find the sources for org.spark-project.hive. Not sure
>> where to find it? This seems like the place where we need to put our patch.
persist table schema in spark-sql
We want to persist the table schema of a parquet file so that we can use the spark-sql CLI on that table later. Is that possible, or is the spark-sql CLI only good for tables in the Hive metastore? We are reading parquet data using this example:

    // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
    // The result of loading a Parquet file is also a SchemaRDD.
    val parquetFile = sqlContext.parquetFile("people.parquet")

    // Parquet files can also be registered as tables and then used in SQL statements.
    parquetFile.registerTempTable("parquetFile")
read all parquet files in a directory in spark-sql
How can we read all parquet files in a directory in spark-sql? We are following this example, which shows a way to read one file:

    // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
    // The result of loading a Parquet file is also a SchemaRDD.
    val parquetFile = sqlContext.parquetFile("people.parquet")

    // Parquet files can also be registered as tables and then used in SQL statements.
    parquetFile.registerTempTable("parquetFile")
Re: read all parquet files in a directory in spark-sql
Thanks Nick, DB - that was helpful.

On Mon, Oct 13, 2014 at 5:44 PM, DB Tsai wrote:
> For now, with the SPARK-3462 (parquet pushdown for unionAll) PR, you
> can do the following to unionAll the schemaRDDs.
>
> // hc is assumed to be an existing HiveContext
> val files = Array("hdfs://file1.parquet", "hdfs://file2.parquet",
>   "hdfs://file3.parquet")
> // load each file as its own SchemaRDD
> val rdds = files.map(hc.parquetFile(_))
>
> // fold the SchemaRDDs into one with unionAll
> val unionedRDD = {
>   var temp = rdds(0)
>   for (i <- 1 until rdds.length) {
>     temp = temp.unionAll(rdds(i))
>   }
>   temp
> }
>
> Sincerely,
>
> DB Tsai
>
> On Mon, Oct 13, 2014 at 7:22 PM, Nicholas Chammas wrote:
> > Right now I believe the only supported option is to pass a comma-delimited
> > list of paths.
> >
> > I've opened SPARK-3928: Support wildcard matches on Parquet files to
> > request this feature.
> >
> > Nick
Sharing spark context across multiple spark sql cli initializations
We want to run multiple instances of the spark-sql CLI on our YARN cluster, with each instance used by a different user. Having each user bring up a separate CLI looks non-optimal, given that Spark on YARN runs executor processes (and hence consumes resources) on worker nodes for the lifetime of the application. So the right approach seems to be to share one Spark context across multiple CLI sessions and run just one Spark SQL application. Is this understanding correct? Is there a way to do it currently? It seems like it needs some kind of thrift interface hooked into the CLI driver.
Re: Sharing spark context across multiple spark sql cli initializations
Thanks Michael, you saved me a lot of time!

On Wed, Oct 22, 2014 at 6:04 PM, Michael Armbrust wrote:
> The JDBC server is what you are looking for:
> http://spark.apache.org/docs/latest/sql-programming-guide.html#running-the-thrift-jdbc-server
Job cancelled because SparkContext was shut down - failures!
Hi,

Trying to run a query on spark-sql, but it keeps failing with this error on the CLI (we are running spark-sql on a YARN cluster):

org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:700)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:699)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1405)
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1352)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    at akka.actor.ActorCell.terminate(ActorCell.scala:369)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

On the UI, I see there are connection failures in the mapPartition stage:

java.net.SocketTimeoutException: Read timed out
    java.net.SocketInputStream.socketRead0(Native Method)
    java.net.SocketInputStream.read(SocketInputStream.java:150)
    java.net.SocketInputStream.read(SocketInputStream.java:121)
    java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:703)
    sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
    sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1534)
    sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1439)
    org.apache.spark.util.Utils$.fetchFile(Utils.scala:362)
    org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:331)
    org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:329)
    scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:329)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:162)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)
Re: Job cancelled because SparkContext was shut down - failures!
These seem like S3 connection errors for the table data. I'm wondering why, since we don't see that many failures on Hive. I also set spark.task.maxFailures = 15.
Re: Is SparkSQL + JDBC server a good approach for caching?
Is there a way to cache only certain partitions (for example, the most recent ones) of certain tables?

On Fri, Oct 24, 2014 at 2:35 PM, Michael Armbrust wrote:
> It does have support for caching using either CACHE TABLE or
> CACHE TABLE AS SELECT
>
> On Fri, Oct 24, 2014 at 1:05 AM, ankits wrote:
>> I want to set up spark SQL to allow ad hoc querying over the last X days of
>> processed data, where the data is processed through spark. This would also
>> have to cache data (in memory only), so the approach I was thinking of was
>> to build a layer that persists the appropriate RDDs and stores them in
>> memory.
>>
>> I see spark sql allows ad hoc querying through JDBC though I have never used
>> that before. Will using JDBC offer any advantages (e.g. does it have built-in
>> support for caching?) over rolling my own solution for this use case?
>>
>> Thanks!
Re: Is SparkSQL + JDBC server a good approach for caching?
That works perfectly. Thanks again, Michael.

On Fri, Oct 24, 2014 at 3:10 PM, Michael Armbrust wrote:
> It won't be transparent, but you can do something like:
>
> CACHE TABLE newData AS SELECT * FROM allData WHERE date > "..."
>
> and then query newData.
thrift jdbc server probably running queries as hive query
I was testing out the Spark thrift JDBC server by running a simple query in the beeline client. Spark itself is running on a YARN cluster.

However, when I run a query in beeline, I see no running jobs in the Spark UI (completely empty), and the YARN UI seems to indicate that the submitted query is being run as a MapReduce job. The Spark logs probably indicate this as well, but I am not completely sure:

2014-11-11 00:19:00,492 INFO ql.Context (Context.java:getMRScratchDir(267)) - New scratch dir is hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-1
2014-11-11 00:19:00,877 INFO ql.Context (Context.java:getMRScratchDir(267)) - New scratch dir is hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2
2014-11-11 00:19:04,152 INFO ql.Context (Context.java:getMRScratchDir(267)) - New scratch dir is hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2
2014-11-11 00:19:04,425 INFO Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.submit.replication is deprecated. Instead, use mapreduce.client.submit.file.replication
2014-11-11 00:19:04,516 INFO client.RMProxy (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager at :8032
2014-11-11 00:19:04,607 INFO client.RMProxy (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager at :8032
2014-11-11 00:19:04,639 WARN mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this
2014-11-11 00:00:08,806 INFO input.FileInputFormat (FileInputFormat.java:listStatus(287)) - Total input paths to process : 14912
2014-11-11 00:00:08,864 INFO lzo.GPLNativeCodeLoader (GPLNativeCodeLoader.java:(34)) - Loaded native gpl library
2014-11-11 00:00:08,866 INFO lzo.LzoCodec (LzoCodec.java:(76)) - Successfully loaded & initialized native-lzo library [hadoop-lzo rev 8e266e052e423af592871e2dfe09d54c03f6a0e8]
2014-11-11 00:00:09,873 INFO input.CombineFileInputFormat (CombineFileInputFormat.java:createSplits(413)) - DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 194541317
2014-11-11 00:00:10,017 INFO mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(396)) - number of splits:615
2014-11-11 00:00:10,095 INFO mapreduce.JobSubmitter (JobSubmitter.java:printTokens(479)) - Submitting tokens for job: job_1414084656759_0115
2014-11-11 00:00:10,241 INFO impl.YarnClientImpl (YarnClientImpl.java:submitApplication(167)) - Submitted application application_1414084656759_0115

It seems like the query is being run as a Hive query instead of a Spark query. The same query works fine when run from the spark-sql CLI.
Re: thrift jdbc server probably running queries as hive query
Hi Cheng, I made sure the only hive server running on the machine is hivethriftserver2. /usr/lib/jvm/default-java/bin/java -cp /usr/lib/hadoop/lib/hadoop-lzo.jar::/mnt/sadhan/spark-3/sbin/../conf:/mnt/sadhan/spark-3/spark-assembly-1.2.0-SNAPSHOT-hadoop2.3.0-cdh5.0.2.jar:/etc/hadoop/conf -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --master yarn --jars reporting.jar spark-internal The query I am running is a simple count(*): "select count(*) from Xyz where date_prefix=20141031" and pretty sure it's submitting a map reduce job based on the spark logs: TakesRest=false Total jobs = 1 Launching Job 1 out of 1 Number of reduce tasks determined at compile time: 1 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer= In order to limit the maximum number of reducers: set hive.exec.reducers.max= In order to set a constant number of reducers: set mapreduce.job.reduces= 14/11/11 16:23:17 INFO ql.Context: New scratch dir is hdfs://fdsfdsfsdfsdf:9000/tmp/hive-ubuntu/hive_2014-11-11_16-23-17_333_5669798325805509526-2 Starting Job = job_1414084656759_0142, Tracking URL = http://xxx:8100/proxy/application_1414084656759_0142/ Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1414084656759_0142 On Mon, Nov 10, 2014 at 9:59 PM, Cheng Lian wrote: > Hey Sadhan, > > I really don't think this is Spark log... Unlike Shark, Spark SQL doesn't > even provide a Hive mode to let you execute queries against Hive. Would you > please check whether there is an existing HiveServer2 running there? 
Spark > SQL HiveThriftServer2 is just a Spark port of HiveServer2, and they share > the same default listening port. I guess the Thrift server didn't start > successfully because the HiveServer2 occupied the port, and your Beeline > session was probably linked against HiveServer2. > > Cheng
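Cheng's hypothesis (another HiveServer2 already holding the shared default port) can be checked before starting the Thrift server. The sketch below is a plain-Python probe, not part of Spark or Hive. The function name port_in_use is hypothetical, and port 10000 is assumed to be the default in use (it is the usual value of hive.server2.thrift.port, but a site configuration may override it).

```python
import socket


def port_in_use(host, port, timeout=1.0):
    """Return True if something accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # connect_ex returns 0 on success instead of raising an exception.
        return sock.connect_ex((host, port)) == 0


if __name__ == "__main__":
    # Assumption: 10000 is the listening port; adjust it if your
    # hive-site.xml or startup options set a different one.
    print(port_in_use("localhost", 10000))
```

A True result only says that some process is listening there; it does not tell whether that process is HiveServer2 or the Spark Thrift server, so the owning process still has to be identified separately.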
Too many failed collects when trying to cache a table in SparkSQL
We are running Spark on YARN with combined memory > 1TB, and when trying to cache a table partition (which is < 100G) we see a lot of failed collect stages in the UI, and the caching never succeeds. Because of the failed collect, the mapPartitions stage seems to keep getting resubmitted. We have more than enough memory, so it's surprising that we are seeing this issue. Can someone please help? Thanks! The stack trace of the failed collect from the UI is: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386) at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382) at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
Re: Too many failed collects when trying to cache a table in SparkSQL
oadcast at DAGScheduler.scala:838 2014-11-12 19:10:08,990 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86) 2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86) finished in 66.475 s 2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - looking for newly runnable stages 2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - running: Set() 2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - waiting: Set(Stage 1) 2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - failed: Set() 2014-11-12 19:11:15,466 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Missing parents for Stage 1: List() 2014-11-12 19:11:15,466 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84), which is now runnable 2014-11-12 19:11:15,482 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 4 from broadcast at DAGScheduler.scala:838 2014-11-12 19:11:15,482 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84) 2014-11-12 19:11:21,655 ERROR cluster.YarnClientClusterScheduler (Logging.scala:logError(75)) - Lost executor 372 on ip-10-95-163-84.ec2.internal: remote Akka client disassociated 2014-11-12 19:11:21,655 WARN remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system [akka.tcp://sparkExecutor@ip-10-95-163-84.ec2.internal:20998] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. 
2014-11-12 19:11:21,655 ERROR cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Asked to remove non-existent executor 372 2014-11-12 19:11:21,655 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Executor lost: 372 (epoch 3)
Re: Building spark targz
Just making sure, but are you looking for the tar in the assembly/target dir? On Wed, Nov 12, 2014 at 3:14 PM, Ashwin Shankar wrote: > Hi, > I just cloned Spark from GitHub and I'm trying to build it to generate a tarball. > I'm doing: mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -DskipTests clean package > > Although the build is successful, I don't see the targz generated. > > Am I running the wrong command? > > -- > Thanks, > Ashwin
Re: Building spark targz
I think you can provide -Pbigtop-dist to build the tar. On Wed, Nov 12, 2014 at 3:21 PM, Sean Owen wrote: > mvn package doesn't make tarballs. It creates artifacts that will generally appear in target/ and subdirectories, and likewise within modules. Look at make-distribution.sh
Re: Too many failed collects when trying to cache a table in SparkSQL
On re-running the cache statement, I see from the logs that when collect (stage 1) fails, it always leads to mapPartitions (stage 0) being re-run for one partition. This can be seen from the collect log as well as on the container log: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 The data is an LZO-compressed sequence file with a compressed size of ~26G. Is there a way to understand why the shuffle keeps failing for one partition? I believe we have enough memory to store the uncompressed data in memory. On Wed, Nov 12, 2014 at 2:50 PM, Sadhan Sood wrote: > This is the log output: > > 2014-11-12 19:07:16,561 INFO thriftserver.SparkExecuteStatementOperation > (Logging.scala:logInfo(59)) - Running query 'CACHE TABLE xyz_cached AS > SELECT * FROM xyz where date_prefix = 20141112' > > 2014-11-12 19:07:17,455 INFO Configuration.deprecation > (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.map.tasks is > deprecated. Instead, use mapreduce.job.maps > > 2014-11-12 19:07:17,756 INFO spark.SparkContext > (Logging.scala:logInfo(59)) - Created broadcast 0 from broadcast at > TableReader.scala:68 > > 2014-11-12 19:07:18,292 INFO spark.SparkContext > (Logging.scala:logInfo(59)) - Starting job: collect at SparkPlan.scala:84 > > 2014-11-12 19:07:22,801 INFO mapred.FileInputFormat > (FileInputFormat.java:listStatus(253)) - Total input paths to process : 200 > > 2014-11-12 19:07:22,835 INFO scheduler.DAGScheduler > (Logging.scala:logInfo(59)) - Registering RDD 12 (mapPartitions at > Exchange.scala:86) > > 2014-11-12 19:07:22,837 INFO scheduler.DAGScheduler > (Logging.scala:logInfo(59)) - Got job 0 (collect at SparkPlan.scala:84) > with 1 output partitions (allowLocal=false) > > 2014-11-12 19:07:22,838 INFO scheduler.DAGScheduler > (Logging.scala:logInfo(59)) - Final stage: Stage 1(collect at > SparkPlan.scala:84) > > 2014-11-12 19:07:22,838 INFO scheduler.DAGScheduler > (Logging.scala:logInfo(59)) - Parents of final stage: List(Stage 0) > > 
2014-11-12 19:07:22,842 INFO scheduler.DAGScheduler > (Logging.scala:logInfo(59)) - Missing parents: List(Stage 0) > > 2014-11-12 19:07:22,871 INFO scheduler.DAGScheduler > (Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at > mapPartitions at Exchange.scala:86), which has no missing parents > > 2014-11-12 19:07:22,916 INFO spark.SparkContext > (Logging.scala:logInfo(59)) - Created broadcast 1 from broadcast at > DAGScheduler.scala:838 > > 2014-11-12 19:07:22,963 INFO scheduler.DAGScheduler > (Logging.scala:logInfo(59)) - Submitting 461 missing tasks from Stage 0 > (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86) > > 2014-11-12 19:10:04,088 INFO scheduler.DAGScheduler > (Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86) > finished in 161.113 s > > 2014-11-12 19:10:04,089 INFO scheduler.DAGScheduler > (Logging.scala:logInfo(59)) - looking for newly runnable stages > > 2014-11-12 19:10:04,089 INFO scheduler.DAGScheduler > (Logging.scala:logInfo(59)) - running: Set() > > 2014-11-12 19:10:04,090 INFO scheduler.DAGScheduler > (Logging.scala:logInfo(59)) - waiting: Set(Stage 1) > > 2014-11-12 19:10:04,090 INFO scheduler.DAGScheduler > (Logging.scala:logInfo(59)) - failed: Set() > > 2014-11-12 19:10:04,094 INFO scheduler.DAGScheduler > (Logging.scala:logInfo(59)) - Missing parents for Stage 1: List() > > 2014-11-12 19:10:04,097 INFO scheduler.DAGScheduler > (Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at > SparkPlan.scala:84), which is now runnable > > 2014-11-12 19:10:04,112 INFO spark.SparkContext > (Logging.scala:logInfo(59)) - Created broadcast 2 from broadcast at > DAGScheduler.scala:838 > > 2014-11-12 19:10:04,115 INFO scheduler.DAGScheduler > (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1 > (MappedRDD[16] at map at SparkPlan.scala:84) > > 2014-11-12 19:10:08,541 ERROR cluster.YarnClientClusterScheduler > (Logging.scala:logError(75)) - Lost executor 52 on > 
ip-10-61-175-167.ec2.internal: remote Akka client disassociated > > 2014-11-12 19:10:08,543 WARN remote.ReliableDeliverySupervisor > (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system > [akka.tcp://sparkExecutor@ip-10-61-175-167.ec2.internal:50918] has > failed, address is now gated for [5000] ms. Reason is: [Disassociated]. > > 2014-11-12 19:10:08,548 ERROR cluster.YarnClientSchedulerBackend > (Logging.scala:logError(75)) - Asked to remove non-existent executor 52 > > 2014-11-12 19:10:08,550 INFO scheduler.DAGScheduler > (Logging.scala:logInfo(59)) - Executor lost: 52 (epoch 1) > > 2014-11-12 19:10:08,555 INFO scheduler.Stage (Logging.sca
Cache sparkSql data without uncompressing it in memory
We noticed that when caching data from our Hive tables, which store data in compressed sequence file format, the data gets uncompressed in memory as it is cached. Is there a way to turn this off and cache the compressed data as is?
Re: Cache sparkSql data without uncompressing it in memory
Thanks Cheng, just one more question: does that mean that we still need enough memory in the cluster to uncompress the data before it can be compressed again, or does it just read the raw data as is? On Wed, Nov 12, 2014 at 10:05 PM, Cheng Lian wrote: > Currently there’s no way to cache the compressed sequence file directly. Spark SQL uses in-memory columnar format while caching table rows, so we must read all the raw data and convert them into columnar format. However, you can enable in-memory columnar compression by setting spark.sql.inMemoryColumnarStorage.compressed to true. This property is already set to true by default in master branch and branch-1.2.
SparkSQL exception on cached parquet table
While testing SparkSQL on a bunch of parquet files (basically used to be a partition for one of our hive tables), I encountered this error:

import org.apache.spark.sql.SchemaRDD
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val parquetFileRDD = sqlContext.parquetFile(parquetFile)
parquetFileRDD.registerTempTable("xyz_20141109")
sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect() // works fine
sqlContext.cacheTable("xyz_20141109")
sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect() // fails with an exception

parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://::9000/event_logs/xyz/20141109/part-9359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-9.lzo.parquet at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.hasNext(InMemoryColumnarTableScan.scala:136) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) at org.apache.spark.rdd.RDD.iterator(RDD.scala:228) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ArrayIndexOutOfBoundsException
Re: Cache sparkSql data without uncompressing it in memory
Thanks Cheng, that was helpful. I noticed from the UI that only half of the memory per executor was being used for caching; is that true? We have a 2 TB sequence file dataset that we wanted to cache in our cluster with ~5 TB of memory, but caching still failed. From the UI it looked like it used 2.5 TB of memory and wrote almost 12 TB to disk (at which point it was useless) during the mapPartitions stage. Also, we couldn't run more than 2 executors per box (60g memory per box), or else it died very quickly from having less memory per executor (not sure why), although I/O seemed to go much faster, which makes sense because of more parallel reads. On Thu, Nov 13, 2014 at 10:50 PM, Cheng Lian wrote: > No, the columnar buffer is built in a small batching manner, the batch size is controlled by the spark.sql.inMemoryColumnarStorage.batchSize property. The default value for this in master and branch-1.2 is 10,000 rows per batch.
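Cheng's point about batching can be illustrated with a small sketch: rows are consumed a batch at a time and each batch is turned into per-column buffers, so only one batch of raw rows needs to be held at once. This is plain Python written for illustration only and is not Spark SQL's implementation; the function name columnar_batches and the tiny batch size are hypothetical stand-ins for the behaviour controlled by spark.sql.inMemoryColumnarStorage.batchSize.

```python
from itertools import islice


def columnar_batches(rows, batch_size):
    """Yield one list of columns per batch of at most `batch_size` rows."""
    iterator = iter(rows)
    while True:
        # Pull only the next batch of rows from the (possibly lazy) input.
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        # Transpose the row tuples of this batch into per-column lists.
        yield [list(column) for column in zip(*batch)]


rows = [(1, "a"), (2, "b"), (3, "c")]
batches = list(columnar_batches(rows, batch_size=2))
print(batches)
```

Under this model the peak amount of uncompressed row data is bounded by the batch size rather than by the size of the table, which is the gist of the answer to the "do we need enough memory to uncompress everything first" question.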
Re: SparkSQL exception on cached parquet table
Hi Cheng, I tried reading the parquet file(on which we were getting the exception) through parquet-tools and it is able to dump the file and I can read the metadata, etc. I also loaded the file through hive table and can run a table scan query on it as well. Let me know if I can do more to help resolve the problem, I'll run it through a debugger and see if I can get more information on it in the meantime. Thanks, Sadhan On Sun, Nov 16, 2014 at 4:35 AM, Cheng Lian wrote: > (Forgot to cc user mail list) > > > On 11/16/14 4:59 PM, Cheng Lian wrote: > > Hey Sadhan, > > Thanks for the additional information, this is helpful. Seems that some > Parquet internal contract was broken, but I'm not sure whether it's caused > by Spark SQL or Parquet, or even maybe the Parquet file itself was damaged > somehow. I'm investigating this. In the meanwhile, would you mind to help > to narrow down the problem by trying to scan exactly the same Parquet file > with some other systems (e.g. Hive or Impala)? If other systems work, then > there must be something wrong with Spark SQL. > > Cheng > > On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood > wrote: > >> Hi Cheng, >> >> Thanks for your response. 
Here is the stack trace from yarn logs: >> >> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1 >> at java.util.ArrayList.elementData(ArrayList.java:418) >> at java.util.ArrayList.get(ArrayList.java:431) >> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95) >> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95) >> at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80) >> at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74) >> at >> parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:282) >> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131) >> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96) >> at >> parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136) >> at >> parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96) >> at >> parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126) >> at >> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193) >> ... 26 more >> >> >> On Sat, Nov 15, 2014 at 9:28 AM, Cheng Lian >> wrote: >> >>> Hi Sadhan, >>> >>> Could you please provide the stack trace of the >>> ArrayIndexOutOfBoundsException (if any)? The reason why the first query >>> succeeds is that Spark SQL doesn’t bother reading all data from the table >>> to give COUNT(*). In the second case, however, the whole table is asked >>> to be cached lazily via the cacheTable call, thus it’s scanned to build >>> the in-memory columnar cache. Then thing went wrong while scanning this LZO >>> compressed Parquet file. But unfortunately the stack trace at hand doesn’t >>> indicate the root cause. 
>>> >>> Cheng
Exception in spark sql when running a group by query
While testing sparkSQL, we were running this group by with expression query and got an exception. The same query worked fine on hive.

SELECT from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200), '/MM/dd') as pst_date,
       count(*) as num_xyzs
FROM all_matched_abc
GROUP BY from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200), '/MM/dd')

14/11/17 17:41:46 ERROR thriftserver.SparkSQLDriver: Failed in [SELECT from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200), '/MM/dd') as pst_date, count(*) as num_xyzs FROM all_matched_abc GROUP BY from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200), '/MM/dd') ]
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Expression not in GROUP BY: HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived AS whenrequestreceived#187L, DoubleType) / 1000.0) - CAST(25200, DoubleType))),/MM/dd) AS pst_date#179, tree:

Aggregate [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived, DoubleType) / 1000.0) - CAST(25200, DoubleType))),/MM/dd)], [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived AS whenrequestreceived#187L, DoubleType) / 1000.0) - CAST(25200, DoubleType))),/MM/dd) AS pst_date#179,COUNT(1) AS num_xyzs#180L]
 MetastoreRelation default, all_matched_abc, None

    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3$$anonfun$applyOrElse$6.apply(Analyzer.scala:127)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3$$anonfun$applyOrElse$6.apply(Analyzer.scala:125)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3.applyOrElse(Analyzer.scala:125)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3.applyOrElse(Analyzer.scala:115)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$.apply(Analyzer.scala:115)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$.apply(Analyzer.scala:113)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
    at org.apache.spark.sql.hive.HiveContext$QueryExecution.stringResult(HiveContext.scala:425)
    at org.apache.spark.sql.hive.thriftserver.AbstractSparkSQLDriver.run(AbstractSparkSQLDriver.scala:59)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:276)
    at org.apache.hadoop.hive.cli.CliDriver.pro
Re: Exception in spark sql when running a group by query
ah makes sense - Thanks Michael!

On Mon, Nov 17, 2014 at 6:08 PM, Michael Armbrust wrote:
> You are perhaps hitting an issue that was fixed by #3248
> <https://github.com/apache/spark/pull/3248>?
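For readers stuck on a build without the fix referenced above (#3248), one rewrite that should sidestep the "Expression not in GROUP BY" check is to evaluate the expression once in an inner SELECT and group by its alias. This is a minimal sketch, not something proposed in the thread: the `GroupByAlias` object and its `rewrite` method are hypothetical helpers that only build the SQL text, and the format string '/MM/dd' is copied exactly as it appears in the post.

```scala
// Hypothetical helper (not part of Spark): builds the rewritten query text only.
object GroupByAlias {
  /** Evaluates `expr` in an inner SELECT, then groups by the resulting alias. */
  def rewrite(expr: String, alias: String, table: String, countAlias: String): String =
    s"""SELECT $alias, count(*) AS $countAlias
       |FROM (SELECT $expr AS $alias FROM $table) t
       |GROUP BY $alias""".stripMargin

  def main(args: Array[String]): Unit = {
    // Expression, table, and aliases taken from the query in the original post.
    val expr = "from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200), '/MM/dd')"
    println(rewrite(expr, "pst_date", "all_matched_abc", "num_xyzs"))
  }
}
```

The outer query then groups on a plain column rather than on a repeated UDF expression. Whether this avoids the exception on a given build is an assumption that would need to be checked; upgrading to a build that includes the fix is the more direct route.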
Re: SparkSQL exception on cached parquet table
s the file dump using parquet-tools:

row group 0
col_a: BINARY UNCOMPRESSED DO:0 FPO:4 SZ:89/89/1.00 VC:9 ENC [more]...
col_b: BINARY UNCOMPRESSED DO:0 FPO:93 SZ:89/89/1.00 VC:9 EN [more]...
col_c: BINARY UNCOMPRESSED DO:0 FPO:182 SZ:89/89/1.00 VC:9 E [more]...
col_d:
.map:
..key: BINARY UNCOMPRESSED DO:0 FPO:271 SZ:29/29/1.00 VC:9 E [more]...
..value:
...value_tuple: BINARY UNCOMPRESSED DO:0 FPO:300 SZ:29/29/1.00 VC:9 E [more]...

col_a TV=9 RL=0 DL=1
page 0: DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9

col_b TV=9 RL=0 DL=1
page 0: DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9

col_c TV=9 RL=0 DL=1
page 0: DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9

col_d.map.key TV=9 RL=1 DL=2
page 0: DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9

col_d.map.value.value_tuple TV=9 RL=2 DL=4
page 0: DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9

BINARY col_a
*** row group 1 of 1, values 1 to 9 ***
value 1: R:1 D:1 V:a1
value 2: R:1 D:1 V:a2
value 3: R:1 D:1 V:a3
value 4: R:1 D:1 V:a4
value 5: R:1 D:1 V:a5
value 6: R:1 D:1 V:a6
value 7: R:1 D:1 V:a7
value 8: R:1 D:1 V:a8
value 9: R:1 D:1 V:a9

BINARY col_b
*** row group 1 of 1, values 1 to 9 ***
value 1: R:1 D:1 V:b1
value 2: R:1 D:1 V:b2
value 3: R:1 D:1 V:b3
value 4: R:1 D:1 V:b4
value 5: R:1 D:1 V:b5
value 6: R:1 D:1 V:b6
value 7: R:1 D:1 V:b7
value 8: R:1 D:1 V:b8
value 9: R:1 D:1 V:b9

BINARY col_c
*** row group 1 of 1, values 1 to 9 ***
value 1: R:1 D:1 V:c1
value 2: R:1 D:1 V:c2
value 3: R:1 D:1 V:c3
value 4: R:1 D:1 V:c4
value 5: R:1 D:1 V:c5
value 6: R:1 D:1 V:c6
value 7: R:1 D:1 V:c7
value 8: R:1 D:1 V:c8
value 9: R:1 D:1 V:c9

BINARY col_d.map.key
*** row group 1 of 1, values 1 to 9 ***
value 1: R:0 D:0 V:
value 2: R:0 D:0 V:
value 3: R:0 D:0 V:
value 4: R:0 D:0 V:
value 5: R:0 D:0 V:
value 6: R:0 D:0 V:
value 7: R:0 D:0 V:
value 8: R:0 D:0 V:
value 9: R:0 D:0 V:

BINARY col_d.map.value.value_tuple
*** row group 1 of 1, values 1 to 9 ***
value 1: R:0 D:0 V:
value 2: R:0 D:0 V:
value 3: R:0 D:0 V:
value 4: R:0 D:0 V:
value 5: R:0 D:0 V:
value 6: R:0 D:0 V:
value 7: R:0 D:0 V:
value 8: R:0 D:0 V:
value 9: R:0 D:0 V:

I am happy to provide more information but any help is appreciated.

On Sun, Nov 16, 2014 at 7:40 PM, Sadhan Sood wrote:

> Hi Cheng,
>
> I tried reading the parquet file(on which we were getting the exception)
> through parquet-tools and it is able to dump the file and I can read the
> metadata, etc. I also loaded the file through hive table and can run a
> table scan query on it as well. Let me know if I can do more to help
> resolve the problem, I'll run it through a debugger and see if I can get
> more information on it in the meantime.
>
> Thanks,
> Sadhan
>
> On Sun, Nov 16, 2014 at 4:35 AM, Cheng Lian wrote:
>
>> (Forgot to cc user mail list)
>>
>>
>> On 11/16/14 4:59 PM, Cheng Lian wrote:
>>
>> Hey Sadhan,
>>
>> Thanks for the additional information, this is helpful. Seems that some
>> Parquet internal contract was broken, but I'm not sure whether it's caused
>> by Spark SQL or Parquet, or even maybe the Parquet file itself was damaged
>> somehow. I'm investigating this. In the meanwhile, would you mind to help
>> to narrow down the problem by trying to scan exactly the same Parquet file
>> with some other systems (e.g. Hive or Impala)? If other systems work, then
>> there must be something wrong with Spark SQL.
>>
>> Cheng
>>
>> On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood wrote:
>>
>>> Hi Cheng,
>>>
>>> Thanks for your response. Here is the stack trace from yarn logs:
>>>
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>> at java.util.ArrayList.elementData(ArrayList.java:418)
>>> at java.util.ArrayList.get(ArrayList.java:431)
>>> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>
Re: SparkSQL exception on cached parquet table
Also attaching the parquet file if anyone wants to take a further look.

On Thu, Nov 20, 2014 at 8:54 AM, Sadhan Sood wrote:

> So, I am seeing this issue with spark sql throwing an exception when
> trying to read selective columns from a thrift parquet file and also when
> caching them:
> On some further digging, I was able to narrow it down to at-least one
> particular column type: map> to be causing this issue.
> To reproduce this I created a test thrift file with a very basic schema and
> stored some sample data in a parquet file:
>
> Test.thrift
> ===
> typedef binary SomeId
>
> enum SomeExclusionCause {
>   WHITELIST = 1,
>   HAS_PURCHASE = 2,
> }
>
> struct SampleThriftObject {
>   10: string col_a;
>   20: string col_b;
>   30: string col_c;
>   40: optional map> col_d;
> }
> =
>
> And loading the data in spark through schemaRDD:
>
> import org.apache.spark.sql.SchemaRDD
> val sqlContext = new org.apache.spark.sql.SQLContext(sc);
> val parquetFile = "/path/to/generated/parquet/file"
> val parquetFileRDD = sqlContext.parquetFile(parquetFile)
> parquetFileRDD.printSchema
> root
>  |-- col_a: string (nullable = true)
>  |-- col_b: string (nullable = true)
>  |-- col_c: string (nullable = true)
>  |-- col_d: map (nullable = true)
>  |    |-- key: string
>  |    |-- value: array (valueContainsNull = true)
>  |    |    |-- element: string (containsNull = false)
>
> parquetFileRDD.registerTempTable("test")
> sqlContext.cacheTable("test")
> sqlContext.sql("select col_a from test").collect() // <-- see the exception stack here
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not read
> value at 0 in block -1 in file file:/tmp/xyz/part-r-0.parquet
> at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
> at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
> at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
> at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
> at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
> at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
> at java.util.ArrayList.elementData(ArrayList.java:418)
> at java.util.ArrayList.get(ArrayList.java:431)
> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
> at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
> at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
> at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
> at parquet.filter2.compat.FilterCompat$NoOpFil
Adding partitions to parquet data
We are loading Parquet data as temp tables, but we are wondering if there is a way to add a partition to the data without going through Hive (we still want to use Spark's Parquet SerDe rather than Hive's). The data looks like this:

/date1/file1, /date1/file2 ... , /date2/file1, /date2/file2,/daten/filem

and we are loading it like this:

val parquetFileRDD = sqlContext.parquetFile(comma separated parquet file names)

It would be nice to be able to add a partition and provide the date in the query.
Re: SparkSQL exception on cached parquet table
I am running on master, pulled yesterday I believe but saw the same issue with 1.2.0

On Thu, Nov 20, 2014 at 1:37 PM, Michael Armbrust wrote:
> Which version are you running on again?
Re: Adding partitions to parquet data
Ah awesome, thanks!!

On Thu, Nov 20, 2014 at 3:01 PM, Michael Armbrust wrote:
> In 1.2 by default we use Spark parquet support instead of Hive when the
> SerDe contains the word "Parquet". This should work with hive partitioning.
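The reply above says Hive-style partitioning should work with Spark's own Parquet support in 1.2. A minimal sketch of what that could look like for the /date1, /date2 ... layout, assuming a partitioned Hive table has already been declared over the data root: the helper below only builds one `ALTER TABLE ... ADD PARTITION` statement per date directory. The `AddPartitions` object, the table name `events`, the partition column `dt`, and the root path `/data/events` are all hypothetical and do not come from the thread.

```scala
// Hypothetical helper (not part of Spark or Hive): generates HiveQL text only.
object AddPartitions {
  /** One ALTER TABLE statement per date directory found under `root`. */
  def statements(table: String, partitionColumn: String, root: String, dates: Seq[String]): Seq[String] =
    dates.map { d =>
      s"ALTER TABLE $table ADD IF NOT EXISTS PARTITION ($partitionColumn='$d') LOCATION '$root/$d'"
    }

  def main(args: Array[String]): Unit =
    // Placeholder names; substitute the real table, column, and paths.
    statements("events", "dt", "/data/events", Seq("date1", "date2")).foreach(println)
}
```

Each generated string could then be passed to `hiveContext.sql(...)`, after which a predicate such as `WHERE dt = 'date1'` would name the partition in the query. Whether only the matching directories are scanned depends on the Spark build, so treat this as a starting point rather than a confirmed recipe.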
Re: SparkSQL exception on cached parquet table
Thanks Michael, opened this https://issues.apache.org/jira/browse/SPARK-4520

On Thu, Nov 20, 2014 at 2:59 PM, Michael Armbrust wrote:
> Can you open a JIRA?
SparkSQL - can we add new column(s) to parquet files
We create the table definition by reading the schema from the Parquet file and storing it in the Hive metastore. If someone adds a new column to the schema, and we rescan the schema from the new Parquet files and update the table definition, will queries on the table still work?

Old table: Int a, Int b
New table: Int a, Int b, String c

The older Parquet files don't have String c. When querying the table, would it return null for column c from the older files and data from the newer files, or would it fail?
does spark sql support columnar compression with encoding when caching tables
Hi All,

When caching a table backed by LZO-compressed Parquet data, does Spark also compress it (using lzo/gzip/snappy) along with column-level encoding, or does it only do the column-level encoding when "spark.sql.inMemoryColumnarStorage.compressed" is set to true? I ask because when I try to cache the data, the memory used is almost as much as the uncompressed size of the data.

Thanks!
Re: does spark sql support columnar compression with encoding when caching tables
Hey Michael,

Thank you for clarifying that. Is tachyon the right way to get compressed data in memory or should we explore the option of adding compression to cached data. This is because our uncompressed data set is too big to fit in memory right now. I see the benefit of tachyon not just with storing compressed data in memory but we wouldn't have to create a separate table for caching some partitions like 'cache table table_cached as select * from table where date = 201412XX' - the way we are doing right now.

On Thu, Dec 18, 2014 at 6:46 PM, Michael Armbrust wrote:
> There is only column level encoding (run length encoding, delta encoding,
> dictionary encoding) and no generic compression.
Re: does spark sql support columnar compression with encoding when caching tables
Thanks Michael, that makes sense.

On Fri, Dec 19, 2014 at 3:13 PM, Michael Armbrust wrote:
> Yeah, tachyon does sound like a good option here. Especially if you have
> nested data, its likely that parquet in tachyon will always be better
> supported.
Re: does spark sql support columnar compression with encoding when caching tables
Thanks Cheng, Michael - that was super helpful.

On Sun, Dec 21, 2014 at 7:27 AM, Cheng Lian wrote:
> Would like to add that compression schemes built in in-memory columnar
> storage only supports primitive columns (int, string, etc.), complex types
> like array, map and struct are not supported.
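To make the column-level encodings named in this thread (run length encoding, dictionary encoding) more concrete, here is a small illustration in plain Scala. It is a sketch of the general techniques only, not Spark's in-memory columnar implementation; the `ColumnEncodings` object and the sample columns are made up for this example.

```scala
// Illustration only: generic encodings, not Spark's actual columnar code.
object ColumnEncodings {
  /** Run-length encoding: collapse consecutive repeats into (value, runLength) pairs. */
  def runLengthEncode[A](column: Seq[A]): List[(A, Int)] =
    column.foldLeft(List.empty[(A, Int)]) {
      case ((value, count) :: rest, next) if value == next => (value, count + 1) :: rest
      case (runs, next) => (next, 1) :: runs
    }.reverse

  /** Dictionary encoding: keep each distinct value once, plus one index per row. */
  def dictionaryEncode[A](column: Seq[A]): (Vector[A], Seq[Int]) = {
    val dictionary = column.distinct.toVector
    val indexOf = dictionary.zipWithIndex.toMap
    (dictionary, column.map(indexOf))
  }

  def main(args: Array[String]): Unit = {
    // A column with few distinct values shrinks under both encodings.
    val lowCardinality = Seq("US", "US", "US", "CA", "CA", "US")
    println(runLengthEncode(lowCardinality))
    println(dictionaryEncode(lowCardinality))
    // A column where every value differs gains nothing from run-length encoding.
    val highCardinality = Seq("a1", "a2", "a3", "a4")
    println(runLengthEncode(highCardinality))
  }
}
```

Encodings like these only save space when a column has repeated or low-cardinality values. Without a general-purpose compressor on top, and with complex types left unencoded, a cached table can plausibly stay close to its uncompressed size, which is consistent with the memory usage reported at the start of this thread.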