SPARK SQL- Parquet projection pushdown for nested data

2015-10-29 Thread Sadhan Sood
I noticed when querying struct data in spark sql, we are requesting the
whole column from parquet files. Is this intended or is there some kind of
config to control this behaviour? Wouldn't it be better to request just the
struct field?
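
For concreteness, this is the kind of query I mean; a minimal sketch where the
table and field names are made up:

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)

// Only one leaf field of the struct is referenced in the query ...
val counts = sqlContext.sql("""
  SELECT payload.merchantid, COUNT(*)
  FROM events
  GROUP BY payload.merchantid
""")
counts.collect()
// ... but the Parquet scan still requests the whole 'payload' struct column
// group instead of just the 'merchantid' leaf.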


Re: SPARK SQL- Parquet projection pushdown for nested data

2015-10-29 Thread Sadhan Sood
Thanks Michael, I will upvote this.

On Thu, Oct 29, 2015 at 10:29 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> Yeah, this is unfortunate.  It would be good to fix this, but its a
> non-trivial change.
>
> Tracked here if you'd like to vote on the issue:
> https://issues.apache.org/jira/browse/SPARK-4502
>
> On Thu, Oct 29, 2015 at 6:00 PM, Sadhan Sood <sadhan.s...@gmail.com>
> wrote:
>
>> I noticed when querying struct data in spark sql, we are requesting the
>> whole column from parquet files. Is this intended or is there some kind of
>> config to control this behaviour? Wouldn't it be better to request just the
>> struct field?
>>
>
>


hive thriftserver and fair scheduling

2015-10-20 Thread Sadhan Sood
Hi All,

Does anyone have fair scheduling working for them in a hive server? I have
one hive thriftserver running and multiple users trying to run queries at
the same time on that server using a beeline client. I see that a big query
is stopping all other queries from making any progress. Is this supposed to
be this way? Is there anything else that I need to be doing for fair
scheduling to be working for the thriftserver?
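
For reference, a rough sketch of the fair-scheduler wiring involved (the pool
name and file path are placeholders, and the per-session SET variable is the
one documented for the thrift server):

  # spark-defaults.conf for the thriftserver's SparkContext
  spark.scheduler.mode               FAIR
  spark.scheduler.allocation.file    /path/to/fairscheduler.xml

  <!-- fairscheduler.xml -->
  <allocations>
    <pool name="adhoc">
      <schedulingMode>FAIR</schedulingMode>
      <weight>1</weight>
      <minShare>2</minShare>
    </pool>
  </allocations>

  -- per beeline session, to put that user's queries in a pool:
  SET spark.sql.thriftserver.scheduler.pool=adhoc;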


Re: hive thriftserver and fair scheduling

2015-10-20 Thread Sadhan Sood
Thanks Michael, I'll try it out. Another quick/important question: How do I
make udfs available to all of the hive thriftserver users? Right now, when
I launch a spark-sql client, I notice that it reads the ~/.hiverc file and
all udfs get picked up but this doesn't seem to be working in hive
thriftserver. Is there a way to make it work in a similar way for all users
in hive thriftserver?
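
For context, the ~/.hiverc that the spark-sql client picks up contains lines
roughly like these (the jar path and function names are placeholders):

  ADD JAR /path/to/our-udfs.jar;
  CREATE TEMPORARY FUNCTION normalize_url AS 'com.example.udf.NormalizeUrl';
  CREATE TEMPORARY FUNCTION parse_ua AS 'com.example.udf.ParseUserAgent';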

Thanks again!

On Tue, Oct 20, 2015 at 12:34 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Not the most obvious place in the docs... but this is probably helpful:
> https://spark.apache.org/docs/latest/sql-programming-guide.html#scheduling
>
> You likely want to put each user in their own pool.
>
> On Tue, Oct 20, 2015 at 11:55 AM, Sadhan Sood <sadhan.s...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> Does anyone have fair scheduling working for them in a hive server? I
>> have one hive thriftserver running and multiple users trying to run queries
>> at the same time on that server using a beeline client. I see that a big
>> query is stopping all other queries from making any progress. Is this
>> supposed to be this way? Is there anything else that I need to be doing for
>> fair scheduling to be working for the thriftserver?
>>
>
>


[SPARK-SQL] Requested array size exceeds VM limit

2015-09-25 Thread Sadhan Sood
I am trying to run a query on a month of data. The volume of data is not
much, but we have a partition per hour and per day. The table schema is
heavily nested with a total of 300 leaf fields. I am trying to run a simple
select count(*) query on the table and running into this exception:

 SELECT
 >   COUNT(*)
 >  FROM
 >p_all_tx
 >  WHERE
 >date_prefix >= "20150500"
 >AND date_prefix <= "20150700"
 >AND sanitizeddetails.merchantaccountid = 'Rvr7StMZSTQj';

java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
> at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
> at
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2003)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
> at
> org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1.apply(SortBasedAggregate.scala:73)
> at
> org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1.apply(SortBasedAggregate.scala:70)
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
> at
> org.apache.spark.sql.execution.aggregate.SortBasedAggregate.doExecute(SortBasedAggregate.scala:70)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
> at
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:142)
> at
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141)
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
> at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:141)


The table is a Parquet table. I am not sure why the closure should exceed the
VM limit. Could somebody explain why this is happening? Is it because I have a
lot of partitions and the table scan is essentially creating one RDD per
partition?


SPARK-SQL parameter tuning for performance

2015-09-17 Thread Sadhan Sood
Hi Spark users,

We are running Spark on YARN and often query table partitions as big as
100~200 GB from HDFS. HDFS is co-located on the same cluster on which Spark
and YARN run. I've noticed much higher I/O read rates when I increase the
number of executor cores from 2 to 8 (most tasks run RACK_LOCAL and a few
NODE_LOCAL) while keeping the number of executors constant. The RAM on my
executors is around 24G. But the problem is that any subsequent shuffle stage
starts failing if I do that. It runs fine if I leave the number of cores at 2,
but then the read is much slower. The error which I get from YARN is:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> location for shuffle 1
> at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)


It seems to indicate that YARN is killing executors for using too much memory,
but I can't be sure. I tried increasing spark.sql.shuffle.partitions as well,
but that didn't help much. Is there a way we can run more partition-read tasks
per executor while keeping the number of shuffle tasks constant?
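
For reference, the knobs involved look roughly like this (apart from the
executor memory, the values are placeholders rather than what we actually run):

  spark.executor.memory               24g
  spark.executor.cores                8    # 2 works end to end; 8 reads faster but shuffles fail
  spark.executor.instances            <kept constant across both runs>
  spark.sql.shuffle.partitions        <raised from the default of 200; didn't help much>
  spark.yarn.executor.memoryOverhead  <not set; raising it is a common fix when YARN kills containers>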


Thanks


Re: Spark cluster multi tenancy

2015-08-26 Thread Sadhan Sood
Interestingly, if nothing is running on the dev spark-shell, it recovers
successfully and regains the lost executors. Attaching the log for that.
Notice the "Registering block manager ..." statements at the very end, after
all executors were lost.

On Wed, Aug 26, 2015 at 11:27 AM, Sadhan Sood sadhan.s...@gmail.com wrote:

 Attaching log for when the dev job gets stuck (once all its executors are
 lost due to preemption). This is a spark-shell job running in yarn-client
 mode.

 On Wed, Aug 26, 2015 at 10:45 AM, Sadhan Sood sadhan.s...@gmail.com
 wrote:

 Hi All,

 We've set up our spark cluster on aws running on yarn (running on hadoop
 2.3) with fair scheduling and preemption turned on. The cluster is shared
 for prod and dev work where prod runs with a higher fair share and can
 preempt dev jobs if there are not enough resources available for it.
 It appears that dev jobs which get preempted often become unstable after
 losing some executors, and the whole job gets stuck (without making any
 progress) or ends up getting restarted (and hence loses all the work done).
 Has someone encountered this before? Is the solution just to set
 spark.task.maxFailures to a really high value to recover from task failures
 in such scenarios? Are there other approaches that people have taken for
 Spark multi-tenancy that work better in such scenarios?

 Thanks,
 Sadhan





spark_job_recovers.log
Description: Binary data


Re: Spark cluster multi tenancy

2015-08-26 Thread Sadhan Sood
Attaching log for when the dev job gets stuck (once all its executors are
lost due to preemption). This is a spark-shell job running in yarn-client
mode.

On Wed, Aug 26, 2015 at 10:45 AM, Sadhan Sood sadhan.s...@gmail.com wrote:

 Hi All,

 We've set up our spark cluster on aws running on yarn (running on hadoop
 2.3) with fair scheduling and preemption turned on. The cluster is shared
 for prod and dev work where prod runs with a higher fair share and can
 preempt dev jobs if there are not enough resources available for it.
 It appears that dev jobs which get preempted often become unstable after
 losing some executors, and the whole job gets stuck (without making any
 progress) or ends up getting restarted (and hence loses all the work done).
 Has someone encountered this before? Is the solution just to set
 spark.task.maxFailures to a really high value to recover from task failures
 in such scenarios? Are there other approaches that people have taken for
 Spark multi-tenancy that work better in such scenarios?

 Thanks,
 Sadhan



spark_job_stuck.log
Description: Binary data


Spark cluster multi tenancy

2015-08-26 Thread Sadhan Sood
Hi All,

We've set up our spark cluster on aws running on yarn (running on hadoop
2.3) with fair scheduling and preemption turned on. The cluster is shared
for prod and dev work where prod runs with a higher fair share and can
preempt dev jobs if there are not enough resources available for it.
It appears that dev jobs which get preempted often become unstable after
losing some executors, and the whole job gets stuck (without making any
progress) or ends up getting restarted (and hence loses all the work done).
Has someone encountered this before? Is the solution just to set
spark.task.maxFailures to a really high value to recover from task failures in
such scenarios? Are there other approaches that people have taken for Spark
multi-tenancy that work better in such scenarios?

Thanks,
Sadhan
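
For reference, the settings I am looking at; a sketch with the 1.x defaults,
not values we have settled on:

  spark.task.maxFailures            4 (default)                  # task retries before the stage, and hence the job, is failed
  spark.yarn.max.executor.failures  2 * numExecutors (default)   # the application is failed once this many executors are lost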


Re: Error when cache partitioned Parquet table

2015-01-26 Thread Sadhan Sood
Hi Xu-dong,

That's probably because your table's partition paths don't look like
hdfs://somepath/key=value/*.parquet. Spark is trying to extract the partition
key's value from the path while caching, and hence the exception is thrown
since it can't find one.
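
In other words, for the key extraction to work the files are expected to live
under key=value directories, something like (paths made up):

  hdfs://namenode/warehouse/mytable/querydate=20150126/part-00000.parquet
  hdfs://namenode/warehouse/mytable/querydate=20150126/part-00001.parquet
  hdfs://namenode/warehouse/mytable/querydate=20150127/part-00000.parquet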

On Mon, Jan 26, 2015 at 10:45 AM, ZHENG, Xu-dong dong...@gmail.com wrote:

 Hi all,

 I met the below error when I cache a partitioned Parquet table. It seems
 that Spark is trying to extract the partition key from the Parquet file
 itself, and so it is not found. But other queries run successfully, even
 ones that request the partition key. Is it a bug in SparkSQL? Is there any
 workaround for it? Thank you!

 java.util.NoSuchElementException: key not found: querydate
   at scala.collection.MapLike$class.default(MapLike.scala:228)
   at scala.collection.AbstractMap.default(Map.scala:58)
   at scala.collection.MapLike$class.apply(MapLike.scala:141)
   at scala.collection.AbstractMap.apply(Map.scala:58)
   at 
 org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anonfun$3.apply(ParquetTableOperations.scala:142)
   at 
 org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anonfun$3.apply(ParquetTableOperations.scala:142)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at 
 org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4.apply(ParquetTableOperations.scala:142)
   at 
 org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4.apply(ParquetTableOperations.scala:127)
   at 
 org.apache.spark.rdd.NewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD.compute(NewHadoopRDD.scala:247)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
   at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
   at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
   at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
   at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
   at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
   at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
   at org.apache.spark.scheduler.Task.run(Task.scala:56)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:724)

 --
 郑旭东
 ZHENG, Xu-dong




Re: does spark sql support columnar compression with encoding when caching tables

2014-12-22 Thread Sadhan Sood
Thanks Cheng, Michael - that was super helpful.

On Sun, Dec 21, 2014 at 7:27 AM, Cheng Lian lian.cs@gmail.com wrote:

  Would like to add that the compression schemes built into the in-memory
 columnar storage only support primitive columns (int, string, etc.); complex
 types like array, map and struct are not supported.


 On 12/20/14 6:17 AM, Sadhan Sood wrote:

  Hey Michael,

  Thank you for clarifying that. Is Tachyon the right way to get compressed
 data in memory, or should we explore the option of adding compression to the
 cached data? This is because our uncompressed data set is too big to fit in
 memory right now. I also see the benefit of Tachyon in that, besides storing
 compressed data in memory, we wouldn't have to create a separate table for
 caching some partitions, like 'cache table table_cached as select * from
 table where date = 201412XX', the way we are doing right now.


 On Thu, Dec 18, 2014 at 6:46 PM, Michael Armbrust mich...@databricks.com
 wrote:

 There is only column level encoding (run length encoding, delta encoding,
 dictionary encoding) and no generic compression.

 On Thu, Dec 18, 2014 at 12:07 PM, Sadhan Sood sadhan.s...@gmail.com
 wrote:

 Hi All,

  Wondering, when caching a table backed by LZO-compressed Parquet data,
 whether Spark also compresses it (using lzo/gzip/snappy) along with the
 column-level encoding, or just does the column-level encoding when
 spark.sql.inMemoryColumnarStorage.compressed is set to true. This is because
 when I try to cache the data, I notice the memory being used is almost as
 much as the uncompressed size of the data.

  Thanks!





Re: does spark sql support columnar compression with encoding when caching tables

2014-12-19 Thread Sadhan Sood
Hey Michael,

Thank you for clarifying that. Is Tachyon the right way to get compressed
data in memory, or should we explore the option of adding compression to the
cached data? This is because our uncompressed data set is too big to fit in
memory right now. I also see the benefit of Tachyon in that, besides storing
compressed data in memory, we wouldn't have to create a separate table for
caching some partitions, like 'cache table table_cached as select * from table
where date = 201412XX', the way we are doing right now.


On Thu, Dec 18, 2014 at 6:46 PM, Michael Armbrust mich...@databricks.com
wrote:

 There is only column level encoding (run length encoding, delta encoding,
 dictionary encoding) and no generic compression.

 On Thu, Dec 18, 2014 at 12:07 PM, Sadhan Sood sadhan.s...@gmail.com
 wrote:

 Hi All,

 Wondering, when caching a table backed by LZO-compressed Parquet data,
 whether Spark also compresses it (using lzo/gzip/snappy) along with the
 column-level encoding, or just does the column-level encoding when
 spark.sql.inMemoryColumnarStorage.compressed is set to true. This is because
 when I try to cache the data, I notice the memory being used is almost as
 much as the uncompressed size of the data.

 Thanks!




Re: does spark sql support columnar compression with encoding when caching tables

2014-12-19 Thread Sadhan Sood
Thanks Michael, that makes sense.

On Fri, Dec 19, 2014 at 3:13 PM, Michael Armbrust mich...@databricks.com
wrote:

 Yeah, tachyon does sound like a good option here.  Especially if you have
 nested data, its likely that parquet in tachyon will always be better
 supported.

 On Fri, Dec 19, 2014 at 2:17 PM, Sadhan Sood sadhan.s...@gmail.com
 wrote:

 Hey Michael,

 Thank you for clarifying that. Is Tachyon the right way to get compressed
 data in memory, or should we explore the option of adding compression to the
 cached data? This is because our uncompressed data set is too big to fit in
 memory right now. I also see the benefit of Tachyon in that, besides storing
 compressed data in memory, we wouldn't have to create a separate table for
 caching some partitions, like 'cache table table_cached as select * from
 table where date = 201412XX', the way we are doing right now.


 On Thu, Dec 18, 2014 at 6:46 PM, Michael Armbrust mich...@databricks.com
  wrote:

 There is only column level encoding (run length encoding, delta
 encoding, dictionary encoding) and no generic compression.

 On Thu, Dec 18, 2014 at 12:07 PM, Sadhan Sood sadhan.s...@gmail.com
 wrote:

 Hi All,

 Wondering, when caching a table backed by LZO-compressed Parquet data,
 whether Spark also compresses it (using lzo/gzip/snappy) along with the
 column-level encoding, or just does the column-level encoding when
 spark.sql.inMemoryColumnarStorage.compressed is set to true. This is because
 when I try to cache the data, I notice the memory being used is almost as
 much as the uncompressed size of the data.

 Thanks!




does spark sql support columnar compression with encoding when caching tables

2014-12-18 Thread Sadhan Sood
Hi All,

Wondering, when caching a table backed by LZO-compressed Parquet data, whether
Spark also compresses it (using lzo/gzip/snappy) along with the column-level
encoding, or just does the column-level encoding when
spark.sql.inMemoryColumnarStorage.compressed is set to true. This is because
when I try to cache the data, I notice the memory being used is almost as much
as the uncompressed size of the data.
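
For reference, what we do is roughly the following (table name and date are
placeholders); the question is what the compressed flag actually buys us here:

  sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
  sqlContext.sql("CACHE TABLE table_cached AS SELECT * FROM table WHERE date = 20141218")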

Thanks!


Re: SparkSQL exception on cached parquet table

2014-11-20 Thread Sadhan Sood
Also attaching the parquet file if anyone wants to take a further look.

On Thu, Nov 20, 2014 at 8:54 AM, Sadhan Sood sadhan.s...@gmail.com wrote:

 So, I am seeing this issue with spark sql throwing an exception when
 trying to read selective columns from a thrift parquet file and also when
 caching them:
 On some further digging, I was able to narrow it down to at least one
 particular column type, map<string, set<string>>, as the cause of this issue.
 To reproduce this I created a test thrift file with a very basic schema and
 stored some sample data in a parquet file:

 Test.thrift
 ===
 typedef binary SomeId

 enum SomeExclusionCause {
   WHITELIST = 1,
   HAS_PURCHASE = 2,
 }

 struct SampleThriftObject {
   10: string col_a;
   20: string col_b;
   30: string col_c;
   40: optional map<SomeExclusionCause, set<SomeId>> col_d;
 }
 =

 And loading the data in spark through schemaRDD:

 import org.apache.spark.sql.SchemaRDD
 val sqlContext = new org.apache.spark.sql.SQLContext(sc);
 val parquetFile = "/path/to/generated/parquet/file"
 val parquetFileRDD = sqlContext.parquetFile(parquetFile)
 parquetFileRDD.printSchema
 root
  |-- col_a: string (nullable = true)
  |-- col_b: string (nullable = true)
  |-- col_c: string (nullable = true)
  |-- col_d: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: string (containsNull = false)

 parquetFileRDD.registerTempTable("test")
 sqlContext.cacheTable("test")
 sqlContext.sql("select col_a from test").collect() -- see the exception stack here

 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not read
 value at 0 in block -1 in file file:/tmp/xyz/part-r-0.parquet
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
 at
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
 at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
 at java.util.ArrayList.elementData(ArrayList.java:418)
 at java.util.ArrayList.get(ArrayList.java:431)
 at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
 at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
 at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
 at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
 at
 parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:282)
 at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
 at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
 at
 parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
 at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
 at
 parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
 ... 27 more

 If you take out the col_d from the thrift file, the problem goes away. The
 problem

Adding partitions to parquet data

2014-11-20 Thread Sadhan Sood
We are loading parquet data as temp tables but wondering if there is a way
to add a partition to the data without going through hive (we still want to
use spark's parquet serde as compared to hive). The data looks like -

/date1/file1, /date1/file2 ... , /date2/file1,
/date2/file2,/daten/filem

and we are loading it like:
val parquetFileRDD = sqlContext.parquetFile("<comma-separated parquet file names>")

but it would be nice to be able to add a partition and provide the date in the
query parameter.
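
For comparison, a sketch of the metastore route (names are placeholders, and
it assumes a Hive version where STORED AS PARQUET is available):

  import org.apache.spark.sql.hive.HiveContext
  val hiveContext = new HiveContext(sc)
  hiveContext.sql("CREATE EXTERNAL TABLE events (col_a string, col_b string) " +
    "PARTITIONED BY (dt string) STORED AS PARQUET LOCATION '/data/events'")
  // each new date directory is registered as a partition ...
  hiveContext.sql("ALTER TABLE events ADD PARTITION (dt='date1') LOCATION '/data/events/date1'")
  // ... and the date can then be a predicate in the query
  hiveContext.sql("SELECT count(*) FROM events WHERE dt = 'date1'").collect()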


Re: SparkSQL exception on cached parquet table

2014-11-20 Thread Sadhan Sood
I am running on master, pulled yesterday I believe but saw the same issue
with 1.2.0

On Thu, Nov 20, 2014 at 1:37 PM, Michael Armbrust mich...@databricks.com
wrote:

 Which version are you running on again?

 On Thu, Nov 20, 2014 at 8:17 AM, Sadhan Sood sadhan.s...@gmail.com
 wrote:

 Also attaching the parquet file if anyone wants to take a further look.

 On Thu, Nov 20, 2014 at 8:54 AM, Sadhan Sood sadhan.s...@gmail.com
 wrote:

 So, I am seeing this issue with spark sql throwing an exception when
 trying to read selective columns from a thrift parquet file and also when
 caching them:
 On some further digging, I was able to narrow it down to at least one
 particular column type, map<string, set<string>>, as the cause of this issue.
 To reproduce this I created a test thrift file with a very basic schema and
 stored some sample data in a parquet file:

 Test.thrift
 ===
 typedef binary SomeId

 enum SomeExclusionCause {
   WHITELIST = 1,
   HAS_PURCHASE = 2,
 }

 struct SampleThriftObject {
   10: string col_a;
   20: string col_b;
   30: string col_c;
   40: optional map<SomeExclusionCause, set<SomeId>> col_d;
 }
 =

 And loading the data in spark through schemaRDD:

 import org.apache.spark.sql.SchemaRDD
 val sqlContext = new org.apache.spark.sql.SQLContext(sc);
 val parquetFile = "/path/to/generated/parquet/file"
 val parquetFileRDD = sqlContext.parquetFile(parquetFile)
 parquetFileRDD.printSchema
 root
  |-- col_a: string (nullable = true)
  |-- col_b: string (nullable = true)
  |-- col_c: string (nullable = true)
  |-- col_d: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: string (containsNull = false)

 parquetFileRDD.registerTempTable("test")
 sqlContext.cacheTable("test")
 sqlContext.sql("select col_a from test").collect() -- see the exception stack here

 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not read
 value at 0 in block -1 in file file:/tmp/xyz/part-r-0.parquet
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
 at
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
 at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
 at java.util.ArrayList.elementData(ArrayList.java:418)
 at java.util.ArrayList.get(ArrayList.java:431)
 at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
 at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
 at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
 at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
 at
 parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:282)
 at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
 at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
 at
 parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
 at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96

Re: Adding partitions to parquet data

2014-11-20 Thread Sadhan Sood
Ah awesome, thanks!!

On Thu, Nov 20, 2014 at 3:01 PM, Michael Armbrust mich...@databricks.com
wrote:

 In 1.2 by default we use Spark parquet support instead of Hive when the
 SerDe contains the word Parquet.  This should work with hive partitioning.

 On Thu, Nov 20, 2014 at 10:33 AM, Sadhan Sood sadhan.s...@gmail.com
 wrote:

 We are loading parquet data as temp tables but wondering if there is a
 way to add a partition to the data without going through hive (we still
 want to use spark's parquet serde as compared to hive). The data looks like
 -

 /date1/file1, /date1/file2 ... , /date2/file1,
 /date2/file2,/daten/filem

 and we are loading it like:
 val parquetFileRDD = sqlContext.parquetFile("<comma-separated parquet file names>")

 but it would be nice to be able to add a partition and provide the date in
 the query parameter.





Re: SparkSQL exception on cached parquet table

2014-11-20 Thread Sadhan Sood
Thanks Michael, opened this https://issues.apache.org/jira/browse/SPARK-4520

On Thu, Nov 20, 2014 at 2:59 PM, Michael Armbrust mich...@databricks.com
wrote:

 Can you open a JIRA?

 On Thu, Nov 20, 2014 at 10:39 AM, Sadhan Sood sadhan.s...@gmail.com
 wrote:

 I am running on master, pulled yesterday I believe but saw the same issue
 with 1.2.0

 On Thu, Nov 20, 2014 at 1:37 PM, Michael Armbrust mich...@databricks.com
  wrote:

 Which version are you running on again?

 On Thu, Nov 20, 2014 at 8:17 AM, Sadhan Sood sadhan.s...@gmail.com
 wrote:

 Also attaching the parquet file if anyone wants to take a further look.

 On Thu, Nov 20, 2014 at 8:54 AM, Sadhan Sood sadhan.s...@gmail.com
 wrote:

 So, I am seeing this issue with spark sql throwing an exception when
 trying to read selective columns from a thrift parquet file and also when
 caching them:
 On some further digging, I was able to narrow it down to at least one
 particular column type, map<string, set<string>>, as the cause of this issue.
 To reproduce this I created a test thrift file with a very basic schema and
 stored some sample data in a parquet file:

 Test.thrift
 ===
 typedef binary SomeId

 enum SomeExclusionCause {
   WHITELIST = 1,
   HAS_PURCHASE = 2,
 }

 struct SampleThriftObject {
   10: string col_a;
   20: string col_b;
   30: string col_c;
   40: optional map<SomeExclusionCause, set<SomeId>> col_d;
 }
 =

 And loading the data in spark through schemaRDD:

 import org.apache.spark.sql.SchemaRDD
 val sqlContext = new org.apache.spark.sql.SQLContext(sc);
 val parquetFile = "/path/to/generated/parquet/file"
 val parquetFileRDD = sqlContext.parquetFile(parquetFile)
 parquetFileRDD.printSchema
 root
  |-- col_a: string (nullable = true)
  |-- col_b: string (nullable = true)
  |-- col_c: string (nullable = true)
  |-- col_d: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: string (containsNull = false)

 parquetFileRDD.registerTempTable("test")
 sqlContext.cacheTable("test")
 sqlContext.sql("select col_a from test").collect() -- see the exception stack here

 org.apache.spark.SparkException: Job aborted due to stage failure:
 Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in
 stage 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not
 read value at 0 in block -1 in file file:/tmp/xyz/part-r-0.parquet
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
 at
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
 at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
 at java.util.ArrayList.elementData(ArrayList.java:418)
 at java.util.ArrayList.get(ArrayList.java:431)
 at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
 at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
 at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
 at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
 at
 parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:282

Re: Exception in spark sql when running a group by query

2014-11-18 Thread Sadhan Sood
ah makes sense - Thanks Michael!

On Mon, Nov 17, 2014 at 6:08 PM, Michael Armbrust mich...@databricks.com
wrote:

 You are perhaps hitting an issue that was fixed by #3248
 https://github.com/apache/spark/pull/3248?

 On Mon, Nov 17, 2014 at 9:58 AM, Sadhan Sood sadhan.s...@gmail.com
 wrote:

 While testing sparkSQL, we were running this group by with expression
 query and got an exception. The same query worked fine on hive.

 SELECT from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200),
   'yyyy/MM/dd') as pst_date,
 count(*) as num_xyzs
   FROM
 all_matched_abc
   GROUP BY
 from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200),
   'yyyy/MM/dd')

 14/11/17 17:41:46 ERROR thriftserver.SparkSQLDriver: Failed in [SELECT 
 from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200),
   '/MM/dd') as pst_date,
 count(*) as num_xyzs
   FROM
 all_matched_abc
   GROUP BY
 from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200),
   '/MM/dd')
 ]
 org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Expression 
 not in GROUP BY: 
 HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived
  AS whenrequestreceived#187L, DoubleType) / 1000.0) - CAST(25200, 
 DoubleType))),/MM/dd) AS pst_date#179, tree:

 Aggregate 
 [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived,
  DoubleType) / 1000.0) - CAST(25200, DoubleType))),/MM/dd)], 
 [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived
  AS whenrequestreceived#187L, DoubleType) / 1000.0) - CAST(25200, 
 DoubleType))),/MM/dd) AS pst_date#179,COUNT(1) AS num_xyzs#180L]

  MetastoreRelation default, all_matched_abc, None
 at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3$$anonfun$applyOrElse$6.apply(Analyzer.scala:127)
 at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3$$anonfun$applyOrElse$6.apply(Analyzer.scala:125)
 at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3.applyOrElse(Analyzer.scala:125)
 at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3.applyOrElse(Analyzer.scala:115)
 at 
 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
 at 
 org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
 at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$.apply(Analyzer.scala:115)
 at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$.apply(Analyzer.scala:113)
 at 
 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
 at 
 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
 at 
 scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
 at 
 scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
 at 
 scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
 at 
 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
 at 
 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at 
 org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
 at 
 org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
 at 
 org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
 at 
 org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
 at 
 org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
 at 
 org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
 at 
 org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
 at 
 org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
 at 
 org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
 at 
 org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
 at 
 org.apache.spark.sql.SQLContext$QueryExecution.executedPlan

Exception in spark sql when running a group by query

2014-11-17 Thread Sadhan Sood
While testing sparkSQL, we were running this group by with expression query
and got an exception. The same query worked fine on hive.

SELECT from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200),
  'yyyy/MM/dd') as pst_date,
count(*) as num_xyzs
  FROM
all_matched_abc
  GROUP BY
from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200),
  'yyyy/MM/dd')

14/11/17 17:41:46 ERROR thriftserver.SparkSQLDriver: Failed in [SELECT
from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200),
  '/MM/dd') as pst_date,
count(*) as num_xyzs
  FROM
all_matched_abc
  GROUP BY
from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200),
  '/MM/dd')
]
org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
Expression not in GROUP BY:
HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived
AS whenrequestreceived#187L, DoubleType) / 1000.0) - CAST(25200,
DoubleType))),/MM/dd) AS pst_date#179, tree:

Aggregate 
[HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived,
DoubleType) / 1000.0) - CAST(25200, DoubleType))),/MM/dd)],
[HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived
AS whenrequestreceived#187L, DoubleType) / 1000.0) - CAST(25200,
DoubleType))),/MM/dd) AS pst_date#179,COUNT(1) AS num_xyzs#180L]

 MetastoreRelation default, all_matched_abc, None
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3$$anonfun$applyOrElse$6.apply(Analyzer.scala:127)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3$$anonfun$applyOrElse$6.apply(Analyzer.scala:125)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3.applyOrElse(Analyzer.scala:125)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3.applyOrElse(Analyzer.scala:115)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$.apply(Analyzer.scala:115)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$.apply(Analyzer.scala:113)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
at 
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at 
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
at 
org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
at 
org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
at 
org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
at 
org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
at 
org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
at 
org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
at 
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
at 
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
at 
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
at 
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
at 
org.apache.spark.sql.hive.HiveContext$QueryExecution.stringResult(HiveContext.scala:425)
at 
org.apache.spark.sql.hive.thriftserver.AbstractSparkSQLDriver.run(AbstractSparkSQLDriver.scala:59)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:276)
at 

Re: SparkSQL exception on cached parquet table

2014-11-16 Thread Sadhan Sood
Hi Cheng,

I tried reading the parquet file (the one on which we were getting the
exception) through parquet-tools; it is able to dump the file and I can read
the metadata, etc. I also loaded the file through a hive table and can run a
table scan query on it as well. Let me know if I can do more to help resolve
the problem; I'll run it through a debugger and see if I can get more
information on it in the meantime.

Thanks,
Sadhan

On Sun, Nov 16, 2014 at 4:35 AM, Cheng Lian lian.cs@gmail.com wrote:

  (Forgot to cc user mail list)


 On 11/16/14 4:59 PM, Cheng Lian wrote:

 Hey Sadhan,

  Thanks for the additional information, this is helpful. Seems that some
 Parquet internal contract was broken, but I'm not sure whether it's caused
 by Spark SQL or Parquet, or even maybe the Parquet file itself was damaged
 somehow. I'm investigating this. In the meantime, would you mind helping to
 narrow down the problem by trying to scan exactly the same Parquet file
 with some other systems (e.g. Hive or Impala)? If other systems work, then
 there must be something wrong with Spark SQL.

  Cheng

 On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood sadhan.s...@gmail.com
 wrote:

 Hi Cheng,

  Thanks for your response. Here is the stack trace from yarn logs:

  Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
 at java.util.ArrayList.elementData(ArrayList.java:418)
 at java.util.ArrayList.get(ArrayList.java:431)
 at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
 at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
 at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
 at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
 at 
 parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:282)
 at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
 at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
 at 
 parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
 at 
 parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
 at 
 parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
 at 
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
 ... 26 more


 On Sat, Nov 15, 2014 at 9:28 AM, Cheng Lian lian.cs@gmail.com
 wrote:

  Hi Sadhan,

 Could you please provide the stack trace of the
 ArrayIndexOutOfBoundsException (if any)? The reason why the first query
 succeeds is that Spark SQL doesn’t bother reading all data from the table
 to give COUNT(*). In the second case, however, the whole table is asked
 to be cached lazily via the cacheTable call, thus it’s scanned to build
 the in-memory columnar cache. Then thing went wrong while scanning this LZO
 compressed Parquet file. But unfortunately the stack trace at hand doesn’t
 indicate the root cause.

 Cheng

 On 11/15/14 5:28 AM, Sadhan Sood wrote:

 While testing SparkSQL on a bunch of parquet files (basically what used to
 be a partition of one of our hive tables), I encountered this error:

  import org.apache.spark.sql.SchemaRDD
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  val parquetFileRDD = sqlContext.parquetFile(parquetFile)
 parquetFileRDD.registerTempTable("xyz_20141109")
 sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect() -- works fine
 sqlContext.cacheTable("xyz_20141109")
 sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect() -- fails with an exception

   parquet.io.ParquetDecodingException: Can not read value at 0 in block
 -1 in file
 hdfs://::9000/event_logs/xyz/20141109/part-9359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-9.lzo.parquet

 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)

 at
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)

 at
 org.apache.spark.rdd.NewHadoopRDD$anon$1.hasNext(NewHadoopRDD.scala:145)

 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)

 at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)

 at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:388)

 at
 org.apache.spark.sql.columnar.InMemoryRelation$anonfun$3$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)

 at
 org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)

 at
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)

 at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)

 at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)

 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35

SparkSQL exception on cached parquet table

2014-11-14 Thread Sadhan Sood
While testing SparkSQL on a bunch of parquet files (basically what used to be
a partition of one of our hive tables), I encountered this error:

import org.apache.spark.sql.SchemaRDD
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val parquetFileRDD = sqlContext.parquetFile(parquetFile)
parquetFileRDD.registerTempTable("xyz_20141109")
sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect() -- works fine
sqlContext.cacheTable("xyz_20141109")
sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect() -- fails with an exception

parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in
file
hdfs://::9000/event_logs/xyz/20141109/part-9359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-9.lzo.parquet

at
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)

at
parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)

at
org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)

at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)

at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)

at
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)

at
org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)

at
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)

at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)

at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)

at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)

at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

at org.apache.spark.scheduler.Task.run(Task.scala:56)

at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.ArrayIndexOutOfBoundsException


Re: Cache sparkSql data without uncompressing it in memory

2014-11-14 Thread Sadhan Sood
Thanks Cheng, that was helpful. I noticed from the UI that only half of the
memory per executor was being used for caching; is that true? We have a 2 TB
sequence-file dataset that we wanted to cache in our cluster with ~5 TB of
memory, but caching still failed; from the UI it looked like it used 2.5 TB of
memory and wrote almost 12 TB to disk (at which point it was useless) during
the mapPartitions stage. Also, we couldn't run more than 2 executors per box
(60G memory/box), or else the job died very quickly from the smaller memory
per executor (not sure why), although I/O seemed to go much faster, which
makes sense because of more parallel reads.
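
For reference, the fractions that bound how much of each executor's heap the
cache can use (Spark 1.x defaults; a sketch, not our exact settings):

  spark.storage.memoryFraction   0.6   # share of the heap usable for cached blocks
  spark.shuffle.memoryFraction   0.2   # share reserved for shuffle aggregation buffers

With the storage safety factor applied on top of the 0.6, the UI ends up
showing only a bit more than half of the heap as available for caching, which
lines up with the observation above.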

On Thu, Nov 13, 2014 at 10:50 PM, Cheng Lian lian.cs@gmail.com wrote:

  No, the columnar buffer is built in a small batching manner, the batch
 size is controlled by the spark.sql.inMemoryColumnarStorage.batchSize
 property. The default value for this in master and branch-1.2 is 10,000
 rows per batch.

 On 11/14/14 1:27 AM, Sadhan Sood wrote:

   Thanks Cheng. Just one more question: does that mean that we still need
  enough memory in the cluster to uncompress the data before it can be
  compressed again, or does it just read the raw data as is?

 On Wed, Nov 12, 2014 at 10:05 PM, Cheng Lian lian.cs@gmail.com
 wrote:

  Currently there’s no way to cache the compressed sequence file
 directly. Spark SQL uses in-memory columnar format while caching table
 rows, so we must read all the raw data and convert them into columnar
 format. However, you can enable in-memory columnar compression by setting
 spark.sql.inMemoryColumnarStorage.compressed to true. This property is
 already set to true by default in master branch and branch-1.2.

 On 11/13/14 7:16 AM, Sadhan Sood wrote:

 We noticed while caching data from our hive tables, which contain data in
 compressed sequence-file format, that it gets uncompressed in memory when
 getting cached. Is there a way to turn this off and cache the compressed
 data as is?

  ​


​



Re: Cache sparkSql data without uncompressing it in memory

2014-11-13 Thread Sadhan Sood
Thanks Cheng. Just one more question: does that mean that we still need
enough memory in the cluster to uncompress the data before it can be
compressed again, or does it just read the raw data as is?

On Wed, Nov 12, 2014 at 10:05 PM, Cheng Lian lian.cs@gmail.com wrote:

  Currently there’s no way to cache the compressed sequence file directly.
 Spark SQL uses in-memory columnar format while caching table rows, so we
 must read all the raw data and convert them into columnar format. However,
 you can enable in-memory columnar compression by setting
 spark.sql.inMemoryColumnarStorage.compressed to true. This property is
 already set to true by default in master branch and branch-1.2.

 On 11/13/14 7:16 AM, Sadhan Sood wrote:

   We noticed while caching data from our hive tables which contain data
 in compressed sequence file format that it gets uncompressed in memory when
 getting cached. Is there a way to turn this off and cache the compressed
 data as is ?



Too many failed collects when trying to cache a table in SparkSQL

2014-11-12 Thread Sadhan Sood
We are running spark on yarn with combined memory > 1TB and when trying to
cache a table partition (which is < 100G), seeing a lot of failed collect
stages in the UI and this never succeeds. Because of the failed collect, it
seems like the mapPartitions keep getting resubmitted. We have more than
enough memory so its surprising we are seeing this issue. Can someone
please help. Thanks!

The stack trace of the failed collect from UI is:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 0
at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at 
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
at 
org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at 
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Re: Too many failed collects when trying to cache a table in SparkSQL

2014-11-12 Thread Sadhan Sood
 at
DAGScheduler.scala:838

2014-11-12 19:10:08,990 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 0
(MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86)
finished in 66.475 s

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - looking for newly runnable stages

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - running: Set()

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - waiting: Set(Stage 1)

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - failed: Set()

2014-11-12 19:11:15,466 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()

2014-11-12 19:11:15,466 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at
SparkPlan.scala:84), which is now runnable

2014-11-12 19:11:15,482 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 4 from broadcast at
DAGScheduler.scala:838

2014-11-12 19:11:15,482 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1
(MappedRDD[16] at map at SparkPlan.scala:84)

2014-11-12 19:11:21,655 ERROR cluster.YarnClientClusterScheduler
(Logging.scala:logError(75)) - Lost executor 372 on
ip-10-95-163-84.ec2.internal: remote Akka client disassociated

2014-11-12 19:11:21,655 WARN  remote.ReliableDeliverySupervisor
(Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system
[akka.tcp://sparkExecutor@ip-10-95-163-84.ec2.internal:20998] has failed,
address is now gated for [5000] ms. Reason is: [Disassociated].

2014-11-12 19:11:21,655 ERROR cluster.YarnClientSchedulerBackend
(Logging.scala:logError(75)) - Asked to remove non-existent executor 372

2014-11-12 19:11:21,655 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Executor lost: 372 (epoch 3)




On Wed, Nov 12, 2014 at 12:31 PM, Sadhan Sood sadhan.s...@gmail.com wrote:

 We are running spark on yarn with combined memory > 1TB and when trying to
 cache a table partition (which is < 100G), seeing a lot of failed collect
 stages in the UI and this never succeeds. Because of the failed collect, it
 seems like the mapPartitions keep getting resubmitted. We have more than
 enough memory so its surprising we are seeing this issue. Can someone
 please help. Thanks!

 The stack trace of the failed collect from UI is:

 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output 
 location for shuffle 0
   at 
 org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
   at 
 org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
   at 
 org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
   at 
 org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
   at 
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
   at 
 org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
   at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
   at org.apache.spark.scheduler.Task.run(Task.scala:56)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195

Re: Building spark targz

2014-11-12 Thread Sadhan Sood
Just making sure, but are you looking for the tar in the assembly/target dir?

On Wed, Nov 12, 2014 at 3:14 PM, Ashwin Shankar ashwinshanka...@gmail.com
wrote:

 Hi,
 I just cloned spark from the github and I'm trying to build to generate a
 tar ball.
 I'm doing : mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive
 -DskipTests clean package

 Although the build is successful, I don't see the targz generated.

 Am I running the wrong command ?

 --
 Thanks,
 Ashwin





Re: Building spark targz

2014-11-12 Thread Sadhan Sood
I think you can provide -Pbigtop-dist to build the tar.

On Wed, Nov 12, 2014 at 3:21 PM, Sean Owen so...@cloudera.com wrote:

 mvn package doesn't make tarballs. It creates artifacts that will
 generally appear in target/ and subdirectories, and likewise within
 modules. Look at make-distribution.sh

 On Wed, Nov 12, 2014 at 8:14 PM, Ashwin Shankar ashwinshanka...@gmail.com
  wrote:

 Hi,
 I just cloned spark from the github and I'm trying to build to generate a
 tar ball.
 I'm doing : mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive
 -DskipTests clean package

 Although the build is successful, I don't see the targz generated.

 Am I running the wrong command ?

 --
 Thanks,
 Ashwin






Re: Too many failed collects when trying to cache a table in SparkSQL

2014-11-12 Thread Sadhan Sood
On re-running the cache statement, I see from the logs that when collect
(stage 1) fails, it always causes mapPartitions (stage 0) to be re-run for one
partition. This can be seen from the collect log as well as from the container
log:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 0

The data is an LZO-compressed sequence file with a compressed size of ~26G. Is
there a way to understand why the shuffle keeps failing for one partition? I
believe we have enough memory to store the uncompressed data in memory.

On Wed, Nov 12, 2014 at 2:50 PM, Sadhan Sood sadhan.s...@gmail.com wrote:

 This is the log output:

 2014-11-12 19:07:16,561 INFO  thriftserver.SparkExecuteStatementOperation
 (Logging.scala:logInfo(59)) - Running query 'CACHE TABLE xyz_cached AS
 SELECT * FROM xyz where date_prefix = 20141112'

 2014-11-12 19:07:17,455 INFO  Configuration.deprecation
 (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.map.tasks is
 deprecated. Instead, use mapreduce.job.maps

 2014-11-12 19:07:17,756 INFO  spark.SparkContext
 (Logging.scala:logInfo(59)) - Created broadcast 0 from broadcast at
 TableReader.scala:68

 2014-11-12 19:07:18,292 INFO  spark.SparkContext
 (Logging.scala:logInfo(59)) - Starting job: collect at SparkPlan.scala:84

 2014-11-12 19:07:22,801 INFO  mapred.FileInputFormat
 (FileInputFormat.java:listStatus(253)) - Total input paths to process : 200

 2014-11-12 19:07:22,835 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Registering RDD 12 (mapPartitions at
 Exchange.scala:86)

 2014-11-12 19:07:22,837 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Got job 0 (collect at SparkPlan.scala:84)
 with 1 output partitions (allowLocal=false)

 2014-11-12 19:07:22,838 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Final stage: Stage 1(collect at
 SparkPlan.scala:84)

 2014-11-12 19:07:22,838 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Parents of final stage: List(Stage 0)

 2014-11-12 19:07:22,842 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Missing parents: List(Stage 0)

 2014-11-12 19:07:22,871 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at
 mapPartitions at Exchange.scala:86), which has no missing parents

 2014-11-12 19:07:22,916 INFO  spark.SparkContext
 (Logging.scala:logInfo(59)) - Created broadcast 1 from broadcast at
 DAGScheduler.scala:838

 2014-11-12 19:07:22,963 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Submitting 461 missing tasks from Stage 0
 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)

 2014-11-12 19:10:04,088 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86)
 finished in 161.113 s

 2014-11-12 19:10:04,089 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - looking for newly runnable stages

 2014-11-12 19:10:04,089 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - running: Set()

 2014-11-12 19:10:04,090 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - waiting: Set(Stage 1)

 2014-11-12 19:10:04,090 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - failed: Set()

 2014-11-12 19:10:04,094 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()

 2014-11-12 19:10:04,097 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at
 SparkPlan.scala:84), which is now runnable

 2014-11-12 19:10:04,112 INFO  spark.SparkContext
 (Logging.scala:logInfo(59)) - Created broadcast 2 from broadcast at
 DAGScheduler.scala:838

 2014-11-12 19:10:04,115 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1
 (MappedRDD[16] at map at SparkPlan.scala:84)

 2014-11-12 19:10:08,541 ERROR cluster.YarnClientClusterScheduler
 (Logging.scala:logError(75)) - Lost executor 52 on
 ip-10-61-175-167.ec2.internal: remote Akka client disassociated

 2014-11-12 19:10:08,543 WARN  remote.ReliableDeliverySupervisor
 (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system
 [akka.tcp://sparkExecutor@ip-10-61-175-167.ec2.internal:50918] has
 failed, address is now gated for [5000] ms. Reason is: [Disassociated].

 2014-11-12 19:10:08,548 ERROR cluster.YarnClientSchedulerBackend
 (Logging.scala:logError(75)) - Asked to remove non-existent executor 52

 2014-11-12 19:10:08,550 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Executor lost: 52 (epoch 1)

 2014-11-12 19:10:08,555 INFO  scheduler.Stage (Logging.scala:logInfo(59))
 - Stage 0 is now unavailable on executor 52 (460/461, false)

 2014-11-12 19:10:08,686 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Marking Stage 1 (collect at
 SparkPlan.scala:84) as failed due to a fetch failure from Stage 0
 (mapPartitions at Exchange.scala:86)

 2014-11-12 19:10:08,686 INFO  scheduler.DAGScheduler
 (Logging.scala:logInfo(59

Cache sparkSql data without uncompressing it in memory

2014-11-12 Thread Sadhan Sood
We noticed while caching data from our hive tables which contain data in
compressed sequence file format that it gets uncompressed in memory when
getting cached. Is there a way to turn this off and cache the compressed
data as is ?


Re: thrift jdbc server probably running queries as hive query

2014-11-11 Thread Sadhan Sood
Hi Cheng,

I made sure the only hive server running on the machine is
hivethriftserver2.

/usr/lib/jvm/default-java/bin/java -cp
/usr/lib/hadoop/lib/hadoop-lzo.jar::/mnt/sadhan/spark-3/sbin/../conf:/mnt/sadhan/spark-3/spark-assembly-1.2.0-SNAPSHOT-hadoop2.3.0-cdh5.0.2.jar:/etc/hadoop/conf
-Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --class
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --master yarn
--jars reporting.jar spark-internal

The query I am running is a simple count(*): select count(*) from Xyz
where date_prefix=20141031, and I am pretty sure it's submitting a map reduce
job based on the spark logs:

TakesRest=false

Total jobs = 1

Launching Job 1 out of 1

Number of reduce tasks determined at compile time: 1

In order to change the average load for a reducer (in bytes):

  set hive.exec.reducers.bytes.per.reducer=number

In order to limit the maximum number of reducers:

  set hive.exec.reducers.max=number

In order to set a constant number of reducers:

  set mapreduce.job.reduces=number

14/11/11 16:23:17 INFO ql.Context: New scratch dir is
hdfs://fdsfdsfsdfsdf:9000/tmp/hive-ubuntu/hive_2014-11-11_16-23-17_333_5669798325805509526-2

Starting Job = job_1414084656759_0142, Tracking URL =
http://xxx:8100/proxy/application_1414084656759_0142/

Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1414084656759_0142

On Mon, Nov 10, 2014 at 9:59 PM, Cheng Lian lian.cs@gmail.com wrote:

  Hey Sadhan,

 I really don't think this is Spark log... Unlike Shark, Spark SQL doesn't
 even provide a Hive mode to let you execute queries against Hive. Would you
 please check whether there is an existing HiveServer2 running there? Spark
 SQL HiveThriftServer2 is just a Spark port of HiveServer2, and they share
 the same default listening port. I guess the Thrift server didn't start
 successfully because the HiveServer2 occupied the port, and your Beeline
 session was probably linked against HiveServer2.

 Cheng


 On 11/11/14 8:29 AM, Sadhan Sood wrote:

 I was testing out the spark thrift jdbc server by running a simple query
 in the beeline client. The spark itself is running on a yarn cluster.

 However, when I run a query in beeline - I see no running jobs in the
 spark UI(completely empty) and the yarn UI seem to indicate that the
 submitted query is being run as a map reduce job. This is probably also
 being indicated from the spark logs but I am not completely sure:

  2014-11-11 00:19:00,492 INFO  ql.Context
 (Context.java:getMRScratchDir(267)) - New scratch dir is
 hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-1

 2014-11-11 00:19:00,877 INFO  ql.Context
 (Context.java:getMRScratchDir(267)) - New scratch dir is
 hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2

 2014-11-11 00:19:04,152 INFO  ql.Context
 (Context.java:getMRScratchDir(267)) - New scratch dir is
 hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2

 2014-11-11 00:19:04,425 INFO  Configuration.deprecation
 (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.submit.replication
 is deprecated. Instead, use mapreduce.client.submit.file.replication

 2014-11-11 00:19:04,516 INFO  client.RMProxy
 (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager
 at :8032

 2014-11-11 00:19:04,607 INFO  client.RMProxy
 (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager
 at :8032

 2014-11-11 00:19:04,639 WARN  mapreduce.JobSubmitter
 (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option
 parsing not performed. Implement the Tool interface and execute your
 application with ToolRunner to remedy this

 2014-11-11 00:00:08,806 INFO  input.FileInputFormat
 (FileInputFormat.java:listStatus(287)) - Total input paths to process :
 14912

 2014-11-11 00:00:08,864 INFO  lzo.GPLNativeCodeLoader
 (GPLNativeCodeLoader.java:clinit(34)) - Loaded native gpl library

 2014-11-11 00:00:08,866 INFO  lzo.LzoCodec (LzoCodec.java:clinit(76)) -
 Successfully loaded  initialized native-lzo library [hadoop-lzo rev
 8e266e052e423af592871e2dfe09d54c03f6a0e8]

 2014-11-11 00:00:09,873 INFO  input.CombineFileInputFormat
 (CombineFileInputFormat.java:createSplits(413)) - DEBUG: Terminated node
 allocation with : CompletedNodes: 1, size left: 194541317

 2014-11-11 00:00:10,017 INFO  mapreduce.JobSubmitter
 (JobSubmitter.java:submitJobInternal(396)) - number of splits:615

 2014-11-11 00:00:10,095 INFO  mapreduce.JobSubmitter
 (JobSubmitter.java:printTokens(479)) - Submitting tokens for job:
 job_1414084656759_0115

 2014-11-11 00:00:10,241 INFO  impl.YarnClientImpl
 (YarnClientImpl.java:submitApplication(167

thrift jdbc server probably running queries as hive query

2014-11-10 Thread Sadhan Sood
I was testing out the spark thrift jdbc server by running a simple query in
the beeline client. The spark itself is running on a yarn cluster.

However, when I run a query in beeline, I see no running jobs in the
Spark UI (completely empty), and the YARN UI seems to indicate that the
submitted query is being run as a map reduce job. This is probably also
indicated by the spark logs, but I am not completely sure:

2014-11-11 00:19:00,492 INFO  ql.Context
(Context.java:getMRScratchDir(267)) - New scratch dir is
hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-1

2014-11-11 00:19:00,877 INFO  ql.Context
(Context.java:getMRScratchDir(267)) - New scratch dir is
hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2

2014-11-11 00:19:04,152 INFO  ql.Context
(Context.java:getMRScratchDir(267)) - New scratch dir is
hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2

2014-11-11 00:19:04,425 INFO  Configuration.deprecation
(Configuration.java:warnOnceIfDeprecated(1009)) - mapred.submit.replication
is deprecated. Instead, use mapreduce.client.submit.file.replication

2014-11-11 00:19:04,516 INFO  client.RMProxy
(RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager
at :8032

2014-11-11 00:19:04,607 INFO  client.RMProxy
(RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager
at :8032

2014-11-11 00:19:04,639 WARN  mapreduce.JobSubmitter
(JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option
parsing not performed. Implement the Tool interface and execute your
application with ToolRunner to remedy this

2014-11-11 00:00:08,806 INFO  input.FileInputFormat
(FileInputFormat.java:listStatus(287)) - Total input paths to process :
14912

2014-11-11 00:00:08,864 INFO  lzo.GPLNativeCodeLoader
(GPLNativeCodeLoader.java:clinit(34)) - Loaded native gpl library

2014-11-11 00:00:08,866 INFO  lzo.LzoCodec (LzoCodec.java:clinit(76)) -
Successfully loaded  initialized native-lzo library [hadoop-lzo rev
8e266e052e423af592871e2dfe09d54c03f6a0e8]

2014-11-11 00:00:09,873 INFO  input.CombineFileInputFormat
(CombineFileInputFormat.java:createSplits(413)) - DEBUG: Terminated node
allocation with : CompletedNodes: 1, size left: 194541317

2014-11-11 00:00:10,017 INFO  mapreduce.JobSubmitter
(JobSubmitter.java:submitJobInternal(396)) - number of splits:615

2014-11-11 00:00:10,095 INFO  mapreduce.JobSubmitter
(JobSubmitter.java:printTokens(479)) - Submitting tokens for job:
job_1414084656759_0115

2014-11-11 00:00:10,241 INFO  impl.YarnClientImpl
(YarnClientImpl.java:submitApplication(167)) - Submitted application
application_1414084656759_0115


It seems like the query is being run as a Hive query instead of a Spark
query. The same query works fine when run from the spark-sql cli.


Job cancelled because SparkContext was shut down - failures!

2014-10-24 Thread Sadhan Sood
Hi,

I am trying to run a query on spark-sql but it keeps failing with this error
on the cli (we are running spark-sql on a yarn cluster):


org.apache.spark.SparkException: Job cancelled because SparkContext was
shut down
  at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:700)
  at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
  at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
  at
org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:699)
  at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1405)
  at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
  at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1352)
  at
akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
  at
akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
  at akka.actor.ActorCell.terminate(ActorCell.scala:369)
  at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
  at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
  at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

On the UI, I see there are connection failures in mapPartition stage:

java.net.SocketTimeoutException: Read timed out
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:150)
java.net.SocketInputStream.read(SocketInputStream.java:121)
java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
java.io.BufferedInputStream.read(BufferedInputStream.java:345)
sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:703)
sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)

sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1534)

sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1439)
org.apache.spark.util.Utils$.fetchFile(Utils.scala:362)

org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:331)

org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:329)

scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)

scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)

scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
scala.collection.mutable.HashMap.foreach(HashMap.scala:98)

scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)

org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:329)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:162)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)


Re: Is SparkSQL + JDBC server a good approach for caching?

2014-10-24 Thread Sadhan Sood
Is there a way to cache certain (or the most recent) partitions of certain
tables?

On Fri, Oct 24, 2014 at 2:35 PM, Michael Armbrust mich...@databricks.com
wrote:

 It does have support for caching using either CACHE TABLE tablename or
 CACHE TABLE tablename AS SELECT 

 On Fri, Oct 24, 2014 at 1:05 AM, ankits ankitso...@gmail.com wrote:

 I want to set up spark SQL to allow ad hoc querying over the last X days
 of
 processed data, where the data is processed through spark. This would also
 have to cache data (in memory only), so the approach I was thinking of was
 to build a layer that persists the appropriate RDDs and stores them in
 memory.

 I see spark sql allows ad hoc querying through JDBC though I have never
 used
 that before. Will using JDBC offer any advantages (e.g does it have built
 in
 support for caching?) over rolling my own solution for this use case?

 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-SparkSQL-JDBC-server-a-good-approach-for-caching-tp17196.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Is SparkSQL + JDBC server a good approach for caching?

2014-10-24 Thread Sadhan Sood
That works perfect. Thanks again Michael

On Fri, Oct 24, 2014 at 3:10 PM, Michael Armbrust mich...@databricks.com
wrote:

 It won't be transparent, but you can do so something like:

 CACHE TABLE newData AS SELECT * FROM allData WHERE date > ...

 and then query newData.

 On Fri, Oct 24, 2014 at 12:06 PM, Sadhan Sood sadhan.s...@gmail.com
 wrote:

 Is there a way to cache certain (or most latest) partitions of certain
 tables ?

 On Fri, Oct 24, 2014 at 2:35 PM, Michael Armbrust mich...@databricks.com
  wrote:

 It does have support for caching using either CACHE TABLE tablename or
 CACHE TABLE tablename AS SELECT 

 On Fri, Oct 24, 2014 at 1:05 AM, ankits ankitso...@gmail.com wrote:

 I want to set up spark SQL to allow ad hoc querying over the last X
 days of
 processed data, where the data is processed through spark. This would
 also
 have to cache data (in memory only), so the approach I was thinking of
 was
 to build a layer that persists the appropriate RDDs and stores them in
 memory.

 I see spark sql allows ad hoc querying through JDBC though I have never
 used
 that before. Will using JDBC offer any advantages (e.g does it have
 built in
 support for caching?) over rolling my own solution for this use case?

 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-SparkSQL-JDBC-server-a-good-approach-for-caching-tp17196.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
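
A small sketch of the CACHE TABLE ... AS SELECT approach suggested above,
assuming a HiveContext and a hypothetical table allData with a date column
(the table names, app name, and date literal below are placeholders):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.hive.HiveContext

  val sc = new SparkContext(new SparkConf().setAppName("cache-recent-data-sketch"))
  val sqlContext = new HiveContext(sc)

  // Materialize and cache only the recent slice of the table under a new name.
  sqlContext.sql("CACHE TABLE newData AS SELECT * FROM allData WHERE date > 20141001")

  // Ad hoc queries then run against the cached table.
  sqlContext.sql("SELECT COUNT(*) FROM newData").collect().foreach(println)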








Sharing spark context across multiple spark sql cli initializations

2014-10-22 Thread Sadhan Sood
We want to run multiple instances of the spark sql cli on our yarn cluster,
with each instance of the cli used by a different user. This looks
non-optimal if each user brings up a different cli, given that spark on yarn
runs executor processes (and hence consumes resources) on worker nodes for
the lifetime of the application. So, the right way seems to be to use the
same spark context shared across multiple initializations and run just one
spark sql application. Is this understanding correct? Is there a way to do it
currently? It seems like it needs some kind of thrift interface hooked into
the cli driver.
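
One way to get a single shared SparkContext behind a thrift interface is the
Spark SQL JDBC/ODBC server (HiveThriftServer2) discussed elsewhere in this
archive. A rough sketch, assuming the HiveThriftServer2.startWithContext entry
point is available in your Spark build (an assumption) and that users then
connect with beeline instead of each launching their own spark-sql cli:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.hive.HiveContext
  import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

  val sc = new SparkContext(new SparkConf().setAppName("shared-sql-context-sketch"))
  val hiveContext = new HiveContext(sc)

  // Every JDBC/beeline session that connects to this server shares the single
  // SparkContext (and its executors) created above, instead of each user
  // bringing up a separate cli with its own set of executors.
  HiveThriftServer2.startWithContext(hiveContext)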


read all parquet files in a directory in spark-sql

2014-10-13 Thread Sadhan Sood
How can we read all parquet files in a directory in spark-sql? We are
following this example, which shows a way to read one file:

// Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a SchemaRDD.
val parquetFile = sqlContext.parquetFile("people.parquet")

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")


Re: read all parquet files in a directory in spark-sql

2014-10-13 Thread Sadhan Sood
Thanks Nick, DB - that was helpful.

On Mon, Oct 13, 2014 at 5:44 PM, DB Tsai dbt...@dbtsai.com wrote:

 For now, with Spark's SPARK-3462 (parquet pushdown for unionAll) PR, you
 can do the following to unionAll the SchemaRDDs.

   val paths = Array("hdfs://file1.parquet", "hdfs://file2.parquet", "hdfs://file3.parquet")
   val rdds = paths.map(hc.parquetFile(_))

   val unionedRDD = {
     var temp = rdds(0)
     for (i <- 1 until rdds.length) {
       temp = temp.unionAll(rdds(i))
     }
     temp
   }

 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Mon, Oct 13, 2014 at 7:22 PM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
   Right now I believe the only supported option is to pass a comma-delimited
  list of paths.

   I've opened SPARK-3928: Support wildcard matches on Parquet files to
  request this feature.
 
  Nick
 
  On Mon, Oct 13, 2014 at 12:21 PM, Sadhan Sood sadhan.s...@gmail.com
 wrote:
 
  How can we read all parquet files in a directory in spark-sql. We are
  following this example which shows a way to read one file:
 
   // Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.
   // The result of loading a Parquet file is also a SchemaRDD.
   val parquetFile = sqlContext.parquetFile("people.parquet")

   // Parquet files can also be registered as tables and then used in SQL statements.
   parquetFile.registerTempTable("parquetFile")
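
A sketch of the comma-delimited form mentioned above, assuming
sqlContext.parquetFile accepts a comma-separated list of paths (an assumption;
the HDFS paths and app name below are placeholders):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.hive.HiveContext

  val sc = new SparkContext(new SparkConf().setAppName("read-parquet-files-sketch"))
  val sqlContext = new HiveContext(sc)

  // Pass every parquet file in the directory as one comma-delimited path string.
  val paths = Seq(
    "hdfs:///data/parquet/file1.parquet",
    "hdfs:///data/parquet/file2.parquet",
    "hdfs:///data/parquet/file3.parquet")
  val parquetData = sqlContext.parquetFile(paths.mkString(","))

  parquetData.registerTempTable("parquetData")
  sqlContext.sql("SELECT COUNT(*) FROM parquetData").collect().foreach(println)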
 
 



Fwd: how to find the sources for spark-project

2014-10-11 Thread Sadhan Sood
-- Forwarded message --
From: Sadhan Sood sadhan.s...@gmail.com
Date: Sat, Oct 11, 2014 at 10:26 AM
Subject: Re: how to find the sources for spark-project
To: Stephen Boesch java...@gmail.com


Thanks, I still didn't find it - is it under some particular branch? More
specifically, I am looking to modify the file ParquetHiveSerDe.java, which
is under this namespace:

org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java and is
being linked through this jar:

<dependency>
  <groupId>org.spark-project.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>0.12.0</version>
</dependency>

It seems these are modified versions of the same jar in org.apache.hive:

<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>0.13.1</version>
</dependency>

It'd be great if I could find the sources of this jar so I can modify it
locally and rebuild the jar. Not sure if I can rebuild spark with the
hive-exec artifact from org.apache.hive without breaking it.



On Fri, Oct 10, 2014 at 7:28 PM, Stephen Boesch java...@gmail.com wrote:

 Git clone the spark project (https://github.com/apache/spark.git); the hive
 related sources are under sql/hive/src/main/scala

 2014-10-10 16:24 GMT-07:00 sadhan sadhan.s...@gmail.com:

 We have our own customization on top of parquet serde that we've been using
 for hive. In order to make it work with spark-sql, we need to be able to
 re-build spark with this. It'll be much easier to rebuild spark with this
 patch once I can find the sources for org.spark-project.hive. Not sure
 where
 to find it ? This seems like the place where we need to put our patch.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/how-to-find-the-sources-for-spark-project-tp16187.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
