I have developed a custom InputFormat to read NetCDF files in Hadoop; the code is below:
```java
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NetCDFArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetworkTopology;

import ucar.nc2.NetcdfFile;
import ucar.nc2.Variable;
import ucar.nc2.dataset.NetcdfDataset;

public class NetCDFInputFormat extends FileInputFormat<Text, NetCDFArrayWritable> {

    private static final Log LOG = LogFactory.getLog(NetCDFInputFormat.class.getName());

    // Traverse the NetCDF header and return the chunk start/size arrays.
    private NetCDFInfo getNetCDFInfo(Path file, FileSystem fs, JobConf job) {
        NetCDFInfo result = new NetCDFInfo();
        NetcdfFile ncFile = null;
        Variable v;
        try {
            ncFile = NetcdfDataset.openFile(file.toString(), null);
            v = ncFile.findVariable("rsut");
            LOG.info("Variable is " + v.getFullName());
            // The v* fields and reallyReadMeta() come from a locally patched NetCDF library.
            result.fileSize = ncFile.vfileSize;
            result.recStart = ncFile.vrecStart;
            Long[] metaArray = v.reallyReadMeta().toArray(new Long[(int) (ncFile.vnumRecs)]);
            result.chunkStarts = ArrayUtils.toPrimitive(metaArray);
            result.numRecs = ncFile.vnumRecs;
            result.recSize = ncFile.vrecSize;
            result.smallRecSize = ncFile.vsmallRecSize;
        } catch (Exception e) {
            LOG.info("Bad... " + e);
        }
        try {
            if (ncFile != null) ncFile.close();
        } catch (Exception e) {
            LOG.info("Bad2... " + e);
        }
        return result;
    }

    @Override
    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        FileStatus[] files = listStatus(job);
        LOG.info("[SAMAN] beginning of getSplits");
        LOG.info("[SAMAN] " + files.length);

        // Save the number of input files in the job conf.
        job.setLong(NUM_INPUT_FILES, files.length);

        // Compute the total size, checking that we have valid files.
        long totalSize = 0;
        for (FileStatus file : files) {
            if (file.isDir()) {
                throw new IOException("Not a file: " + file.getPath());
            }
            LOG.info("[net] adding " + file.getPath());
            totalSize += file.getLen();
        }
        long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

        // Generate splits along record-chunk boundaries.
        ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
        NetworkTopology clusterMap = new NetworkTopology();
        for (FileStatus file : files) {
            Path path = file.getPath();
            FileSystem fs = path.getFileSystem(job);
            long length = file.getLen();
            LOG.info("get file len of " + file.getPath());
            BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
            if ((length != 0) && isSplitable(fs, path)) {
                long blockSize = file.getBlockSize();
                NetCDFInfo netInfo = getNetCDFInfo(path, fs, job);
                long recStart = netInfo.recStart;
                long[] chunkSizes = netInfo.chunkSizes;
                long[] chunkStarts = netInfo.chunkStarts;
                long smallSize = netInfo.smallRecSize;
                long recSize = netInfo.recSize;
                long splitSize = 0;
                int chunkIndex = 0;
                long bytesRemaining = chunkStarts[chunkStarts.length - 1] + recSize - recStart - 2 * smallSize;
                long thisStart = recStart; // file position
                long thisChunk = 0;
                long blockNo = 1;
                while (bytesRemaining > 0) {
                    while (chunkIndex < chunkStarts.length
                            && chunkStarts[chunkIndex] < blockNo * blockSize) {
                        chunkIndex++;
                    }
                    long tempStart = thisStart;
                    long endChunk;
                    if (chunkIndex >= chunkStarts.length) {
                        splitSize = chunkStarts[chunkStarts.length - 1] + recSize - thisStart - smallSize;
                        // bytesRemaining should be 0 after this round.
                    } else {
                        splitSize = chunkStarts[chunkIndex] - thisStart - smallSize;
                        thisStart = chunkStarts[chunkIndex];
                    }
                    endChunk = chunkIndex;
                    LOG.info("[net] split " + path + " start " + tempStart + " size " + splitSize
                            + " from chunk " + thisChunk + " to " + endChunk);
                    blockNo++;
                    String[] splitHosts = getSplitHosts(blkLocations, tempStart, splitSize, clusterMap);
                    FileSplit split = new FileSplit(path, tempStart, splitSize, splitHosts);
                    // startChunk/endChunk are fields added to FileSplit in a patched Hadoop.
                    split.getFileSplit().startChunk = thisChunk;
                    split.getFileSplit().endChunk = endChunk;
                    splits.add(split);
                    bytesRemaining -= splitSize;
                    LOG.info("[net] split " + path + " remaining " + bytesRemaining);
                    thisChunk = endChunk;
                }
            } else if (length != 0) {
                String[] splitHosts = getSplitHosts(blkLocations, 0, length, clusterMap);
                splits.add(new FileSplit(path, 0, length, splitHosts));
            } else {
                // Create an empty hosts array for zero-length files.
                splits.add(new FileSplit(path, 0, length, new String[0]));
            }
        }

        LOG.info("Total # of splits: " + splits.size());
        for (int i = 0; i < splits.size(); i++) {
            FileSplit split = splits.get(i);
            LOG.info("split " + (i + 1) + " " + split.getStart() + " " + split.getLength() + " "
                    + split.getFileSplit().startChunk + " " + split.getFileSplit().endChunk);
        }
        InputSplit[] test = splits.toArray(new FileSplit[splits.size()]);
        for (int i = 0; i < test.length; i++) {
            LOG.info("[SAMAN] Final Split " + ((FileSplit) test[i]).getLength() + " "
                    + ((FileSplit) test[i]).getPath());
        }
        return splits.toArray(new FileSplit[splits.size()]);
    }

    @Override
    public RecordReader<Text, NetCDFArrayWritable> getRecordReader(InputSplit genericSplit,
            JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(genericSplit.toString());
        return new NetCDFReaderWithCaching(job, (FileSplit) genericSplit);
    }
}
```

And here is my record reader:

```java
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.NetCDFArrayWritable;
import org.apache.hadoop.io.Text;

import ucar.ma2.Array;
import ucar.nc2.Dimension;
import ucar.nc2.NetcdfFile;
import ucar.nc2.Variable;
import ucar.nc2.dataset.NetcdfDataset;

/**
 * Treats keys as offsets in the file and values as float arrays.
 */
public class NetCDFReaderWithCaching implements RecordReader<Text, NetCDFArrayWritable> {

    private static final Log LOG = LogFactory.getLog(NetCDFReaderWithCaching.class.getName());

    private long start;
    private long pos;
    private long pos1D = 0;     // 1-D index over dimensions [2:n]
    private long end;
    private NetcdfFile ncFile;
    private Variable v;
    private List<Dimension> dimensions;
    private long max1DSize = 1; // total number of elements across dimensions [2:n]
    private String sectionLocator = null;
    private Array chunk = null;
    private float[] my;
    private FloatWritable[] fw;
    private long previousTime = System.nanoTime();

    public NetCDFReaderWithCaching(Configuration job, FileSplit split) throws IOException {
        initialize(split);
    }

    public void initialize(FileSplit genericSplit) throws IOException {
        FileSplit fSplit = (FileSplit) genericSplit;
        // startChunk/endChunk come from the patched FileSplit populated in getSplits().
        start = fSplit.getFileSplit().startChunk; // split.getStart();
        end = fSplit.getFileSplit().endChunk;     // start + split.getLength();
        final Path file = fSplit.getPath();
        LOG.info("Map is reading from input: " + file + " start chunk " + start + " end chunk " + end);
        ncFile = NetcdfDataset.openFile(file.toString(), null);
        List<Variable> vs = ncFile.getVariables();
        v = vs.get(vs.size() - 1);
        LOG.info("Variable is " + v.getFullName());
        dimensions = v.getDimensions();
        this.pos = start;
        // Build a section locator of the form ",:,:,..." covering the trailing dimensions.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < dimensions.size() - 1; i++) {
            sb.append(",:");
        }
        sectionLocator = sb.toString();
    }

    // Convert a 1-D index over the trailing dimensions into an N-D index.
    private void convert1DtoND(long OneDIndex, long[] NDIndex) {
        for (int i = dimensions.size() - 1; i >= 1; i--) {
            int temp = 1;
            for (int j = dimensions.size() - 1; j > i; j--) {
                temp *= dimensions.get(j).getLength();
            }
            temp = (int) (OneDIndex / temp);
            temp = temp % dimensions.get(i).getLength();
            NDIndex[i - 1] = temp;
        }
    }

    public Text createKey() {
        return new Text();
    }

    public NetCDFArrayWritable createValue() {
        return new NetCDFArrayWritable();
    }

    /** Read one record. */
    public synchronized boolean next(Text key, NetCDFArrayWritable value) throws IOException {
        long[] NDIndex = new long[dimensions.size() - 1];
        convert1DtoND(pos1D, NDIndex);
        StringBuilder keyword = new StringBuilder().append(pos);
        for (int i = 0; i < NDIndex.length; i++) {
            keyword.append("-").append(NDIndex[i]);
        }
        LOG.info("[SAMAN] position is " + pos + " and end is " + end);
        if (pos < end) {
            key.set(keyword.toString());
            if (pos1D == 0) {
                // Read and cache one whole record (e.g. one time step) at a time.
                try {
                    LOG.info("[SAMAN] rsut(" + pos + ":" + pos + sectionLocator + ")");
                    chunk = ncFile.readSection("rsut(" + pos + ":" + pos + sectionLocator + ")");
                    if (chunk == null) {
                        LOG.info("chunk is null");
                        return false;
                    }
                    LOG.info(chunk.getSize() + " elements and " + chunk.getSizeBytes()
                            + " bytes, shape is " + Arrays.toString(chunk.getShape()));
                    max1DSize = chunk.getSize();
                    my = (float[]) chunk.get1DJavaArray(Float.class);
                    fw = new FloatWritable[my.length];
                    for (int i = 0; i < fw.length; i++) {
                        fw[i] = new FloatWritable(my[i]);
                    }
                } catch (ucar.ma2.InvalidRangeException e) {
                    LOG.info("section error " + e);
                }
            }
            LOG.info("[YIQI] " + (System.nanoTime() - previousTime));
            previousTime = System.nanoTime();
            // Emit [record index, trailing-dimension indices..., value].
            FloatWritable[] fwResult = new FloatWritable[dimensions.size() + 1];
            fwResult[0] = new FloatWritable((float) pos);
            for (int i = 1; i < dimensions.size(); i++) {
                fwResult[i] = new FloatWritable((float) (NDIndex[i - 1]));
            }
            fwResult[dimensions.size()] = new FloatWritable(my[(int) pos1D]);
            value.set(fwResult);
            // Advance: move to the next record once the cached chunk is exhausted.
            if (pos1D == max1DSize - 1) {
                pos++;
                pos1D = 0;
            } else {
                pos1D++;
            }
            return true;
        }
        LOG.info("Reaching chunk end");
        return false;
    }

    /** Get the progress within the split. */
    public float getProgress() {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    public synchronized long getPos() throws IOException {
        return pos;
    }

    public synchronized void close() throws IOException {
        if (ncFile != null) {
            ncFile.close();
        }
    }
}
```

I have tested these classes with a standalone Hadoop job, and everything works fine.
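For reference, that standalone test was essentially a minimal old-API driver like the sketch below. The driver and mapper class names are illustrative (they are not part of the code above), and the sketch assumes `NetCDFArrayWritable` behaves like an `ArrayWritable` over `FloatWritable`:

```java
package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.NetCDFArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

// Hypothetical smoke-test driver for NetCDFInputFormat + NetCDFReaderWithCaching.
public class NetCDFSmokeTest {

    // Emits the last float of each record (the variable value), keyed by the record's index key.
    public static class ValueMapper extends MapReduceBase
            implements Mapper<Text, NetCDFArrayWritable, Text, FloatWritable> {
        public void map(Text key, NetCDFArrayWritable value,
                        OutputCollector<Text, FloatWritable> out, Reporter reporter)
                throws IOException {
            Writable[] fields = value.get(); // [record index, dim indices..., value]
            out.collect(key, (FloatWritable) fields[fields.length - 1]);
        }
    }

    public static void main(String[] args) throws IOException {
        JobConf job = new JobConf(NetCDFSmokeTest.class);
        job.setJobName("netcdf-smoke-test");
        job.setInputFormat(NetCDFInputFormat.class); // the custom input format above
        job.setMapperClass(ValueMapper.class);
        job.setNumReduceTasks(0);                    // map-only sanity check
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        JobClient.runJob(job);
    }
}
```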
I then tried to integrate this code with Hive, implementing the following SerDe class:

```java
package org.apache.hadoop.hive.serde2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.NetCDFArrayWritable;
import org.apache.hadoop.io.Writable;

/**
 * Created by saman on 7/31/15.
 */
public final class NetCDFSerde extends AbstractSerDe {

    public static final Log LOG = LogFactory.getLog(NetCDFSerde.class.getName());

    private ObjectInspector inspector;
    private int numCols;
    private List<Float> row;

    @Override
    public void initialize(final Configuration conf, final Properties tbl) throws SerDeException {
        System.out.println("[Serde] initializing Serde!");
        final List<String> columnNames =
                Arrays.asList(tbl.getProperty(serdeConstants.LIST_COLUMNS).split(","));
        numCols = columnNames.size();
        // All columns are exposed as Java floats.
        final List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>(numCols);
        for (int i = 0; i < numCols; i++) {
            columnOIs.add(PrimitiveObjectInspectorFactory.javaFloatObjectInspector);
        }
        inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs);
        row = new ArrayList<Float>(numCols);
        for (int i = 0; i < numCols; i++) {
            row.add(null);
        }
    }

    @Override
    public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
        System.out.println("[Serde] We are in serialize.");
        final StructObjectInspector outputRowOI = (StructObjectInspector) objInspector;
        final List<? extends StructField> outputFieldRefs = outputRowOI.getAllStructFieldRefs();
        if (outputFieldRefs.size() != numCols) {
            throw new SerDeException("Cannot serialize the object because there are "
                    + outputFieldRefs.size() + " fields but the table has " + numCols + " columns.");
        }
        FloatWritable[] floatArray = new FloatWritable[numCols];
        for (int c = 0; c < numCols; c++) {
            final Object field = outputRowOI.getStructFieldData(obj, outputFieldRefs.get(c));
            final ObjectInspector fieldOI = outputFieldRefs.get(c).getFieldObjectInspector();
            final FloatObjectInspector fieldFloatOI = (FloatObjectInspector) fieldOI;
            System.out.println("[Serde] " + (Float) (fieldFloatOI.getPrimitiveJavaObject(field)));
            // Allocate each element before filling it (calling set() on a null slot would NPE).
            floatArray[c] = new FloatWritable((Float) (fieldFloatOI.getPrimitiveJavaObject(field)));
        }
        NetCDFArrayWritable result = new NetCDFArrayWritable();
        result.set(floatArray);
        return result;
    }

    @Override
    public Object deserialize(final Writable blob) throws SerDeException {
        FloatWritable[] records = (FloatWritable[]) (((NetCDFArrayWritable) blob).toArray());
        System.out.println("[Serde] I'm in deserialize");
        try {
            for (int i = 0; i < numCols; i++) {
                System.out.println("[Serde] I've got " + records[i].get());
                row.set(i, Float.valueOf(records[i].get()));
            }
            return row;
        } catch (final Exception e) {
            throw new SerDeException(e);
        }
    }

    @Override
    public ObjectInspector getObjectInspector() throws SerDeException {
        return inspector;
    }

    @Override
    public Class<? extends Writable> getSerializedClass() {
        return NetCDFArrayWritable.class;
    }

    @Override
    public SerDeStats getSerDeStats() {
        return null;
    }
}
```

I then created a table backed by these classes and tried to run Hive queries on it. As long as Hive doesn't create any MapReduce job, everything works fine, e.g. `SELECT * FROM netcdf;`. But as soon as the query gets more complicated, like `SELECT MAX(val) FROM netcdf;`, and Hive launches MapReduce jobs, I receive the error below:

```
Diagnostic Messages for this Task:
Error: java.lang.RuntimeException: java.lang.NullPointerException
        at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:179)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
Caused by: java.lang.NullPointerException
        at org.apache.hadoop.hive.ql.exec.MapOperator.getNominalPath(MapOperator.java:399)
        at org.apache.hadoop.hive.ql.exec.MapOperator.cleanUpInputFileChangedOp(MapOperator.java:459)
        at org.apache.hadoop.hive.ql.exec.Operator.cleanUpInputFileChanged(Operator.java:1071)
        at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:503)
        at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:170)
        ... 8 more
```

The logs show that my record reader itself works fine; what I don't understand is why Hive cannot resolve the input file path inside the map tasks. Could anybody help me with this issue?
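For completeness, the table is defined with a statement of this shape. The column list shown here is illustrative; it has to match the `dimensions.size() + 1` floats the record reader emits (record index, trailing-dimension indices, value):

```sql
CREATE TABLE netcdf (time FLOAT, lat FLOAT, lon FLOAT, val FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.NetCDFSerde'
STORED AS
  INPUTFORMAT 'org.apache.hadoop.mapred.NetCDFInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
```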