Hello,
I could not find a user mailing list; please point me to one if that is a more
appropriate place for this question.
I have a Map-Reduce action in an Oozie workflow that reads Parquet files from
HDFS. The purpose of the action is to read each Parquet record and write it out
as '|'-delimited text. I am working with CDH 4, Parquet 1.6.0, and the old
org.apache.hadoop.mapred API. Here is the error I am getting; it appears to
occur as the mappers read the Parquet records and convert them to Avro (I am
using AvroReadSupport):
2016-01-19 16:46:42,626 WARN org.apache.hadoop.mapred.Child: Error running child
parquet.io.ParquetDecodingException: Can not read value at 0 in block 0 in file hdfs://nameservice1/group/foam/prototypes/parquet/ingest/201511/H788798/LDR/part-00023-r-00023.snappy.parquet
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:177)
    at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
    at parquet.hadoop.mapred.DeprecatedParquetInputFormat$RecordReaderWrapper.<init>(DeprecatedParquetInputFormat.java:90)
    at parquet.hadoop.mapred.DeprecatedParquetInputFormat.getRecordReader(DeprecatedParquetInputFormat.java:47)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:394)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
    at org.apache.hadoop.mapred.Child.main(Child.java:262)
Caused by: org.apache.avro.AvroRuntimeException: Bad index: 2
    at org.apache.avro.mapred.Pair.put(Pair.java:155)
    at parquet.avro.AvroIndexedRecordConverter.set(AvroIndexedRecordConverter.java:116)
    at parquet.avro.AvroIndexedRecordConverter.access$000(AvroIndexedRecordConverter.java:38)
    at parquet.avro.AvroIndexedRecordConverter$1.add(AvroIndexedRecordConverter.java:76)
    at parquet.avro.AvroIndexedRecordConverter$FieldStringConverter.addBinary(AvroIndexedRecordConverter.java:252)
    at parquet.column.impl.ColumnReaderImpl$2$7.writeValue(ColumnReaderImpl.java:322)
    at parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:369)
    at parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:400)
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:173)
What is this "Pair" schema that the program is trying to put values into? The
schema of my data has 13 fields; how do I provide it to the InputFormat or
ReadSupport class?
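One thing I noticed while skimming the parquet-avro 1.6.0 AvroReadSupport source: it looks like "avro.schema" is only consulted as file metadata there, and the configuration key for supplying an explicit read schema is "parquet.avro.read.schema". I have not confirmed this, so treat the key name as my guess; is something like the following what I should be setting instead?

```xml
<property>
    <name>parquet.avro.read.schema</name>
    <!-- same schema JSON as in the avro.schema property in my config below -->
    <value>{"namespace": "org.apache.avro.mapred", "type": "record", "name": "Foo", "fields": [ ... ]}</value>
</property>
```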
Also, should my mapper be written to work with Parquet Group values or Avro
GenericRecord values? Here is my Oozie config for the action:
<action name="PrepareCdrExportMR">
    <map-reduce>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <prepare>
            <delete path="${nameNode}/group/foam/prototypes/parquet/ingest/${release}/${group}/CDR_EXPORT"/>
        </prepare>
        <configuration>
            <property>
                <name>mapred.input.dir</name>
                <value>/group/foam/prototypes/parquet/ingest/${release}/${group}/LDR</value>
            </property>
            <property>
                <name>mapred.output.dir</name>
                <value>${nameNode}/group/foam/prototypes/parquet/ingest/${release}/${group}/CDR_EXPORT</value>
            </property>
            <property>
                <name>mapred.mapper.class</name>
                <value>com.humedica.duplo.foam.CdrExportMapper</value>
            </property>
            <property>
                <name>mapred.reducer.class</name>
                <value>com.humedica.duplo.foam.CdrExportReducer</value>
            </property>
            <property>
                <name>mapred.output.compress</name>
                <value>false</value>
            </property>
            <property>
                <name>mapred.output.compression.codec</name>
                <value>org.apache.hadoop.io.compress.GzipCodec</value>
            </property>
            <property>
                <name>mapred.input.format.class</name>
                <value>parquet.hadoop.mapred.DeprecatedParquetInputFormat</value>
            </property>
            <property>
                <name>mapred.output.format.class</name>
                <value>org.apache.hadoop.mapred.TextOutputFormat</value>
            </property>
            <property>
                <name>mapred.textoutputformat.separator</name>
                <value>|</value>
            </property>
            <property>
                <name>mapred.reduce.tasks</name>
                <value>8</value>
            </property>
            <property>
                <name>parquet.read.support.class</name>
                <value>parquet.avro.AvroReadSupport</value>
            </property>
            <property>
                <name>avro.schema</name>
                <value>{
                    "namespace": "org.apache.avro.mapred",
                    "type": "record",
                    "name": "Foo",
                    "fields": [
                        {"name": "sourceid", "type": "string"},
                        {"name": "patientid", "type": "string"},
                        {"name": "encounterid", "type": "string"},
                        {"name": "encounterdate", "type": "string"},
                        {"name": "groupid", "type": "string"},
                        {"name": "cdsid", "type": "string"},
                        {"name": "seq", "type": "string"},
                        {"name": "section", "type": "string"},
                        {"name": "foam", "type": "string"},
                        {"name": "head", "type": "string"},
                        {"name": "body", "type": "string"},
                        {"name": "tail", "type": "string"},
                        {"name": "observationdate", "type": "string"},
                        {"name": "source", "type": "string"}
                    ]
                }</value>
            </property>
            <property>
                <name>fs.hdfs.impl.disable.cache</name>
                <value>true</value>
            </property>
            <property>
                <name>mapred.output.key.class</name>
                <value>org.apache.hadoop.io.NullWritable</value>
            </property>
            <property>
                <name>mapred.output.value.class</name>
                <value>org.apache.hadoop.io.Text</value>
            </property>
            <property>
                <name>mapred.mapoutput.key.class</name>
                <value>org.apache.hadoop.io.NullWritable</value>
            </property>
            <property>
                <name>mapred.mapoutput.value.class</name>
                <value>org.apache.hadoop.io.Text</value>
            </property>
            <property>
                <name>io.serializations</name>
                <value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.avro.mapred.AvroSerialization</value>
            </property>
        </configuration>
    </map-reduce>
    <ok to="HdfsToLocal"/>
    <error to="PrepareCdrExportFailureDBUpdate"/>
</action>
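To make the intent concrete: the only per-record transformation I need is joining the record's field values with pipes. Here is a minimal sketch of the helper my mapper calls (plain Java; the class and method names are mine):

```java
import java.util.List;
import java.util.StringJoiner;

public class CdrExportLine {

    // Flattens one record's field values into a single pipe-delimited line.
    // In the mapper this is fed the values pulled off each input record
    // (e.g. record.get(i) for each field in the schema), and the result is
    // emitted as a Text value with a NullWritable key.
    static String toPipeLine(List<?> fields) {
        StringJoiner line = new StringJoiner("|");
        for (Object field : fields) {
            line.add(field == null ? "" : field.toString());
        }
        return line.toString();
    }
}
```

The part I am unsure about is the input side of the mapper's signature: if I am reading DeprecatedParquetInputFormat correctly, the values arrive wrapped in a parquet.hadoop.mapred.Container, but I don't know whether to expect a Parquet Group or an Avro GenericRecord inside it.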