Hi,

In my MapReduce job, I am using AvroKeyValueOutputFormat for one of my
MultipleOutputs named outputs. I declared it as below:

 MultipleOutputs.addNamedOutput(stage2Job,
        SessionConstants.COMPLETED_SESSIONS,
        AvroKeyValueOutputFormat.class, AvroKey.class, AvroValue.class);

In the reducer, I construct and emit a GenericData.Record for the
schema below:

sessionSchema:
{"namespace": "ci.avro",
 "type": "record",
 "name": "Session",
 "fields": [
   {"name": "Common",
    "type": {"type": "map", "values": "string"}},
   {"name": "events",
    "type": {
      "type": "array",
      "items": {
        "name": "Event",
        "type": "map",
        "values": "string"}
    }
   }
 ]
}

eventSchema:
{"namespace": "ci.avro",
 "type": "record",
 "name": "AvroEvent",
 "fields": [
    {"name":"Event",
      "type": {
           "type": "map", "values":"string"
              }
    }
 ]
}

// record generation

GenericData.Record record = new GenericData.Record(sessionSchema);
GenericData.Record eRecord = new GenericData.Record(eventSchema);
GenericData.Array<GenericData.Record> eventRecords =
    new GenericData.Array<GenericData.Record>(vc.getEvents().size(),
        sessionSchema.getField("events").schema());
record.put("Common", vc.getCommon().getM_parameterMap());
for (Event ev : vc.getEvents()) {
  eRecord = new GenericData.Record(eventSchema);
  eRecord.put("Event", ev.getM_parameterMap());
  eventRecords.add(eRecord);
}
record.put("events", eventRecords);

sessionRecord.datum(record);


// record emitted as below
context.getConfiguration().set(CONF_OUTPUT_KEY_SCHEMA,
    Schema.create(Schema.Type.STRING).toString());
context.getConfiguration().set(CONF_OUTPUT_VALUE_SCHEMA,
    sessionSchema.toString());
multipleOutputs.write(SessionConstants.COMPLETED_SESSIONS,
    new AvroKey<String>(key.toString()), sessionRecord,
    SessionConstants.COMPLETED_SESSIONS);

But I am getting the exception below. I am also setting
"avro.schema.output.value" to sessionSchema.toString(). What could be the
issue?

Exception:
org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.util.Map
        at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
        at org.apache.avro.mapreduce.AvroKeyValueRecordWriter.write(AvroKeyValueRecordWriter.java:127)
        at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs$RecordWriterWithCounter.write(MultipleOutputs.java:304)
        at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:370)
        at com.paypal.ci.CISuperSessionStage2AvroReducer.reduce(CISuperSessionStage2AvroReducer.java:156)
        at com.paypal.ci.CISuperSessionStage2AvroReducer.reduce(CISuperSessionStage2AvroReducer.java:24)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1232)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.util.Map
        at org.apache.avro.generic.GenericDatumWriter.getMapSize(GenericDatumWriter.java:194)
        at org.apache.avro.generic.GenericDatumWriter.writeMap(GenericDatumWriter.java:173)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:69)
        at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
        at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:138)
        at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:64)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
        at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
        at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
        at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
        at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
        at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
        at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
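
One observation from the trace, offered as a guess rather than a confirmed
diagnosis: the failure happens in writeMap, called from writeArray, so the
writer seems to expect each element of the "events" array to be a
java.util.Map. That matches sessionSchema, where the array items are declared
as a map of strings, while the reducer adds GenericData.Record instances
built from eventSchema. Below is a sketch of a Session schema whose array
items are the AvroEvent record instead. It assumes the intent is for every
element to be an AvroEvent record; nesting the record inline like this is my
assumption, not something taken from the job.

```json
{"namespace": "ci.avro",
 "type": "record",
 "name": "Session",
 "fields": [
   {"name": "Common",
    "type": {"type": "map", "values": "string"}},
   {"name": "events",
    "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "AvroEvent",
        "fields": [
          {"name": "Event",
           "type": {"type": "map", "values": "string"}}
        ]
      }
    }
   }
 ]
}
```

The other direction may work as well: keep sessionSchema as it is and add
ev.getM_parameterMap() to the array directly, without wrapping it in a
record. In that case the array would hold maps, so its element type would no
longer be GenericData.Record.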



Thanks & Regards,
B Anil Kumar.
