[SPARK-SQL] Requested array size exceeds VM limit

2015-09-25 Thread Sadhan Sood
I am trying to run a query on a month of data. The volume of data is not
much, but we have a partition per hour and per day. The table schema is
heavily nested with a total of 300 leaf fields. I am trying to run a simple
select count(*) query on the table and am running into this exception:

 SELECT
   COUNT(*)
 FROM
   p_all_tx
 WHERE
   date_prefix >= "20150500"
   AND date_prefix <= "20150700"
   AND sanitizeddetails.merchantaccountid = 'Rvr7StMZSTQj';

java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
> at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
> at
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2003)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
> at
> org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1.apply(SortBasedAggregate.scala:73)
> at
> org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1.apply(SortBasedAggregate.scala:70)
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
> at
> org.apache.spark.sql.execution.aggregate.SortBasedAggregate.doExecute(SortBasedAggregate.scala:70)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
> at
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:142)
> at
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141)
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
> at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:141)


The table is a parquet table. I am not sure why the closure should exceed
the VM limit. Could somebody explain why this is happening? Is it because I
have a lot of partitions and the table scan is essentially creating one RDD
per partition?


hive thriftserver and fair scheduling

2015-10-20 Thread Sadhan Sood
Hi All,

Does anyone have fair scheduling working for them with the hive thriftserver? I
have one hive thriftserver running and multiple users trying to run queries at
the same time on that server using a beeline client. I see that a big query
stops all other queries from making any progress. Is it supposed to
be this way? Is there anything else I need to do for fair
scheduling to work on the thriftserver?
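
A minimal sketch of the per-user pool setup suggested in the reply below, assuming Spark's standard fairscheduler.xml format (pool names and file paths are illustrative, not from the original thread):

  # spark-defaults.conf (or --conf flags on the thriftserver launch): enable fair scheduling
  spark.scheduler.mode              FAIR
  spark.scheduler.allocation.file   /path/to/fairscheduler.xml

  <!-- fairscheduler.xml: define one pool per user -->
  <allocations>
    <pool name="alice">
      <schedulingMode>FAIR</schedulingMode>
      <weight>1</weight>
      <minShare>2</minShare>
    </pool>
  </allocations>

  -- from each beeline session, before running queries
  SET spark.sql.thriftserver.scheduler.pool=alice;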


Re: hive thriftserver and fair scheduling

2015-10-20 Thread Sadhan Sood
Thanks Michael, I'll try it out. Another quick/important question: how do I
make UDFs available to all of the hive thriftserver users? Right now, when
I launch a spark-sql client, I notice that it reads the ~/.hiverc file and
all UDFs get picked up, but this doesn't seem to work in the hive
thriftserver. Is there a way to make it work in a similar way for all users
of the hive thriftserver?

Thanks again!

On Tue, Oct 20, 2015 at 12:34 PM, Michael Armbrust 
wrote:

> Not the most obvious place in the docs... but this is probably helpful:
> https://spark.apache.org/docs/latest/sql-programming-guide.html#scheduling
>
> You likely want to put each user in their own pool.
>
> On Tue, Oct 20, 2015 at 11:55 AM, Sadhan Sood 
> wrote:
>
>> Hi All,
>>
>> Does anyone have fair scheduling working for them in a hive server? I
>> have one hive thriftserver running and multiple users trying to run queries
>> at the same time on that server using a beeline client. I see that a big
>> query is stopping all other queries from making any progress. Is this
>> supposed to be this way? Is there anything else that I need to be doing for
>> fair scheduling to be working for the thriftserver?
>>
>
>


SPARK SQL- Parquet projection pushdown for nested data

2015-10-29 Thread Sadhan Sood
I noticed when querying struct data in spark sql, we are requesting the
whole column from parquet files. Is this intended or is there some kind of
config to control this behaviour? Wouldn't it be better to request just the
struct field?


Re: SPARK SQL- Parquet projection pushdown for nested data

2015-10-29 Thread Sadhan Sood
Thanks Michael, I will upvote this.

On Thu, Oct 29, 2015 at 10:29 AM, Michael Armbrust 
wrote:

> Yeah, this is unfortunate.  It would be good to fix this, but its a
> non-trivial change.
>
> Tracked here if you'd like to vote on the issue:
> https://issues.apache.org/jira/browse/SPARK-4502
>
> On Thu, Oct 29, 2015 at 6:00 PM, Sadhan Sood 
> wrote:
>
>> I noticed when querying struct data in spark sql, we are requesting the
>> whole column from parquet files. Is this intended or is there some kind of
>> config to control this behaviour? Wouldn't it be better to request just the
>> struct field?
>>
>
>


Spark cluster multi tenancy

2015-08-26 Thread Sadhan Sood
Hi All,

We've set up our spark cluster on aws running on yarn (running on hadoop
2.3) with fair scheduling and preemption turned on. The cluster is shared
for prod and dev work where prod runs with a higher fair share and can
preempt dev jobs if there are not enough resources available for it.
It appears that dev jobs which get preempted often become unstable after
losing some executors: the whole job gets stuck (without making any
progress) or ends up getting restarted (and hence loses all the work done).
Has someone encountered this before? Is the solution just to set
spark.task.maxFailures
to a really high value to recover from task failures in such scenarios? Are
there other approaches people have taken for spark multi tenancy that
work better in such scenarios?

Thanks,
Sadhan
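
For reference, a sketch of where the maxFailures knob mentioned above would be set (the value is only an example):

  # on the spark-shell / spark-submit command line
  --conf spark.task.maxFailures=15

  # or equivalently in conf/spark-defaults.conf
  spark.task.maxFailures   15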


Re: Spark cluster multi tenancy

2015-08-26 Thread Sadhan Sood
Attaching log for when the dev job gets stuck (once all its executors are
lost due to preemption). This is a spark-shell job running in yarn-client
mode.

On Wed, Aug 26, 2015 at 10:45 AM, Sadhan Sood  wrote:

> Hi All,
>
> We've set up our spark cluster on aws running on yarn (running on hadoop
> 2.3) with fair scheduling and preemption turned on. The cluster is shared
> for prod and dev work where prod runs with a higher fair share and can
> preempt dev jobs if there are not enough resources available for it.
> It appears that dev jobs which get preempted often get unstable after
> losing some executors and the whole jobs gets stuck (without making any
> progress) or end up getting restarted (and hence losing all the work done).
> Has someone encountered this before ? Is the solution just to set 
> spark.task.maxFailures
> to a really high value to recover from task failures in such scenarios? Are
> there other approaches that people have taken for spark multi tenancy that
> works better in such scenario?
>
> Thanks,
> Sadhan
>


spark_job_stuck.log
Description: Binary data


Re: Spark cluster multi tenancy

2015-08-26 Thread Sadhan Sood
Interestingly, if there is nothing running on the dev spark-shell, it recovers
successfully and regains the lost executors. Attaching the log for that.
Notice the "Registering block manager .." statements at the very end, after
all executors were lost.

On Wed, Aug 26, 2015 at 11:27 AM, Sadhan Sood  wrote:

> Attaching log for when the dev job gets stuck (once all its executors are
> lost due to preemption). This is a spark-shell job running in yarn-client
> mode.
>
> On Wed, Aug 26, 2015 at 10:45 AM, Sadhan Sood 
> wrote:
>
>> Hi All,
>>
>> We've set up our spark cluster on aws running on yarn (running on hadoop
>> 2.3) with fair scheduling and preemption turned on. The cluster is shared
>> for prod and dev work where prod runs with a higher fair share and can
>> preempt dev jobs if there are not enough resources available for it.
>> It appears that dev jobs which get preempted often get unstable after
>> losing some executors and the whole jobs gets stuck (without making any
>> progress) or end up getting restarted (and hence losing all the work done).
>> Has someone encountered this before ? Is the solution just to set 
>> spark.task.maxFailures
>> to a really high value to recover from task failures in such scenarios? Are
>> there other approaches that people have taken for spark multi tenancy that
>> works better in such scenario?
>>
>> Thanks,
>> Sadhan
>>
>
>


spark_job_recovers.log
Description: Binary data


SPARK-SQL parameter tuning for performance

2015-09-17 Thread Sadhan Sood
Hi Spark users,

We are running Spark on Yarn and often query table partitions as big as
100-200 GB from HDFS. HDFS is co-located on the same cluster on which Spark
and Yarn run. I've noticed much higher I/O read rates when I increase the
number of executor cores from 2 to 8 (most tasks run RACK_LOCAL and a
few NODE_LOCAL) while keeping the number of executors constant. The RAM on my
executors is around 24G. But the problem is that any subsequent shuffle
stage starts failing if I do that. It runs fine if I leave the number of
cores at 2, but then the read is much slower. The error I get
from Yarn is:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> location for shuffle 1
> at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)


It seems to indicate that Yarn is killing executors for using too much memory,
but I can't be sure. I tried increasing the spark sql shuffle partitions as
well, but that didn't help much. Is there a way we can run more partition
read tasks per executor while keeping the number of shuffle tasks constant?


Thanks
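
A sketch of the knobs discussed above, assuming a spark-submit style launch on YARN (all values are illustrative, not recommendations; spark.yarn.executor.memoryOverhead is in MB and gives the container head-room beyond the executor heap):

  # --executor-cores controls how many read tasks run in parallel per executor,
  # spark.sql.shuffle.partitions controls how many reduce-side shuffle tasks are created.
  spark-submit --master yarn \
    --num-executors 20 \
    --executor-cores 8 \
    --executor-memory 24g \
    --conf spark.yarn.executor.memoryOverhead=2048 \
    --conf spark.sql.shuffle.partitions=400 \
    <your-application>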


Re: Error when cache partitioned Parquet table

2015-01-26 Thread Sadhan Sood
Hi Xu-dong,

That's probably because your table's partition paths don't look like
hdfs://somepath/key=value/*.parquet. Spark is trying to extract the
partition key's value from the path while caching, and the exception
is thrown because it can't find one.
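
To illustrate the layout Spark expects here (paths are made up; "querydate" is the partition key from the stack trace below), each partition directory needs to embed the key=value pair:

  hdfs://namenode/warehouse/some_table/querydate=20150125/part-00000.parquet
  hdfs://namenode/warehouse/some_table/querydate=20150126/part-00000.parquet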

On Mon, Jan 26, 2015 at 10:45 AM, ZHENG, Xu-dong  wrote:

> Hi all,
>
> I meet below error when I cache a partitioned Parquet table. It seems
> that, Spark is trying to extract the partitioned key in the Parquet file,
> so it is not found. But other query could run successfully, even request
> the partitioned key. Is it a bug in SparkSQL? Is there any workaround for
> it? Thank you!
>
> java.util.NoSuchElementException: key not found: querydate
>   at scala.collection.MapLike$class.default(MapLike.scala:228)
>   at scala.collection.AbstractMap.default(Map.scala:58)
>   at scala.collection.MapLike$class.apply(MapLike.scala:141)
>   at scala.collection.AbstractMap.apply(Map.scala:58)
>   at 
> org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anonfun$3.apply(ParquetTableOperations.scala:142)
>   at 
> org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anonfun$3.apply(ParquetTableOperations.scala:142)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4.apply(ParquetTableOperations.scala:142)
>   at 
> org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4.apply(ParquetTableOperations.scala:127)
>   at 
> org.apache.spark.rdd.NewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD.compute(NewHadoopRDD.scala:247)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:724)
>
> --
> 郑旭东
> ZHENG, Xu-dong
>
>


Fwd: how to find the sources for spark-project

2014-10-11 Thread Sadhan Sood
-- Forwarded message --
From: Sadhan Sood 
Date: Sat, Oct 11, 2014 at 10:26 AM
Subject: Re: how to find the sources for spark-project
To: Stephen Boesch 


Thanks, I still didn't find it - is it under some particular branch? More
specifically, I am looking to modify the file ParquetHiveSerDe.java, which
is under this namespace:

org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java

and is being linked in through this jar:



<dependency>
    <groupId>org.spark-project.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>0.12.0</version>
</dependency>



It seems these are modified versions of the same jar in org.apache.hive:



<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>0.13.1</version>
</dependency>



It'd be great if I could find the sources of this jar so I can modify it
locally and rebuild the jar. Not sure if I can rebuild spark with the hive-exec
artifact from org.apache.hive without breaking it.



On Fri, Oct 10, 2014 at 7:28 PM, Stephen Boesch  wrote:

> Git clone the spark project: https://github.com/apache/spark.git
>  The hive related sources are under sql/hive/src/main/scala
>
> 2014-10-10 16:24 GMT-07:00 sadhan :
>
> We have our own customization on top of parquet serde that we've been using
>> for hive. In order to make it work with spark-sql, we need to be able to
>> re-build spark with this. It'll be much easier to rebuild spark with this
>> patch once I can find the sources for org.spark-project.hive. Not sure
>> where
>> to find it ? This seems like the place where we need to put our patch.
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-find-the-sources-for-spark-project-tp16187.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


persist table schema in spark-sql

2014-10-13 Thread Sadhan Sood
We want to persist the table schema of a parquet file so that we can use the
spark-sql cli on that table later on. Is that possible, or is the spark-sql cli
only good for tables in the hive metastore? We are reading parquet data using
this example:

// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a SchemaRDD.
val parquetFile = sqlContext.parquetFile("people.parquet")

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
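
One possible approach, sketched under the assumption that a Hive metastore is available to the spark-sql cli and that its Hive version understands parquet natively (table, column, and path names are made up): register the files as an external table so the schema outlives the session.

  -- STORED AS PARQUET needs Hive 0.13+; older Hive versions need the explicit
  -- parquet SerDe / InputFormat / OutputFormat classes instead.
  CREATE EXTERNAL TABLE people (name STRING, age INT)
  STORED AS PARQUET
  LOCATION 'hdfs:///data/people_parquet/';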


read all parquet files in a directory in spark-sql

2014-10-13 Thread Sadhan Sood
How can we read all parquet files in a directory in spark-sql? We are
following this example, which shows a way to read one file:

// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a SchemaRDD.
val parquetFile = sqlContext.parquetFile("people.parquet")

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
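
Per the replies below, a minimal sketch assuming parquetFile accepts a comma-delimited list of paths (the file names are made up):

  // one SchemaRDD over several files, registered for SQL as usual
  val parquetFiles = sqlContext.parquetFile(
    "hdfs:///logs/part-00000.parquet,hdfs:///logs/part-00001.parquet")
  parquetFiles.registerTempTable("logs")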


Re: read all parquet files in a directory in spark-sql

2014-10-13 Thread Sadhan Sood
Thanks Nick, DB - that was helpful.

On Mon, Oct 13, 2014 at 5:44 PM, DB Tsai  wrote:

> For now, with the SPARK-3462 parquet pushdown for unionAll PR, you
> can do the following to unionAll the SchemaRDDs:
>
>   val files = Array("hdfs://file1.parquet", "hdfs://file2.parquet",
> "hdfs://file3.parquet")
>   val rdds = files.map(hc.parquetFile(_))
>
>   val unionedRDD = {
> var temp = rdds(0)
> for (i <- 1 until rdds.length) {
>   temp = temp.unionAll(rdds(i))
> }
> temp
>   }
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Mon, Oct 13, 2014 at 7:22 PM, Nicholas Chammas
>  wrote:
> > Right now I believe the only supported option is to pass a
> comma-delimited
> > list of paths.
> >
> > I've opened SPARK-3928: Support wildcard matches on Parquet files to
> request
> > this feature.
> >
> > Nick
> >
> > On Mon, Oct 13, 2014 at 12:21 PM, Sadhan Sood 
> wrote:
> >>
> >> How can we read all parquet files in a directory in spark-sql. We are
> >> following this example which shows a way to read one file:
> >>
> >> // Read in the parquet file created above.  Parquet files are
> >> self-describing so the schema is preserved.
> >> // The result of loading a Parquet file is also a SchemaRDD.
> >> val parquetFile = sqlContext.parquetFile("people.parquet")
> >>
> >> //Parquet files can also be registered as tables and then used in SQL
> >> statements.
> >> parquetFile.registerTempTable("parquetFile")
> >
> >
>


Sharing spark context across multiple spark sql cli initializations

2014-10-22 Thread Sadhan Sood
We want to run multiple instances of spark sql cli on our yarn cluster.
Each instance of the cli is to be used by a different user. This looks
non-optimal if each user brings up a different cli given how spark works on
yarn by running executor processes (and hence consuming resources) on
worker nodes for the lifetime of the application. So, the right way seems
to be to use the same spark context shared across multiple initializations
and run just one spark sql application. Is this understanding correct?
Is there a way to do it currently? It seems like it needs some kind of thrift
interface hooked into the cli driver.
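
A minimal sketch of the JDBC-server approach suggested in the reply below (host, port, and master are illustrative):

  # start one long-running SQL application that all users share
  ./sbin/start-thriftserver.sh --master yarn

  # each user then connects with beeline instead of a private spark-sql cli
  ./bin/beeline -u jdbc:hive2://thriftserver-host:10000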


Re: Sharing spark context across multiple spark sql cli initializations

2014-10-23 Thread Sadhan Sood
Thanks Michael, you saved me a lot of time!

On Wed, Oct 22, 2014 at 6:04 PM, Michael Armbrust 
wrote:

> The JDBC server is what you are looking for:
> http://spark.apache.org/docs/latest/sql-programming-guide.html#running-the-thrift-jdbc-server
>
> On Wed, Oct 22, 2014 at 11:10 AM, Sadhan Sood 
> wrote:
>
>> We want to run multiple instances of spark sql cli on our yarn cluster.
>> Each instance of the cli is to be used by a different user. This looks
>> non-optimal if each user brings up a different cli given how spark works on
>> yarn by running executor processes (and hence consuming resources) on
>> worker nodes for the lifetime of the application. So, the right way seems
>> like to use the same spark context shared across multiple initializations
>> and running just one spark sql application. Is the understanding correct ?
>> Is there a way to do it currently ? Seem like it needs some kind of thrift
>> interface hooked into the cli driver.
>>
>
>


Job cancelled because SparkContext was shut down - failures!

2014-10-24 Thread Sadhan Sood
Hi,

Trying to run a query on spark-sql but it keeps failing with this error on
the cli (we are running spark-sql on a yarn cluster):


org.apache.spark.SparkException: Job cancelled because SparkContext was
shut down
  at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:700)
  at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
  at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
  at
org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:699)
  at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1405)
  at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
  at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1352)
  at
akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
  at
akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
  at akka.actor.ActorCell.terminate(ActorCell.scala:369)
  at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
  at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
  at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

On the UI, I see there are connection failures in mapPartition stage:

java.net.SocketTimeoutException: Read timed out
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:150)
java.net.SocketInputStream.read(SocketInputStream.java:121)
java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
java.io.BufferedInputStream.read(BufferedInputStream.java:345)
sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:703)
sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)

sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1534)

sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1439)
org.apache.spark.util.Utils$.fetchFile(Utils.scala:362)

org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:331)

org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:329)

scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)

scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)

scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
scala.collection.mutable.HashMap.foreach(HashMap.scala:98)

scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)

org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:329)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:162)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)


Re: Job cancelled because SparkContext was shut down - failures!

2014-10-24 Thread Sadhan Sood
These seem like s3 connection errors for the table data, which is surprising
since we don't see that many failures on hive. I also set spark.task.maxFailures
= 15.

On Fri, Oct 24, 2014 at 12:15 PM, Sadhan Sood  wrote:

> Hi,
>
> Trying to run a query on spark-sql but it keeps failing with this error on
> the cli ( we are running spark-sql on a yarn cluster):
>
>
> org.apache.spark.SparkException: Job cancelled because SparkContext was
> shut down
>   at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:700)
>   at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>   at
> org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:699)
>   at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1405)
>   at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
>   at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1352)
>   at
> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>   at
> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>   at akka.actor.ActorCell.terminate(ActorCell.scala:369)
>   at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
>   at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>   at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>   at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> On the UI, I see there are connection failures in mapPartition stage:
>
> java.net.SocketTimeoutException: Read timed out
> java.net.SocketInputStream.socketRead0(Native Method)
> java.net.SocketInputStream.read(SocketInputStream.java:150)
> java.net.SocketInputStream.read(SocketInputStream.java:121)
> java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
> java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
> java.io.BufferedInputStream.read(BufferedInputStream.java:345)
> sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:703)
> sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
> 
> sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1534)
> 
> sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1439)
> org.apache.spark.util.Utils$.fetchFile(Utils.scala:362)
> 
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:331)
> 
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:329)
> 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> 
> org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:329)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:162)
> 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
>
>


Re: Is SparkSQL + JDBC server a good approach for caching?

2014-10-24 Thread Sadhan Sood
Is there a way to cache certain (or the most recent) partitions of certain
tables?

On Fri, Oct 24, 2014 at 2:35 PM, Michael Armbrust 
wrote:

> It does have support for caching using either CACHE TABLE  or
> CACHE TABLE  AS SELECT 
>
> On Fri, Oct 24, 2014 at 1:05 AM, ankits  wrote:
>
>> I want to set up spark SQL to allow ad hoc querying over the last X days
>> of
>> processed data, where the data is processed through spark. This would also
>> have to cache data (in memory only), so the approach I was thinking of was
>> to build a layer that persists the appropriate RDDs and stores them in
>> memory.
>>
>> I see spark sql allows ad hoc querying through JDBC though I have never
>> used
>> that before. Will using JDBC offer any advantages (e.g does it have built
>> in
>> support for caching?) over rolling my own solution for this use case?
>>
>> Thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-SparkSQL-JDBC-server-a-good-approach-for-caching-tp17196.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Is SparkSQL + JDBC server a good approach for caching?

2014-10-24 Thread Sadhan Sood
That works perfect. Thanks again Michael

On Fri, Oct 24, 2014 at 3:10 PM, Michael Armbrust 
wrote:

> It won't be transparent, but you can do so something like:
>
> CACHE TABLE newData AS SELECT * FROM allData WHERE date > "..."
>
> and then query newData.
>
> On Fri, Oct 24, 2014 at 12:06 PM, Sadhan Sood 
> wrote:
>
>> Is there a way to cache certain (or most latest) partitions of certain
>> tables ?
>>
>> On Fri, Oct 24, 2014 at 2:35 PM, Michael Armbrust > > wrote:
>>
>>> It does have support for caching using either CACHE TABLE  or
>>> CACHE TABLE  AS SELECT 
>>>
>>> On Fri, Oct 24, 2014 at 1:05 AM, ankits  wrote:
>>>
>>>> I want to set up spark SQL to allow ad hoc querying over the last X
>>>> days of
>>>> processed data, where the data is processed through spark. This would
>>>> also
>>>> have to cache data (in memory only), so the approach I was thinking of
>>>> was
>>>> to build a layer that persists the appropriate RDDs and stores them in
>>>> memory.
>>>>
>>>> I see spark sql allows ad hoc querying through JDBC though I have never
>>>> used
>>>> that before. Will using JDBC offer any advantages (e.g does it have
>>>> built in
>>>> support for caching?) over rolling my own solution for this use case?
>>>>
>>>> Thanks!
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-SparkSQL-JDBC-server-a-good-approach-for-caching-tp17196.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>


thrift jdbc server probably running queries as hive query

2014-11-10 Thread Sadhan Sood
I was testing out the spark thrift jdbc server by running a simple query in
the beeline client. Spark itself is running on a yarn cluster.

However, when I run a query in beeline, I see no running jobs in the
spark UI (completely empty), and the yarn UI seems to indicate that the
submitted query is being run as a map reduce job. This is probably also
indicated by the spark logs, but I am not completely sure:

2014-11-11 00:19:00,492 INFO  ql.Context
(Context.java:getMRScratchDir(267)) - New scratch dir is
hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-1

2014-11-11 00:19:00,877 INFO  ql.Context
(Context.java:getMRScratchDir(267)) - New scratch dir is
hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2

2014-11-11 00:19:04,152 INFO  ql.Context
(Context.java:getMRScratchDir(267)) - New scratch dir is
hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2

2014-11-11 00:19:04,425 INFO  Configuration.deprecation
(Configuration.java:warnOnceIfDeprecated(1009)) - mapred.submit.replication
is deprecated. Instead, use mapreduce.client.submit.file.replication

2014-11-11 00:19:04,516 INFO  client.RMProxy
(RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager
at :8032

2014-11-11 00:19:04,607 INFO  client.RMProxy
(RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager
at :8032

2014-11-11 00:19:04,639 WARN  mapreduce.JobSubmitter
(JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option
parsing not performed. Implement the Tool interface and execute your
application with ToolRunner to remedy this

2014-11-11 00:00:08,806 INFO  input.FileInputFormat
(FileInputFormat.java:listStatus(287)) - Total input paths to process :
14912

2014-11-11 00:00:08,864 INFO  lzo.GPLNativeCodeLoader
(GPLNativeCodeLoader.java:(34)) - Loaded native gpl library

2014-11-11 00:00:08,866 INFO  lzo.LzoCodec (LzoCodec.java:(76)) -
Successfully loaded & initialized native-lzo library [hadoop-lzo rev
8e266e052e423af592871e2dfe09d54c03f6a0e8]

2014-11-11 00:00:09,873 INFO  input.CombineFileInputFormat
(CombineFileInputFormat.java:createSplits(413)) - DEBUG: Terminated node
allocation with : CompletedNodes: 1, size left: 194541317

2014-11-11 00:00:10,017 INFO  mapreduce.JobSubmitter
(JobSubmitter.java:submitJobInternal(396)) - number of splits:615

2014-11-11 00:00:10,095 INFO  mapreduce.JobSubmitter
(JobSubmitter.java:printTokens(479)) - Submitting tokens for job:
job_1414084656759_0115

2014-11-11 00:00:10,241 INFO  impl.YarnClientImpl
(YarnClientImpl.java:submitApplication(167)) - Submitted application
application_1414084656759_0115


It seems like the query is being run as a hive query instead of a spark
query. The same query works fine when run from the spark-sql cli.


Re: thrift jdbc server probably running queries as hive query

2014-11-11 Thread Sadhan Sood
Hi Cheng,

I made sure the only hive server running on the machine is
hivethriftserver2.

/usr/lib/jvm/default-java/bin/java -cp
/usr/lib/hadoop/lib/hadoop-lzo.jar::/mnt/sadhan/spark-3/sbin/../conf:/mnt/sadhan/spark-3/spark-assembly-1.2.0-SNAPSHOT-hadoop2.3.0-cdh5.0.2.jar:/etc/hadoop/conf
-Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --class
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --master yarn
--jars reporting.jar spark-internal

The query I am running is a simple count(*): "select count(*) from Xyz
where date_prefix=20141031", and I'm pretty sure it's submitting a map reduce
job, based on the spark logs:

TakesRest=false

Total jobs = 1

Launching Job 1 out of 1

Number of reduce tasks determined at compile time: 1

In order to change the average load for a reducer (in bytes):

  set hive.exec.reducers.bytes.per.reducer=

In order to limit the maximum number of reducers:

  set hive.exec.reducers.max=

In order to set a constant number of reducers:

  set mapreduce.job.reduces=

14/11/11 16:23:17 INFO ql.Context: New scratch dir is
hdfs://fdsfdsfsdfsdf:9000/tmp/hive-ubuntu/hive_2014-11-11_16-23-17_333_5669798325805509526-2

Starting Job = job_1414084656759_0142, Tracking URL =
http://xxx:8100/proxy/application_1414084656759_0142/

Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1414084656759_0142

On Mon, Nov 10, 2014 at 9:59 PM, Cheng Lian  wrote:

>  Hey Sadhan,
>
> I really don't think this is Spark log... Unlike Shark, Spark SQL doesn't
> even provide a Hive mode to let you execute queries against Hive. Would you
> please check whether there is an existing HiveServer2 running there? Spark
> SQL HiveThriftServer2 is just a Spark port of HiveServer2, and they share
> the same default listening port. I guess the Thrift server didn't start
> successfully because the HiveServer2 occupied the port, and your Beeline
> session was probably linked against HiveServer2.
>
> Cheng
>
>
> On 11/11/14 8:29 AM, Sadhan Sood wrote:
>
> I was testing out the spark thrift jdbc server by running a simple query
> in the beeline client. The spark itself is running on a yarn cluster.
>
> However, when I run a query in beeline -> I see no running jobs in the
> spark UI(completely empty) and the yarn UI seem to indicate that the
> submitted query is being run as a map reduce job. This is probably also
> being indicated from the spark logs but I am not completely sure:
>
>  2014-11-11 00:19:00,492 INFO  ql.Context
> (Context.java:getMRScratchDir(267)) - New scratch dir is
> hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-1
>
> 2014-11-11 00:19:00,877 INFO  ql.Context
> (Context.java:getMRScratchDir(267)) - New scratch dir is
> hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2
>
> 2014-11-11 00:19:04,152 INFO  ql.Context
> (Context.java:getMRScratchDir(267)) - New scratch dir is
> hdfs://:9000/tmp/hive-ubuntu/hive_2014-11-11_00-19-00_367_3847629323646885865-2
>
> 2014-11-11 00:19:04,425 INFO  Configuration.deprecation
> (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.submit.replication
> is deprecated. Instead, use mapreduce.client.submit.file.replication
>
> 2014-11-11 00:19:04,516 INFO  client.RMProxy
> (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager
> at :8032
>
> 2014-11-11 00:19:04,607 INFO  client.RMProxy
> (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager
> at :8032
>
> 2014-11-11 00:19:04,639 WARN  mapreduce.JobSubmitter
> (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option
> parsing not performed. Implement the Tool interface and execute your
> application with ToolRunner to remedy this
>
> 2014-11-11 00:00:08,806 INFO  input.FileInputFormat
> (FileInputFormat.java:listStatus(287)) - Total input paths to process :
> 14912
>
> 2014-11-11 00:00:08,864 INFO  lzo.GPLNativeCodeLoader
> (GPLNativeCodeLoader.java:(34)) - Loaded native gpl library
>
> 2014-11-11 00:00:08,866 INFO  lzo.LzoCodec (LzoCodec.java:(76)) -
> Successfully loaded & initialized native-lzo library [hadoop-lzo rev
> 8e266e052e423af592871e2dfe09d54c03f6a0e8]
>
> 2014-11-11 00:00:09,873 INFO  input.CombineFileInputFormat
> (CombineFileInputFormat.java:createSplits(413)) - DEBUG: Terminated node
> allocation with : CompletedNodes: 1, size left: 194541317
>
> 2014-11-11 00:00:10,017 INFO  mapreduce.JobSubmitter
> (JobSubmitt

Too many failed collects when trying to cache a table in SparkSQL

2014-11-12 Thread Sadhan Sood
We are running spark on yarn with combined memory > 1TB, and when trying to
cache a table partition (which is < 100G), we are seeing a lot of failed collect
stages in the UI and the caching never succeeds. Because of the failed collects,
it seems like the mapPartitions stages keep getting resubmitted. We have more
than enough memory, so it's surprising we are seeing this issue. Can someone
please help? Thanks!

The stack trace of the failed collect from UI is:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 0
at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at 
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
at 
org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at 
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Re: Too many failed collects when trying to cache a table in SparkSQL

2014-11-12 Thread Sadhan Sood
oadcast at
DAGScheduler.scala:838

2014-11-12 19:10:08,990 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 0
(MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86)
finished in 66.475 s

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - looking for newly runnable stages

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - running: Set()

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - waiting: Set(Stage 1)

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - failed: Set()

2014-11-12 19:11:15,466 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()

2014-11-12 19:11:15,466 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at
SparkPlan.scala:84), which is now runnable

2014-11-12 19:11:15,482 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 4 from broadcast at
DAGScheduler.scala:838

2014-11-12 19:11:15,482 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1
(MappedRDD[16] at map at SparkPlan.scala:84)

2014-11-12 19:11:21,655 ERROR cluster.YarnClientClusterScheduler
(Logging.scala:logError(75)) - Lost executor 372 on
ip-10-95-163-84.ec2.internal: remote Akka client disassociated

2014-11-12 19:11:21,655 WARN  remote.ReliableDeliverySupervisor
(Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system
[akka.tcp://sparkExecutor@ip-10-95-163-84.ec2.internal:20998] has failed,
address is now gated for [5000] ms. Reason is: [Disassociated].

2014-11-12 19:11:21,655 ERROR cluster.YarnClientSchedulerBackend
(Logging.scala:logError(75)) - Asked to remove non-existent executor 372

2014-11-12 19:11:21,655 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Executor lost: 372 (epoch 3)




On Wed, Nov 12, 2014 at 12:31 PM, Sadhan Sood  wrote:

> We are running spark on yarn with combined memory > 1TB and when trying to
> cache a table partition(which is < 100G), seeing a lot of failed collect
> stages in the UI and this never succeeds. Because of the failed collect, it
> seems like the mapPartitions keep getting resubmitted. We have more than
> enough memory so its surprising we are seeing this issue. Can someone
> please help. Thanks!
>
> The stack trace of the failed collect from UI is:
>
> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output 
> location for shuffle 0
>   at 
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
>   at 
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>   at 
> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
>   at 
> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
>   at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
>   at 
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTa

Re: Building spark targz

2014-11-12 Thread Sadhan Sood
Just making sure, but are you looking for the tar in the assembly/target dir?

On Wed, Nov 12, 2014 at 3:14 PM, Ashwin Shankar 
wrote:

> Hi,
> I just cloned spark from the github and I'm trying to build to generate a
> tar ball.
> I'm doing : mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive
> -DskipTests clean package
>
> Although the build is successful, I don't see the targz generated.
>
> Am I running the wrong command ?
>
> --
> Thanks,
> Ashwin
>
>
>


Re: Building spark targz

2014-11-12 Thread Sadhan Sood
I think you can provide -Pbigtop-dist to build the tar.
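
A sketch of the make-distribution.sh route Sean mentions below, reusing the profiles from the mvn command above (exact options can differ between Spark versions):

  # run from the top of the Spark source tree; produces a spark-*.tgz in the repo root
  ./make-distribution.sh --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive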

On Wed, Nov 12, 2014 at 3:21 PM, Sean Owen  wrote:

> mvn package doesn't make tarballs. It creates artifacts that will
> generally appear in target/ and subdirectories, and likewise within
> modules. Look at make-distribution.sh
>
> On Wed, Nov 12, 2014 at 8:14 PM, Ashwin Shankar  > wrote:
>
>> Hi,
>> I just cloned spark from the github and I'm trying to build to generate a
>> tar ball.
>> I'm doing : mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive
>> -DskipTests clean package
>>
>> Although the build is successful, I don't see the targz generated.
>>
>> Am I running the wrong command ?
>>
>> --
>> Thanks,
>> Ashwin
>>
>>
>>
>


Re: Too many failed collects when trying to cache a table in SparkSQL

2014-11-12 Thread Sadhan Sood
On re-running the cache statement, I see from the logs that when
collect (stage 1) fails, it always causes mapPartitions (stage 0) to be
re-run for one partition. This can be seen in the collect log as well as
in the container log:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 0

The data is an lzo compressed sequence file with a compressed size of ~26G. Is
there a way to understand why the shuffle keeps failing for one partition? I
believe we have enough memory to store the uncompressed data in memory.

On Wed, Nov 12, 2014 at 2:50 PM, Sadhan Sood  wrote:

> This is the log output:
>
> 2014-11-12 19:07:16,561 INFO  thriftserver.SparkExecuteStatementOperation
> (Logging.scala:logInfo(59)) - Running query 'CACHE TABLE xyz_cached AS
> SELECT * FROM xyz where date_prefix = 20141112'
>
> 2014-11-12 19:07:17,455 INFO  Configuration.deprecation
> (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.map.tasks is
> deprecated. Instead, use mapreduce.job.maps
>
> 2014-11-12 19:07:17,756 INFO  spark.SparkContext
> (Logging.scala:logInfo(59)) - Created broadcast 0 from broadcast at
> TableReader.scala:68
>
> 2014-11-12 19:07:18,292 INFO  spark.SparkContext
> (Logging.scala:logInfo(59)) - Starting job: collect at SparkPlan.scala:84
>
> 2014-11-12 19:07:22,801 INFO  mapred.FileInputFormat
> (FileInputFormat.java:listStatus(253)) - Total input paths to process : 200
>
> 2014-11-12 19:07:22,835 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Registering RDD 12 (mapPartitions at
> Exchange.scala:86)
>
> 2014-11-12 19:07:22,837 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Got job 0 (collect at SparkPlan.scala:84)
> with 1 output partitions (allowLocal=false)
>
> 2014-11-12 19:07:22,838 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Final stage: Stage 1(collect at
> SparkPlan.scala:84)
>
> 2014-11-12 19:07:22,838 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Parents of final stage: List(Stage 0)
>
> 2014-11-12 19:07:22,842 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Missing parents: List(Stage 0)
>
> 2014-11-12 19:07:22,871 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at
> mapPartitions at Exchange.scala:86), which has no missing parents
>
> 2014-11-12 19:07:22,916 INFO  spark.SparkContext
> (Logging.scala:logInfo(59)) - Created broadcast 1 from broadcast at
> DAGScheduler.scala:838
>
> 2014-11-12 19:07:22,963 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting 461 missing tasks from Stage 0
> (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)
>
> 2014-11-12 19:10:04,088 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86)
> finished in 161.113 s
>
> 2014-11-12 19:10:04,089 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - looking for newly runnable stages
>
> 2014-11-12 19:10:04,089 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - running: Set()
>
> 2014-11-12 19:10:04,090 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - waiting: Set(Stage 1)
>
> 2014-11-12 19:10:04,090 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - failed: Set()
>
> 2014-11-12 19:10:04,094 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()
>
> 2014-11-12 19:10:04,097 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at
> SparkPlan.scala:84), which is now runnable
>
> 2014-11-12 19:10:04,112 INFO  spark.SparkContext
> (Logging.scala:logInfo(59)) - Created broadcast 2 from broadcast at
> DAGScheduler.scala:838
>
> 2014-11-12 19:10:04,115 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1
> (MappedRDD[16] at map at SparkPlan.scala:84)
>
> 2014-11-12 19:10:08,541 ERROR cluster.YarnClientClusterScheduler
> (Logging.scala:logError(75)) - Lost executor 52 on
> ip-10-61-175-167.ec2.internal: remote Akka client disassociated
>
> 2014-11-12 19:10:08,543 WARN  remote.ReliableDeliverySupervisor
> (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system
> [akka.tcp://sparkExecutor@ip-10-61-175-167.ec2.internal:50918] has
> failed, address is now gated for [5000] ms. Reason is: [Disassociated].
>
> 2014-11-12 19:10:08,548 ERROR cluster.YarnClientSchedulerBackend
> (Logging.scala:logError(75)) - Asked to remove non-existent executor 52
>
> 2014-11-12 19:10:08,550 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Executor lost: 52 (epoch 1)
>
> 2014-11-12 19:10:08,555 INFO  scheduler.Stage (Logging.sca

Cache sparkSql data without uncompressing it in memory

2014-11-12 Thread Sadhan Sood
We noticed, while caching data from our hive tables which contain data in
compressed sequence file format, that it gets uncompressed in memory when
being cached. Is there a way to turn this off and cache the compressed
data as is?


Re: Cache sparkSql data without uncompressing it in memory

2014-11-13 Thread Sadhan Sood
Thanks Cheng. Just one more question - does that mean that we still need
enough memory in the cluster to uncompress the data before it can be
compressed again, or does it just read the raw data as is?
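
A minimal sketch of the settings Cheng describes below, from a shell with a HiveContext/SQLContext (the table name is illustrative):

  // enable compressed in-memory columnar storage before caching
  sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
  // optionally tune the batch size used while building the columnar buffers
  sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "10000")
  sqlContext.sql("CACHE TABLE my_table")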

On Wed, Nov 12, 2014 at 10:05 PM, Cheng Lian  wrote:

>  Currently there’s no way to cache the compressed sequence file directly.
> Spark SQL uses in-memory columnar format while caching table rows, so we
> must read all the raw data and convert them into columnar format. However,
> you can enable in-memory columnar compression by setting
> spark.sql.inMemoryColumnarStorage.compressed to true. This property is
> already set to true by default in master branch and branch-1.2.
>
> On 11/13/14 7:16 AM, Sadhan Sood wrote:
>
>   We noticed while caching data from our hive tables which contain data
> in compressed sequence file format that it gets uncompressed in memory when
> getting cached. Is there a way to turn this off and cache the compressed
> data as is ?
>
>   ​
>


SparkSQL exception on cached parquet table

2014-11-14 Thread Sadhan Sood
While testing SparkSQL on a bunch of parquet files (basically what used to be a
partition of one of our hive tables), I encountered this error:

import org.apache.spark.sql.SchemaRDD
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val parquetFileRDD = sqlContext.parquetFile(parquetFile)
parquetFileRDD.registerTempTable("xyz_20141109")
sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <-- works
fine
sqlContext.cacheTable("xyz_20141109")
sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <-- fails
with an exception

parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in
file
hdfs://::9000/event_logs/xyz/20141109/part-9359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-9.lzo.parquet

at
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)

at
parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)

at
org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)

at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)

at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)

at
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)

at
org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)

at
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)

at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)

at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)

at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)

at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

at org.apache.spark.scheduler.Task.run(Task.scala:56)

at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.ArrayIndexOutOfBoundsException


Re: Cache sparkSql data without uncompressing it in memory

2014-11-14 Thread Sadhan Sood
Thanks Cheng, that was helpful. I noticed from the UI that only half of the
memory per executor was being used for caching - is that expected? We have a
2 TB sequence file dataset that we wanted to cache in our cluster with ~5 TB of
memory, but caching still failed: from the UI it looked like it used 2.5 TB of
memory and wrote almost 12 TB to disk (at which point it was useless) during
the mapPartitions stage. Also, we couldn't run more than 2 executors per box
(60g memory/box) or else they died very quickly from having less memory per
executor (not sure why?), although I/O seemed to go much faster, which makes
sense given the more parallel reads.

On Thu, Nov 13, 2014 at 10:50 PM, Cheng Lian  wrote:

>  No, the columnar buffers are built in small batches; the batch size is
> controlled by the spark.sql.inMemoryColumnarStorage.batchSize property. The
> default value for this in master and branch-1.2 is 10,000 rows per batch.
>
> On 11/14/14 1:27 AM, Sadhan Sood wrote:
>
>   Thanks Cheng, Just one more question - does that mean that we still
> need enough memory in the cluster to uncompress the data before it can be
> compressed again or does that just read the raw data as is?
>
> On Wed, Nov 12, 2014 at 10:05 PM, Cheng Lian 
> wrote:
>
>>  Currently there’s no way to cache the compressed sequence file
>> directly. Spark SQL uses in-memory columnar format while caching table
>> rows, so we must read all the raw data and convert them into columnar
>> format. However, you can enable in-memory columnar compression by setting
>> spark.sql.inMemoryColumnarStorage.compressed to true. This property is
>> already set to true by default in master branch and branch-1.2.
>>
>> On 11/13/14 7:16 AM, Sadhan Sood wrote:
>>
>> We noticed while caching data from our hive tables which contain data in
>> compressed sequence file format that it gets uncompressed in memory when
>> getting cached. Is there a way to turn this off and cache the compressed
>> data as is ?
>>
>>  ​
>>
>
>​
>

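For reference, the batch size Cheng describes is an ordinary Spark SQL setting,
and the "half of the memory" observation above is likely governed by
spark.storage.memoryFraction (0.6 of the executor heap by default in Spark
1.x). A minimal sketch, not from the thread; the table name is hypothetical:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// rows accumulated per in-memory columnar batch (10,000 is the 1.2 default)
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "10000")
// the share of each executor's heap available for caching is set on the
// SparkConf / spark-defaults, e.g. spark.storage.memoryFraction=0.6
sqlContext.cacheTable("events")
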

Re: SparkSQL exception on cached parquet table

2014-11-16 Thread Sadhan Sood
Hi Cheng,

I tried reading the parquet file (the one on which we were getting the
exception) through parquet-tools and it is able to dump the file, and I can
read the metadata, etc. I also loaded the file into a hive table and can run a
table scan query on it as well. Let me know if I can do more to help resolve
the problem; I'll run it through a debugger and see if I can get more
information on it in the meantime.

Thanks,
Sadhan

On Sun, Nov 16, 2014 at 4:35 AM, Cheng Lian  wrote:

>  (Forgot to cc user mail list)
>
>
> On 11/16/14 4:59 PM, Cheng Lian wrote:
>
> Hey Sadhan,
>
>  Thanks for the additional information, this is helpful. It seems that some
> Parquet internal contract was broken, but I'm not sure whether it's caused
> by Spark SQL or Parquet, or even whether the Parquet file itself was damaged
> somehow. I'm investigating this. In the meantime, would you mind helping to
> narrow down the problem by trying to scan exactly the same Parquet file
> with some other systems (e.g. Hive or Impala)? If other systems work, then
> there must be something wrong with Spark SQL.
>
>  Cheng
>
> On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood 
> wrote:
>
>> Hi Cheng,
>>
>>  Thanks for your response. Here is the stack trace from yarn logs:
>>
>>  Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>> at java.util.ArrayList.elementData(ArrayList.java:418)
>> at java.util.ArrayList.get(ArrayList.java:431)
>> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>> at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
>> at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
>> at 
>> parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:282)
>> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>> at 
>> parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>> at 
>> parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>> at 
>> parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>> at 
>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>> ... 26 more
>>
>>
>> On Sat, Nov 15, 2014 at 9:28 AM, Cheng Lian 
>> wrote:
>>
>>>  Hi Sadhan,
>>>
>>> Could you please provide the stack trace of the
>>> ArrayIndexOutOfBoundsException (if any)? The reason why the first query
>>> succeeds is that Spark SQL doesn’t bother reading all data from the table
>>> to give COUNT(*). In the second case, however, the whole table is asked
>>> to be cached lazily via the cacheTable call, thus it’s scanned to build
>>> the in-memory columnar cache. Then something went wrong while scanning this LZO
>>> compressed Parquet file. But unfortunately the stack trace at hand doesn’t
>>> indicate the root cause.
>>>
>>> Cheng
>>>
>>> On 11/15/14 5:28 AM, Sadhan Sood wrote:
>>>
>>> While testing SparkSQL on a bunch of parquet files (basically used to be
>>> a partition for one of our hive tables), I encountered this error:
>>>
>>>  import org.apache.spark.sql.SchemaRDD
>>> import org.apache.hadoop.fs.FileSystem;
>>> import org.apache.hadoop.conf.Configuration;
>>> import org.apache.hadoop.fs.Path;
>>>
>>>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>
>>>  val parquetFileRDD = sqlContext.parquetFile(parquetFile)
>>> parquetFileRDD.registerTempTable("xyz_20141109")
>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <-- works
>>> fine
>>> sqlContext.cacheTable("xyz_20141109")
>>> sqlContext.sql("SELECT count(*)  FROM xyz_20141109").collect() <-- fails
>>> with an exception
>>>
>>>   parquet.io.ParquetDecodingException: Can not read value at 0 in block
>>> -1 in file
>>> hdfs://::9000/event_logs/xyz/20141109/part-9359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-9.lzo.parquet
>>>
>>> at
>>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>>>
>>> at
>>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>>
>>&g

Exception in spark sql when running a group by query

2014-11-17 Thread Sadhan Sood
While testing sparkSQL, we ran this group-by-with-expression query and got an
exception. The same query works fine in hive.

SELECT from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200),
  '/MM/dd') as pst_date,
count(*) as num_xyzs
  FROM
all_matched_abc
  GROUP BY
from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200),
  '/MM/dd')

14/11/17 17:41:46 ERROR thriftserver.SparkSQLDriver: Failed in [SELECT
from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200),
  '/MM/dd') as pst_date,
count(*) as num_xyzs
  FROM
all_matched_abc
  GROUP BY
from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200),
  '/MM/dd')
]
org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
Expression not in GROUP BY:
HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived
AS whenrequestreceived#187L, DoubleType) / 1000.0) - CAST(25200,
DoubleType))),/MM/dd) AS pst_date#179, tree:

Aggregate 
[HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived,
DoubleType) / 1000.0) - CAST(25200, DoubleType))),/MM/dd)],
[HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived
AS whenrequestreceived#187L, DoubleType) / 1000.0) - CAST(25200,
DoubleType))),/MM/dd) AS pst_date#179,COUNT(1) AS num_xyzs#180L]

 MetastoreRelation default, all_matched_abc, None
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3$$anonfun$applyOrElse$6.apply(Analyzer.scala:127)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3$$anonfun$applyOrElse$6.apply(Analyzer.scala:125)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3.applyOrElse(Analyzer.scala:125)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3.applyOrElse(Analyzer.scala:115)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$.apply(Analyzer.scala:115)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$.apply(Analyzer.scala:113)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
at 
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at 
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
at 
org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
at 
org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
at 
org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
at 
org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
at 
org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
at 
org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
at 
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
at 
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
at 
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
at 
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
at 
org.apache.spark.sql.hive.HiveContext$QueryExecution.stringResult(HiveContext.scala:425)
at 
org.apache.spark.sql.hive.thriftserver.AbstractSparkSQLDriver.run(AbstractSparkSQLDriver.scala:59)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:276)
at org.apache.hadoop.hive.cli.CliDriver.pro

Re: Exception in spark sql when running a group by query

2014-11-18 Thread Sadhan Sood
ah makes sense - Thanks Michael!

On Mon, Nov 17, 2014 at 6:08 PM, Michael Armbrust 
wrote:

> You are perhaps hitting an issue that was fixed by #3248
> <https://github.com/apache/spark/pull/3248>?
>
> On Mon, Nov 17, 2014 at 9:58 AM, Sadhan Sood 
> wrote:
>
>> While testing sparkSQL, we were running this group by with expression
>> query and got an exception. The same query worked fine on hive.
>>
>> SELECT from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200),
>>   '/MM/dd') as pst_date,
>> count(*) as num_xyzs
>>   FROM
>> all_matched_abc
>>   GROUP BY
>> from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200),
>>   '/MM/dd')
>>
>> 14/11/17 17:41:46 ERROR thriftserver.SparkSQLDriver: Failed in [SELECT 
>> from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200),
>>   '/MM/dd') as pst_date,
>> count(*) as num_xyzs
>>   FROM
>> all_matched_abc
>>   GROUP BY
>> from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200),
>>   '/MM/dd')
>> ]
>> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Expression 
>> not in GROUP BY: 
>> HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived
>>  AS whenrequestreceived#187L, DoubleType) / 1000.0) - CAST(25200, 
>> DoubleType))),/MM/dd) AS pst_date#179, tree:
>>
>> Aggregate 
>> [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived,
>>  DoubleType) / 1000.0) - CAST(25200, DoubleType))),/MM/dd)], 
>> [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived
>>  AS whenrequestreceived#187L, DoubleType) / 1000.0) - CAST(25200, 
>> DoubleType))),/MM/dd) AS pst_date#179,COUNT(1) AS num_xyzs#180L]
>>
>>  MetastoreRelation default, all_matched_abc, None
>> at 
>> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3$$anonfun$applyOrElse$6.apply(Analyzer.scala:127)
>> at 
>> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3$$anonfun$applyOrElse$6.apply(Analyzer.scala:125)
>> at 
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at 
>> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3.applyOrElse(Analyzer.scala:125)
>> at 
>> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3.applyOrElse(Analyzer.scala:115)
>> at 
>> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
>> at 
>> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
>> at 
>> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$.apply(Analyzer.scala:115)
>> at 
>> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$.apply(Analyzer.scala:113)
>> at 
>> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
>> at 
>> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
>> at 
>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>> at 
>> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
>> at 
>> scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
>> at 
>> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
>> at 
>> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at 
>> org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
>> at 
>> org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
>> at 
>> org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
>> at 
>> org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
&

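For reference, on builds that predate the fix Michael points to, one way to
sidestep this analyzer error is to compute the expression once in a subquery
and group by its alias. A sketch only, not from the thread; it assumes a
HiveContext (the Thrift server in the thread runs on one), and the date format
string is reproduced as it appears, truncated, in the archive:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("""
  SELECT pst_date, count(*) AS num_xyzs
  FROM (SELECT from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200),
                             '/MM/dd') AS pst_date
        FROM all_matched_abc) t
  GROUP BY pst_date
""").collect()

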
Re: SparkSQL exception on cached parquet table

2014-11-20 Thread Sadhan Sood
Here is the file dump using parquet-tools:

row group 0

col_a:   BINARY UNCOMPRESSED DO:0 FPO:4 SZ:89/89/1.00 VC:9 ENC [more]...
col_b:   BINARY UNCOMPRESSED DO:0 FPO:93 SZ:89/89/1.00 VC:9 EN [more]...
col_c:   BINARY UNCOMPRESSED DO:0 FPO:182 SZ:89/89/1.00 VC:9 E [more]...
col_d:
.map:
..key:   BINARY UNCOMPRESSED DO:0 FPO:271 SZ:29/29/1.00 VC:9 E [more]...
..value:
...value_tuple:  BINARY UNCOMPRESSED DO:0 FPO:300 SZ:29/29/1.00 VC:9 E [more]...

col_a TV=9 RL=0 DL=1

page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9

col_b TV=9 RL=0 DL=1

page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9

col_c TV=9 RL=0 DL=1

page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:60 VC:9

col_d.map.key TV=9 RL=1 DL=2

page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9

col_d.map.value.value_tuple TV=9 RL=2 DL=4

page 0:  DLE:RLE RLE:RLE VLE:PLAIN SZ:12 VC:9

BINARY col_a

*** row group 1 of 1, values 1 to 9 ***
value 1: R:1 D:1 V:a1
value 2: R:1 D:1 V:a2
value 3: R:1 D:1 V:a3
value 4: R:1 D:1 V:a4
value 5: R:1 D:1 V:a5
value 6: R:1 D:1 V:a6
value 7: R:1 D:1 V:a7
value 8: R:1 D:1 V:a8
value 9: R:1 D:1 V:a9

BINARY col_b

*** row group 1 of 1, values 1 to 9 ***
value 1: R:1 D:1 V:b1
value 2: R:1 D:1 V:b2
value 3: R:1 D:1 V:b3
value 4: R:1 D:1 V:b4
value 5: R:1 D:1 V:b5
value 6: R:1 D:1 V:b6
value 7: R:1 D:1 V:b7
value 8: R:1 D:1 V:b8
value 9: R:1 D:1 V:b9

BINARY col_c

*** row group 1 of 1, values 1 to 9 ***
value 1: R:1 D:1 V:c1
value 2: R:1 D:1 V:c2
value 3: R:1 D:1 V:c3
value 4: R:1 D:1 V:c4
value 5: R:1 D:1 V:c5
value 6: R:1 D:1 V:c6
value 7: R:1 D:1 V:c7
value 8: R:1 D:1 V:c8
value 9: R:1 D:1 V:c9

BINARY col_d.map.key

*** row group 1 of 1, values 1 to 9 ***
value 1: R:0 D:0 V:
value 2: R:0 D:0 V:
value 3: R:0 D:0 V:
value 4: R:0 D:0 V:
value 5: R:0 D:0 V:
value 6: R:0 D:0 V:
value 7: R:0 D:0 V:
value 8: R:0 D:0 V:
value 9: R:0 D:0 V:

BINARY col_d.map.value.value_tuple

*** row group 1 of 1, values 1 to 9 ***
value 1: R:0 D:0 V:
value 2: R:0 D:0 V:
value 3: R:0 D:0 V:
value 4: R:0 D:0 V:
value 5: R:0 D:0 V:
value 6: R:0 D:0 V:
value 7: R:0 D:0 V:
value 8: R:0 D:0 V:
value 9: R:0 D:0 V:


I am happy to provide more information but any help is appreciated.


On Sun, Nov 16, 2014 at 7:40 PM, Sadhan Sood  wrote:

> Hi Cheng,
>
> I tried reading the parquet file(on which we were getting the exception)
> through parquet-tools and it is able to dump the file and I can read the
> metadata, etc. I also loaded the file through hive table and can run a
> table scan query on it as well. Let me know if I can do more to help
> resolve the problem, I'll run it through a debugger and see if I can get
> more information on it in the meantime.
>
> Thanks,
> Sadhan
>
> On Sun, Nov 16, 2014 at 4:35 AM, Cheng Lian  wrote:
>
>>  (Forgot to cc user mail list)
>>
>>
>> On 11/16/14 4:59 PM, Cheng Lian wrote:
>>
>> Hey Sadhan,
>>
>>  Thanks for the additional information, this is helpful. Seems that some
>> Parquet internal contract was broken, but I'm not sure whether it's caused
>> by Spark SQL or Parquet, or even maybe the Parquet file itself was damaged
>> somehow. I'm investigating this. In the meanwhile, would you mind to help
>> to narrow down the problem by trying to scan exactly the same Parquet file
>> with some other systems (e.g. Hive or Impala)? If other systems work, then
>> there must be something wrong with Spark SQL.
>>
>>  Cheng
>>
>> On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood 
>> wrote:
>>
>>> Hi Cheng,
>>>
>>>  Thanks for your response. Here is the stack trace from yarn logs:
>>>
>>>  Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>> at java.util.ArrayList.elementData(ArrayList.java:418)
>>> at java.util.ArrayList.get(ArrayList.java:431)
>>> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>   

Re: SparkSQL exception on cached parquet table

2014-11-20 Thread Sadhan Sood
Also attaching the parquet file if anyone wants to take a further look.

On Thu, Nov 20, 2014 at 8:54 AM, Sadhan Sood  wrote:

> So, I am seeing this issue with spark sql throwing an exception when
> trying to read selective columns from a thrift parquet file and also when
> caching them:
> On some further digging, I was able to narrow it down to at least one
> particular column type, map>, as the cause of this issue.
> To reproduce this I created a test thrift file with a very basic schema and
> stored some sample data in a parquet file:
>
> Test.thrift
> ===
> typedef binary SomeId
>
> enum SomeExclusionCause {
>   WHITELIST = 1,
>   HAS_PURCHASE = 2,
> }
>
> struct SampleThriftObject {
>   10: string col_a;
>   20: string col_b;
>   30: string col_c;
>   40: optional map> col_d;
> }
> =
>
> And loading the data in spark through schemaRDD:
>
> import org.apache.spark.sql.SchemaRDD
> val sqlContext = new org.apache.spark.sql.SQLContext(sc);
> val parquetFile = "/path/to/generated/parquet/file"
> val parquetFileRDD = sqlContext.parquetFile(parquetFile)
> parquetFileRDD.printSchema
> root
>  |-- col_a: string (nullable = true)
>  |-- col_b: string (nullable = true)
>  |-- col_c: string (nullable = true)
>  |-- col_d: map (nullable = true)
>  ||-- key: string
>  ||-- value: array (valueContainsNull = true)
>  |||-- element: string (containsNull = false)
>
> parquetFileRDD.registerTempTable("test")
> sqlContext.cacheTable("test")
> sqlContext.sql("select col_a from test").collect() <-- see the exception
> stack here
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not read
> value at 0 in block -1 in file file:/tmp/xyz/part-r-0.parquet
> at
> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
> at
> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
> at
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
> at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
> at java.util.ArrayList.elementData(ArrayList.java:418)
> at java.util.ArrayList.get(ArrayList.java:431)
> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
> at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
> at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
> at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
> at
> parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:282)
> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
> at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
> at
> parquet.filter2.compat.FilterCompat$NoOpFil

Adding partitions to parquet data

2014-11-20 Thread Sadhan Sood
We are loading parquet data as temp tables but are wondering if there is a way
to add a partition to the data without going through hive (we still want to use
spark's parquet serde rather than hive's). The data looks like ->

/date1/file1, /date1/file2 ... , /date2/file1,
/date2/file2,/daten/filem

and we are loading it like:
val parquetFileRDD = sqlContext.parquetFile(comma separated parquet file
names)

but it would be nice to be able to add a partition and provide the date as a
query parameter.


Re: SparkSQL exception on cached parquet table

2014-11-20 Thread Sadhan Sood
I am running on master (pulled yesterday, I believe) but saw the same issue
with 1.2.0.

On Thu, Nov 20, 2014 at 1:37 PM, Michael Armbrust 
wrote:

> Which version are you running on again?
>
> On Thu, Nov 20, 2014 at 8:17 AM, Sadhan Sood 
> wrote:
>
>> Also attaching the parquet file if anyone wants to take a further look.
>>
>> On Thu, Nov 20, 2014 at 8:54 AM, Sadhan Sood 
>> wrote:
>>
>>> So, I am seeing this issue with spark sql throwing an exception when
>>> trying to read selective columns from a thrift parquet file and also when
>>> caching them:
>>> On some further digging, I was able to narrow it down to at-least one
>>> particular column type: map> to be causing this issue.
>>> To reproduce this I created a test thrift file with a very basic schema and
>>> stored some sample data in a parquet file:
>>>
>>> Test.thrift
>>> ===
>>> typedef binary SomeId
>>>
>>> enum SomeExclusionCause {
>>>   WHITELIST = 1,
>>>   HAS_PURCHASE = 2,
>>> }
>>>
>>> struct SampleThriftObject {
>>>   10: string col_a;
>>>   20: string col_b;
>>>   30: string col_c;
>>>   40: optional map> col_d;
>>> }
>>> =
>>>
>>> And loading the data in spark through schemaRDD:
>>>
>>> import org.apache.spark.sql.SchemaRDD
>>> val sqlContext = new org.apache.spark.sql.SQLContext(sc);
>>> val parquetFile = "/path/to/generated/parquet/file"
>>> val parquetFileRDD = sqlContext.parquetFile(parquetFile)
>>> parquetFileRDD.printSchema
>>> root
>>>  |-- col_a: string (nullable = true)
>>>  |-- col_b: string (nullable = true)
>>>  |-- col_c: string (nullable = true)
>>>  |-- col_d: map (nullable = true)
>>>  ||-- key: string
>>>  ||-- value: array (valueContainsNull = true)
>>>  |||-- element: string (containsNull = false)
>>>
>>> parquetFileRDD.registerTempTable("test")
>>> sqlContext.cacheTable("test")
>>> sqlContext.sql("select col_a from test").collect() <-- see the exception
>>> stack here
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
>>> 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not read
>>> value at 0 in block -1 in file file:/tmp/xyz/part-r-0.parquet
>>> at
>>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>>> at
>>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>> at
>>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
>>> at
>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> at
>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>> at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>> at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>> at
>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>> at
>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>> at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
>>> at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
>>> at
>>> org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
>>> at
>>> org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1223)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>> at org.apache.spark.execut

Re: Adding partitions to parquet data

2014-11-20 Thread Sadhan Sood
Ah awesome, thanks!!

On Thu, Nov 20, 2014 at 3:01 PM, Michael Armbrust 
wrote:

> In 1.2 by default we use Spark parquet support instead of Hive when the
> SerDe contains the word "Parquet".  This should work with hive partitioning.
>
> On Thu, Nov 20, 2014 at 10:33 AM, Sadhan Sood 
> wrote:
>
>> We are loading parquet data as temp tables but wondering if there is a
>> way to add a partition to the data without going through hive (we still
>> want to use spark's parquet serde as compared to hive). The data looks like
>> ->
>>
>> /date1/file1, /date1/file2 ... , /date2/file1,
>> /date2/file2,/daten/filem
>>
>> and we are loading it like:
>> val parquetFileRDD = sqlContext.parquetFile(comma separated parquet file
>> names)
>>
>> but it would be nice to be able to add a partition and provide the date as a
>> query parameter.
>>
>
>

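For reference, with the 1.2 behaviour Michael describes, partitions can be
managed through the Hive metastore while scans still go through Spark's own
Parquet support. A sketch only, not from the thread; the table, partition
column, and paths are hypothetical and assume the table is already defined in
the metastore:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
// register a new date partition pointing at that day's directory
hiveContext.sql("ALTER TABLE events ADD IF NOT EXISTS PARTITION (dt='date1') " +
  "LOCATION '/path/to/parquet/date1'")
// the date can then be supplied as an ordinary predicate
hiveContext.sql("SELECT count(*) FROM events WHERE dt = 'date1'").collect()
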

Re: SparkSQL exception on cached parquet table

2014-11-20 Thread Sadhan Sood
Thanks Michael, opened this https://issues.apache.org/jira/browse/SPARK-4520

On Thu, Nov 20, 2014 at 2:59 PM, Michael Armbrust 
wrote:

> Can you open a JIRA?
>
> On Thu, Nov 20, 2014 at 10:39 AM, Sadhan Sood 
> wrote:
>
>> I am running on master, pulled yesterday I believe but saw the same issue
>> with 1.2.0
>>
>> On Thu, Nov 20, 2014 at 1:37 PM, Michael Armbrust > > wrote:
>>
>>> Which version are you running on again?
>>>
>>> On Thu, Nov 20, 2014 at 8:17 AM, Sadhan Sood 
>>> wrote:
>>>
>>>> Also attaching the parquet file if anyone wants to take a further look.
>>>>
>>>> On Thu, Nov 20, 2014 at 8:54 AM, Sadhan Sood 
>>>> wrote:
>>>>
>>>>> So, I am seeing this issue with spark sql throwing an exception when
>>>>> trying to read selective columns from a thrift parquet file and also when
>>>>> caching them:
>>>>> On some further digging, I was able to narrow it down to at-least one
>>>>> particular column type: map> to be causing this issue.
>>>>> To reproduce this I created a test thrift file with a very basic schema 
>>>>> and
>>>>> stored some sample data in a parquet file:
>>>>>
>>>>> Test.thrift
>>>>> ===
>>>>> typedef binary SomeId
>>>>>
>>>>> enum SomeExclusionCause {
>>>>>   WHITELIST = 1,
>>>>>   HAS_PURCHASE = 2,
>>>>> }
>>>>>
>>>>> struct SampleThriftObject {
>>>>>   10: string col_a;
>>>>>   20: string col_b;
>>>>>   30: string col_c;
>>>>>   40: optional map> col_d;
>>>>> }
>>>>> =
>>>>>
>>>>> And loading the data in spark through schemaRDD:
>>>>>
>>>>> import org.apache.spark.sql.SchemaRDD
>>>>> val sqlContext = new org.apache.spark.sql.SQLContext(sc);
>>>>> val parquetFile = "/path/to/generated/parquet/file"
>>>>> val parquetFileRDD = sqlContext.parquetFile(parquetFile)
>>>>> parquetFileRDD.printSchema
>>>>> root
>>>>>  |-- col_a: string (nullable = true)
>>>>>  |-- col_b: string (nullable = true)
>>>>>  |-- col_c: string (nullable = true)
>>>>>  |-- col_d: map (nullable = true)
>>>>>  ||-- key: string
>>>>>  ||-- value: array (valueContainsNull = true)
>>>>>  |||-- element: string (containsNull = false)
>>>>>
>>>>> parquetFileRDD.registerTempTable("test")
>>>>> sqlContext.cacheTable("test")
>>>>> sqlContext.sql("select col_a from test").collect() <-- see the
>>>>> exception stack here
>>>>>
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>> Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in
>>>>> stage 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not
>>>>> read value at 0 in block -1 in file file:/tmp/xyz/part-r-0.parquet
>>>>> at
>>>>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>>>>> at
>>>>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>>>> at
>>>>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
>>>>> at
>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>> at
>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>> at
>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>> at
>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>>> at scala.collection.TraversableOnce$class.to
>>>>> (TraversableOnce.scala:273)
>>>>> at

SparkSQL - can we add new column(s) to parquet files

2014-11-21 Thread Sadhan Sood
We create the table definition by reading the parquet file for its schema and
store it in the hive metastore. But if someone adds a new column to the schema,
and we then rescan the schema from the new parquet files and update the table
definition, would queries on the table still work?

So, old table has -> Int a, Int b
new table -> Int a, Int b, String c

but older parquet files don't have String c, so when querying the table would
it return null for column c from the older files and data from the newer files,
or would it fail?

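For what it's worth, the question can be checked empirically. A sketch only,
with hypothetical paths, that loads the old and new files side by side and
inspects the schema each reports (the thread itself does not say which
behaviour to expect):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val oldData = sqlContext.parquetFile("/path/to/old")   // Int a, Int b
val newData = sqlContext.parquetFile("/path/to/new")   // Int a, Int b, String c
oldData.printSchema
newData.printSchema
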

does spark sql support columnar compression with encoding when caching tables

2014-12-18 Thread Sadhan Sood
Hi All,

Wondering whether, when caching a table backed by lzo compressed parquet data,
spark also compresses it (using lzo/gzip/snappy) along with the column level
encoding, or just does the column level encoding when
spark.sql.inMemoryColumnarStorage.compressed is set to true. I ask because when
I try to cache the data, I notice the memory being used is almost as much as
the uncompressed size of the data.

Thanks!


Re: does spark sql support columnar compression with encoding when caching tables

2014-12-19 Thread Sadhan Sood
Hey Michael,

Thank you for clarifying that. Is tachyon the right way to get compressed data
in memory, or should we explore the option of adding compression to cached
data? This is because our uncompressed data set is too big to fit in memory
right now. I see the benefit of tachyon not just for storing compressed data in
memory, but also because we wouldn't have to create a separate table for
caching some partitions, like 'cache table table_cached as select * from table
where date = 201412XX' - the way we are doing right now.


On Thu, Dec 18, 2014 at 6:46 PM, Michael Armbrust 
wrote:
>
> There is only column level encoding (run length encoding, delta encoding,
> dictionary encoding) and no generic compression.
>
> On Thu, Dec 18, 2014 at 12:07 PM, Sadhan Sood 
> wrote:
>>
>> Hi All,
>>
>> Wondering if when caching a table backed by lzo compressed parquet data,
>> if spark also compresses it (using lzo/gzip/snappy) along with column level
>> encoding or just does the column level encoding when 
>> "*spark.sql.inMemoryColumnarStorage.compressed"
>> *is set to true. This is because when I try to cache the data, I notice
>> the memory being used is almost as much as the uncompressed size of the
>> data.
>>
>> Thanks!
>>
>


Re: does spark sql support columnar compression with encoding when caching tables

2014-12-19 Thread Sadhan Sood
Thanks Michael, that makes sense.

On Fri, Dec 19, 2014 at 3:13 PM, Michael Armbrust 
wrote:

> Yeah, tachyon does sound like a good option here.  Especially if you have
> nested data, its likely that parquet in tachyon will always be better
> supported.
>
> On Fri, Dec 19, 2014 at 2:17 PM, Sadhan Sood 
> wrote:
>>
>> Hey Michael,
>>
>> Thank you for clarifying that. Is tachyon the right way to get compressed
>> data in memory or should we explore the option of adding compression to
>> cached data. This is because our uncompressed data set is too big to fit in
>> memory right now. I see the benefit of tachyon not just with storing
>> compressed data in memory but we wouldn't have to create a separate table
>> for caching some partitions like 'cache table table_cached as select * from
>> table where date = 201412XX' - the way we are doing right now.
>>
>>
>> On Thu, Dec 18, 2014 at 6:46 PM, Michael Armbrust > > wrote:
>>>
>>> There is only column level encoding (run length encoding, delta
>>> encoding, dictionary encoding) and no generic compression.
>>>
>>> On Thu, Dec 18, 2014 at 12:07 PM, Sadhan Sood 
>>> wrote:
>>>>
>>>> Hi All,
>>>>
>>>> Wondering if when caching a table backed by lzo compressed parquet
>>>> data, if spark also compresses it (using lzo/gzip/snappy) along with column
>>>> level encoding or just does the column level encoding when 
>>>> "*spark.sql.inMemoryColumnarStorage.compressed"
>>>> *is set to true. This is because when I try to cache the data, I
>>>> notice the memory being used is almost as much as the uncompressed size of
>>>> the data.
>>>>
>>>> Thanks!
>>>>
>>>


Re: does spark sql support columnar compression with encoding when caching tables

2014-12-22 Thread Sadhan Sood
Thanks Cheng, Michael - that was super helpful.

On Sun, Dec 21, 2014 at 7:27 AM, Cheng Lian  wrote:

>  Would like to add that the compression schemes built into the in-memory
> columnar storage only support primitive columns (int, string, etc.); complex
> types like array, map and struct are not supported.
>
>
> On 12/20/14 6:17 AM, Sadhan Sood wrote:
>
>  Hey Michael,
>
> Thank you for clarifying that. Is tachyon the right way to get compressed
> data in memory or should we explore the option of adding compression to
> cached data. This is because our uncompressed data set is too big to fit in
> memory right now. I see the benefit of tachyon not just with storing
> compressed data in memory but we wouldn't have to create a separate table
> for caching some partitions like 'cache table table_cached as select * from
> table where date = 201412XX' - the way we are doing right now.
>
>
> On Thu, Dec 18, 2014 at 6:46 PM, Michael Armbrust 
> wrote:
>>
>> There is only column level encoding (run length encoding, delta encoding,
>> dictionary encoding) and no generic compression.
>>
>> On Thu, Dec 18, 2014 at 12:07 PM, Sadhan Sood 
>> wrote:
>>>
>>> Hi All,
>>>
>>>  Wondering if when caching a table backed by lzo compressed parquet
>>> data, if spark also compresses it (using lzo/gzip/snappy) along with column
>>> level encoding or just does the column level encoding when 
>>> "*spark.sql.inMemoryColumnarStorage.compressed"
>>> *is set to true. This is because when I try to cache the data, I notice
>>> the memory being used is almost as much as the uncompressed size of the
>>> data.
>>>
>>>  Thanks!
>>>
>>
>
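
Putting the last two threads together: since the in-memory compression schemes
apply only to primitive columns, one option is to cache a narrowed, filtered
projection instead of the whole table, along the lines of the CACHE TABLE ...
AS SELECT statement Sadhan quotes above. A sketch only; the table, column, and
partition names are hypothetical:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
// cache just the partition of interest, keeping primitive columns only so the
// columnar compression can take effect
hiveContext.sql("CACHE TABLE events_cached AS " +
  "SELECT col_a, col_b, col_c FROM events WHERE dt = '20141220'")
hiveContext.sql("SELECT count(*) FROM events_cached").collect()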