SPARK SQL - Parquet projection pushdown for nested data
I noticed that when querying struct data in Spark SQL, we request the whole column from the Parquet files. Is this intended, or is there some kind of config to control this behaviour? Wouldn't it be better to request just the struct field?
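For concreteness, a minimal sketch of the access pattern in question (path and column names are hypothetical): selecting a single nested field still causes the whole struct column to be requested from the Parquet files.

val df = sqlContext.read.parquet("hdfs:///data/events") // hypothetical path; 'event' is a wide struct column
df.registerTempTable("events")
// Only event.id is needed, yet the scan requests the entire 'event' struct from Parquet
// instead of pushing the projection down to the single leaf field.
sqlContext.sql("SELECT event.id FROM events").collect()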
Re: SPARK SQL - Parquet projection pushdown for nested data
Thanks Michael, I will upvote this.

On Thu, Oct 29, 2015 at 10:29 AM, Michael Armbrust <mich...@databricks.com> wrote:
> Yeah, this is unfortunate. It would be good to fix this, but it's a
> non-trivial change.
>
> Tracked here if you'd like to vote on the issue:
> https://issues.apache.org/jira/browse/SPARK-4502
hive thriftserver and fair scheduling
Hi All, does anyone have fair scheduling working for them with the Hive thriftserver? I have one Hive thriftserver running and multiple users trying to run queries at the same time on that server using a beeline client. I see that a big query stops all other queries from making any progress. Is it supposed to work this way? Is there anything else I need to do for fair scheduling to work with the thriftserver?
Re: hive thriftserver and fair scheduling
Thanks Michael, I'll try it out. Another quick/important question: how do I make UDFs available to all of the Hive thriftserver users? Right now, when I launch a spark-sql client, I notice that it reads the ~/.hiverc file and all UDFs get picked up, but this doesn't seem to work in the Hive thriftserver. Is there a way to make it work similarly for all users of the thriftserver? Thanks again!

On Tue, Oct 20, 2015 at 12:34 PM, Michael Armbrust <mich...@databricks.com> wrote:
> Not the most obvious place in the docs... but this is probably helpful:
> https://spark.apache.org/docs/latest/sql-programming-guide.html#scheduling
>
> You likely want to put each user in their own pool.
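For reference, a minimal sketch of the pool setup described in the linked docs (pool name and file path are hypothetical; the per-session pool property is an assumption that may vary by Spark version). In spark-defaults.conf:

spark.scheduler.mode FAIR
spark.scheduler.allocation.file /path/to/fairscheduler.xml

And in fairscheduler.xml:

<allocations>
  <pool name="adhoc">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
</allocations>

Each beeline session can then opt into a pool, e.g. SET spark.sql.thriftserver.scheduler.pool=adhoc; so that one user's big query no longer starves the others.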
[SPARK-SQL] Requested array size exceeds VM limit
I am trying to run a query on a month of data. The volume of data is not much, but we have a partition per hour and per day. The table schema is heavily nested, with a total of 300 leaf fields. I am trying to run a simple select count(*) query on the table and running into this exception:

SELECT COUNT(*)
FROM p_all_tx
WHERE date_prefix >= "20150500"
  AND date_prefix <= "20150700"
  AND sanitizeddetails.merchantaccountid = 'Rvr7StMZSTQj';

java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2003)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1.apply(SortBasedAggregate.scala:73)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1.apply(SortBasedAggregate.scala:70)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregate.doExecute(SortBasedAggregate.scala:70)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:142)
at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:141)

The table is a Parquet table. I am not sure why the closure should exceed the VM limit. Could somebody explain why this is happening? Is it because I have a lot of partitions and the table scan is essentially creating one RDD per partition?
SPARK-SQL parameter tuning for performance
Hi Spark users, we are running Spark on YARN and often query table partitions as big as 100-200 GB from HDFS. HDFS is co-located on the same cluster on which Spark and YARN run. I've noticed much higher I/O read rates when I increase the number of executor cores from 2 to 8 (most tasks run RACK_LOCAL and a few NODE_LOCAL) while keeping the number of executors constant. The RAM on each executor is around 24 GB. But the problem is that any subsequent shuffle stage starts failing if I do that. It runs fine if I leave the number of cores at 2, but then the read is much slower. The error I get from YARN is:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)

It seems to indicate that YARN is killing executors for using too much memory, but I can't be sure. I tried increasing spark.sql.shuffle.partitions as well, but that didn't help much. Is there a way to run more partition-read tasks per executor while keeping the number of shuffle tasks constant? Thanks
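One common mitigation when YARN kills executors, sketched below with illustrative values rather than a verified fix for this particular job, is to reserve more off-heap headroom per executor via spark.yarn.executor.memoryOverhead (in MB):

spark-submit \
  --executor-cores 8 \
  --executor-memory 24g \
  --conf spark.yarn.executor.memoryOverhead=4096 \
  --conf spark.sql.shuffle.partitions=400 \
  ...

YARN kills a container when heap plus off-heap usage exceeds the container size, and shuffle buffers grow with the number of cores per executor, so the overhead often needs to rise when cores go from 2 to 8.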
Re: Spark cluster multi tenancy
Interestingly, if there is nothing running on the dev spark-shell, it recovers successfully and regains the lost executors. Attaching the log for that. Notice the "Registering block manager ..." statements at the very end, after all executors were lost.

Attachment: spark_job_recovers.log
Re: Spark cluster multi tenancy
Attaching the log for when the dev job gets stuck (once all its executors are lost due to preemption). This is a spark-shell job running in yarn-client mode.

Attachment: spark_job_stuck.log
Spark cluster multi tenancy
Hi All, we've set up our Spark cluster on AWS running on YARN (Hadoop 2.3) with fair scheduling and preemption turned on. The cluster is shared between prod and dev work, where prod runs with a higher fair share and can preempt dev jobs if there aren't enough resources available for it. It appears that dev jobs which get preempted often become unstable after losing some executors: the whole job gets stuck (without making any progress) or ends up getting restarted (and hence loses all the work done). Has someone encountered this before? Is the solution just to set spark.task.maxFailures to a really high value to recover from task failures in such scenarios? Are there other approaches people have taken for Spark multi-tenancy that work better in this scenario? Thanks, Sadhan
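For reference, a minimal sketch of the workaround mentioned above (the value is illustrative; raising it only masks preemption-induced task failures rather than fixing the underlying instability):

spark-shell --master yarn-client --conf spark.task.maxFailures=32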
Re: Error when cache partitioned Parquet table
Hi Xu-dong, that's probably because your table's partition paths don't look like hdfs://somepath/key=value/*.parquet. Spark is trying to extract the partition key's value from the path while caching, and the exception is thrown because it can't find one.

On Mon, Jan 26, 2015 at 10:45 AM, ZHENG, Xu-dong dong...@gmail.com wrote:

Hi all, I meet the below error when I cache a partitioned Parquet table. It seems that Spark is trying to extract the partition key from the Parquet file path, and it is not found. But other queries run successfully, even ones that request the partition key. Is it a bug in Spark SQL? Is there any workaround for it? Thank you!

java.util.NoSuchElementException: key not found: querydate
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anonfun$3.apply(ParquetTableOperations.scala:142)
at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anonfun$3.apply(ParquetTableOperations.scala:142)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4.apply(ParquetTableOperations.scala:142)
at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4.apply(ParquetTableOperations.scala:127)
at org.apache.spark.rdd.NewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD.compute(NewHadoopRDD.scala:247)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)

-- 郑旭东 ZHENG, Xu-dong
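For illustration, a hypothetical layout following the key=value convention that the partition-handling code expects:

hdfs://somepath/mytable/querydate=20150101/part-00000.parquet
hdfs://somepath/mytable/querydate=20150102/part-00000.parquet

With paths shaped like this, the value of querydate can be recovered from each file's path when the table is scanned or cached.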
Re: does spark sql support columnar compression with encoding when caching tables
Thanks Cheng, Michael - that was super helpful.

On Sun, Dec 21, 2014 at 7:27 AM, Cheng Lian lian.cs@gmail.com wrote: Would like to add that the compression schemes built into the in-memory columnar storage only support primitive columns (int, string, etc.); complex types like array, map and struct are not supported.
Re: does spark sql support columnar compression with encoding when caching tables
Hey Michael, thank you for clarifying that. Is Tachyon the right way to get compressed data in memory, or should we explore the option of adding compression to cached data? This is because our uncompressed data set is too big to fit in memory right now. I see the benefit of Tachyon not just in storing compressed data in memory, but also in that we wouldn't have to create a separate table for caching some partitions, like 'cache table table_cached as select * from table where date = 201412XX' - the way we are doing right now.

On Thu, Dec 18, 2014 at 6:46 PM, Michael Armbrust mich...@databricks.com wrote: There is only column-level encoding (run length encoding, delta encoding, dictionary encoding) and no generic compression.
Re: does spark sql support columnar compression with encoding when caching tables
Thanks Michael, that makes sense.

On Fri, Dec 19, 2014 at 3:13 PM, Michael Armbrust mich...@databricks.com wrote: Yeah, Tachyon does sound like a good option here. Especially if you have nested data, it's likely that Parquet in Tachyon will always be better supported.
does spark sql support columnar compression with encoding when caching tables
Hi All, wondering, when caching a table backed by LZO-compressed Parquet data, whether Spark also compresses it (using lzo/gzip/snappy) along with the column-level encoding, or just does the column-level encoding, when spark.sql.inMemoryColumnarStorage.compressed is set to true. This is because when I try to cache the data, I notice the memory being used is almost as much as the uncompressed size of the data. Thanks!
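For concreteness, a minimal sketch of the setup being asked about (table name and predicate are hypothetical, following the cache-table pattern mentioned elsewhere in this thread):

sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
// Builds the in-memory columnar cache for one partition's worth of rows.
sqlContext.sql("CACHE TABLE table_cached AS SELECT * FROM table WHERE date = 20141218")

Per the replies above, this buys column-level encodings (RLE, delta, dictionary) on primitive columns, not generic lzo/gzip/snappy compression of the cached bytes.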
Re: SparkSQL exception on cached parquet table
Also attaching the parquet file if anyone wants to take a further look.

On Thu, Nov 20, 2014 at 8:54 AM, Sadhan Sood sadhan.s...@gmail.com wrote:

So, I am seeing this issue with Spark SQL throwing an exception when trying to read selective columns from a thrift parquet file, and also when caching them. On some further digging, I was able to narrow it down to at least one particular column type - map<string, set<string>> - causing this issue. To reproduce it, I created a test thrift file with a very basic schema and stored some sample data in a parquet file:

Test.thrift
===========
typedef binary SomeId

enum SomeExclusionCause {
  WHITELIST = 1,
  HAS_PURCHASE = 2,
}

struct SampleThriftObject {
  10: string col_a;
  20: string col_b;
  30: string col_c;
  40: optional map<SomeExclusionCause, set<SomeId>> col_d;
}
===========

And loading the data in spark through schemaRDD:

import org.apache.spark.sql.SchemaRDD
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val parquetFile = "/path/to/generated/parquet/file"
val parquetFileRDD = sqlContext.parquetFile(parquetFile)
parquetFileRDD.printSchema
root
 |-- col_a: string (nullable = true)
 |-- col_b: string (nullable = true)
 |-- col_c: string (nullable = true)
 |-- col_d: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: string (containsNull = false)

parquetFileRDD.registerTempTable("test")
sqlContext.cacheTable("test")
sqlContext.sql("select col_a from test").collect() <-- see the exception stack here

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/xyz/part-r-0.parquet
at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:418)
at java.util.ArrayList.get(ArrayList.java:431)
at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
... 27 more

If you take col_d out of the thrift file, the problem goes away.
Adding partitions to parquet data
We are loading parquet data as temp tables, but wondering if there is a way to add a partition to the data without going through Hive (we still want to use Spark's Parquet SerDe as compared to Hive's). The data looks like /date1/file1, /date1/file2, ..., /date2/file1, /date2/file2, ..., /daten/filem, and we are loading it like:

val parquetFileRDD = sqlContext.parquetFile(<comma-separated parquet file names>)

but it would be nice to be able to add a partition and provide the date as a query parameter.
Re: SparkSQL exception on cached parquet table
I am running on master, pulled yesterday I believe, but saw the same issue with 1.2.0.

On Thu, Nov 20, 2014 at 1:37 PM, Michael Armbrust mich...@databricks.com wrote: Which version are you running on again?
Re: Adding partitions to parquet data
Ah awesome, thanks!!

On Thu, Nov 20, 2014 at 3:01 PM, Michael Armbrust mich...@databricks.com wrote: In 1.2, by default we use Spark's Parquet support instead of Hive's when the SerDe contains the word "Parquet". This should work with Hive partitioning.
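For reference, the behavior Michael describes is, to my understanding, governed by a flag along these lines (the exact name and default are an assumption, not something stated in this thread):

SET spark.sql.hive.convertMetastoreParquet=true;

When enabled, metastore Parquet tables - including Hive-partitioned ones - are read through Spark SQL's native Parquet code path instead of the Hive SerDe.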
Re: SparkSQL exception on cached parquet table
Thanks Michael, opened this: https://issues.apache.org/jira/browse/SPARK-4520

On Thu, Nov 20, 2014 at 2:59 PM, Michael Armbrust mich...@databricks.com wrote: Can you open a JIRA?
Re: Exception in spark sql when running a group by query
Ah, makes sense - thanks Michael!

On Mon, Nov 17, 2014 at 6:08 PM, Michael Armbrust mich...@databricks.com wrote: You are perhaps hitting an issue that was fixed by #3248 (https://github.com/apache/spark/pull/3248)?
Exception in spark sql when running a group by query
While testing Spark SQL, we were running this GROUP BY with expression query and got an exception. The same query worked fine on Hive.

SELECT from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200), '/MM/dd') as pst_date,
       count(*) as num_xyzs
FROM all_matched_abc
GROUP BY from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200), '/MM/dd')

14/11/17 17:41:46 ERROR thriftserver.SparkSQLDriver: Failed in [SELECT from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200), '/MM/dd') as pst_date, count(*) as num_xyzs FROM all_matched_abc GROUP BY from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200), '/MM/dd')]
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Expression not in GROUP BY: HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived AS whenrequestreceived#187L, DoubleType) / 1000.0) - CAST(25200, DoubleType))),/MM/dd) AS pst_date#179, tree:
Aggregate [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived, DoubleType) / 1000.0) - CAST(25200, DoubleType))),/MM/dd)], [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived AS whenrequestreceived#187L, DoubleType) / 1000.0) - CAST(25200, DoubleType))),/MM/dd) AS pst_date#179,COUNT(1) AS num_xyzs#180L]
 MetastoreRelation default, all_matched_abc, None
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3$$anonfun$applyOrElse$6.apply(Analyzer.scala:127)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3$$anonfun$applyOrElse$6.apply(Analyzer.scala:125)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3.applyOrElse(Analyzer.scala:125)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3.applyOrElse(Analyzer.scala:115)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$.apply(Analyzer.scala:115)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$.apply(Analyzer.scala:113)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.stringResult(HiveContext.scala:425)
at org.apache.spark.sql.hive.thriftserver.AbstractSparkSQLDriver.run(AbstractSparkSQLDriver.scala:59)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:276)
Re: SparkSQL exception on cached parquet table
Hi Cheng, I tried reading the parquet file (on which we were getting the exception) through parquet-tools, and it is able to dump the file and I can read the metadata, etc. I also loaded the file through a Hive table and can run a table scan query on it as well. Let me know if I can do more to help resolve the problem; I'll run it through a debugger and see if I can get more information on it in the meantime. Thanks, Sadhan

On Sun, Nov 16, 2014 at 4:35 AM, Cheng Lian lian.cs@gmail.com wrote: (Forgot to cc user mail list)

On 11/16/14 4:59 PM, Cheng Lian wrote: Hey Sadhan, thanks for the additional information, this is helpful. Seems that some Parquet internal contract was broken, but I'm not sure whether it's caused by Spark SQL or Parquet, or whether maybe the Parquet file itself was damaged somehow. I'm investigating this. In the meanwhile, would you mind helping to narrow down the problem by trying to scan exactly the same Parquet file with some other systems (e.g. Hive or Impala)? If other systems work, then there must be something wrong with Spark SQL. Cheng

On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood sadhan.s...@gmail.com wrote: Hi Cheng, thanks for your response. Here is the stack trace from the YARN logs:

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:418)
at java.util.ArrayList.get(ArrayList.java:431)
at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:282)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
... 26 more

On Sat, Nov 15, 2014 at 9:28 AM, Cheng Lian lian.cs@gmail.com wrote: Hi Sadhan, could you please provide the stack trace of the ArrayIndexOutOfBoundsException (if any)? The reason why the first query succeeds is that Spark SQL doesn't bother reading all data from the table to give COUNT(*). In the second case, however, the whole table is asked to be cached lazily via the cacheTable call, so it's scanned to build the in-memory columnar cache. Then something went wrong while scanning this LZO-compressed Parquet file. But unfortunately the stack trace at hand doesn't indicate the root cause. Cheng
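For reference, the kind of parquet-tools invocations mentioned above (the file path is hypothetical, echoing the one in the stack trace):

parquet-tools meta /tmp/xyz/part-r-0.parquet    # row-group and column metadata
parquet-tools schema /tmp/xyz/part-r-0.parquet  # the file's Parquet schema
parquet-tools dump /tmp/xyz/part-r-0.parquet    # raw column contents

If these succeed while Spark SQL's scan fails, that points at the reader path rather than a corrupt file.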
SparkSQL exception on cached parquet table
While testing Spark SQL on a bunch of parquet files (basically what used to be a partition for one of our hive tables), I encountered this error:

import org.apache.spark.sql.SchemaRDD
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val parquetFileRDD = sqlContext.parquetFile(parquetFile)
parquetFileRDD.registerTempTable("xyz_20141109")
sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect() <-- works fine
sqlContext.cacheTable("xyz_20141109")
sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect() <-- fails with an exception

parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://::9000/event_logs/xyz/20141109/part-9359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-9.lzo.parquet
at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException
Re: Cache sparkSql data without uncompressing it in memory
Thanks Cheng, that was helpful. I noticed from the UI that only half of the memory per executor was being used for caching - is that true? We have a 2 TB sequence file dataset that we wanted to cache in our cluster with ~5 TB of memory, but caching still failed; from the UI it looked like it used 2.5 TB of memory and wrote almost 12 TB to disk (at which point it was useless) during the mapPartitions stage. Also, we couldn't run more than 2 executors/box (60 GB memory/box) or else they died very quickly from having less memory per executor (not sure why?), although I/O seemed to go much faster, which makes sense because of more parallel reads.

On Thu, Nov 13, 2014 at 10:50 PM, Cheng Lian lian.cs@gmail.com wrote: No, the columnar buffer is built in a small batching manner; the batch size is controlled by the spark.sql.inMemoryColumnarStorage.batchSize property. The default value for this in master and branch-1.2 is 10,000 rows per batch.
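For reference, a minimal sketch of tuning the property Cheng mentions (the table name is hypothetical, and the value shown is just the stated default):

sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "10000") // rows per columnar batch
sqlContext.cacheTable("my_table") // cache is built lazily, batch by batch, on first scan

A smaller batch size lowers the peak memory needed while building each columnar batch, at some cost in encoding efficiency.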
Re: Cache sparkSql data without uncompressing it in memory
Thanks Cheng, just one more question - does that mean that we still need enough memory in the cluster to uncompress the data before it can be compressed again, or does that just read the raw data as is?

On Wed, Nov 12, 2014 at 10:05 PM, Cheng Lian lian.cs@gmail.com wrote: Currently there's no way to cache the compressed sequence file directly. Spark SQL uses an in-memory columnar format while caching table rows, so we must read all the raw data and convert them into columnar format. However, you can enable in-memory columnar compression by setting spark.sql.inMemoryColumnarStorage.compressed to true. This property is already set to true by default in master branch and branch-1.2.

On 11/13/14 7:16 AM, Sadhan Sood wrote: We noticed while caching data from our hive tables, which contain data in compressed sequence file format, that it gets uncompressed in memory when cached. Is there a way to turn this off and cache the compressed data as is?
Too many failed collects when trying to cache a table in SparkSQL
We are running Spark on YARN with 1 TB of combined memory, and when trying to cache a table partition (which is ~100 GB) we are seeing a lot of failed collect stages in the UI; the caching never succeeds. Because of the failed collects, it seems like the mapPartitions stages keep getting resubmitted. We have more than enough memory, so it's surprising that we are seeing this issue. Can someone please help? Thanks!

The stack trace of the failed collect from the UI is:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Re: Too many failed collects when trying to cache a table in SparkSQL
at DAGScheduler.scala:838
2014-11-12 19:10:08,990 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86) finished in 66.475 s
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - looking for newly runnable stages
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - running: Set()
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - waiting: Set(Stage 1)
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - failed: Set()
2014-11-12 19:11:15,466 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()
2014-11-12 19:11:15,466 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84), which is now runnable
2014-11-12 19:11:15,482 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 4 from broadcast at DAGScheduler.scala:838
2014-11-12 19:11:15,482 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84)
2014-11-12 19:11:21,655 ERROR cluster.YarnClientClusterScheduler (Logging.scala:logError(75)) - Lost executor 372 on ip-10-95-163-84.ec2.internal: remote Akka client disassociated
2014-11-12 19:11:21,655 WARN remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system [akka.tcp://sparkExecutor@ip-10-95-163-84.ec2.internal:20998] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2014-11-12 19:11:21,655 ERROR cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Asked to remove non-existent executor 372
2014-11-12 19:11:21,655 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Executor lost: 372 (epoch 3)
On Wed, Nov 12, 2014 at 12:31 PM, Sadhan Sood sadhan.s...@gmail.com wrote: We are running Spark on YARN with 1TB of combined memory, and when trying to cache a table partition (which is 100G) we are seeing a lot of failed collect stages in the UI; the caching never succeeds. Because of the failed collects, it seems like the mapPartitions stage keeps getting resubmitted. We have more than enough memory, so it's surprising we are seeing this issue. Can someone please help? Thanks!
Re: Building spark targz
Just making sure, but are you looking for the tar in the assembly/target dir? On Wed, Nov 12, 2014 at 3:14 PM, Ashwin Shankar ashwinshanka...@gmail.com wrote: Hi, I just cloned Spark from GitHub and I'm trying to build it to generate a tarball. I'm doing: mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -DskipTests clean package Although the build is successful, I don't see the tar.gz generated. Am I running the wrong command? -- Thanks, Ashwin
Re: Building spark targz
I think you can provide -Pbigtop-dist to build the tar. On Wed, Nov 12, 2014 at 3:21 PM, Sean Owen so...@cloudera.com wrote: mvn package doesn't make tarballs. It creates artifacts that will generally appear in target/ and subdirectories, and likewise within modules. Look at make-distribution.sh. On Wed, Nov 12, 2014 at 8:14 PM, Ashwin Shankar ashwinshanka...@gmail.com wrote: Hi, I just cloned Spark from GitHub and I'm trying to build it to generate a tarball. I'm doing: mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -DskipTests clean package Although the build is successful, I don't see the tar.gz generated. Am I running the wrong command? -- Thanks, Ashwin
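For reference, the make-distribution.sh route Sean points to would look roughly like this with the same profiles Ashwin passed to mvn (a sketch against the Spark 1.x script; exact flags can differ per branch, so check ./make-distribution.sh --help first):

./make-distribution.sh --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive

The --tgz flag is what asks the script to emit a spark-*-bin-*.tgz in the top-level directory after the Maven build finishes; plain mvn package never produces one.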
Re: Too many failed collects when trying to cache a table in SparkSQL
On re-running the cache statement, I see from the logs that whenever the collect (stage 1) fails, it causes the mapPartitions (stage 0) for one partition to be re-run. This can be seen in the collect log as well as in the container log:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
The data is an lzo-compressed sequence file with compressed size ~26G. Is there a way to understand why the shuffle keeps failing for one partition? I believe we have enough memory to store the uncompressed data in memory. On Wed, Nov 12, 2014 at 2:50 PM, Sadhan Sood sadhan.s...@gmail.com wrote: This is the log output:
2014-11-12 19:07:16,561 INFO thriftserver.SparkExecuteStatementOperation (Logging.scala:logInfo(59)) - Running query 'CACHE TABLE xyz_cached AS SELECT * FROM xyz where date_prefix = 20141112'
2014-11-12 19:07:17,455 INFO Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
2014-11-12 19:07:17,756 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 0 from broadcast at TableReader.scala:68
2014-11-12 19:07:18,292 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Starting job: collect at SparkPlan.scala:84
2014-11-12 19:07:22,801 INFO mapred.FileInputFormat (FileInputFormat.java:listStatus(253)) - Total input paths to process : 200
2014-11-12 19:07:22,835 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Registering RDD 12 (mapPartitions at Exchange.scala:86)
2014-11-12 19:07:22,837 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Got job 0 (collect at SparkPlan.scala:84) with 1 output partitions (allowLocal=false)
2014-11-12 19:07:22,838 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Final stage: Stage 1(collect at SparkPlan.scala:84)
2014-11-12 19:07:22,838 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Parents of final stage: List(Stage 0)
2014-11-12 19:07:22,842 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Missing parents: List(Stage 0)
2014-11-12 19:07:22,871 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86), which has no missing parents
2014-11-12 19:07:22,916 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 1 from broadcast at DAGScheduler.scala:838
2014-11-12 19:07:22,963 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 461 missing tasks from Stage 0 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)
2014-11-12 19:10:04,088 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86) finished in 161.113 s
2014-11-12 19:10:04,089 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - looking for newly runnable stages
2014-11-12 19:10:04,089 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - running: Set()
2014-11-12 19:10:04,090 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - waiting: Set(Stage 1)
2014-11-12 19:10:04,090 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - failed: Set()
2014-11-12 19:10:04,094 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()
2014-11-12 19:10:04,097 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84), which is now runnable
2014-11-12 19:10:04,112 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 2 from broadcast at DAGScheduler.scala:838
2014-11-12 19:10:04,115 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84)
2014-11-12 19:10:08,541 ERROR cluster.YarnClientClusterScheduler (Logging.scala:logError(75)) - Lost executor 52 on ip-10-61-175-167.ec2.internal: remote Akka client disassociated
2014-11-12 19:10:08,543 WARN remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system [akka.tcp://sparkExecutor@ip-10-61-175-167.ec2.internal:50918] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2014-11-12 19:10:08,548 ERROR cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Asked to remove non-existent executor 52
2014-11-12 19:10:08,550 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Executor lost: 52 (epoch 1)
2014-11-12 19:10:08,555 INFO scheduler.Stage (Logging.scala:logInfo(59)) - Stage 0 is now unavailable on executor 52 (460/461, false)
2014-11-12 19:10:08,686 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Marking Stage 1 (collect at SparkPlan.scala:84) as failed due to a fetch failure from Stage 0 (mapPartitions at Exchange.scala:86)
2014-11-12 19:10:08,686 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59
Cache sparkSql data without uncompressing it in memory
We noticed while caching data from our Hive tables, which contain data in compressed sequence file format, that it gets uncompressed in memory when cached. Is there a way to turn this off and cache the compressed data as is?
Re: thrift jdbc server probably running queries as hive query
Hi Cheng, I made sure the only Hive server running on the machine is HiveThriftServer2:
/usr/lib/jvm/default-java/bin/java -cp /usr/lib/hadoop/lib/hadoop-lzo.jar::/mnt/sadhan/spark-3/sbin/../conf:/mnt/sadhan/spark-3/spark-assembly-1.2.0-SNAPSHOT-hadoop2.3.0-cdh5.0.2.jar:/etc/hadoop/conf -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --master yarn --jars reporting.jar spark-internal
The query I am running is a simple count(*): select count(*) from Xyz where date_prefix=20141031, and I'm pretty sure it's submitting a map reduce job based on the Spark logs:
TakesRest=false
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=number
In order to limit the maximum number of reducers: set hive.exec.reducers.max=number
In order to set a constant number of reducers: set mapreduce.job.reduces=number
14/11/11 16:23:17 INFO ql.Context: New scratch dir is hdfs://fdsfdsfsdfsdf:9000/tmp/hive-ubuntu/hive_2014-11-11_16-23-17_333_5669798325805509526-2
Starting Job = job_1414084656759_0142, Tracking URL = http://xxx:8100/proxy/application_1414084656759_0142/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1414084656759_0142
On Mon, Nov 10, 2014 at 9:59 PM, Cheng Lian lian.cs@gmail.com wrote: Hey Sadhan, I really don't think this is Spark log... Unlike Shark, Spark SQL doesn't even provide a Hive mode to let you execute queries against Hive. Would you please check whether there is an existing HiveServer2 running there? Spark SQL HiveThriftServer2 is just a Spark port of HiveServer2, and they share the same default listening port. I guess the Thrift server didn't start successfully because HiveServer2 occupied the port, and your Beeline session was probably linked against HiveServer2. Cheng On 11/11/14 8:29 AM, Sadhan Sood wrote: I was testing out the Spark thrift JDBC server by running a simple query in the beeline client. Spark itself is running on a YARN cluster. However, when I run a query in beeline I see no running jobs in the Spark UI (completely empty), and the YARN UI seems to indicate that the submitted query is being run as a map reduce job. This is probably also indicated by the Spark logs, but I am not completely sure:
2014-11-11 00:19:00,492 INFO ql.Context (Context.java:getMRScratchDir(267)) - New scratch dir is hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-1
2014-11-11 00:19:00,877 INFO ql.Context (Context.java:getMRScratchDir(267)) - New scratch dir is hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2
2014-11-11 00:19:04,152 INFO ql.Context (Context.java:getMRScratchDir(267)) - New scratch dir is hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2
2014-11-11 00:19:04,425 INFO Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.submit.replication is deprecated. Instead, use mapreduce.client.submit.file.replication
2014-11-11 00:19:04,516 INFO client.RMProxy (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager at :8032
2014-11-11 00:19:04,607 INFO client.RMProxy (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager at :8032
2014-11-11 00:19:04,639 WARN mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this
2014-11-11 00:00:08,806 INFO input.FileInputFormat (FileInputFormat.java:listStatus(287)) - Total input paths to process : 14912
2014-11-11 00:00:08,864 INFO lzo.GPLNativeCodeLoader (GPLNativeCodeLoader.java:clinit(34)) - Loaded native gpl library
2014-11-11 00:00:08,866 INFO lzo.LzoCodec (LzoCodec.java:clinit(76)) - Successfully loaded initialized native-lzo library [hadoop-lzo rev 8e266e052e423af592871e2dfe09d54c03f6a0e8]
2014-11-11 00:00:09,873 INFO input.CombineFileInputFormat (CombineFileInputFormat.java:createSplits(413)) - DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 194541317
2014-11-11 00:00:10,017 INFO mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(396)) - number of splits:615
2014-11-11 00:00:10,095 INFO mapreduce.JobSubmitter (JobSubmitter.java:printTokens(479)) - Submitting tokens for job: job_1414084656759_0115
2014-11-11 00:00:10,241 INFO impl.YarnClientImpl (YarnClientImpl.java:submitApplication(167
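Cheng's port-collision theory is straightforward to test. A rough sketch (port 10000 is HiveServer2's stock default, and the commands assume a standard Spark layout; adjust paths and ports for your install):

netstat -tlnp | grep 10000
sbin/start-thriftserver.sh --master yarn --hiveconf hive.server2.thrift.port=10001
beeline -u jdbc:hive2://localhost:10001

If the first command shows an existing HiveServer2 already bound to 10000, moving the Spark thrift server to 10001 and reconnecting beeline there should make the query show up in the Spark UI instead of running as a map reduce job.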
thrift jdbc server probably running queries as hive query
I was testing out the Spark thrift JDBC server by running a simple query in the beeline client. Spark itself is running on a YARN cluster. However, when I run a query in beeline I see no running jobs in the Spark UI (completely empty), and the YARN UI seems to indicate that the submitted query is being run as a map reduce job. This is probably also indicated by the Spark logs, but I am not completely sure:
2014-11-11 00:19:00,492 INFO ql.Context (Context.java:getMRScratchDir(267)) - New scratch dir is hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-1
2014-11-11 00:19:00,877 INFO ql.Context (Context.java:getMRScratchDir(267)) - New scratch dir is hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2
2014-11-11 00:19:04,152 INFO ql.Context (Context.java:getMRScratchDir(267)) - New scratch dir is hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2
2014-11-11 00:19:04,425 INFO Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.submit.replication is deprecated. Instead, use mapreduce.client.submit.file.replication
2014-11-11 00:19:04,516 INFO client.RMProxy (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager at :8032
2014-11-11 00:19:04,607 INFO client.RMProxy (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager at :8032
2014-11-11 00:19:04,639 WARN mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this
2014-11-11 00:00:08,806 INFO input.FileInputFormat (FileInputFormat.java:listStatus(287)) - Total input paths to process : 14912
2014-11-11 00:00:08,864 INFO lzo.GPLNativeCodeLoader (GPLNativeCodeLoader.java:clinit(34)) - Loaded native gpl library
2014-11-11 00:00:08,866 INFO lzo.LzoCodec (LzoCodec.java:clinit(76)) - Successfully loaded initialized native-lzo library [hadoop-lzo rev 8e266e052e423af592871e2dfe09d54c03f6a0e8]
2014-11-11 00:00:09,873 INFO input.CombineFileInputFormat (CombineFileInputFormat.java:createSplits(413)) - DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 194541317
2014-11-11 00:00:10,017 INFO mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(396)) - number of splits:615
2014-11-11 00:00:10,095 INFO mapreduce.JobSubmitter (JobSubmitter.java:printTokens(479)) - Submitting tokens for job: job_1414084656759_0115
2014-11-11 00:00:10,241 INFO impl.YarnClientImpl (YarnClientImpl.java:submitApplication(167)) - Submitted application application_1414084656759_0115
It seems like the query is being run as a Hive query instead of a Spark query. The same query works fine when run from the spark-sql CLI.
Job cancelled because SparkContext was shut down - failures!
Hi, Trying to run a query on spark-sql, but it keeps failing with this error on the CLI (we are running spark-sql on a YARN cluster):
org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:700)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:699)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1405)
at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1352)
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
at akka.actor.ActorCell.terminate(ActorCell.scala:369)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
On the UI, I see there are connection failures in the mapPartitions stage:
java.net.SocketTimeoutException: Read timed out
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:150)
java.net.SocketInputStream.read(SocketInputStream.java:121)
java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
java.io.BufferedInputStream.read(BufferedInputStream.java:345)
sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:703)
sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1534)
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1439)
org.apache.spark.util.Utils$.fetchFile(Utils.scala:362)
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:331)
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:329)
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:329)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:162)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Re: Is SparkSQL + JDBC server a good approach for caching?
Is there a way to cache certain (or the most recent) partitions of certain tables? On Fri, Oct 24, 2014 at 2:35 PM, Michael Armbrust mich...@databricks.com wrote: It does have support for caching using either CACHE TABLE tablename or CACHE TABLE tablename AS SELECT On Fri, Oct 24, 2014 at 1:05 AM, ankits ankitso...@gmail.com wrote: I want to set up Spark SQL to allow ad hoc querying over the last X days of processed data, where the data is processed through Spark. This would also have to cache data (in memory only), so the approach I was thinking of was to build a layer that persists the appropriate RDDs and stores them in memory. I see Spark SQL allows ad hoc querying through JDBC, though I have never used that before. Will using JDBC offer any advantages (e.g. does it have built-in support for caching?) over rolling my own solution for this use case? Thanks!
Re: Is SparkSQL + JDBC server a good approach for caching?
That works perfectly. Thanks again, Michael. On Fri, Oct 24, 2014 at 3:10 PM, Michael Armbrust mich...@databricks.com wrote: It won't be transparent, but you can do something like: CACHE TABLE newData AS SELECT * FROM allData WHERE date ... and then query newData. On Fri, Oct 24, 2014 at 12:06 PM, Sadhan Sood sadhan.s...@gmail.com wrote: Is there a way to cache certain (or the most recent) partitions of certain tables? On Fri, Oct 24, 2014 at 2:35 PM, Michael Armbrust mich...@databricks.com wrote: It does have support for caching using either CACHE TABLE tablename or CACHE TABLE tablename AS SELECT On Fri, Oct 24, 2014 at 1:05 AM, ankits ankitso...@gmail.com wrote: I want to set up Spark SQL to allow ad hoc querying over the last X days of processed data, where the data is processed through Spark. This would also have to cache data (in memory only), so the approach I was thinking of was to build a layer that persists the appropriate RDDs and stores them in memory. I see Spark SQL allows ad hoc querying through JDBC, though I have never used that before. Will using JDBC offer any advantages (e.g. does it have built-in support for caching?) over rolling my own solution for this use case? Thanks!
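Michael's pattern wired into code, as a sketch: hc is an assumed existing HiveContext, and the table and column names (newData, allData, date_prefix) are placeholders; only the CACHE TABLE ... AS SELECT form itself comes from his reply.

// Materialize just the recent slice of the big table in memory.
hc.sql("CACHE TABLE newData AS SELECT * FROM allData WHERE date_prefix >= '20141001'")
// Subsequent queries against newData are served from the in-memory columnar cache.
hc.sql("SELECT COUNT(*) FROM newData").collect()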
Sharing spark context across multiple spark sql cli initializations
We want to run multiple instances of the spark-sql CLI on our YARN cluster, with each instance of the CLI used by a different user. This looks non-optimal if each user brings up a different CLI, given how Spark works on YARN: executor processes run (and hence consume resources) on worker nodes for the lifetime of the application. So the right way seems to be to share the same SparkContext across multiple initializations and run just one Spark SQL application. Is that understanding correct? Is there a way to do it currently? It seems like it needs some kind of thrift interface hooked into the CLI driver.
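One pattern that matches this description (as of Spark 1.1/1.2) is the HiveThriftServer2 discussed in the other threads here: one long-lived Spark SQL application that every user attaches to over JDBC. A rough sketch, where the host and user names are placeholders:

sbin/start-thriftserver.sh --master yarn
beeline -u jdbc:hive2://thriftserver-host:10000 -n alice
beeline -u jdbc:hive2://thriftserver-host:10000 -n bob

Each beeline session shares the server's single SparkContext, so executors are allocated once for the whole group rather than once per user.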
read all parquet files in a directory in spark-sql
How can we read all parquet files in a directory in spark-sql? We are following this example, which shows a way to read one file:
// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a SchemaRDD.
val parquetFile = sqlContext.parquetFile("people.parquet")
// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
Re: read all parquet files in a directory in spark-sql
Thanks Nick, DB - that was helpful. On Mon, Oct 13, 2014 at 5:44 PM, DB Tsai dbt...@dbtsai.com wrote: For now, with the SPARK-3462 (parquet pushdown for unionAll) PR, you can do the following to unionAll the SchemaRDDs:
val paths = Array("hdfs://file1.parquet", "hdfs://file2.parquet", "hdfs://file3.parquet")
val rdds = paths.map(hc.parquetFile(_))
val unionedRDD = {
  var temp = rdds(0)
  for (i <- 1 until rdds.length) {
    temp = temp.unionAll(rdds(i))
  }
  temp
}
Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Mon, Oct 13, 2014 at 7:22 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Right now I believe the only supported option is to pass a comma-delimited list of paths. I've opened SPARK-3928: Support wildcard matches on Parquet files to request this feature. Nick On Mon, Oct 13, 2014 at 12:21 PM, Sadhan Sood sadhan.s...@gmail.com wrote: How can we read all parquet files in a directory in spark-sql? We are following this example, which shows a way to read one file:
// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a SchemaRDD.
val parquetFile = sqlContext.parquetFile("people.parquet")
// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
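Two shorter variants of the same idea, as sketches: hc and the file paths are the same assumptions as in DB's snippet, and the comma-delimited form rests on Nick's note that parquetFile accepts a comma-delimited list of paths.

// Fold the array of SchemaRDDs instead of the explicit loop; same result.
val unioned = rdds.reduce(_ unionAll _)
// Nick's alternative: pass several paths in one call as a comma-delimited string.
val combined = hc.parquetFile("hdfs://dir/file1.parquet,hdfs://dir/file2.parquet")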
Fwd: how to find the sources for spark-project
-- Forwarded message -- From: Sadhan Sood sadhan.s...@gmail.com Date: Sat, Oct 11, 2014 at 10:26 AM Subject: Re: how to find the sources for spark-project To: Stephen Boesch java...@gmail.com Thanks, I still didn't find it - is it under some particular branch? More specifically, I am looking to modify the file ParquetHiveSerDe.java, which lives at org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java and is being linked through this jar:
<dependency>
  <groupId>org.spark-project.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>0.12.0</version>
</dependency>
It seems these are modified versions of the same jar in org.apache.hive:
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>0.13.1</version>
</dependency>
It'd be great if I could find the sources of this jar so I can modify it locally and rebuild the jar. Not sure if I can rebuild Spark with the hive-exec artifact from org.apache.hive without breaking it. On Fri, Oct 10, 2014 at 7:28 PM, Stephen Boesch java...@gmail.com wrote: Git clone the Spark project (https://github.com/apache/spark.git); the Hive-related sources are under sql/hive/src/main/scala. 2014-10-10 16:24 GMT-07:00 sadhan sadhan.s...@gmail.com: We have our own customization on top of the parquet serde that we've been using for Hive. In order to make it work with spark-sql, we need to be able to rebuild Spark with it. It'll be much easier to rebuild Spark with this patch once I can find the sources for org.spark-project.hive. Not sure where to find them? This seems like the place where we need to put our patch.