Hi,
After several days of trying to figure out the problem, I'm stuck with a
ClassCastException when running queries with Hive on Spark against ORC
tables that I updated with the streaming mutation API of Hive 2.0.
The context is the following.
For Hive:
The version is 2.1, the latest available from the website.
I wrote some Scala code to insert data into an ORC table with the
streaming mutation API, following the example provided somewhere in the
Hive repository.
The table looks like this:
+--------------------------------------------------------------------+--+
| createtab_stmt |
+--------------------------------------------------------------------+--+
| CREATE TABLE `hc__member`( |
| `rdv_core__key` bigint, |
| `rdv_core__domainkey` string, |
| `rdftypes` array<string>, |
| `rdv_org__firstname` string, |
| `rdv_org__middlename` string, |
| `rdv_org__lastname` string, |
| `rdv_org__gender` string, |
| `rdv_org__city` string, |
| `rdv_org__state` string, |
| `rdv_org__countrycode` string, |
| `rdv_org__addresslabel` string, |
| `rdv_org__zip` string) |
| CLUSTERED BY ( |
| rdv_core__key) |
| INTO 24 BUCKETS |
| ROW FORMAT SERDE |
| 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' |
| STORED AS INPUTFORMAT |
| 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' |
| OUTPUTFORMAT |
| 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' |
| LOCATION |
| 'hdfs://hmaster:8020/user/hive/warehouse/hc__member' |
| TBLPROPERTIES ( |
| 'COLUMN_STATS_ACCURATE'='{\"BASIC_STATS\":\"true\"}', |
| 'compactor.mapreduce.map.memory.mb'='2048', |
| 'compactorthreshold.hive.compactor.delta.num.threshold'='4', |
| 'compactorthreshold.hive.compactor.delta.pct.threshold'='0.5', |
| 'numFiles'='0', |
| 'numRows'='0', |
| 'rawDataSize'='0', |
| 'totalSize'='0', |
| 'transactional'='true', |
| 'transient_lastDdlTime'='1473792939') |
+--------------------------------------------------------------------+--+
My hive-site.xml looks like this:
<configuration>
<property>
<name>hive.execution.engine</name>
<value>spark</value>
</property>
<property>
<name>spark.master</name>
<value>spark://hmaster:7077</value>
</property>
<property>
<name>spark.eventLog.enabled</name>
<value>false</value>
</property>
<property>
<name>spark.executor.memory</name>
<value>12g</value>
</property>
<property>
<name>spark.serializer</name>
<value>org.apache.spark.serializer.KryoSerializer</value>
</property>
<property>
<name>mapreduce.input.fileinputformat.split.maxsize</name>
<value>750000000</value>
</property>
<property>
<name>hive.vectorized.execution.enabled</name>
<value>true</value>
</property>
<property>
<name>hive.cbo.enable</name>
<value>true</value>
</property>
<property>
<name>hive.optimize.reducededuplication.min.reducer</name>
<value>4</value>
</property>
<property>
<name>hive.optimize.reducededuplication</name>
<value>true</value>
</property>
<property>
<name>hive.orc.splits.include.file.footer</name>
<value>false</value>
</property>
<property>
<name>hive.merge.mapfiles</name>
<value>true</value>
</property>
<property>
<name>hive.merge.sparkfiles</name>
<value>true</value>
</property>
<property>
<name>hive.merge.smallfiles.avgsize</name>
<value>16000000</value>
</property>
<property>
<name>hive.merge.size.per.task</name>
<value>256000000</value>
</property>
<property>
<name>hive.merge.orcfile.stripe.level</name>
<value>true</value>
</property>
<property>
<name>hive.auto.convert.join</name>
<value>true</value>
</property>
<property>
<name>hive.auto.convert.join.noconditionaltask</name>
<value>true</value>
</property>
<property>
<name>hive.auto.convert.join.noconditionaltask.size</name>
<value>894435328</value>
</property>
<property>
<name>hive.optimize.bucketmapjoin.sortedmerge</name>
<value>false</value>
</property>
<property>
<name>hive.map.aggr.hash.percentmemory</name>
<value>0.5</value>
</property>
<property>
<name>hive.map.aggr</name>
<value>true</value>
</property>
<property>
<name>hive.optimize.sort.dynamic.partition</name>
<value>false</value>
</property>
<property>
<name>hive.stats.autogather</name>
<value>true</value>
</property>
<property>
<name>hive.stats.fetch.column.stats</name>
<value>true</value>
</property>
<property>
<name>hive.vectorized.execution.reduce.enabled</name>
<value>false</value>
</property>
<property>
<name>hive.vectorized.groupby.checkinterval</name>
<value>4096</value>
</property>
<property>
<name>hive.vectorized.groupby.flush.percent</name>
<value>0.1</value>
</property>
<property>
<name>hive.compute.query.using.stats</name>
<value>true</value>
</property>
<property>
<name>hive.limit.pushdown.memory.usage</name>
<value>0.4</value>
</property>
<property>
<name>hive.optimize.index.filter</name>
<value>true</value>
</property>
<property>
<name>hive.exec.reducers.bytes.per.reducer</name>
<value>67108864</value>
</property>
<property>
<name>hive.smbjoin.cache.rows</name>
<value>10000</value>
</property>
<property>
<name>hive.exec.orc.default.stripe.size</name>
<value>67108864</value>
</property>
<property>
<name>hive.fetch.task.conversion</name>
<value>more</value>
</property>
<property>
<name>hive.fetch.task.conversion.threshold</name>
<value>1073741824</value>
</property>
<property>
<name>hive.fetch.task.aggr</name>
<value>false</value>
</property>
<property>
<name>mapreduce.input.fileinputformat.list-status.num-threads</name>
<value>5</value>
</property>
<property>
<name>spark.kryo.referenceTracking</name>
<value>false</value>
</property>
<property>
<name>spark.kryo.classesToRegister</name>
<value>org.apache.hadoop.hive.ql.io.HiveKey,org.apache.hadoop.io.BytesWritable,org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch</value>
</property>
<property>
<name>hadoop.proxyuser.hive.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hive.hosts</name>
<value>*</value>
</property>
<property>
<name>hive.server2.enable.doAs</name>
<value>false</value>
</property>
<property>
<name>hive.server2.authentication</name>
<value>NONE</value>
</property>
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
<value>4</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost/metastore?createDatabaseIfNotExist=true</value>
<description>metadata is stored in a MySQL server</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>MySQL JDBC driver class</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hadoop</value>
<description>user name for connecting to mysql server</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value></value>
<description>password for connecting to mysql server</description>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://localhost:9083</value>
</property>
<property>
<name>hive.root.logger</name>
<value>WARN,RFA</value>
</property>
</configuration>
Whenever I run a query involving Spark, I get the following error:
java.io.IOException: java.io.IOException: error iterating
at org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderNextException(HiveIOExceptionHandlerChain.java:121)
at org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil.handleRecordReaderNextException(HiveIOExceptionHandlerUtil.java:77)
at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:355)
at org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:79)
at org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:33)
at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:116)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:246)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList$ResultIterator.hasNext(HiveBaseFunctionResultList.java:93)
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: error iterating
at org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowReader.next(VectorizedOrcAcidRowReader.java:92)
at org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowReader.next(VectorizedOrcAcidRowReader.java:42)
at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:350)
... 18 more
Caused by: java.lang.ClassCastException: org.apache.hadoop.hive.ql.io.orc.OrcStruct$OrcListObjectInspector cannot be cast to org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector
at org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.setVector(VectorizedBatchUtil.java:311)
at org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.acidAddRowToBatch(VectorizedBatchUtil.java:291)
at org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowReader.next(VectorizedOrcAcidRowReader.java:82)
... 20 more
By "involving Spark" I mean the following: a plain select * does not run
against the Spark backend, and I can see the data in the table. But when I
run a query with a group by, for instance, or just try to extract a single
column value, it runs through Spark and I get this exception.
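The plain select * most likely stays off Spark because
hive.fetch.task.conversion is set to more in the configuration above, so it
is served by a local fetch task. A check that might help narrow this down,
given here as an untested sketch and not a confirmed result, is to disable
fetch conversion for one session so that the same select * is forced through
the Spark backend:

```sql
-- Assumption: run in the same Beeline/CLI session as the failing queries.
-- 'none' disables fetch-task conversion, so even a simple select launches a job.
SET hive.fetch.task.conversion=none;
SELECT * FROM hc__member LIMIT 10;
```

If this also fails, that would suggest the problem lies in the read path used
by the Spark tasks and not in the group by itself.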
I also tried the Streaming API, with both a custom writer and a JSON
writer, and hit the same problem.
I built the Spark distribution myself, removing the Hive-related
dependencies, so I don't think the problem comes from there.
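The innermost cause in the trace is raised from VectorizedBatchUtil.setVector,
called by VectorizedOrcAcidRowReader, and the inspector that fails the cast is
a list inspector. The only non-primitive column in the table is rdftypes
(array<string>), and hive.vectorized.execution.enabled is true in the
configuration above. A possible diagnostic, sketched under the assumption that
the vectorized ACID reader is what trips over that column, is to turn
vectorization off for one session and rerun a failing query:

```sql
-- Assumption: vectorized reads of the ACID table are the trigger.
-- This only tests that hypothesis; it is not a confirmed fix.
SET hive.vectorized.execution.enabled=false;
SELECT rdv_org__state, count(*) FROM hc__member GROUP BY rdv_org__state;
```

If the query succeeds with vectorization off, that would point at the
vectorized reader and away from the Spark build or the streaming writers.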
Do you have any recommendations on how I can proceed to find the root cause
of this problem?
Thanks in advance.
PS: I made the mistake of posting this on the dev mailing list earlier;
please ignore that post, and sorry for the double post.
Regards,
Benjamin Schaff