I have developed a customized InputFormat and record reader to read NetCDF-format files in
Hadoop; the code is shown here:

    package org.apache.hadoop.mapred;

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.HashSet;
    import java.util.IdentityHashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.commons.logging.LogFactory;
    import org.apache.commons.logging.Log;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.mapreduce.security.TokenCache;
    import org.apache.hadoop.net.NetworkTopology;
    import org.apache.hadoop.net.Node;
    import org.apache.hadoop.net.NodeBase;
    import org.apache.hadoop.util.ReflectionUtils;
    import org.apache.hadoop.util.StringUtils;
    import org.apache.commons.lang.ArrayUtils;
    import org.apache.hadoop.io.NetCDFArrayWritable;
    import ucar.nc2.*;
    import ucar.nc2.iosp.*;
    import ucar.nc2.iosp.netcdf3.*;
    import ucar.unidata.io.*;
    import ucar.nc2.dataset.*;
    import ucar.ma2.Array;
    import ucar.ma2.ArrayFloat;
    import java.util.Arrays;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.RecordReader;
    public class NetCDFInputFormat extends FileInputFormat<Text, NetCDFArrayWritable> {

      private static final Log LOG
        = LogFactory.getLog(NetCDFInputFormat.class.getName());


      private NetCDFInfo getNetCDFInfo(Path file, FileSystem fs, JobConf job)
      {

        //traverse header and return chunk start and size arrays
        NetCDFInfo result = new NetCDFInfo();//library call

        NetcdfFile ncFile;
        Variable v;
        ncFile = null;
        try {
        ncFile = NetcdfDataset.openFile(file.toString(), null);

        v = ncFile.findVariable("rsut");
        //List<Variable> vs = ncFile.getVariables();
        //v = vs.get(vs.size()-1);

        LOG.info("Variable is "+ v.getFullName());
        result.fileSize = ncFile.vfileSize;
        result.recStart = ncFile.vrecStart;
        Long[] metaArray = v.reallyReadMeta().toArray(new Long[(int)(ncFile.vnumRecs)]);
        result.chunkStarts =ArrayUtils.toPrimitive(metaArray);
        //result.chunkSizes = nc.chunkSizes;
        result.numRecs = ncFile.vnumRecs;
        result.recSize = ncFile.vrecSize;
        result.smallRecSize = ncFile.vsmallRecSize;
        //result.shape = v.shape;

        } catch (Exception e) {
            LOG.info("Bad... "+ e);
        }
        try { if (ncFile != null) ncFile.close(); } catch (Exception e) { LOG.info("Bad2... " + e); }

        return result;
      }

      @Override
      public InputSplit[] getSplits(JobConf job, int numSplits)
        throws IOException {
        FileStatus[] files = listStatus(job);


        LOG.info( "[SAMAN] beginning of getSplits" );
        LOG.info( "[SAMAN] " + files.length );
        // Save the number of input files in the job-conf
        job.setLong(NUM_INPUT_FILES, files.length);
        long totalSize = 0;                           // compute total size
        for (FileStatus file: files) {                // check we have valid files
          if (file.isDir()) {
            throw new IOException("Not a file: "+ file.getPath());
          }
          LOG.info ("[net] adding "+file.getPath());
          totalSize += file.getLen();
        }

        long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
        //long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
         //                       minSplitSize);

        // generate splits
        ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
        NetworkTopology clusterMap = new NetworkTopology();
        for (FileStatus file: files) {
          Path path = file.getPath();
          FileSystem fs = path.getFileSystem(job);
          long length = file.getLen();
          LOG.info("get file len of "+file.getPath());
          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
          if ((length != 0) && isSplitable(fs, path)) {
            long blockSize = file.getBlockSize();
            NetCDFInfo netInfo = getNetCDFInfo(path, fs, job);
            long recStart      = netInfo.recStart;
            long[] chunkSizes = netInfo.chunkSizes;
            long[] chunkStarts = netInfo.chunkStarts;
            long smallSize = netInfo.smallRecSize;
            long recSize = netInfo.recSize;
            long splitSize = 0;
            int chunkIndex = 0;
            long bytesRemaining = chunkStarts[chunkStarts.length-1] + recSize - recStart - 2*smallSize;
            long thisStart = recStart;  //file position
            long thisChunk = 0;
            long blockNo = 1;
            while ( bytesRemaining > 0) {
              // advance chunkIndex to the first chunk that starts beyond the current block boundary
              while ( chunkIndex < chunkStarts.length && chunkStarts[chunkIndex] < blockNo * blockSize ) {
                chunkIndex++;
              }
              long tempStart = thisStart;
              long endChunk;
              if (chunkIndex >= chunkStarts.length) {
                splitSize = chunkStarts[chunkStarts.length-1] + recSize - thisStart - smallSize;

                //bytesRemaining should be 0 after this round
              }
              else {
                splitSize = chunkStarts[chunkIndex] - thisStart - smallSize;
                thisStart = chunkStarts[chunkIndex];
              }
              endChunk = chunkIndex;
              LOG.info("[net] split "+path+ " start "+tempStart+" size "
+splitSize +" from chunk " +thisChunk +" to "+ endChunk);
              blockNo++;

              String[] splitHosts = getSplitHosts(blkLocations, tempStart, splitSize, clusterMap);
              FileSplit split = new FileSplit(path, tempStart, splitSize, splitHosts);
              split.getFileSplit().startChunk = thisChunk;
              split.getFileSplit().endChunk = endChunk;
              splits.add(split);
              bytesRemaining -= splitSize;
              LOG.info("[net] split " +path+" remaining "+bytesRemaining);
              thisChunk = endChunk;
            }

          } else if (length != 0) {
            String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
            splits.add(new FileSplit(path, 0, length, splitHosts));
          } else {
            //Create empty hosts array for zero length files
            splits.add(new FileSplit(path, 0, length, new String[0]));
          }
        }
        LOG.info("Total # of splits: " + splits.size());
        for (int i=0; i<splits.size(); i++)
        {
          FileSplit split = splits.get(i);
          LOG.info("split "+(i+1)+" "+ split.getStart()+ "
"+split.getLength() + " " + split.getFileSplit().startChunk +" "+
split.getFileSplit().endChunk);
        }
        InputSplit[] test = splits.toArray(new FileSplit[splits.size()]);
        for( int i = 0; i < test.length; i++ ){
            LOG.info( "[SAMAN] Final Split " +
((FileSplit)test[i]).getLength() + " " + ((FileSplit)test[i]).getPath() );
        }
        return splits.toArray(new FileSplit[splits.size()]);
      }

      @Override
      public RecordReader<Text, NetCDFArrayWritable> getRecordReader(
                                              InputSplit genericSplit,
                                              JobConf job,
                                              Reporter reporter)
        throws IOException {

        reporter.setStatus(genericSplit.toString());
        return new NetCDFReaderWithCaching(job, (FileSplit) genericSplit);
      }


    }


And here is my record reader:

    package org.apache.hadoop.mapred;

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.commons.logging.LogFactory;
    import org.apache.commons.logging.Log;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    import org.apache.hadoop.io.NetCDFArrayWritable;

    import java.util.List;
    import ucar.nc2.*;
    import ucar.nc2.iosp.*;
    import ucar.nc2.iosp.netcdf3.*;
    import ucar.unidata.io.*;
    import ucar.nc2.dataset.*;
    import ucar.ma2.Array;
    import ucar.ma2.ArrayFloat;
    import java.util.Arrays;
    /**
     * Emits one record per array element: the key encodes the record and dimension indices,
     * and the value holds those indices plus the element value.
     */
    public class NetCDFReaderWithCaching implements RecordReader<Text, NetCDFArrayWritable> {
      private static final Log LOG
        = LogFactory.getLog(NetCDFReaderWithCaching.class.getName());

      private long start;
      private long pos;
      private long pos1D = 0; // 1D Index for dimensions [2:n]
      private long end;
      private int nDims;
      private NetcdfFile ncFile;
      private Variable v;
      private List<Dimension> dimensions;
      private long max1DSize = 1; // total number of elements that can be stored in dimensions [2:n]
      private Text key = new Text();
      private NetCDFArrayWritable value = new NetCDFArrayWritable();
      private String sectionLocator = null;
      private Array chunk = null;
      private float[] my;
      private FloatWritable[] fw;
      private long previousTime = System.nanoTime();

      public NetCDFReaderWithCaching( Configuration job, FileSplit split ) throws IOException {

            initialize( split );
      }

      public void initialize(FileSplit genericSplit) throws IOException {
        FileSplit fSplit = (FileSplit) genericSplit;
        start = fSplit.getFileSplit().startChunk; //split.getStart();
        end = fSplit.getFileSplit().endChunk; //start + split.getLength();
        final Path file = fSplit.getPath();

        LOG.info("Map is reading from input: " + file +" start chunk "+
start+" end chunk "+end);

        ncFile = NetcdfDataset.openFile(file.toString(), null);
        List<Variable> vs = ncFile.getVariables();
        v = vs.get(vs.size()-1);
        LOG.info("Variable is "+ v.getFullName());
        dimensions = v.getDimensions();
        this.pos = start;
        StringBuilder sb = new StringBuilder();
        for (int i=0; i<dimensions.size()-1; i++) {
            sb.append(",:");
        }
        sectionLocator =  sb.toString();
    /*
        for ( int i=0; i < dimensions.size(); i++ ){
            max1DSize *= dimensions.get(i).getLength();
        }
    */
        //max1DSize = chunk.getSize();
        //LOG.info( "max1DSize is " + max1DSize );
      }

      // Convert a flat index over dimensions [2:n] into per-dimension indices
      // (the first/record dimension is tracked separately by 'pos').
      private void convert1DtoND( long OneDIndex, long[] NDIndex ){
            for( int i = dimensions.size()-1; i >= 1; i-- ){
                    int temp = 1;
                    for( int j = dimensions.size()-1; j > i; j-- ){
                            temp *= dimensions.get(j).getLength();
                    }
                    temp = (int)(OneDIndex / temp);
                    temp = temp % dimensions.get(i).getLength();
                    NDIndex[i-1] = temp;
            }
      }

      public Text createKey(){
            return new Text();
      }

      public NetCDFArrayWritable createValue(){
            return new NetCDFArrayWritable();
      }

      /** Read the next record: one element of the currently cached chunk. */
      public synchronized boolean next( Text key, NetCDFArrayWritable value )
        throws IOException {

        long time1, time2, time3, time4, time5, time6;
        long[] NDIndex = new long[dimensions.size()-1];
        time1 = System.nanoTime();
        convert1DtoND( pos1D, NDIndex );
        time2 = System.nanoTime();
        String keyword = new String();
        keyword += pos;
        for ( int i = 0; i < NDIndex.length; i++ )
            keyword = keyword + "-" +NDIndex[i];
        time3 = System.nanoTime();
        time4 = System.nanoTime();
        time5 = 0;
        LOG.info( "[SAMAN] position is " + pos + " and end is " + end );
        if (pos < end) {
          key.set(keyword);
          time5=0;
          time4 = System.nanoTime();
          if( pos1D == 0 ){
            try{
                    LOG.info( "[SAMAN]
rsut("+pos+":"+pos+sectionLocator+")" );
                    chunk =
ncFile.readSection("rsut("+pos+":"+pos+sectionLocator+")");
                    time5 = System.nanoTime();
                    if (chunk == null) {LOG.info("chunk is null");return
false;}
                    LOG.info(chunk.getSize()+" elements and
"+chunk.getSizeBytes()+" bytes, shape is
"+Arrays.toString(chunk.getShape()));
                    max1DSize = chunk.getSize();
                    my = (float[])chunk.get1DJavaArray(Float.class);
                    fw = new FloatWritable[my.length];
                    for (int i=0; i< fw.length; i++) {
                            fw[i]=new FloatWritable(my[i]);
                    }
            } catch (ucar.ma2.InvalidRangeException e)
            {
                    LOG.info("section error " + e);
            }
          }
          LOG.info("[YIQI] "+(System.nanoTime()-previousTime));
          previousTime = System.nanoTime();
          FloatWritable[] fwResult = new FloatWritable[dimensions.size()+1];
          fwResult[0] = new FloatWritable((float)pos);
          for( int i = 1; i < dimensions.size(); i++ ){
            fwResult[i] = new FloatWritable((float)(NDIndex[i-1]));
          }
          fwResult[dimensions.size()] = new FloatWritable( (float)my[(int)pos1D] );
          // ADDED BY SAMAN
          String floatNumbers = new String();
          //for( int i = 0; i < fw.length; i++ ){
          //        floatNumbers = floatNumbers + fw[i] + " ";
          //}
          //LOG.info( "[SAMAN] " + floatNumbers );
          String location = new String();
          location = location + pos + ":" + pos;
          for( int i = 0; i < NDIndex.length; i++ ){
            location = location + "," + NDIndex[i] + ":" + NDIndex[i];
          }
          ////LOG.info( "[SAMAN] location was (" + location + ")"  );

          value.set(fwResult);
          if( pos1D == max1DSize-1 ){
            // finished the current chunk: advance to the next record and reset the flat index
            ////LOG.info( "[SAMAN] pos1D " + pos1D + " reached " + max1DSize );
            pos++;
            pos1D=0;
          }else{
            ////LOG.info( "[SAMAN] pos1D " + pos1D + " less than " +
max1DSize );
            pos1D++;
          }
          //pos ++;
          return true;

        }
        LOG.info("Reaching chunk end");

        return false;
      }


      /**
       * Get the progress within the split
       */
      public float getProgress() {
        if (start == end) {
          return 0.0f;
        } else {
          return Math.min(1.0f, (pos - start) / (float)(end - start));
        }
      }

      public  synchronized long getPos() throws IOException {
        return pos;
      }

      public synchronized void close() throws IOException {
        if (ncFile != null) {
          ncFile.close();
        }
      }
    }
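
Both classes can be exercised with a plain (old-API) MapReduce driver. The following is only a minimal sketch along the lines of the test job I use; the driver class name, the identity mapper, and the input/output paths are placeholders rather than my exact test code:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NetCDFArrayWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.NetCDFInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.mapred.lib.IdentityMapper;

    // Placeholder driver class, not my actual test job.
    public class NetCDFReadTest {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(NetCDFReadTest.class);
        conf.setJobName("netcdf-read-test");

        // Wire in the custom input format so splits and records come from NetCDF chunks.
        conf.setInputFormat(NetCDFInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // Map-only identity job: just dump the (Text, NetCDFArrayWritable) records.
        conf.setMapperClass(IdentityMapper.class);
        conf.setNumReduceTasks(0);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(NetCDFArrayWritable.class);

        FileInputFormat.addInputPath(conf, new Path(args[0]));      // placeholder input path
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));    // placeholder output path

        JobClient.runJob(conf);
      }
    }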

I have tested these classes with a driver along those lines and everything works fine. I then
tried to integrate the code with Hive, and to that end implemented the following SerDe class:

    package org.apache.hadoop.hive.serde2;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde.serdeConstants;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.NetCDFArrayWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    import java.io.CharArrayReader;
    import java.io.IOException;
    import java.io.Reader;
    import java.io.StringWriter;
    import java.io.Writer;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;

    /**
     * Created by saman on 7/31/15.
     */
    public final class NetCDFSerde extends AbstractSerDe {

        public static final Log LOG = LogFactory.getLog(NetCDFSerde.class.getName());
        private ObjectInspector inspector;
        private String[] outputFields;
        private int numCols;
        private List<Float> row;

        @Override
        public void initialize(final Configuration conf, final Properties tbl) throws SerDeException {

            System.out.println( "[Serde] initializing Serde!" );

            final List<String> columnNames = Arrays.asList(tbl.getProperty(serdeConstants.LIST_COLUMNS).split(","));

            numCols = columnNames.size();

            final List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>(numCols);

            for (int i = 0; i < numCols; i++) {
                columnOIs.add(PrimitiveObjectInspectorFactory.javaFloatObjectInspector);
            }

            inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs);
            outputFields = new String[numCols];
            row = new ArrayList<Float>(numCols);

            for (int i = 0; i < numCols; i++) {
                row.add(null);
            }
        }

        private char getProperty(final Properties tbl, final String property, final char def) {
            final String val = tbl.getProperty(property);

            if (val != null) {
                return val.charAt(0);
            }

            return def;
        }

        @Override
        public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
            System.out.println( "[Serde] We are in Serialize." );
            //return null;

            final StructObjectInspector outputRowOI = (StructObjectInspector) objInspector;
            final List<? extends StructField> outputFieldRefs = outputRowOI.getAllStructFieldRefs();

            if (outputFieldRefs.size() != numCols) {
                    throw new SerDeException("Cannot serialize the object
because there are "
                    + outputFieldRefs.size() + " fields but the table has "
+ numCols + " columns.");
            }

            FloatWritable[] floatArray = new FloatWritable[numCols];

            for (int c = 0; c < numCols; c++) {
                    final Object field = outputRowOI.getStructFieldData(obj, outputFieldRefs.get(c));
                    final ObjectInspector fieldOI = outputFieldRefs.get(c).getFieldObjectInspector();

                    final FloatObjectInspector fieldStringOI = (FloatObjectInspector) fieldOI;

                    //outputFields[c] = fieldStringOI.getPrimitiveJavaObject(field);
                    System.out.println( "[Serde] " + (Float)(fieldStringOI.getPrimitiveJavaObject(field)) );

                    // Construct the FloatWritable here; floatArray[c] starts out null, so
                    // calling set() on it would throw a NullPointerException.
                    floatArray[c] = new FloatWritable((Float)(fieldStringOI.getPrimitiveJavaObject(field)));
            }

            NetCDFArrayWritable result = new NetCDFArrayWritable();
            result.set( floatArray );

            return result;
        }

        @Override
        public Object deserialize(final Writable blob) throws SerDeException {
            FloatWritable[] records = (FloatWritable[])(((NetCDFArrayWritable)blob).toArray());
            System.out.println( "[Serde] Im in deserialize" );

            try{
                for( int i = 0; i < numCols; i++ ){
                    System.out.println( "[Serde] I've got " +
records[i].get()  );
                    row.set( i, Float.valueOf( records[i].get() ) );
                }
                return row;
            }catch ( final Exception e ){
                throw new SerDeException();
            }
        }

        @Override
        public ObjectInspector getObjectInspector() throws SerDeException {
            return inspector;
        }

        @Override
        public Class<? extends Writable> getSerializedClass() {
            return NetCDFArrayWritable.class;
        }

        @Override
        public SerDeStats getSerDeStats() {
            return null;
        }
    }

Then I tried running Hive queries on the table I created. Queries that Hive answers without
launching a MapReduce job, such as `SELECT * FROM netcdf;`, work fine, but as soon as the query
becomes more complex and requires a MapReduce job, e.g. `SELECT MAX(val) FROM netcdf;`, I
receive the error below:

    Diagnostic Messages for this Task:
    Error: java.lang.RuntimeException: java.lang.NullPointerException
            at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:179)
            at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
            at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
            at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
            at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.Subject.doAs(Subject.java:415)
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
            at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
    Caused by: java.lang.NullPointerException
            at org.apache.hadoop.hive.ql.exec.MapOperator.getNominalPath(MapOperator.java:399)
            at org.apache.hadoop.hive.ql.exec.MapOperator.cleanUpInputFileChangedOp(MapOperator.java:459)
            at org.apache.hadoop.hive.ql.exec.Operator.cleanUpInputFileChanged(Operator.java:1071)
            at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:503)
            at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:170)
            ... 8 more

The logs show that my input format and record reader work fine, but I don't understand what
goes wrong on the Hive side that prevents it from resolving the input file paths in the map tasks.
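
For reference, the table is registered roughly like this; this is only a sketch, since the exact column list, output format, and location in my actual DDL may differ:

    -- column names, output format, and location below are placeholders
    CREATE EXTERNAL TABLE netcdf (rec FLOAT, lat FLOAT, lon FLOAT, val FLOAT)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.NetCDFSerde'
    STORED AS
      INPUTFORMAT 'org.apache.hadoop.mapred.NetCDFInputFormat'
      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION '/path/to/netcdf/files';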

Could anybody help me with this issue?
