HIVE-20225: SerDe to support Teradata Binary Format (Lu Li via cws)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8b16ad0f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8b16ad0f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8b16ad0f

Branch: refs/heads/branch-2
Commit: 8b16ad0f54d3bcf56362118f31e25a792ad7291e
Parents: bd32deb
Author: Daniel Dai <dai...@gmail.com>
Authored: Fri Aug 31 12:36:23 2018 -0700
Committer: Daniel Dai <dai...@gmail.com>
Committed: Fri Aug 31 12:36:23 2018 -0700

----------------------------------------------------------------------
 .../td_data_with_1mb_rowsize.teradata.gz        | Bin 0 -> 616 bytes
 .../teradata_binary_table.deflate               | Bin 0 -> 1329 bytes
 .../ql/io/TeradataBinaryFileInputFormat.java    |  66 ++
 .../ql/io/TeradataBinaryFileOutputFormat.java   | 112 ++++
 .../hive/ql/io/TeradataBinaryRecordReader.java  | 280 +++++++++
 .../teradata/TeradataBinaryDataInputStream.java | 200 +++++++
 .../TeradataBinaryDataOutputStream.java         | 270 +++++++++
 .../serde2/teradata/TeradataBinarySerde.java    | 597 +++++++++++++++++++
 .../TestTeradataBinarySerdeForDate.java         |  72 +++
 .../TestTeradataBinarySerdeForDecimal.java      | 106 ++++
 .../TestTeradataBinarySerdeForTimeStamp.java    |  96 +++
 .../TestTeradataBinarySerdeGeneral.java         | 126 ++++
 .../clientpositive/test_teradatabinaryfile.q    | 123 ++++
 .../test_teradatabinaryfile.q.out               | 537 +++++++++++++++++
 14 files changed, 2585 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/data/files/teradata_binary_file/td_data_with_1mb_rowsize.teradata.gz
----------------------------------------------------------------------
diff --git 
a/data/files/teradata_binary_file/td_data_with_1mb_rowsize.teradata.gz 
b/data/files/teradata_binary_file/td_data_with_1mb_rowsize.teradata.gz
new file mode 100644
index 0000000..7319e3c
Binary files /dev/null and 
b/data/files/teradata_binary_file/td_data_with_1mb_rowsize.teradata.gz differ

http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/data/files/teradata_binary_file/teradata_binary_table.deflate
----------------------------------------------------------------------
diff --git a/data/files/teradata_binary_file/teradata_binary_table.deflate 
b/data/files/teradata_binary_file/teradata_binary_table.deflate
new file mode 100644
index 0000000..fd53dde
Binary files /dev/null and 
b/data/files/teradata_binary_file/teradata_binary_table.deflate differ

http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileInputFormat.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileInputFormat.java 
b/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileInputFormat.java
new file mode 100644
index 0000000..bed87c5
--- /dev/null
+++ b/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileInputFormat.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * https://cwiki.apache.org/confluence/display/Hive/TeradataBinarySerde.
+ * FileInputFormat for Teradata binary files.
+ *
+ * In the Teradata Binary File, each record constructs as below:
+ * The first 2 bytes represents the length of the bytes next for this record.
+ * Then the null bitmap whose length is depended on the number of fields is 
followed.
+ * Then each field of the record is serialized into bytes - the serialization 
strategy is decided by the type of field.
+ * At last, there is one byte (0x0a) in the end of the record.
+ *
+ * This InputFormat currently doesn't support the split of the file.
+ * Teradata binary files are using little endian.
+ */
+public class TeradataBinaryFileInputFormat extends 
FileInputFormat<NullWritable, BytesWritable> {
+
+  @Override public RecordReader<NullWritable, BytesWritable> 
getRecordReader(InputSplit split, JobConf job,
+      Reporter reporter) throws IOException {
+    reporter.setStatus(split.toString());
+    return new TeradataBinaryRecordReader(job, (FileSplit) split);
+  }
+
+  /**
+   * the <code>TeradataBinaryFileInputFormat</code> is not splittable right 
now.
+   * Override the <code>isSplitable</code> function.
+   *
+   * @param fs the file system that the file is on
+   * @param filename the file name to check
+   * @return is this file splitable?
+   */
+  @Override protected boolean isSplitable(FileSystem fs, Path filename) {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileOutputFormat.java 
b/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileOutputFormat.java
new file mode 100644
index 0000000..bd7718e
--- /dev/null
+++ b/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryFileOutputFormat.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Properties;
+
+import org.apache.commons.io.EndianUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.Progressable;
+
+import static java.lang.String.format;
+
+/**
+ * https://cwiki.apache.org/confluence/display/Hive/TeradataBinarySerde.
+ * FileOutputFormat for Teradata binary files.
+ *
+ * In the Teradata Binary File, each record is laid out as below:
+ * First comes the length prefix: the number of bytes of the record body (null bitmap and fields).
+ * The prefix is 2 bytes wide for the default row length ("64kb") and 4 bytes wide for "1mb".
+ * Then the null bitmap, whose length depends on the number of fields, follows.
+ * Then each field of the record is serialized into bytes - the serialization strategy is decided by the type of field.
+ * At last, there is one byte (0x0a) at the end of the record.
+ *
+ * Teradata binary files are using little endian.
+ */
+public class TeradataBinaryFileOutputFormat<K extends WritableComparable, V extends Writable>
+    extends HiveIgnoreKeyTextOutputFormat<K, V> {
+  private static final Log LOG = LogFactory.getLog(TeradataBinaryFileOutputFormat.class);
+
+  static final byte RECORD_END_BYTE = (byte) 0x0a;
+
+  /** Largest record body length that can be encoded in the 2-byte unsigned length prefix. */
+  private static final int MAX_RECORD_LENGTH_FOR_SHORT_PREFIX = 0xFFFF;
+
+  /**
+   * Create the final out file, and output row by row. Every row is written as
+   * length prefix, row bytes, and the record end byte (0x0a).
+   *
+   * The row length table property is resolved and validated once, before the output
+   * file is created, so an unsupported value fails without leaving a file behind.
+   *
+   * @param jc
+   *          the job configuration file
+   * @param outPath
+   *          the final output file to be created
+   * @param valueClass
+   *          the value class used for create
+   * @param isCompressed
+   *          whether the content is compressed or not
+   * @param tableProperties
+   *          the tableProperties of this file's corresponding table
+   * @param progress
+   *          progress used for status report
+   * @return the RecordWriter
+   * @throws IOException if the output file cannot be created
+   * @throws IllegalArgumentException if the row length table property has an unsupported value
+   */
+  @Override public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath, Class<? extends Writable> valueClass,
+      boolean isCompressed, final Properties tableProperties, Progressable progress) throws IOException {
+    // The row length decides if the length prefix is an int or a short; it is constant for the table,
+    // so it is resolved here instead of once per record.
+    final String rowLength = tableProperties
+        .getProperty(TeradataBinaryRecordReader.TD_ROW_LENGTH, TeradataBinaryRecordReader.DEFAULT_TD_ROW_LENGTH)
+        .toLowerCase();
+    LOG.debug(format("The table property %s is: %s", TeradataBinaryRecordReader.TD_ROW_LENGTH, rowLength));
+
+    if (!TeradataBinaryRecordReader.TD_ROW_LENGTH_TO_BYTE_NUM.containsKey(rowLength)) {
+      throw new IllegalArgumentException(format("%s doesn't support the value %s, the supported values are %s",
+          TeradataBinaryRecordReader.TD_ROW_LENGTH, rowLength,
+          TeradataBinaryRecordReader.TD_ROW_LENGTH_TO_BYTE_NUM.keySet()));
+    }
+    final boolean useShortPrefix = rowLength.equals(TeradataBinaryRecordReader.DEFAULT_TD_ROW_LENGTH);
+
+    FileSystem fs = outPath.getFileSystem(jc);
+    final OutputStream outStream = Utilities.createCompressedStream(jc, fs.create(outPath, progress), isCompressed);
+    return new RecordWriter() {
+      @Override public void write(Writable r) throws IOException {
+        BytesWritable bw = (BytesWritable) r;
+        int recordLength = bw.getLength();
+
+        if (useShortPrefix) {
+          // A longer record would be silently truncated by the cast to short and corrupt the file.
+          if (recordLength > MAX_RECORD_LENGTH_FOR_SHORT_PREFIX) {
+            throw new IOException(format("The record length %d exceeds the maximum %d supported when %s is %s",
+                recordLength, MAX_RECORD_LENGTH_FOR_SHORT_PREFIX, TeradataBinaryRecordReader.TD_ROW_LENGTH,
+                rowLength));
+          }
+          EndianUtils.writeSwappedShort(outStream, (short) recordLength); // write the length using little endian
+        } else {
+          EndianUtils.writeSwappedInteger(outStream, recordLength); // write the length using little endian
+        }
+
+        outStream.write(bw.getBytes(), 0, recordLength); // write the content (the content is in little endian)
+        outStream.write(RECORD_END_BYTE); //write the record ending
+      }
+
+      @Override public void close(boolean abort) throws IOException {
+        outStream.close();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java 
b/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java
new file mode 100644
index 0000000..337b5d2
--- /dev/null
+++ b/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.io.EndianUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+import static java.lang.String.format;
+
+/**
+ * The TeradataBinaryRecordReader reads the record from Teradata binary files.
+ *
+ * In the Teradata Binary File, each record is laid out as below:
+ * First comes the length prefix: the number of bytes of the record body that follows.
+ * The prefix is 2 bytes wide for the row length "64kb" (the default) and 4 bytes wide for "1mb".
+ * Then the null bitmap, whose length depends on the number of fields, follows.
+ * Then each field of the record is serialized into bytes - the serialization strategy is decided by the type of field.
+ * At last, there is one byte (0x0a) at the end of the record.
+ *
+ * This reader currently doesn't support the split of the file.
+ * Teradata binary files are using little endian.
+ */
+public class TeradataBinaryRecordReader implements RecordReader<NullWritable, BytesWritable> {
+
+  private static final Log LOG = LogFactory.getLog(TeradataBinaryRecordReader.class);
+
+  private CompressionCodecFactory compressionCodecs = null;
+  private InputStream in;
+  private long start;
+  private long pos;
+  private long end;
+  private final Seekable filePosition;
+  private CompressionCodec codec;
+
+  static final String TD_ROW_LENGTH = "teradata.row.length";
+  static final Map<String, Integer> TD_ROW_LENGTH_TO_BYTE_NUM = ImmutableMap.of("64kb", 2, "1mb", 4);
+  static final String DEFAULT_TD_ROW_LENGTH = "64kb";
+  static final String TD_ROW_LENGTH_1MB = "1mb";
+
+  /** Width of the length prefix that is decoded as an unsigned short; any other width is decoded as an int. */
+  private static final int SHORT_LENGTH_PREFIX_BYTES = 2;
+
+  private byte[] recordLengthBytes;
+  private byte[] valueByteArray = new byte[65536]; // initial capacity, grown when a longer record shows up
+  private byte[] endOfRecord = new byte[1];
+
+  private int recordLength = 0;
+
+  /**
+   * Opens the file behind the split and prepares the length-prefix buffer.
+   *
+   * The row length is taken from the job configuration; when it is absent there, the table
+   * properties of the partitions in the map work are searched, and finally the default is used.
+   *
+   * @param job the job configuration
+   * @param fileSplit the split to read
+   * @throws IOException if the file cannot be opened
+   * @throws IllegalArgumentException if the row length property has an unsupported value
+   */
+  public TeradataBinaryRecordReader(JobConf job, FileSplit fileSplit) throws IOException {
+    LOG.debug("initialize the TeradataBinaryRecordReader");
+
+    String rowLength = job.get(TD_ROW_LENGTH);
+    if (rowLength == null) {
+      LOG.debug("No table property in JobConf. Try to recover the table directly");
+      Map<String, PartitionDesc> partitionDescMap = Utilities.getMapRedWork(job).getMapWork().getAliasToPartnInfo();
+      for (Map.Entry<String, PartitionDesc> aliasToPartition : partitionDescMap.entrySet()) {
+        LOG.debug(format("the current alias: %s", aliasToPartition.getKey()));
+        rowLength = aliasToPartition.getValue().getTableDesc().getProperties().getProperty(TD_ROW_LENGTH);
+        if (rowLength != null) {
+          break;
+        }
+      }
+    }
+
+    rowLength = (rowLength == null) ? DEFAULT_TD_ROW_LENGTH : rowLength.toLowerCase();
+
+    Integer lengthPrefixBytes = TD_ROW_LENGTH_TO_BYTE_NUM.get(rowLength);
+    if (lengthPrefixBytes == null) {
+      throw new IllegalArgumentException(
+          format("%s doesn't support the value %s, the supported values are %s", TD_ROW_LENGTH, rowLength,
+              TD_ROW_LENGTH_TO_BYTE_NUM.keySet()));
+    }
+    recordLengthBytes = new byte[lengthPrefixBytes];
+
+    start = fileSplit.getStart();
+    end = start + fileSplit.getLength();
+
+    LOG.debug(format("The start of the file split is: %s", start));
+    LOG.debug(format("The end of the file split is: %s", end));
+
+    final Path file = fileSplit.getPath();
+    compressionCodecs = new CompressionCodecFactory(job);
+    codec = compressionCodecs.getCodec(file);
+    FileSystem fs = file.getFileSystem(job);
+    FSDataInputStream fileIn = fs.open(file);
+
+    /* currently the TeradataBinaryRecord file doesn't support file split at all */
+    filePosition = fileIn;
+    if (isCompressedInput()) {
+      LOG.info(format("Input file is compressed. Using compression code %s", codec.getClass().getName()));
+      boolean wrapped = false;
+      try {
+        in = codec.createInputStream(fileIn);
+        wrapped = true;
+      } finally {
+        if (!wrapped) {
+          // do not leak the raw file stream when the codec fails to wrap it
+          org.apache.hadoop.io.IOUtils.closeStream(fileIn);
+        }
+      }
+    } else {
+      LOG.info("The input file is not compressed");
+      in = fileIn;
+    }
+    pos = start;
+  }
+
+  /**
+   * Reads the next key/value pair from the input for processing.
+   *
+   * @param key the key to read data into
+   * @param value the value to read data into
+   * @return true iff a key/value was read, false if at EOF
+   * @throws EOFException if the file ends in the middle of a record
+   * @throws IOException if the record is malformed or the underlying read fails
+   */
+  @Override public synchronized boolean next(NullWritable key, BytesWritable value) throws IOException {
+
+    /* read the record length */
+    int lengthExpected = recordLengthBytes.length;
+    int hasConsumed = readExpectedBytes(recordLengthBytes, lengthExpected);
+    if (hasConsumed == 0) {
+      LOG.info("Reach the End of File. No more record");
+      return false;
+    } else if (hasConsumed < lengthExpected) {
+      logUnexpectedEndOfFile("record length", lengthExpected, hasConsumed, Hex.encodeHexString(recordLengthBytes));
+      throw new EOFException("When reading the record length, reach the unexpected end of file.");
+    }
+    /* get the record content length to prepare to read the content */
+    recordLength = decodeRecordLength();
+    pos += lengthExpected;
+
+    /* read the record content */
+    if (recordLength > valueByteArray.length) {
+      // rows written with the 4-byte length prefix can be longer than the initial buffer
+      valueByteArray = new byte[recordLength];
+    }
+    lengthExpected = recordLength;
+    hasConsumed = readExpectedBytes(valueByteArray, lengthExpected);
+    if (hasConsumed < lengthExpected) {
+      logUnexpectedEndOfFile("record content", lengthExpected, hasConsumed,
+          Hex.encodeHexString(recordLengthBytes) + valueAsHex(hasConsumed));
+      throw new EOFException("When reading the contend of the record, reach the unexpected end of file.");
+    }
+    value.set(valueByteArray, 0, recordLength);
+    pos += lengthExpected;
+
+    /* read the record end */
+    lengthExpected = endOfRecord.length;
+    hasConsumed = readExpectedBytes(endOfRecord, lengthExpected);
+    if (hasConsumed < lengthExpected) {
+      logUnexpectedEndOfFile("record end symbol", lengthExpected, hasConsumed,
+          Hex.encodeHexString(recordLengthBytes) + valueAsHex(recordLength) + Hex.encodeHexString(endOfRecord));
+      throw new EOFException("When reading the end of record, reach the unexpected end of file.");
+    }
+
+    if (endOfRecord[0] != TeradataBinaryFileOutputFormat.RECORD_END_BYTE) {
+      throw new IOException(format("We expect 0x0a as the record end but get %s.", Hex.encodeHexString(endOfRecord)));
+    }
+    pos += lengthExpected;
+
+    return true;
+  }
+
+  /**
+   * Create an object of the appropriate type to be used as a key.
+   *
+   * @return a new key object.
+   */
+  @Override public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  /**
+   * Create an object of the appropriate type to be used as a value.
+   *
+   * @return a new value object.
+   */
+  @Override public BytesWritable createValue() {
+    return new BytesWritable();
+  }
+
+  /**
+   * Returns the current position in the input.
+   *
+   * @return the split start plus the number of record bytes consumed so far.
+   * @throws IOException never thrown by this implementation
+   */
+  @Override public long getPos() throws IOException {
+    return pos;
+  }
+
+  /**
+   * Closes the input stream if one was opened.
+   *
+   * @throws IOException if closing the stream fails
+   */
+  @Override public void close() throws IOException {
+    if (in != null) {
+      in.close();
+    }
+  }
+
+  /**
+   * How much of the input has the {@link RecordReader} consumed i.e.
+   * has been processed by?
+   *
+   * @return progress from <code>0.0</code> to <code>1.0</code>.
+   * @throws IOException if the file position cannot be obtained
+   */
+  @Override public float getProgress() throws IOException {
+    if (start == end) {
+      return 0.0F;
+    } else {
+      return Math.min(1.0F, (float) (getFilePosition() - start) / (float) (end - start));
+    }
+  }
+
+  private boolean isCompressedInput() {
+    return codec != null;
+  }
+
+  /**
+   * For compressed input this is the position in the underlying file stream,
+   * otherwise the number tracked by {@link #getPos()}.
+   */
+  private synchronized long getFilePosition() throws IOException {
+    long retVal;
+    if (isCompressedInput() && filePosition != null) {
+      retVal = filePosition.getPos();
+    } else {
+      retVal = getPos();
+    }
+    return retVal;
+  }
+
+  /**
+   * Decodes the little-endian length prefix held in recordLengthBytes according to its width.
+   *
+   * @return the length of the record body in bytes
+   * @throws IOException if the decoded length is negative
+   */
+  private int decodeRecordLength() throws IOException {
+    int length;
+    if (recordLengthBytes.length == SHORT_LENGTH_PREFIX_BYTES) {
+      length = EndianUtils.readSwappedUnsignedShort(recordLengthBytes, 0);
+    } else {
+      length = EndianUtils.readSwappedInteger(recordLengthBytes, 0);
+    }
+    if (length < 0) {
+      throw new IOException(
+          format("The record length %d decoded from %s is invalid.", length, Hex.encodeHexString(recordLengthBytes)));
+    }
+    return length;
+  }
+
+  /** Hex form of the first <code>length</code> bytes of the value buffer, used for diagnostics only. */
+  private String valueAsHex(int length) {
+    return Hex.encodeHexString(java.util.Arrays.copyOf(valueByteArray, length));
+  }
+
+  /** Logs the diagnostics for a record part that was cut short by the end of the file. */
+  private void logUnexpectedEndOfFile(String part, int lengthExpected, int hasConsumed, String recordHex)
+      throws IOException {
+    LOG.error(format("We expect %s bytes for the %s but read %d byte and reach the End of File.", lengthExpected,
+        part, hasConsumed));
+    LOG.error(format("The current position in the file : %s", getFilePosition()));
+    LOG.error(format("The current consumed bytes: %s", pos));
+    LOG.error(format("The bytes for the current record is: %s", recordHex));
+  }
+
+  /**
+   * Reads until <code>lengthExpected</code> bytes are in <code>toWrite</code> or the stream ends.
+   *
+   * @return the number of bytes actually read, smaller than <code>lengthExpected</code> only at end of stream
+   */
+  private synchronized int readExpectedBytes(byte[] toWrite, int lengthExpected) throws IOException {
+    int curPos = 0;
+    while (curPos < lengthExpected) {
+      int numOfByteRead = in.read(toWrite, curPos, lengthExpected - curPos);
+      if (numOfByteRead < 0) {
+        break;
+      }
+      curPos += numOfByteRead;
+    }
+    return curPos;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataInputStream.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataInputStream.java
 
b/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataInputStream.java
new file mode 100644
index 0000000..230b3d5
--- /dev/null
+++ 
b/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataInputStream.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.teradata;
+
+import org.apache.commons.io.input.SwappedDataInputStream;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+
+import static java.lang.String.format;
+
+/**
+ * The TeradataBinaryDataInputStream is used to handle the Teradata binary format input for record.
+ * Since the TD binary format uses little-endian to handle the SHORT, INT, LONG, DOUBLE and etc.
+ * while the Hadoop uses big-endian,
+ * We extend SwappedDataInputStream to handle these types and extend to handle the Teradata
+ * specific types like VARCHAR, CHAR, TIMESTAMP, DATE...
+ */
+public class TeradataBinaryDataInputStream extends SwappedDataInputStream {
+
+  private static final int DATE_STRING_LENGTH = 8;
+  private final SimpleDateFormat dateFormat;
+
+  /**
+   * Instantiates a new Teradata binary data input stream.
+   *
+   * @param input the input
+   */
+  public TeradataBinaryDataInputStream(InputStream input) {
+    super(input);
+    dateFormat = new SimpleDateFormat("yyyyMMdd");
+  }
+
+  /**
+   * Read VARCHAR(N).
+   * The representation of Varchar in Teradata binary format is:
+   * the first two bytes represent the length N of this varchar field,
+   * the next N bytes represent the content of this varchar field.
+   * To pad the null varchar, the length will be 0 and the content will be none.
+   *
+   * @return the string
+   * @throws IOException the io exception
+   */
+  public String readVarchar() throws IOException {
+    int varcharLength = readUnsignedShort();
+    byte[] varcharContent = readBytes(varcharLength, "varchar");
+    //force it to be UTF8 string
+    return new String(varcharContent, "UTF8");
+  }
+
+  /**
+   * Read TIMESTAMP(P).
+   * The representation of timestamp in Teradata binary format is:
+   * the byte number to read is based on the precision of timestamp,
+   * each byte represents one char and the timestamp is using string representation,
+   * eg: for TIMESTAMP(6), we need to read 26 bytes
+   * 31 39  31 31 2d 31 31 2d 31 31 20 31 39 3a 32 30 3a 32 31 2e 34 33 33 32 30 30
+   * will represent 1911-11-11 19:20:21.433200.
+   * the null timestamp will use space to pad.
+   *
+   * @param byteNum the byte number that will be read from inputstream
+   * @return the timestamp, or null when the content is blank
+   * @throws IOException the io exception
+   */
+  public Timestamp readTimestamp(Integer byteNum) throws IOException {
+    // yyyy-mm-dd hh:mm:ss
+    byte[] timestampContent = readBytes(byteNum, "timestamp");
+    String timestampStr = new String(timestampContent, "UTF8");
+    if (timestampStr.trim().length() == 0) {
+      return null;
+    } else {
+      return Timestamp.valueOf(timestampStr);
+    }
+  }
+
+  /**
+   * Read DATE.
+   * The representation of date in Teradata binary format is:
+   * The Date D is a int with 4 bytes using little endian,
+   * The representation is (D+19000000).ToString -> YYYYMMDD,
+   * eg: Date 07 b2 01 00 -> 111111 in little endian -> 19111111 - > 1911.11.11.
+   * the null date will use 0 to pad.
+   *
+   * @return the date, or null when the stored int is 0
+   * @throws IOException the io exception
+   * @throws ParseException the parse exception
+   */
+  public Date readDate() throws IOException, ParseException {
+    int di = readInt();
+    if (di == 0) {
+      return null;
+    } else {
+      String dateString = String.valueOf(di + 19000000);
+      if (dateString.length() < DATE_STRING_LENGTH) {
+        dateString = StringUtils.leftPad(dateString, DATE_STRING_LENGTH, '0');
+      }
+      return new Date(dateFormat.parse(dateString).getTime());
+    }
+  }
+
+  /**
+   * Read CHAR(N).
+   * The representation of char in Teradata binary format is
+   * the byte number to read is based on the [charLength] * [bytePerChar] <- totalLength,
+   * bytePerChar is decided by the charset: LATIN charset is 2 bytes per char and UNICODE charset is 3 bytes per char.
+   * the null char will use space to pad.
+   *
+   * @param totalLength the total length
+   * @return the string
+   * @throws IOException the io exception
+   */
+  public String readChar(int totalLength) throws IOException {
+    byte[] charContent = readBytes(totalLength, "char");
+    return new String(charContent, "UTF8");
+  }
+
+  /**
+   * Read DECIMAL(P, S).
+   * The representation of decimal in Teradata binary format is
+   * the byte number to read is decided solely by the precision(P),
+   * HiveDecimal is constructed through the byte array and scale.
+   * the null DECIMAL will use 0x00 to pad.
+   *
+   * @param scale the scale
+   * @param byteNum the byte num
+   * @return the hive decimal
+   * @throws IOException the io exception
+   */
+  public HiveDecimal readDecimal(int scale, int byteNum) throws IOException {
+    byte[] decimalContent = readBytes(byteNum, "decimal");
+    // the bytes arrive in little endian while BigInteger expects big endian
+    ArrayUtils.reverse(decimalContent);
+    return HiveDecimal.create(new BigInteger(decimalContent), scale);
+  }
+
+  /**
+   * Read VARBYTE(N).
+   * The representation of VARBYTE in Teradata binary format is:
+   * the first two bytes represent the length N of this varbyte field
+   * the next N bytes represent the content of this varbyte field.
+   * To pad the null varbyte, the length will be 0 and the content will be none.
+   *
+   * @return the byte [ ]
+   * @throws IOException the io exception
+   */
+  public byte[] readVarbyte() throws IOException {
+    int varbyteLength = readUnsignedShort();
+    return readBytes(varbyteLength, "varbyte");
+  }
+
+  /**
+   * Reads exactly <code>length</code> bytes from the wrapped stream.
+   * A single read call may hand back fewer bytes than requested without being at the end
+   * of the stream, so the read is repeated until the buffer is full.
+   *
+   * @param length the number of bytes to read
+   * @param typeName the field type being read, used in the error message only
+   * @return a new array holding the bytes read
+   * @throws EOFException if the stream ends before <code>length</code> bytes were read
+   * @throws IOException if the underlying read fails
+   */
+  private byte[] readBytes(int length, String typeName) throws IOException {
+    byte[] content = new byte[length];
+    int totalRead = 0;
+    while (totalRead < length) {
+      int numOfBytesRead = in.read(content, totalRead, length - totalRead);
+      if (numOfBytesRead < 0) {
+        throw new EOFException(
+            format("Fail to read the %s. Expect %d bytes, get %d bytes", typeName, length, totalRead));
+      }
+      totalRead += numOfBytesRead;
+    }
+    return content;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataOutputStream.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataOutputStream.java
 
b/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataOutputStream.java
new file mode 100644
index 0000000..9bca182
--- /dev/null
+++ 
b/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinaryDataOutputStream.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.teradata;
+
+import org.apache.commons.io.EndianUtils;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+
+import static java.lang.String.format;
+
+/**
+ * The TeradataBinaryDataOutputStream is used to produce output in compliance
+ * with the Teradata binary format, so the output can be loaded directly into
+ * Teradata DB using TPT fastload.
+ * The TD binary format uses little-endian for SHORT, INT, LONG, DOUBLE, etc.,
+ * while Hadoop uses big-endian, so this class extends ByteArrayOutputStream
+ * and uses EndianUtils to write byte-swapped values for these types. It also
+ * handles the Teradata specific types like VARCHAR, CHAR, TIMESTAMP, DATE...
+ */
+public class TeradataBinaryDataOutputStream extends ByteArrayOutputStream {
+
+  private static final Log LOG = 
LogFactory.getLog(TeradataBinaryDataOutputStream.class);
+
+  private static final int TIMESTAMP_NO_NANOS_BYTE_NUM = 19;
+  private final SimpleDateFormat dateFormat;
+
+  /**
+   * Creates an empty output stream together with the formatter that
+   * {@code writeDate} uses to render a date as the digits yyyyMMdd.
+   */
+  public TeradataBinaryDataOutputStream() {
+    this.dateFormat = new SimpleDateFormat("yyyyMMdd");
+  }
+
+  /**
+   * Write VARCHAR(N).
+   * The representation of Varchar in Teradata binary format is:
+   * the first two bytes represent the length N of this varchar field,
+   * the next N bytes represent the content of this varchar field.
+   * A null varchar is written as length 0 with no content.
+   *
+   * @param writable the varchar to write, or null for a null field
+   * @throws IOException if the content is longer than the two-byte length
+   *           prefix can represent, or on a write failure
+   */
+  public void writeVarChar(HiveVarcharWritable writable) throws IOException {
+    if (writable == null) {
+      EndianUtils.writeSwappedShort(this, (short) 0);
+      return;
+    }
+    Text t = writable.getTextValue();
+    int varcharLength = t.getLength();
+    // The length prefix is only 16 bits wide. Casting a larger value to short
+    // would silently drop the high bits and desynchronize the record.
+    if (varcharLength > 0xFFFF) {
+      throw new IOException(format("The byte num %d of the varchar is more than the byte num %d "
+          + "the two-byte length prefix can represent", varcharLength, 0xFFFF));
+    }
+    EndianUtils.writeSwappedShort(this, (short) varcharLength); // write the varchar length
+    write(t.getBytes(), 0, varcharLength); // write the varchar content
+  }
+
+  /**
+   * Write INT.
+   * The integer is written byte-swapped (little-endian) via EndianUtils.
+   *
+   * @param i the int value to write
+   * @throws IOException the io exception
+   */
+  public void writeInt(int i) throws IOException {
+    EndianUtils.writeSwappedInteger(this, i);
+  }
+
+  /**
+   * Write TIMESTAMP(N).
+   * The timestamp is written as its string representation, one byte per
+   * char, using exactly byteNum bytes:
+   * a longer string is cut to byteNum chars, a shorter one is padded with '0'
+   * (preceded by '.' when the string has no fractional part yet),
+   * eg: for 1911-11-11 19:20:21.433200 in TIMESTAMP(3), we will cut it to be
+   * 1911-11-11 19:20:21.433 and write
+   * 31 39  31 31 2d 31 31 2d 31 31 20 31 39 3a 32 30 3a 32 31 2e 34 33 33.
+   * A null timestamp is written as byteNum spaces.
+   *
+   * @param timestamp the timestamp, or null for a null field
+   * @param byteNum the number of bytes the timestamp occupies in the output
+   * @throws IOException the io exception
+   */
+  public void writeTimestamp(TimestampWritable timestamp, int byteNum) throws IOException {
+    if (timestamp == null) {
+      write(StringUtils.repeat(' ', byteNum).getBytes("UTF8"));
+      return;
+    }
+
+    final String text = timestamp.getTimestamp().toString();
+    final int missing = byteNum - text.length();
+    if (missing <= 0) {
+      // At least as long as the target width: keep the leading byteNum chars.
+      write(text.substring(0, byteNum).getBytes("UTF8"));
+      return;
+    }
+
+    write(text.getBytes("UTF8"));
+    // Without a fractional part, one of the missing chars is the '.' separator.
+    final String zeros = text.length() == TIMESTAMP_NO_NANOS_BYTE_NUM
+        ? "." + StringUtils.repeat('0', missing - 1)
+        : StringUtils.repeat('0', missing);
+    write(zeros.getBytes("UTF8"));
+  }
+
+  /**
+   * Write DOUBLE.
+   * The double is written byte-swapped (little-endian) via EndianUtils.
+   *
+   * @param d the double value to write
+   * @throws IOException the io exception
+   */
+  public void writeDouble(double d) throws IOException {
+    EndianUtils.writeSwappedDouble(this, d);
+  }
+
+  /**
+   * Write DATE.
+   * The representation of date in Teradata binary format is:
+   * The Date D is a int with 4 bytes using little endian.
+   * The representation is (YYYYMMDD - 19000000).toInt -> D
+   * eg. 1911.11.11 -> 19111111 -> 111111 -> 07 b2 01 00 in little endian.
+   * the null date will use 0 to pad.
+   *
+   * @param date the date
+   * @throws IOException the io exception
+   */
+  public void writeDate(DateWritable date) throws IOException {
+    if (date == null) {
+      EndianUtils.writeSwappedInteger(this, 0);
+    } else {
+      EndianUtils.writeSwappedInteger(this, 
Integer.parseInt(dateFormat.format(date.get())) - 19000000);
+    }
+  }
+
+  /**
+   * Write LONG.
+   * The long is written byte-swapped (little-endian) via EndianUtils.
+   *
+   * @param l the long value to write
+   * @throws IOException the io exception
+   */
+  public void writeLong(long l) throws IOException {
+    EndianUtils.writeSwappedLong(this, l);
+  }
+
+  /**
+   * Write CHAR(N).
+   * The field always occupies exactly [length] bytes, where the caller
+   * computes length as [charLength] * [bytePerChar];
+   * bytePerChar is decided by the charset: LATIN charset is 2 bytes per char
+   * and UNICODE charset is 3 bytes per char.
+   * The stripped content is written first and the rest is padded with
+   * spaces. A null char is written as [length] spaces.
+   *
+   * @param writable the char value, or null for a null field
+   * @param length the total number of bytes the field occupies
+   * @throws IOException if the content needs more than [length] bytes, or on
+   *           a write failure
+   */
+  public void writeChar(HiveCharWritable writable, int length) throws IOException {
+    if (writable == null) {
+      String pad = StringUtils.repeat(' ', length);
+      write(pad.getBytes("UTF8"));
+      return;
+    }
+    Text t = writable.getStrippedValue();
+    int contentLength = t.getLength();
+    // Validate before writing so an oversized value does not leave a
+    // partially written field in the buffer.
+    if (contentLength > length) {
+      throw new IOException(format("The byte num %s of HiveCharWritable is more than the byte num %s we can hold. "
+          + "The content of HiveCharWritable is %s", contentLength, length, writable.getPaddedValue()));
+    }
+    write(t.getBytes(), 0, contentLength);
+    if (length > contentLength) {
+      String pad = StringUtils.repeat(' ', length - contentLength);
+      write(pad.getBytes("UTF8"));
+    }
+  }
+
+  /**
+   * Write DECIMAL(P, S).
+   * The field always occupies exactly byteNum bytes (the caller derives
+   * byteNum from the precision P). The unscaled value, adjusted to the
+   * column scale, is written as little-endian two's complement and then
+   * sign-extended: 0x00 padding for non-negative values and 0xFF padding
+   * for negative values.
+   * A null DECIMAL is written as byteNum bytes of 0x00.
+   *
+   * @param writable the decimal, or null for a null field
+   * @param byteNum the number of bytes the field occupies
+   * @param scale the scale S of the column definition
+   * @throws IOException if the value needs more than byteNum bytes, or on a
+   *           write failure
+   */
+  public void writeDecimal(HiveDecimalWritable writable, int byteNum, int scale) throws IOException {
+    if (writable == null) {
+      write(new byte[byteNum]);
+      return;
+    }
+    // since the HiveDecimal will auto adjust the scale to save resource
+    // we need to adjust it back otherwise the output bytes will be wrong
+    // NOTE(review): a value whose scale is larger than the column scale is
+    // written unadjusted, as before - confirm callers enforce the scale.
+    int hiveScale = writable.getHiveDecimal().scale();
+    BigInteger bigInteger = writable.getHiveDecimal().unscaledValue();
+    if (hiveScale < scale) {
+      bigInteger = bigInteger.multiply(BigInteger.TEN.pow(scale - hiveScale));
+    }
+    byte[] content = bigInteger.toByteArray();
+    // Writing more than byteNum bytes would shift every following field.
+    if (content.length > byteNum) {
+      throw new IOException(format("The decimal %s needs %d bytes which is more than the byte num %d we can hold",
+          writable.getHiveDecimal(), content.length, byteNum));
+    }
+    boolean negative = bigInteger.signum() < 0;
+    // toByteArray is big-endian; the output is written least significant byte first
+    ArrayUtils.reverse(content);
+    write(content);
+    if (byteNum > content.length) {
+      byte[] pad = new byte[byteNum - content.length];
+      if (negative) {
+        Arrays.fill(pad, (byte) 0xFF);
+      }
+      write(pad);
+    }
+  }
+
+  /**
+   * Write SHORT.
+   * The short is written byte-swapped (little-endian) via EndianUtils.
+   *
+   * @param s the short value to write
+   * @throws IOException the io exception
+   */
+  public void writeShort(short s) throws IOException {
+    EndianUtils.writeSwappedShort(this, s);
+  }
+
+  /**
+   * Write VARBYTE(N).
+   * The representation of VARBYTE in Teradata binary format is:
+   * the first two bytes represent the length N of this varbyte field,
+   * the next N bytes represent the content of this varbyte field.
+   * A null varbyte is written as length 0 with no content.
+   *
+   * @param writable the bytes to write, or null for a null field
+   * @throws IOException if the content is longer than the two-byte length
+   *           prefix can represent, or on a write failure
+   */
+  public void writeVarByte(BytesWritable writable) throws IOException {
+    if (writable == null) {
+      EndianUtils.writeSwappedShort(this, (short) 0);
+      return;
+    }
+    int varbyteLength = writable.getLength();
+    // The length prefix is only 16 bits wide. Casting a larger value to short
+    // would silently drop the high bits and desynchronize the record.
+    if (varbyteLength > 0xFFFF) {
+      throw new IOException(format("The byte num %d of the varbyte is more than the byte num %d "
+          + "the two-byte length prefix can represent", varbyteLength, 0xFFFF));
+    }
+    EndianUtils.writeSwappedShort(this, (short) varbyteLength); // write the varbyte length
+    write(writable.getBytes(), 0, varbyteLength); // write the varbyte content
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinarySerde.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinarySerde.java 
b/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinarySerde.java
new file mode 100644
index 0000000..2254e88
--- /dev/null
+++ b/src/java/org/apache/hadoop/hive/serde2/teradata/TeradataBinarySerde.java
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.teradata;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static java.lang.String.format;
+
+/**
+ * https://cwiki.apache.org/confluence/display/Hive/TeradataBinarySerde.
+ * TeradataBinarySerde handles the serialization and deserialization of 
Teradata Binary Record
+ * passed from TeradataBinaryRecordReader.
+ *
+ * The Teradata Binary Record uses little-endian to handle the SHORT, INT, 
LONG, DOUBLE...
+ * We extend SwappedDataInputStream to handle these types and extend to handle 
the Teradata
+ * specific types like VARCHAR, CHAR, TIMESTAMP, DATE...
+ *
+ * Currently we support 11 Teradata data types: VARCHAR ,INTEGER, TIMESTAMP, 
FLOAT, DATE,
+ * BYTEINT, BIGINT, CHARACTER, DECIMAL, SMALLINT, VARBYTE.
+ * The mapping between Teradata data type and Hive data type is
+ * Teradata Data Type: Hive Data Type
+ * VARCHAR: VARCHAR,
+ * INTEGER: INT,
+ * TIMESTAMP: TIMESTAMP,
+ * FLOAT: DOUBLE,
+ * DATE: DATE,
+ * BYTEINT: TINYINT ,
+ * BIGINT: BIGINT,
+ * CHAR: CHAR,
+ * DECIMAL: DECIMAL,
+ * SMALLINT: SMALLINT,
+ * VARBYTE: BINARY.
+ *
+ * TeradataBinarySerde currently doesn't support complex types like MAP, ARRAY 
and STRUCT.
+ */
+@SerDeSpec(schemaProps = { serdeConstants.LIST_COLUMNS,
+    serdeConstants.LIST_COLUMN_TYPES }) public class TeradataBinarySerde 
extends AbstractSerDe {
+  private static final Log LOG = LogFactory.getLog(TeradataBinarySerde.class);
+
+  public static final String TD_SCHEMA_LITERAL = "teradata.schema.literal";
+
+  private StructObjectInspector rowOI;
+  private ArrayList<Object> row;
+  private byte[] inForNull;
+
+  private int numCols;
+  private List<String> columnNames;
+  private List<TypeInfo> columnTypes;
+
+  private TeradataBinaryDataOutputStream out;
+  private BytesWritable serializeBytesWritable;
+  private byte[] outForNull;
+
+  public static final String TD_TIMESTAMP_PRECISION = 
"teradata.timestamp.precision";
+  private int timestampPrecision;
+  private static final int DEFAULT_TIMESTAMP_BYTE_NUM = 19;
+  private static final String DEFAULT_TIMESTAMP_PRECISION = "6";
+
+  public static final String TD_CHAR_SET = "teradata.char.charset";
+  private String charCharset;
+  private static final String DEFAULT_CHAR_CHARSET = "UNICODE";
+  private static final Map<String, Integer> CHARSET_TO_BYTE_NUM = 
ImmutableMap.of("LATIN", 2, "UNICODE", 3);
+
+  /**
+   * Initialize the HiveSerializer.
+   * Reads the column names and types, the timestamp precision and the char
+   * charset from the table properties, builds the row ObjectInspector and
+   * allocates the reusable row object, output stream and null bitmaps.
+   *
+   * @param conf
+   *          System properties. Can be null in compile time
+   * @param tbl
+   *          table properties
+   * @throws SerDeException if the column names and types do not match in
+   *           number, a column is not primitive, the timestamp precision is
+   *           not an integer, or the char charset is not supported
+   */
+  @Override public void initialize(@Nullable Configuration conf, Properties tbl) throws SerDeException {
+    // "".split(",") yields one empty name, so an empty (or missing) column
+    // list has to be mapped to zero columns explicitly.
+    String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+    if (columnNameProperty == null || columnNameProperty.length() == 0) {
+      columnNames = new ArrayList<String>();
+    } else {
+      columnNames = Arrays.asList(columnNameProperty.split(","));
+    }
+
+    String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+    LOG.debug(serdeConstants.LIST_COLUMN_TYPES + ": " + columnTypeProperty);
+    if (columnTypeProperty == null || columnTypeProperty.length() == 0) {
+      columnTypes = new ArrayList<TypeInfo>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+    }
+
+    // An assert is a no-op unless the JVM runs with -ea, so check explicitly.
+    if (columnNames.size() != columnTypes.size()) {
+      throw new SerDeException(
+          format("The table defines %d column names but %d column types", columnNames.size(), columnTypes.size()));
+    }
+    numCols = columnNames.size();
+
+    // get the configured teradata timestamp precision
+    // you can configure to generate timestamp of different precision in the
+    // binary file generated by TPT/BTEQ
+    String precisionProperty = tbl.getProperty(TD_TIMESTAMP_PRECISION, DEFAULT_TIMESTAMP_PRECISION);
+    try {
+      timestampPrecision = Integer.parseInt(precisionProperty);
+    } catch (NumberFormatException e) {
+      throw new SerDeException(
+          format("%s isn't a valid value for %s", precisionProperty, TD_TIMESTAMP_PRECISION), e);
+    }
+
+    // get the configured teradata char charset
+    // in TD, latin charset will have 2 bytes per char and unicode will have 3
+    // bytes per char
+    charCharset = tbl.getProperty(TD_CHAR_SET, DEFAULT_CHAR_CHARSET);
+    if (!CHARSET_TO_BYTE_NUM.containsKey(charCharset)) {
+      throw new SerDeException(
+          format("%s isn't supported in Teradata Char Charset %s", charCharset, CHARSET_TO_BYTE_NUM.keySet()));
+    }
+
+    // All columns have to be primitive.
+    // Constructing the row ObjectInspector:
+    List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>(numCols);
+    for (int i = 0; i < numCols; i++) {
+      if (columnTypes.get(i).getCategory() != ObjectInspector.Category.PRIMITIVE) {
+        throw new SerDeException(
+            getClass().getName() + " only accepts primitive columns, but column[" + i + "] named "
+                + columnNames.get(i) + " has category " + columnTypes.get(i).getCategory());
+      }
+      columnOIs.add(TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(columnTypes.get(i)));
+    }
+
+    rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs);
+
+    // Constructing the row object and will be reused for all rows
+    row = new ArrayList<Object>(numCols);
+    for (int i = 0; i < numCols; i++) {
+      row.add(null);
+    }
+
+    // Initialize vars related to Null Array which represents the null bitmap
+    // (one bit per column, rounded up to whole bytes)
+    int byteNumForNullArray = (numCols / 8) + ((numCols % 8 == 0) ? 0 : 1);
+    LOG.debug(format("The Null Bytes for each record will have %s bytes", byteNumForNullArray));
+    inForNull = new byte[byteNumForNullArray];
+
+    out = new TeradataBinaryDataOutputStream();
+    serializeBytesWritable = new BytesWritable();
+    outForNull = new byte[byteNumForNullArray];
+  }
+
+  /**
+   * Returns the Writable class that would be returned by the serialize method.
+   * This is used to initialize SequenceFile header.
+   *
+   * @return BytesWritable, the type of the object that serialize returns
+   */
+  @Override public Class<? extends Writable> getSerializedClass() {
+    // serialize() hands back a BytesWritable, not the single-byte ByteWritable
+    return BytesWritable.class;
+  }
+
+  /**
+   * Serialize a row into one Teradata binary record.
+   * The record starts with the null bitmap (one bit per column, the first
+   * column in the most significant bit of the first byte, a set bit meaning
+   * null) followed by every field in column order.
+   * The returned Writable is reused between calls; a client that wants to
+   * keep a copy needs to clone the returned value.
+   *
+   * @param obj the row object
+   * @param objInspector the struct inspector used to navigate obj
+   * @return the reused BytesWritable holding the serialized record
+   * @throws SerDeException if the number of fields differs from the number
+   *           of table columns, a field has an unsupported type, or writing
+   *           a field fails
+   */
+  @Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+    try {
+      out.reset();
+      final StructObjectInspector structOI = (StructObjectInspector) objInspector;
+      final List<? extends StructField> fields = structOI.getAllStructFieldRefs();
+
+      final int fieldCount = fields.size();
+      if (fieldCount != numCols) {
+        throw new SerDeException(
+            "Cannot serialize the object because there are " + fieldCount + " fieldRefs but the table defined "
+                + numCols + " columns.");
+      }
+
+      // First pass: rebuild every bit of the reused null bitmap and emit it.
+      for (int col = 0; col < numCols; col++) {
+        final int byteIndex = col / 8;
+        final int mask = 0x01 << (7 - (col % 8));
+        if (structOI.getStructFieldData(obj, fields.get(col)) == null) {
+          outForNull[byteIndex] |= mask;
+        } else {
+          outForNull[byteIndex] &= ~mask;
+        }
+      }
+      out.write(outForNull);
+
+      // Second pass: emit each field through its own ObjectInspector.
+      for (int col = 0; col < numCols; col++) {
+        final StructField field = fields.get(col);
+        final Object value = structOI.getStructFieldData(obj, field);
+        serializeField(value, field.getFieldObjectInspector(), columnTypes.get(col));
+      }
+
+      serializeBytesWritable.set(out.toByteArray(), 0, out.size());
+      return serializeBytesWritable;
+    } catch (IOException e) {
+      throw new SerDeException(e);
+    }
+  }
+
+  private void serializeField(Object objectForField, ObjectInspector oi, 
TypeInfo ti)
+      throws IOException, SerDeException {
+    switch (oi.getCategory()) {
+    case PRIMITIVE:
+      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
+      switch (poi.getPrimitiveCategory()) {
+      // Teradata Type: BYTEINT
+      case BYTE:
+        ByteObjectInspector boi = (ByteObjectInspector) poi;
+        byte b = 0;
+        if (objectForField != null) {
+          b = boi.get(objectForField);
+        }
+        out.write(b);
+        return;
+      // Teradata Type: SMALLINT
+      case SHORT:
+        ShortObjectInspector spoi = (ShortObjectInspector) poi;
+        short s = 0;
+        if (objectForField != null) {
+          s = spoi.get(objectForField);
+        }
+        out.writeShort(s);
+        return;
+      // Teradata Type: INT
+      case INT:
+        IntObjectInspector ioi = (IntObjectInspector) poi;
+        int i = 0;
+        if (objectForField != null) {
+          i = ioi.get(objectForField);
+        }
+        out.writeInt(i);
+        return;
+      // Teradata Type: BIGINT
+      case LONG:
+        LongObjectInspector loi = (LongObjectInspector) poi;
+        long l = 0;
+        if (objectForField != null) {
+          l = loi.get(objectForField);
+        }
+        out.writeLong(l);
+        return;
+      // Teradata Type: FLOAT
+      case DOUBLE:
+        DoubleObjectInspector doi = (DoubleObjectInspector) poi;
+        double d = 0;
+        if (objectForField != null) {
+          d = doi.get(objectForField);
+        }
+        out.writeDouble(d);
+        return;
+      // Teradata Type: VARCHAR
+      case VARCHAR:
+        HiveVarcharObjectInspector hvoi = (HiveVarcharObjectInspector) poi;
+        HiveVarcharWritable hv = 
hvoi.getPrimitiveWritableObject(objectForField);
+        // assert the length of varchar record fits into the table definition
+        if (hv != null) {
+          assert ((VarcharTypeInfo) ti).getLength() >= 
hv.getHiveVarchar().getCharacterLength();
+        }
+        out.writeVarChar(hv);
+        return;
+      // Teradata Type: TIMESTAMP
+      case TIMESTAMP:
+        TimestampObjectInspector tsoi = (TimestampObjectInspector) poi;
+        TimestampWritable ts = tsoi.getPrimitiveWritableObject(objectForField);
+        out.writeTimestamp(ts, getTimeStampByteNum(timestampPrecision));
+        return;
+      // Teradata Type: DATE
+      case DATE:
+        DateObjectInspector dtoi = (DateObjectInspector) poi;
+        DateWritable dw = dtoi.getPrimitiveWritableObject(objectForField);
+        out.writeDate(dw);
+        return;
+      // Teradata Type: CHAR
+      case CHAR:
+        HiveCharObjectInspector coi = (HiveCharObjectInspector) poi;
+        HiveCharWritable hc = coi.getPrimitiveWritableObject(objectForField);
+        // assert the length of char record fits into the table definition
+        if (hc != null) {
+          assert ((CharTypeInfo) ti).getLength() >= 
hc.getHiveChar().getCharacterLength();
+        }
+        out.writeChar(hc, getCharByteNum(charCharset) * ((CharTypeInfo) 
ti).getLength());
+        return;
+      // Teradata Type: DECIMAL
+      case DECIMAL:
+        DecimalTypeInfo dtype = (DecimalTypeInfo) ti;
+        int precision = dtype.precision();
+        int scale = dtype.scale();
+        HiveDecimalObjectInspector hdoi = (HiveDecimalObjectInspector) poi;
+        HiveDecimalWritable hd = 
hdoi.getPrimitiveWritableObject(objectForField);
+        // assert the precision of decimal record fits into the table 
definition
+        if (hd != null) {
+          assert (dtype.getPrecision() >= hd.precision());
+        }
+        out.writeDecimal(hd, getDecimalByteNum(precision), scale);
+        return;
+      // Teradata Type: VARBYTE
+      case BINARY:
+        BinaryObjectInspector bnoi = (BinaryObjectInspector) poi;
+        BytesWritable byw = bnoi.getPrimitiveWritableObject(objectForField);
+        out.writeVarByte(byw);
+        return;
+      default:
+        throw new SerDeException("Unrecognized type: " + 
poi.getPrimitiveCategory());
+      }
+      // Currently, serialization of complex types is not supported
+    case LIST:
+    case MAP:
+    case STRUCT:
+    default:
+      throw new SerDeException("Unrecognized type: " + oi.getCategory());
+    }
+  }
+
+  /**
+   * Statistics are not supported by this SerDe.
+   *
+   * @return always null
+   */
+  @Override public SerDeStats getSerDeStats() {
+    // no support for statistics
+    return null;
+  }
+
+  /**
+   * Deserialize one Teradata binary record into the reused row object.
+   * The record starts with the null bitmap, followed by every field in
+   * column order. When the record is too short, or has bytes left over after
+   * the last field, a warning is logged and every field of the row is set to
+   * null instead of failing.
+   * The returned row is reused between calls; a client that wants to keep a
+   * copy needs to clone the returned value by calling
+   * ObjectInspectorUtils.getStandardObject().
+   *
+   * @param blob
+   *          The Writable object containing a serialized object
+   * @return A Java object representing the contents in the blob.
+   * @throws SerDeException if reading or parsing a field fails for a reason
+   *           other than the record length
+   */
+  @Override public Object deserialize(Writable blob) throws SerDeException {
+    try {
+      final BytesWritable record = (BytesWritable) blob;
+
+      // wrap only the valid part of the backing array as the input stream
+      final TeradataBinaryDataInputStream in = new TeradataBinaryDataInputStream(
+          new ByteArrayInputStream(record.getBytes(), 0, record.getLength()));
+
+      final int nullBytesRead = in.read(inForNull);
+      if (inForNull.length != 0 && nullBytesRead != inForNull.length) {
+        throw new EOFException("not enough bytes for one object");
+      }
+
+      for (int col = 0; col < numCols; col++) {
+        // the bit of column col, counted from the most significant bit
+        final boolean isNull = (inForNull[col / 8] & (128 >> (col % 8))) != 0;
+        final Object value = deserializeField(in, columnTypes.get(col), row.get(col), isNull);
+        row.set(col, value);
+      }
+
+      // After deserializing all the fields, the input should be over in which
+      // case in.read will return -1
+      if (in.read() != -1) {
+        throw new EOFException(
+            "The inputstream has more after we deserialize all the fields - this is unexpected");
+      }
+    } catch (EOFException e) {
+      LOG.warn("Catch thrown exception", e);
+      LOG.warn("This record has been polluted. We have reset all the row fields to be null");
+      for (int col = 0; col < numCols; col++) {
+        row.set(col, null);
+      }
+    } catch (IOException e) {
+      throw new SerDeException(e);
+    } catch (ParseException e) {
+      throw new SerDeException(e);
+    }
+    return row;
+  }
+
+  private Object deserializeField(TeradataBinaryDataInputStream in, TypeInfo 
type, Object reuse, boolean isNull)
+      throws IOException, ParseException, SerDeException {
+    // isNull:
+    // In the Teradata Binary file, even the field is null (isNull=true),
+    // thd data still has some default values to pad the record.
+    // In this case, you cannot avoid reading the bytes even it is not used.
+    switch (type.getCategory()) {
+    case PRIMITIVE:
+      PrimitiveTypeInfo ptype = (PrimitiveTypeInfo) type;
+      switch (ptype.getPrimitiveCategory()) {
+      case VARCHAR: // Teradata Type: VARCHAR
+        String st = in.readVarchar();
+        if (isNull) {
+          return null;
+        } else {
+          HiveVarcharWritable r = reuse == null ? new HiveVarcharWritable() : 
(HiveVarcharWritable) reuse;
+          r.set(st, ((VarcharTypeInfo) type).getLength());
+          return r;
+        }
+      case INT: // Teradata Type: INT
+        int i = in.readInt();
+        if (isNull) {
+          return null;
+        } else {
+          IntWritable r = reuse == null ? new IntWritable() : (IntWritable) 
reuse;
+          r.set(i);
+          return r;
+        }
+      case TIMESTAMP: // Teradata Type: TIMESTAMP
+        Timestamp ts = 
in.readTimestamp(getTimeStampByteNum(timestampPrecision));
+        if (isNull) {
+          return null;
+        } else {
+          TimestampWritable r = reuse == null ? new TimestampWritable() : 
(TimestampWritable) reuse;
+          r.set(ts);
+          return r;
+        }
+      case DOUBLE: // Teradata Type: FLOAT
+        double d = in.readDouble();
+        if (isNull) {
+          return null;
+        } else {
+          DoubleWritable r = reuse == null ? new DoubleWritable() : 
(DoubleWritable) reuse;
+          r.set(d);
+          return r;
+        }
+      case DATE: // Teradata Type: DATE
+        Date dt = in.readDate();
+        if (isNull) {
+          return null;
+        } else {
+          DateWritable r = reuse == null ? new DateWritable() : (DateWritable) 
reuse;
+          r.set(dt);
+          return r;
+        }
+      case BYTE: // Teradata Type: BYTEINT
+        byte bt = in.readByte();
+        if (isNull) {
+          return null;
+        } else {
+          ByteWritable r = reuse == null ? new ByteWritable() : (ByteWritable) 
reuse;
+          r.set(bt);
+          return r;
+        }
+      case LONG: // Teradata Type: BIGINT
+        long l = in.readLong();
+        if (isNull) {
+          return null;
+        } else {
+          LongWritable r = reuse == null ? new LongWritable() : (LongWritable) 
reuse;
+          r.set(l);
+          return r;
+        }
+      case CHAR: // Teradata Type: CHAR
+        CharTypeInfo ctype = (CharTypeInfo) type;
+        int length = ctype.getLength();
+        String c = in.readChar(length * getCharByteNum(charCharset));
+        if (isNull) {
+          return null;
+        } else {
+          HiveCharWritable r = reuse == null ? new HiveCharWritable() : 
(HiveCharWritable) reuse;
+          r.set(c, length);
+          return r;
+        }
+      case DECIMAL: // Teradata Type: DECIMAL
+        DecimalTypeInfo dtype = (DecimalTypeInfo) type;
+        int precision = dtype.precision();
+        int scale = dtype.scale();
+        HiveDecimal hd = in.readDecimal(scale, getDecimalByteNum(precision));
+        if (isNull) {
+          return null;
+        } else {
+          HiveDecimalWritable r = (reuse == null ? new HiveDecimalWritable() : 
(HiveDecimalWritable) reuse);
+          r.set(hd);
+          return r;
+        }
+      case SHORT: // Teradata Type: SMALLINT
+        short s = in.readShort();
+        if (isNull) {
+          return null;
+        } else {
+          ShortWritable r = reuse == null ? new ShortWritable() : 
(ShortWritable) reuse;
+          r.set(s);
+          return r;
+        }
+      case BINARY: // Teradata Type: VARBYTE
+        byte[] content = in.readVarbyte();
+        if (isNull) {
+          return null;
+        } else {
+          BytesWritable r = new BytesWritable();
+          r.set(content, 0, content.length);
+          return r;
+        }
+      default:
+        throw new SerDeException("Unrecognized type: " + 
ptype.getPrimitiveCategory());
+      }
+      // Currently, deserialization of complex types is not supported
+    case LIST:
+    case MAP:
+    case STRUCT:
+    default:
+      throw new SerDeException("Unsupported category: " + type.getCategory());
+    }
+  }
+
+  /**
+   * Get the object inspector that can be used to navigate through the internal
+   * structure of the Object returned from deserialize(...).
+   *
+   * @return the struct inspector for the row, as built by initialize
+   */
+  @Override public ObjectInspector getObjectInspector() throws SerDeException {
+    return rowOI;
+  }
+
+  /**
+   * Get the number of bytes a timestamp of the given precision occupies:
+   * the base width alone for precision 0, otherwise the base width plus one
+   * separator char plus one char per fractional digit.
+   *
+   * @param precision the number of fractional digits
+   * @return the byte number of the timestamp field
+   */
+  private int getTimeStampByteNum(int precision) {
+    return precision == 0 ? DEFAULT_TIMESTAMP_BYTE_NUM : precision + 1 + DEFAULT_TIMESTAMP_BYTE_NUM;
+  }
+
+  /**
+   * Get the number of bytes one char occupies in the given Teradata charset.
+   *
+   * @param charset the charset name, a key of CHARSET_TO_BYTE_NUM
+   * @return the number of bytes per char
+   * @throws SerDeException if the charset is not supported
+   */
+  private int getCharByteNum(String charset) throws SerDeException {
+    // Look up the argument itself (not the charCharset field), so that an
+    // unknown charset is reported instead of failing while unboxing null.
+    Integer byteNum = CHARSET_TO_BYTE_NUM.get(charset);
+    if (byteNum == null) {
+      throw new SerDeException(
+          format("%s isn't supported in Teradata Char Charset %s", charset, CHARSET_TO_BYTE_NUM.keySet()));
+    }
+    return byteNum;
+  }
+
+  /**
+   * Get the number of bytes a DECIMAL of the given precision occupies:
+   * 1 byte up to precision 2, 2 bytes up to 4, 4 bytes up to 9,
+   * 8 bytes up to 18 and 16 bytes up to 38.
+   *
+   * @param precision the precision of the decimal column
+   * @return the byte number of the decimal field
+   * @throws SerDeException if the precision is not within 1 to 38
+   */
+  private int getDecimalByteNum(int precision) throws SerDeException {
+    if (precision <= 0) {
+      throw new SerDeException(
+          format("the precision of Decimal should be bigger than 0. %d is illegal", precision));
+    }
+    if (precision <= 2) {
+      return 1;
+    }
+    if (precision <= 4) {
+      return 2;
+    }
+    if (precision <= 9) {
+      return 4;
+    }
+    if (precision <= 18) {
+      return 8;
+    }
+    if (precision <= 38) {
+      return 16;
+    }
+    // Report both illegal bounds with the same checked exception type.
+    throw new SerDeException(
+        format("the precision of Decimal should be smaller than 39. %d is illegal", precision));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForDate.java
----------------------------------------------------------------------
diff --git 
a/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForDate.java
 
b/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForDate.java
new file mode 100644
index 0000000..01ac43b
--- /dev/null
+++ 
b/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForDate.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.teradata;
+
+import com.google.common.io.BaseEncoding;
+import junit.framework.TestCase;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.junit.Assert;
+
+import java.sql.Date;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Test the data type DATE for Teradata binary format.
+ *
+ * <p>Each case deserializes a hex-encoded record into a single-column row, checks the decoded
+ * date, then serializes the row again and expects the original bytes back.
+ */
+public class TestTeradataBinarySerdeForDate extends TestCase {
+
+  private final TeradataBinarySerde serde = new TeradataBinarySerde();
+  private final Properties props = new Properties();
+
+  @Override
+  protected void setUp() throws Exception {
+    props.setProperty(serdeConstants.LIST_COLUMNS, "TD_DATE");
+    props.setProperty(serdeConstants.LIST_COLUMN_TYPES, "date");
+    serde.initialize(null, props);
+  }
+
+  public void testTimestampBefore1900() throws Exception {
+    //0060-01-01
+    assertDateRoundTrip("00653de7fe", "0060-01-01");
+  }
+
+  public void testTimestampAfter1900() throws Exception {
+    //9999-01-01
+    assertDateRoundTrip("0095cfd304", "9999-01-01");
+  }
+
+  /**
+   * Deserialize the record, compare the decoded date and verify the serialized form.
+   *
+   * @param hex lower-case base16 encoding of the whole record
+   * @param expectedDate expected {@code toString()} of the decoded date
+   * @throws Exception if the serde fails to deserialize or serialize the record
+   */
+  private void assertDateRoundTrip(String hex, String expectedDate) throws Exception {
+    BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode(hex));
+
+    // deserialize(...) is declared to return Object; these tests rely on it being a List.
+    @SuppressWarnings("unchecked")
+    List<Object> row = (List<Object>) serde.deserialize(in);
+    Date date = ((DateWritable) row.get(0)).get();
+    // assertEquals / assertArrayEquals report the differing values when the check fails.
+    Assert.assertEquals(expectedDate, date.toString());
+
+    BytesWritable res = (BytesWritable) serde.serialize(row, serde.getObjectInspector());
+    Assert.assertArrayEquals(in.copyBytes(), res.copyBytes());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForDecimal.java
----------------------------------------------------------------------
diff --git 
a/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForDecimal.java
 
b/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForDecimal.java
new file mode 100644
index 0000000..6abdd3f
--- /dev/null
+++ 
b/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForDecimal.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.teradata;
+
+import com.google.common.io.BaseEncoding;
+import junit.framework.TestCase;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.junit.Assert;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Test the data type DECIMAL for Teradata binary format.
+ *
+ * <p>The column under test is declared as {@code decimal(9,5)}. Each case deserializes a
+ * hex-encoded record, checks the decoded value, then serializes the row again and expects the
+ * original bytes back.
+ */
+public class TestTeradataBinarySerdeForDecimal extends TestCase {
+
+  private final TeradataBinarySerde serde = new TeradataBinarySerde();
+  private final Properties props = new Properties();
+
+  @Override
+  protected void setUp() throws Exception {
+    props.setProperty(serdeConstants.LIST_COLUMNS, "TD_DECIMAL");
+    props.setProperty(serdeConstants.LIST_COLUMN_TYPES, "decimal(9,5)");
+
+    serde.initialize(null, props);
+  }
+
+  public void testPositiveFraction() throws Exception {
+    assertDecimalRoundTrip("0064000000", "0.001");
+  }
+
+  public void testNegativeFraction() throws Exception {
+    assertDecimalRoundTrip("009cffffff", "-0.001");
+  }
+
+  public void testPositiveNumber1() throws Exception {
+    assertDecimalRoundTrip("00a0860100", "1");
+  }
+
+  public void testNegativeNumber1() throws Exception {
+    assertDecimalRoundTrip("006079feff", "-1");
+  }
+
+  public void testPositiveNumber2() throws Exception {
+    assertDecimalRoundTrip("0080969800", "100");
+  }
+
+  public void testNegativeNumber2() throws Exception {
+    assertDecimalRoundTrip("000065c4e0", "-5240");
+  }
+
+  /**
+   * Deserialize the record, compare the decoded decimal and verify the serialized form.
+   *
+   * @param hex lower-case base16 encoding of the whole record
+   * @param expectedDecimal expected {@code toString()} of the decoded HiveDecimal
+   * @throws Exception if the serde fails to deserialize or serialize the record
+   */
+  private void assertDecimalRoundTrip(String hex, String expectedDecimal) throws Exception {
+    BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode(hex));
+
+    // deserialize(...) is declared to return Object; these tests rely on it being a List.
+    @SuppressWarnings("unchecked")
+    List<Object> row = (List<Object>) serde.deserialize(in);
+    // assertEquals / assertArrayEquals report the differing values when the check fails.
+    Assert.assertEquals(expectedDecimal, ((HiveDecimalWritable) row.get(0)).getHiveDecimal().toString());
+
+    BytesWritable res = (BytesWritable) serde.serialize(row, serde.getObjectInspector());
+    Assert.assertArrayEquals(in.copyBytes(), res.copyBytes());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForTimeStamp.java
----------------------------------------------------------------------
diff --git 
a/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForTimeStamp.java
 
b/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForTimeStamp.java
new file mode 100644
index 0000000..61cc2a5
--- /dev/null
+++ 
b/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeForTimeStamp.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.teradata;
+
+import com.google.common.io.BaseEncoding;
+import junit.framework.TestCase;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.junit.Assert;
+
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Test the data type TIMESTAMP for Teradata binary format.
+ *
+ * <p>The serde is initialized inside each test rather than in {@code setUp()} because every
+ * case sets a different {@code TD_TIMESTAMP_PRECISION} before initialization.
+ */
+public class TestTeradataBinarySerdeForTimeStamp extends TestCase {
+
+  private final TeradataBinarySerde serde = new TeradataBinarySerde();
+  private final Properties props = new Properties();
+
+  @Override
+  protected void setUp() throws Exception {
+    props.setProperty(serdeConstants.LIST_COLUMNS, "TD_TIMESTAMP");
+    props.setProperty(serdeConstants.LIST_COLUMN_TYPES, "timestamp");
+  }
+
+  public void testTimestampPrecision6() throws Exception {
+    //2012-10-01 12:00:00.110000
+    assertTimestampRoundTrip("6", "00323031322d31302d30312031323a30303a30302e313130303030",
+        "2012-10-01 12:00:00.11");
+  }
+
+  public void testTimestampPrecision0() throws Exception {
+    //2012-10-01 12:00:00
+    assertTimestampRoundTrip("0", "00323031322d31302d30312031323a30303a3030",
+        "2012-10-01 12:00:00.0");
+  }
+
+  public void testTimestampPrecision3() throws Exception {
+    //2012-10-01 12:00:00.345
+    assertTimestampRoundTrip("3", "00323031322d31302d30312031323a30303a30302e333435",
+        "2012-10-01 12:00:00.345");
+  }
+
+  /**
+   * Initialize the serde with the given precision, deserialize the record, compare the decoded
+   * timestamp and verify the serialized form.
+   *
+   * @param precision value for {@code TeradataBinarySerde.TD_TIMESTAMP_PRECISION}
+   * @param hex lower-case base16 encoding of the whole record
+   * @param expectedTimestamp expected {@code toString()} of the decoded java.sql.Timestamp
+   * @throws Exception if the serde fails to initialize, deserialize or serialize
+   */
+  private void assertTimestampRoundTrip(String precision, String hex, String expectedTimestamp)
+      throws Exception {
+    props.setProperty(TeradataBinarySerde.TD_TIMESTAMP_PRECISION, precision);
+    serde.initialize(null, props);
+
+    BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode(hex));
+
+    // deserialize(...) is declared to return Object; these tests rely on it being a List.
+    @SuppressWarnings("unchecked")
+    List<Object> row = (List<Object>) serde.deserialize(in);
+    Timestamp ts = ((TimestampWritable) row.get(0)).getTimestamp();
+    Assert.assertEquals(expectedTimestamp, ts.toString());
+
+    BytesWritable res = (BytesWritable) serde.serialize(row, serde.getObjectInspector());
+    // assertArrayEquals reports the first differing index when the check fails.
+    Assert.assertArrayEquals(in.copyBytes(), res.copyBytes());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/8b16ad0f/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeGeneral.java
----------------------------------------------------------------------
diff --git 
a/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeGeneral.java
 
b/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeGeneral.java
new file mode 100644
index 0000000..aad77e9
--- /dev/null
+++ 
b/src/test/org/apache/hadoop/hive/serde2/teradata/TestTeradataBinarySerdeGeneral.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.teradata;
+
+import com.google.common.io.BaseEncoding;
+import junit.framework.TestCase;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.io.*;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Assert;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Test all the data types supported for Teradata Binary Format.
+ *
+ * <p>The row under test has 11 columns. Every record starts with a two-byte null bitmap; the
+ * remaining bytes hold the column values in declaration order.
+ */
+public class TestTeradataBinarySerdeGeneral extends TestCase {
+
+  private final TeradataBinarySerde serde = new TeradataBinarySerde();
+  private final Properties props = new Properties();
+
+  @Override
+  protected void setUp() throws Exception {
+    props.setProperty(serdeConstants.LIST_COLUMNS,
+        "TD_CHAR, TD_VARCHAR, TD_BIGINT, TD_INT, TD_SMALLINT, TD_BYTEINT, "
+            + "TD_FLOAT,TD_DECIMAL,TD_DATE, TD_TIMESTAMP, TD_VARBYTE");
+    props.setProperty(serdeConstants.LIST_COLUMN_TYPES,
+        "char(3),varchar(100),bigint,int,smallint,tinyint,double,decimal(31,30),date,timestamp,binary");
+
+    serde.initialize(null, props);
+  }
+
+  public void testDeserializeAndSerialize() throws Exception {
+    BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode(
+        "00004e6f762020202020201b006120646179203d2031312f31312f31312020202020202020203435ec10000000000000c5feffff"
+            + "7707010000000000002a40ef2b3dab0d14e6531c8908a72700000007b20100313931312d31312d31312031393a32303a32312e34"
+            + "33333230301b00746573743a20202020202020343333322020202020202020333135"));
+
+    List<Object> row = deserializeRow(in);
+    Assert.assertEquals("Nov", ((HiveCharWritable) row.get(0)).toString());
+    Assert.assertEquals("a day = 11/11/11         45", ((HiveVarcharWritable) row.get(1)).toString());
+    Assert.assertEquals(4332L, ((LongWritable) row.get(2)).get());
+    Assert.assertEquals(-315, ((IntWritable) row.get(3)).get());
+    Assert.assertEquals((short) 1911, ((ShortWritable) row.get(4)).get());
+    Assert.assertEquals((byte) 1, ((ByteWritable) row.get(5)).get());
+    Assert.assertEquals((double) 13, ((DoubleWritable) row.get(6)).get(), 0);
+    Assert.assertEquals(30, ((HiveDecimalWritable) row.get(7)).getScale());
+    Assert.assertEquals(3.141592653589793238462643383279,
+        ((HiveDecimalWritable) row.get(7)).getHiveDecimal().doubleValue(), 0);
+    Assert.assertEquals("1911-11-11", ((DateWritable) row.get(8)).toString());
+    Assert.assertEquals("1911-11-11 19:20:21.4332", ((TimestampWritable) row.get(9)).toString());
+    Assert.assertEquals(27, ((BytesWritable) row.get(10)).getLength());
+
+    assertSerializesBackTo(in, row);
+  }
+
+  public void testDeserializeAndSerializeWithNull() throws Exception {
+    //null bitmap: 0160 -> 00000001 01100000, 7th, 9th, 10th is null
+    BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode(
+        "01604d61722020202020201b006120646179203d2031332f30332f303820202020202020202020397ca10000000000004300000"
+            + "0dd0700000000000048834000000000000000000000000000000000443f110020202020202020202020202020202020202020202"
+            + "020202020200000"));
+    List<Object> row = deserializeRow(in);
+
+    Assert.assertEquals("Mar", ((HiveCharWritable) row.get(0)).toString());
+    Assert.assertNull(row.get(7));
+    Assert.assertNull(row.get(9));
+    Assert.assertNull(row.get(10));
+
+    assertSerializesBackTo(in, row);
+  }
+
+  public void testDeserializeAndSerializeAllNull() throws Exception {
+    //null bitmap: ffe0 -> 11111111 11100000, all 11 columns are null
+    BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode(
+        "ffe0202020202020202020000000000000000000000000000000000000000000000000000000000000000000000000000000000"
+            + "00000000020202020202020202020202020202020202020202020202020200000"));
+    List<Object> row = deserializeRow(in);
+
+    // Check every column, including index 2, which the individual assertions used to skip.
+    final int columnCount = 11;
+    for (int col = 0; col < columnCount; col++) {
+      Assert.assertNull("column " + col + " should be null", row.get(col));
+    }
+
+    assertSerializesBackTo(in, row);
+  }
+
+  public void testDeserializeCorruptedRecord() throws Exception {
+    // Same payload as testDeserializeAndSerialize with one extra trailing byte (ff).
+    BytesWritable in = new BytesWritable(BaseEncoding.base16().lowerCase().decode(
+        "00004e6f762020202020201b006120646179203d2031312f31312f31312020202020202020203435ec10000000000000c5feff"
+            + "ff7707010000000000002a40ef2b3dab0d14e6531c8908a72700000007b20100313931312d31312d31312031393a32303a32312"
+            + "e3433333230301b00746573743a20202020202020343333322020202020202020333135ff"));
+
+    List<Object> row = deserializeRow(in);
+    Assert.assertNull(row.get(0));
+    Assert.assertNull(row.get(3));
+    Assert.assertNull(row.get(10));
+  }
+
+  /**
+   * Deserialize a record into its list of column values.
+   *
+   * @param in the serialized record
+   * @return the deserialized row
+   * @throws Exception if the serde fails to deserialize the record
+   */
+  @SuppressWarnings("unchecked")
+  private List<Object> deserializeRow(BytesWritable in) throws Exception {
+    // deserialize(...) is declared to return Object; these tests rely on it being a List.
+    return (List<Object>) serde.deserialize(in);
+  }
+
+  /**
+   * Serialize the row and require the bytes to equal the original record.
+   *
+   * @param expected the record the row was deserialized from
+   * @param row the row to serialize
+   * @throws Exception if the serde fails to serialize the row
+   */
+  private void assertSerializesBackTo(BytesWritable expected, List<Object> row) throws Exception {
+    BytesWritable res = (BytesWritable) serde.serialize(row, serde.getObjectInspector());
+    // assertArrayEquals reports the first differing index when the check fails.
+    Assert.assertArrayEquals(expected.copyBytes(), res.copyBytes());
+  }
+}

Reply via email to