Vladislav Kuzemchik created PARQUET-151:
-------------------------------------------
Summary: Null Pointer exception in
parquet.hadoop.ParquetFileWriter.mergeFooters
Key: PARQUET-151
URL: https://issues.apache.org/jira/browse/PARQUET-151
Project: Parquet
Issue Type: Bug
Reporter: Vladislav Kuzemchik
Hi!
I'm getting a NullPointerException when I try to write Parquet files with
Spark.
Dec 13, 2014 3:05:10 AM WARNING: parquet.hadoop.ParquetOutputCommitter: could
not write summary file for
hdfs://phoenix-011.nym1.placeiq.net:8020/user/vkuzemchik/parquet_data/1789
java.lang.NullPointerException
at
parquet.hadoop.ParquetFileWriter.mergeFooters(ParquetFileWriter.java:426)
at
parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:402)
at
parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:51)
at
org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:936)
at
com.placeiq.spark.KafkaReader$.writeParquetHadoop(KafkaReader.scala:143)
at com.placeiq.spark.KafkaReader$$anonfun$3.apply(KafkaReader.scala:165)
at com.placeiq.spark.KafkaReader$$anonfun$3.apply(KafkaReader.scala:164)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Here is the function I'm using:
/**
 * Writes one RDD of (Void, LogMessage) pairs through ParquetOutputFormat,
 * configured with AvroWriteSupport and the LogMessage Avro schema.
 *
 * The output directory is outputDir with rdd.id appended, so each RDD gets
 * its own directory. Per the stack trace in this report, the
 * NullPointerException is logged while saveAsNewAPIHadoopDataset (the last
 * call below) runs ParquetOutputCommitter.commitJob to write the summary file.
 *
 * @param rdd pairs to write; the key type is Void, the value is a LogMessage
 */
def writeParquetHadoop(rdd:RDD[(Void,LogMessage)]):Unit = {
// ssc is defined outside this snippet (presumably the StreamingContext — confirm).
val jobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
val job = new Job(jobConf)
// Base path; rdd.id is appended to it below.
val outputDir =
"hdfs://phoenix-011.nym1.placeiq.net:8020/user/vkuzemchik/parquet_data/"
ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
// Read support and read schema are set on the same job, although this
// function only writes.
ParquetInputFormat.setReadSupportClass(job,
classOf[AvroReadSupport[LogMessage]])
AvroParquetInputFormat.setAvroReadSchema(job, LogMessage.SCHEMA$)
AvroParquetOutputFormat.setSchema(job, LogMessage.SCHEMA$)
ParquetOutputFormat.setCompression(job,CompressionCodecName.SNAPPY)
// 536870912 = 512 * 1024 * 1024 (presumably bytes, i.e. 512 MiB — confirm
// against the ParquetOutputFormat documentation).
ParquetOutputFormat.setBlockSize(job, 536870912)
job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[LogMessage])
job.setOutputFormatClass(classOf[ParquetOutputFormat[LogMessage]])
// Output location is passed through the configuration key rather than a
// FileOutputFormat helper.
job.getConfiguration.set("mapred.output.dir", outputDir+rdd.id)
// The save receives only the job's Configuration, not the Job itself.
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)