I wrote a few mails here regarding this issue.
After further investigation, I think there is a bug in how Spark 1.3 saves
Hive tables.

(hc is a HiveContext)

1. Verify that the needed configuration exists:
scala> hc.sql("set hive.exec.compress.output").collect
res4: Array[org.apache.spark.sql.Row] =
Array([hive.exec.compress.output=true])
scala> hc.sql("set
mapreduce.output.fileoutputformat.compress.codec").collect
res5: Array[org.apache.spark.sql.Row] =
Array([mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec])
scala> hc.sql("set mapreduce.output.fileoutputformat.compress.type").collect
res6: Array[org.apache.spark.sql.Row] =
Array([mapreduce.output.fileoutputformat.compress.type=BLOCK])
2. Load the DataFrame and save it as a table ('path' points to an existing Parquet file):
val saDF = hc.parquetFile(path)
saDF.count

(count yields 229764, i.e. the RDD exists)
saDF.saveAsTable("test_hive_ms")

Now for a few interesting observations:
1. Querying from the Hive CLI, the table exists but has the wrong output format:
Failed with exception java.io.IOException:java.io.IOException: hdfs://10.166.157.97:9000/user/hive/warehouse/test_hive_ms/part-r-00001.parquet
not a SequenceFile
2. Looking at the output files, I found that they are '.parquet' files and not
'.snappy'.
3. Looking at the saveAsTable log output shows that it actually stores the
table with both the wrong output format and no compression:
15/04/22 07:16:54 INFO metastore.HiveMetaStore: 0: create_table:
Table(tableName:test_hive_ms, dbName:default, owner:hadoop,
createTime:1429687014, lastAccessTime:0, retention:0,
sd:StorageDescriptor(cols:[FieldSchema(name:col, type:array<string>,
comment:from deserializer)], location:null,
inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
parameters:{serialization.format=1, path=hdfs://
10.166.157.97:9000/user/hive/warehouse/test_hive_ms}), bucketCols:[],
sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[],
skewedColValues:[], skewedColValueLocationMaps:{})), partitionKeys:[],
parameters:{spark.sql.sources.schema.part.0={"type":"struct","fields":[{"name":"ADJDATE","type":"long","nullable":true,"metadata":{}},{"name":"sid","type":"integer","nullable":true,"metadata":{}},{"name":"ADJTYPE","type":"integer","nullable":true,"metadata":{}},{"name":"ENDADJDATE","type":"long","nullable":true,"metadata":{}},{"name":"ADJFACTOR","type":"double","nullable":true,"metadata":{}},{"name":"CUMADJFACTOR","type":"double","nullable":true,"metadata":{}}]},
EXTERNAL=FALSE, spark.sql.sources.schema.numParts=1,
spark.sql.sources.provider=org.apache.spark.sql.parquet},
viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)
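
For what it is worth, a possible workaround (untested, and the temp-table and
table names below are just placeholders) might be to route the write through
HiveQL, so the table is created by Hive itself rather than as a Spark SQL data
source table:

// Register the DataFrame and let Hive create the table via CTAS
saDF.registerTempTable("sa_tmp")
hc.sql("CREATE TABLE test_hive_ms_hql STORED AS PARQUET AS SELECT * FROM sa_tmp")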

So, the question is: am I missing some configuration here, or should I open a
bug?

Thanks,
Ophir
