Hi All,

I am trying to read an ORC file and perform a groupBy operation on it, but when I run it on a large data set I get the error message below.
Input data format (we are grouping by the first column):

|178111256| 107125374|
|178111256| 107148618|
|178111256| 107175361|
|178111256| 107189910|

As far as I can tell the logic and syntax of the code are correct, and it works fine on a small data set. I have attached the code below. Thank you for your time.

ERROR MESSAGE:

Error: java.lang.ArrayIndexOutOfBoundsException
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1453)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1349)
        at java.io.DataOutputStream.writeByte(DataOutputStream.java:153)
        at org.apache.hadoop.io.WritableUtils.writeVLong(WritableUtils.java:273)
        at org.apache.hadoop.io.WritableUtils.writeVInt(WritableUtils.java:253)
        at org.apache.hadoop.io.Text.write(Text.java:330)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:98)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:82)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1149)
        at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:610)
        at orc_groupby.orc_groupby.Orc_groupBy$MyMapper.map(Orc_groupBy.java:73)
        at orc_groupby.orc_groupby.Orc_groupBy$MyMapper.map(Orc_groupBy.java:39)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164)

--
Regards,
Balakumar Seetharaman
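P.S. For clarity, the output I expect for the sample rows above is a single line per key with all of its values concatenated, along these lines (value order may vary, and the leading comma comes from the reducer prepending "," to every value):

178111256	,107125374,107148618,107175361,107189910

Attached code (Orc_groupBy.java):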
package orc_groupby.orc_groupby;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Orc_groupBy extends Configured implements Tool {

    // Column indexes in the ORC schema (viewedid, viewerid).
    public static int A_ID = 0;
    public static int B_ID = 1;

    // Mapper: deserializes each OrcStruct row and emits (first column, second column) as Text.
    public static class MyMapper<K, V extends Writable> extends MapReduceBase
            implements Mapper<K, OrcStruct, Text, Text> {

        private StructObjectInspector oip;
        private final OrcSerde serde = new OrcSerde();

        public void configure(JobConf job) {
            // Describe the two int columns so the SerDe can build an object inspector.
            Properties table = new Properties();
            table.setProperty("columns", "viewedid,viewerid");
            table.setProperty("columns.types", "int,int");
            serde.initialize(job, table);
            try {
                oip = (StructObjectInspector) serde.getObjectInspector();
            } catch (SerDeException e) {
                e.printStackTrace();
            }
        }

        public void map(K key, OrcStruct val, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            List<? extends StructField> fields = oip.getAllStructFieldRefs();
            // Use each field's own inspector (both columns are int here).
            WritableIntObjectInspector aInspector =
                    (WritableIntObjectInspector) fields.get(A_ID).getFieldObjectInspector();
            WritableIntObjectInspector bInspector =
                    (WritableIntObjectInspector) fields.get(B_ID).getFieldObjectInspector();
            String a = "";
            String b = "";
            try {
                // Deserialize the row once and read both columns via the inspectors.
                Object row = serde.deserialize(val);
                a = aInspector.getPrimitiveJavaObject(
                        oip.getStructFieldData(row, fields.get(A_ID))).toString();
                b = bInspector.getPrimitiveJavaObject(
                        oip.getStructFieldData(row, fields.get(B_ID))).toString();
                // System.out.print("A=" + a + " B=" + b);
                // System.exit(0);
            } catch (SerDeException e1) {
                e1.printStackTrace();
            }
            output.collect(new Text(a), new Text(b));
        }
    }

    // Reducer: concatenates all values seen for a key, each prefixed with a comma.
    public static class MyReducer<K, V extends Writable> extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {
            StringBuilder toout = new StringBuilder();
            while (values.hasNext()) {
                toout.append(",").append(values.next().toString());
            }
            output.collect(key, new Text(toout.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Orc_groupBy(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {
        /*
        // remove output directory
        Runtime r = Runtime.getRuntime();
        Process p = r.exec("rm -rf " + args[1]);
        p.waitFor();
        BufferedReader b = new BufferedReader(new InputStreamReader(p.getInputStream()));
        String line = "";
        while ((line = b.readLine()) != null) {
            System.out.println(line);
        }
        b.close();
        */

        JobConf job = new JobConf(new Configuration(), Orc_groupBy.class);

        // Specify various job-specific parameters
        job.setJobName("myjob");

        // job.set("mapreduce.framework.name", "local");
        // job.set("fs.default.name", "hdfs:///");
        job.set("log4j.logger.org.apache.hadoop", "INFO");

        /*
        job.set("mapreduce.map.sort.spill.percent", "0.8");
        job.set("mapreduce.task.io.sort.factor", "10");
        job.set("mapreduce.task.io.sort.mb", "100");
        job.set("mapred.map.multithreadedrunner.threads", "1");
        job.set("mapreduce.mapper.multithreadedmapper.threads", "1");
        */

        // push down projection columns
        // job.set("hive.io.file.readcolumn.ids", "0,1");
        // job.set("hive.io.file.read.all.columns", "false");

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(Orc_groupBy.MyMapper.class);
        job.setReducerClass(Orc_groupBy.MyReducer.class);

        job.setInputFormat(OrcInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        JobClient.runJob(job);
        return 0;
    }
}
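In case it helps, I launch the job along these lines (the jar name and HDFS paths below are placeholders, not the exact ones from our cluster):

hadoop jar orc-groupby.jar orc_groupby.orc_groupby.Orc_groupBy <orc_input_dir> <output_dir>

The same invocation finishes without errors on a small input.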