Hi all,
I am trying to create a Crunch data source for a custom InputFormat with the
signature CustomInputFormat<Void, CustomNonWritableClass>.
Sorry for the long mail. I have tried two implementations, both without
success. I must be missing something, but I am not sure what.
1. Implementation: Derive PType<Pair<Void, CustomNonWritableClass>> using
MapWritable as base type
----------------------------------------------------------------------------------------------------------------------------------------
PType<Pair<Void, CustomNonWritableClass>> derivedType = typeFamily.derived(
    (Class<Pair<Void, CustomNonWritableClass>>) Pair.of(null, null).getClass(),
    new MapFn<MapWritable, Pair<Void, CustomNonWritableClass>>() {
      public Pair<Void, CustomNonWritableClass> map(MapWritable input) {...}
    },
    new MapFn<Pair<Void, CustomNonWritableClass>, MapWritable>() {
      public MapWritable map(Pair<Void, CustomNonWritableClass> input) {...}
    },
    typeFamily.records(MapWritable.class)
);
public class CustomDataSource
    extends FileTableSourceImpl<Void, CustomNonWritableClass> {
  public CustomDataSource() {
    super(new Path("xsource"),
        (PTableType<Void, CustomNonWritableClass>) derivedType,
        FormatBundle.forInput(CustomInputFormat.class));
  }
  ...
}
When I try this implementation it fails before submitting the job with the
following error:
Exception in thread "main" java.lang.ClassCastException:
org.apache.crunch.types.writable.WritableType cannot be cast to
org.apache.crunch.types.PTableType
at com.xxx.xxx.CustomDataSource.<init>(CustomDataSource.java:...)
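To narrow this first failure down, here is a minimal sketch of what I believe
the cast is doing. None of these types are Crunch classes: PTypeLike,
PTableTypeLike, WritableTypeLike, TableTypeLike and CastSketch are hypothetical
stand-ins. The assumption is that the object returned by derived(...)
implements only the PType side of the hierarchy, as the WritableType in the
exception suggests, so the cast compiles but cannot succeed at runtime.

```java
import java.util.Map;

// Stand-ins only: these are NOT the Crunch interfaces, just a model of the hierarchy.
interface PTypeLike<T> {}
interface PTableTypeLike<K, V> extends PTypeLike<Map.Entry<K, V>> {}

// Plays the role of WritableType: a plain type that is not a table type.
final class WritableTypeLike<T> implements PTypeLike<T> {}

// Plays the role of a real table type, for comparison.
final class TableTypeLike<K, V> implements PTableTypeLike<K, V> {}

class CastSketch {
    // Returns true when the cast to the table-type interface throws at runtime.
    @SuppressWarnings("unchecked")
    static boolean castFails(PTypeLike<?> type) {
        try {
            // Compiles because both sides are interfaces; checked only at runtime.
            PTableTypeLike<Void, String> table = (PTableTypeLike<Void, String>) type;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("plain type: " + castFails(new WritableTypeLike<String>()));
        System.out.println("table type: " + castFails(new TableTypeLike<Void, String>()));
    }
}
```

If this model matches what Crunch does, the derived type would have to be built
as a table type in the first place rather than cast into one.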
2. Implementation: Derive PType<CustomNonWritableClass> using MapWritable
as base type
----------------------------------------------------------------------------------------------------------------------------------------
public static class MapWritableToCustomNonWritableClass
    extends MapFn<MapWritable, CustomNonWritableClass> {
  public CustomNonWritableClass map(MapWritable input) {...}
}
public static class CustomNonWritableClassToMapWritable
    extends MapFn<CustomNonWritableClass, MapWritable> {
  public MapWritable map(CustomNonWritableClass input) {...}
}
PType<CustomNonWritableClass> derivedType = typeFamily.derived(
    CustomNonWritableClass.class,
    new MapWritableToCustomNonWritableClass(),
    new CustomNonWritableClassToMapWritable(),
    typeFamily.records(MapWritable.class)
);
public class CustomDataSource
    extends FileSourceImpl<CustomNonWritableClass> {
  public CustomDataSource() {
    super(new Path("xsource"),
        derivedType,
        FormatBundle.forInput(CustomInputFormat.class));
  }
  ...
}
When run, this is submitted to the cluster and the MR job starts, but it
eventually fails with:
2014-06-10 10:31:23,653 FATAL [IPC Server handler 2 on 9290]
org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task:
attempt_1401786307497_0078_m_000000_0 - exited :
java.lang.ClassCastException: com.xxx.xxx..CustomNonWritableClass
cannot be cast to org.apache.hadoop.io.MapWritable
at com.xxx.xxx.MapWritabToCustomNonWritableClass.map(MapWritabToCustomNonWritableClass.java:1)
at org.apache.crunch.fn.CompositeMapFn.map(CompositeMapFn.java:63)
at org.apache.crunch.MapFn.process(MapFn.java:34)
at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
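My reading of this second trace, as a minimal sketch: the input function of the
derived type expects the base type (MapWritable), but the record it is handed
is already a CustomNonWritableClass, presumably because that is what the
InputFormat emits. The classes below are hypothetical stand-ins
(MapWritableLike, CustomValue, ToCustomFn, InputFnSketch), not Hadoop or Crunch
classes, and the raw call is only my assumption about how the framework ends up
invoking the function with erased types.

```java
import java.util.function.Function;

// Stand-ins only: not the Hadoop or Crunch classes.
final class MapWritableLike {}
final class CustomValue {}

// Plays the role of MapWritableToCustomNonWritableClass: base type in, custom type out.
final class ToCustomFn implements Function<MapWritableLike, CustomValue> {
    @Override
    public CustomValue apply(MapWritableLike input) {
        return new CustomValue();
    }
}

class InputFnSketch {
    // Calls the function through a raw reference, as code that only sees erased
    // types would, and reports whether the call throws a ClassCastException.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static boolean failsOn(Object recordFromReader) {
        Function raw = new ToCustomFn();
        try {
            raw.apply(recordFromReader);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("reader emits base type:   " + failsOn(new MapWritableLike()));
        System.out.println("reader emits custom type: " + failsOn(new CustomValue()));
    }
}
```

In this model only the second call fails, which looks like the error above: the
value coming out of the record reader is not of the base type that the derived
type's input function is declared to accept.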
Thanks,
Christian