For many operations, Spark SQL will just pass the data through without
looking at it.  Caching, in contrast, has to process the data so that we
can build up compressed column buffers.  So the schema is mismatched in
both cases, but only the caching case shows it.

Based on the exception, it looks more like there is a type mismatch (the
metastore is reporting an Integer, but the parquet data is actually
producing a String).

On Thu, Dec 11, 2014 at 6:38 AM, Yana Kadiyska <yana.kadiy...@gmail.com>
wrote:
>
> I see -- they are the same in design but  the difference comes from
> partitioned Hive tables: when the RDD is generated by querying an external
> Hive metastore, the partition is appended as part of the row, and shows up
> as part of the schema. Can you shed some light on why this is a problem:
>
> last2HourRdd.first <-- works ok
> last2HourRdd.cache()
>
> last2HourRdd.first <-- does not work
>
> ​
>
> The first call shows K+1 columns (and so does print schema, where K
> columns are from the backing parquet files and the K+1st is the partition
> inlined. My impression is that the second call to .first would just force
> the cache() call and dump out that RDD to disk (with all of it's K+1
> columns and store the schema info, again with K+1 columns), and then just
> return a single entry. I am not sure why the fact that Hive metastore
> exposes an extra column over the raw parquet file is a problem since it
> does so both on the schema and in the
> data: last2HourRdd.schema.fields.length reports K+1, and so does
>  last2HourRdd.first.length.
>
> I also tried
> calling sqlContext.applySchema(last2HourRdd,parquetFile.schema) before
> caching but it does not fix the issue. The only workaround I've come up
> with so far is to replace select * with a select <list_of_columns>. But I'd
> love to understand a little better why the cache call trips this scenario
>
>
>
> On Wed, Dec 10, 2014 at 3:50 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> Have you checked to make sure the schema in the metastore matches the
>> schema in the parquet file?  One way to test would be to just use
>> sqlContext.parquetFile(...) which infers the schema from the file instead
>> of using the metastore.
>>
>> On Wed, Dec 10, 2014 at 12:46 PM, Yana Kadiyska <yana.kadiy...@gmail.com>
>> wrote:
>>
>>>
>>> Hi folks, wondering if anyone has thoughts. Trying to create something
>>> akin to a materialized view (sqlContext is a HiveContext connected to
>>> external metastore):
>>>
>>>
>>> val last2HourRdd = sqlContext.sql(s"select * from mytable")
>>> //last2HourRdd.first prints out a  org.apache.spark.sql.Row = [...] with
>>> valid data
>>>
>>>  last2HourRdd.cache()
>>> //last2HourRdd.first now fails in an executor with the following:
>>>
>>> In the driver:
>>>
>>> 14/12/10 20:24:01 INFO TaskSetManager: Starting task 0.1 in stage 25.0 (TID 
>>> 35, iphere, NODE_LOCAL, 2170 bytes)
>>> 14/12/10 20:24:01 INFO TaskSetManager: Lost task 0.1 in stage 25.0 (TID 35) 
>>> on executor iphere: java.lang.ClassCastException (null) [duplicate 1]
>>>
>>> ​
>>>
>>>
>>> And in executor:
>>>
>>> 14/12/10 19:56:57 ERROR Executor: Exception in task 0.1 in stage 20.0 (TID 
>>> 27)
>>> java.lang.ClassCastException: java.lang.String cannot be cast to 
>>> java.lang.Integer
>>>     at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
>>>     at 
>>> org.apache.spark.sql.catalyst.expressions.MutableInt.update(SpecificMutableRow.scala:73)
>>>     at 
>>> org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.update(SpecificMutableRow.scala:231)
>>>     at 
>>> org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.setString(SpecificMutableRow.scala:236)
>>>     at org.apache.spark.sql.columnar.STRING$.setField(ColumnType.scala:328)
>>>     at org.apache.spark.sql.columnar.STRING$.setField(ColumnType.scala:310)
>>>     at 
>>> org.apache.spark.sql.columnar.compression.RunLengthEncoding$Decoder.next(compressionSchemes.scala:168)
>>>     at 
>>> org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor$class.extractSingle(CompressibleColumnAccessor.scala:37)
>>>     at 
>>> org.apache.spark.sql.columnar.NativeColumnAccessor.extractSingle(ColumnAccessor.scala:64)
>>>     at 
>>> org.apache.spark.sql.columnar.BasicColumnAccessor.extractTo(ColumnAccessor.scala:54)
>>>     at 
>>> org.apache.spark.sql.columnar.NativeColumnAccessor.org$apache$spark$sql$columnar$NullableColumnAccessor$$super$extractTo(ColumnAccessor.scala:64)
>>>     at 
>>> org.apache.spark.sql.columnar.NullableColumnAccessor$class.extractTo(NullableColumnAccessor.scala:52)
>>>     at 
>>> org.apache.spark.sql.columnar.NativeColumnAccessor.extractTo(ColumnAccessor.scala:64)
>>>     at 
>>> org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)
>>>     at 
>>> org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:275)
>>>     at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>>>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>     at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>     at 
>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>     at 
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>     at 
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>>     at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>     at 
>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>     at 
>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>     at 
>>> org.apache.spark.sql.execution.Limit$$anonfun$4.apply(basicOperators.scala:141)
>>>     at 
>>> org.apache.spark.sql.execution.Limit$$anonfun$4.apply(basicOperators.scala:141)
>>>     at 
>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
>>>     at 
>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>
>>>
>>> Any thoughts on this? Not sure if using the external metastore for the
>>> inital pull is a problem or if I'm just hitting a bug.
>>>
>>>
>>>
>>
>

Reply via email to