Hi All,

I am trying to read an ORC file and perform a groupBy operation on it, but
when I run it on a large data set I get the error message below.

Format of the input data:

|178111256|  107125374|
|178111256|  107148618|
|178111256|  107175361|
|178111256|  107189910|

and we are trying to group by the first column.
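
For illustration, with the reducer in the attached code the grouped output
for this sample should come out roughly like this (the value order is not
guaranteed, and the leading comma comes from the string concatenation in the
reducer):

178111256	,107125374,107148618,107175361,107189910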

As far as I can tell, the logic and syntax of the code are correct, and it
works fine on a small data set. I have attached the code in the text file.
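
For reference, I launch the job roughly as follows (the jar name and the
HDFS paths here are just placeholders; the main class is the one from the
attached code):

hadoop jar orc-groupby.jar orc_groupby.orc_groupby.Orc_groupBy /path/to/input_orc /path/to/output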

Thank you for your time.

ERROR MESSAGE:
Error: java.lang.ArrayIndexOutOfBoundsException
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1453)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1349)
    at java.io.DataOutputStream.writeByte(DataOutputStream.java:153)
    at org.apache.hadoop.io.WritableUtils.writeVLong(WritableUtils.java:273)
    at org.apache.hadoop.io.WritableUtils.writeVInt(WritableUtils.java:253)
    at org.apache.hadoop.io.Text.write(Text.java:330)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:98)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:82)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1149)
    at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:610)
    at orc_groupby.orc_groupby.Orc_groupBy$MyMapper.map(Orc_groupBy.java:73)
    at orc_groupby.orc_groupby.Orc_groupBy$MyMapper.map(Orc_groupBy.java:39)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164)



-- 
REGARDS
BALAKUMAR SEETHARAMAN
package orc_groupby.orc_groupby;


import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Orc_groupBy extends Configured  implements Tool {
        
    public static int A_ID=0;
    public static int B_ID=1;

    public static class MyMapper<K , V extends Writable>
    extends MapReduceBase implements Mapper<K, OrcStruct, Text, Text> {

        private StructObjectInspector oip;
        private final OrcSerde serde = new OrcSerde();

        public void configure(JobConf job) {
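            // Describe the two int columns to the ORC SerDe so that an
            // ObjectInspector for reading them can be obtained below.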
            Properties table = new Properties();
            table.setProperty("columns", "viewedid,viewerid");
            table.setProperty("columns.types", "int,int");

            serde.initialize(job, table);

            try {
                oip = (StructObjectInspector) serde.getObjectInspector();
            } catch (SerDeException e) {
                e.printStackTrace();
            }
        }
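        // Read both columns from the OrcStruct and emit (first column, second
        // column) so that the shuffle groups rows by the first column.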
        public void map(K key, OrcStruct val,
                OutputCollector<Text, Text> output, Reporter reporter)
                        throws IOException {
            List<? extends StructField> fields = oip.getAllStructFieldRefs();
            WritableIntObjectInspector bInspector =
                    (WritableIntObjectInspector) fields.get(B_ID).getFieldObjectInspector();
            String a = "";
            String b = "";
            try {
                a = bInspector.getPrimitiveJavaObject(
                        oip.getStructFieldData(serde.deserialize(val), fields.get(A_ID))).toString();
                b = bInspector.getPrimitiveJavaObject(
                        oip.getStructFieldData(serde.deserialize(val), fields.get(B_ID))).toString();
                //System.out.print("A="+a+" B="+b);
                //System.exit(0);
            } catch (SerDeException e1) {
                e1.printStackTrace();
            }
            output.collect(new Text(a), new Text(b));
           
        }
    }


    public static class MyReducer<K, V extends Writable>
    extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output,
                Reporter reporter)
                        throws IOException {
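            // Concatenate every value seen for this key into one
            // comma-separated string and emit it with the key.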

            String toout = "";
            while (values.hasNext()) {
                String value = values.next().toString();
                toout += ","+value.toString();
            }
            output.collect(key, new Text(toout));
        }
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Orc_groupBy(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {
        /*
        //remove output directory
        Runtime r = Runtime.getRuntime();
        Process p = r.exec("rm -rf "+args[1]);
        p.waitFor();
       
        BufferedReader b = new BufferedReader(new InputStreamReader(p.getInputStream()));
        String line = "";

        while ((line = b.readLine()) != null) {
            System.out.println(line);
        }
        b.close();
        */

        JobConf job = new JobConf(new Configuration(), Orc_groupBy.class);

        // Specify various job-specific parameters    
        job.setJobName("myjob");
       // job.set("mapreduce.framework.name","local");
       // job.set("fs.default.name","hdfs:///");
        job.set("log4j.logger.org.apache.hadoop","INFO");
        job.set("log4j.logger.org.apache.hadoop","INFO");
        
     /* job.set("mapreduce.map.sort.spill.percent","0.8");
        job.set("mapreduce.task.io.sort.factor","10");
        job.set("mapreduce.task.io.sort.mb","100");
        job.set("mapred.map.multithreadedrunner.threads","1");
        job.set("mapreduce.mapper.multithreadedmapper.threads","1");
        */

        //push down projection columns
        //job.set("hive.io.file.readcolumn.ids","0,1");
        //job.set("hive.io.file.read.all.columns","false");

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(Orc_groupBy.MyMapper.class);
        job.setReducerClass(Orc_groupBy.MyReducer.class);


        job.setInputFormat(OrcInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        JobClient.runJob(job);
        return 0;
    }

}
